Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatic ioc extraction using RegEx #40

Merged
merged 6 commits into from
Apr 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion contrib/cortexutils/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import codecs
import json
from cortexutils.extractor import Extractor

class Analyzer:

Expand Down Expand Up @@ -39,6 +40,9 @@ def __init__(self):
# Not breaking compatibility
self.artifact = self.__input

# Check for auto extraction config
self.auto_extract = self.get_param('config.auto_extract', True)

# Not breaking compatibility
def notSupported(self):
self.error('This datatype is not supported by this analyzer.')
Expand Down Expand Up @@ -132,8 +136,12 @@ def summary(self, raw):
return {}

def artifacts(self, raw):
    """
    Extract ioc artifacts from the raw analyzer report.

    Uses the regex-based ``Extractor`` unless auto-extraction was disabled
    through the ``config.auto_extract`` setting (``self.auto_extract`` is
    initialized in ``__init__``).

    :param raw: Analyzer report to scan (dict, list or string).
    :return: List of {type, value} dicts found by the extractor, or an
             empty list when auto-extraction is disabled.
    :rtype: list
    """
    # Use the regex extractor, if auto_extract setting is not False
    if self.auto_extract:
        extractor = Extractor()
        return extractor.check_iterable(raw)

    # Auto-extraction disabled via configuration: report no artifacts.
    return []

def error(self, message, ensure_ascii=False):
Expand Down
181 changes: 181 additions & 0 deletions contrib/cortexutils/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#!/usr/bin/env python
from builtins import str as unicode
import re


class Extractor:
"""
The extractor class tries to detect ioc attribute types using regex-matching. Two functions are provided:
- ``check_string(str)`` which checks a string for a regex match and just returns the type
- ``check_iterable(itr)`` that iterates over a list or a dictionary and returns a list of {type, value} dicts

Currently, this is not a fulltext search, so the the ioc's must be isolated strings, to get found.
This can be iterated for ioc's.
"""

def __init__(self):
self.regex = self.__init_regex()

@staticmethod
def __init_regex():
"""
Returns compiled regex list.

:return: List of {type, regex} dicts
:rtype: list
"""

# IPv4
regex = [{
'type': 'ip',
'regex': re.compile(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')
}]

# IPv6
# RegEx from https://stackoverflow.com/questions/53497/regular-expression-that-matches-valid-ipv6-addresses
r = '(' + \
'([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|' + \
'([0-9a-fA-F]{1,4}:){1,7}:|' + \
'([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|' + \
'([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|' + \
'([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|' + \
'([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|' + \
'([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|' + \
'[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|' + \
':((:[0-9a-fA-F]{1,4}){1,7}|:)|' + \
'fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|' + \
'::(ffff(:0{1,4}){0,1}:){0,1}' + \
'((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}' + \
'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|' + \
'([0-9a-fA-F]{1,4}:){1,4}:' + \
'((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}' + \
'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])' + \
')'
regex.append({
'type': 'ip',
'regex': re.compile(r'{}'.format(r))
})

# URL
regex.append({
'type': 'url',
'regex': re.compile(r'^(http\:\/\/|https:\/\/)')
})

# domain
regex.append({
'type': 'domain',
'regex': re.compile(r'^(?!http\:\/\/|https\:\/\/)^[\w\-]+\.\w+$')
})

# hash
regex.append({
'type': 'hash',
'regex': re.compile(r'^([0-9a-fA-F]{32}|[0-9a-fA-F]{40}|[0-9a-fA-F]{64})$')
})

# user-agent
regex.append({
'type': 'user-agent',
'regex': re.compile(r'^(Mozilla\/[45]\.0 |AppleWebKit\/[0-9]{3}\.[0-9]{2} |Chrome\/[0-9]{2}\.[0-9]\.'
r'[0-9]{4}\.[0-9]{3} |Safari\/[0-9]{3}\.[0-9]{2} ).*?$')
})

# uri_path
regex.append({
'type': 'uri_path',
'regex': re.compile(r'^(?!http\:\/\/|https\:\/\/)[A-Za-z]*\:\/\/')
})

# regkey
regex.append({
'type': 'registry',
'regex': re.compile(r'^(HKEY|HKLM|HKCU|HKCR|HKCC)'
r'(_LOCAL_MACHINE|_CURRENT_USER|_CURRENT_CONFIG|_CLASSES_ROOT|)[\\a-zA-Z0-9]+$')
})

# mail
regex.append({
'type': 'mail',
'regex': re.compile(r'[\w\.\-]+@\w+\.[\w\.]+')
})

# fqdn
regex.append({
'type': 'fqdn',
'regex': re.compile(r'^(?!http\:\/\/|https\:\/\/)^[\w\-\.]+\.[\w\-]+\.\w+$')
})

return regex

def __checktype(self, value):
"""Checks if the given value is a known datatype

:param value: The value to check
:type value: str
:return: Data type of value, if known, else empty string
:rtype: str
"""

if isinstance(value, (str, unicode)):
for r in self.regex:
if r.get('regex').match(value):
return r.get('type')
return ''

def check_string(self, value):
"""
Checks if a string matches a datatype.

:param value: String to test
:type value: str
:return: Data type or empty string
:rtype: str
"""
return self.__checktype(value)

def check_iterable(self, iterable):
"""
Checks values of a list or a dict on ioc's. Returns a list of dict {type, value}. Raises TypeError, if iterable
is not an expected type.

:param iterable: List or dict of values
:type iterable: list dict str
:return: List of ioc's matching the regex
:rtype: list
"""
results = []
# Only the string left
if isinstance(iterable, (str, unicode)):
dt = self.__checktype(iterable)
if len(dt) > 0:
results.append({
'type': dt,
'value': iterable
})
elif isinstance(iterable, list):
for item in iterable:
if isinstance(item, list) or isinstance(item, dict):
results.extend(self.check_iterable(item))
else:
dt = self.__checktype(item)
if len(dt) > 0:
results.append({
'type': dt,
'value': item
})
elif isinstance(iterable, dict):
for _, item in iterable.items():
if isinstance(item, list) or isinstance(item, dict):
results.extend(self.check_iterable(item))
else:
dt = self.__checktype(item)
if len(dt) > 0:
results.append({
'type': dt,
'value': item
})
else:
raise TypeError('Not supported type.')

return results
5 changes: 4 additions & 1 deletion contrib/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
'Programming Language :: Python',
'Topic :: Security',
'Topic :: Software Development :: Libraries :: Python Modules'],
py_modules=['cortexutils.analyzer'],
py_modules=[
'cortexutils.analyzer',
'cortexutils.extractor'
],
install_requires=[],
test_suite='tests'
)
File renamed without changes.
149 changes: 149 additions & 0 deletions contrib/tests/test_suite_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#!/usr/bin/env python
"""
This contains the unit tests for the extractor.
"""
import unittest

from cortexutils.extractor import Extractor


class TestExtractorValidInput(unittest.TestCase):
    """Exercises the extractor with valid input of every supported data type."""

    def setUp(self):
        # One fresh extractor per test case.
        self.extractor = Extractor()

    def test_single_fqdn(self):
        detected = self.extractor.check_string('www.google.de')
        self.assertEqual(detected, 'fqdn', 'FQDN single string: wrong data type.')

    def test_single_fqdn_as_unicode(self):
        detected = self.extractor.check_string(u'www.google.de')
        self.assertEqual(detected, 'fqdn', 'FQDN single string: wrong data type.')

    def test_single_domain(self):
        detected = self.extractor.check_string('google.de')
        self.assertEqual(detected, 'domain', 'domain single string: wrong data type.')

    def test_single_url(self):
        detected = self.extractor.check_string('https://google.de')
        self.assertEqual(detected, 'url', 'url single string: wrong data type.')

    def test_single_ipv4(self):
        detected = self.extractor.check_string('10.0.0.1')
        self.assertEqual(detected, 'ip', 'ipv4 single string: wrong data type.')

    def test_single_ipv6(self):
        detected = self.extractor.check_string('2001:0db8:85a3:08d3:1319:8a2e:0370:7344')
        self.assertEqual(detected, 'ip', 'ipv6 single string: wrong data type.')

    def test_single_md5(self):
        detected = self.extractor.check_string('b373bd6b144e7846f45a1e47ced380b8')
        self.assertEqual(detected, 'hash', 'md5 single string: wrong data type.')

    def test_single_sha1(self):
        detected = self.extractor.check_string('94d4d48ba9a79304617f8291982bf69a8ce16fb0')
        self.assertEqual(detected, 'hash', 'sha1 single string: wrong data type.')

    def test_single_sha256(self):
        detected = self.extractor.check_string(
            '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4')
        self.assertEqual(detected, 'hash', 'sha256 single string: wrong data type.')

    def test_single_useragent(self):
        agent = 'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0'
        self.assertEqual(self.extractor.check_string(agent), 'user-agent',
                         'user-agent single string: wrong data type.')

    def test_single_mail(self):
        detected = self.extractor.check_string('[email protected]')
        self.assertEqual(detected, 'mail', 'mail single string: wrong data type.')

    def test_single_regkey(self):
        key = 'HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows\\CurrentVersion\\Run'
        self.assertEqual(self.extractor.check_string(key), 'registry',
                         'registry single string: wrong data type.')

    def test_iterable(self):
        observed = self.extractor.check_iterable({
            'results': [
                {'This is an totally unimportant key': '127.0.0.1'},
                {'Totally nested!': ['https://nestedurl.verynested.com']}
            ],
            'some_more': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4',
            'another_list': ['google.de', 'bing.com', 'www.fqdn.de']
        })
        expected = [
            {'type': 'hash',
             'value': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4'},
            {'type': 'ip', 'value': '127.0.0.1'},
            {'type': 'url', 'value': 'https://nestedurl.verynested.com'},
            {'type': 'domain', 'value': 'google.de'},
            {'type': 'domain', 'value': 'bing.com'},
            {'type': 'fqdn', 'value': 'www.fqdn.de'}
        ]

        # Traversal order of the input dict is not guaranteed, so compare
        # both lists sorted by value.
        observed.sort(key=lambda entry: entry['value'])
        expected.sort(key=lambda entry: entry['value'])

        self.assertEqual(observed, expected, 'Check_iterable: wrong list returned.')