Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opencti performance improvement for exact searches #1048

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions analyzers/OpenCTI/opencti.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
from cortexutils.analyzer import Analyzer
from pycti import OpenCTIApiClient
import re

class OpenCTIAnalyzer(Analyzer):
"""Searches for given Observables in configured OpenCTI instances. All standard data types are supported."""
Expand Down Expand Up @@ -38,6 +39,18 @@ def __init__(self):
except Exception as e:
self.error(str(e))

# determine which exact hash method is associated to a "hash" value
def get_hash_type(self, ioc):
print("###", ioc)
match_d = {"sha256": "^([a-f0-9]{64})$",
"sha1": "^([a-f0-9]{40})$",
"md5": "^([a-f0-9]{32})$",
}
for k in match_d.keys():
m = re.match(match_d[k], ioc, re.IGNORECASE)
if m: return k
return None

def summary(self, raw):
taxonomies = []
level = "info"
Expand All @@ -57,24 +70,35 @@ def summary(self, raw):
def run(self):

data = self.get_param('data', None, 'Data is missing')
data_type = self.get_param('dataType', None, 'Data type is missing')

response = []

cortex2opencti_types = {"ip": "value",
"url": "value",
"domain": "value",
"mail": "value",
"md5": "hashes_MD5",
"sha1": "hashes_SHA1",
"sha256": "hashes_SHA256",
"filename": "name"}
for opencti in self.openctis:
# Lookup observables
observables = opencti["api_client"].stix_cyber_observable.list(search=data)
# Lookup observables

if self.service == "search_exact":
# Filter results to only keep exact matches
observables = [observable for observable in observables if observable["observable_value"] == data]
# Prepare an OpenCTI type in case of an exact search
if data_type == "hash": data_type = self.get_hash_type(data)
opencti_type = cortex2opencti_types.get(data_type)
if self.service == "search_exact" and opencti_type:
observables = [opencti["api_client"].stix_cyber_observable.read(
filters=[{"key": opencti_type, "values": [data]}])]
else:
observables = opencti["api_client"].stix_cyber_observable.list(search=data)

for observable in observables:
# Strip observable data for lighter output
del(observable["objectMarkingIds"])
del(observable["objectLabelIds"])
del(observable["externalReferencesIds"])
del(observable["indicatorsIds"])
del(observable["parent_types"])
for key in ["objectMarkingIds", "objectLabelIds", "externalReferencesIds",
"indicatorsIds", "parent_types"]:
observable.pop(key, None)

# Get a list of reports containing this observable
reports = opencti["api_client"].report.list(
Expand All @@ -88,13 +112,9 @@ def run(self):

# Strip reports data for lighter output.
for report in reports:
del(report["objects"])
del(report["objectMarkingIds"])
del(report["externalReferencesIds"])
del(report["objectLabelIds"])
del(report["parent_types"])
del(report["objectsIds"])
del(report["x_opencti_graph_data"])
for key in ["objects", "objectMarkingIds", "externalReferencesIds",
"objectLabelIds", "parent_types", "objectsIds", "x_opencti_graph_data"]:
report.pop(key, None)

observable["reports"] = reports

Expand Down