Skip to content

Commit

Permalink
Merge pull request #252 from CybercentreCanada/feature/safelist_signa…
Browse files Browse the repository at this point in the history
…tures

Feature/safelist signatures
  • Loading branch information
cccs-sgaron authored Sep 2, 2021
2 parents 818e3c4 + 8089798 commit c8880cc
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 8 deletions.
2 changes: 1 addition & 1 deletion assemblyline_ui/api/v4/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ def get_file_results(sha256, **kwargs):

# Process Signatures
for signature in sec['heuristic'].get('signature', []):
sig = (signature['name'], h_type)
sig = (signature['name'], h_type, signature.get('safe', False))
if sig not in output['signatures']:
output['signatures'].add(sig)

Expand Down
32 changes: 30 additions & 2 deletions assemblyline_ui/api/v4/safelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ def add_or_update_hash(**kwargs):
"reason": ["We've seen this file many times and it leads to False positives"],
"type": "user"}
],
"signature": { # Signature information - Only used in signature mode
"name": "Avira.Eicar", # Name of signature
},
"tag": { # Tag information - Only used in tag mode
"type": "network.url", # Type of tag
"value": "google.ca" # Value of the tag
Expand Down Expand Up @@ -139,9 +142,23 @@ def add_or_update_hash(**kwargs):
data['hashes']['sha1'] = hashlib.sha1(hashed_value).hexdigest()
data['hashes']['sha256'] = hashlib.sha256(hashed_value).hexdigest()
data.pop('file', None)
data.pop('signature', None)

elif data['type'] == 'signature':
sig_data = data.get('signature', None)
if sig_data is None or 'name' not in sig_data:
return make_api_response(None, "Signature data not found", 400)

hashed_value = f"signature: {sig_data['name']}".encode('utf8')
data['hashes']['md5'] = hashlib.md5(hashed_value).hexdigest()
data['hashes']['sha1'] = hashlib.sha1(hashed_value).hexdigest()
data['hashes']['sha256'] = hashlib.sha256(hashed_value).hexdigest()
data.pop('tag', None)
data.pop('file', None)

elif data['type'] == 'file':
data.pop('tag', None)
data.pop('signature', None)
data.setdefault('file', {})

data['added'] = data['updated'] = now_as_iso()
Expand Down Expand Up @@ -227,6 +244,9 @@ def add_update_many_hashes(**_):
"reason": ["We've seen this file many times and it leads to False positives"],
"type": "user"}
],
"signature": { # Signature information - Only used in signature mode
"name": "Avira.Eicar", # Name of signature
},
"tag": { # Tag information - Only used in tag mode
"type": "network.url", # Type of tag
"value": "google.ca" # Value of the tag
Expand All @@ -251,8 +271,13 @@ def add_update_many_hashes(**_):
hash_data.setdefault('classification', CLASSIFICATION.UNRESTRICTED)
if hash_data['type'] == 'tag':
hash_data.pop('file', None)
hash_data.pop('signature', None)
elif hash_data['type'] == 'file':
hash_data.pop('tag', None)
hash_data.pop('signature', None)
elif hash_data['type'] == 'signature':
hash_data.pop('tag', None)
hash_data.pop('file', None)

# Find the hash used for the key
key = hash_data['hashes'].get('sha256', hash_data['hashes'].get('sha1', hash_data['hashes'].get('md5', None)))
Expand Down Expand Up @@ -331,9 +356,12 @@ def check_hash_exists(qhash, **kwargs):
"reason": ["We've seen this file many times and it leads to False positives"],
"type": "user"}
],
"signature": { # Signature information - Only used in signature mode
"name": "Avira.Eicar", # Name of signature
},
"tag": { # Tag information - Only used in tag mode
"type": "network.url", # Type of tag
"value": "google.ca" # Value of the tag
"type": "network.url", # Type of tag
"value": "google.ca" # Value of the tag
},
"type": "tag" # Type of safelist hash (tag or file)
}
Expand Down
2 changes: 1 addition & 1 deletion assemblyline_ui/api/v4/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def get_file_submission_results(sid, sha256, **kwargs):

# Process Signatures
for signature in sec['heuristic'].get('signature', []):
sig = (signature['name'], h_type)
sig = (signature['name'], h_type, signature.get('safe', False))
if sig not in output['signatures']:
output['signatures'].add(sig)

Expand Down
60 changes: 56 additions & 4 deletions test/test_safelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,11 @@ def test_safelist_add_file(datastore, login_session):
assert added == updated
assert added is not None and updated is not None

# Make sure tag is none
tag = ds_sl.pop('tag', {})
# Make sure tag and signature are none
tag = ds_sl.pop('tag', None)
signature = ds_sl.pop('signature', None)
assert tag is None
assert signature is None

# Test classification
classification = ds_sl.pop('classification', None)
Expand Down Expand Up @@ -140,9 +142,59 @@ def test_safelist_add_tag(datastore, login_session):
assert added == updated
assert added is not None and updated is not None

# Make sure file is none
# Make sure file and signature are None
file = ds_sl.pop('file', {})
signature = ds_sl.pop('signature', None)
assert file is None
assert signature is None

# Test classification
classification = ds_sl.pop('classification', None)
assert classification is not None

# Test enabled
enabled = ds_sl.pop('enabled', None)
assert enabled

# Test rest
assert ds_sl == sl_data


def test_safelist_add_signature(datastore, login_session):
_, session, host = login_session

sig_name = 'McAfee.Eicar'
hashed_value = f"signature: {sig_name}".encode('utf8')

# Generate a random safelist
sl_data = {
'hashes': {'md5': hashlib.md5(hashed_value).hexdigest(),
'sha1': hashlib.sha1(hashed_value).hexdigest(),
'sha256': hashlib.sha256(hashed_value).hexdigest()},
'signature': {'name': sig_name},
'sources': [ADMIN_SOURCE],
'type': 'signature'
}

# Insert it and test return value
resp = get_api_data(session, f"{host}/api/v4/safelist/", method="PUT", data=json.dumps(sl_data))
assert resp['success']
assert resp['op'] == 'add'

# Load inserted data from DB
ds_sl = datastore.safelist.get(hashlib.sha256(hashed_value).hexdigest(), as_obj=False)

# Test dates
added = ds_sl.pop('added', None)
updated = ds_sl.pop('updated', None)
assert added == updated
assert added is not None and updated is not None

# Make sure file and signature are None
file = ds_sl.pop('file', {})
tag = ds_sl.pop('tag', None)
assert file is None
assert tag is None

# Test classification
classification = ds_sl.pop('classification', None)
Expand Down Expand Up @@ -198,7 +250,7 @@ def test_safelist_update(datastore, login_session):

# Test rest
assert {k: v for k, v in ds_sl.items()
if k not in ['added', 'updated', 'classification', 'enabled', 'tag']} == sl_data
if k not in ['added', 'updated', 'classification', 'enabled', 'tag', 'signature']} == sl_data

u_data = {
'classification': cl_eng.RESTRICTED,
Expand Down

0 comments on commit c8880cc

Please sign in to comment.