test_alerts.py
import hashlib
from http import HTTPStatus
import uuid
import responses
from intezer_sdk.alerts import get_alerts_by_alert_ids
from intezer_sdk.alerts import Alert
from tests.unit.base_test import BaseTest
from tests.utils import load_binary_file_from_resources
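class AlertsSpec(BaseTest):
    """Unit tests for the SDK's alert helpers, run against mocked HTTP endpoints.

    Every test registers its canned replies on a ``responses.RequestsMock``.
    Used as a context manager with default settings, that mock fails the test
    on exit when a registered reply was never requested, so each registration
    below also acts as an implicit "this endpoint was called" check.

    NOTE(review): only the values the SDK returns are asserted. The outgoing
    request (query parameters, JSON body, uploaded bytes) is not inspected by
    any test in this class.
    """

    def _mock_alert_search(self, mock):
        """Register a ``GET /alerts/search`` reply that contains one placeholder alert.

        Args:
            mock: The active ``responses.RequestsMock`` to register the reply on.
        """
        triage_result = {
            'alert_verdict': 'alert_verdict',
            'alert_verdict_display': 'Alert Verdict Display',
            'families': [],
            'family_id': 'family_id',
            'family_name': 'family_name',
            'risk_category': 'risk_category',
            'risk_category_display': 'Risk Category Display',
            'risk_level': 'risk_level',
            'threat_name': 'threat_name',
        }
        alert = {
            'alert_id': 'alert_id',
            'alert_verdict': 'alert_verdict',
            'alert_source': 'alert_source',
            'risk_level': 'risk_level',
            'risk_category': 'risk_category',
            'family_id': 'family_id',
            'threat_name': 'threat_name',
            'family_name': 'family_name',
            'triage_result': triage_result,
        }
        mock.add('GET',
                 url=f'{self.full_url}/alerts/search',
                 status=HTTPStatus.OK,
                 json={'result': {'alerts_count': 1, 'alerts': [alert]}})

    def test_get_alerts_by_alert_ids(self):
        """A search that matches one alert returns a count of 1 and that alert's details."""
        # Arrange
        with responses.RequestsMock() as mock:
            self._mock_alert_search(mock)

            # Act
            alerts_amount, alerts_details = get_alerts_by_alert_ids(['alert_id'])

        # Assert
        self.assertEqual(alerts_amount, 1)
        self.assertEqual(alerts_details[0]['alert_id'], 'alert_id')

    def test_get_alerts_by_alerts_ids(self):
        """A search that matches nothing returns a count of 0 and an empty list."""
        # Arrange
        with responses.RequestsMock() as mock:
            mock.add('GET',
                     url=f'{self.full_url}/alerts/search',
                     status=HTTPStatus.OK,
                     json={'result': {'alerts_count': 0, 'alerts': []}})

            # Act
            alerts_amount, alerts_details = get_alerts_by_alert_ids(['alert_id_2'])

        # Assert
        self.assertEqual(alerts_amount, 0)
        self.assertEqual(alerts_details, [])

    def test_ingest_alert_success(self):
        """``Alert.send`` exposes the alert id returned by the ingest endpoint."""
        # Arrange
        # A random id proves the SDK takes the id from the response: the raw
        # alert carries the different literal 'alert_id'.
        alert_id = str(uuid.uuid4())
        with responses.RequestsMock() as mock:
            mock.add('POST',
                     url=f'{self.full_url}/alerts/ingest',
                     status=HTTPStatus.OK,
                     json={'result': True, 'alert_id': alert_id})

            # Act
            alert = Alert.send(raw_alert={'alert_id': 'alert_id'},
                               alert_mapping={'some': 'mapping'},
                               source='source',
                               environment='environment',
                               display_fields=['display_fields'],
                               alert_sender='alert_sender',
                               )

        # Assert
        self.assertEqual(alert.alert_id, alert_id)

    def test_alert_from_id(self):
        """``Alert.from_id`` returns an alert carrying the requested id."""
        # Arrange
        with responses.RequestsMock() as mock:
            mock.add('GET',
                     url=f'{self.full_url}/alerts/get-by-id',
                     status=HTTPStatus.OK,
                     json={'result': {}, 'status': 'success'})

            # Act
            alert = Alert.from_id('alert_id')

        # Assert
        self.assertEqual(alert.alert_id, 'alert_id')

    def test_alert_from_id_waits_from_completion(self):
        """``Alert.from_id(wait=True)`` keeps polling past an ``in_progress`` reply."""
        # Arrange
        with responses.RequestsMock() as mock:
            # Replies registered for the same URL are served in order, and both
            # must be consumed for the mock to exit cleanly, so the SDK has to
            # issue a second request after seeing 'in_progress'.
            mock.get(url=f'{self.full_url}/alerts/get-by-id',
                     status=HTTPStatus.OK,
                     json={'result': {}, 'status': 'in_progress'})
            mock.get(url=f'{self.full_url}/alerts/get-by-id',
                     status=HTTPStatus.OK,
                     json={'result': {}, 'status': 'success'})

            # Act
            alert = Alert.from_id('alert_id', wait=True)

        # Assert
        self.assertEqual(alert.alert_id, 'alert_id')

    def test_ingest_binary_alert_success(self):
        """``Alert.send_phishing_email`` exposes the id returned by the binary ingest endpoint."""
        # Arrange
        raw_alert = load_binary_file_from_resources('binary_alerts/test.eml')
        # The mocked endpoint answers with the SHA-256 of the fixture as the
        # alert id; how the real service derives ids is not shown here.
        alert_id = hashlib.sha256(raw_alert.read()).hexdigest()
        # read() leaves the stream at end-of-file. Rewind it so the SDK is
        # handed the whole message even if it does not seek on its own;
        # otherwise the hashed bytes and the submitted bytes could differ.
        raw_alert.seek(0)
        with responses.RequestsMock() as mock:
            mock.add('POST',
                     url=f'{self.full_url}/alerts/ingest/binary',
                     status=HTTPStatus.OK,
                     json={'result': True, 'alert_id': alert_id})

            # Act
            alert = Alert.send_phishing_email(raw_email=raw_alert,
                                              alert_sender='alert_sender')

        # Assert
        self.assertEqual(alert.alert_id, alert_id)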