Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capa Analyzer - Code improvements #1295

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion analyzers/Capa/Capa.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "Capa",
"version": "1.0",
"author": "Wes Lambert",
"author": "Wes Lambert; nusantara-self, StrangeBee",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Analyze files with Capa",
Expand Down
159 changes: 95 additions & 64 deletions analyzers/Capa/CapaAnalyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
from cortexutils.analyzer import Analyzer
import os
import subprocess
import argparse
import json
import re
from collections import defaultdict

class CapaAnalyzer(Analyzer):
def __init__(self):
Expand All @@ -26,66 +23,100 @@ def summary(self, raw):
return {"taxonomies": taxonomies}

def run(self):
parser = argparse.ArgumentParser(description='exec capa.')
parser.add_argument('filepath', type=str, help='file path')
args = parser.parse_args()

if os.path.exists(self.filepath):
f = subprocess.check_output([self.capa_path, '-j', self.filepath])
process = json.loads(f)
rules = process['rules']
tactics = []
techniques = []
subtechniques = []
ids = []
capabilities = {}

for rule in rules:
try:
# Metadata
meta = process['rules'][rule]['meta']

# ATT&CK details
attack = meta['att&ck'][0]

# ID
id = attack['id']

# Technique
technique = attack['technique'] + " - " + id

# Subtechnique
subtechnique = attack['subtechnique']

# Tactic
tactic = attack['tactic']

# Capability
capability_name = process['rules'][rule]['meta']['name']

if tactic not in tactics:
tactics.append(tactic)

if subtechnique != "":
if subtechnique not in subtechniques:
subtechniques.append(attack['subtechnique'])

if technique not in techniques:
techniques.append(attack['technique'])

if id not in ids:
ids.append(id)

if tactic not in capabilities:
capabilities[tactic] = {}

if technique not in capabilities[tactic]:
capabilities[tactic][technique] = []

if capability_name not in capabilities[tactic][technique]:
capabilities[tactic][technique].append(capability_name)
except:
continue
self.report({ 'capabilities': capabilities, 'tactics': tactics, 'techniques': techniques, 'subtechniques': subtechniques, 'ids': ids, 'rules': rules })
if not os.path.isfile(self.capa_path) or not os.access(self.capa_path, os.X_OK):
self.error(f"capa binary not found or not executable at path: {self.capa_path}")
return

if not os.path.exists(self.filepath):
self.error(f"File not found: {self.filepath}")
return

try:
result = subprocess.run(
[self.capa_path, '-j', self.filepath],
capture_output=True,
text=True
)
if result.returncode != 0:
self.error(f"capa execution failed with return code {result.returncode}: {result.stderr}")
return
except Exception as e:
self.error(f"An error occurred while executing capa: {e}")
return

try:
process = json.loads(result.stdout)
except json.JSONDecodeError as e:
self.error(f"Failed to parse capa output as JSON: {e}")
return

rules = process.get('rules', {})
tactics = []
techniques = []
subtechniques = []
ids = []
capabilities = {}

for rule_key, rule_value in rules.items():
try:
# Metadata
meta = rule_value['meta']

# ATT&CK details
attack = meta['att&ck'][0]

# ID
attack_id = attack['id']

# Technique
technique = f"{attack['technique']} - {attack_id}"

# Subtechnique
subtechnique = attack.get('subtechnique', '')

# Tactic
tactic = attack['tactic']

# Capability
capability_name = meta['name']

# Collect data
if tactic not in tactics:
tactics.append(tactic)

if subtechnique:
if subtechnique not in subtechniques:
subtechniques.append(subtechnique)

if technique not in techniques:
techniques.append(technique)

if attack_id not in ids:
ids.append(attack_id)

if tactic not in capabilities:
capabilities[tactic] = {}

if technique not in capabilities[tactic]:
capabilities[tactic][technique] = []

if capability_name not in capabilities[tactic][technique]:
capabilities[tactic][technique].append(capability_name)
except KeyError as e:
#self.error(f"KeyError processing rule {rule_key}: {e}")
continue
except Exception as e:
#self.error(f"Unexpected error processing rule {rule_key}: {e}")
continue

self.report({
'capabilities': capabilities,
'tactics': tactics,
'techniques': techniques,
'subtechniques': subtechniques,
'ids': ids,
'rules': rules
})

if __name__ == '__main__':
CapaAnalyzer().run()