diff --git a/contrib/cortexutils/analyzer.py b/contrib/cortexutils/analyzer.py
index 7d9a4afd1..e4ac2355c 100644
--- a/contrib/cortexutils/analyzer.py
+++ b/contrib/cortexutils/analyzer.py
@@ -1,6 +1,5 @@
 #!/usr/bin/env python
 # encoding: utf-8
-
 import os
 import sys
 import codecs
@@ -154,7 +153,7 @@ def summary(self, raw):
     def artifacts(self, raw):
         # Use the regex extractor, if auto_extract setting is not False
         if self.auto_extract:
-            extractor = Extractor()
+            extractor = Extractor(ignore=self.get_data())
             return extractor.check_iterable(raw)
 
         # Return empty list
diff --git a/contrib/cortexutils/extractor.py b/contrib/cortexutils/extractor.py
index f77c1df13..808a4e79e 100644
--- a/contrib/cortexutils/extractor.py
+++ b/contrib/cortexutils/extractor.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 from builtins import str as unicode
+
 import re
 
 
@@ -11,9 +12,13 @@ class Extractor:
 
     Currently, this is not a fulltext search, so the the ioc's must be isolated strings, to get found.
     This can be iterated for ioc's.
+
+    :param ignore: List of strings or a single string to ignore when matching artifacts to type
+    :type ignore: list, str
     """
 
-    def __init__(self):
+    def __init__(self, ignore=None):
+        self.ignore = ignore
         self.regex = self.__init_regex()
 
     @staticmethod
@@ -65,7 +70,7 @@ def __init_regex():
         # domain
         regex.append({
             'type': 'domain',
-            'regex': re.compile(r'^(?!http\:\/\/|https\:\/\/)^[\w\-]+\.\w+$')
+            'regex': re.compile(r'^(?!http\:\/\/|https\:\/\/)^[\w\-]+\.[a-zA-Z]+$')
         })
 
         # hash
@@ -103,7 +108,7 @@ def __init_regex():
         # fqdn
         regex.append({
             'type': 'fqdn',
-            'regex': re.compile(r'^(?!http\:\/\/|https\:\/\/)^[\w\-\.]+\.[\w\-]+\.\w+$')
+            'regex': re.compile(r'^(?!http\:\/\/|https\:\/\/)^[\w\-\.]+\.[\w\-]+\.[a-zA-Z]+$')
         })
 
         return regex
@@ -116,6 +121,13 @@ def __checktype(self, value):
         :return: Data type of value, if known, else empty string
         :rtype: str
         """
+        # A single ignore string is compared by equality: ``in`` on a str is a
+        # substring test and raises TypeError for non-string values.
+        if isinstance(self.ignore, (str, unicode)):
+            if value == self.ignore:
+                return ''
+        elif self.ignore and value in self.ignore:
+            return ''
 
         if isinstance(value, (str, unicode)):
             for r in self.regex:
diff --git a/contrib/setup.py b/contrib/setup.py
index 157d4f4f4..cea67c1a3 100644
--- a/contrib/setup.py
+++ b/contrib/setup.py
@@ -2,7 +2,7 @@
 
 setup(
     name='cortexutils',
-    version='1.2.0',
+    version='1.2.1',
     description='A Python library for including utility classes for Cortex analyzers',
     long_description=open('README').read(),
     author='TheHive-Project',
diff --git a/contrib/tests/test_suite_analyzer.py b/contrib/tests/test_suite_analyzer.py
index b171afb36..9192a2d8c 100644
--- a/contrib/tests/test_suite_analyzer.py
+++ b/contrib/tests/test_suite_analyzer.py
@@ -132,7 +132,7 @@ def setUp(self):
         load_test_fixture('fixtures/test-report-response.json')
         self.analyzer = Analyzer()
 
-    def test_error_response(self):
+    def test_report_response(self):
         # Run the analyzer report method
         self.analyzer.report({'report_id':'12345'})
 
diff --git a/contrib/tests/test_suite_extractor.py b/contrib/tests/test_suite_extractor.py
index 2b764b9b4..782e38e85 100644
--- a/contrib/tests/test_suite_extractor.py
+++ b/contrib/tests/test_suite_extractor.py
@@ -147,3 +147,17 @@ def test_iterable(self):
             l_expected,
             'Check_iterable: wrong list returned.'
         )
+
+    def test_float_domain(self):
+        self.assertEqual(
+            self.extractor.check_string(value='0.001234'),
+            '',
+            'Check_float: float was recognized as domain, but should not.'
+        )
+
+    def test_float_fqdn(self):
+        self.assertEqual(
+            self.extractor.check_string(value='0.1234.5678'),
+            '',
+            'Check_float_fqdn: float was recognized as fqdn but should not.'
+        )
diff --git a/contrib/tests/test_suite_integration.py b/contrib/tests/test_suite_integration.py
new file mode 100644
index 000000000..04bec6821
--- /dev/null
+++ b/contrib/tests/test_suite_integration.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+# coding: utf-8
+import json
+import unittest
+import sys
+
+from cortexutils.analyzer import Analyzer
+
+# Different lib when using python3 or 2
+if sys.version_info >= (3, 0):
+    from io import StringIO
+else:
+    from StringIO import StringIO
+
+class AnalyzerExtractorOutputTest(unittest.TestCase):
+    def setUp(self):
+        sys.stdin = StringIO(json.dumps({
+            "data": "8.8.8.8",
+            "dataType": "ip"
+        }))
+        sys.stdout = StringIO()
+        self.analyzer = Analyzer()
+
+    def test_output(self):
+        # Run the report method
+        self.analyzer.report({'result': '1.2.3.4'})
+
+        # Grab the output
+        output = self.analyzer.fpoutput.getvalue().strip()
+        json_output = json.loads(output)
+
+        # Checks
+        self.assertNotIn(self.analyzer.get_data(), output)
+        self.assertEqual(json_output['artifacts'][0]['value'], '1.2.3.4')
+        self.assertEqual(json_output['artifacts'][0]['type'], 'ip')