-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathanalyzers.py
88 lines (63 loc) · 3.06 KB
/
analyzers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import os
import magic
import json
from typing import List
from cortex4py.query import *
from .abstract import AbstractController
from ..models import Analyzer, Job, AnalyzerDefinition
class AnalyzersController(AbstractController):
    """Controller exposing the analyzer-related API calls.

    Every public method delegates to the underlying API client
    (``self._api``) or to the generic helpers inherited from
    ``AbstractController`` and passes the decoded result through
    ``self._wrap`` together with the target model class.
    """

    def __init__(self, api):
        """Bind the controller to ``api`` under the ``'analyzer'`` endpoint name."""
        AbstractController.__init__(self, 'analyzer', api)

    def find_all(self, query, **kwargs) -> List[Analyzer]:
        """Return the analyzers matching ``query``.

        ``kwargs`` are forwarded unchanged to the inherited ``_find_all``.
        """
        return self._wrap(self._find_all(query, **kwargs), Analyzer)

    def find_one_by(self, query, **kwargs) -> Analyzer:
        """Return a single analyzer matching ``query``.

        ``kwargs`` are forwarded unchanged to the inherited ``_find_one_by``.
        """
        return self._wrap(self._find_one_by(query, **kwargs), Analyzer)

    def get_by_id(self, analyzer_id) -> Analyzer:
        """Return the analyzer identified by ``analyzer_id``."""
        return self._wrap(self._get_by_id(analyzer_id), Analyzer)

    def get_by_name(self, name) -> Analyzer:
        """Return the analyzer whose ``name`` field equals ``name``."""
        return self._wrap(self._find_one_by(Eq('name', name)), Analyzer)

    def get_by_type(self, data_type) -> List[Analyzer]:
        """Return the analyzers listed under ``analyzer/type/<data_type>``."""
        response = self._api.do_get('analyzer/type/{}'.format(data_type))
        return self._wrap(response.json(), Analyzer)

    def definitions(self) -> List[AnalyzerDefinition]:
        """Return the analyzer definitions served by ``analyzerdefinition``."""
        response = self._api.do_get('analyzerdefinition')
        return self._wrap(response.json(), AnalyzerDefinition)

    def enable(self, analyzer_name, config) -> Analyzer:
        """Enable ``analyzer_name`` by POSTing ``config`` to the organization.

        The payload sent is ``config`` plus a ``'name'`` entry set to
        ``analyzer_name``. The caller's ``config`` mapping is not modified.
        """
        url = 'organization/analyzer/{}'.format(analyzer_name)
        # Work on a shallow copy so the caller's dict is left untouched.
        payload = dict(config)
        payload['name'] = analyzer_name
        return self._wrap(self._api.do_post(url, payload).json(), Analyzer)

    def update(self, analyzer_id, config) -> Analyzer:
        """PATCH the analyzer ``analyzer_id`` with ``config``.

        Any ``'name'`` entry is left out of the payload. The caller's
        ``config`` mapping is not modified.
        """
        url = 'analyzer/{}'.format(analyzer_id)
        # Work on a shallow copy so the caller's dict is left untouched.
        payload = dict(config)
        payload.pop('name', None)
        return self._wrap(self._api.do_patch(url, payload).json(), Analyzer)

    def disable(self, analyzer_id) -> bool:
        """Issue a DELETE on ``analyzer/<analyzer_id>``.

        Returns whatever ``self._api.do_delete`` returns.
        """
        return self._api.do_delete('analyzer/{}'.format(analyzer_id))

    def run_by_id(self, analyzer_id, observable, **kwargs) -> Job:
        """Run the analyzer ``analyzer_id`` against ``observable``.

        ``observable`` is a mapping read with the following keys:

        * ``'dataType'`` - observable type; ``"file"`` triggers a file upload.
        * ``'data'`` - the value to analyze, or a local file path when the
          data type is ``"file"``.
        * ``'tlp'`` - optional, defaults to ``2``.
        * ``'message'`` / ``'parameters'`` - optional, copied as-is when present.

        If ``force`` is supplied as a keyword argument, it is passed along
        to the API call in the request ``params``.

        Returns the API response wrapped as a ``Job``.
        """
        url = 'analyzer/{}/run'.format(analyzer_id)
        data_type = observable.get('dataType', None)

        post = {
            'dataType': data_type,
            'tlp': observable.get('tlp', 2),
        }

        params = {}
        if 'force' in kwargs:
            params['force'] = kwargs['force']

        # Add additional details only when the caller supplied them.
        for key in ('message', 'parameters'):
            if key in observable:
                post[key] = observable[key]

        if data_type == "file":
            file_path = observable.get('data', None)
            file_name = os.path.basename(file_path)
            # Keep the handle open only for the duration of the upload so it
            # is always closed, even when the request or decoding fails.
            with open(file_path, 'rb') as file_handle:
                mime_type = magic.Magic(mime=True).from_file(file_path)
                file_def = {"data": (file_name, file_handle, mime_type)}
                data = {'_json': json.dumps(post)}
                response = self._api.do_file_post(
                    url, data, files=file_def, params=params
                )
                return self._wrap(response.json(), Job)

        post['data'] = observable.get('data')
        return self._wrap(self._api.do_post(url, post, params).json(), Job)

    def run_by_name(self, analyzer_name, observable, **kwargs) -> Job:
        """Look up the analyzer named ``analyzer_name`` and run it.

        The lookup uses ``get_by_name``; ``observable`` and ``kwargs`` are
        then handed to ``run_by_id`` with the resolved analyzer id.
        """
        analyzer = self.get_by_name(analyzer_name)
        return self.run_by_id(analyzer.id, observable, **kwargs)