Skip to content

Commit

Permalink
Quickfix for TheHive-Project#169: filter input from artifacts, only a…
Browse files Browse the repository at this point in the history
…llow letters for tld part of domains
  • Loading branch information
3c7 committed Jan 13, 2018
1 parent 4f541f9 commit 69c47cf
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 5 deletions.
3 changes: 1 addition & 2 deletions contrib/cortexutils/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python
# encoding: utf-8

import os
import sys
import codecs
Expand Down Expand Up @@ -154,7 +153,7 @@ def summary(self, raw):
def artifacts(self, raw):
# Use the regex extractor, if auto_extract setting is not False
if self.auto_extract:
extractor = Extractor()
extractor = Extractor(ignore=self.get_data())
return extractor.check_iterable(raw)

# Return empty list
Expand Down
23 changes: 21 additions & 2 deletions contrib/cortexutils/extractor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/usr/bin/env python
from builtins import str as unicode

import io
import re


Expand All @@ -11,9 +13,13 @@ class Extractor:
Currently, this is not a full-text search, so the IOCs must be isolated strings in order to be found.
This can be iterated for ioc's.
:param ignore: List of strings or a single string to ignore when matching artifacts to type
:type ignore: list, str
"""

def __init__(self):
def __init__(self, ignore=None):
self.ignore = ignore
self.regex = self.__init_regex()

@staticmethod
Expand Down Expand Up @@ -63,9 +69,10 @@ def __init_regex():
})

# domain
tldpattern = '('
regex.append({
'type': 'domain',
'regex': re.compile(r'^(?!http\:\/\/|https\:\/\/)^[\w\-]+\.\w+$')
'regex': re.compile(r'^(?!http\:\/\/|https\:\/\/)^[\w\-]+\.[a-zA-Z]+$'.format(tldpattern))
})

# hash
Expand Down Expand Up @@ -108,6 +115,16 @@ def __init_regex():

return regex

@staticmethod
def __get_tlds(path='contrib/tlds.txt'):
    """Get a list of TLDs from the contributed Mozilla TLD list.

    Blank lines and comment lines (those starting with ``//``) are skipped.
    Every returned entry has its surrounding whitespace, including the
    trailing newline, removed.

    :param path: Location of the TLD list file. The default is a relative
                 path, so it is resolved against the current working directory.
    :type path: str
    :return: TLD entries in file order
    :rtype: list
    """
    # Decode explicitly so the result does not depend on the locale's
    # default encoding.
    with io.open(path, encoding='utf-8') as tldfile:
        # Lines yielded by file iteration keep their newline, so they must be
        # stripped before testing for emptiness or a comment prefix.
        stripped = (line.strip() for line in tldfile)
        return [
            entry for entry in stripped
            if entry and not entry.startswith('//')
        ]

def __checktype(self, value):
"""Checks if the given value is a known datatype
Expand All @@ -116,6 +133,8 @@ def __checktype(self, value):
:return: Data type of value, if known, else empty string
:rtype: str
"""
if self.ignore and value in self.ignore:
return ''

if isinstance(value, (str, unicode)):
for r in self.regex:
Expand Down
2 changes: 1 addition & 1 deletion contrib/tests/test_suite_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def setUp(self):
load_test_fixture('fixtures/test-report-response.json')
self.analyzer = Analyzer()

def test_error_response(self):
def test_report_response(self):
# Run the analyzer report method
self.analyzer.report({'report_id':'12345'})

Expand Down
7 changes: 7 additions & 0 deletions contrib/tests/test_suite_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,10 @@ def test_iterable(self):
l_expected,
'Check_iterable: wrong list returned.'
)

def test_float(self):
    # A bare decimal number contains a dot but must not be classified as
    # any data type; the extractor signals "no type" with an empty string.
    detected_type = self.extractor.check_string(value='0.001234')
    self.assertEqual(
        detected_type,
        '',
        'Check_float: float was recognized, but should not.'
    )
35 changes: 35 additions & 0 deletions contrib/tests/test_suite_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python
# coding: utf-8
import json
import unittest
import sys

from cortexutils.analyzer import Analyzer

# Different lib when using python3 or 2
if sys.version_info >= (3, 0):
from io import StringIO
else:
from StringIO import StringIO

class AnalyzerExtractorOutputTest(unittest.TestCase):
    """Integration test for the artifacts an Analyzer reports.

    The analyzer is given the observable ``8.8.8.8`` and a report containing
    a different IP. The test checks that the reported artifact list holds the
    new IP while the input observable does not appear in the output at all.
    """

    def setUp(self):
        # Put the real streams back after every test so the replacements made
        # below do not leak into other tests running in the same process.
        self.addCleanup(setattr, sys, 'stdin', sys.stdin)
        self.addCleanup(setattr, sys, 'stdout', sys.stdout)

        # NOTE(review): presumably Analyzer reads its job description from
        # sys.stdin and writes its report to sys.stdout on construction/report;
        # verify against cortexutils.analyzer.
        sys.stdin = StringIO(json.dumps({
            "data": "8.8.8.8",
            "dataType": "ip"
        }))
        sys.stdout = StringIO()
        self.analyzer = Analyzer()

    def test_output(self):
        # Run the report method
        self.analyzer.report({'result': '1.2.3.4'})

        # Grab the output captured by the analyzer's output stream
        output = self.analyzer.fpoutput.getvalue().strip()
        json_output = json.loads(output)

        # The analyzed observable itself must not show up anywhere in the output
        self.assertNotIn(self.analyzer.get_data(), output)

        # The IP from the report must be the first extracted artifact
        first_artifact = json_output['artifacts'][0]
        self.assertEqual(first_artifact['value'], '1.2.3.4')
        self.assertEqual(first_artifact['type'], 'ip')

0 comments on commit 69c47cf

Please sign in to comment.