Skip to content

Commit

Permalink
Merge pull request #123 from CybercentreCanada/update/other-memdump
Browse files Browse the repository at this point in the history
Small bugfixes in typing; Set MEMDUMP parent relation for artifacts c…
  • Loading branch information
cccs-kevin authored Jan 4, 2024
2 parents 8c27677 + 191f0ae commit 2f63d43
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 26 deletions.
30 changes: 23 additions & 7 deletions assemblyline_service_utilities/common/dynamic_service_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
ResultTableSection,
TableRow,
)
from assemblyline_v4_service.common.task import MaxExtractedExceeded
from assemblyline_v4_service.common.task import PARENT_RELATION, MaxExtractedExceeded

from assemblyline.common import log as al_log
from assemblyline.common.attack_map import attack_map, group_map, revoke_map, software_map
Expand Down Expand Up @@ -2199,15 +2199,20 @@ def load_from_json(self, json: Dict[str, Any]) -> None:
self.sandbox_version = json["sandbox_version"]

@staticmethod
def _is_hollowshunter_exe_dump(artifact_name: str) -> True:
def _is_hollowshunter_exe_dump(artifact_name: str) -> bool:
return HOLLOWSHUNTER_EXE_REGEX.match(artifact_name)

@staticmethod
def _is_hollowshunter_dll_dump(artifact_name: str) -> True:
def _is_hollowshunter_dll_dump(artifact_name: str) -> bool:
return HOLLOWSHUNTER_DLL_REGEX.match(artifact_name)

@staticmethod
def _is_hollowshunter_dump(artifact_name: str) -> True:
def _is_description_indicative_of_memdump(artifact_description: str) -> bool:
if any(item in artifact_description.lower() for item in ["memory dump", "memdump"]):
return True

@staticmethod
def _is_hollowshunter_dump(artifact_name: str) -> bool:
if OntologyResults._is_hollowshunter_exe_dump(artifact_name):
return True

Expand All @@ -2217,13 +2222,23 @@ def _is_hollowshunter_dump(artifact_name: str) -> True:
else:
return False

@staticmethod
def _is_memory_dump(artifact_name: str, artifact_description: str = None) -> bool:
# First check by name
if OntologyResults._is_hollowshunter_dump(artifact_name):
return True
# Then check by description
elif artifact_description and OntologyResults._is_description_indicative_of_memdump(artifact_description):
return True
return False

@staticmethod
def handle_artifacts(
artifact_list: List[Dict[str, Any]],
request: ServiceRequest,
collapsed: bool = False,
injection_heur_id: int = 17,
parent_relation: str = "EXTRACTED",
parent_relation: str = PARENT_RELATION.EXTRACTED,
) -> ResultSection:
"""
Goes through each artifact in artifact_list, uploading them and adding result sections accordingly
Expand All @@ -2241,8 +2256,9 @@ def handle_artifacts(
for artifact in validated_artifacts:
OntologyResults._handle_artifact(artifact, artifacts_result_section, injection_heur_id)

if OntologyResults._is_hollowshunter_dump(artifact.name):
parent_relation = "MEMDUMP"
# We will be setting the parent relation to memory dump based on artifact description and/or file name
if OntologyResults._is_memory_dump(artifact.name, artifact.description):
parent_relation = PARENT_RELATION.MEMDUMP

if artifact.to_be_extracted and not any(
artifact.sha256 == previously_extracted["sha256"] for previously_extracted in request.extracted
Expand Down
72 changes: 53 additions & 19 deletions test/test_dynamic_service_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4399,7 +4399,7 @@ def test_load_from_json():
"sha256": "blah",
}
],
None,
{"extracted": [{'path': 'blah', 'name': 'blah', 'description': 'blah', 'parent_relation': 'EXTRACTED'}]},
),
(
[
Expand All @@ -4413,6 +4413,7 @@ def test_load_from_json():
],
None,
),
# HollowsHunter-specific memory dump handling
(
[
{
Expand All @@ -4423,7 +4424,45 @@ def test_load_from_json():
"sha256": "blah",
}
],
True,
{
"title": "Sandbox Artifacts",
"body": "HollowsHunter dumped the following:\n\t- 123_hollowshunter/hh_process_12345_blah123.something.exe",
"sub_heur_id": 17,
"sub_sig_id": "hollowshunter_exe",
"sub_title": "HollowsHunter Injected Portable Executable",
"sub_body": "a",
"sub_tags": {"value": "123_hollowshunter/hh_process_12345_blah123.something.exe", "tag_type": "dynamic.process.file_name"},
"extracted": [
{
"description": "blah",
"name": "123_hollowshunter/hh_process_12345_blah123.something.exe",
"parent_relation": "MEMDUMP",
"path": "blah",
},
]
},
),
# CAPE-specific memory dump handling
(
[
{
"name": "CAPE_something.exe",
"path": "blah",
"description": "Memory Dump",
"to_be_extracted": True,
"sha256": "blah",
}
],
{
"extracted": [
{
"description": "Memory Dump",
"name": "CAPE_something.exe",
"parent_relation": "MEMDUMP",
"path": "blah",
},
]
},
),
],
)
Expand All @@ -4436,23 +4475,17 @@ def test_handle_artifacts(artifact_list, expected_result, dummy_request_class):
if expected_result is None:
assert actual_result is None
else:
expected_result = ResultSection("Sandbox Artifacts")
hh_sec = ResultSection(HOLLOWSHUNTER_TITLE)
hh_sec.set_heuristic(17)
hh_sec.heuristic.add_signature_id("hollowshunter_exe")
hh_sec.add_line("HollowsHunter dumped the following:")
hh_sec.add_line("\t- 123_hollowshunter/hh_process_12345_blah123.something.exe")
hh_sec.add_tag("dynamic.process.file_name", "123_hollowshunter/hh_process_12345_blah123.something.exe")
expected_result.add_subsection(hh_sec)
assert check_section_equality(actual_result, expected_result)
assert r.extracted == [
{
"description": "blah",
"name": "123_hollowshunter/hh_process_12345_blah123.something.exe",
"parent_relation": "MEMDUMP",
"path": "blah",
},
]
# We only expect a result section if a hollowshunter dump is found
if expected_result.get("title"):
parent_sec = ResultSection(expected_result["title"])
hh_sec = ResultSection(expected_result["sub_title"])
hh_sec.set_heuristic(expected_result["sub_heur_id"])
hh_sec.heuristic.add_signature_id(expected_result["sub_sig_id"])
hh_sec.add_line(expected_result["body"])
hh_sec.add_tag(**expected_result["sub_tags"])
parent_sec.add_subsection(hh_sec)
assert check_section_equality(actual_result, parent_sec)
assert r.extracted == expected_result.get("extracted", [])

@staticmethod
def test_get_guids():
Expand Down Expand Up @@ -8342,6 +8375,7 @@ def test_handle_artifact(artifact, expected_result_section_title):
actual_result_section = None

if expected_result_section is None and actual_result_section is None:
print(a.as_primitives())
assert True
else:
assert check_section_equality(actual_result_section, expected_result_section)
Expand Down

0 comments on commit 2f63d43

Please sign in to comment.