-
Notifications
You must be signed in to change notification settings - Fork 385
/
Copy pathanyrun_analyzer.py
executable file
·169 lines (155 loc) · 7.81 KB
/
anyrun_analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3
# encoding: utf-8
import time
import requests
from os.path import basename
from cortexutils.analyzer import Analyzer
from requests.packages.urllib3.exceptions import InsecureRequestWarning
class AnyRunAnalyzer(Analyzer):
    """Cortex analyzer that detonates a file or URL in the ANY.RUN sandbox.

    The analyzer submits the observable to ``{self.url}/analysis``, polls
    ``{self.url}/analysis/<task id>`` until the task status is ``"done"``,
    and reports a trimmed copy of the resulting JSON document.
    """

    # Seconds to wait between two submission retries and between two polls.
    RETRY_DELAY = 60
    # Upper bound on submission attempts and on status polls. With a 60 s
    # delay this caps the polling phase at the documented 15 minutes.
    MAX_TRIES = 15
    # Per-request timeout in seconds, so a stalled connection cannot hang
    # the job forever (requests applies no timeout by default).
    REQUEST_TIMEOUT = 120

    def __init__(self):
        """Read the analyzer configuration.

        ``config.token`` and ``config.privacy_type`` are requested with an
        error message and no default; every other option defaults to
        ``None`` (``config.verify_ssl`` defaults to ``True``).
        """
        Analyzer.__init__(self)
        self.url = "https://api.any.run/v1"
        self.token = self.get_param(
            "config.token", None, "Service token is missing"
        )
        self.privacy_type = self.get_param(
            "config.privacy_type", None, "Privacy type is missing"
        )
        self.verify_ssl = self.get_param("config.verify_ssl", True, None)
        if not self.verify_ssl:
            # Certificate checks were disabled on purpose: silence the
            # warning urllib3 would otherwise emit on every request.
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        self.env_bitness = self.get_param("config.env_bitness", None, None)
        self.env_version = self.get_param("config.env_version", None, None)
        self.env_type = self.get_param("config.env_type", None, None)
        self.opt_network_connect = self.get_param(
            "config.opt_network_connect", None, None
        )
        self.opt_network_fakenet = self.get_param(
            "config.opt_network_fakenet", None, None
        )
        self.opt_network_tor = self.get_param(
            "config.opt_network_tor", None, None
        )
        self.opt_network_mitm = self.get_param(
            "config.opt_network_mitm", None, None
        )
        self.opt_network_geo = self.get_param(
            "config.opt_network_geo", None, None
        )
        self.opt_kernel_heavyevasion = self.get_param(
            "config.opt_kernel_heavyevasion", None, None
        )
        self.opt_timeout = self.get_param("config.opt_timeout", None, None)
        self.obj_ext_startfolder = self.get_param(
            "config.obj_ext_startfolder", None, None
        )
        self.obj_ext_browser = self.get_param(
            "config.obj_ext_browser", None, None
        )

    def summary(self, raw):
        """Build the short report (taxonomies) from the full report.

        The verdict score is read from ``raw["analysis"]["scores"]
        ["verdict"]["score"]``; a missing or null score counts as 0.
        Scores strictly between 50 and 100 are "suspicious", 100 or more is
        "malicious", anything else is "safe".

        :param raw: the report dictionary passed to ``self.report``.
        :return: ``{"taxonomies": [...]}`` holding a single taxonomy.
        """
        value = (
            raw.get("analysis", {})
            .get("scores", {})
            .get("verdict", {})
            .get("score", 0)
        )
        if value is None:
            # An explicit null score would make the comparisons below raise.
            value = 0

        if value >= 100:
            level = "malicious"
        elif value > 50:
            level = "suspicious"
        else:
            level = "safe"

        taxonomy = self.build_taxonomy(
            level, "AnyRun", "Sandbox", "{0}/100".format(value)
        )
        return {"taxonomies": [taxonomy]}

    def run(self):
        """Submit the observable, wait for the analysis and report it.

        Supported data types are ``"file"`` and ``"url"``. Failures are
        reported through ``self.error``; ``requests`` exceptions are
        reported with their message and any other exception goes to
        ``self.unexpectedError``.
        """
        Analyzer.run(self)
        try:
            headers = {"Authorization": "API-Key {0}".format(self.token)}

            if self.data_type == "file":
                filepath = self.get_param("file", None, "File is missing")
                filename = self.get_param("filename", basename(filepath))
                data = self._common_options()
                data["obj_ext_startfolder"] = self.obj_ext_startfolder
                task_id = self._submit(headers, data, filepath, filename)
            elif self.data_type == "url":
                url = self.get_param("data", None, "Url is missing")
                data = {"obj_type": "url", "obj_url": url}
                data.update(self._common_options())
                data["obj_ext_browser"] = self.obj_ext_browser
                task_id = self._submit(headers, data)
            else:
                self.error("Invalid data type!")
                return

            if task_id is None:
                # _submit() has already reported the failure.
                return

            final_report = self._wait_for_report(headers, task_id)
            if final_report is None:
                # _wait_for_report() has already reported the failure.
                return

            self.report(self._trim_report(final_report))
        except requests.exceptions.RequestException as e:
            self.error(str(e))
        except Exception as e:
            self.unexpectedError(e)

    def _common_options(self):
        """Return the form fields shared by file and URL submissions."""
        # Options that were not configured stay None here (presumably
        # dropped by requests when it encodes the form -- TODO confirm).
        return {
            "opt_privacy_type": self.privacy_type,
            "env_bitness": self.env_bitness,
            "env_version": self.env_version,
            "env_type": self.env_type,
            "opt_network_connect": self.opt_network_connect,
            "opt_network_fakenet": self.opt_network_fakenet,
            "opt_network_tor": self.opt_network_tor,
            "opt_network_mitm": self.opt_network_mitm,
            "opt_network_geo": self.opt_network_geo,
            "opt_kernel_heavyevasion": self.opt_kernel_heavyevasion,
            "opt_timeout": self.opt_timeout,
        }

    @staticmethod
    def _error_message(response):
        """Return the API error message, or a generic one if there is none."""
        try:
            return response.json()["message"]
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or is JSON without a "message" field.
            return "AnyRun API returned HTTP {0}".format(response.status_code)

    def _submit(self, headers, data, filepath=None, filename=None):
        """Create an analysis task and return its id.

        HTTP 429 answers are retried every ``RETRY_DELAY`` seconds, up to
        ``MAX_TRIES`` attempts in total.

        :param headers: HTTP headers carrying the API key.
        :param data: form fields describing the task.
        :param filepath: path of the sample to upload, ``None`` for a URL.
        :param filename: name sent along with the uploaded sample.
        :return: the task id, or ``None`` after the failure has been
            reported through ``self.error``.
        """
        endpoint = "{0}/analysis".format(self.url)
        for attempt in range(self.MAX_TRIES):
            if attempt:
                # The previous attempt was answered with 429: the service
                # does not accept parallel runs, so wait and resubmit.
                time.sleep(self.RETRY_DELAY)

            if filepath is None:
                response = requests.post(
                    endpoint,
                    data=data,
                    headers=headers,
                    verify=self.verify_ssl,
                    timeout=self.REQUEST_TIMEOUT,
                )
            else:
                # Reopen the sample on every attempt so each upload starts
                # from the beginning of the file.
                with open(filepath, "rb") as sample:
                    response = requests.post(
                        endpoint,
                        files={"file": (filename, sample)},
                        data=data,
                        headers=headers,
                        verify=self.verify_ssl,
                        timeout=self.REQUEST_TIMEOUT,
                    )

            status_code = response.status_code
            if status_code == 200:
                return response.json()["data"]["taskid"]
            if status_code == 201:
                return response.json()["taskid"]
            if status_code != 429:
                self.error(self._error_message(response))
                # self.error() is expected to end the job; returning keeps
                # the control flow correct even if it does not.
                return None

        self.error("AnyRun submission failed: API kept answering HTTP 429")
        return None

    def _wait_for_report(self, headers, task_id):
        """Poll the task until it is done and return its report data.

        The task is polled every ``RETRY_DELAY`` seconds, at most
        ``MAX_TRIES`` times. Any 4xx answer aborts the wait; other non-200
        answers are ignored and the task is polled again.

        :param headers: HTTP headers carrying the API key.
        :param task_id: id returned when the task was created.
        :return: the ``"data"`` object of the finished task, or ``None``
            after the failure has been reported through ``self.error``.
        """
        endpoint = "{0}/analysis/{1}".format(self.url, task_id)
        for _ in range(self.MAX_TRIES):
            time.sleep(self.RETRY_DELAY)
            response = requests.get(
                endpoint,
                headers=headers,
                verify=self.verify_ssl,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                report = response.json()["data"]
                if report["status"] == "done":
                    return report
            elif 400 <= response.status_code < 500:
                self.error(self._error_message(response))
                return None

        self.error("AnyRun analysis timed out")
        return None

    @staticmethod
    def _trim_report(report):
        """Drop the potentially huge sections from ``report`` in place.

        These items can be very large, so they are kept out of Cortex.

        :param report: the ``"data"`` object of a finished task.
        :return: the same dictionary, for convenience.
        """
        report.pop("environments", None)
        report.pop("modified", None)
        for incident in report.get("incidents", []):
            incident.pop("events", None)
        for process in report.get("processes", []):
            process.pop("modules", None)
        return report
if __name__ == "__main__":
    # Run the analyzer only when this file is executed as a script,
    # not when it is imported as a module.
    AnyRunAnalyzer().run()