Skip to content

Commit

Permalink
Merge pull request #2192 from wagner-intevation/fix-bot-dump
Browse files Browse the repository at this point in the history
ParserBot: Fix line recovery and message dumping
  • Loading branch information
sebix authored Jul 15, 2022
2 parents ee28f6e + 62ce61c commit 2d7b9ca
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 36 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ CHANGELOG
The `LogLevel` and `ReturnType` Enums were added to `intelmq.lib.datatypes`.
- `intelmq.lib.bot`:
- Enhance behaviour if an unconfigured bot is started (PR#2054 by Sebastian Wagner).
- Fix line recovery and message dumping of the `ParserBot` (PR#2192 by Sebastian Wagner).
- Previously the dumped message was always the last message of a report if the report contained multiple lines, leading to data loss.

### Development

Expand Down Expand Up @@ -72,6 +74,7 @@ CHANGELOG
- `intelmq.bots.parsers.shadowserver._config`:
- Added support for `Accessible AMQP`, `Device Identification Report` (IPv4 and IPv6) (PR#2134 by Mateo Durante).
- Added file name mapping for `SSL-POODLE-Vulnerable-Servers IPv6` (file name `scan6_ssl_poodle`) (PR#2134 by Mateo Durante).
- `intelmq.bots.parsers.generic.parser_csv`: Use RewindableFileHandle to use the original current line for line recovery (PR#2192 by Sebastian Wagner).

#### Experts
- `intelmq.bots.experts.domain_valid`: New bot for checking domain's validity (PR#1966 by Marius Karotkis).
Expand Down
4 changes: 2 additions & 2 deletions intelmq/bots/parsers/abusech/parser_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def parse(self, report: dict):
for line in data_lines:
yield line.strip()

def parse_line(self, line, report):
def parse_line(self, line: str, report):
event = self.new_event(report)
self.__process_defaults(event, line, report['feed.url'])
self.__process_fields(event, line, report['feed.url'])
Expand Down Expand Up @@ -114,7 +114,7 @@ def __process_additional(event, line, feed_url):
def __sanitize_csv_lines(s: str):
return s.replace('"', '')

def recover_line(self, line: str):
    """
    Reconstruct the original raw data for one line of the feed:
    the feed's comment lines, then the header line, then the given line,
    joined by newlines.

    Parameters:
        line: the data line to recover.

    Returns:
        str: the recovered multi-line raw snippet.
    """
    return '\n'.join(self.comments + [self.header_line, line])


Expand Down
2 changes: 1 addition & 1 deletion intelmq/bots/parsers/cymru/parser_full_bogons.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def parse(self, report):
for row in raw_report.splitlines():
yield row.strip()

def parse_line(self, val, report):
def parse_line(self, val: str, report):
if not len(val) or val.startswith('#') or val.startswith('//'):
return

Expand Down
7 changes: 5 additions & 2 deletions intelmq/bots/parsers/generic/parser_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from intelmq.lib.bot import ParserBot
from intelmq.lib.exceptions import InvalidArgument, InvalidValue
from intelmq.lib.harmonization import DateTime
from intelmq.lib.utils import RewindableFileHandle

TIME_CONVERSIONS = {'timestamp': DateTime.from_timestamp,
'windows_nt': DateTime.from_windows_nt,
Expand Down Expand Up @@ -101,8 +102,10 @@ def parse(self, report):
if self.skip_header:
self.tempdata.append(raw_report[:raw_report.find('\n')])
raw_report = raw_report[raw_report.find('\n') + 1:]
for row in csv.reader(io.StringIO(raw_report),
self._handle = RewindableFileHandle(io.StringIO(raw_report))
for row in csv.reader(self._handle,
delimiter=str(self.delimiter)):
self._current_line = self._handle.current_line

if self.filter_text and self.filter_type:
text_in_row = self.filter_text in self.delimiter.join(row)
Expand All @@ -115,7 +118,7 @@ def parse(self, report):
else:
yield row

def parse_line(self, row, report):
def parse_line(self, row: list, report):
event = self.new_event(report)

for keygroup, value, required in zip(self.columns, row, self.columns_required):
Expand Down
8 changes: 6 additions & 2 deletions intelmq/bots/parsers/microsoft/parser_ctip.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,23 @@ class MicrosoftCTIPParserBot(ParserBot):
def parse(self, report):
    """
    Dispatch the report to the matching format parser, depending on the
    first character of the base64-decoded raw data, and select the
    matching recover_line method for error dumping.

    Raises:
        ValueError: if the data is neither a JSON list nor a JSON stream.
    """
    raw_report = utils.base64_decode(report.get("raw"))
    if raw_report.startswith('['):
        # Interflow: a single JSON list containing all events
        self.recover_line = self.recover_line_json
        yield from self.parse_json(report)
    elif raw_report.startswith('{'):
        # Azure: a JSON stream, one JSON object per line
        self.recover_line = self.recover_line_json_stream
        yield from self.parse_json_stream(report)
    else:
        raise ValueError("Can't parse the received message. It is neither a JSON list nor a JSON dictionary. Please report this bug.")

def parse_line(self, line: dict, report):
    """
    Route one decoded element to the format-specific handler:
    version 1.5 marks the Interflow format, anything else is Azure.
    """
    if line.get('version', None) == 1.5:
        yield from self.parse_interflow(line, report)
    else:
        yield from self.parse_azure(line, report)

def parse_interflow(self, line, report):
def parse_interflow(self, line: dict, report):
raw = self.recover_line(line)
if line['indicatorthreattype'] != 'Botnet':
raise ValueError('Unknown indicatorthreattype %r, only Botnet is supported.' % line['indicatorthreattype'])
Expand Down Expand Up @@ -257,7 +261,7 @@ def parse_interflow(self, line, report):
yield event

def parse_azure(self, line, report):
raw = self.recover_line(line)
raw = self.recover_line()

event = self.new_event(report)

Expand Down
2 changes: 1 addition & 1 deletion intelmq/bots/parsers/shadowserver/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def parse_line(self, row, report):
# Now add additional constant fields.
event.update(conf.get('constant_fields', {}))

event.add('raw', self.recover_line(row))
event.add('raw', self.recover_line())

# Add everything which could not be resolved to extra.
for f in fields:
Expand Down
57 changes: 40 additions & 17 deletions intelmq/lib/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
from datetime import datetime, timedelta
from typing import Any, List, Optional, Union

import psutil

import intelmq.lib.message as libmessage
from intelmq import (DEFAULT_LOGGING_PATH,
HARMONIZATION_CONF_FILE,
Expand Down Expand Up @@ -942,7 +940,7 @@ class ParserBot(Bot):
_csv_params = {}
_ignore_lines_starting = []
_handle = None
_current_line = None
_current_line: Optional[str] = None

def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
disable_multithreading: bool = None):
Expand All @@ -956,6 +954,7 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
def parse_csv(self, report: libmessage.Report):
"""
A basic CSV parser.
The resulting lines are lists.
"""
raw_report: str = utils.base64_decode(report.get("raw")).strip()
raw_report = raw_report.translate({0: None})
Expand All @@ -971,6 +970,7 @@ def parse_csv(self, report: libmessage.Report):
def parse_csv_dict(self, report: libmessage.Report):
"""
A basic CSV Dictionary parser.
The resulting lines are dictionaries with the column names as keys.
"""
raw_report: str = utils.base64_decode(report.get("raw")).strip()
raw_report: str = raw_report.translate({0: None})
Expand Down Expand Up @@ -1024,6 +1024,7 @@ def parse(self, report: libmessage.Report):
for line in utils.base64_decode(report.get("raw")).splitlines():
line = line.strip()
if not any([line.startswith(prefix) for prefix in self._ignore_lines_starting]):
self._current_line = line
yield line

def parse_line(self, line: Any, report: libmessage.Report):
Expand Down Expand Up @@ -1063,14 +1064,14 @@ def process(self):
events: list[libmessage.Event] = [value]
except Exception:
self.logger.exception('Failed to parse line.')
self.__failed.append((traceback.format_exc(), line))
self.__failed.append((traceback.format_exc(), self._current_line))
else:
events_count += len(events)
self.send_message(*events)

for exc, line in self.__failed:
for exc, original_line in self.__failed:
report_dump: libmessage.Message = report.copy()
report_dump.change('raw', self.recover_line(line))
report_dump.change('raw', self.recover_line(original_line))
if self.error_dump_message:
self._dump_message(exc, report_dump)
if self.destination_queues and '_on_error' in self.destination_queues:
Expand Down Expand Up @@ -1115,21 +1116,34 @@ def recover_line(self, line: Optional[str] = None) -> str:
line = line if line else self._current_line
return '\n'.join(tempdata + [line])

def recover_line_csv(self, line: Optional[list]) -> str:
    """
    Recover the original raw CSV line for error dumping.

    Parameters:
        line: Optional line as list. If absent, the current line is used as string.

    Returns:
        str: the temporary data (e.g. a skipped header), if any, plus the line.
    """
    if line:
        out = io.StringIO()
        writer = csv.writer(out, **self._csv_params)
        writer.writerow(line)
        result = out.getvalue()
    else:
        result = self._current_line
    tempdata = '\r\n'.join(self.tempdata) + '\r\n' if self.tempdata else ''
    return tempdata + result

def recover_line_csv_dict(self, line: Union[dict, str, None] = None) -> str:
    """
    Converts dictionaries to csv. self.csv_fieldnames must be list of fields.
    Falls back to the stored current line if no line is given.

    Parameters:
        line: the line as dict, as raw string, or None to use the current line.

    Returns:
        str: the CSV header plus the recovered line, stripped.
    """
    out = io.StringIO()
    writer = csv.DictWriter(out, self.csv_fieldnames, **self._csv_params)
    writer.writeheader()
    if isinstance(line, dict):
        writer.writerow(line)
    elif isinstance(line, str):
        out.write(line)
    else:
        out.write(self._current_line)

    return out.getvalue().strip()

def recover_line_json(self, line: dict) -> str:
    """
    Reverse of parse for JSON pulses.

    Recovers a fully functional report with only the problematic pulse.
    Using a string as input here is not possible, as the input may span over multiple lines.
    Output is not identical to the input, but has the same content.

    Parameters:
        line: The line as dict.

    Returns:
        str: The JSON-encoded line as string.
    """
    return json.dumps([line])

def recover_line_json_stream(self, line: Optional[str] = None) -> str:
    """
    recover_line for JSON streams (one JSON element per line, no outer
    structure), just returns the current line, unparsed.

    Parameters:
        line: The line itself, if available, falls back to the stored current line.

    Returns:
        str: unparsed JSON line.
    """
    return line if line else self._current_line


class CollectorBot(Bot):
Expand Down
2 changes: 1 addition & 1 deletion intelmq/lib/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
class PipelineFactory:

@staticmethod
def create(logger, broker=None, direction=None, queues=None, pipeline_args=None, load_balance=False, is_multithreaded=False):
def create(logger, broker=None, direction=None, queues=None, pipeline_args: Optional[dict] = None, load_balance=False, is_multithreaded=False):
"""
direction: "source" or "destination", optional, needed for queues
queues: needs direction to be set, calls set_queues
Expand Down
44 changes: 34 additions & 10 deletions intelmq/tests/lib/test_parser_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
RAW = """# ignore this
2015/06/04 13:37 +00,example.org,192.0.2.3,reverse.example.net,example description,[email protected],1
2015/06/04 13:38 +00,example.org,19d2.0.2.3,reverse.example.net,example description,[email protected],1
2015/06/04 13:38 +00,example.org,19a2.0.2.3,reverse.example.net,example description,[email protected],1
2015/06/04 13:38 +00,example.org,19b2.0.2.3,reverse.example.net,example description,[email protected],1
#ending line"""
RAW_SPLIT = RAW.splitlines()

Expand All @@ -39,11 +40,10 @@
"feed.name": "Example",
"raw": utils.base64_encode('\n'.join(RAW_SPLIT[:2]))}

EXPECTED_DUMP = EXAMPLE_REPORT.copy()
del EXPECTED_DUMP['__type']
EXPECTED_DUMP['raw'] = base64.b64encode(b'''# ignore this
2015/06/04 13:38 +00,example.org,19d2.0.2.3,reverse.example.net,example description,[email protected],1
#ending line''').decode()
EXPECTED_DUMP = [EXAMPLE_REPORT.copy(), EXAMPLE_REPORT.copy()]
del EXPECTED_DUMP[0]['__type'], EXPECTED_DUMP[1]['__type']
EXPECTED_DUMP[0]['raw'] = base64.b64encode('\n'.join((RAW_SPLIT[0], RAW_SPLIT[3], RAW_SPLIT[5])).encode()).decode()
EXPECTED_DUMP[1]['raw'] = base64.b64encode('\n'.join((RAW_SPLIT[0], RAW_SPLIT[4], RAW_SPLIT[5])).encode()).decode()
EXAMPLE_EMPTY_REPORT = {"feed.url": "http://www.example.com/",
"__type": "Report",
"feed.name": "Example"}
Expand Down Expand Up @@ -129,11 +129,12 @@ class TestDummyParserBot(test.BotTestCase, unittest.TestCase):
def set_bot(cls):
cls.bot_reference = DummyParserBot
cls.default_input_message = EXAMPLE_REPORT
cls.allowed_error_count = 1
cls.sysconfig = {'error_dump_message': True}
cls.call_counter = 0

def dump_message(self, error_traceback, message=None):
self.assertDictEqual(EXPECTED_DUMP, message)
self.assertDictEqual(EXPECTED_DUMP[self.call_counter], message)
self.call_counter += 1

def run_bot(self, *args, **kwargs):
with mock.patch.object(bot.Bot, "_dump_message",
Expand All @@ -142,7 +143,7 @@ def run_bot(self, *args, **kwargs):

def test_event(self):
""" Test DummyParserBot """
self.run_bot()
self.run_bot(allowed_error_count=2)
self.assertMessageEqual(0, EXAMPLE_EVENT)

def test_missing_raw(self):
Expand Down Expand Up @@ -200,6 +201,13 @@ def test_event(self):
'classification.type': 'other',
},
]
JSON_STREAM_BOGUS_REPORT = {'__type': 'Report',
'raw': utils.base64_encode('''{"a": 1, "ip": "10.0.0.a"}
{"a": 2, "ip": "10.0.0.b"}''')}
JSON_STREAM_BOGUS_DUMP = [
{'raw': utils.base64_encode('{"a": 1, "ip": "10.0.0.a"}')},
{'raw': utils.base64_encode('{"a": 2, "ip": "10.0.0.b"}')}
]


class DummyJSONStreamParserBot(bot.ParserBot):
def parse_line(self, line, report):
    """
    Build one test event from a decoded JSON stream element.
    Copies 'a' into the description and, if present, 'ip' into source.ip.
    """
    event = self.new_event(report)
    event['event_description.text'] = line['a']
    event['classification.type'] = 'other'
    # recover_line without arguments falls back to the parser's current line
    event['raw'] = self.recover_line()
    if 'ip' in line:
        event['source.ip'] = line['ip']
    yield event


Expand All @@ -219,12 +229,26 @@ class TestJSONStreamParserBot(test.BotTestCase, unittest.TestCase):
def set_bot(cls):
cls.bot_reference = DummyJSONStreamParserBot
cls.default_input_message = EXAMPLE_JSON_STREAM_REPORT
cls.call_counter = 0

def test_event(self):
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_JSON_STREAM_EVENTS[0])
self.assertMessageEqual(1, EXAMPLE_JSON_STREAM_EVENTS[1])

def dump_message(self, error_traceback, message=None):
self.assertDictEqual(JSON_STREAM_BOGUS_DUMP[self.call_counter], message)
self.call_counter += 1

def run_bot(self, *args, **kwargs):
with mock.patch.object(bot.Bot, "_dump_message",
self.dump_message):
super().run_bot(*args, **kwargs)

def test_dump(self):
self.input_message = JSON_STREAM_BOGUS_REPORT
self.run_bot(allowed_error_count=2)


if __name__ == '__main__': # pragma: no cover
unittest.main()

0 comments on commit 2d7b9ca

Please sign in to comment.