Skip to content

Commit

Permalink
Merge pull request #1288 from TheHive-Project/censys2-full-rework
Browse files Browse the repository at this point in the history
Refactor Censys Analyzer for Censys API Version 2
  • Loading branch information
nusantara-self authored Oct 23, 2024
2 parents 05c453f + ceb2df7 commit 04e1e90
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 233 deletions.
16 changes: 12 additions & 4 deletions analyzers/Censys/Censys.json
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{
"name": "Censys",
"author": "Nils Kuhnert, CERT-Bund",
"author": "Nils Kuhnert, CERT-Bund; nusantara-self",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/censys-analyzer",
"version": "1.0",
"version": "2.0",
"description": "Check IPs, certificate hashes or domains against censys.io.",
"dataTypeList": ["ip", "hash", "domain", "other"],
"dataTypeList": ["ip", "hash", "domain"],
"baseConfig": "Censys",
"command": "Censys/censys_analyzer.py",
"configurationItems": [
{
"name": "uid",
"name": "uid",
"description": "UID for Censys",
"type": "string",
"multi": false,
Expand All @@ -22,6 +22,14 @@
"type": "string",
"multi": false,
"required": true
},
{
"name": "max_records",
"description": "Maximum number of records for domains",
"type": "number",
"multi": false,
"required": true,
"defaultvalue": 10
}
],
"registration_required": true,
Expand Down
Binary file modified analyzers/Censys/assets/censys.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified analyzers/Censys/assets/long_report.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
187 changes: 104 additions & 83 deletions analyzers/Censys/censys_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#!/usr/bin/env python3
from cortexutils.analyzer import Analyzer
from censys.search import CensysHosts
from censys.search import CensysHosts, CensysCerts

from censys.common.exceptions import (
CensysNotFoundException,
CensysRateLimitExceededException,
CensysUnauthorizedException,
)

import iocextract


class CensysAnalyzer(Analyzer):
def __init__(self):
Expand All @@ -22,60 +25,41 @@ def __init__(self):
None,
"No API-Key for Censys given. Please add it to the cortex configuration.",
)
self.__fields = self.get_param(
'parameters.fields',
["updated_at", "ip"]
)
self.__max_records = self.get_param(
'parameters.max_records',
1000
)
self.__flatten = self.get_param(
'parameters.flatten',
True
)
self.__max_records = self.get_param("config.max_records", None, 10)

def search_hosts(self, ip):
"""
Searches for a host using its ipv4 address
:param ip: ipv4 address as string
:type ip: str
:return: dict
"""
c = CensysHosts(api_id=self.__uid, api_secret=self.__api_key)
return c.search("ip: " + ip, per_page=1, pages=1)()[0]
query = c.search("ip: " + ip, per_page=1, pages=1)
for result in query:
return result
return {}


def search_certificate(self, hash):
"""
Searches for a specific certificate using its hash
c = CensysCerts(api_id=self.__uid, api_secret=self.__api_key)

try:
result = c.view(hash)
return result
except Exception as e:
self.error(f"Error fetching certificate: {str(e)}")
return {}


:param hash: certificate hash
:type hash: str
:return: dict
"""
c = CensysHosts(api_id=self.__uid, api_secret=self.__api_key)
return c.search("certificate: " + hash, per_page=1, pages=1)()[0]

def search_website(self, dom):
"""
Searches for a website using the domainname
:param dom: domain
:type dom: str
:return: dict
"""
c = CensysHosts(api_id=self.__uid, api_secret=self.__api_key)
return c.search("dns.name: " + dom, per_page=1, pages=1)()[0]

def search_ipv4(self, search):
"""
Searches for hosts in IPv4 base
:param search:search as string
:type search: str
:return: dict
"""
c = CensysHosts(api_id=self.__uid, api_secret=self.__api_key)
return [x for x in c.search("ip: " + search, per_page=1, pages=1)()[0]]
query = c.search("dns.names: " + dom, per_page=self.__max_records, pages=1)
for result in query:
return result
return {}


def search_freetext(self, search):
    # NOTE(review): this appears to be the legacy free-text search removed by this
    # refactor — it reads self.__fields / self.__max_records / self.__flatten, whose
    # 'parameters.*' initialisation was deleted from __init__ in the same change.
    # Kept byte-identical; confirm it is dead code before relying on it.
    # :param search: raw Censys search expression as a string
    # :return: list of per-host result dicts from the Censys Hosts index
    c = CensysHosts(api_id=self.__uid, api_secret=self.__api_key)
    results = c.search(search, fields=self.__fields, max_records=self.__max_records, flatten=self.__flatten)
    return [result for result in results]


def run(self):
try:
Expand All @@ -91,10 +75,10 @@ def run(self):
self.report({
'website': self.search_website(self.get_data())
})
elif self.data_type == 'other':
self.report({
'matches': self.search_ipv4(self.get_data())
})
# elif self.data_type == 'other':
# self.report({
# 'matches': self.search_freetext(self.get_data())
# })
else:
self.error(
"Data type not supported. Please use this analyzer with data types hash, ip or domain."
Expand All @@ -108,59 +92,96 @@ def run(self):
except CensysRateLimitExceededException:
self.error("Rate limit exceeded.")

def artifacts(self, raw):
    """Extract observables from the raw report and return them as Cortex artifacts.

    The whole report is stringified and scanned with iocextract; IPv4
    addresses, hashes and URLs are emitted (in that order) as 'ip',
    'hash' and 'url' artifacts. IPv6 extraction stays disabled, as in
    the original implementation.

    :param raw: full analyzer report (any JSON-serialisable structure)
    :return: list of artifact dicts built via self.build_artifact
    """
    text = str(raw)
    # Each extractor is paired with the artifact dataType it produces.
    # iocextract.extract_urls yields URLs, hence the 'url' type.
    extractors = (
        ('ip', iocextract.extract_ipv4s),
        ('hash', iocextract.extract_hashes),
        ('url', iocextract.extract_urls),
    )
    collected = []
    for data_type, extract in extractors:
        # dict.fromkeys de-duplicates while preserving first-seen order.
        for value in dict.fromkeys(extract(text)):
            collected.append(self.build_artifact(data_type, str(value)))
    return collected

def summary(self, raw):
taxonomies = []

if 'ip' in raw:
raw = raw['ip']
service_count = len(raw.get('services', []))
print(service_count)
heartbleed = raw.get('443', {}).get('https', {}).get('heartbleed', {}).get('heartbleed_vulnerable', False)
taxonomies.append(self.build_taxonomy('info', 'Censys', 'OpenServices', service_count))
if heartbleed:
taxonomies.append(self.build_taxonomy('malicious', 'Censys', 'Heartbleed', 'vulnerable'))
for ip_info in raw['ip']:
ip_address = ip_info.get('ip', 'Unknown IP')
asn = ip_info.get('autonomous_system', {}).get('asn', 'Unknown ASN')
country = ip_info.get('location', {}).get('country', 'Unknown Country')
city = ip_info.get('location', {}).get('city', 'Unknown City')
os_product = ip_info.get('operating_system', {}).get('product', 'Unknown OS')
service_count = len(ip_info.get('services', []))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'IP', ip_address))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'ASN', asn))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'Country', country))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'City', city))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'OperatingSystem', os_product))
taxonomies.append(self.build_taxonomy('info', 'Censys', 'OpenServices', service_count))

elif 'website' in raw:
raw = raw['website']
service_count = len(raw.get('tags', []))
taxonomies.append(self.build_taxonomy('info', 'Censys', 'OpenServices', service_count))
taxonomies.append(self.build_taxonomy('info', 'Censys', 'recordsFound', len(raw["website"])))
# for site in raw['website']:
# ip = site.get('ip', 'Unknown IP')
# asn = site.get('autonomous_system', {}).get('asn', 'Unknown ASN')
# country = site.get('location', {}).get('country', 'Unknown Country')
# service_count = len(site.get('services', []))
# #taxonomies.append(self.build_taxonomy('info', 'Censys', 'IP', ip))
# #taxonomies.append(self.build_taxonomy('info', 'Censys', 'ASN', asn))
# taxonomies.append(self.build_taxonomy('info', 'Censys', 'Country', country))
# taxonomies.append(self.build_taxonomy('info', 'Censys', 'Services', service_count))

elif 'cert' in raw:
raw = raw['cert']
trusted_count = len(raw.get('validation', []))
validator_count = len(raw.get('validation', []))

for _, validator in raw.get("validation", []).items():
if (
validator.get("blacklisted", False)
or validator.get("in_revocation_set", False)
or (
not validator.get("whitelisted", False)
and not validator.get("valid", False)
)
):
trusted_count -= 1
validator_keys = ["nss", "microsoft", "apple", "chrome"]
validator_count = 0
trusted_count = 0
for key in validator_keys:
validator = raw.get("validation", {}).get(key, {})
if validator.get("is_valid", False) and validator.get("has_trusted_path", False):
trusted_count += 1
validator_count += 1

if trusted_count < validator_count:
taxonomies.append(
self.build_taxonomy(
"suspicious",
"Censys",
"TrustedCount",
"{}/{}".format(trusted_count, validator_count),
f"{trusted_count}/{validator_count}",
)
)
else:
taxonomies.append(self.build_taxonomy('info', 'Censys', 'TrustedCount', '{}/{}'.format(
trusted_count, validator_count
)))

elif 'matches' in raw:
result_count = len(raw.get('matches', []))
taxonomies.append(self.build_taxonomy('info', 'Censys ipv4 search', 'results', result_count))

taxonomies.append(self.build_taxonomy('info', 'Censys', 'TrustedCount', f'{trusted_count}/{validator_count}'))

# elif 'matches' in raw:
# result_count = len(raw.get('matches', []))
# taxonomies.append(self.build_taxonomy('info', 'Censys ipv4 search', 'results', result_count))

return {
'taxonomies': taxonomies
}



if __name__ == "__main__":
CensysAnalyzer().run()
CensysAnalyzer().run()
3 changes: 2 additions & 1 deletion analyzers/Censys/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
cortexutils
censys==2.2.11
censys~=2.2
iocextract
Loading

0 comments on commit 04e1e90

Please sign in to comment.