Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Censys Analyzer for Censys API Version 2 #1288

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions analyzers/Censys/Censys.json
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{
"name": "Censys",
"author": "Nils Kuhnert, CERT-Bund",
"author": "Nils Kuhnert, CERT-Bund; nusantara-self",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/censys-analyzer",
"version": "1.0",
"version": "2.0",
"description": "Check IPs, certificate hashes or domains against censys.io.",
"dataTypeList": ["ip", "hash", "domain", "other"],
"dataTypeList": ["ip", "hash", "domain"],
"baseConfig": "Censys",
"command": "Censys/censys_analyzer.py",
"configurationItems": [
{
"name": "uid",
"name": "uid",
"description": "UID for Censys",
"type": "string",
"multi": false,
Expand All @@ -22,6 +22,14 @@
"type": "string",
"multi": false,
"required": true
},
{
"name": "max_records",
"description": "Maximum number of records for domains",
"type": "number",
"multi": false,
"required": true,
"defaultvalue": 10
}
],
"registration_required": true,
Expand Down
Binary file modified analyzers/Censys/assets/censys.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified analyzers/Censys/assets/long_report.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
187 changes: 104 additions & 83 deletions analyzers/Censys/censys_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#!/usr/bin/env python3
from cortexutils.analyzer import Analyzer
from censys.search import CensysHosts
from censys.search import CensysHosts, CensysCerts

from censys.common.exceptions import (
CensysNotFoundException,
CensysRateLimitExceededException,
CensysUnauthorizedException,
)

import iocextract


class CensysAnalyzer(Analyzer):
def __init__(self):
Expand All @@ -22,60 +25,41 @@ def __init__(self):
None,
"No API-Key for Censys given. Please add it to the cortex configuration.",
)
self.__fields = self.get_param(
'parameters.fields',
["updated_at", "ip"]
)
self.__max_records = self.get_param(
'parameters.max_records',
1000
)
self.__flatten = self.get_param(
'parameters.flatten',
True
)
self.__max_records = self.get_param("config.max_records", None, 10)

def search_hosts(self, ip):
"""
Searches for a host using its ipv4 address

:param ip: ipv4 address as string
:type ip: str
:return: dict
"""
c = CensysHosts(api_id=self.__uid, api_secret=self.__api_key)
return c.search("ip: " + ip, per_page=1, pages=1)()[0]
query = c.search("ip: " + ip, per_page=1, pages=1)
for result in query:
return result
return {}


def search_certificate(self, hash):
"""
Searches for a specific certificate using its hash
c = CensysCerts(api_id=self.__uid, api_secret=self.__api_key)

try:
result = c.view(hash)
return result
except Exception as e:
self.error(f"Error fetching certificate: {str(e)}")
return {}


:param hash: certificate hash
:type hash: str
:return: dict
"""
c = CensysHosts(api_id=self.__uid, api_secret=self.__api_key)
return c.search("certificate: " + hash, per_page=1, pages=1)()[0]

def search_website(self, dom):
"""
Searches for a website using the domainname
:param dom: domain
:type dom: str
:return: dict
"""
c = CensyshOSTs(api_id=self.__uid, api_secret=self.__api_key)
return c.search("dns.name: " + dom, per_page=1, pages=1)()[0]

def search_ipv4(self, search):
"""
Searches for hosts in IPv4 base
:param search:search as string
:type search: str
:return: dict
"""
c = CensysHosts(api_id=self.__uid, api_secret=self.__api_key)
return [x for x in c.search("ip: " + search, per_page=1, pages=1)()[0]]
query = c.search("dns.names: " + dom, per_page=self.__max_records, pages=1)
for result in query:
return result
return {}


def search_freetext(self, search):
c = CensysHosts(api_id=self.__uid, api_secret=self.__api_key)
results = c.search(search, fields=self.__fields, max_records=self.__max_records, flatten=self.__flatten)
return [result for result in results]


def run(self):
try:
Expand All @@ -91,10 +75,10 @@ def run(self):
self.report({
'website': self.search_website(self.get_data())
})
elif self.data_type == 'other':
self.report({
'matches': self.search_ipv4(self.get_data())
})
# elif self.data_type == 'other':
# self.report({
# 'matches': self.search_freetext(self.get_data())
# })
else:
self.error(
"Data type not supported. Please use this analyzer with data types hash, ip or domain."
Expand All @@ -108,59 +92,96 @@ def run(self):
except CensysRateLimitExceededException:
self.error("Rate limit exceeded.")

def artifacts(self, raw):
artifacts = []
ipv4s = list(iocextract.extract_ipv4s(str(raw)))
# ipv6s = list(iocextract.extract_ipv6s(str(raw)))
domains = list(iocextract.extract_urls(str(raw)))
hashes = list(iocextract.extract_hashes(str(raw)))

if ipv4s:
ipv4s = list(dict.fromkeys(ipv4s))
for i in ipv4s:
artifacts.append(self.build_artifact('ip', str(i)))

# if ipv6s:
# ipv6s = list(dict.fromkeys(ipv6s))
# for i in ipv6s:
# artifacts.append(self.build_artifact('ip', str(i)))

if hashes:
hashes = list(dict.fromkeys(hashes))
for j in hashes:
artifacts.append(self.build_artifact('hash', str(j)))

if domains:
domains = list(dict.fromkeys(domains))
for k in domains:
artifacts.append(self.build_artifact('url', str(k)))
return artifacts

def summary(self, raw):
taxonomies = []

if 'ip' in raw:
raw = raw['ip']
service_count = len(raw.get('services', []))
print(service_count)
heartbleed = raw.get('443', {}).get('https', {}).get('heartbleed', {}).get('heartbleed_vulnerable', False)
taxonomies.append(self.build_taxonomy('info', 'Censys', 'OpenServices', service_count))
if heartbleed:
taxonomies.append(self.build_taxonomy('malicious', 'Censys', 'Heartbleed', 'vulnerable'))
for ip_info in raw['ip']:
ip_address = ip_info.get('ip', 'Unknown IP')
asn = ip_info.get('autonomous_system', {}).get('asn', 'Unknown ASN')
country = ip_info.get('location', {}).get('country', 'Unknown Country')
city = ip_info.get('location', {}).get('city', 'Unknown City')
os_product = ip_info.get('operating_system', {}).get('product', 'Unknown OS')
service_count = len(ip_info.get('services', []))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'IP', ip_address))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'ASN', asn))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'Country', country))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'City', city))
#taxonomies.append(self.build_taxonomy('info', 'Censys', 'OperatingSystem', os_product))
taxonomies.append(self.build_taxonomy('info', 'Censys', 'OpenServices', service_count))

elif 'website' in raw:
raw = raw['website']
service_count = len(raw.get('tags', []))
taxonomies.append(self.build_taxonomy('info', 'Censys', 'OpenServices', service_count))
taxonomies.append(self.build_taxonomy('info', 'Censys', 'recordsFound', len(raw["website"])))
# for site in raw['website']:
# ip = site.get('ip', 'Unknown IP')
# asn = site.get('autonomous_system', {}).get('asn', 'Unknown ASN')
# country = site.get('location', {}).get('country', 'Unknown Country')
# service_count = len(site.get('services', []))
# #taxonomies.append(self.build_taxonomy('info', 'Censys', 'IP', ip))
# #taxonomies.append(self.build_taxonomy('info', 'Censys', 'ASN', asn))
# taxonomies.append(self.build_taxonomy('info', 'Censys', 'Country', country))
# taxonomies.append(self.build_taxonomy('info', 'Censys', 'Services', service_count))

elif 'cert' in raw:
raw = raw['cert']
trusted_count = len(raw.get('validation', []))
validator_count = len(raw.get('validation', []))

for _, validator in raw.get("validation", []).items():
if (
validator.get("blacklisted", False)
or validator.get("in_revocation_set", False)
or (
not validator.get("whitelisted", False)
and not validator.get("valid", False)
)
):
trusted_count -= 1
validator_keys = ["nss", "microsoft", "apple", "chrome"]
validator_count = 0
trusted_count = 0
for key in validator_keys:
validator = raw.get("validation", {}).get(key, {})
if validator.get("is_valid", False) and validator.get("has_trusted_path", False):
trusted_count += 1
validator_count += 1

if trusted_count < validator_count:
taxonomies.append(
self.build_taxonomy(
"suspicious",
"Censys",
"TrustedCount",
"{}/{}".format(trusted_count, validator_count),
f"{trusted_count}/{validator_count}",
)
)
else:
taxonomies.append(self.build_taxonomy('info', 'Censys', 'TrustedCount', '{}/{}'.format(
trusted_count, validator_count
)))

elif 'matches' in raw:
result_count = len(raw.get('matches', []))
taxonomies.append(self.build_taxonomy('info', 'Censys ipv4 search', 'results', result_count))

taxonomies.append(self.build_taxonomy('info', 'Censys', 'TrustedCount', f'{trusted_count}/{validator_count}'))

# elif 'matches' in raw:
# result_count = len(raw.get('matches', []))
# taxonomies.append(self.build_taxonomy('info', 'Censys ipv4 search', 'results', result_count))

return {
'taxonomies': taxonomies
}



if __name__ == "__main__":
CensysAnalyzer().run()
CensysAnalyzer().run()
3 changes: 2 additions & 1 deletion analyzers/Censys/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
cortexutils
censys==2.2.11
censys~=2.2
iocextract
Loading