Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Crowdsec CTI analyzer #1116

Merged
merged 4 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions analyzers/Crowdsec/Crowdsec_analyzer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "Crowdsec_Analyzer",
"version": "1.0",
"author": "CERT-ARKEA",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Query Crowdsec API",
"dataTypeList": ["ip"],
"command": "Crowdsec/crowdsec_analyzer.py",
"baseConfig": "Crowdsec",

"configurationItems": [
{
"name": "api_key",
"description": "Crowdsec API key",
"type": "string",
"multi": false,
"required": true
}
]
}

62 changes: 62 additions & 0 deletions analyzers/Crowdsec/crowdsec_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env python3

from cortexutils.analyzer import Analyzer
from crowdsec_api import Crowdsec
from datetime import datetime


class CrowdsecAnalyzer(Analyzer):
def __init__(self):
Analyzer.__init__(self)
self.crowdsec_key = self.get_param("config.api_key", None, "Missing Crowdsec API key")
self.crowdsec_client = None
self.verbose_taxonomies = self.get_param("config.verbose_taxonomies", False)
self.polling_interval = self.get_param("config.polling_interval", 60)

def summary(self, raw):
taxonomies = []
namespace = "Crowdsec"
levelinfo = "info"
levelorange = "suspicious"
levelgreen = "safe"

if 'as_name' in raw:
taxonomies.append(self.build_taxonomy(levelinfo, namespace, 'ASN', raw['as_name']))

if 'ip_range_score' in raw:
taxonomies.append(self.build_taxonomy(levelinfo, namespace, 'Score', raw['ip_range_score']))

if 'history' in raw:
taxonomies.append(self.build_taxonomy(levelinfo, namespace, 'LastSeen', raw['history']['last_seen']))

if 'attack_categories' in raw:
for attack in raw['attack_categories'] :
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@CERT-ARKEA CERT-ARKEA Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arf yes, endpoint has changed

so yes attack_details instead of attack_categories

i have changed the code

taxonomies.append(self.build_taxonomy(levelorange, namespace, 'Attack', attack['name']))

if len(taxonomies) == 0:
taxonomies.append(self.build_taxonomy(levelgreen, namespace, 'Threat', 'Not found'))

### uncomment for full taxonomies report
#if raw['attack_details']:
# for attackdetails in raw['attack_details'] :
# taxonomies.append(self.build_taxonomy(levelorange, namespace, 'Attack_details', attackdetails['name']))

return {"taxonomies": taxonomies}


def run(self):
Analyzer.run(self)
try:
self.crowdsec_client = Crowdsec(self.crowdsec_key)
data = self.get_param("data", None, "Data is missing")
results = self.crowdsec_client.summary(data, self.data_type)

self.report(results)

except Exception:
pass


if __name__ == "__main__":
CrowdsecAnalyzer().run()

67 changes: 67 additions & 0 deletions analyzers/Crowdsec/crowdsec_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env python3
from requests.compat import urljoin
import requests


class Crowdsec:
"""Wrapper around the Crowdsec REST API
:param key: The Crowdsec API key
:type key: str
"""

def __init__(self, key: str):
"""Intializes the API object
:param key: The Crowdsec API key
:type key: str
"""
self.api_key = key
self.base_url = "https://cti.api.crowdsec.net"

def _request(self, path: str):
"""Specialized wrapper around the requests module to request data from Crowdsec
:param path: The URL path after the Crowdsec FQDN
:type path: str
"""
headers = {
"x-api-key": self.api_key ,
"accept": "application/json"
}
url = urljoin(self.base_url, path)
response = requests.get(url, headers=headers)

if response.status_code == 429:
raise APIRateLimiting(response.text)
try:
response_data = response.json()
except:
raise APIError("Couldn't parse response JSON")

return response_data

def summary(self, data: str, datatype: str):
"""Return a summary of all information we have for the given IPv{4,6} address.
"""
if datatype == 'ip':
url_path = "/v1/smoke/{ip}".format(ip=data)
return self._request(path=url_path)


class APIError(Exception):
"""This exception gets raised when the returned error code is non-zero positive"""

def __init__(self, value):
self.value = value

def __str__(self):
return self.value


class APIRateLimiting(Exception):
"""This exception gets raised when the 429 HTTP code is returned"""

def __init__(self, value):
self.value = value

def __str__(self):
return self.value