Skip to content

Commit

Permalink
#4 #40 Automatic ioc extraction using RegEx
Browse files Browse the repository at this point in the history
* Added RegEx ioc extraction

* Added module to setup.py, changed import statement in analyzer.py
  • Loading branch information
3c7 authored and nadouani committed Apr 18, 2017
1 parent 27e6a50 commit 7df7796
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 2 deletions.
10 changes: 9 additions & 1 deletion contrib/cortexutils/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import codecs
import json
from cortexutils.extractor import Extractor

class Analyzer:

Expand Down Expand Up @@ -39,6 +40,9 @@ def __init__(self):
# Not breaking compatibility
self.artifact = self.__input

# Check for auto extraction config
self.auto_extract = self.get_param('config.auto_extract', True)

# Not breaking compatibility
def notSupported(self):
self.error('This datatype is not supported by this analyzer.')
Expand Down Expand Up @@ -132,8 +136,12 @@ def summary(self, raw):
return {}

def artifacts(self, raw):
    """Return the observables found in a raw report.

    When ``self.auto_extract`` is truthy, a regex based ``Extractor`` walks ``raw`` through
    ``check_iterable`` and its result is returned. Otherwise an empty list is returned.

    :param raw: Raw report data handed to ``Extractor.check_iterable``
    :return: List of extracted artifacts, empty if auto extraction is switched off
    :rtype: list
    """
    if not self.auto_extract:
        # Auto extraction disabled by configuration: report nothing
        return []

    return Extractor().check_iterable(raw)

def error(self, message, ensure_ascii=False):
Expand Down
181 changes: 181 additions & 0 deletions contrib/cortexutils/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#!/usr/bin/env python
from builtins import str as unicode
import re


class Extractor:
    """
    The extractor class tries to detect ioc attribute types using regex-matching. Two functions are provided:
      - ``check_string(str)`` which checks a string for a regex match and just returns the type
      - ``check_iterable(itr)`` that iterates over a list or a dictionary and returns a list of {type, value} dicts

    Currently, this is not a fulltext search, so the ioc's must be isolated strings to get found: a value is only
    reported when the whole string is the ioc. The ``url`` and ``uri_path`` patterns are the exception, they only
    inspect the scheme prefix of the string.
    """

    def __init__(self):
        # List of {'type': str, 'regex': compiled pattern} dicts, tried in order; the first match wins
        self.regex = self.__init_regex()

    @staticmethod
    def __init_regex():
        """
        Returns compiled regex list. The order of the list is significant: ``__checktype`` returns the type of the
        first pattern that matches.

        :return: List of {type, regex} dicts
        :rtype: list
        """

        # IPv6
        # RegEx from https://stackoverflow.com/questions/53497/regular-expression-that-matches-valid-ipv6-addresses
        # Written as raw strings so that ``\.`` is not an invalid string escape, and anchored on both ends so that
        # only an isolated address matches.
        ipv6 = (
            r'^('
            r'([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|'
            r'([0-9a-fA-F]{1,4}:){1,7}:|'
            r'([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|'
            r'([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|'
            r'([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|'
            r'([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|'
            r'([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|'
            r'[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|'
            r':((:[0-9a-fA-F]{1,4}){1,7}|:)|'
            r'fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|'
            r'::(ffff(:0{1,4}){0,1}:){0,1}'
            r'((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}'
            r'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|'
            r'([0-9a-fA-F]{1,4}:){1,4}:'
            r'((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}'
            r'(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])'
            r')$'
        )

        patterns = [
            # IPv4, anchored: a string that merely starts with an address (e.g. '10.0.0.1.example.com') is no ip
            ('ip', r'^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$'),

            # IPv6
            ('ip', ipv6),

            # URL (prefix check only)
            ('url', r'^(http\:\/\/|https:\/\/)'),

            # domain
            ('domain', r'^(?!http\:\/\/|https\:\/\/)^[\w\-]+\.\w+$'),

            # hash (md5, sha1, sha256 by hex length)
            ('hash', r'^([0-9a-fA-F]{32}|[0-9a-fA-F]{40}|[0-9a-fA-F]{64})$'),

            # user-agent
            ('user-agent', r'^(Mozilla\/[45]\.0 |AppleWebKit\/[0-9]{3}\.[0-9]{2} |Chrome\/[0-9]{2}\.[0-9]\.'
                           r'[0-9]{4}\.[0-9]{3} |Safari\/[0-9]{3}\.[0-9]{2} ).*?$'),

            # uri_path (any non-http scheme, prefix check only)
            ('uri_path', r'^(?!http\:\/\/|https\:\/\/)[A-Za-z]*\:\/\/'),

            # regkey
            ('registry', r'^(HKEY|HKLM|HKCU|HKCR|HKCC)'
                         r'(_LOCAL_MACHINE|_CURRENT_USER|_CURRENT_CONFIG|_CLASSES_ROOT|)[\\a-zA-Z0-9]+$'),

            # mail, anchored so that trailing text after the address does not count as a mail address
            ('mail', r'^[\w\.\-]+@\w+\.[\w\.]+$'),

            # fqdn
            ('fqdn', r'^(?!http\:\/\/|https\:\/\/)^[\w\-\.]+\.[\w\-]+\.\w+$'),
        ]

        return [{'type': ioc_type, 'regex': re.compile(pattern)} for ioc_type, pattern in patterns]

    def __checktype(self, value):
        """Checks if the given value is a known datatype

        :param value: The value to check; anything that is not a string is never a known datatype
        :type value: str
        :return: Data type of value, if known, else empty string
        :rtype: str
        """

        if isinstance(value, (str, unicode)):
            for entry in self.regex:
                if entry['regex'].match(value):
                    return entry['type']
        return ''

    def check_string(self, value):
        """
        Checks if a string matches a datatype.

        :param value: String to test
        :type value: str
        :return: Data type or empty string
        :rtype: str
        """
        return self.__checktype(value)

    def check_iterable(self, iterable):
        """
        Checks values of a list or a dict on ioc's. Returns a list of dict {type, value}. Nested lists and dicts are
        searched recursively; for dicts only the values are inspected, never the keys. Values of any other type
        inside a list or dict are silently ignored.

        :param iterable: List or dict of values, or a single string
        :type iterable: list dict str
        :return: List of ioc's matching the regex
        :rtype: list
        :raises TypeError: If iterable itself is not a string, list or dict
        """
        if isinstance(iterable, (str, unicode)):
            # Only the string left
            candidates = [iterable]
        elif isinstance(iterable, list):
            candidates = iterable
        elif isinstance(iterable, dict):
            candidates = iterable.values()
        else:
            raise TypeError('Not supported type.')

        results = []
        for item in candidates:
            if isinstance(item, (list, dict)):
                results.extend(self.check_iterable(item))
                continue

            dt = self.__checktype(item)
            if dt:
                results.append({
                    'type': dt,
                    'value': item
                })

        return results
5 changes: 4 additions & 1 deletion contrib/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
'Programming Language :: Python',
'Topic :: Security',
'Topic :: Software Development :: Libraries :: Python Modules'],
py_modules=['cortexutils.analyzer'],
py_modules=[
'cortexutils.analyzer',
'cortexutils.extractor'
],
install_requires=[],
test_suite='tests'
)
File renamed without changes.
149 changes: 149 additions & 0 deletions contrib/tests/test_suite_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#!/usr/bin/env python
"""
This contains the unit tests for the extractor.
"""
import unittest

from cortexutils.extractor import Extractor


class TestExtractorValidInput(unittest.TestCase):
    """This tests the extractor with valid input."""

    def setUp(self):
        """Create a fresh extractor for every test."""
        self.extractor = Extractor()

    def test_single_fqdn(self):
        """A host name with at least three labels is reported as fqdn."""
        self.assertEqual(
            self.extractor.check_string(value='www.google.de'),
            'fqdn',
            'FQDN single string: wrong data type.'
        )

    def test_single_fqdn_as_unicode(self):
        """Unicode strings are handled the same way as native strings."""
        self.assertEqual(
            self.extractor.check_string(value=u'www.google.de'),
            'fqdn',
            'FQDN single string: wrong data type.'
        )

    def test_single_domain(self):
        """A two label name is reported as domain."""
        self.assertEqual(
            self.extractor.check_string(value='google.de'),
            'domain',
            'domain single string: wrong data type.'
        )

    def test_single_url(self):
        """A string starting with an http(s) scheme is reported as url."""
        self.assertEqual(
            self.extractor.check_string(value='https://google.de'),
            'url',
            'url single string: wrong data type.'
        )

    def test_single_ipv4(self):
        """A dotted quad is reported as ip."""
        self.assertEqual(
            self.extractor.check_string(value='10.0.0.1'),
            'ip',
            'ipv4 single string: wrong data type.'
        )

    def test_single_ipv6(self):
        """A full eight group IPv6 address is reported as ip."""
        self.assertEqual(
            self.extractor.check_string(value='2001:0db8:85a3:08d3:1319:8a2e:0370:7344'),
            'ip',
            'ipv6 single string: wrong data type.'
        )

    def test_single_md5(self):
        """A 32 character hex string is reported as hash."""
        self.assertEqual(
            self.extractor.check_string(value='b373bd6b144e7846f45a1e47ced380b8'),
            'hash',
            'md5 single string: wrong data type.'
        )

    def test_single_sha1(self):
        """A 40 character hex string is reported as hash."""
        self.assertEqual(
            self.extractor.check_string(value='94d4d48ba9a79304617f8291982bf69a8ce16fb0'),
            'hash',
            'sha1 single string: wrong data type.'
        )

    def test_single_sha256(self):
        """A 64 character hex string is reported as hash."""
        self.assertEqual(
            self.extractor.check_string(value='7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4'),
            'hash',
            'sha256 single string: wrong data type.'
        )

    def test_single_useragent(self):
        """A browser user agent string is reported as user-agent."""
        self.assertEqual(
            self.extractor.check_string(value='Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101 '
                                              'Firefox/52.0'),
            'user-agent',
            'user-agent single string: wrong data type.'
        )

    def test_single_mail(self):
        """A plain e-mail address is reported as mail."""
        # The value has to be a real address of the form local@domain.tld; a placeholder text such as
        # '[email protected]' can never match the mail pattern.
        self.assertEqual(
            self.extractor.check_string(value='user@example.com'),
            'mail',
            'mail single string: wrong data type.'
        )

    def test_single_regkey(self):
        """A registry path starting with a known hive name is reported as registry."""
        self.assertEqual(
            self.extractor.check_string(value='HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows\\CurrentVersion\\Run'),
            'registry',
            'registry single string: wrong data type.'
        )

    def test_iterable(self):
        """Nested lists and dicts are searched recursively and every ioc value is reported once."""
        l_real = self.extractor.check_iterable({
            'results': [
                {
                    'This is an totally unimportant key': '127.0.0.1'
                },
                {
                    'Totally nested!': ['https://nestedurl.verynested.com']
                }
            ],
            'some_more': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4',
            'another_list': ['google.de', 'bing.com', 'www.fqdn.de']
        })
        l_expected = [
            {
                'type': 'hash',
                'value': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4'
            },
            {
                'type': 'ip',
                'value': '127.0.0.1'
            },
            {
                'type': 'url',
                'value': 'https://nestedurl.verynested.com'
            },
            {
                'type': 'domain',
                'value': 'google.de'
            },
            {
                'type': 'domain',
                'value': 'bing.com'
            },
            {
                'type': 'fqdn',
                'value': 'www.fqdn.de'
            }
        ]

        # Sorting the lists, because the traversal order of a dict is not part of the contract
        l_real = sorted(l_real, key=lambda k: k['value'])
        l_expected = sorted(l_expected, key=lambda k: k['value'])

        self.assertEqual(
            l_real,
            l_expected,
            'Check_iterable: wrong list returned.'
        )

0 comments on commit 7df7796

Please sign in to comment.