Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new bot: domain valid #1966

Merged
14 commits merged into from
Aug 13, 2021
2 changes: 2 additions & 0 deletions debian/cron.d/intelmq-update-database
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@
45 1 * * * intelmq intelmq.bots.experts.domain_suffix.expert --update-database
# Update database for recordedfuture_iprisk bot, update frequency is unknown:
27 1 * * * intelmq intelmq.bots.experts.recordedfuture_iprisk.expert --update-database
# Update database for domain_valid bot, updated daily:
50 1 * * * intelmq intelmq.bots.experts.domain_valid.expert --update-database
27 changes: 27 additions & 0 deletions docs/user/bots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1972,6 +1972,33 @@ Use this command to create/update the database and reload the bot:
intelmq.bots.experts.domain_suffix.expert --update-database


.. _intelmq.bots.experts.domain_valid.expert:

Domain valid
^^^^^^^^^^^^

**Information**

* `name:` `intelmq.bots.experts.domain_valid.expert`
* `lookup:` no
* `public:` yes
* `cache (redis db):` none
* `description:` Checks if a domain is valid by performing multiple validity checks (see below).

**Configuration Parameters**

* `domain_field`: The name of the field to be validated, default ``source.fqdn``.
* `tlds_domains_list`: local file with all valid TLDs, default location ``/opt/intelmq/var/lib/bots/domain_valid/tlds-alpha-by-domain.txt``

**Description**

If the field given in `domain_field` does not exist in the event, the event is dropped.
If the domain contains underscores (``_``), the event is dropped.
If the domain is not valid according to the `validators library <https://pypi.org/project/validators/>`_, the event is dropped.
If the domain's last part (the TLD) is not in the TLD-list configured by parameter ``tlds_domains_list``, the event is dropped.
Latest TLD list: https://data.iana.org/TLD/


.. _intelmq.bots.experts.deduplicator.expert:

Deduplicator
Expand Down
1 change: 1 addition & 0 deletions intelmq/bots/experts/domain_valid/REQUIREMENTS.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
validators
Empty file.
114 changes: 114 additions & 0 deletions intelmq/bots/experts/domain_valid/expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
"""
Domain validator

SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

import validators

import os.path
import pathlib
import sys

import requests.exceptions

from intelmq.lib.bot import Bot
from intelmq.lib.exceptions import MissingDependencyError, ConfigurationError
from intelmq.lib.utils import get_bots_settings, create_request_session
from intelmq.bin.intelmqctl import IntelMQController


class DomainValidExpertBot(Bot):
    """Forward only events whose domain field holds a valid domain name.

    A domain is considered valid when it contains no underscore, its last
    label is listed in the TLD file configured by ``tlds_domains_list`` and
    ``validators.domain`` accepts it. Every other event is dropped.
    """
    # Name of the event field that holds the domain to validate.
    domain_field: str = 'source.fqdn'
    # Local copy of the TLD list; ``--update-database`` downloads it from IANA.
    tlds_domains_list: str = '/opt/intelmq/var/lib/bots/domain_valid/tlds-alpha-by-domain.txt'

    def init(self):
        """Load the TLD list once at start-up.

        Raises:
            MissingDependencyError: if ``validators`` is ``None``.
            ConfigurationError: if the TLD list file does not exist.
        """
        # NOTE(review): the module-level ``import validators`` is unconditional,
        # so this guard can only fire if that import is made tolerant of
        # ImportError (try/except setting ``validators = None``).
        if validators is None:
            raise MissingDependencyError("validators")
        self.tlds_list = self.get_tlds_domain_list()

    def process(self):
        """Pass the received event on if its domain is valid, otherwise drop it."""
        event = self.receive_message()
        if self.domain_field in event and self._is_valid_domain(event[self.domain_field]):
            self.send_message(event)
        else:
            # Also logged when the field is missing, so every drop is traceable.
            self.logger.debug(f"Filtered out event with search field {self.domain_field!r}.")
        self.acknowledge_message()

    def _is_valid_domain(self, domain):
        """Return True if *domain* passes the underscore, TLD and syntax checks."""
        if '_' in domain:
            return False
        # The TLD set is stored lower-cased, so compare case-insensitively.
        if domain.rsplit('.', 1)[-1].lower() not in self.tlds_list:
            return False
        return bool(validators.domain(domain))

    def get_tlds_domain_list(self):
        """Read the configured TLD file and return its entries as a lower-cased set.

        Comment lines (starting with ``#``, such as the version header of the
        IANA file) and blank lines are ignored, so an empty file yields an
        empty set instead of failing.

        Raises:
            ConfigurationError: if ``tlds_domains_list`` is not an existing file.
        """
        if not os.path.isfile(self.tlds_domains_list):
            raise ConfigurationError("File", f"TLD domain list file not found at {self.tlds_domains_list!r}.")
        with open(self.tlds_domains_list, encoding='utf-8') as file:
            entries = (line.strip().lower() for line in file)
            return {entry for entry in entries if entry and not entry.startswith('#')}

    @classmethod
    def run(cls, parsed_args=None):
        """Entry point: update the TLD database if requested, else run the bot."""
        if not parsed_args:
            parsed_args = cls._create_argparser().parse_args()

        if parsed_args.update_database:
            cls.update_database()
        else:
            super().run(parsed_args=parsed_args)

    @classmethod
    def _create_argparser(cls):
        """Extend the inherited argument parser with ``--update-database``."""
        argparser = super()._create_argparser()
        argparser.add_argument("--update-database", action='store_true', help='downloads latest database data')
        return argparser

    @classmethod
    def update_database(cls):
        """Download the current IANA TLD list for every configured bot of this type.

        The list is written to each bot's ``tlds_domains_list`` path (falling
        back to the class default when the parameter is not set) and the
        affected bots are reloaded afterwards. Exits the process on
        configuration or download errors, and with status 0 when no bot of
        this type is configured.
        """
        bots = {}
        runtime_conf = get_bots_settings()
        try:
            for bot in runtime_conf:
                if runtime_conf[bot]["module"] == __name__:
                    # The parameter of this bot is ``tlds_domains_list``; it is
                    # optional in the configuration because the class provides
                    # a default location.
                    bots[bot] = runtime_conf[bot]["parameters"].get("tlds_domains_list",
                                                                    cls.tlds_domains_list)

        except KeyError as e:
            sys.exit("Database update failed. Your configuration of {0} is missing key {1}.".format(bot, e))

        if not bots:
            print("Database update skipped. No bots of type {0} present in runtime.conf.".format(__name__))
            sys.exit(0)

        try:
            session = create_request_session()
            url = "https://data.iana.org/TLD/tlds-alpha-by-domain.txt"
            print("Downloading the latest database update...")
            response = session.get(url)

            if not response.ok:
                sys.exit("Database update failed. Server responded: {0}.\n"
                         "URL: {1}".format(response.status_code, response.url))

        except requests.exceptions.RequestException as e:
            sys.exit("Database update failed. Connection Error: {0}".format(e))

        # Several bots may share one file; write each distinct path only once.
        for database_path in set(bots.values()):
            database_dir = pathlib.Path(database_path).parent
            database_dir.mkdir(parents=True, exist_ok=True)
            with open(database_path, "wb") as database:
                database.write(response.content)

        print("Database updated. Reloading affected bots.")

        ctl = IntelMQController()
        for bot in bots:
            ctl.bot_reload(bot)


BOT = DomainValidExpertBot
Empty file.
104 changes: 104 additions & 0 deletions intelmq/tests/bots/experts/domain_valid/test_expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
"""
Domain validator

SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

import unittest
import intelmq.lib.test as test
from intelmq.bots.experts.domain_valid.expert import DomainValidExpertBot
import os.path

def _feodo_event(fqdn, first_seen=None):
    """Build a test event for the given ``source.fqdn``.

    ``extra.firstseen`` is only added when *first_seen* is given; all other
    fields are the same for every fixture.
    """
    event = {
        "__type": "Event",
        "feed.accuracy": 90.0,
        "feed.name": "Feodo Tracker IPs",
        "feed.provider": "abuse.ch",
        "source.fqdn": fqdn,
        "time.observation": "2020-10-13T06:14:49+00:00",
    }
    if first_seen is not None:
        event["extra.firstseen"] = first_seen
    event["time.source"] = "2020-10-13T00:00:00+00:00"
    return event


# Expected to be dropped: the first label starts with a hyphen.
EXAMPLE_INPUT_DROP = _feodo_event('-soltane-tel-injas-heh.digital',
                                  first_seen="2020-10-11T02:10:59+00:00")
# Expected to be dropped: the domain contains an underscore.
EXAMPLE_INPUT_DROP_2 = _feodo_event('so6_ltane-tel-injas-heh.digital',
                                    first_seen="2020-10-11T02:10:59+00:00")
# Expected to be dropped: the first label starts with a hyphen.
EXAMPLE_INPUT_DROP_3 = _feodo_event('-apk.info')
# Expected to pass unchanged.
EXAMPLE_INPUT_PASS = _feodo_event('soltane-tel-injas-heh.digital')
# Expected to pass unchanged.
EXAMPLE_INPUT_PASS_2 = _feodo_event('apk.info')


@test.skip_exotic()
class TestDomainValidExpertBot(test.BotTestCase, unittest.TestCase):
    """
    Tests for DomainValidExpertBot: invalid domains produce no output,
    valid domains are forwarded unchanged.
    """

    @classmethod
    def set_bot(cls):
        # The example TLD list is looked up next to this test module.
        tld_file = os.path.join(os.path.dirname(__file__), 'tlds-alpha-by-domain.txt')
        cls.bot_reference = DomainValidExpertBot
        cls.input_message = EXAMPLE_INPUT_DROP
        cls.sysconfig = {
            'domain_field': 'source.fqdn',
            'tlds_domains_list': str(tld_file),
        }

    def _assert_dropped(self):
        """Run the bot on the current input and expect an empty output queue."""
        self.run_bot()
        self.assertOutputQueueLen(0)

    def _assert_passed(self, expected):
        """Run the bot on the current input and expect exactly *expected* as output."""
        self.run_bot()
        self.assertOutputQueueLen(1)
        self.assertMessageEqual(0, expected)

    def test_expert_drop(self):
        # Uses the default input message configured in set_bot().
        self._assert_dropped()

    def test_expert_drop_2(self):
        self.input_message = EXAMPLE_INPUT_DROP_2
        self._assert_dropped()

    def test_expert_drop_3(self):
        self.input_message = EXAMPLE_INPUT_DROP_3
        self._assert_dropped()

    def test_expert_pass(self):
        self.input_message = EXAMPLE_INPUT_PASS
        self._assert_passed(EXAMPLE_INPUT_PASS)

    def test_expert_pass_2(self):
        self.input_message = EXAMPLE_INPUT_PASS_2
        self._assert_passed(EXAMPLE_INPUT_PASS_2)


# Allow running this test module directly as a script.
if __name__ == '__main__': # pragma: no cover
    unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Version 2021080500, Last Updated Thu Aug 5 07:07:02 2021 UTC # it is example file - Latest TLD list: https://data.iana.org/TLD/
AAA
CH
DIGITAL
INFO
XN--8Y0A063A
ZW