Skip to content

Commit

Permalink
TST: add and update tests for correct line recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
wagner-intevation committed Jun 2, 2022
1 parent 5a75d58 commit 62ce61c
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions intelmq/tests/lib/test_parser_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
RAW = """# ignore this
2015/06/04 13:37 +00,example.org,192.0.2.3,reverse.example.net,example description,[email protected],1
2015/06/04 13:38 +00,example.org,19d2.0.2.3,reverse.example.net,example description,[email protected],1
2015/06/04 13:38 +00,example.org,19a2.0.2.3,reverse.example.net,example description,[email protected],1
2015/06/04 13:38 +00,example.org,19b2.0.2.3,reverse.example.net,example description,[email protected],1
#ending line"""
RAW_SPLIT = RAW.splitlines()

Expand All @@ -39,11 +40,10 @@
"feed.name": "Example",
"raw": utils.base64_encode('\n'.join(RAW_SPLIT[:2]))}

EXPECTED_DUMP = EXAMPLE_REPORT.copy()
del EXPECTED_DUMP['__type']
EXPECTED_DUMP['raw'] = base64.b64encode(b'''# ignore this
2015/06/04 13:38 +00,example.org,19d2.0.2.3,reverse.example.net,example description,[email protected],1
#ending line''').decode()
EXPECTED_DUMP = [EXAMPLE_REPORT.copy(), EXAMPLE_REPORT.copy()]
del EXPECTED_DUMP[0]['__type'], EXPECTED_DUMP[1]['__type']
EXPECTED_DUMP[0]['raw'] = base64.b64encode('\n'.join((RAW_SPLIT[0], RAW_SPLIT[3], RAW_SPLIT[5])).encode()).decode()
EXPECTED_DUMP[1]['raw'] = base64.b64encode('\n'.join((RAW_SPLIT[0], RAW_SPLIT[4], RAW_SPLIT[5])).encode()).decode()
EXAMPLE_EMPTY_REPORT = {"feed.url": "http://www.example.com/",
"__type": "Report",
"feed.name": "Example"}
Expand Down Expand Up @@ -129,11 +129,12 @@ class TestDummyParserBot(test.BotTestCase, unittest.TestCase):
def set_bot(cls):
cls.bot_reference = DummyParserBot
cls.default_input_message = EXAMPLE_REPORT
cls.allowed_error_count = 1
cls.sysconfig = {'error_dump_message': True}
cls.call_counter = 0

def dump_message(self, error_traceback, message=None):
self.assertDictEqual(EXPECTED_DUMP, message)
self.assertDictEqual(EXPECTED_DUMP[self.call_counter], message)
self.call_counter += 1

def run_bot(self, *args, **kwargs):
with mock.patch.object(bot.Bot, "_dump_message",
Expand All @@ -142,7 +143,7 @@ def run_bot(self, *args, **kwargs):

def test_event(self):
""" Test DummyParserBot """
self.run_bot()
self.run_bot(allowed_error_count=2)
self.assertMessageEqual(0, EXAMPLE_EVENT)

def test_missing_raw(self):
Expand Down Expand Up @@ -200,6 +201,13 @@ def test_event(self):
'classification.type': 'other',
},
]
JSON_STREAM_BOGUS_REPORT = {'__type': 'Report',
'raw': utils.base64_encode('''{"a": 1, "ip": "10.0.0.a"}
{"a": 2, "ip": "10.0.0.b"}''')}
JSON_STREAM_BOGUS_DUMP = [
{'raw': utils.base64_encode('{"a": 1, "ip": "10.0.0.a"}')},
{'raw': utils.base64_encode('{"a": 2, "ip": "10.0.0.b"}')}
]


class DummyJSONStreamParserBot(bot.ParserBot):
Expand All @@ -210,7 +218,9 @@ def parse_line(self, line, report):
event = self.new_event(report)
event['event_description.text'] = line['a']
event['classification.type'] = 'other'
event['raw'] = self.recover_line(line)
event['raw'] = self.recover_line()
if 'ip' in line:
event['source.ip'] = line['ip']
yield event


Expand All @@ -219,12 +229,26 @@ class TestJSONStreamParserBot(test.BotTestCase, unittest.TestCase):
def set_bot(cls):
cls.bot_reference = DummyJSONStreamParserBot
cls.default_input_message = EXAMPLE_JSON_STREAM_REPORT
cls.call_counter = 0

def test_event(self):
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_JSON_STREAM_EVENTS[0])
self.assertMessageEqual(1, EXAMPLE_JSON_STREAM_EVENTS[1])

def dump_message(self, error_traceback, message=None):
self.assertDictEqual(JSON_STREAM_BOGUS_DUMP[self.call_counter], message)
self.call_counter += 1

def run_bot(self, *args, **kwargs):
with mock.patch.object(bot.Bot, "_dump_message",
self.dump_message):
super().run_bot(*args, **kwargs)

def test_dump(self):
self.input_message = JSON_STREAM_BOGUS_REPORT
self.run_bot(allowed_error_count=2)


if __name__ == '__main__': # pragma: no cover
unittest.main()

0 comments on commit 62ce61c

Please sign in to comment.