Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copilot ODM documentation update for Result, Signature, Statistics, Submission #1785

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 55 additions & 58 deletions assemblyline/odm/models/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,112 +32,109 @@
constants = forge.get_constants()


@odm.model(index=True, store=False)
@odm.model(index=True, store=False, description="Represents cyber attack tactics or techniques as identified by the MITRE ATT&CK framework.")
class Attack(odm.Model):
attack_id = odm.Keyword(copyto="__text__", description="ID", ai=False)
pattern = odm.Keyword(copyto="__text__", description="Pattern Name")
categories = odm.List(odm.Keyword(), description="Categories")
attack_id = odm.Keyword(copyto="__text__", description="Unique identifier corresponding to a specific tactic or technique in the MITRE ATT&CK framework.", ai=False)
pattern = odm.Keyword(copyto="__text__", description="The name of the MITRE ATT&CK pattern that is associated with the detected malware or malicious activity.")
categories = odm.List(odm.Keyword(), description="A list of categories that describe and classify the nature of the cyber attack based on the MITRE ATT&CK framework.")


@odm.model(index=True, store=False, description="Heuristic Signatures")
@odm.model(index=True, store=False, description="Describes a signature that has matched, indicating potential malicious behavior.")
class Signature(odm.Model):
name = odm.Keyword(copyto="__text__", description="Name of the signature that triggered the heuristic")
frequency = odm.Integer(default=1, description="Number of times this signature triggered the heuristic")
safe = odm.Boolean(default=False, description="Is the signature safelisted or not")
name = odm.Keyword(copyto="__text__", description="Name of the detection signature that triggered the heuristic.")
frequency = odm.Integer(default=1, description="The count of how many times this particular signature has triggered the heuristic during analysis.")
safe = odm.Boolean(default=False, description="A boolean indicating whether the signature is considered safe and has been safelisted, thus not contributing to the score.")


@odm.model(index=True, store=False, description="Heuristic associated to the Section")
class Heuristic(odm.Model):
heur_id = odm.Keyword(copyto="__text__", description="ID of the heuristic triggered", ai=False)
name = odm.Keyword(copyto="__text__", description="Name of the heuristic")
attack = odm.List(odm.Compound(Attack), default=[], description="List of Att&ck IDs related to this heuristic")
heur_id = odm.Keyword(copyto="__text__", description="Unique identifier of the heuristic that was triggered, contributing to the overall assessment of potential maliciousness.", ai=False)
name = odm.Keyword(copyto="__text__", description="The name of the heuristic rule that was triggered during the analysis.")
attack = odm.List(odm.Compound(Attack), default=[], description="A list of MITRE ATT&CK identifiers that are associated with this heuristic, linking detected behavior to known techniques.")
signature = odm.List(odm.Compound(Signature), default=[],
description="List of signatures that triggered the heuristic", ai=False)
score = odm.Integer(description="Calculated Heuristic score")
description="A list of signatures whose detection has contributed to the triggering of this heuristic.", ai=False)
score = odm.Integer(description="The score assigned by this heuristic, which contributes to the overall threat assessment of the analyzed artifact.")


@odm.model(index=True, store=False, description="Result Section")
class Section(odm.Model):
auto_collapse = odm.Boolean(default=False, description="Should the section be collapsed when displayed?", ai=False)
body = odm.Optional(odm.Text(copyto="__text__"), description="Text body of the result section")
classification = odm.Classification(description="Classification of the section", ai=False)
body_format = odm.Enum(values=BODY_FORMAT, index=False, description="Type of body in this section")
body_config = odm.Optional(odm.Mapping(odm.Any(), index=False,
description="Configurations for the body of this section"), ai=False)
depth = odm.Integer(index=False, description="Depth of the section", ai=False)
heuristic = odm.Optional(odm.Compound(Heuristic), description="Heuristic used to score result section")
tags = odm.Compound(Tagging, default={}, description="List of tags associated to this section")
safelisted_tags = odm.FlattenedListObject(store=False, default={}, description="List of safelisted tags", ai=False)
title_text = odm.Text(copyto="__text__", description="Title of the section")
auto_collapse = odm.Boolean(default=False, description="Indicates whether the section should be initially displayed as collapsed in the user interface.", ai=False)
body = odm.Optional(odm.Text(copyto="__text__"), description="The main content of the result section, which may include detailed analysis findings or descriptions.")
classification = odm.Classification(description="The classification level assigned to the information within the section, dictating who can view it.", ai=False)
body_format = odm.Enum(values=BODY_FORMAT, index=False, description="The format of the body content, such as text, JSON, or image, which determines how it is displayed.")
body_config = odm.Optional(odm.Mapping(odm.Any(), index=False),
description="Additional configurations that specify how the body content should be rendered or processed.", ai=False)
depth = odm.Integer(index=False, description="The nesting level of the section within the overall result hierarchy, used for organizing complex results.", ai=False)
heuristic = odm.Optional(odm.Compound(Heuristic), description="The heuristic analysis that contributed to the scoring of this section, if applicable.")
tags = odm.Compound(Tagging, default={}, description="A collection of tags that categorize or label the section based on the analysis findings.")
safelisted_tags = odm.FlattenedListObject(store=False, default={}, description="Tags that have been deemed safe and are excluded from contributing to the overall threat score.", ai=False)
title_text = odm.Text(copyto="__text__", description="The title of the section, summarizing its content or purpose.")
promote_to = odm.Optional(odm.Enum(
values=PROMOTE_TO,
description="This is the type of data that the current section should be promoted to.", ai=False))
values=PROMOTE_TO, ai=False), description="The category of data that this section's content should be elevated to for reporting or further analysis.")


@odm.model(index=True, store=True, description="Result Body")
class ResultBody(odm.Model):
score = odm.Integer(default=0, description="Aggregate of the score for all heuristics")
sections = odm.List(odm.Compound(Section), default=[], description="List of sections")
score = odm.Integer(default=0, description="The total score calculated from all heuristics applied, indicating overall severity.")
sections = odm.List(odm.Compound(Section), default=[], description="An ordered list of Section objects that detail the analysis results.")


@odm.model(index=False, store=False, description="Service Milestones")
class Milestone(odm.Model):
service_started = odm.Date(default="NOW", description="Date the service started scanning")
service_completed = odm.Date(default="NOW", description="Date the service finished scanning")
service_started = odm.Date(default="NOW", description="Timestamp marking when the service began its analysis of the artifact.")
service_completed = odm.Date(default="NOW", description="Timestamp marking when the service completed its analysis, signaling the end of processing for the artifact.")


@odm.model(index=True, store=False, description="File related to the Response")
class File(odm.Model):
name = odm.Keyword(copyto="__text__", description="Name of the file")
sha256 = odm.SHA256(copyto="__text__", description="SHA256 of the file")
description = odm.Text(copyto="__text__", description="Description of the file")
classification = odm.Classification(description="Classification of the file", ai=False)
name = odm.Keyword(copyto="__text__", description="The original name of the file being analyzed or generated during the analysis process.")
sha256 = odm.SHA256(copyto="__text__", description="The SHA256 hash of the file, serving as a unique identifier for the content.")
description = odm.Text(copyto="__text__", description="A brief description of the file's purpose or contents, especially if it is an output of the analysis.")
classification = odm.Classification(description="The classification level of the file, indicating the sensitivity of its contents.", ai=False)
is_section_image = odm.Boolean(default=False,
description="Is this an image used in an Image Result Section?", ai=False)
description="A flag indicating whether the file is an image that is used within an image-based result section.", ai=False)
# Possible values for PARENT_RELATION can be found in
# assemblyline-v4-service/assemblyline_v4_service/common/task.py.
parent_relation = odm.Text(
default="EXTRACTED",
description="File relation to parent, if any.\
<br>Values: `\"ROOT\", \"EXTRACTED\", \"INFORMATION\", \"DYNAMIC\", \"MEMDUMP\", \"DOWNLOADED\"`", ai=False
description="Describes the relationship of this file to the parent file, such as `EXTRACTED` or `DOWNLOADED`.", ai=False
)
allow_dynamic_recursion = odm.Boolean(
default=False,
description="Allow file to be analysed during Dynamic Analysis"
"even if Dynamic Recursion Prevention is enabled.", ai=False)
description="Specifies whether the file can be analyzed during dynamic analysis, even with recursion prevention.", ai=False)


@odm.model(index=True, store=True, description="Response Body of Result")
class ResponseBody(odm.Model):
milestones = odm.Compound(Milestone, default={}, description="Milestone block", ai=False)
service_version = odm.Keyword(store=False, description="Version of the service", ai=False)
service_name = odm.Keyword(copyto="__text__", description="Name of the service that scanned the file")
milestones = odm.Compound(Milestone, default={}, description="A set of key timestamps that mark important stages in the service's processing of the file.", ai=False)
service_version = odm.Keyword(store=False, description="The version of the service that performed the analysis, important for tracking analysis provenance.", ai=False)
service_name = odm.Keyword(copyto="__text__", description="The name of the service that conducted the analysis, useful for identifying the source of the results.")
service_tool_version = odm.Optional(
odm.Keyword(copyto="__text__"),
description="Tool version of the service", ai=False)
supplementary = odm.List(odm.Compound(File), default=[], description="List of supplementary files", ai=False)
extracted = odm.List(odm.Compound(File), default=[], description="List of extracted files")
description="The specific version of the analytical tool used by the service, if applicable.", ai=False)
supplementary = odm.List(odm.Compound(File), default=[], description="A list of additional files generated during analysis that support the main findings.", ai=False)
extracted = odm.List(odm.Compound(File), default=[], description="A list of files that were extracted from the analyzed artifact during the service's processing.")
service_context = odm.Optional(
odm.Keyword(index=False, store=False),
description="Context about the service", ai=False)
description="Additional context or metadata about the service's execution environment or configuration.", ai=False)
service_debug_info = odm.Optional(
odm.Keyword(index=False, store=False),
description="Debug info about the service", ai=False)
description="Information that can be used for debugging or understanding the service's analysis process.", ai=False)


@odm.model(index=True, store=True, description="Result Model")
class Result(odm.Model):
archive_ts = odm.Optional(odm.Date(description="Time at which the result was archived", ai=False))
classification = odm.Classification(description="Aggregate classification for the result", ai=False)
created = odm.Date(default="NOW", description="Date at which the result object got created", ai=False)
expiry_ts = odm.Optional(odm.Date(store=False), description="Expiry timestamp", ai=False)
response: ResponseBody = odm.compound(ResponseBody, description="The body of the response from the service")
result: ResultBody = odm.compound(ResultBody, default={}, description="The result body")
sha256 = odm.SHA256(store=False, description="SHA256 of the file the result object relates to")
type = odm.Optional(odm.Keyword())
size = odm.Optional(odm.Integer())
drop_file = odm.Boolean(default=False, description="Use to not pass to other stages after this run", ai=False)
from_archive = odm.Boolean(index=False, default=False, description="Was loaded from the archive", ai=False)
archive_ts = odm.Optional(odm.Date(ai=False), description="The timestamp when the result was moved to long-term storage or archived.")
classification = odm.Classification(description="The highest classification level assigned to any part of the result, dictating overall access control.", ai=False)
created = odm.Date(default="NOW", description="The creation timestamp for the result record, marking when the analysis result was first generated.", ai=False)
expiry_ts = odm.Optional(odm.Date(store=False), description="The timestamp when the result is scheduled to be purged or deleted from the system.", ai=False)
response: ResponseBody = odm.compound(ResponseBody, description="The container for all the response data provided by the service after analyzing the file.")
result: ResultBody = odm.compound(ResultBody, default={}, description="The container for the detailed results of the analysis, including sections and scores.")
sha256 = odm.SHA256(store=False, description="The SHA256 hash of the file that was analyzed, linking the result to the specific artifact.")
type = odm.Optional(odm.Keyword(), description="The MIME type or other file classification identified by Assemblyline that is linked to the result, providing insight into the file's content or format.")
size = odm.Optional(odm.Integer(), description="The size (in bytes) of the analyzed file pertinent to the result.")
drop_file = odm.Boolean(default=False, description="A flag indicating whether the file should be excluded from subsequent analysis stages.", ai=False)
from_archive = odm.Boolean(index=False, default=False, description="Indicates whether the result was retrieved from an archive rather than produced from a recent analysis.", ai=False)

def build_key(self, service_tool_version=None, task=None):
return self.help_build_key(
Expand Down
33 changes: 19 additions & 14 deletions assemblyline/odm/models/signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,23 @@
RULE_STATUSES = DEPLOYED_STATUSES + DRAFT_STATUSES + STALE_STATUSES


@odm.model(index=True, store=True)
@odm.model(index=True, store=True, description="""The Signature model within Assemblyline serves as a central framework for defining and managing security signatures, crucial components for the detection and analysis of malware. It provides a structured format that encompasses essential attributes such as a signature's name, classification level, source, and unique identifier. Moreover, it incorporates revision history, last modification timestamps, and statistical data to gauge performance and utility.

The model presents users with a mix of static and dynamic information, ranging from immutable identification details to variable metadata reflecting the signature's current operational status and history of changes. This latter aspect is captured through fields that log the date and user associated with the last status update, offering a view into the signature's lifecycle.

Understanding the Signature model is vital for cybersecurity professionals who are tasked with crafting precise Lucene-based search queries in Assemblyline. Mastery of this model's components will enable users to efficiently search, filter, and analyze signatures based on various parameters, thereby facilitating effective management and deployment in a cybersecurity context.
""")
class Signature(odm.Model):
classification = odm.Classification(store=True, default=Classification.UNRESTRICTED)
data = odm.Text(index=False, store=False)
last_modified = odm.Date(default="NOW")
name = odm.Keyword(copyto="__text__")
order = odm.Integer(default=1, store=False)
revision = odm.Keyword(default="1")
signature_id = odm.Optional(odm.Keyword())
source = odm.Keyword()
state_change_date = odm.Optional(odm.Date(store=False))
state_change_user = odm.Optional(odm.Keyword(store=False))
stats = odm.Compound(Statistics, default={})
status = odm.Enum(values=RULE_STATUSES, copyto="__text__")
type = odm.Keyword(copyto="__text__")
classification = odm.Classification(store=True, default=Classification.UNRESTRICTED, description="Indicates the sensitivity level of the signature, which dictates who can access it based on their clearance.")
data = odm.Text(index=False, store=False, description="Stores the actual signature data or pattern used for malware detection.")
last_modified = odm.Date(default="NOW", description="Records the timestamp of the most recent update to the signature. Defaults to the current time when the signature is modified.")
name = odm.Keyword(copyto="__text__", description="A unique and descriptive name for the signature.")
order = odm.Integer(default=1, store=False, description="**TODO**:Lower number means higher priority? **Generated**:Determines the processing order of the signature relative to others. A lower number indicates higher priority.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cccs-douglass I don't think this field really does anything, maybe a hold over from v3?

We should probably remove it if it's not being used

revision = odm.Keyword(default="1", description="Tracks the version of the signature, with the default starting value set to \"1\".")
signature_id = odm.Optional(odm.Keyword(), description="A unique identifier for the signature, which can be used for tracking and referencing purposes.")
source = odm.Keyword(description="Identifies the origin or the entity that provided the signature.")
state_change_date = odm.Optional(odm.Date(store=False), description="Captures the date when the signature's status was last updated. ")
state_change_user = odm.Optional(odm.Keyword(store=False), description="Records the username of the individual who last modified the signature's status.")
stats = odm.Compound(Statistics, default={}, description="Holds various statistical data related to the signature's performance and usage")
status = odm.Enum(values=RULE_STATUSES, copyto="__text__", description="Reflects the operational state of the signature, indicating whether it is deployed, in testing, or otherwise.")
type = odm.Keyword(copyto="__text__", description="Specifies the category or classification of the signature, which can be used for organizing and filtering signatures.")
Loading