Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] HybridAnalysis API V2 #1117

Merged
merged 6 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions analyzers/HybridAnalysis/HybridAnalysis_GetReport.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,11 @@
"author": "Daniil Yugoslavskiy, Tieto",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"dataTypeList": ["hash", "file", "filename"],
"dataTypeList": ["hash", "file", "filename", "url", "domain"],
"description": "Fetch Hybrid Analysis reports associated with hashes and filenames.",
"command": "HybridAnalysis/HybridAnalysis_analyzer.py",
"baseConfig": "HybridAnalysis",
"configurationItems": [
{
"name": "secret",
"description": "HybridAnalysis secret",
"type": "string",
"multi": false,
"required": true
},
{
"name": "key",
"description": "API key",
Expand Down
95 changes: 71 additions & 24 deletions analyzers/HybridAnalysis/HybridAnalysis_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import hashlib
import requests
import time
from datetime import datetime

from requests.auth import HTTPBasicAuth
from cortexutils.analyzer import Analyzer
Expand All @@ -22,11 +23,10 @@
class VxStreamSandboxAnalyzer(Analyzer):
def __init__(self):
Analyzer.__init__(self)
self.basic_url = 'https://www.hybrid-analysis.com/api/'
self.headers = {'User-Agent': 'VxStream'}
self.basic_url = 'https://www.hybrid-analysis.com/api/v2/'

self.secret = self.get_param('config.secret', None, 'VxStream Sandbox secret key is missing')
self.api_key = self.get_param('config.key', None, 'VxStream Sandbox API key is missing')
self.headers = {'User-Agent': 'VxStream', 'api-key': self.api_key, 'accept': 'application/json', 'Content-Type': 'application/x-www-form-urlencoded'}

def summary(self, raw_report):
taxonomies = []
Expand All @@ -35,20 +35,52 @@ def summary(self, raw_report):
level = "info"
namespace = "HybridAnalysis"
predicate = "Threat level"
value = "No verdict"
value = "Unknown"

verdicts = {
"no specific threat": 0,
"whitelisted": 1,
"suspicious": 2,
"malicious": 3,
}

# define json keys to loop
if self.data_type in ['hash', 'file']:
minireports = raw_report.get('results').get('response')
elif self.data_type in ['filename']:
minireports = raw_report.get('results').get('response').get('result')
minireports = raw_report["results"]
elif self.data_type in ['filename', 'url', 'domain']:
minireports = raw_report["results"]["result"]

if len(minireports) != 0:
# get the first report with a non-null verdict
# The previous solution looped through the reports and took the first one that was not empty.
# Better solution: loop through all the latest verdicts (within an hour of the most recent one) and take the worst verdict.
# In some cases, HA returns a verdict of "No specific threat" while the one just before it (a few seconds earlier), from the same scan but a different tool, was tagged malicious.
last_verdict_time = None
last_verdict = None
highest_threat_score = None

for minireport in minireports:
if minireport.get('verdict') is not None:
report_verdict = minireport.get('verdict')
break
if minireport["verdict"] is not None:
if last_verdict_time is None:
last_verdict_time = int(datetime.timestamp(datetime.strptime(minireport["analysis_start_time"][:10] + minireport["analysis_start_time"][11:19], "%Y-%m-%d%H:%M:%S")))
last_verdict = minireport["verdict"]
# Set the initial threat score
highest_threat_score = minireport.get("threat_score")
else:
new_verdict_time = int(datetime.timestamp(datetime.strptime(minireport["analysis_start_time"][:10] + minireport["analysis_start_time"][11:19], "%Y-%m-%d%H:%M:%S")))
if abs(last_verdict_time - new_verdict_time) <= 3600:
last_verdict_time = new_verdict_time
try:
if verdicts[minireport["verdict"]] > verdicts[last_verdict]:
last_verdict = minireport["verdict"]
except KeyError:
continue
# Update the highest threat score if the current one is greater
current_threat_score = minireport.get("threat_score")
if current_threat_score is not None:
if highest_threat_score is None or current_threat_score > highest_threat_score:
highest_threat_score = current_threat_score

report_verdict = last_verdict

# create shield badge for short.html
if report_verdict == 'malicious':
Expand All @@ -63,6 +95,11 @@ def summary(self, raw_report):
elif report_verdict == 'no specific threat':
level = 'info'
value = "No Specific Threat"

# Add the highest threat score if available
if highest_threat_score is not None:
value = f"{value} (Threat Score: {highest_threat_score})"

else:
level = 'info'
value = "Unknown"
Expand All @@ -71,39 +108,48 @@ def summary(self, raw_report):
return {"taxonomies": taxonomies}

def run(self):

try:
if self.data_type == 'hash':
query_url = 'scan/'
query_data = self.get_param('data', None, 'Hash is missing')
query_url = 'search/hash'
query_data = {'hash': self.get_param('data', None, 'Hash is missing')}

elif self.data_type == 'file':
query_url = 'scan/'
query_url = 'search/hash'
hashes = self.get_param('attachment.hashes', None)

if hashes is None:
filepath = self.get_param('file', None, 'File is missing')
query_data = hashlib.sha256(open(filepath, 'rb').read()).hexdigest()
query_data = {'hash': hashlib.sha256(open(filepath, 'rb').read()).hexdigest()}
else:
# find SHA256 hash
query_data = next(h for h in hashes if len(h) == 64)
query_data = {'hash': next(h for h in hashes if len(h) == 64)}

elif self.data_type == 'filename':
query_url = 'search?query=filename:'
query_data = '"{}"'.format(self.get_param('data', None, 'Filename is missing'))
query_url = 'search/terms'
query_data = {'filename': self.get_param('data', None, 'Filename is missing')}

elif self.data_type == 'url':
query_url = 'search/terms'
query_data = {'url': self.get_param('data', None, 'URL is missing')}

elif self.data_type == 'domain':
query_url = 'search/terms'
query_data = {'domain': self.get_param('data', None, 'Domain is missing')}

else:
self.notSupported()

url = str(self.basic_url) + str(query_url) + str(query_data)
url = str(self.basic_url) + str(query_url)

error = True
while error:
r = requests.get(url, headers=self.headers, auth=HTTPBasicAuth(self.api_key, self.secret), verify=True)
if "error" in r.json().get('response'):
if "Exceeded maximum API requests per minute(5)" in r.json().get('response').get('error'):
r = requests.post(url, headers=self.headers, data=query_data, verify=True)

if "validation_errors" in r.json():
if "Exceeded maximum API requests per minute(5)" in r.json()["validation_errors"][0]["errors"]:
time.sleep(60)
else:
self.error(r.json().get('response').get('error'))
self.error(r.json()["validation_errors"][0]["errors"][0])
else:
error = False

Expand All @@ -115,3 +161,4 @@ def run(self):

if __name__ == '__main__':
VxStreamSandboxAnalyzer().run()