Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RPZ file output bot #1962

Merged
8 commits merged into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions docs/user/bots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3652,6 +3652,58 @@ REST API
* `use_json`: boolean


.. _intelmq.bots.outputs.rpz_file.output:

RPZ
^^^^^^^^

The DNS RPZ functionality is "DNS firewall". Bot generate a blocklist.

**Information**

* `name:` `intelmq.bots.outputs.rpz_file.output`
* `lookup:` no
* `public:` yes
* `cache (redis db):` none
* `description:` Generate RPZ file

**Configuration Parameters**

* `cname`: example rpz.yourdomain.eu
* `organization_name`: Your organisation name
* `rpz_domain`: Information website about RPZ
* `hostmaster_rpz_domain`: Technical website
* `rpz_email`: Contact email
* `ttl`: Time to live
* `ncachttl`: DNS negative cache
* `serial`: Time stamp or another numbering
* `refresh`: Refresh time
* `retry`: Retry time
* `expire`: Expiration time
* `test_domain`: For test domain, it's added in first rpz file (after header)

File example:
```
$TTL 3600
@ SOA rpz.yourdomain.eu. hostmaster.rpz.yourdomain.eu. 2105260601 60 60 432000 60
NS localhost.
;
; yourdomain.eu. CERT.XX Response Policy Zones (RPZ)
; Last updated: 2021-05-26 06:01:41 (UTC)
;
; Terms Of Use: https://rpz.yourdomain.eu
; For questions please contact rpz [at] yourdomain.eu
;
*.maliciousdomain.com CNAME rpz.yourdomain.eu.
*.secondmaliciousdomain.com CNAME rpz.yourdomain.eu.
```

**Description**

The prime motivation for creating this feature was to protect users from badness on the Internet related to known-malicious global identifiers such as host names, domain names, IP addresses, or nameservers.
More information: https://dnsrpz.info


.. _intelmq.bots.outputs.smtp.output:

SMTP Output Bot
Expand Down
Empty file.
165 changes: 165 additions & 0 deletions intelmq/bots/outputs/rpz_file/output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-

"""
RPZ file output

SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

from datetime import datetime
import os
from collections import defaultdict
from pathlib import Path

from intelmq.lib.bot import OutputBot

RPZ_INDICATOR_MAP = {
"source.fqdn": "Intel::DOMAIN"
}

now = datetime.now() # for timestamp


class RpzFileOutputBot(OutputBot):
_file = None
format_filename: bool = False
__Bot_is_multithreadable = False
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
__Bot_is_multithreadable = False
_is_multithreadable = False

That was recently fixed/changed in 5c91a8c you could not have foreseen this


encoding_errors_mode = 'strict'
file: str = "/opt/intelmq/var/lib/bots/file-output/rpz"

cname: str = ""
organization_name: str = ''
rpz_domain: str = ''
hostmaster_rpz_domain: str = ''
rpz_email: str = ''
ttl: int = 3600
ncachttl: int = 60
serial: str = now.strftime("%y%m%d%H%M")
refresh: int = 60
retry: int = 60
expire: int = 432000
test_domain: str = ''

_rpz_header = ""

def init(self):
self._file = None

self.set_rpz_header()

self.logger.debug("Opening %r file.", self.file)
self.errors = getattr(self, 'encoding_errors_mode', 'strict')
if not self.format_filename:
self.open_file(self.file)
self.logger.info("File %r is open.", self.file)

def set_rpz_header(self):
generate_time: str = now.strftime("%Y-%m-%d %H:%M:%S")
self._rpz_header = f"""$TTL """ + str(self.ttl) + """
@ SOA """ + self.rpz_domain + """. """ + self.hostmaster_rpz_domain + """. """ + str(self.serial) + """ """ + str(self.refresh) + """ """ + str(self.retry) + """ """ + str(self.expire) + """ """ + str(self.ncachttl) + """
NS localhost.
;
; """ + self.organization_name + """ Response Policy Zones (RPZ)
; Last updated: """ + str(generate_time) + """ (UTC)
;
; Terms Of Use: https://""" + self.rpz_domain + """
; For questions please contact """ + self.rpz_email + """
;\n"""

if self.test_domain:
self._rpz_header = self._rpz_header + self.test_domain + " CNAME " + self.cname + ".\n" # for test

def open_file(self, filename: str = None):
if self._file is not None:
self._file.close()

try:
self._file = open(filename, mode='a+t', encoding='utf-8', errors=self.errors)
self.add_rpz_header()
except FileNotFoundError: # directory does not exist
path = Path(os.path.dirname(filename))
try:
path.mkdir(mode=0o755, parents=True, exist_ok=True)
except IOError:
self.logger.exception('Directory %r could not be created.', path)
self.stop()
else:
self._file = open(filename, mode='a+t', encoding='utf-8', errors=self.errors)
self.add_rpz_header()

def add_rpz_header(self):
self._file.seek(0)
if not (len(self._file.read())):
self._file.write(self._rpz_header)
self._file.flush()

def process(self):
event = self.receive_message()
if self.format_filename:
ev = defaultdict(None)
ev.update(event)
if 'time.observation' in ev:
try:
ev['time.observation'] = datetime.strptime(ev['time.observation'],
'%Y-%m-%dT%H:%M:%S+00:00')
except ValueError:
ev['time.observation'] = datetime.strptime(ev['time.observation'],
'%Y-%m-%dT%H:%M:%S.%f+00:00')
if 'time.source' in ev:
try:
ev['time.source'] = datetime.strptime(ev['time.source'],
'%Y-%m-%dT%H:%M:%S+00:00')
except ValueError:
ev['time.source'] = datetime.strptime(ev['time.source'],
'%Y-%m-%dT%H:%M:%S.%f+00:00')
filename = self.file.format(event=ev)
if not self.file or filename != self._file.name:
self.open_file(filename)
acknowledge_message = False
for indicator_type in RPZ_INDICATOR_MAP.keys():
if event.get(indicator_type):
domain_url = event['source.fqdn']
if domain_url.startswith('www.'):
domain_url = domain_url[len('www.'):]
event_data = domain_url + ' CNAME ' + self.cname + '.\n'
event_data = event_data + '*.' + domain_url + ' CNAME ' + self.cname + '.\n'

try:
self._file.write(event_data)
self._file.flush()
except FileNotFoundError:
self.init()
else:
if not acknowledge_message:
self.acknowledge_message()
acknowledge_message = True
else:
if not acknowledge_message:
self.acknowledge_message()
acknowledge_message = True
self.logger.debug("Event did not have RPZ indicator types.")

def shutdown(self):
if self._file:
self._file.close()

@staticmethod
def check(parameters):
if 'file' not in parameters:
return [["error", "Parameter 'file' not given."]]
dirname = os.path.dirname(parameters['file'])
if not os.path.exists(dirname) and '{ev' not in dirname:
path = Path(dirname)
try:
path.mkdir(mode=0o755, parents=True, exist_ok=True)
except IOError:
return [
["error", "Directory (%r) of parameter 'file' does not exist and could not be created." % dirname]]
else:
return [
["info", "Directory (%r) of parameter 'file' did not exist, but has now been created." % dirname]]


BOT = RpzFileOutputBot
Empty file.
101 changes: 101 additions & 0 deletions intelmq/tests/bots/outputs/rpz_file/test_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-

"""
RPZ file output

SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

import os
import tempfile
import unittest

import intelmq.lib.test as test
from intelmq.bots.outputs.rpz_file.output import RpzFileOutputBot

INPUT = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'cuhk.edu.hk.itlf.cf',
'misp.event_uuid': '5f6dbd1d-9d04-4795-a0cd-36bd0a09d489',
'misp.attribute_uuid': '3cd903ab-329e-444e-a0d9-fdf6dcc584d8',
'comment': '{"date": "2020-10-19", "priority": 3, "confidence": 5, "comment": "", "context": "Phishing", "expiryDate": "2025-10-18"}',
'event_description.text': 'Network activity',
'event_description.url': 'https://sig01.threatreveal.com/events/view/150354',
'classification.type': 'phishing',
'time.source': '2020-10-20T12:57:33+00:00',
'extra.tags': ['signatures', 'tlp:amber', 'cybercrime', 'misp-galaxy:threat-actor="cobalt dickens"'],
'tlp': 'AMBER',
'extra.orgc': 'bae systems',
'extra.to_ids': True,
'extra.first_seen': '2020-10-19',
'extra.confidence': 'high',
'extra.valid_to': '2025-10-18',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

INPUT_2 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'pw.aparat.com.torojoonemadaretkarkonkhasteshodamdigeenqadtestzadam.filterchipedaramodarovordibekeshbiroon.dollarshode20000tomanbaskondigeh.salavatemohammadibefres.soltane-tel-injas-heh.digital',
'misp.event_uuid': '5f6dbd1d-9d04-4795-a0cd-36bd0a09d489',
'misp.attribute_uuid': '3cd903ab-329e-444e-a0d9-fdf6dcc584d8',
'comment': '{"date": "2020-10-19", "priority": 3, "confidence": 5, "comment": "", "context": "Phishing", "expiryDate": "2025-10-18"}',
'event_description.text': 'Network activity',
'event_description.url': 'https://sig01.threatreveal.com/events/view/150354',
'classification.type': 'phishing',
'time.source': '2020-10-20T12:57:33+00:00',
'extra.tags': ['signatures', 'tlp:amber', 'cybercrime', 'misp-galaxy:threat-actor="cobalt dickens"'],
'tlp': 'AMBER',
'extra.orgc': 'bae systems',
'extra.to_ids': True,
'extra.first_seen': '2020-10-19',
'extra.confidence': 'high',
'extra.valid_to': '2025-10-18',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}


class TestFileOutputBot(test.BotTestCase, unittest.TestCase):

@classmethod
def set_bot(cls):
cls.bot_reference = RpzFileOutputBot
cls.os_fp, cls.filename = tempfile.mkstemp()
cls.sysconfig = {"hierarchical_output": True,
"file": cls.filename,
"cname": 'cert.aa'
}

def test_event2(self):
self.input_message = INPUT_2
self.run_bot()
filepointer = os.fdopen(self.os_fp, 'rt')
filepointer.seek(0)
file_lines = filepointer.readlines()
self.assertEqual('; Response Policy Zones (RPZ)\n', file_lines[4])
self.assertEqual(
'pw.aparat.com.torojoonemadaretkarkonkhasteshodamdigeenqadtestzadam.filterchipedaramodarovordibekeshbiroon.dollarshode20000tomanbaskondigeh.salavatemohammadibefres.soltane-tel-injas-heh.digital CNAME cert.aa.\n',
file_lines[10])
self.assertEqual(
'*.pw.aparat.com.torojoonemadaretkarkonkhasteshodamdigeenqadtestzadam.filterchipedaramodarovordibekeshbiroon.dollarshode20000tomanbaskondigeh.salavatemohammadibefres.soltane-tel-injas-heh.digital CNAME cert.aa.\n',
file_lines[11])
self.assertEqual(12, len(file_lines))
filepointer.close()

@classmethod
def tearDownClass(cls):
os.remove(cls.filename)


if __name__ == '__main__': # pragma: no cover
unittest.main()