Skip to content

Commit

Permalink
Add new bot: domain valid (#1966)
Browse files Browse the repository at this point in the history
* Add new bot: domain valid

* Add license, add description

* Add requirements

* Update domain valid bot description, tests

* Update domain valid bot description, reduce data file

* Add cron and update TLD file function

* fix

* fix

* fix

* fix

* fix

* fix

* Update after develop branch merge

Co-authored-by: Wagner <[email protected]>
  • Loading branch information
2 people authored and waldbauer-certat committed Aug 16, 2021
1 parent 2ebf79a commit 472fd97
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 0 deletions.
2 changes: 2 additions & 0 deletions debian/cron.d/intelmq-update-database
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@
45 1 * * * intelmq intelmq.bots.experts.domain_suffix.expert --update-database
# Update database for recordedfuture_iprisk bot, update frequency is unknown:
27 1 * * * intelmq intelmq.bots.experts.recordedfuture_iprisk.expert --update-database
# Update database for domain_valid bot, updated daily:
50 1 * * * intelmq intelmq.bots.experts.domain_valid.expert --update-database
27 changes: 27 additions & 0 deletions docs/user/bots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1975,6 +1975,33 @@ Use this command to create/update the database and reload the bot:
intelmq.bots.experts.domain_suffix.expert --update-database
.. _intelmq.bots.experts.domain_valid.expert:

Domain valid
^^^^^^^^^^^^

**Information**

* `name:` `intelmq.bots.experts.domain_valid.expert`
* `lookup:` no
* `public:` yes
* `cache (redis db):` none
* `description:` Checks if a domain is valid by performing multiple validity checks (see below).

**Configuration Parameters**

* `domain_field`: The name of the field to be validated.
* `tlds_domains_list`: local file with all valid TLDs, default location ``/opt/intelmq/var/lib/bots/domain_valid/tlds-alpha-by-domain.txt``

**Description**

If the field given in `domain_field` does not exist in the event, the event is dropped.
If the domain contains underscores (``_``), the event is dropped.
If the domain is not valid according to the `validators library <https://pypi.org/project/validators/>`_, the event is dropped.
If the domain's last part (the TLD) is not in the TLD-list configured by parameter ``tlds_domains_list``, the event is dropped.
Latest TLD list: https://data.iana.org/TLD/


.. _intelmq.bots.experts.deduplicator.expert:

Deduplicator
Expand Down
4 changes: 4 additions & 0 deletions intelmq/bots/experts/domain_valid/REQUIREMENTS.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
# SPDX-License-Identifier: AGPL-3.0-or-later

validators
Empty file.
119 changes: 119 additions & 0 deletions intelmq/bots/experts/domain_valid/expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
"""
Domain validator
SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

try:
import validators
except ImportError:
validators = None

import os.path
import pathlib
import sys

import requests.exceptions

from intelmq.lib.bot import Bot
from intelmq.lib.exceptions import MissingDependencyError, ConfigurationError
from intelmq.lib.utils import get_bots_settings, create_request_session
from intelmq.bin.intelmqctl import IntelMQController


class DomainValidExpertBot(Bot):
    """Expert bot that only forwards events whose domain field passes validation.

    An event is sent on when the configured field exists, its value is accepted
    by ``validators.domain``, it contains no underscore and its last label is
    listed in the TLD file. Every other event is acknowledged without being
    sent.
    """
    # Name of the event field holding the domain to validate.
    domain_field: str = 'source.fqdn'
    # Path of the local file listing the accepted TLDs, one per line.
    tlds_domains_list: str = '/opt/intelmq/var/lib/bots/domain_valid/tlds-alpha-by-domain.txt'

    def init(self):
        """Check the optional dependency and load the TLD list.

        Raises:
            MissingDependencyError: if the ``validators`` package is not installed.
            ConfigurationError: if the TLD list file does not exist.
        """
        if validators is None:
            raise MissingDependencyError("validators")
        self.tlds_list = self.get_tlds_domain_list()

    def process(self):
        """Receive one event and forward it only if its domain is valid."""
        event = self.receive_message()
        if self.domain_field in event:
            if self._is_valid_domain(event[self.domain_field]):
                self.send_message(event)
            else:
                self.logger.debug(f"Filtered out event with search field {self.domain_field!r}.")
        # Dropped events are acknowledged as well so they leave the input queue.
        self.acknowledge_message()

    def _is_valid_domain(self, domain):
        """Return True if *domain* passes the syntax, underscore and TLD checks."""
        if not validators.domain(domain) or '_' in domain:
            return False
        # The TLD set is stored lowercased, so compare case-insensitively.
        return domain.rsplit('.', 1)[-1].lower() in self.tlds_list

    def get_tlds_domain_list(self):
        """Read the TLD file and return its entries as a set of lowercase strings.

        Blank lines and lines starting with ``#`` are ignored.

        Raises:
            ConfigurationError: if ``tlds_domains_list`` is not an existing file.
        """
        if not os.path.isfile(self.tlds_domains_list):
            raise ConfigurationError("File", f"TLD domain list file not found at {self.tlds_domains_list!r}.")
        with open(self.tlds_domains_list, encoding='utf-8') as file:
            entries = (line.strip() for line in file)
            # Skipping empty entries keeps '' out of the set, which would
            # otherwise match a domain whose last label is empty.
            return {entry.lower() for entry in entries if entry and not entry.startswith('#')}

    @classmethod
    def run(cls, parsed_args=None):
        """Update the TLD database if ``--update-database`` was given, else run the bot."""
        if not parsed_args:
            parsed_args = cls._create_argparser().parse_args()

        if parsed_args.update_database:
            cls.update_database(verbose=parsed_args.verbose)
        else:
            super().run(parsed_args=parsed_args)

    @classmethod
    def _create_argparser(cls):
        """Extend the parent's argument parser with ``--update-database`` and ``--verbose``."""
        argparser = super()._create_argparser()
        argparser.add_argument("--update-database", action='store_true', help='downloads latest database data')
        argparser.add_argument("--verbose", action='store_true', help='be verbose')
        return argparser

    @classmethod
    def update_database(cls, verbose=False):
        """Download the IANA TLD list into every configured database file.

        The target paths are taken from the ``tlds_domains_list`` parameter of
        each bot in the runtime configuration whose module is this one. After
        writing the files, the affected bots are reloaded.

        Args:
            verbose: print progress messages to stdout.

        Exits the process via ``sys.exit`` on configuration or download errors,
        and with status 0 when no bot of this type is configured.
        """
        bots = {}
        runtime_conf = get_bots_settings()
        try:
            for bot in runtime_conf:
                if runtime_conf[bot]["module"] == __name__:
                    bots[bot] = runtime_conf[bot]["parameters"]["tlds_domains_list"]
        except KeyError as e:
            sys.exit(f"Database update failed. Your configuration of {bot} is missing key {e}.")

        if not bots:
            if verbose:
                print(f"Database update skipped. No bots of type {__name__} present in runtime.conf.")
            sys.exit(0)

        url = "https://data.iana.org/TLD/tlds-alpha-by-domain.txt"
        try:
            session = create_request_session()
            if verbose:
                print("Downloading the latest database update...")
            response = session.get(url)
        except requests.exceptions.RequestException as e:
            sys.exit(f"Database update failed. Connection Error: {e}")

        if not response.ok:
            sys.exit(f"Database update failed. Server responded: {response.status_code}.\n"
                     f"URL: {response.url}")

        # Several bots may share one file; write each distinct path only once.
        for database_path in set(bots.values()):
            database_dir = pathlib.Path(database_path).parent
            database_dir.mkdir(parents=True, exist_ok=True)
            with open(database_path, "wb") as database:
                database.write(response.content)

        if verbose:
            print("Database updated. Reloading affected bots.")

        ctl = IntelMQController()
        for bot in bots:
            ctl.bot_reload(bot)


# Module-level alias for the bot class.
# NOTE(review): presumably the name the IntelMQ bot loader looks up - confirm.
BOT = DomainValidExpertBot
Empty file.
104 changes: 104 additions & 0 deletions intelmq/tests/bots/experts/domain_valid/test_expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
"""
Domain validator
SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

import unittest
import intelmq.lib.test as test
from intelmq.bots.experts.domain_valid.expert import DomainValidExpertBot
import os.path

def _build_event(fqdn, first_seen=None):
    """Return an Event fixture for *fqdn*; all other fields are fixed.

    ``extra.firstseen`` is only included when *first_seen* is given. Keys are
    inserted in the same order the literal fixtures used.
    """
    event = {
        "__type": "Event",
        "feed.accuracy": 90.0,
        "feed.name": "Feodo Tracker IPs",
        "feed.provider": "abuse.ch",
        "source.fqdn": fqdn,
        "time.observation": "2020-10-13T06:14:49+00:00",
    }
    if first_seen is not None:
        event["extra.firstseen"] = first_seen
    event["time.source"] = "2020-10-13T00:00:00+00:00"
    return event


# Domain starting with a hyphen.
EXAMPLE_INPUT_DROP = _build_event('-soltane-tel-injas-heh.digital',
                                  first_seen="2020-10-11T02:10:59+00:00")
# Domain containing an underscore.
EXAMPLE_INPUT_DROP_2 = _build_event('so6_ltane-tel-injas-heh.digital',
                                    first_seen="2020-10-11T02:10:59+00:00")
# Short domain starting with a hyphen.
EXAMPLE_INPUT_DROP_3 = _build_event('-apk.info')
# Well-formed domains whose TLDs are in the example TLD file.
EXAMPLE_INPUT_PASS = _build_event('soltane-tel-injas-heh.digital')
EXAMPLE_INPUT_PASS_2 = _build_event('apk.info')


@test.skip_exotic()
class TestDomainValidExpertBot(test.BotTestCase, unittest.TestCase):
    """
    Checks which Events DomainValidExpertBot drops and which it forwards.
    """

    @classmethod
    def set_bot(cls):
        # The example TLD list lives next to this test module.
        tld_file = os.path.join(os.path.dirname(__file__), 'tlds-alpha-by-domain.txt')
        cls.bot_reference = DomainValidExpertBot
        cls.input_message = EXAMPLE_INPUT_DROP
        cls.sysconfig = {
            'domain_field': 'source.fqdn',
            'tlds_domains_list': str(tld_file),
        }

    def _check_dropped(self, message=None):
        """Run the bot and expect an empty output queue.

        With ``message=None`` the class-level default input message is used.
        """
        if message is not None:
            self.input_message = message
        self.run_bot()
        self.assertOutputQueueLen(0)

    def _check_forwarded(self, message):
        """Run the bot on *message* and expect it unchanged as the only output."""
        self.input_message = message
        self.run_bot()
        self.assertOutputQueueLen(1)
        self.assertMessageEqual(0, message)

    def test_expert_drop(self):
        self._check_dropped()

    def test_expert_drop_2(self):
        self._check_dropped(EXAMPLE_INPUT_DROP_2)

    def test_expert_drop_3(self):
        self._check_dropped(EXAMPLE_INPUT_DROP_3)

    def test_expert_pass(self):
        self._check_forwarded(EXAMPLE_INPUT_PASS)

    def test_expert_pass_2(self):
        self._check_forwarded(EXAMPLE_INPUT_PASS_2)


# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':  # pragma: no cover
    unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Version 2021080500, Last Updated Thu Aug 5 07:07:02 2021 UTC # this is an example file - Latest TLD list: https://data.iana.org/TLD/
AAA
CH
DIGITAL
INFO
XN--8Y0A063A
ZW
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later

0 comments on commit 472fd97

Please sign in to comment.