Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing CENTS reporting module #605

Merged
merged 29 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6564216
initial CENTS setup
klingerko Sep 13, 2021
cdfbeec
various update after our meeting
klingerko Sep 23, 2021
c3a5257
add trickbot and squirrelwaffle
klingerko Sep 25, 2021
53b380e
add azorult
klingerko Sep 25, 2021
35574c1
deduplicate config dict list and return early if no rules have been c…
klingerko Sep 25, 2021
4f8543d
Merge pull request #1 from klingerko/cents
zoomequipd Oct 11, 2021
661adbe
Merge branch 'kevoreilly:master' into master
zoomequipd Oct 11, 2021
5189adb
impliment remcos rules for CENTS - force DC3-MWCP parser to include t…
zoomequipd Oct 12, 2021
b097733
Merge branch 'master' of https://github.com/zoomequipd/CAPEv2
zoomequipd Oct 12, 2021
f290d5e
add squirrelwaffle signatures
zoomequipd Oct 12, 2021
a2dca54
add date of the analysis run of the sample to rules
klingerko Oct 14, 2021
ca01caf
add link to analysis to rule reference
klingerko Oct 14, 2021
e6a9b67
only display cents download button if we have rules
klingerko Oct 14, 2021
26f71b7
fix typo
klingerko Oct 14, 2021
3022794
fix path
klingerko Oct 14, 2021
1c79bed
reformat date to be align with ET format
klingerko Oct 14, 2021
05b8bd2
Merge branch 'kevoreilly:master' into master
zoomequipd Oct 15, 2021
5c446ef
Merge pull request #4 from klingerko/cents_downloadbutton
zoomequipd Oct 15, 2021
a47e9d0
Merge pull request #3 from klingerko/cents_dateinrule
zoomequipd Oct 15, 2021
12726c6
Merge branch 'kevoreilly:master' into master
zoomequipd Oct 16, 2021
6268cec
trim incomplete malfamiles
zoomequipd Oct 16, 2021
1dadcee
Merge branch 'master' into cents_taskid
klingerko Oct 17, 2021
e923df4
move hostname to web.conf and cosmetic changes
klingerko Oct 17, 2021
4b81537
Merge pull request #2 from klingerko/cents_taskid
zoomequipd Oct 17, 2021
b038cfe
Merge branch 'kevoreilly:master' into master
zoomequipd Oct 17, 2021
c955868
Merge branch 'master' of https://github.com/zoomequipd/CAPEv2
zoomequipd Oct 17, 2021
eee1896
fix issuer --> issuerdn
zoomequipd Oct 17, 2021
9f0ac04
complete trickbot - remove azorult and cobaltstrike
zoomequipd Oct 17, 2021
1dd7e8a
Merge pull request #1 from zoomequipd/master
klingerko Oct 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/reporting.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
# You can also add additional options under the section of your module and
# they will be available in your Python class.

[cents]
enabled = yes
# starting signature id for created Suricata rules
start_sid = 1000000

[mitre]
enabled = no

Expand Down
3 changes: 3 additions & 0 deletions conf/web.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ search_limit = 50
# Allow anon users to browser site but not submit/download
anon_viewable = no
existent_tasks = yes
# hostname of the cape instance
hostname = https://127.0.0.1/
;hostname = https://www.capesandbox.com/

# ratelimit for anon users
[ratelimit]
Expand Down
Empty file.
191 changes: 191 additions & 0 deletions lib/cuckoo/common/cents/cents_remcos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import logging
from Crypto.Cipher import ARC4
from ipaddress import ip_address

log = logging.getLogger(__name__)


def _chunk_stuff(stuff, group_size=20):
# really just need to chunk out the ip into groups of....20?
# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
for i in range(0, len(stuff), group_size):
yield ','.join(stuff[i:i + group_size])


def _build_rc4_rule(passphrase):
hex_plain_text = "5b4461746153746172745d00000000"

cipher = ARC4.new(passphrase)
value = bytes.fromhex(hex_plain_text)
enc_value = cipher.encrypt(value)

# conver the encrypted form if the plain text to a hex string
enc_hex_value = enc_value.hex()

first_value = ""
second_value = ""
# split the first part into two char groups as hex, we need these for the rules
# skip over the last 8 bytes
for i in range(0, len(enc_hex_value) - 8, 2):
first_value += f"{enc_hex_value[i:i + 2]} "

# only take the last 4 bytes
for i in range(len(enc_hex_value) - 4, len(enc_hex_value), 2):
second_value += f"{enc_hex_value[i:i + 2]} "

return first_value.rstrip(), second_value.rstrip()


def _parse_mwcp(remcos_config):
remcos_config_list = []
control = remcos_config.get('control', [])
for c in control:
if c and c.startswith("tcp://"):
# maxsplit here incase the passphrase includes :
tmp = c.replace("tcp://", "").split(":", maxsplit=2)
if tmp:
# if we don't have a password, just add a blank one,
if len(tmp) == 2:
remcos_config_list.append(
{"Version": remcos_config.get("version", ""), "C2": tmp[0], "Port": tmp[1], "Password": ""}
)
elif len(tmp) == 3 and tmp[2] != "":
# we can include the passprhase
remcos_config_list.append(
{"Version": remcos_config.get("version", ""), "C2": tmp[0], "Port": tmp[1], "Password": tmp[2]}
)
else:
log.debug(f"[CENTS - Remcos] MWCP config - found to be invalid --> {c}")

return remcos_config_list


def _parse_ratdecoders(remcos_config):
domains = remcos_config.get("domains", [])
remcos_config_list = []
for domain in domains:
# why is this a list of lists
for nested_domain in domain:
remcos_config_list.append(
# notice the typo here including the colon after c2:
# https://github.com/kevthehermit/RATDecoders/blob/master/malwareconfig/decoders/remcos.py#L56
{"C2": nested_domain.get("c2:", ""),
"Port": nested_domain.get("port", ""),
"Password": nested_domain.get("password", ""),
}
)

return remcos_config_list


def cents_remcos(config_dict, sid_counter, md5, date, task_link):
"""Creates Suricata rules from extracted Remcos malware configuration.

:param config_dict: Dictionary with the extracted Remcos configuration.
:type config_dict: `dict`

:param sid_counter: Signature ID of the next Suricata rule.
:type sid_counter: `int`

:param md5: MD5 hash of the source sample.
:type md5: `int`

:param date: Timestamp of the analysis run of the source sample.
:type date: `str`

:param task_link: Link to analysis task of the source sample.
:type task_link: `str`

:return List of Suricata rules (`str`) or empty list if no rule has been created.
"""
if not config_dict or not sid_counter or not md5 or not date or not task_link:
return []

next_sid = sid_counter
# build out an array to store the parsed configs
remcos_config_list = []

# lowercase the key names in the configs for constancy
remcos_config = dict((k.lower(), v) for k, v in config_dict.items())

if not remcos_config:
return []

# there are two remcos parsers that could be at work here
# 1) RATDecoders - https://github.com/kevthehermit/RATDecoders/blob/master/malwareconfig/decoders/remcos.py
# which is an optional configuration that can be enabled in the processing.conf file
# 2) MWCP - https://github.com/kevoreilly/CAPEv2/blob/master/modules/processing/parsers/mwcp/Remcos.py
# which is an optional configuration that can be enabled in the processing.conf file
if 'control' in remcos_config and 'domains' not in remcos_config:
# we have an MWCP config
log.debug("[CENTS - Remcos] Parsing DC3-MWCP based config")
parsed_remcos_config = _parse_mwcp(remcos_config)
for _config in parsed_remcos_config:
if _config not in remcos_config_list:
remcos_config_list.append(_config)

if 'domains' in remcos_config and 'control' not in remcos_config:
# we have a RATDecoders config
log.debug("[CENTS - Remcos] Parsing RATDecoders based config")
parsed_remcos_config = _parse_ratdecoders(remcos_config)
for _config in parsed_remcos_config:
if _config not in remcos_config_list:
remcos_config_list.append(_config)

# if we don't have a parsed config, drop out
log.debug("[CENTS - Remcos] Done Parsing Config")
if not remcos_config_list:
log.debug("[CENTS - Remcos] No parsed configs found")
return []

# Now we want to create Suricata rules finally
rule_list = []
ip_list = set()
domain_list = set()
for c2_server in list(map(lambda x: x.get('C2'), remcos_config_list)):
try:
c2_ip = ip_address(c2_server)
except ValueError:
domain_list.add(c2_server)
else:
# only create rules for "global" ip addresses
if c2_ip.is_global:
ip_list.add(c2_server)
else:
log.debug("[CENTS - Remcos] Skipping c2 server due to non-routable ip")

log.debug("[CENTS - Remcos] Building IP based rules")
for ip_group in _chunk_stuff(list(ip_list)):
rule = f"alert tcp $HOME_NET any -> {ip_group} any (msg:\"ET CENTS Remcos RAT (C2 IP Address) " \
f"C2 Communication - CAPE sandbox config extraction\"; flow:established,to_server; " \
f"reference:md5,{md5}; reference:url,{task_link}; sid:{next_sid}; rev:1; " \
f"metadata:created_at {date};)"
rule_list.append(rule)
next_sid += 1

log.debug("[CENTS - Remcos] Building Domain based rules")
for c2_domain in domain_list:
rule = f"alert dns $HOME_NET any -> any any (msg:\"ET CENTS Remcos RAT (C2 Domain) " \
f"C2 Communication - CAPE sandbox config extraction\"; flow:established,to_server; " \
f"dns.query; content:\"{c2_domain}\"; " \
f"reference:md5,{md5}; reference:url,{task_link}; sid:{next_sid}; rev:1; " \
f"metadata:created_at {date};)"
rule_list.append(rule)
next_sid += 1

log.debug("[CENTS - Remcos] Building Password based rules")
for parsed_config in remcos_config_list:
# if we have a password, we should create a rule for the RC4 encrypted stuff
if parsed_config.get("Password", ""):
first, second = _build_rc4_rule(parsed_config.get('Password'))
rule = f"alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:\"ET CENTS Remcos RAT " \
f"(passphrase {parsed_config.get('Password')}) " \
f"C2 Communication - CAPE sandbox config extraction\"; flow:established,to_server; " \
f"content:\"|{first}|\"; startswith; fast_pattern; content:\"|{second}|\"; distance:2; within:2; " \
f"reference:md5,{md5}; reference:url,{task_link}; sid:{next_sid}; rev:1; " \
f"metadata:created_at {date};)"
rule_list.append(rule)
next_sid += 1

log.debug("[CENTS - Remcos] Returning built rules")
return rule_list
61 changes: 61 additions & 0 deletions lib/cuckoo/common/cents/cents_squirrelwaffle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import logging
from urllib.parse import urlparse

log = logging.getLogger(__name__)


def cents_squirrelwaffle(config_dict, sid_counter, md5, date, task_link):
"""Creates Suricata rules from extracted SquirrelWaffle malware configuration.

:param config_dict: Dictionary with the extracted SquirrelWaffle configuration.
:type config_dict: `dict`

:param sid_counter: Signature ID of the next Suricata rule.
:type sid_counter: `int`

:param md5: MD5 hash of the source sample.
:type md5: `int`

:param date: Timestamp of the analysis run of the source sample.
:type date: `str`

:param task_link: Link to analysis task of the source sample.
:type task_link: `str`

:return List of Suricata rules (`str`) or empty list if no rule has been created.
"""
if not config_dict or not sid_counter or not md5 or not date or not task_link:
return []

next_sid = sid_counter
rule_list = []
url_list_main = config_dict.get("URLs", [])
for urls in url_list_main:
# why is this a list of lists
for nested_url in urls:
# urlparse expects the url to be introduced with a // https://docs.python.org/3/library/urllib.parse.html
# Following the syntax specifications in RFC 1808, urlparse recognizes a netloc only if it is properly
# introduced by ‘//’. Otherwise the input is presumed to be a relative URL and thus to start with a path
# component.
if not nested_url.lower().startswith("http://") and not nested_url.lower().startswith("https://"):
nested_url = f"http://{nested_url}"
c2 = urlparse(nested_url)
# we'll make two rules, dns and http
http_rule = f"alert http $HOME_NET any -> $EXTERNAL_NET any (msg:\"ET CENTS SquirrelWaffle CnC " \
f"Activity\"; flow:established,to_server; http.method; content:\"POST\"; http.host; " \
f"content:\"{c2.hostname}\"; fast_pattern; reference:md5,{md5}; reference:url,{task_link}; http.uri; " \
f"content:\"{c2.path}\"; bsize:{len(c2.path)}; sid:{next_sid}; rev:1; " \
f"metadata:created_at {date};)"

rule_list.append(http_rule)
next_sid += 1

dns_rule = f"alert dns $HOME_NET any -> any any (msg:\"ET CENTS SquirrelWaffle CnC Domain in DNS Query\"; " \
f"dns.query; content:\"{c2.hostname}\"; fast_pattern; reference:md5,{md5}; reference:url,{task_link}; " \
f"sid:{next_sid}; rev:1; " \
f"metadata:created_at {date};)"

rule_list.append(dns_rule)
next_sid += 1

return rule_list
105 changes: 105 additions & 0 deletions lib/cuckoo/common/cents/cents_trickbot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import logging

log = logging.getLogger(__name__)

def convert_needed_to_hex(input):
# there has to be a better way to do this....
result = ""
for i in range(0, len(input)):
if 0 <= ord(input[i]) <= 127:
result += input[i]
else:
# determine if the last char was also hex encoded
if i > 0 and ord(input[i - 1]) > 127:
# we don't need the "opening" pipe
result += f"{hex(ord(input[i])).replace('0x', '', 1)}"
else:
# if not, then we need the opening pipe
result += f"|{hex(ord(input[i])).replace('0x', '', 1)}"

# if the next one isn't also going to need hex encoded, then close it.
if i > 0 and ord(input[i + 1]) <= 127:
result += "|"
else:
result += " "
return result

def build_serv_dicts(servs):
result = []
# why is this an array of arrays? idk....
for item in servs:
for s in item:
serv, port = s.split(':', 1)
tmp_dict = {'server': serv, 'port': port}
if tmp_dict not in result:
result.append(tmp_dict)

return result

def cents_trickbot(config_dict, suricata_dict, sid_counter, md5, date, task_link):
"""Creates Suricata rules from extracted TrickBot malware configuration.

:param config_dict: Dictionary with the extracted TrickBot configuration.
:type config_dict: `dict`

:param sid_counter: Signature ID of the next Suricata rule.
:type sid_counter: `int`

:param md5: MD5 hash of the source sample.
:type md5: `int`

:param date: Timestamp of the analysis run of the source sample.
:type date: `str`

:param task_link: Link to analysis task of the source sample.
:type task_link: `str`

:return List of Suricata rules (`str`) or empty list if no rule has been created.
"""
log.debug(f"[CENTS] Config for TrickBot Starting")
if not config_dict or not sid_counter or not md5 or not date or not task_link:
log.debug(f"[CENTS] Config did not get enough data to run")
return []

next_sid = sid_counter
rule_list = []
servs = build_serv_dicts(config_dict.get("servs", []))
# create a list of dicts which contain the server and port
gtag = config_dict.get("gtag", "")
ver = config_dict.get("ver", "")
trickbot_c2_certs = []
log.debug(f"[CENTS - TrickBot] Looking for certs from {len(servs)} c2 servers")
for s in servs:
# see if the server and port are also in the tls certs
matching_tls = list(
filter(
lambda x: x['dstip'] == s['server'] and str(x['dstport']) == str(s['port']),
suricata_dict.get('tls', [])
)
)
log.debug(f"[CENTS - TrickBot] Found {len(matching_tls)} certs for {s}")
for tls in matching_tls:
_tmp_obj = {'subject': tls.get('subject', None), 'issuerdn': tls.get('issuerdn', None)}
if _tmp_obj not in trickbot_c2_certs:
trickbot_c2_certs.append(_tmp_obj)

log.debug(f"[CENTS - TrickBot] Building {len(trickbot_c2_certs)} rules based on c2 certs")
for c2_cert in trickbot_c2_certs:
rule = f"alert tls $EXTERNAL_NET any -> $HOME_NET any (msg:\"ET CENTS Observed TrickBot C2 Certificate " \
f"(gtag {gtag[0]}, version {ver[0]})\"; flow:established,to_client; "
if c2_cert.get('subject'):
# if the subject has some non-ascii printable chars, we need to hex encode them
suri_string = convert_needed_to_hex(c2_cert.get('subject'))
rule += f"tls.cert_subject; content:\"{suri_string}\"; "
if c2_cert.get('issuerdn'):
# if the subject has some non-ascii printable chars, we need to hex encode them
suri_string = convert_needed_to_hex(c2_cert.get('issuerdn'))
rule += f"tls.cert_issuer; content:\"{suri_string}\"; "

rule += f"reference:md5,{md5}; reference:url,{task_link}; sid:{next_sid}; rev:1; metadata:created_at {date};)"
next_sid += 1

rule_list.append(rule)

log.debug("[CENTS - TrickBot] Returning built rules")
return rule_list
Loading