Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/mail incident status #921

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions responders/MailIncidentStatus/MailIncidentStatus.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
{
"name": "MailIncidentStatus",
"version": "1.0",
"author": "Manuel Krucker",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Mail a detailed status information of an incident case. The mail is sent to recipients specified by tags prefixed with 'mail='. The responder respects tlp definitions. For tlp:amber mail addresse and for tlp:green mail domains must be pre-defined in the configuration. For tlp:red sending mails is denied. The responser also uses thehive4py to collect information about the status of the tasks of the incidents.",
"dataTypeList": ["thehive:case"],
"command": "MailIncidentStatus/mailincidentstatus.py",
"baseConfig": "MailIncidentStatus",
"configurationItems": [
{
"name": "from",
"description": "email address from which the mail is send",
"type": "string",
"multi": false,
"required": true
},
{
"name": "smtp_host",
"description": "SMTP server used to send mail",
"type": "string",
"multi": false,
"required": true,
"defaultValue": "localhost"
},
{
"name": "smtp_port",
"description": "SMTP server port",
"type": "number",
"multi": false,
"required": true,
"defaultValue": 25
},
{
"name": "smtp_user",
"description": "SMTP server user",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "user"
},
{
"name": "smtp_pwd",
"description": "SMTP server password",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "pwd"
},
{
"name": "mail_subject_prefix",
"description": "Prefix of the mail subject",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "Incident Case Notification: "
},
{
"name": "mail_html_style_tag_content",
"description": "The css content of the style tag for the HTML mail body. Define table, th, hd, .first, and .second elements.",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "table { border: 1px solid black; border-collapse: collapse; text-align: left; vertical-align: top; th { border: 1px solid black; border-collapse: collapse; text-align: left;} td { border: 1px solid black; border-collapse: collapse; text-align: left;} .first { width: 150px; min-width: 150px; max-width: 150px; background-color: #ffe8d4; } .second { background-color: #d7d9f2;}"
},
{
"name": "tlp_amber_mail_addresses",
"description": "Mail addresses which are allowed to receive tlp:amber classified incidents",
"type": "string",
"multi": true,
"required": false
},
{
"name": "tlp_green_mail_domains",
"description": "Mail domains which are allowed to receive tlp:green classified incidents",
"type": "string",
"multi": true,
"required": false
},
{
"name": "thehive_url",
"description": "URL pointing to your TheHive installation, e.g. 'http://127.0.0.1:9000'",
"type": "string",
"multi": false,
"required": true
},
{
"name": "thehive_apikey",
"description": "TheHive API key which is used get tasks and other elements of the incident",
"type": "string",
"multi": false,
"required": true
}
]
}
272 changes: 272 additions & 0 deletions responders/MailIncidentStatus/mailincidentstatus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
#!/usr/bin/env python3
# encoding: utf-8

import ssl
import smtplib
import json
import datetime
from cortexutils.responder import Responder
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from thehive4py.api import TheHiveApi
import thehive4py.query
from thehive4py.models import Case, CaseObservable


class MailIncidentStatus(Responder):
def __init__(self):
Responder.__init__(self)
# Mail settings
self.smtp_host = self.get_param("config.smtp_host", "localhost")
self.smtp_port = self.get_param("config.smtp_port", "25")
self.mail_from = self.get_param(
"config.from", None, "Missing sender email address"
)
self.smtp_user = self.get_param("config.smtp_user", "user", None)
self.smtp_pwd = self.get_param("config.smtp_pwd", "pwd", None)
# TheHive4py settings
self.thehive_url = self.get_param('config.thehive_url', None, "TheHive URL missing!")
self.thehive_apikey = self.get_param('config.thehive_apikey', None, "TheHive API key missing!")
self.tlp_green_mail_domains = self.get_param("config.tlp_green_mail_domains",None,"Error reading tlp_green_mail_domains")
self.tlp_amber_mail_addresses = self.get_param("config.tlp_amber_mail_addresses",None,"Error reading tlp_amber_mail_addresses")

def run(self):
Responder.run(self)

# Validate Config
self.validate_Config()

# Check data_type
if not self.data_type == "thehive:case":
self.error("data type not type 'thehive:case'")

caseID = self.get_param("data.id",None,"case.id is missing")

# CREATE MAIL BODY
body = self.get_HTMLMailBody()

# GET RECIPIENTS
# Search recipient address in case tags
tags = self.get_param(
"data.tags", None, "recipient address not found in tags"
)
mail_addresses = [t[5:].strip('"') for t in tags if t.startswith("mail=") or t.startswith("mail:")]
if len(mail_addresses) == 0:
self.error("recipient address not found in tags")

# CHECK RECIPIENTS FOR CONFORMANCE WITH TLP
self.check_TLPConformance(mail_addresses)

# PREPARE MAIL

# SEND MAIL
message = ""
for mail_address in mail_addresses:
msg = MIMEMultipart()
subject = self.get_param("config.mail_subject_prefix","",None) + caseID + " " + self.get_param("data.title", None, "title is missing")
msg['Subject'] = subject
msg['From'] = self.mail_from
msg["Date"] = formatdate(localtime=True)

#msg.attach(MIMEText(body, "plain", "utf-8"))
msg.attach(MIMEText(body, "html", "utf-8"))
msg['To'] = mail_address

if self.smtp_user and self.smtp_pwd:
try:
context = ssl.create_default_context()
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.ehlo()
server.starttls(context=context)
server.ehlo()
server.login(self.smtp_user, self.smtp_pwd)
server.send_message(msg, self.mail_from, mail_address)
except smtplib.SMTPNotSupportedError:
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.ehlo()
server.login(self.smtp_user, self.smtp_pwd)
server.send_message(msg, self.mail_from, mail_address)
else:
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.send_message(msg, self.mail_from, mail_address)

# SET RETURN MESSAGE
message += "message sent to " + mail_address + ";"
self.report({"message": message })


def validate_Config(self):
"""
The configuration contains mail domains and mail addresses. This is validated before.
"""
status = True
# Check mail domains
for domain in self.tlp_green_mail_domains:
# Just a simple basic step if a '.' is part of the string
if "." not in domain:
self.error(domain + " is no valid domain name. Please change configuration 'tlp_green_mail_domains'")
status = False
# Check mail addresses
for address in self.tlp_amber_mail_addresses:
# Just a simple basic step if an @ is part of the string
if "@" not in address:
self.error(address + " is no valid mail address. Please change configuration 'tlp_amber_mail_addresses'")
status = False
return status

def check_TLPConformance(self,mail_addresses):
"""
The TLP should be respected when sending the incident status. The following rules are applied:
* TLP: Red : Sending mails not allowd -> Error is returned
* TLP: Amber: Check if mail address is listed in configuration item 'tlp_amber_mail_domains'
* TLP: Green: Check if mail domain is listed in configuration item 'tlp_green_mail_domains'
* TLP: White: No checks applied, every recipient receives an email
"""
tlp = self.get_param("data.tlp",None,"Reading data.tlp failed.")
if tlp == 0:
# tlp:white
pass
elif tlp == 1:
# tlp:green
domains = list(map(lambda x: x.split('@')[1],mail_addresses))
for domain in domains:
if domain not in self.tlp_green_mail_domains:
self.error("No mails sent. The domain '" + domain + "'is not listed in the configuration. Add the domain to the configuration or remove the mail addresses with this domains from the incident case tags.\n\nCurrent tlp_green_mail_domains config:\n" + ",".join(self.tlp_green_mail_domains))
elif tlp == 2:
# tlp:amber
for mail_address in mail_addresses:
if mail_address not in self.tlp_amber_mail_addresses:
self.error("No mails sent. The mail address '" + mail_address + "' is not listed in the configuration. Add the address to the configuration or remove the mail address from the incident case tags.\n\nCurrent tlp_amber_mail_addresses config:\n" + ",".join(self.tlp_amber_mail_addresses))
elif tlp == 3:
self.error("The incident has the TLP value 'tlp:red'. Sending mails is not allowed for this tlp classifcation.")
else:
self.error("TLP is an undefined value.")

def get_HTMLMailBody(self):
body = ""
caseID = self.get_param("data.id",None,"case.id is missing")
case_row = ('CaseID',caseID)
title_row = ('Title',self.get_param("data.title"))
severity_row = ('Severity',self.get_HTMLSeverityString(self.get_param("data.severity")))
tlp_row = ('TLP',str(self.get_param("data.tlp",None,"Reading data.tlp failed.")))
status_row = ('Status',self.get_param("data.status"))
description_row = ('Description',self.get_param("data.description"))
## Create tasks summary
tasks_row = ('Task Summary',self.get_HTMLCaseTaskSummary(caseID))
## Time and dates
time = self.get_param("data.startDate",)
date_str = (datetime.datetime.fromtimestamp(time / 1e3)).strftime('%m/%d/%Y %H:%M')
startDate_row = ('StartDate',date_str)
time = self.get_param("data.createdAt")
date_str = (datetime.datetime.fromtimestamp(time / 1e3)).strftime('%m/%d/%Y %H:%M')
createdAt_row = ('createdAt',date_str)
createdBy_row = ('createdBy',self.get_param("data.createdBy"))
time = self.get_param("data.updatedAt")
if time:
date_str = (datetime.datetime.fromtimestamp(time / 1e3)).strftime('%m/%d/%Y %H:%M')
else:
date_str = "Unknown"
updatedAt_row = ('updatedAt',date_str)
updatedBy_row = ('updatedBy',self.get_param("data.updatedBy"))
table_rows = [case_row,title_row,severity_row,tlp_row,status_row,description_row,tasks_row,startDate_row,createdAt_row,createdBy_row,updatedAt_row,updatedBy_row]

## Custom fields
cust_fields = self.get_param("data.customFields",None,"Error loading customFields")
cust_field_rows = []
for item in sorted(cust_fields):
# value of item is dictionary with one element
# sample: "scope-accounts-compromised":{"string":"makr"}
cust_value_type = next(iter(cust_fields.get(item)))
if cust_value_type == "date":
date_int = (cust_fields.get(item)).get(cust_value_type)
if date_int:
date_str = (datetime.datetime.fromtimestamp(date_int / 1e3)).strftime('%m/%d/%Y %H:%M')
else:
date_str = "Date not set"
cust_value_str = date_str
else:
cust_value_str = str((cust_fields.get(item)).get(cust_value_type))
cust_field_rows.append((item,cust_value_str))

table_rows.extend(cust_field_rows)
body = self.create_HTMLTable(table_rows)
return body

def get_HTMLSeverityString(self,severity):
if severity == 1:
return "<p style=\"color:DeepSkyBlue\">Low</p>"
elif severity == 2:
return "<p style=\"color:Orange\">Medium</p>"
elif severity == 3:
return "<p style=\"color:Red\">High</p>"
elif severity == 4:
return "<p style=\"color:DarkRed\">Critical</p>"
else:
return "Severtiy mapping failed"

def create_HTMLTable(self,two_tuple_list):
"""
Create a HTML tabel out of a list of string tuples. In the frist colum the first element of the tuple is representated, in the second column the second element of the tuple is present.
"""

html = "<!DOCTYPE html>\n"
html += "<html>\n"
html += "<head>\n"
# meta definitions
html += "<meta charset=\"UTF-8\"/><meta http-equiv=\"Content-Type\" content=\"text/html\"; charset=\"utf-8\"/>\n"
# styles
html += "<style>" + self.get_param('config.mail_html_style_tag_content',None,"Error loading config 'config.mail_html_style_tag_content'") + "</style>\n"
#html += self.get_HTMLStyle()
html += "</head>\n\n"
html += "<body>\n"
html += "<table width=\"100%\" cellpadding=\"0\" cellspacing=\"0\" border=\"0\" bgcolor=\"#FFFFFF\" align=\"center\">\n"
html += "<colgroup><col class=\"first\"/><col class=\"second\"/></colgroup>\n"
html += "<tr><td colspan=\"2\">Incident Status Report</td></tr>\n"
for i in two_tuple_list:
html += "<tr>"
html += "<td align=\"left\">"
html += i[0]
html += "</td>"
html += "<td align=\"left\">"
html += i[1]
html += "</td>"
html += "</tr>\n"
html += "</table>\n"
html += "</body>\n</html>\n"
# return the HTML code
return html

def get_HTMLCaseTaskSummary(self,caseID):
"""
Get all tasks of a given incident, and calculate statistics of the task. Return them as HTML string.
"""
# get case tasks by th4py
api = TheHiveApi(self.thehive_url, self.thehive_apikey)
response = api.get_case_tasks(caseID)
tasks = json.dumps(response.json(),indent=4,sort_keys=True)
# create statistics
t_total = 0
t_compl = 0
t_inpro = 0
t_waiti = 0
t_cance = 0

summary = tasks
for t in response.json():
t_total += 1
if t["status"] == "Completed":
t_compl += 1
if t["status"] == "InProgress":
t_inpro += 1
if t["status"] == "Waiting":
t_waiti += 1
if t["status"] == "Cancel":
t_cance +=1
# in progress
ratio = "/" + str(t_total)
summary = "Completed: " + str(t_compl) + ratio + "<br/>InProgress: " + str(t_inpro) + ratio + "<br/>Waiting: " + str(t_waiti) + ratio + "<br/>Canceled: " + str(t_cance) + ratio
return summary

if __name__ == "__main__":
MailIncidentStatus().run()
3 changes: 3 additions & 0 deletions responders/MailIncidentStatus/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
smtplib
cortexutils
thehive4py