Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ParserBot: Fix line recovery and message dumping #2192

Merged
merged 6 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ CHANGELOG
The `LogLevel` and `ReturnType` Enums were added to `intelmq.lib.datatypes`.
- `intelmq.lib.bot`:
- Enhance behaviour if an unconfigured bot is started (PR#2054 by Sebastian Wagner).
- Fix line recovery and message dumping of the `ParserBot` (PR#2192 by Sebastian Wagner).
- Previously the dumped message was always the last message of a report if the report contained multiple lines, leading to data loss.

### Development

Expand Down Expand Up @@ -72,6 +74,7 @@ CHANGELOG
- `intelmq.bots.parsers.shadowserver._config`:
- Added support for `Accessible AMQP`, `Device Identification Report` (IPv4 and IPv6) (PR#2134 by Mateo Durante).
- Added file name mapping for `SSL-POODLE-Vulnerable-Servers IPv6` (file name `scan6_ssl_poodle`) (PR#2134 by Mateo Durante).
- `intelmq.bots.parsers.generic.parser_csv`: Use RewindableFileHandle to use the original current line for line recovery (PR#2192 by Sebastian Wagner).

#### Experts
- `intelmq.bots.experts.domain_valid`: New bot for checking domain's validity (PR#1966 by Marius Karotkis).
Expand Down
4 changes: 2 additions & 2 deletions intelmq/bots/parsers/abusech/parser_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def parse(self, report: dict):
for line in data_lines:
yield line.strip()

def parse_line(self, line, report):
def parse_line(self, line: str, report):
event = self.new_event(report)
self.__process_defaults(event, line, report['feed.url'])
self.__process_fields(event, line, report['feed.url'])
Expand Down Expand Up @@ -114,7 +114,7 @@ def __process_additional(event, line, feed_url):
def __sanitize_csv_lines(s: str):
return s.replace('"', '')

def recover_line(self, line):
def recover_line(self, line: str):
    """Rebuild the original raw feed excerpt for a single data line.

    Prepends the feed's comment lines and the CSV header line to the
    given line, newline-joined, so a dump contains a parseable report.
    """
    prefix = list(self.comments)
    prefix.append(self.header_line)
    prefix.append(line)
    return '\n'.join(prefix)


Expand Down
2 changes: 1 addition & 1 deletion intelmq/bots/parsers/cymru/parser_full_bogons.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def parse(self, report):
for row in raw_report.splitlines():
yield row.strip()

def parse_line(self, val, report):
def parse_line(self, val: str, report):
if not len(val) or val.startswith('#') or val.startswith('//'):
return

Expand Down
7 changes: 5 additions & 2 deletions intelmq/bots/parsers/generic/parser_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from intelmq.lib.bot import ParserBot
from intelmq.lib.exceptions import InvalidArgument, InvalidValue
from intelmq.lib.harmonization import DateTime
from intelmq.lib.utils import RewindableFileHandle

TIME_CONVERSIONS = {'timestamp': DateTime.from_timestamp,
'windows_nt': DateTime.from_windows_nt,
Expand Down Expand Up @@ -101,8 +102,10 @@ def parse(self, report):
if self.skip_header:
self.tempdata.append(raw_report[:raw_report.find('\n')])
raw_report = raw_report[raw_report.find('\n') + 1:]
for row in csv.reader(io.StringIO(raw_report),
self._handle = RewindableFileHandle(io.StringIO(raw_report))
for row in csv.reader(self._handle,
delimiter=str(self.delimiter)):
self._current_line = self._handle.current_line

if self.filter_text and self.filter_type:
text_in_row = self.filter_text in self.delimiter.join(row)
Expand All @@ -115,7 +118,7 @@ def parse(self, report):
else:
yield row

def parse_line(self, row, report):
def parse_line(self, row: list, report):
event = self.new_event(report)

for keygroup, value, required in zip(self.columns, row, self.columns_required):
Expand Down
8 changes: 6 additions & 2 deletions intelmq/bots/parsers/microsoft/parser_ctip.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,23 @@ class MicrosoftCTIPParserBot(ParserBot):
def parse(self, report):
    """
    Detect the CTIP data format from the raw report and delegate parsing.

    Also binds ``self.recover_line`` to the format-matching recovery
    method, so failed lines are later dumped in their original format.

    Raises:
        ValueError: if the data is neither a JSON list nor a JSON stream.
    """
    raw_report = utils.base64_decode(report.get("raw"))
    if raw_report.startswith('['):
        # Interflow: data is one JSON list
        self.recover_line = self.recover_line_json
        yield from self.parse_json(report)
    elif raw_report.startswith('{'):
        # Azure: data is a stream of JSON objects
        self.recover_line = self.recover_line_json_stream
        yield from self.parse_json_stream(report)
    else:
        raise ValueError("Can't parse the received message. It is neither a JSON list nor a JSON dictionary. Please report this bug.")

def parse_line(self, line: dict, report):
    """Route one decoded record to the format-specific line parser.

    Records with a 'version' value of 1.5 are Interflow data; every
    other record is handled as Azure data.
    """
    is_interflow = line.get('version') == 1.5
    handler = self.parse_interflow if is_interflow else self.parse_azure
    yield from handler(line, report)

def parse_interflow(self, line, report):
def parse_interflow(self, line: dict, report):
raw = self.recover_line(line)
if line['indicatorthreattype'] != 'Botnet':
raise ValueError('Unknown indicatorthreattype %r, only Botnet is supported.' % line['indicatorthreattype'])
Expand Down Expand Up @@ -257,7 +261,7 @@ def parse_interflow(self, line, report):
yield event

def parse_azure(self, line, report):
raw = self.recover_line(line)
raw = self.recover_line()

event = self.new_event(report)

Expand Down
2 changes: 1 addition & 1 deletion intelmq/bots/parsers/shadowserver/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def parse_line(self, row, report):
# Now add additional constant fields.
event.update(conf.get('constant_fields', {}))

event.add('raw', self.recover_line(row))
event.add('raw', self.recover_line())

# Add everything which could not be resolved to extra.
for f in fields:
Expand Down
57 changes: 40 additions & 17 deletions intelmq/lib/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
from datetime import datetime, timedelta
from typing import Any, List, Optional, Union

import psutil

import intelmq.lib.message as libmessage
from intelmq import (DEFAULT_LOGGING_PATH,
HARMONIZATION_CONF_FILE,
Expand Down Expand Up @@ -942,7 +940,7 @@ class ParserBot(Bot):
_csv_params = {}
_ignore_lines_starting = []
_handle = None
_current_line = None
_current_line: Optional[str] = None

def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
disable_multithreading: bool = None):
Expand All @@ -956,6 +954,7 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
def parse_csv(self, report: libmessage.Report):
"""
A basic CSV parser.
The resulting lines are lists.
"""
raw_report: str = utils.base64_decode(report.get("raw")).strip()
raw_report = raw_report.translate({0: None})
Expand All @@ -971,6 +970,7 @@ def parse_csv(self, report: libmessage.Report):
def parse_csv_dict(self, report: libmessage.Report):
"""
A basic CSV Dictionary parser.
The resulting lines are dictionaries with the column names as keys.
"""
raw_report: str = utils.base64_decode(report.get("raw")).strip()
raw_report: str = raw_report.translate({0: None})
Expand Down Expand Up @@ -1024,6 +1024,7 @@ def parse(self, report: libmessage.Report):
for line in utils.base64_decode(report.get("raw")).splitlines():
line = line.strip()
if not any([line.startswith(prefix) for prefix in self._ignore_lines_starting]):
self._current_line = line
yield line

def parse_line(self, line: Any, report: libmessage.Report):
Expand Down Expand Up @@ -1063,14 +1064,14 @@ def process(self):
events: list[libmessage.Event] = [value]
except Exception:
self.logger.exception('Failed to parse line.')
self.__failed.append((traceback.format_exc(), line))
self.__failed.append((traceback.format_exc(), self._current_line))
else:
events_count += len(events)
self.send_message(*events)

for exc, line in self.__failed:
for exc, original_line in self.__failed:
report_dump: libmessage.Message = report.copy()
report_dump.change('raw', self.recover_line(line))
report_dump.change('raw', self.recover_line(original_line))
if self.error_dump_message:
self._dump_message(exc, report_dump)
if self.destination_queues and '_on_error' in self.destination_queues:
Expand Down Expand Up @@ -1115,21 +1116,34 @@ def recover_line(self, line: Optional[str] = None) -> str:
line = line if line else self._current_line
return '\n'.join(tempdata + [line])

def recover_line_csv(self, line: str) -> str:
out = io.StringIO()
writer = csv.writer(out, **self._csv_params)
writer.writerow(line)
def recover_line_csv(self, line: Optional[list]) -> str:
"""
Parameter:
line: Optional line as list. If absent, the current line is used as string.
"""
if line:
out = io.StringIO()
writer = csv.writer(out, **self._csv_params)
writer.writerow(line)
result = out.getvalue()
else:
result = self._current_line
tempdata = '\r\n'.join(self.tempdata) + '\r\n' if self.tempdata else ''
return tempdata + out.getvalue()
return tempdata + result

def recover_line_csv_dict(self, line: str) -> str:
def recover_line_csv_dict(self, line: Union[dict, str, None] = None) -> str:
"""
Converts dictionaries to csv. self.csv_fieldnames must be list of fields.
"""
out = io.StringIO()
writer = csv.DictWriter(out, self.csv_fieldnames, **self._csv_params)
writer.writeheader()
out.write(self._current_line)
if isinstance(line, dict):
writer.writerow(line)
elif isinstance(line, str):
out.write(line)
else:
out.write(self._current_line)

return out.getvalue().strip()

Expand All @@ -1138,20 +1152,29 @@ def recover_line_json(self, line: dict) -> str:
Reverse of parse for JSON pulses.

Recovers a fully functional report with only the problematic pulse.
Using a string as input here is not possible, as the input may span over multiple lines.
Output is not identical to the input, but has the same content.

Parameters:
The line as dict.

Returns:
str: The JSON-encoded line as string.
"""
return json.dumps([line])

def recover_line_json_stream(self, line=None) -> str:
def recover_line_json_stream(self, line: Optional[str] = None) -> str:
"""
recover_line for json streams, just returns the current line, unparsed.
recover_line for JSON streams (one JSON element per line, no outer structure),
just returns the current line, unparsed.

Parameters:
line: None, not required, only for compatibility with other recover_line methods
line: The line itself as string, if available; falls back to the original current line

Returns:
str: unparsed JSON line.
"""
return self._current_line
return line if line else self._current_line


class CollectorBot(Bot):
Expand Down
2 changes: 1 addition & 1 deletion intelmq/lib/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
class PipelineFactory:

@staticmethod
def create(logger, broker=None, direction=None, queues=None, pipeline_args=None, load_balance=False, is_multithreaded=False):
def create(logger, broker=None, direction=None, queues=None, pipeline_args: Optional[dict] = None, load_balance=False, is_multithreaded=False):
"""
direction: "source" or "destination", optional, needed for queues
queues: needs direction to be set, calls set_queues
Expand Down
44 changes: 34 additions & 10 deletions intelmq/tests/lib/test_parser_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
RAW = """# ignore this
2015/06/04 13:37 +00,example.org,192.0.2.3,reverse.example.net,example description,[email protected],1

2015/06/04 13:38 +00,example.org,19d2.0.2.3,reverse.example.net,example description,[email protected],1
2015/06/04 13:38 +00,example.org,19a2.0.2.3,reverse.example.net,example description,[email protected],1
2015/06/04 13:38 +00,example.org,19b2.0.2.3,reverse.example.net,example description,[email protected],1
#ending line"""
RAW_SPLIT = RAW.splitlines()

Expand All @@ -39,11 +40,10 @@
"feed.name": "Example",
"raw": utils.base64_encode('\n'.join(RAW_SPLIT[:2]))}

EXPECTED_DUMP = EXAMPLE_REPORT.copy()
del EXPECTED_DUMP['__type']
EXPECTED_DUMP['raw'] = base64.b64encode(b'''# ignore this
2015/06/04 13:38 +00,example.org,19d2.0.2.3,reverse.example.net,example description,[email protected],1
#ending line''').decode()
EXPECTED_DUMP = [EXAMPLE_REPORT.copy(), EXAMPLE_REPORT.copy()]
del EXPECTED_DUMP[0]['__type'], EXPECTED_DUMP[1]['__type']
EXPECTED_DUMP[0]['raw'] = base64.b64encode('\n'.join((RAW_SPLIT[0], RAW_SPLIT[3], RAW_SPLIT[5])).encode()).decode()
EXPECTED_DUMP[1]['raw'] = base64.b64encode('\n'.join((RAW_SPLIT[0], RAW_SPLIT[4], RAW_SPLIT[5])).encode()).decode()
EXAMPLE_EMPTY_REPORT = {"feed.url": "http://www.example.com/",
"__type": "Report",
"feed.name": "Example"}
Expand Down Expand Up @@ -129,11 +129,12 @@ class TestDummyParserBot(test.BotTestCase, unittest.TestCase):
def set_bot(cls):
cls.bot_reference = DummyParserBot
cls.default_input_message = EXAMPLE_REPORT
cls.allowed_error_count = 1
cls.sysconfig = {'error_dump_message': True}
cls.call_counter = 0

def dump_message(self, error_traceback, message=None):
self.assertDictEqual(EXPECTED_DUMP, message)
self.assertDictEqual(EXPECTED_DUMP[self.call_counter], message)
self.call_counter += 1

def run_bot(self, *args, **kwargs):
with mock.patch.object(bot.Bot, "_dump_message",
Expand All @@ -142,7 +143,7 @@ def run_bot(self, *args, **kwargs):

def test_event(self):
""" Test DummyParserBot """
self.run_bot()
self.run_bot(allowed_error_count=2)
self.assertMessageEqual(0, EXAMPLE_EVENT)

def test_missing_raw(self):
Expand Down Expand Up @@ -200,6 +201,13 @@ def test_event(self):
'classification.type': 'other',
},
]
JSON_STREAM_BOGUS_REPORT = {'__type': 'Report',
'raw': utils.base64_encode('''{"a": 1, "ip": "10.0.0.a"}
{"a": 2, "ip": "10.0.0.b"}''')}
JSON_STREAM_BOGUS_DUMP = [
{'raw': utils.base64_encode('{"a": 1, "ip": "10.0.0.a"}')},
{'raw': utils.base64_encode('{"a": 2, "ip": "10.0.0.b"}')}
]


class DummyJSONStreamParserBot(bot.ParserBot):
Expand All @@ -210,7 +218,9 @@ def parse_line(self, line, report):
event = self.new_event(report)
event['event_description.text'] = line['a']
event['classification.type'] = 'other'
event['raw'] = self.recover_line(line)
event['raw'] = self.recover_line()
if 'ip' in line:
event['source.ip'] = line['ip']
yield event


Expand All @@ -219,12 +229,26 @@ class TestJSONStreamParserBot(test.BotTestCase, unittest.TestCase):
def set_bot(cls):
cls.bot_reference = DummyJSONStreamParserBot
cls.default_input_message = EXAMPLE_JSON_STREAM_REPORT
cls.call_counter = 0

def test_event(self):
    # Happy path: both lines of the JSON stream report parse
    # successfully into one event each, in input order.
    self.run_bot()
    self.assertMessageEqual(0, EXAMPLE_JSON_STREAM_EVENTS[0])
    self.assertMessageEqual(1, EXAMPLE_JSON_STREAM_EVENTS[1])

def dump_message(self, error_traceback, message=None):
    # Stand-in for Bot._dump_message (patched in run_bot below): instead
    # of writing a dump file, assert the dumped message matches the
    # expected dump for this invocation, then advance the call counter.
    self.assertDictEqual(JSON_STREAM_BOGUS_DUMP[self.call_counter], message)
    self.call_counter += 1

def run_bot(self, *args, **kwargs):
    # Intercept dump-file writing so dumped messages can be checked
    # in-memory by dump_message instead of touching the filesystem.
    with mock.patch.object(bot.Bot, "_dump_message",
                           self.dump_message):
        super().run_bot(*args, **kwargs)

def test_dump(self):
    # Two bogus input lines -> two parse failures -> two dumped
    # messages, each recovered as its own original line.
    self.input_message = JSON_STREAM_BOGUS_REPORT
    self.run_bot(allowed_error_count=2)


if __name__ == '__main__': # pragma: no cover
unittest.main()