Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev elasticsearch6 #7

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 51 additions & 80 deletions docTestssl.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/python3
# Import testssl.sh CSV to ELasticSearch

from elasticsearch_dsl import DocType, Object, Date, String, Integer, Short, Boolean
from elasticsearch_dsl import DocType, Object, Date, Keyword, Integer, Short, Boolean
from datetime import datetime
from tzlocal import get_localzone
import csv
Expand All @@ -12,7 +12,7 @@

tz = get_localzone()
reDefaultFilename = re.compile("(?:^|/)(?P<ip>\d+\.\d+\.\d+\.\d+)(:(?P<port>\d+))?-(?P<datetime>\d{8}-\d{4})\.csv$")
reProtocol = re.compile("^(?:sslv\\d|tls\\d(?:_\\d)?)$")
reProtocol = re.compile("^(?:SSLv\\d|TLS\\d(?:_\\d)?)$")
reCipherTests = re.compile("^std_(.*)$")
reIpHostColumn = re.compile("^(.*)/(.*)$")
reCipherColumnName = re.compile("^cipher_")
Expand All @@ -38,47 +38,45 @@
reVulnerable = re.compile("\\(NOT ok\\)", re.IGNORECASE)

class DocTestSSLResult(DocType):
class Meta:
doc_type = "TestSSLResult"

source = String(fields={'raw': String(index='not_analyzed')})
source = Keyword(fields={'raw': Keyword()})
result = Boolean()
timestamp = Date()
ip = String(index='not_analyzed')
hostname = String(index='not_analyzed')
ip = Keyword()
hostname = Keyword()
port = Integer()
svcid = String(index='not_analyzed')
protocols = String(index='not_analyzed', multi=True)
ciphers = String(multi=True, fields={'raw': String(index='not_analyzed')})
ciphertests = String(index='not_analyzed', multi=True)
svcid = Keyword()
protocols = Keyword(multi=True)
ciphers = Keyword(multi=True, fields={'raw': Keyword()})
ciphertests = Keyword(multi=True)
serverpref = Object(
properties = {
"cipher_order": Boolean(),
"protocol": String(index='not_analyzed'),
"cipher": String(fields={'raw': String(index='not_analyzed')})
"protocol": Keyword(),
"cipher": Keyword(fields={'raw': Keyword()})
})
cert = Object(
properties = {
"keysize": Short(),
"signalgo": String(fields={'raw': String(index='not_analyzed')}),
"md5_fingerprint": String(index='not_analyzed'),
"sha1_fingerprint": String(index='not_analyzed'),
"sha256_fingerprint": String(index='not_analyzed'),
"cn": String(fields={'raw': String(index='not_analyzed')}),
"san": String(multi=True, fields={'raw': String(index='not_analyzed')}),
"issuer": String(fields={'raw': String(index='not_analyzed')}),
"keysize": Keyword(),
"signalgo": Keyword(fields={'raw': Keyword()}),
"md5_fingerprint": Keyword(),
"sha1_fingerprint": Keyword(),
"sha256_fingerprint": Keyword(),
"cn": Keyword(fields={'raw': Keyword()}),
"san": Keyword(multi=True, fields={'raw': Keyword()}),
"issuer": Keyword(fields={'raw': Keyword()}),
"ev": Boolean(),
"expiration": Date(),
"ocsp_uri": String(fields={'raw': String(index='not_analyzed')}),
"expiration": Keyword(fields={'raw': Keyword()}),
"ocsp_uri": Keyword(fields={'raw': Keyword()}),
"ocsp_stapling": Boolean(),
})
vulnerabilities = String(index='not_analyzed', multi=True)
vulnerabilities = Keyword(multi=True)

def parseCSVLine(self, line):
if line['id'] == "id":
return
if not self.ip or not self.hostname or not self.port: # host, ip and port
m = reIpHostColumn.search(line['host'])
m = reIpHostColumn.search(line['fqdn/ip'])
if m:
self.hostname, self.ip = m.groups()
self.port = int(line['port'])
Expand All @@ -98,61 +96,34 @@ def parseCSVLine(self, line):
self.ciphertests.append(m.group(1))
elif line['id'] == "order": # server prefers cipher
self.serverpref.cipher_order = bool(reOk.search(line['finding']))
elif line['id'] == "order_proto": # preferred protocol
m = reDefaultProtocol.search(line['finding'])
if m:
self.serverpref.protocol = m.group(1)
elif line['id'] == "order_cipher": # preferred cipher
m = reDefaultCipher.search(line['finding'])
if m:
self.serverpref.cipher = m.group(1)
elif line['id'] == "key_size": # certificate key size
m = reKeySize.search(line['finding'])
if m:
self.cert.keysize = int(m.group(1))
elif line['id'] == "algorithm": # certificate sign algorithm
m = reSignAlgorithm.search(line['finding'])
if m:
self.cert.signalgo = m.group(1)
elif line['id'] == "fingerprint": # certificate fingerprints
m = reFPMD5.search(line['finding'])
if m:
self.cert.md5_fingerprint = m.group(1)
m = reFPSHA1.search(line['finding'])
if m:
self.cert.sha1_fingerprint = m.group(1)
m = reFPSHA256.search(line['finding'])
if m:
self.cert.sha256_fingerprint = m.group(1)
elif line['id'] == "cn": # certificate CN
m = reCN.search(line['finding'])
if m:
self.cert.cn = m.group(1)
elif line['id'] == "san": # certificate SAN
m = reSAN.search(line['finding'])
if m:
sans = m.group(1)
for san in sans.split(" "):
if san != "--":
self.cert.san.append(san)
elif line['id'] == "issuer": # certificate issuer
m = reIssuer.search(line['finding'])
if m:
self.cert.issuer = m.group(1)
elif line['id'] == "ev": # certificate extended validation
elif line['id'] == "protocol_negotiated": # preferred protocol
self.serverpref.protocol = line['finding']
elif line['id'] == "cipher_negotiated": # preferred cipher
self.serverpref.cipher = line['finding']
elif line['id'] == "cert_keySize": # certificate key size
self.cert.keysize = line['finding']
elif line['id'] == "cert_signatureAlgorithm": # certificate sign algorithm
self.cert.signalgo = line['finding']
elif line['id'] == "cert_fingerprintSHA1": # certificate fingerprints
self.cert.sha1_fingerprint = line['finding']
elif line['id'] == "cert_fingerprintSHA256":
self.cert.sha256_fingerprint = line['finding']
elif line['id'] == "cert_commonName": # certificate CN
self.cert.cn = line['finding']
elif line['id'] == "cert_subjectAltName": # certificate SAN
sans = line['finding']
for san in sans.split(" "):
if san != "--":
self.cert.san.append(san)
elif line['id'] == "cert_caIssuers": # certificate issuer
self.cert.issuer = line['finding']
elif line['id'] == "cert_certificatePolicies_EV": # certificate extended validation
self.cert.ev = bool(reYes.search(line['finding']))
elif line['id'] == "expiration": # certificate expiration
m = reExpiration.search(line['finding'])
if m:
unparsedDate = m.group(1)
self.cert.expiration = datetime.strptime(unparsedDate, "%Y-%m-%d %H:%M %z")
elif line['id'] == "ocsp_uri": # certificate OCSP URI
m = reOCSPURI.search(line['finding'])
if m:
self.cert.ocsp_uri = m.group(1)
else:
self.cert.ocsp_uri = "-"
elif line['id'] == "ocsp_stapling": # certificate OCSP stapling
elif line['id'] == "cert_expiration_status": # certificate expiration
self.cert.expiration = line['finding']
elif line['id'] == "cert_ocspURL": # certificate OCSP URI
self.cert.ocsp_uri = line['finding']
elif line['id'] == "OCSP_stapling": # certificate OCSP stapling
self.cert.ocsp_stapling = not bool(reNotOffered.search(line['finding']))
elif line['id'] in ("heartbleed", "ccs", "secure_renego", "sec_client_renego", "crime", "breach", "poodle_ssl", "fallback_scsv", "freak", "DROWN", "logjam", "beast", "rc4") and reVulnerable.search(line['finding']):
self.vulnerabilities.append(line['id'].upper())
Expand All @@ -164,7 +135,7 @@ def parseCSV(self, csvfile):
self.ip = m.group('ip')
self.port = int(m.group('port') or 0)
self.timestamp = datetime.strptime(m.group('datetime'), "%Y%m%d-%H%M")
csvReader = csv.DictReader(csvfile, fieldnames=("id", "host", "port", "severity", "finding"), delimiter=',', quotechar='"')
csvReader = csv.DictReader(csvfile, fieldnames=("id", "fqdn/ip", "port", "severity", "finding", "cve", "cwe"), delimiter=',', quotechar='"')
for line in csvReader:
self.parseCSVLine(line)

Expand Down
3 changes: 2 additions & 1 deletion import_testssl.sh_csv_to_ES.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
argparser = argparse.ArgumentParser(description="Import testssl.sh CSV logs into ElasticSearch")
argparser.add_argument("--elasticsearch", "-e", nargs="*", default="localhost", help="ElasticSearch host (default: %(default)s)")
argparser.add_argument("--index", "-i", default="testssl-scan", help="ElasticSearch index (default: %(default)s)")
argparser.add_argument("--ca_cert", "-c", help="ElasticSearch CA certificate")
argparser.add_argument("files", nargs="+", help="List of testssl.sh logs in CSV format")
args = argparser.parse_args()

connections.create_connection(hosts=args.elasticsearch)
connections.create_connection(hosts=args.elasticsearch,ca_certs=args.ca_cert)
idx = Index(args.index)
idx.doc_type(DocTestSSLResult)
DocTestSSLResult.init()
Expand Down