From afd6b4a3c0ec7ae45c556fd16ed74a31fd6a7153 Mon Sep 17 00:00:00 2001 From: Mark Kikta Date: Wed, 22 May 2019 14:01:36 -0500 Subject: [PATCH 1/3] Updating to use new API --- analyzers/SinkDB/SinkDB.json | 12 ++--- analyzers/SinkDB/sinkdb.py | 93 +++++++++++++++++++++++------------- 2 files changed, 65 insertions(+), 40 deletions(-) diff --git a/analyzers/SinkDB/SinkDB.json b/analyzers/SinkDB/SinkDB.json index 1f547007e..12985b4cf 100644 --- a/analyzers/SinkDB/SinkDB.json +++ b/analyzers/SinkDB/SinkDB.json @@ -1,17 +1,17 @@ { "name": "SinkDB", - "author": "Nils Kuhnert, CERT-Bund", + "author": "Mark Kikta, RedLegg Cybersecurity Solutions", "license": "AGPL-V3", - "url": "https://github.com/BSI-CERT-Bund/sinkdb-analyzer", - "version": "1.0", - "description": "Check if ip is sinkholed via sinkdb.abuse.ch", - "dataTypeList": ["ip"], + "url": "https://github.com/RedLegg/sinkdb-analyzer", + "version": "1.1", + "description": "Check if ip is sinkholed via the new sinkdb.abuse.ch HTTPS API. Original analyzer can be found at https://github.com/BSI-CERT-Bund/sinkdb-analyzer", + "dataTypeList": ["ip", "domain", "mail"], "command": "SinkDB/sinkdb.py", "baseConfig": "SinkDB", "configurationItems": [ { "name": "key", - "description": "Define the API Key", + "description": "Define the HTTPS API Key", "type": "string", "multi": false, "required": true diff --git a/analyzers/SinkDB/sinkdb.py b/analyzers/SinkDB/sinkdb.py index ec341c92f..81f042d3b 100755 --- a/analyzers/SinkDB/sinkdb.py +++ b/analyzers/SinkDB/sinkdb.py @@ -1,52 +1,77 @@ #!/usr/bin/env python -import subprocess +import json, requests, traceback from cortexutils.analyzer import Analyzer class SinkDBAnalyzer(Analyzer): - def __init__(self): - Analyzer.__init__(self) + def __init__(self): + Analyzer.__init__(self) - if self.data_type != 'ip': - self.error('SinkDB Analyzer only usable with ip data type.') + if self.data_type not in ['ip', 'domain', 'mail']: + self.error('SinkDB Analyzer only usable with the ip, domain, and mail data types.') - self.apikey = self.get_param('config.key', None, 'API Key needed for querying SinkDB.') - self.data = self.get_data().split('.') - self.data.reverse() - self.data = '.'.join(self.data) + self.apikey = self.get_param('config.key', None, 'HTTPS API Key needed for querying SinkDB.') + self.data = self.get_data() - def dig(self, ip): - proc = subprocess.Popen(['dig', '+short', '{}.{}.sinkdb-api.abuse.ch'.format(ip, self.apikey)], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out, err = proc.communicate() - out = out.decode('utf-8').strip('\n') + def query_db(self, indicator): - if err: - self.error('Error while calling dig: {}.'.format(err)) + if self.data_type == 'ip': + return self.parse_entries(json.loads(self.do_post("api_key={}&ipv4={}".format(self.apikey, self.data)).text)) - if out == '127.0.0.2': - return True + elif self.data_type == 'domain': + return self.parse_entries(json.loads(self.do_post("api_key={}&domain={}".format(self.apikey, self.data)).text)) - return False + elif self.data_type == 'mail': + return self.parse_entries(json.loads(self.do_post("api_key={}&email={}".format(self.apikey, self.data)).text)) - def run(self): - self.report({ - "is_sinkhole": self.dig(self.data) - }) + else: + raise TypeError('Error in query_db function. This error should not occur.') - def summary(self, raw): - taxonomies = [] + def parse_entries(self, entries): + ret = { + "Sinkhole": [], + "Phishing": [], + "Scanner": [] + } + if entries['query_status'] == 'ok': + for entry in entries['results']: + if entry['source'] == 'sinkhole': + ret['Sinkhole'].append(entry) + elif entry['source'] == 'awareness': + ret['Phishing'].append(entry) + elif entry['source'] == 'scanner': + ret['Scanner'].append(entry) + return ret + elif entries['query_status'] == 'no_results': + return ret + elif entries['query_status'] == 'invalid_ipaddress': + self.error("SinkDB did not recognize the IP as valid. Here is the full response:\n{}".format(json.dumps(entries))) + else: + self.error("There was an unknown error communicating with the SinkDB API. Here is the full response:\n{}".format(json.dumps(entries))) - if raw.get('is_sinkhole'): - taxonomies.append(self.build_taxonomy('safe', 'SinkDB', 'IsSinkhole', 'True')) - else: - taxonomies.append(self.build_taxonomy('suspicious', 'SinkDB', 'IsSinkhole', 'False')) - return { - "taxonomies": taxonomies - } + def do_post(self, data): + return requests.post('https://sinkdb-api.abuse.ch/api/v1/', headers={"Content-Type": "application/x-www-form-urlencoded"}, data=data) + + def run(self): + try: + self.report(self.query_db(self.data)) + except: + self.error("Error when attempting to retrieve data:\n{}".format(traceback.format_exc())) + + def summary(self, raw): + taxonomies = [] + + for k, v in raw.iteritems(): + if v: + taxonomies.append(self.build_taxonomy('suspicious', 'SinkDB', k, 'True')) + else: + taxonomies.append(self.build_taxonomy('safe', 'SinkDB', k, 'False')) + + return { + "taxonomies": taxonomies + } if __name__ == '__main__': - SinkDBAnalyzer().run() + SinkDBAnalyzer().run() From d5f3e6acf26cbf915e6e4b69d5b3b54ab4ec13e9 Mon Sep 17 00:00:00 2001 From: Mark Kikta Date: Wed, 22 May 2019 14:48:28 -0500 Subject: [PATCH 2/3] Updating template --- thehive-templates/SinkDB_1_0/long.html | 81 ++++++++++++++++++++++++-- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/thehive-templates/SinkDB_1_0/long.html b/thehive-templates/SinkDB_1_0/long.html index bb29f8cb0..8560107fd 100644 --- a/thehive-templates/SinkDB_1_0/long.html +++ b/thehive-templates/SinkDB_1_0/long.html @@ -7,11 +7,84 @@
Status

- - IP is sinkholed. + + Artifact is sinkholed. + + + + + + + + + + + + + + + + + + +
SinkDB IDSinkDB URLTypeClassificationOperator
{{r.id}}{{r.sinkdb_reference}}{{r.type}}{{r.classification}}{{r.operator}}
+
+ + Artifact is not sinkholed. + +

+

+ + Artifact is part of a phishing awareness Campaign. + + + + + + + + + + + + + + + + + + +
SinkDB IDSinkDB URLTypeClassificationOperator
{{r.id}}{{r.sinkdb_reference}}{{r.type}}{{r.classification}}{{r.operator}}
+
+ + Artifact is not part of a phishing awareness campaign. + +

+

+ + Artifact is a scanner. + + + + + + + + + + + + + + + + + + +
SinkDB IDSinkDB URLTypeClassificationOperator
{{r.id}}{{r.sinkdb_reference}}{{r.type}}{{r.classification}}{{r.operator}}
- - IP is not sinkholed. + + Artifact is not a scanner.

From 6cee433ae02fa8a256ddfd97a24e3c3732a40c74 Mon Sep 17 00:00:00 2001 From: Davide Arcuri Date: Mon, 4 May 2020 14:04:31 +0200 Subject: [PATCH 3/3] add fqdn support --- analyzers/SinkDB/SinkDB.json | 2 +- analyzers/SinkDB/sinkdb.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/analyzers/SinkDB/SinkDB.json b/analyzers/SinkDB/SinkDB.json index 12985b4cf..82f50c56e 100644 --- a/analyzers/SinkDB/SinkDB.json +++ b/analyzers/SinkDB/SinkDB.json @@ -5,7 +5,7 @@ "url": "https://github.com/RedLegg/sinkdb-analyzer", "version": "1.1", "description": "Check if ip is sinkholed via the new sinkdb.abuse.ch HTTPS API. Original analyzer can be found at https://github.com/BSI-CERT-Bund/sinkdb-analyzer", - "dataTypeList": ["ip", "domain", "mail"], + "dataTypeList": ["ip", "domain", "fqdn", "mail"], "command": "SinkDB/sinkdb.py", "baseConfig": "SinkDB", "configurationItems": [ diff --git a/analyzers/SinkDB/sinkdb.py b/analyzers/SinkDB/sinkdb.py index 81f042d3b..018e21d2e 100755 --- a/analyzers/SinkDB/sinkdb.py +++ b/analyzers/SinkDB/sinkdb.py @@ -8,7 +8,7 @@ class SinkDBAnalyzer(Analyzer): def __init__(self): Analyzer.__init__(self) - if self.data_type not in ['ip', 'domain', 'mail']: + if self.data_type not in ['ip', 'domain', 'fqdn', 'mail']: self.error('SinkDB Analyzer only usable with the ip, domain, and mail data types.') self.apikey = self.get_param('config.key', None, 'HTTPS API Key needed for querying SinkDB.') @@ -19,7 +19,7 @@ def query_db(self, indicator): if self.data_type == 'ip': return self.parse_entries(json.loads(self.do_post("api_key={}&ipv4={}".format(self.apikey, self.data)).text)) - elif self.data_type == 'domain': + elif self.data_type in ('domain', 'fqdn'): return self.parse_entries(json.loads(self.do_post("api_key={}&domain={}".format(self.apikey, self.data)).text)) elif self.data_type == 'mail':