-
Notifications
You must be signed in to change notification settings - Fork 297
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
including tests and docs fixes #1856
- Loading branch information
Sebastian Wagner
authored and
Wagner
committed
Jun 28, 2021
1 parent
8cefe11
commit 70eb396
Showing
6 changed files
with
279 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
""" | ||
© 2021 Sebastian Wagner <[email protected]> | ||
SPDX-License-Identifier: AGPL-3.0-or-later | ||
https://gitlab.com/intevation/tuency/tuency/-/blob/master/backend/docs/IntelMQ-API.md | ||
Example query: | ||
> curl -s -H "Authorization: Bearer XXX"\ | ||
'https://tuency-demo1.example.com/intelmq/lookup?classification_taxonomy=availability&classification_type=backdoor\ | ||
&feed_provider=Team+Cymru&feed_name=FTP&feed_status=production&ip=123.123.123.23' | ||
same for domain= | ||
a query can contain both ip address and domain | ||
Example response: | ||
{"ip":{"destinations":[{"source":"portal","name":"Thurner","contacts":[{"email":"[email protected]"}]}]},"suppress":true,"interval":{"unit":"days","length":1}} | ||
{"ip":{"destinations":[{"source":"portal","name":"Thurner","contacts":[{"email":"[email protected]"}]}]},"domain":{"destinations":[{"source":"portal","name":"Thurner","contacts":[{"email":"[email protected]"}]}]},"suppress":true,"interval":{"unit":"immediate","length":1}} | ||
""" | ||
|
||
from intelmq.lib.bot import Bot | ||
from intelmq.lib.utils import create_request_session, parse_relative | ||
|
||
|
||
class TuencyExpertBot(Bot): | ||
url: str # Path to the tuency instance | ||
authentication_token: str | ||
overwrite: bool = True | ||
|
||
def init(self): | ||
self.set_request_parameters() | ||
self.session = create_request_session(self) | ||
self.session.headers["Authorization"] = f"Bearer {self.authentication_token}" | ||
self.url = f"{self.url}intelmq/lookup" | ||
|
||
def process(self): | ||
event = self.receive_message() | ||
if not ("source.ip" in event or "source.fqdn" in event): | ||
self.send_message(event) | ||
return | ||
|
||
try: | ||
params = { | ||
"classification_taxonomy": event["classification.taxonomy"], | ||
"classification_type": event["classification.type"], | ||
"feed_provider": event["feed.provider"], | ||
"feed_name": event["feed.name"], | ||
"feed_status": "production", | ||
} | ||
except KeyError as exc: | ||
self.logger.debug('Skipping event because of missing field: %s.', exc) | ||
self.send_message(event) | ||
return | ||
try: | ||
params["ip"] = event["source.ip"] | ||
except KeyError: | ||
pass | ||
try: | ||
params["domain"] = event["source.fqdn"] | ||
except KeyError: | ||
pass | ||
|
||
response = self.session.get(self.url, params=params).json() | ||
self.logger.debug('Received response %r.', response) | ||
|
||
if response.get("suppress", False): | ||
event["extra.notify"] = False | ||
else: | ||
if 'interval' not in response: | ||
# empty response | ||
self.send_message(event) | ||
elif response['interval']['unit'] == 'immediate': | ||
event["extra.ttl"] = 0 | ||
else: | ||
event["extra.ttl"] = parse_relative(f"{response['interval']['length']} {response['interval']['unit']}") * 60 | ||
contacts = [] | ||
for destination in response.get('ip', {'destinations': []})['destinations'] + response.get('domain', {'destinations': []})['destinations']: | ||
contacts.extend(contact['email'] for contact in destination["contacts"]) | ||
event.add('source.abuse_contact', ','.join(contacts), overwrite=self.overwrite) | ||
|
||
self.send_message(event) | ||
|
||
|
||
BOT = TuencyExpertBot |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
""" | ||
© 2021 Sebastian Wagner <[email protected]> | ||
SPDX-License-Identifier: AGPL-3.0-or-later | ||
This unittest can test the bot against a read tuency instance as well as using requests mock. | ||
The latter is the default while the first is only in use if a tunency instance URL and authentication token is given a environment variable. | ||
""" | ||
import os | ||
import unittest | ||
|
||
from intelmq.lib.test import BotTestCase | ||
from intelmq.bots.experts.tuency.expert import TuencyExpertBot | ||
|
||
import requests_mock | ||
|
||
|
||
INPUT = {'__type': 'Event', | ||
'classification.taxonomy': 'availability', | ||
'classification.type': 'system-compromise', | ||
'feed.provider': 'Some Provider', | ||
'feed.name': 'FTP', | ||
'source.ip': '123.123.123.23', | ||
'source.fqdn': 'www.example.at' | ||
} | ||
INPUT_IP = INPUT.copy() | ||
del INPUT_IP['source.fqdn'] | ||
INPUT_IP['source.abuse_contact'] = '[email protected]' | ||
INPUT_DOMAIN = INPUT.copy() | ||
del INPUT_DOMAIN['source.ip'] | ||
OUTPUT = INPUT.copy() | ||
OUTPUT_IP = INPUT_IP.copy() | ||
OUTPUT_IP['extra.notify'] = False | ||
OUTPUT_IP['source.abuse_contact'] = '[email protected]' | ||
OUTPUT_IP_NO_OVERWRITE = OUTPUT_IP.copy() | ||
OUTPUT_IP_NO_OVERWRITE['source.abuse_contact'] = '[email protected]' | ||
OUTPUT_DOMAIN = INPUT_DOMAIN.copy() | ||
OUTPUT_DOMAIN['extra.ttl'] = 24*60*60 # 1 day | ||
OUTPUT_DOMAIN['source.abuse_contact'] = '[email protected]' | ||
OUTPUT_BOTH = OUTPUT.copy() | ||
OUTPUT_BOTH['extra.ttl'] = 24*60*60 # 1 day | ||
OUTPUT_BOTH['source.abuse_contact'] = '[email protected],[email protected]' | ||
EMPTY = {'__type': 'Event', 'comment': 'foobar'} | ||
UNKNOWN_IP = INPUT_IP.copy() | ||
UNKNOWN_IP['source.ip'] = '10.0.0.1' | ||
|
||
|
||
PREFIX = 'http://localhost/intelmq/lookup?classification_taxonomy=availability&classification_type=system-compromise&feed_provider=Some+Provider&feed_name=FTP&feed_status=production' | ||
|
||
|
||
def prepare_mocker(mocker): | ||
# IP address | ||
mocker.get(f'{PREFIX}&ip=123.123.123.23', | ||
request_headers={'Authorization': 'Bearer Lorem ipsum'}, | ||
json={"ip":{"destinations":[{"source":"portal","name":"Thurner","contacts":[{"email":"[email protected]"}]}]},"suppress":True,"interval":{"unit":"days","length":1}}) | ||
# Domain: | ||
mocker.get(f'{PREFIX}&domain=www.example.at', | ||
request_headers={'Authorization': 'Bearer Lorem ipsum'}, | ||
json={"domain":{"destinations":[{"source":"portal","name":"EineOrganisation","contacts":[{"email":"[email protected]"}]}]},"suppress":False,"interval":{"unit":"days","length":1}}) | ||
# Both | ||
mocker.get(f'{PREFIX}&ip=123.123.123.23&domain=www.example.at', | ||
request_headers={'Authorization': 'Bearer Lorem ipsum'}, | ||
json={"ip":{"destinations":[{"source":"portal","name":"Thurner","contacts":[{"email":"[email protected]"}]}]},"domain":{"destinations":[{"source":"portal","name":"EineOrganisation","contacts":[{"email":"[email protected]"}]}]},"suppress":False,"interval":{"unit":"day","length":1}}) | ||
|
||
# Unknown IP address | ||
mocker.get(f'{PREFIX}&ip=10.0.0.1', | ||
request_headers={'Authorization': 'Bearer Lorem ipsum'}, | ||
json={'ip': {'destinations': [], 'netobject': None}}) | ||
|
||
|
||
@requests_mock.Mocker() | ||
class TestTuencyExpertBot(BotTestCase, unittest.TestCase): | ||
@classmethod | ||
def set_bot(cls): | ||
cls.bot_reference = TuencyExpertBot | ||
if not os.environ.get("INTELMQ_TEST_TUNECY_URL") or not os.environ.get("INTELMQ_TEST_TUNECY_TOKEN"): | ||
cls.mock = True | ||
cls.sysconfig = {"url": 'http://localhost/', | ||
"authentication_token": 'Lorem ipsum', | ||
} | ||
else: | ||
cls.mock = False | ||
cls.sysconfig = {"url": os.environ["INTELMQ_TEST_TUNECY_URL"], | ||
"authentication_token": os.environ["INTELMQ_TEST_TUNECY_TOKEN"], | ||
} | ||
cls.default_input_message = INPUT | ||
|
||
def test_both(self, mocker): | ||
if self.mock: | ||
prepare_mocker(mocker) | ||
else: | ||
mocker.real_http = True | ||
self.run_bot() | ||
self.assertMessageEqual(0, OUTPUT_BOTH) | ||
|
||
def test_ip(self, mocker): | ||
""" | ||
Using an IP address as input. Existing source.abuse_contact should be overwritten. | ||
""" | ||
if self.mock: | ||
prepare_mocker(mocker) | ||
else: | ||
mocker.real_http = True | ||
self.input_message = INPUT_IP | ||
self.run_bot() | ||
self.assertMessageEqual(0, OUTPUT_IP) | ||
|
||
def test_ip_no_overwrite(self, mocker): | ||
""" | ||
Using an IP address as input. Existing source.abuse_contact should not be overwritten. | ||
""" | ||
if self.mock: | ||
prepare_mocker(mocker) | ||
else: | ||
mocker.real_http = True | ||
self.input_message = INPUT_IP | ||
self.run_bot(parameters={'overwrite': False}) | ||
self.assertMessageEqual(0, OUTPUT_IP_NO_OVERWRITE) | ||
|
||
def test_domain(self, mocker): | ||
if self.mock: | ||
prepare_mocker(mocker) | ||
else: | ||
mocker.real_http = True | ||
self.input_message = INPUT_DOMAIN | ||
self.run_bot() | ||
self.assertMessageEqual(0, OUTPUT_DOMAIN) | ||
|
||
def test_empty(self, mocker): | ||
""" | ||
A message with neither an IP address nor a domain, should be ignored and just passed on. | ||
""" | ||
if self.mock: | ||
prepare_mocker(mocker) | ||
else: | ||
mocker.real_http = True | ||
self.input_message = EMPTY | ||
self.run_bot() | ||
self.assertMessageEqual(0, EMPTY) | ||
|
||
def test_no_result(self, mocker): | ||
""" | ||
This IP address is not in the database | ||
""" | ||
if self.mock: | ||
prepare_mocker(mocker) | ||
else: | ||
mocker.real_http = True | ||
self.input_message = UNKNOWN_IP | ||
self.run_bot() | ||
self.assertMessageEqual(0, UNKNOWN_IP) |