Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ibmxfe updates #1040

Open
wants to merge 9 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion analyzers/IBMXForce/IBMXForce_Lookup.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@
"multi": false,
"type": "boolean",
"default": true
},
{
"name": "account",
"description": "Account ID",
"required": false,
"multi": false,
"type": "string"
}
]
}
}
62 changes: 48 additions & 14 deletions analyzers/IBMXForce/ibmxforce_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self):
if not self.verify:
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
self.proxies = self.get_param('config.proxy', None)
self.account = self.get_param('config.account', None)

def parse_data(self, date):
try:
Expand All @@ -30,13 +31,14 @@ def parse_data(self, date):
date = datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
return date.strftime("%Y-%m-%d")

def cleanup(self, ip_data={}, malware_data={}, dns_data={}):
def cleanup(self, ip_data={}, malware_data={}, dns_data={}, whois_data={}):
response = {
'malware': [],
'history': [],
'dns': [],
'cats': [],
'families': [],
'detectionCoverage': [],
'emails_info': [],
'subjects_info': [],
'score': None,
Expand All @@ -50,6 +52,7 @@ def cleanup(self, ip_data={}, malware_data={}, dns_data={}):
cats = ip_data.get('cats', [])
score = "%d [%d category(s)]" % (score_value, len(cats)) if len(cats) > 0 else score_value
families = []
detectionCoverage = []

elif self.data_type in ['domain', 'url']:
score_value = ip_data.get('result', {}).get('score', 0)
Expand All @@ -58,6 +61,7 @@ def cleanup(self, ip_data={}, malware_data={}, dns_data={}):
cats = [x for x in cats.keys()]
score = "%d [%d category(s)]" % (score_value, len(cats)) if len(cats) > 0 else score_value
families = []
detectionCoverage = []

else:
score_value = malware_data.get('malware', {}).get('risk', 'low')
Expand All @@ -68,7 +72,9 @@ def cleanup(self, ip_data={}, malware_data={}, dns_data={}):
else:
score_nr = 10

families = malware_data.get('malware', {}).get('family', [])
families = malware_data.get('malware').get('origins').get('external').get('family', [])
families = families if families is not None and isinstance(families, list) else []
detectionCoverage = malware_data.get('malware').get('origins').get('external').get('detectionCoverage')
score = "%s [%d family(s)]" % (score_value, len(families)) if len(families) > 0 else score_value
cats = []

Expand All @@ -77,6 +83,7 @@ def cleanup(self, ip_data={}, malware_data={}, dns_data={}):
response['score'] = score
response['score_nr'] = score_nr
response['families'] = families
response['detectionCoverage'] = detectionCoverage

for hist in ip_data.get('history', []):
tmp = {}
Expand Down Expand Up @@ -109,6 +116,9 @@ def cleanup(self, ip_data={}, malware_data={}, dns_data={}):
response['dns'].append(
("", "", ",".join([x for x in dns_data['RDNS']])))

if self.data_type != 'hash':
response['whois'] = whois_data

return response

def ip_query(self, data):
Expand All @@ -117,17 +127,22 @@ def ip_query(self, data):
try:
_session = requests.Session()
_session.auth = (self.key, self.pwd)
if self.account != None:
_session.headers.update({'account': self.account})


_query_ip = _session.get('%s/ipr/%s' % (self.url, data), proxies=self.proxies, verify=self.verify)
_query_malware = _session.get(
'%s/ipr/malware/%s' % (self.url, data), proxies=self.proxies, verify=self.verify)
_query_info = _session.get('%s/resolve/%s' % (self.url, data), proxies=self.proxies, verify=self.verify)
_query_whois = _session.get('%s/whois/%s' % (self.url, data), proxies=self.proxies, verify=self.verify)

ip_data = _query_ip.json() if _query_ip.status_code == 200 else {}
malware_data = _query_malware.json() if _query_malware.status_code == 200 else {}
dns_data = _query_info.json() if _query_info.status_code == 200 else {}
if ip_data or malware_data or dns_data:
return self.cleanup(ip_data=ip_data, malware_data=malware_data, dns_data=dns_data)
whois_data = _query_whois.json() if _query_whois.status_code == 200 else {}
if ip_data or malware_data or dns_data or whois_data:
return self.cleanup(ip_data=ip_data, malware_data=malware_data, dns_data=dns_data, whois_data=whois_data)
else:
self.error('API Access error')

Expand All @@ -142,17 +157,23 @@ def domain_query(self, data):
try:
_session = requests.Session()
_session.auth = (self.key, self.pwd)

if self.account != None:
_session.headers.update({'account': self.account})


_query_url = _session.get('%s/url/%s' % (self.url, data), proxies=self.proxies, verify=self.verify)
_query_malware = _session.get(
'%s/url/malware/%s' % (self.url, data), proxies=self.proxies, verify=self.verify)
_query_info = _session.get('%s/resolve/%s' % (self.url, data), proxies=self.proxies, verify=self.verify)
_query_whois = _session.get('%s/whois/%s' % (self.url, data), proxies=self.proxies, verify=self.verify)

url_data = _query_url.json() if _query_url.status_code == 200 else {}
malware_data = _query_malware.json() if _query_malware.status_code == 200 else {}
dns_data = _query_info.json() if _query_info.status_code == 200 else {}
if url_data or malware_data or dns_data:
return self.cleanup(ip_data=url_data, malware_data=malware_data, dns_data=dns_data)
whois_data = _query_whois.json() if _query_whois.status_code == 200 else {}
if url_data or malware_data or dns_data or whois_data:
return self.cleanup(ip_data=url_data, malware_data=malware_data, dns_data=dns_data, whois_data=whois_data)
else:
self.error('API Access error')

Expand All @@ -167,14 +188,20 @@ def malware_query(self, data):
try:
_session = requests.Session()
_session.auth = (self.key, self.pwd)

if self.account != None:
_session.headers.update({'account': self.account})

_query_malware = _session.get(
'%s/malware/%s' % (self.url, data), proxies=self.proxies, verify=self.verify)

if _query_malware.status_code == 200:
return self.cleanup(malware_data=_query_malware.json())
else:
self.error('API Access error')
self.report({
'resource': data,
'errorMessage': 'No such sample found'
})

except Exception as e:
self.error("OS error: {0}".format(e))
Expand All @@ -189,17 +216,24 @@ def summary(self, raw):
score_value = raw['score_value']
score = raw['score']

if score_value < 4 or score_value == 'low':
level = "safe"
elif score_value < 7 or score_value == 'medium':
level = "suspicious"
elif score_value >= 7 or score_value == 'high':
level = "malicious"
if type(score_value) == int:
if score_value < 4:
level = "safe"
elif score_value < 7:
level = "suspicious"
elif score_value >= 7:
level = "malicious"
elif type(score_value) == str:
if score_value == 'low':
level = "safe"
elif score_value == 'medium':
level = "suspicious"
elif score_value == 'high':
level = "malicious"

#taxonomies.append(self.build_taxonomy(level, namespace, predicate, "{}".format(score)))
taxonomies.append(self.build_taxonomy(level, namespace, predicate, "{}".format(score)))


return {"taxonomies": taxonomies}

def run(self):
Expand Down