Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New analyzer: malwares.com #252

Merged
merged 1 commit into from
May 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions analyzers/Malwares/Malwares_GetReport.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "Malwares_GetReport",
"version": "1.0",
"author": "CERT-LDO",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Get the latest Malwares report for a file, hash, domain or an IP address.",
"dataTypeList": ["file", "hash", "domain", "ip"],
"baseConfig": "Malwares",
"config": {
"check_tlp": true,
"max_tlp": 3,
"service": "get"
},
"command": "Malwares/malwares.py"
}
16 changes: 16 additions & 0 deletions analyzers/Malwares/Malwares_Scan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "Malwares_Scan",
"version": "1.0",
"author": "CERT-LDO",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Use Malwares api to scan a file or URL.",
"dataTypeList": ["file", "url"],
"baseConfig": "Malwares",
"config": {
"check_tlp": true,
"service": "scan",
"max_tlp": 1
},
"command": "Malwares/malwares.py"
}
170 changes: 170 additions & 0 deletions analyzers/Malwares/malwares.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python3
# encoding: utf-8
import sys
import os
import json
import codecs
import time
import hashlib

from malwares_api import Api
from cortexutils.analyzer import Analyzer

try:
from StringIO import StringIO
except ImportError:
from io import StringIO

class MalwaresAnalyzer(Analyzer):

def __init__(self):
Analyzer.__init__(self)
self.service = self.getParam('config.service', None, 'Service parameter is missing')
self.key = self.getParam('config.key', None, 'Missing Malware API key')
self.polling_interval = self.getParam('config.polling_interval', 60)

def wait_file_report(self, id):
results = self.check_response(self.m_api.get_file_report(id))
code = results.get('result_code', None)
if code == 1:
self.report(results)
else:
time.sleep(self.polling_interval)
self.wait_file_report(id)

def wait_url_report(self, id):
results = self.check_response(self.m_api.get_url_report(id))
code = results.get('result_code', None)
if code == 1:
self.report(results)
else:
time.sleep(self.polling_interval)
self.wait_url_report(id)

def check_response(self, response):
if type(response) is not dict:
self.error('Bad response : ' + str(response))
status = response.get('response_code', -1)
if status in (-14, -15):
self.error('Malwares api rate limit exceeded.')
elif int(status) < 1:
self.error('Bad status : ' + str(status))
results = response.get('results', {})
return results

# 0 => not found
# -2 => in queue
# 1 => ready

def read_scan_response(self, response, func):
results = self.check_response(response)
code = results.get('result_code', None)
md5 = results.get('md5', None)
url = results.get('url', None)
if code in (1,2) and md5 is not None:
func(md5)
elif code in (1,2) and url is not None:
func(url)
else:
self.error('%d %s %s - Scan not found' % (code, md5, url))

def summary(self, raw):
taxonomies = []
level = "info"
namespace = "Malwares"
predicate = "Score"
value = "\"No info\""
score = -1

result = {
"has_result": True
}

if "ai_score" in raw.keys():
score = raw["ai_score"]
if score < 10:
level = "safe"
elif 10 <= score < 30:
level = "suspicious"
else:
level = "malicious"

result['score'] = score

value = "\"{}/100\"".format(score)

else:
if "detected_communicating_file" in raw.keys() or "detected_url" in raw.keys() or "detected_downloaded_file" in raw.keys():
score = max(
raw.get("detected_communicating_file", {}).get("total", 0),
raw.get("detected_url", {}).get("total", 0),
raw.get("detected_downloaded_file", {}).get("total", 0)
)
value = "\"{} results\"".format(score)

elif "virustotal" in raw.keys():
score = raw.get("virustotal", {}).get("positives", 0)
total = raw.get("virustotal", {}).get("total", 0)
value = "\"{}/{} positives\"".format(score, total)

if score == 0:
level = "safe"
elif 0 < score <= 5:
level = "suspicious"
elif score > 5:
level = "malicious"

taxonomies.append(self.build_taxonomy(
level, namespace, predicate, value))
return {"taxonomies": taxonomies}

def run(self):
Analyzer.run(self)
self.m_api = Api(self.key)

if self.service == 'scan':
if self.data_type == 'file':
filename = self.getParam('filename', 'noname.ext')
filepath = self.getParam('file', None, 'File is missing')
self.read_scan_response(self.m_api.scan_file(
open(filepath, 'rb'), filename), self.wait_file_report)
elif self.data_type == 'url':
data = self.getParam('data', None, 'Data is missing')
self.read_scan_response(
self.m_api.scan_url(data), self.wait_url_report)
else:
self.error('Invalid data type')
elif self.service == 'get':
if self.data_type == 'domain':
data = self.getParam('data', None, 'Data is missing')
self.report(self.check_response(
self.m_api.get_domain_report(data)))
elif self.data_type == 'ip':
data = self.getParam('data', None, 'Data is missing')
self.report(self.check_response(self.m_api.get_ip_report(data)))
elif self.data_type == 'file':

hashes = self.getParam('attachment.hashes',
None)
if hashes is None:
filepath = self.getParam('file', None, 'File is missing')
hash = hashlib.sha256(open(filepath, 'r').read()).hexdigest();
else:
# find SHA256 hash
hash = next(h for h in hashes if len(h) == 64)

self.report(self.check_response(self.m_api.get_file_report(hash)))

elif self.data_type == 'hash':
data = self.getParam('data', None, 'Data is missing')
self.report(self.check_response(self.m_api.get_file_report(data)))
else:
self.error('Invalid data type')
else:
self.error('Invalid service')


if __name__ == '__main__':
MalwaresAnalyzer().run()


170 changes: 170 additions & 0 deletions analyzers/Malwares/malwares_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
#import StringIO
import requests

class Api():

def __init__(self, api_key=None):
self.api_key = api_key
self.base = 'https://www.malwares.com/api/v2/'
self.version = 2
if api_key is None:
raise ApiError("You must supply a valid Malwares API key.")

def scan_file(self, this_file, this_filename):
""" Submit a file to be scanned by Malwares

:param this_file: File to be scanned (200MB file size limit)
:param this_filename: Filename for scanned file
:return: JSON response that contains scan_id and permalink.
"""
params = {
'api_key': self.api_key,
'filename': this_filename
}
try:
files = {'file': (this_file.name, open(this_file.name, 'rb'), 'application/octet-stream')}
except TypeError as e:
return dict(error=e.message)

try:
response = requests.post(self.base + 'file/upload', files=files, data=params)
except requests.RequestException as e:
return dict(error=e.message)

return _return_response_and_status_code(response)

def get_file_report(self, this_hash):
""" Get the scan results for a file.

:param this_hash: The md5/sha1/sha256/scan_ids hash of the file whose dynamic behavioural report you want to
retrieve or scan_ids from a previous call to scan_file.
:return:
"""
params = {'api_key': self.api_key, 'hash': this_hash}

try:
response_info = requests.get(self.base + 'file/mwsinfo', params=params)
response_additional = requests.get(self.base + 'file/addinfo', params=params)
except requests.RequestException as e:
return dict(error=e.message)

ri = _return_response_and_status_code(response_info)
ra = _return_response_and_status_code(response_additional)

if ri['response_code'] == '1' and ra['response_code'] == '1': # both ok
response = dict(results={**ri['results'], **ra['results']}, response_code=1)
elif ri['response_code'] == '1' and ra['response_code'] == '0': # advance non exists but standard ok
response = ri
elif ri['response_code'] == '2': # main is still loading
response = dict(results={}, response_code=2)
else: # error generic
response = ri
return response

def scan_url(self, this_url):
""" Submit a URL to be scanned by Malwares.

:param this_url: The URL that should be scanned.
:return: JSON response that contains scan_id and permalink.
"""
params = {'api_key': self.api_key, 'url': this_url}

try:
response = requests.post(self.base + 'url/request', data=params)
except requests.RequestException as e:
return dict(error=e.message)

return _return_response_and_status_code(response)

def get_url_report(self, this_url):
""" Get the scan results for a URL.

:param this_url: a URL will retrieve the most recent report on the given URL.
:return: JSON response
"""
params = {'api_key': self.api_key, 'url': this_url}

try:
response = requests.post(self.base + 'url/info', data=params)
except requests.RequestException as e:
return dict(error=e.message)

return _return_response_and_status_code(response)

def get_ip_report(self, this_ip):
""" Get IP address reports.

:param this_ip: a valid IPv4 address in dotted quad notation, for the time being only IPv4 addresses are
supported.
:return: JSON response
"""
params = {'api_key': self.api_key, 'ip': this_ip}

try:
response = requests.get(self.base + 'ip/info', params=params)
except requests.RequestException as e:
return dict(error=e.message)

return _return_response_and_status_code(response)

def get_domain_report(self, this_domain):
""" Get information about a given domain.

:param this_domain: a domain name.
:return: JSON response
"""
params = {'api_key': self.api_key, 'hostname': this_domain}

try:
response = requests.get(self.base + 'hostname/info', params=params)
except requests.RequestException as e:
return dict(error=e.message)

return _return_response_and_status_code(response)

class ApiError(Exception):
pass


def _return_response_and_status_code(response):
""" Output the requests response JSON and status code

:rtype : dict
:param response: requests response object
:return: dict containing the JSON response and/or the status code with error string.
"""

result_codes = {
"-11" : "No matching data to API Key API Key error",
"-12" : "No authority to use No authority to use",
"-13" : "Expired API Key API Key expired",
"-14" : "Over the daily request limit Request limit per daily exceeded",
"-15" : "Over the hourly request limit Request limit per hour exceeded",
"-1" : "Invalid Parameters / Invalid Request",
"-25" : "File Upload Quota Limit Error in file size to upload",
"-2" : "Invalid URL Error in URL type",
"-31" : "Invalid type of hash error in Hash type",
"-400" : "No file attached No file attached",
"-404" : "No result No result",
"-415" : "Ectype of upload form is not multipart/form-data Error in upload form type",
"-41" : "Invalid type of url Error in URL type",
"-500" : "Internal Server Error System error",
"-51" : "Invalid type of ip Error in IP type",
"-61" : "Invalid type of hostname Error in Hostname type",
"0" : "Data is not exist No information found in DB.",
"1" : "Data exists / Analysis request succeeded /Successful upload (new)",
"2" : "Analysis in progress / Successful upload (duplicated)",
"-999": "Error"

}

results = response.json()

result_code = str(response.json().get('result_code', '-999'))
result_message = result_codes[result_code]
return dict(results=results, response_code=result_code, result_message=result_message)

2 changes: 2 additions & 0 deletions analyzers/Malwares/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cortexutils
requests
Loading