-
Notifications
You must be signed in to change notification settings - Fork 297
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add RPZ file output bot * Small changes * Small changes * Add documentation * Update header for more readable * Add license * fix tests * Add rpz description
- Loading branch information
1 parent
9ee0daf
commit dcf6f0e
Showing
5 changed files
with
318 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
""" | ||
RPZ file output | ||
SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]> | ||
SPDX-License-Identifier: AGPL-3.0-or-later | ||
""" | ||
|
||
from datetime import datetime | ||
import os | ||
from collections import defaultdict | ||
from pathlib import Path | ||
|
||
from intelmq.lib.bot import OutputBot | ||
|
||
RPZ_INDICATOR_MAP = { | ||
"source.fqdn": "Intel::DOMAIN" | ||
} | ||
|
||
now = datetime.now() # for timestamp | ||
|
||
|
||
class RpzFileOutputBot(OutputBot): | ||
_file = None | ||
format_filename: bool = False | ||
__Bot_is_multithreadable = False | ||
|
||
encoding_errors_mode = 'strict' | ||
file: str = "/opt/intelmq/var/lib/bots/file-output/rpz" | ||
|
||
cname: str = "" | ||
organization_name: str = '' | ||
rpz_domain: str = '' | ||
hostmaster_rpz_domain: str = '' | ||
rpz_email: str = '' | ||
ttl: int = 3600 | ||
ncachttl: int = 60 | ||
serial: str = now.strftime("%y%m%d%H%M") | ||
refresh: int = 60 | ||
retry: int = 60 | ||
expire: int = 432000 | ||
test_domain: str = '' | ||
|
||
_rpz_header = "" | ||
|
||
def init(self): | ||
self._file = None | ||
|
||
self.set_rpz_header() | ||
|
||
self.logger.debug("Opening %r file.", self.file) | ||
self.errors = getattr(self, 'encoding_errors_mode', 'strict') | ||
if not self.format_filename: | ||
self.open_file(self.file) | ||
self.logger.info("File %r is open.", self.file) | ||
|
||
def set_rpz_header(self): | ||
generate_time: str = now.strftime("%Y-%m-%d %H:%M:%S") | ||
self._rpz_header = f"""$TTL """ + str(self.ttl) + """ | ||
@ SOA """ + self.rpz_domain + """. """ + self.hostmaster_rpz_domain + """. """ + str(self.serial) + """ """ + str(self.refresh) + """ """ + str(self.retry) + """ """ + str(self.expire) + """ """ + str(self.ncachttl) + """ | ||
NS localhost. | ||
; | ||
; """ + self.organization_name + """ Response Policy Zones (RPZ) | ||
; Last updated: """ + str(generate_time) + """ (UTC) | ||
; | ||
; Terms Of Use: https://""" + self.rpz_domain + """ | ||
; For questions please contact """ + self.rpz_email + """ | ||
;\n""" | ||
|
||
if self.test_domain: | ||
self._rpz_header = self._rpz_header + self.test_domain + " CNAME " + self.cname + ".\n" # for test | ||
|
||
def open_file(self, filename: str = None): | ||
if self._file is not None: | ||
self._file.close() | ||
|
||
try: | ||
self._file = open(filename, mode='a+t', encoding='utf-8', errors=self.errors) | ||
self.add_rpz_header() | ||
except FileNotFoundError: # directory does not exist | ||
path = Path(os.path.dirname(filename)) | ||
try: | ||
path.mkdir(mode=0o755, parents=True, exist_ok=True) | ||
except IOError: | ||
self.logger.exception('Directory %r could not be created.', path) | ||
self.stop() | ||
else: | ||
self._file = open(filename, mode='a+t', encoding='utf-8', errors=self.errors) | ||
self.add_rpz_header() | ||
|
||
def add_rpz_header(self): | ||
self._file.seek(0) | ||
if not (len(self._file.read())): | ||
self._file.write(self._rpz_header) | ||
self._file.flush() | ||
|
||
def process(self): | ||
event = self.receive_message() | ||
if self.format_filename: | ||
ev = defaultdict(None) | ||
ev.update(event) | ||
if 'time.observation' in ev: | ||
try: | ||
ev['time.observation'] = datetime.strptime(ev['time.observation'], | ||
'%Y-%m-%dT%H:%M:%S+00:00') | ||
except ValueError: | ||
ev['time.observation'] = datetime.strptime(ev['time.observation'], | ||
'%Y-%m-%dT%H:%M:%S.%f+00:00') | ||
if 'time.source' in ev: | ||
try: | ||
ev['time.source'] = datetime.strptime(ev['time.source'], | ||
'%Y-%m-%dT%H:%M:%S+00:00') | ||
except ValueError: | ||
ev['time.source'] = datetime.strptime(ev['time.source'], | ||
'%Y-%m-%dT%H:%M:%S.%f+00:00') | ||
filename = self.file.format(event=ev) | ||
if not self.file or filename != self._file.name: | ||
self.open_file(filename) | ||
acknowledge_message = False | ||
for indicator_type in RPZ_INDICATOR_MAP.keys(): | ||
if event.get(indicator_type): | ||
domain_url = event['source.fqdn'] | ||
if domain_url.startswith('www.'): | ||
domain_url = domain_url[len('www.'):] | ||
event_data = domain_url + ' CNAME ' + self.cname + '.\n' | ||
event_data = event_data + '*.' + domain_url + ' CNAME ' + self.cname + '.\n' | ||
|
||
try: | ||
self._file.write(event_data) | ||
self._file.flush() | ||
except FileNotFoundError: | ||
self.init() | ||
else: | ||
if not acknowledge_message: | ||
self.acknowledge_message() | ||
acknowledge_message = True | ||
else: | ||
if not acknowledge_message: | ||
self.acknowledge_message() | ||
acknowledge_message = True | ||
self.logger.debug("Event did not have RPZ indicator types.") | ||
|
||
def shutdown(self): | ||
if self._file: | ||
self._file.close() | ||
|
||
@staticmethod | ||
def check(parameters): | ||
if 'file' not in parameters: | ||
return [["error", "Parameter 'file' not given."]] | ||
dirname = os.path.dirname(parameters['file']) | ||
if not os.path.exists(dirname) and '{ev' not in dirname: | ||
path = Path(dirname) | ||
try: | ||
path.mkdir(mode=0o755, parents=True, exist_ok=True) | ||
except IOError: | ||
return [ | ||
["error", "Directory (%r) of parameter 'file' does not exist and could not be created." % dirname]] | ||
else: | ||
return [ | ||
["info", "Directory (%r) of parameter 'file' did not exist, but has now been created." % dirname]] | ||
|
||
|
||
BOT = RpzFileOutputBot |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
""" | ||
RPZ file output | ||
SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]> | ||
SPDX-License-Identifier: AGPL-3.0-or-later | ||
""" | ||
|
||
import os | ||
import tempfile | ||
import unittest | ||
|
||
import intelmq.lib.test as test | ||
from intelmq.bots.outputs.rpz_file.output import RpzFileOutputBot | ||
|
||
INPUT = { | ||
'__type': 'Event', | ||
'feed.accuracy': 100.0, | ||
'feed.name': 'MISP events', | ||
'feed.provider': 'MISP BAE', | ||
'time.observation': '2020-10-20T12:57:33+00:00', | ||
'feed.url': 'https://sig01.threatreveal.com', | ||
'source.fqdn': 'cuhk.edu.hk.itlf.cf', | ||
'misp.event_uuid': '5f6dbd1d-9d04-4795-a0cd-36bd0a09d489', | ||
'misp.attribute_uuid': '3cd903ab-329e-444e-a0d9-fdf6dcc584d8', | ||
'comment': '{"date": "2020-10-19", "priority": 3, "confidence": 5, "comment": "", "context": "Phishing", "expiryDate": "2025-10-18"}', | ||
'event_description.text': 'Network activity', | ||
'event_description.url': 'https://sig01.threatreveal.com/events/view/150354', | ||
'classification.type': 'phishing', | ||
'time.source': '2020-10-20T12:57:33+00:00', | ||
'extra.tags': ['signatures', 'tlp:amber', 'cybercrime', 'misp-galaxy:threat-actor="cobalt dickens"'], | ||
'tlp': 'AMBER', | ||
'extra.orgc': 'bae systems', | ||
'extra.to_ids': True, | ||
'extra.first_seen': '2020-10-19', | ||
'extra.confidence': 'high', | ||
'extra.valid_to': '2025-10-18', | ||
'extra.elastic_index': 'cti-2020-10', | ||
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'} | ||
|
||
INPUT_2 = { | ||
'__type': 'Event', | ||
'feed.accuracy': 100.0, | ||
'feed.name': 'MISP events', | ||
'feed.provider': 'MISP BAE', | ||
'time.observation': '2020-10-20T12:57:33+00:00', | ||
'feed.url': 'https://sig01.threatreveal.com', | ||
'source.fqdn': 'pw.aparat.com.torojoonemadaretkarkonkhasteshodamdigeenqadtestzadam.filterchipedaramodarovordibekeshbiroon.dollarshode20000tomanbaskondigeh.salavatemohammadibefres.soltane-tel-injas-heh.digital', | ||
'misp.event_uuid': '5f6dbd1d-9d04-4795-a0cd-36bd0a09d489', | ||
'misp.attribute_uuid': '3cd903ab-329e-444e-a0d9-fdf6dcc584d8', | ||
'comment': '{"date": "2020-10-19", "priority": 3, "confidence": 5, "comment": "", "context": "Phishing", "expiryDate": "2025-10-18"}', | ||
'event_description.text': 'Network activity', | ||
'event_description.url': 'https://sig01.threatreveal.com/events/view/150354', | ||
'classification.type': 'phishing', | ||
'time.source': '2020-10-20T12:57:33+00:00', | ||
'extra.tags': ['signatures', 'tlp:amber', 'cybercrime', 'misp-galaxy:threat-actor="cobalt dickens"'], | ||
'tlp': 'AMBER', | ||
'extra.orgc': 'bae systems', | ||
'extra.to_ids': True, | ||
'extra.first_seen': '2020-10-19', | ||
'extra.confidence': 'high', | ||
'extra.valid_to': '2025-10-18', | ||
'extra.elastic_index': 'cti-2020-10', | ||
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'} | ||
|
||
|
||
class TestFileOutputBot(test.BotTestCase, unittest.TestCase): | ||
|
||
@classmethod | ||
def set_bot(cls): | ||
cls.bot_reference = RpzFileOutputBot | ||
cls.os_fp, cls.filename = tempfile.mkstemp() | ||
cls.sysconfig = {"hierarchical_output": True, | ||
"file": cls.filename, | ||
"cname": 'cert.aa' | ||
} | ||
|
||
def test_event2(self): | ||
self.input_message = INPUT_2 | ||
self.run_bot() | ||
filepointer = os.fdopen(self.os_fp, 'rt') | ||
filepointer.seek(0) | ||
file_lines = filepointer.readlines() | ||
self.assertEqual('; Response Policy Zones (RPZ)\n', file_lines[4]) | ||
self.assertEqual( | ||
'pw.aparat.com.torojoonemadaretkarkonkhasteshodamdigeenqadtestzadam.filterchipedaramodarovordibekeshbiroon.dollarshode20000tomanbaskondigeh.salavatemohammadibefres.soltane-tel-injas-heh.digital CNAME cert.aa.\n', | ||
file_lines[10]) | ||
self.assertEqual( | ||
'*.pw.aparat.com.torojoonemadaretkarkonkhasteshodamdigeenqadtestzadam.filterchipedaramodarovordibekeshbiroon.dollarshode20000tomanbaskondigeh.salavatemohammadibefres.soltane-tel-injas-heh.digital CNAME cert.aa.\n', | ||
file_lines[11]) | ||
self.assertEqual(12, len(file_lines)) | ||
filepointer.close() | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
os.remove(cls.filename) | ||
|
||
|
||
if __name__ == '__main__': # pragma: no cover | ||
unittest.main() |