Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new bot: cut string from string #1965

Merged
12 commits merged into from
Sep 24, 2021
Empty file.
33 changes: 33 additions & 0 deletions intelmq/bots/experts/cut_from_string/expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
"""
Cut from string
"""
from intelmq.lib.bot import Bot


class CutFromStringExpertBot(Bot):
string_from_start: int = 1 # 1 - from start, 0 - from end
string_for_cut: str = 'www.'
field_for_cut: str = 'source.fqdn'

def init(self):
pass

def process(self):
event = self.receive_message()

if self.field_for_cut in event:
field_string = event[self.field_for_cut]
if self.string_from_start == 1 and field_string.startswith(self.string_for_cut):
field_string = field_string[len(self.string_for_cut):]
event.change(self.field_for_cut, field_string)

if self.string_from_start == 0 and field_string.endswith(self.string_for_cut):
field_string = field_string[:-len(self.string_for_cut)]
event.change(self.field_for_cut, field_string)

self.send_message(event)
self.acknowledge_message()


BOT = CutFromStringExpertBot
Empty file.
89 changes: 89 additions & 0 deletions intelmq/tests/bots/experts/cut_from_string/test_expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
"""
Testing cut from string
"""
import unittest
import intelmq.lib.test as test
from intelmq.bots.experts.cut_from_string.expert import CutFromStringExpertBot

EXAMPLE_INPUT = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'www.google.lt',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'google.lt',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT1 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'source.fqdn': 'www.google',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_INPUT_2 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}

EXAMPLE_OUTPUT_2 = {
'__type': 'Event',
'feed.accuracy': 100.0,
'feed.name': 'MISP events',
'feed.provider': 'MISP BAE',
'time.observation': '2020-10-20T12:57:33+00:00',
'feed.url': 'https://sig01.threatreveal.com',
'extra.elastic_index': 'cti-2020-10',
'extra.elastic_id': 'VwVnSnUBXjJtaqsUSw8T'}


class TestCutFromStringExpertBot(test.BotTestCase, unittest.TestCase):
"""
A TestCase for TestCutFromStringExpertBot.
"""

@classmethod
def set_bot(cls):
cls.bot_reference = CutFromStringExpertBot

def test_event_cut_start(self):
self.input_message = EXAMPLE_INPUT
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT)

def test_event_cut_without_field(self):
self.input_message = EXAMPLE_INPUT_2
self.run_bot()
self.assertMessageEqual(0, EXAMPLE_OUTPUT_2)

def test_event_cut_end(self):
self.input_message = EXAMPLE_INPUT
self.run_bot(parameters={"string_from_start": 0, "string_for_cut": ".lt"})
self.assertMessageEqual(0, EXAMPLE_OUTPUT1)


if __name__ == '__main__': # pragma: no cover
unittest.main()