Skip to content

Commit

Permalink
#41 #31 import FireholBlocklists from PR41
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromeleonard committed May 17, 2017
1 parent 2867f09 commit 356c03d
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 0 deletions.
11 changes: 11 additions & 0 deletions analyzers/FireholBlocklists/FireholBlocklists.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"name": "FireholBlocklists",
"author": "Nils Kuhnert, CERT-Bund",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "0.1",
"baseConfig": "FireholBlocklists",
"config": {},
"description": "Check ips against the firehol blocklists available at https://github.com/firehol/blocklist-ipsets.",
"dataTypeList": ["ip"],
"command": "FireholBlocklists/firehol_blocklists.py"
}
128 changes: 128 additions & 0 deletions analyzers/FireholBlocklists/firehol_blocklists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#!/usr/bin/env python

import ipaddress
import os
import re
from io import open
from time import sleep
import datetime as dt
import pytz
from dateutil.parser import parse

from cortexutils.analyzer import Analyzer


class FireholBlocklistsAnalyzer(Analyzer):
"""Analyzer that compares ips from TheHive to FireHOL ip blocking lists. Check them out under
`iplists.firehol.org <https://iplists.firehol.org/>`_"""

def __init__(self):
Analyzer.__init__(self)

# Get config parameters
self.path = self.getParam('config.blocklistpath', '/tmp/fireholblocklists')
self.ignoredays = self.getParam('config.ignoreolderthandays', 365)
self.utc = pytz.UTC
self.now = dt.datetime.now(tz=self.utc)

# Check if directory exists
if not os.path.exists(self.path):
os.mkdir(self.path, mode=0o700)
# Downloading/updating the list is implemented with an external cronjob which git pulls the repo

# Read files in the given path and prepare file lists for ip- and netsets
files = os.listdir(self.path)
self.ipsets = []
self.netsets = []
for file in files:
if '.ipset' in file:
self.ipsets.append(file)
elif '.netset' in file:
self.netsets.append(file)

def _check_ip(self, ip):
"""Here all the workload happens. Read the files, check if the ip is in there and report the results.
If the lock file is found, which gets created when lists are getting updated, the script starts to sleep 10
seconds before checking again. Also reads the source file date and checks, if its too old (ignoreolderthandays
parameter).
:param ip: IP to search for.
:type ip: str
:returns: List of hits containing dictionaries.
:rtype: list
"""

# hits will be the variable to store all matches
hits = []
description = {}

# Check for lock
while os.path.isfile('{}/.lock'.format(self.path)):
sleep(10)

# First: check the ipsets
for ipset in self.ipsets:
with open('{}/{}'.format(self.path, ipset)) as afile:
ipsetname = ipset.split('.')[0]
description.update({ipsetname: ''})
for l in afile:
if l[0] == '#':
# Check for date and break if too old
if '# Source File Date: ' in l:
datestr = re.sub('# Source File Date: ', '', l.rstrip('\n'))
date = parse(datestr)
if (date - self.now).days > self.ignoredays:
break
description[ipsetname] += re.sub(r'^\[.*\] \(.*\) [a-zA-Z0-9.\- ]*$', '', l.lstrip('# '))\
.replace('\n\n', '\n')
else:
if ip in l:
# On match append to hits and break; next file!
hits.append({'list': ipsetname, 'description': description.get(ipsetname)})
break

# Second: check the netsets
for netset in self.netsets:
with open('{}/{}'.format(self.path, netset)) as afile:
netsetname = netset.split('.')[0]
description.update({netsetname: ''})
for l in afile:
if l[0] == '#':
# Check for date and break if too old
if '# Source File Date: ' in l:
datestr = re.sub('# Source File Date: ', '', l.rstrip('\n'))
date = parse(datestr)
if (date - self.now).days > self.ignoredays:
break
description[netsetname] += re.sub(r'^\[.*\] \(.*\) [a-zA-Z0-9.\- ]*$', '', l.lstrip('# '))\
.replace('\n\n', '\n')
else:
try:
if ipaddress.ip_address(ip) in ipaddress.ip_network(u'{}'.format(l.split('\n')[0])):
hits.append({'list': netsetname, 'description': description.get(netsetname)})
break
except ValueError as e:
self.error('ValueError occured. Used values: ipnetwork {}, ip to check {}, file {}.'
'Error message: {}'.format(l, ip, netset, e))

return hits

def summary(self, raw):
result = {
'count': raw.get('count'),
'hits': []
}
for hit in raw.get('hits'):
result['hits'].append(hit.get('list'))
return result

def run(self):
ip = self.getData()
if '/' in ip:
self.error('CIDR notation currently not supported.')
hits = self._check_ip(ip)
self.report({'hits': hits, 'count': len(hits)})


if __name__ == '__main__':
FireholBlocklistsAnalyzer().run()
4 changes: 4 additions & 0 deletions analyzers/FireholBlocklists/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ipaddress
pytz
dateutil
datetime
9 changes: 9 additions & 0 deletions analyzers/FireholBlocklists/test_data/ips.ipset
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# ipset_test_list
#
# [Test list] (https://xkcd.com/1810/) -
# This is just a testlist
#
# Maintainer : Nils Kuhnert
# Source File Date: Thu Dec 1 00:09:12 UTC 2016
127.0.0.1
9 changes: 9 additions & 0 deletions analyzers/FireholBlocklists/test_data/net.netset
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# netset_test_list
#
# [Test list] (https://xkcd.com/1810/) -
# This is just a testlist
#
# Maintainer : Nils Kuhnert
# Source File Date: Thu Dec 1 00:09:12 UTC 2016
127.0.0.0/30
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"dataType": "ip",
"data": "127.0.0.1",
"config": {
"blocklistpath": "PATH",
"ignoreolderthandays": 200
}
}
9 changes: 9 additions & 0 deletions analyzers/FireholBlocklists/test_data/very_old.ipset
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# very_old_test_list
#
# [Test list] (https://xkcd.com/1810/) -
# This is just a testlist
#
# Maintainer : Nils Kuhnert
# Source File Date: Thu Dec 1 00:09:12 UTC 1823
192.168.0.1
41 changes: 41 additions & 0 deletions analyzers/FireholBlocklists/test_firehol_blocklists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""
Unittests for firehol blocklists analyzer
"""
import io
import json
import os
import unittest
import sys
from .firehol_blocklists import FireholBlocklistsAnalyzer


__abspath__ = os.path.dirname(os.path.abspath(__file__))
__stdout__ = sys.stdout
sys.path.insert(0, __abspath__)


def load_data(file: str):
with io.open(os.path.join(__abspath__, 'test_data', file)) as afile:
input_str = afile.read().replace('PATH', os.path.join(__abspath__, 'test_data'))
sys.stdin = io.StringIO(input_str)
sys.stdout = io.StringIO()


class TestFireholBlocklistsValidData(unittest.TestCase):
def setUp(self):
load_data('test_firehol_blocklists.json')
self.analyzer = FireholBlocklistsAnalyzer()

def test_path(self):
self.assertEqual(self.analyzer.path, os.path.join(__abspath__, 'test_data'), 'Wrong path.')

def test_type(self):
self.assertEqual(self.analyzer.data_type, 'ip', 'Wrong data type.')

def test_results(self):
self.analyzer.run()
results = json.loads(sys.stdout.getvalue())
self.assertEqual(results.get('full').get('count'), 2, 'Number of hits are wrong.')
for hit in results.get('full').get('hits'):
self.assertTrue(hit.get('list') == 'ips' or hit.get('list') == 'net', 'Expected lists are wrong.')
31 changes: 31 additions & 0 deletions thehive-templates/FireholBlocklists_0_1/long.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<div class="panel panel-info" ng-if="content.count > 0">
<div class="panel-heading">
Firehol IP Report (https://github.com/firehol/blocklist-ipsets)
</div>
<div class="panel-body">
<dl class="dl-horizontal" ng-repeat="h in content.hits">
<dt>
Match <a href="http://iplists.firehol.org/?ipset={{h.list}}">{{h.list}}</a>:
</dt>
<dd>
<p style="white-space: pre-line;">{{h.description}}</p>
</dd>
</dl>
</div>
</div>
<div class="panel panel-success" ng-if="content.count == 0 && success">
<div class="panel-heading">
Firehol IP Report (https://github.com/firehol/blocklist-ipsets): <b>No matches</b>
</div>
</div>
<div class="panel panel-danger" ng-if="!success">
<div class="panel-heading">
Firehol IP Report (https://github.com/firehol/blocklist-ipsets) <b>Error</b>
</div>
<div class="panel-body">
<dl class="dl-horizontal">
<dt>Error: </dt>
<dd>{{content.error}}</dd>
</dl>
</div>
</div>
5 changes: 5 additions & 0 deletions thehive-templates/FireholBlocklists_0_1/short.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<span ng-if="content.count == 0" class="label label-success">Firehol: No matches</span>&nbsp;
<span ng-if="content.count > 0" class="label label-danger">Firehol: {{content.count}} matches</span>&nbsp;
<div style="display:inline-block;" ng-repeat="h in content.hits">
<a href="http://iplists.firehol.org/?ipset={{h}}"><span class="label label-danger">Firehol: {{h}}</span></a>&nbsp;
</div>

0 comments on commit 356c03d

Please sign in to comment.