Skip to content

Commit

Permalink
[reversinglabs-spectra-analyze] Add classification API support in wor…
Browse files Browse the repository at this point in the history
…kflow (#3420)
  • Loading branch information
DinkoReversingLabs authored Feb 13, 2025
1 parent 1aa8bbe commit 894080f
Showing 1 changed file with 107 additions and 48 deletions.
155 changes: 107 additions & 48 deletions internal-enrichment/reversinglabs-spectra-analyze/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def _get_config_variables(self):
)

self.reversinglabs_spectra_user_agent = (
"ReversingLabs Spectra Analyze OpenCTI v1.1.0"
"ReversingLabs Spectra Analyze OpenCTI v1.2.0"
)

"""
Expand Down Expand Up @@ -202,26 +202,36 @@ def _submit_file_for_analysis(self, stix_entity, opencti_entity, hash, hash_type
try:
response = self.a1000client.get_detailed_report_v2(
sample_hashes=self.hash,
retry=True,
retry=False,
)

if response.status_code == 200:
analysis_report = json.loads(response.text)
else:
raise Exception(
f"{self.helper.connect_name}: There was issue with getting report from Spectra Analyze. HTTP Status code {str(response.status_code)}"
)
except Exception as err:
raise ValueError(
f"{self.helper.connect_name}: Looks like the sample you are trying to access may not be available on your Spectra Analyze instance. On Spectra Analyze run fetch and analyze on the sample."
) from err

if response.status_code == 200:
analysis_report = json.loads(response.text)
else:
raise Exception(
f"{self.helper.connect_name}: There was issue with getting report from Spectra Analyze. HTTP Status code {str(response.status_code)}"
)
else:
raise ValueError(
f"{self.helper.connect_name}: Unsupported type provided for analysis result retrieval!"
)

return analysis_report

def _submit_file_for_classification(self, stix_entity, opencti_entity, hash):

response = self.a1000client.get_classification_v3(
sample_hash=hash, local_only=False, av_scanners=False
)

analysis_report = json.loads(response.text)

return analysis_report

def _submit_url_for_analysis(self, stix_entity, opencti_entity, url_sample):
self.stix_entity = stix_entity
self.opencti_entity = opencti_entity
Expand Down Expand Up @@ -325,6 +335,50 @@ def _process_file_analysis_result(

return results

def _process_file_classification_results(
self, stix_objects, stix_entity, opencti_entity, analysis_result
):
self.stix_objects = stix_objects
self.stix_entity = stix_entity
self.opencti_entity = opencti_entity
self.analysis_result = analysis_result

results = {}
results["labels"] = []

try:
results["sample_name"] = self.analysis_result["sha1"]
results["classification"] = self.analysis_result["classification"]
results["sha256"] = self.analysis_result["sha256"]
if results["classification"] in ("suspicious", "malicious"):
threat_name = self.analysis_result["classification_result"].split(".")
results["threat_name"] = threat_name
results["platform"] = threat_name[0]
results["threat_type"] = threat_name[1]
results["malware_family_name"] = threat_name[2]
# Creating label out of malware type and family
results["labels"].append(results["threat_type"])
results["labels"].append(results["malware_family_name"])
results["risk_score"] = self.analysis_result["riskscore"]
results["score"] = results["risk_score"] * 10
results["description"] = (
"Sample was processed by Spectra Analyze! Sample is classified as "
+ results["classification"]
)

# Creating label out of classification
results["labels"].append(results["classification"])

except Exception as err:
raise ValueError(
f"{self.helper.connect_name}: INFO: Fetching analysis data failed. Please try again shortly!"
) from err

# Add score and description to the Observable
self._upsert_observable(results)

return results

def _process_url_analysis_result(
self, stix_objects, stix_entity, opencti_entity, analysis_result
):
Expand Down Expand Up @@ -1284,6 +1338,29 @@ def _url_report_flow(self, stix_entity, opencti_entity, url_sample):
f"{self.helper.connect_name}: Number of stix bundles sent to workers round 1: {str(len(bundles_sent))}"
)

def _process_malicious(self, stix_objects, stix_entity, results):
if (results["classification"] == "malicious") or (
results["classification"] == "suspicious"
):

self.helper.log_info(
f"{self.helper.connect_name}: Create STIX objects for malicious sample results!"
)

# Create indicators based on result
self._create_indicators(results)

# Create Malware and add relationship to artifact
self._generate_stix_malware(results)

# Create Stix Bundle and send it to OpenCTI
bundle = self._generate_stix_bundle(stix_objects, stix_entity)
bundles_sent = self.helper.send_stix2_bundle(bundle)

self.helper.log_info(
f"{self.helper.connect_name}: Number of stix bundles sent for workers: {str(len(bundles_sent))}"
)

def _process_message(self, data: Dict):
stix_objects = data["stix_objects"]
stix_entity = data["stix_entity"]
Expand Down Expand Up @@ -1318,38 +1395,36 @@ def _process_message(self, data: Dict):
hash = ent_hash["hash"]
hash_type = ent_hash["algorithm"]

# Submit File sample for analysis
analysis_result = self._submit_file_for_analysis(
stix_entity, opencti_entity, hash, hash_type
# Get file classification
analysis_result = self._submit_file_for_classification(
stix_entity, opencti_entity, hash
)

if not analysis_result:
raise ValueError(
f"{self.helper.connect_name}: Provided sample does not exist on the appliance. Try to upload it first and re-run!"
self.helper.log_info(
f"{self.helper.connect_name}: There is no analysis result for provided sample!"
)

# Integrate analysis results with OpenCTI
results = self._process_file_analysis_result(
stix_objects, stix_entity, opencti_entity, analysis_result
)
# Integrate classification analysis results with OpenCTI
if "results" not in analysis_result:

if (results["classification"] == "malicious") or (
results["classification"] == "suspicious"
):

# Create indicators based on result
self._create_indicators(results)
results = self._process_file_classification_results(
stix_objects, stix_entity, opencti_entity, analysis_result
)

# Create Malware and add relationship to artifact
self._generate_stix_malware(results)
self._process_malicious(stix_objects, stix_entity, results)

# Create Stix Bundle and send it to OpenCTI
bundle = self._generate_stix_bundle(stix_objects, stix_entity)
bundles_sent = self.helper.send_stix2_bundle(bundle)
# Submit File sample for analysis
analysis_result = self._submit_file_for_analysis(
stix_entity, opencti_entity, hash, hash_type
)

self.helper.log_info(
f"{self.helper.connect_name}: Number of stix bundles sent for workers: {str(len(bundles_sent))}"
# Integrate file analysis results with OpenCTI
if "results" in analysis_result:
results = self._process_file_analysis_result(
stix_objects, stix_entity, opencti_entity, analysis_result
)
self._process_malicious(stix_objects, stix_entity, results)

elif opencti_type == "Url":
url_sample = stix_entity["value"]
Expand Down Expand Up @@ -1380,23 +1455,7 @@ def _process_message(self, data: Dict):
stix_objects, stix_entity, opencti_entity, analysis_result
)

if (results["classification"] == "malicious") or (
results["classification"] == "suspicious"
):

# Create indicators based on result
self._create_indicators(results)

# Create Malware and add relationship to artifact
self._generate_stix_malware(results)

# Create Stix Bundle and send it to OpenCTI
bundle = self._generate_stix_bundle(stix_objects, stix_entity)
bundles_sent = self.helper.send_stix2_bundle(bundle)

self.helper.log_info(
f"{self.helper.connect_name}: Number of stix bundles sent to workers round 1: {str(len(bundles_sent))}"
)
self._process_malicious(stix_objects, stix_entity, results)

elif opencti_type == "IPv4-Addr":
ip_sample = stix_entity["value"]
Expand Down

0 comments on commit 894080f

Please sign in to comment.