Skip to content

Commit

Permalink
Merge branch 'release/2.1.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
nadouani committed Feb 25, 2021
2 parents 3978d77 + 8284c09 commit 586a906
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 42 deletions.
17 changes: 11 additions & 6 deletions cortexutils/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import json
import os
import stat

from cortexutils.extractor import Extractor
from cortexutils.worker import Worker
Expand Down Expand Up @@ -76,12 +77,16 @@ def artifacts(self, raw):
def build_artifact(self, data_type, data, **kwargs):
if data_type == 'file':
if os.path.isfile(data):
(dst, filename) = tempfile.mkstemp(dir=os.path.join(self.job_directory, "output"))
with open(data, 'r') as src:
copyfileobj(src, os.fdopen(dst, 'w'))
kwargs.update({'dataType': data_type, 'file': ntpath.basename(filename),
'filename': ntpath.basename(data)})
return kwargs
dst = tempfile.NamedTemporaryFile(
dir=os.path.join(self.job_directory, "output"), delete=False)
with open(data, 'rb') as src:
copyfileobj(src, dst)
dstfname = dst.name
dst.close()
os.chmod(dstfname, 0o444)
kwargs.update({'dataType': data_type, 'file': os.path.basename(dst.name),
'filename': os.path.basename(data)})
return kwargs
else:
kwargs.update({'dataType': data_type, 'data': data})
return kwargs
Expand Down
20 changes: 10 additions & 10 deletions cortexutils/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ def __init__(self, job_directory):
else: # If input file doesn't exist, fallback to old behavior and read input from stdin
self.job_directory = None
self.__set_encoding()
r, w, e = select.select([sys.stdin], [], [], self.READ_TIMEOUT)
if sys.stdin in r:
if not sys.stdin.isatty():
self._input = json.load(sys.stdin)
else:
self.error('Input file doesn''t exist')
Expand Down Expand Up @@ -142,15 +141,16 @@ def error(self, message, ensure_ascii=False):
:param message: Error message
:param ensure_ascii: Force ascii output. Default: False"""

# Get analyzer input
analyzer_input = self._input
if 'password' in analyzer_input.get('config', {}):
analyzer_input['config']['password'] = 'REMOVED'
if 'key' in analyzer_input.get('config', {}):
analyzer_input['config']['key'] = 'REMOVED'
if 'apikey' in analyzer_input.get('config', {}):
analyzer_input['config']['apikey'] = 'REMOVED'
if 'api_key' in analyzer_input.get('config', {}):
analyzer_input['config']['api_key'] = 'REMOVED'

# Define sensitive key values
secrets = ['password', 'key', 'secret']

# Loop over all the sensitive config names and clean them
for config_key, v in analyzer_input.get('config', {}).items():
if any(secret in config_key.lower() for secret in secrets):
analyzer_input.get('config', {})[config_key] = 'REMOVED'

self.__write_output({'success': False,
'input': analyzer_input,
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='cortexutils',
version='2.0.0',
version='2.1.0',
description='A Python library for including utility classes for Cortex analyzers and responders',
long_description=open('README').read(),
author='TheHive-Project',
Expand Down
6 changes: 5 additions & 1 deletion tests/fixtures/test-error-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
"password": "secret",
"key": "secret",
"apikey": "secret",
"api_key": "secret"
"api_key": "secret",
"apiSecret": "secret",
"api_Pass": "secret",
"API": "secret"

}
}
18 changes: 13 additions & 5 deletions tests/test_suite_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_check_tlp_disabled(self):

# Using the _Analyzer__check_tlp notation to access managed method
# __check_tlp
self.assertEqual(self.analyzer._Analyzer__check_tlp(), True)
self.assertEqual(self.analyzer._Worker__check_tlp(), True)

def test_check_tlp_ko(self):
self.analyzer.enable_check_tlp = True
Expand All @@ -84,7 +84,7 @@ def test_check_tlp_ko(self):

# Using the _Analyzer__check_tlp notation to access managed method
# __check_tlp
self.assertEqual(self.analyzer._Analyzer__check_tlp(), False)
self.assertEqual(self.analyzer._Worker__check_tlp(), False)

def test_check_tlp_ok(self):
self.analyzer.enable_check_tlp = True
Expand All @@ -93,7 +93,7 @@ def test_check_tlp_ok(self):

# Using the _Analyzer__check_tlp notation to access managed method
# __check_tlp
self.assertEqual(self.analyzer._Analyzer__check_tlp(), True)
self.assertEqual(self.analyzer._Worker__check_tlp(), True)


class TestErrorResponse(unittest.TestCase):
Expand All @@ -107,13 +107,17 @@ def test_error_response(self):
self.assertEqual(self.analyzer.get_param('config.key'), "secret")
self.assertEqual(self.analyzer.get_param('config.apikey'), "secret")
self.assertEqual(self.analyzer.get_param('config.api_key'), "secret")
self.assertEqual(self.analyzer.get_param('config.apiSecret'), "secret")
self.assertEqual(self.analyzer.get_param('config.api_Pass'), "secret")
self.assertEqual(self.analyzer.get_param('config.API'), "secret")


# Run the error method
with self.assertRaises(SystemExit):
self.analyzer.error('Error', True)

# Get the output
output = self.analyzer.fpoutput.getvalue().strip()
output = sys.stdout.getvalue().strip()
json_output = json.loads(output)

self.assertEqual(json_output['success'], False)
Expand All @@ -124,6 +128,10 @@ def test_error_response(self):
self.assertEqual(json_output['input']['config']['key'], 'REMOVED')
self.assertEqual(json_output['input']['config']['apikey'], 'REMOVED')
self.assertEqual(json_output['input']['config']['api_key'], 'REMOVED')
self.assertEqual(json_output['input']['config']['apiSecret'], 'REMOVED')
self.assertEqual(json_output['input']['config']['api_Pass'], 'secret')
self.assertEqual(json_output['input']['config']['API'], 'secret')



class TestReportResponse(unittest.TestCase):
Expand All @@ -137,7 +145,7 @@ def test_report_response(self):
self.analyzer.report({'report_id': '12345'})

# Get the output
output = self.analyzer.fpoutput.getvalue().strip()
output = sys.stdout.getvalue().strip()
json_output = json.loads(output)

self.assertEqual(json_output.get('success'), True)
Expand Down
28 changes: 14 additions & 14 deletions tests/test_suite_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,34 +113,34 @@ def test_iterable(self):
})
l_expected = [
{
'type': 'hash',
'value': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4'
'dataType': 'hash',
'data': '7ef8b3dc5bf40268f66721a89b95f4c5f0cc08e34836f8c3a007ceed193654d4'
},
{
'type': 'ip',
'value': '127.0.0.1'
'dataType': 'ip',
'data': '127.0.0.1'
},
{
'type': 'url',
'value': 'https://nestedurl.verynested.com'
'dataType': 'url',
'data': 'https://nestedurl.verynested.com'
},
{
'type': 'domain',
'value': 'google.de'
'dataType': 'domain',
'data': 'google.de'
},
{
'type': 'domain',
'value': 'bing.com'
'dataType': 'domain',
'data': 'bing.com'
},
{
'type': 'fqdn',
'value': 'www.fqdn.de'
'dataType': 'fqdn',
'data': 'www.fqdn.de'
}
]

# Sorting the lists
l_real = sorted(l_real, key=lambda k: k['value'])
l_expected = sorted(l_expected, key=lambda k: k['value'])
l_real = sorted(l_real, key=lambda k: k['data'])
l_expected = sorted(l_expected, key=lambda k: k['data'])

self.assertEqual(
l_real,
Expand Down
10 changes: 5 additions & 5 deletions tests/test_suite_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ def test_output(self):
self.analyzer.report({'result': '1.2.3.4'})

# Grab the output
output = self.analyzer.fpoutput.getvalue().strip()
output = sys.stdout.getvalue().strip()
json_output = json.loads(output)

# Checks
self.assertNotIn(self.analyzer.get_data(), output)
self.assertEqual(json_output['artifacts'][0]['value'], '1.2.3.4')
self.assertEqual(json_output['artifacts'][0]['type'], 'ip')
self.assertEqual(json_output['artifacts'][0]['data'], '1.2.3.4')
self.assertEqual(json_output['artifacts'][0]['dataType'], 'ip')

class AnalyzerExtractorNoResultTest(unittest.TestCase):
def setUp(self):
Expand All @@ -51,8 +51,8 @@ def test_output(self):
})

# Grab the output
output = self.analyzer.fpoutput.getvalue().strip()
output = sys.stdout.getvalue().strip()
json_output = json.loads(output)

# Check for empty artifact list
self.assertEqual(json_output['artifacts'], [], 'Artifact list should be empty.')
self.assertEqual(json_output['artifacts'], [], 'Artifact list should be empty.')

0 comments on commit 586a906

Please sign in to comment.