Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new bot: bro output file #1963

Merged
17 commits merged into from
Sep 17, 2021
21 changes: 21 additions & 0 deletions docs/user/bots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3299,6 +3299,27 @@ This output bot discards all incoming messages.
* `description`: discards messages


.. _intelmq.bots.outputs.bro_file.output:

Bro file
^^^^^^^^^

**Information**

* `name`: `intelmq.bots.outputs.bro_file.output`
* `lookup`: no
* `public`: yes
* `cache`: no
* `description`: BRO (zeek) file output

**Description**
File example:
```
#fields indicator indicator_type meta.desc meta.cif_confidence meta.source
xxx.xxx.xxx.xxx Intel::ADDR phishing 100 MISP XXX
www.testdomain.com Intel::DOMAIN apt 85 CERT
```

.. _intelmq.bots.outputs.elasticsearch.output:

Elasticsearch Output Bot
Expand Down
Empty file.
144 changes: 144 additions & 0 deletions intelmq/bots/outputs/bro_file/output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-
"""
Bro file output

SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

import datetime
import os
from collections import defaultdict
from pathlib import Path

from intelmq.lib.bot import OutputBot

BRO_INDICATOR_MAP = {
"source.ip": "Intel::ADDR",
"source.fqdn": "Intel::DOMAIN",
"source.url": "Intel::URL"
}

BRO_HEADER = "#fields\tindicator\tindicator_type\tmeta.desc\tmeta.cif_confidence\tmeta.source\n"


class BroFileOutputBot(OutputBot):
_file = None
encoding_errors_mode = 'strict'
file: str = "/opt/intelmq/var/lib/bots/file-output/bro"
format_filename: bool = False
hierarchical_output: bool = False
keep_raw_field: bool = False
message_jsondict_as_string: bool = False
message_with_type: bool = False
single_key: bool = False
is_multithreadable = False

def init(self):
# needs to be done here, because in process() FileNotFoundError handling we call init(),
# otherwise the file would not be opened again
self._file = None

self.logger.debug("Opening %r file.", self.file)
if not self.format_filename:
self.open_file(self.file)
self.logger.info("File %r is open.", self.file)

def open_file(self, filename: str = None):
if self._file is not None:
self._file.close()
try:
self._file = open(filename, mode='a+t', encoding='utf-8', errors=self.encoding_errors_mode)
self.add_bro_header()
except FileNotFoundError: # directory does not exist
path = Path(os.path.dirname(filename))
try:
path.mkdir(mode=0o755, parents=True, exist_ok=True)
except IOError:
self.logger.exception('Directory %r could not be created.', path)
self.stop()
else:
self._file = open(filename, mode='a+t', encoding='utf-8', errors=self.encoding_errors_mode)
self.add_bro_header()

def add_bro_header(self):
self._file.seek(0)
if self._file.readline() != BRO_HEADER:
self._file.write(BRO_HEADER)
self._file.flush()

def process(self):
event = self.receive_message()
if self.format_filename:
ev = defaultdict(None)
ev.update(event)
if 'time.observation' in ev:
try:
ev['time.observation'] = datetime.datetime.strptime(ev['time.observation'],
'%Y-%m-%dT%H:%M:%S+00:00')
except ValueError:
ev['time.observation'] = datetime.datetime.strptime(ev['time.observation'],
'%Y-%m-%dT%H:%M:%S.%f+00:00')
if 'time.source' in ev:
try:
ev['time.source'] = datetime.datetime.strptime(ev['time.source'],
'%Y-%m-%dT%H:%M:%S+00:00')
except ValueError:
ev['time.source'] = datetime.datetime.strptime(ev['time.source'],
'%Y-%m-%dT%H:%M:%S.%f+00:00')
filename = self.file.format(event=ev)
if not self.file or filename != self._file.name:
self.open_file(filename)
acknowledge_message = False
for indicator_type in BRO_INDICATOR_MAP.keys():
if event.get(indicator_type):
event_data = ''
event_data += event[indicator_type] + '\t'
event_data += BRO_INDICATOR_MAP[indicator_type] + '\t'
if "extra.tags" in event and "apt" in event["extra.tags"]:
event_data += 'apt' + '\t'
else:
event_data += event['classification.type'] + '\t'
event_data += str(int(event['feed.accuracy'])) + '\t'
if 'extra.orgc' in event:
event_data += event['extra.orgc']
elif 'feed.provider' in event:
event_data += event['feed.provider']
event_data += '\n'
try:
self._file.write(event_data)
self._file.flush()
except FileNotFoundError:
self.init()
else:
if not acknowledge_message:
self.acknowledge_message()
acknowledge_message = True
else:
if not acknowledge_message:
self.acknowledge_message()
acknowledge_message = True
self.logger.debug(str(indicator_type) + "Event did not have Bro indicator types.")

def shutdown(self):
if self._file:
self._file.close()

@staticmethod
def check(parameters):
if 'file' not in parameters:
return [["error", "Parameter 'file' not given."]]
dirname = os.path.dirname(parameters['file'])
if not os.path.exists(dirname) and '{ev' not in dirname:
path = Path(dirname)
try:
path.mkdir(mode=0o755, parents=True, exist_ok=True)
except IOError:
return [
["error", "Directory (%r) of parameter 'file' does not exist and could not be created." % dirname]]
else:
return [
["info", "Directory (%r) of parameter 'file' did not exist, but has now been created." % dirname]]


BOT = BroFileOutputBot
Empty file.
76 changes: 76 additions & 0 deletions intelmq/tests/bots/outputs/bro_file/test_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
"""
Bro file output

SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later
"""

import os
import tempfile
import unittest

import intelmq.lib.test as test
from intelmq.bots.outputs.bro_file.output import BroFileOutputBot

INPUT = {
"__type": "Event",
"classification.type": "infected-system",
"feed.url": "http://feed.url",
"feed.accuracy": 80.0,
"source.asn": 64496,
"extra.elastic_id": "elasticid1",
"source.ip": "192.0.2.1",
"feed.name": "Example Feed",
"source.url": "http://192.0.2.1",
}
INPUT1 = {
"__type": "Event",
"classification.type": "infected-system",
"feed.url": "http://feed.url",
"feed.accuracy": 80.0,
"extra.elastic_id": "elasticid2",
"source.asn": 64496,
"feed.name": "Example Feed",
}


class TestBroFileOutputBot(test.BotTestCase, unittest.TestCase):

@classmethod
def set_bot(cls):
cls.bot_reference = BroFileOutputBot
cls.os_fp, cls.filename = tempfile.mkstemp()
cls.sysconfig = {"hierarchical_output": True,
"file": cls.filename}

def test_event(self):
self.input_message = INPUT
self.run_bot()
filepointer = os.fdopen(self.os_fp, 'rt')
filepointer.seek(0)
file_lines = filepointer.readlines()
self.assertEqual(
'#fields indicator indicator_type meta.desc meta.cif_confidence meta.source\n',
file_lines[0])
self.assertEqual(
'192.0.2.1 Intel::ADDR infected-system 80 \n',
file_lines[1])
self.assertEqual(
'http://192.0.2.1 Intel::URL infected-system 80 \n',
file_lines[2])
self.assertEqual(3, len(file_lines))
filepointer.close()

def test_event_did_not_have_bro_indicators(self):
self.input_message = INPUT1
self.run_bot()
self.assertLogMatches(pattern="source.ipEvent did not have Bro indicator types.", levelname="DEBUG")

@classmethod
def tearDownClass(cls):
os.remove(cls.filename)


if __name__ == '__main__': # pragma: no cover
unittest.main()