#14 Fix case merging in backend
To-om committed Nov 17, 2016
1 parent 87cda03 commit 4e3ff6a
Showing 3 changed files with 135 additions and 81 deletions.
6 changes: 4 additions & 2 deletions thehive-backend/app/models/Case.scala
@@ -21,12 +21,12 @@ import services.AuditedModel

object CaseStatus extends Enumeration with HiveEnumeration {
type Type = Value
val Ephemeral, Open, FalsePositive, TruePositive, Resolved, Deleted = Value
val Open, Resolved, Deleted = Value
}

object CaseResolutionStatus extends Enumeration with HiveEnumeration {
type Type = Value
val Indeterminate, FalsePositive, TruePositive, Other = Value
val Indeterminate, FalsePositive, TruePositive, Other, Duplicated = Value
}

object CaseImpactStatus extends Enumeration with HiveEnumeration {
@@ -50,6 +50,8 @@ trait CaseAttributes { _: AttributeDef =>
val resolutionStatus = optionalAttribute("resolutionStatus", F.enumFmt(CaseResolutionStatus), "Resolution status of the case")
val impactStatus = optionalAttribute("impactStatus", F.enumFmt(CaseImpactStatus), "Impact status of the case")
val summary = optionalAttribute("summary", F.textFmt, "Summary of the case, to be provided when closing a case")
val mergeInto = optionalAttribute("mergeInto",F.stringFmt, "Id of the case created by the merge")
val mergeFrom = multiAttribute("mergeFrom",F.stringFmt, "Id of the cases merged")
}

@Singleton
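The two new attributes give merges a durable audit trail: each source case records the id of the resulting case in mergeInto, while the merged case lists its sources in mergeFrom. A minimal, self-contained sketch of that linkage (hypothetical CaseDoc model, not TheHive's actual classes):

object MergeLinkSketch extends App {
  case class CaseDoc(id: String, mergeInto: Option[String] = None, mergeFrom: Seq[String] = Nil)

  val sources = Seq(CaseDoc("case-1"), CaseDoc("case-2"))
  val merged  = CaseDoc("case-3", mergeFrom = sources.map(_.id))
  // After the merge, each source points at the case that replaced it.
  val updated = sources.map(_.copy(mergeInto = Some(merged.id)))

  updated.foreach(c => println(s"${c.id} merged into ${c.mergeInto.get}"))
  println(s"${merged.id} merged from ${merged.mergeFrom.mkString(", ")}")
}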
2 changes: 1 addition & 1 deletion thehive-backend/app/models/Job.scala
@@ -23,7 +23,7 @@ trait JobAttributes { _: AttributeDef =>
val analyzerId = attribute("analyzerId", F.stringFmt, "Analyzer", O.readonly)
val status = attribute("status", F.enumFmt(JobStatus), "Status of the job", JobStatus.InProgress)
val artifactId = attribute("artifactId", F.stringFmt, "Original artifact on which this job was executed", O.readonly)
val startDate = attribute("startDate", F.dateFmt, "Timestamp of the job start", O.model)
val startDate = attribute("startDate", F.dateFmt, "Timestamp of the job start") // , O.model)
val endDate = optionalAttribute("endDate", F.dateFmt, "Timestamp of the job completion (or fail)")
val report = optionalAttribute("report", F.textFmt, "Analysis result", O.unaudited)

208 changes: 130 additions & 78 deletions thehive-backend/app/services/CaseMergeSrv.scala
@@ -22,6 +22,11 @@ import org.elastic4play.services.JsonFormat.log
import org.elastic4play.services.QueryDSL

import models.{ Artifact, ArtifactStatus, Case, CaseImpactStatus, CaseResolutionStatus, CaseStatus, JobStatus, Task }
import play.api.Logger
import scala.util.Success
import scala.util.Failure
import models.TaskStatus
import models.LogStatus

@Singleton
class CaseMergeSrv @Inject() (caseSrv: CaseSrv,
@@ -32,9 +37,20 @@ class CaseMergeSrv @Inject() (caseSrv: CaseSrv,
implicit val ec: ExecutionContext,
implicit val mat: Materializer) {

lazy val logger = Logger(getClass)

import QueryDSL._
private[services] def concat[E](entities: Seq[E], sep: String, getId: E => Long, getStr: E => String) = {
JsString(entities.map(e => s"#${getId(e)}:${getStr(e)}").mkString(sep))
private[services] def concat[E](entities: Seq[E], sep: String, getId: E ⇒ Long, getStr: E ⇒ String) = {
JsString(entities.map(e ⇒ s"#${getId(e)}:${getStr(e)}").mkString(sep))
}

private[services] def concatCaseDescription(cases: Seq[Case]) = {
val str = cases
.map { caze ⇒
s"#### ${caze.title()} ([#${caze.caseId()}](#/case/${caze.id}/details))\n\n${caze.description()}"
}
.mkString("\n \n")
JsString(str)
}
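// A quick, standalone illustration (hypothetical C model, not TheHive's Case) of
// what the two helpers above produce: concat builds "#id:value" fragments joined
// by a separator, and concatCaseDescription emits one markdown section per case.
object ConcatSketch extends App {
  case class C(id: Long, title: String, description: String)
  val cases = Seq(C(1, "Phishing", "Mail campaign"), C(2, "Malware", "Dropper found"))

  println(cases.map(c => s"#${c.id}:${c.title}").mkString(" / ")) // #1:Phishing / #2:Malware
  println(cases.map(c => s"#### ${c.title} ([#${c.id}](#/case/${c.id}/details))\n\n${c.description}").mkString("\n \n"))
}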

private[services] def firstDate(dates: Seq[Date]) = Json.toJson(dates.min)
@@ -43,39 +59,39 @@ class CaseMergeSrv @Inject() (caseSrv: CaseSrv,
val resolutionStatus = cases
.map(_.resolutionStatus())
.reduce[Option[CaseResolutionStatus.Type]] {
case (None, s) => s
case (s, None) => s
case (Some(CaseResolutionStatus.Other), s) => s
case (s, Some(CaseResolutionStatus.Other)) => s
case (Some(CaseResolutionStatus.FalsePositive), s) => s
case (s, Some(CaseResolutionStatus.FalsePositive)) => s
case (Some(CaseResolutionStatus.Indeterminate), s) => s
case (s, Some(CaseResolutionStatus.Indeterminate)) => s
case (s, _) => s //TruePositive
case (None, s) ⇒ s
case (s, None) ⇒ s
case (Some(CaseResolutionStatus.Other), s) ⇒ s
case (s, Some(CaseResolutionStatus.Other)) ⇒ s
case (Some(CaseResolutionStatus.FalsePositive), s) ⇒ s
case (s, Some(CaseResolutionStatus.FalsePositive)) ⇒ s
case (Some(CaseResolutionStatus.Indeterminate), s) ⇒ s
case (s, Some(CaseResolutionStatus.Indeterminate)) ⇒ s
case (s, _) ⇒ s //TruePositive
}
resolutionStatus.map(s => JsString(s.toString))
resolutionStatus.map(s ⇒ JsString(s.toString))
}
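// Standalone sketch of the precedence encoded by the pairwise reduce above:
// TruePositive > Indeterminate > FalsePositive > Other, and None never overrides
// a defined status. (Simplified local enum; Duplicated omitted for brevity.)
object ResolutionPrecedenceSketch extends App {
  object Resolution extends Enumeration {
    val Indeterminate, FalsePositive, TruePositive, Other = Value
  }
  import Resolution._

  def pick(a: Option[Resolution.Value], b: Option[Resolution.Value]) = (a, b) match {
    case (None, s)                => s
    case (s, None)                => s
    case (Some(Other), s)         => s
    case (s, Some(Other))         => s
    case (Some(FalsePositive), s) => s
    case (s, Some(FalsePositive)) => s
    case (Some(Indeterminate), s) => s
    case (s, Some(Indeterminate)) => s
    case (s, _)                   => s // both TruePositive
  }

  println(Seq(Some(FalsePositive), None, Some(TruePositive)).reduce(pick)) // Some(TruePositive)
}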

private[services] def mergeImpactStatus(cases: Seq[Case]) = {
val impactStatus = cases
.map(_.impactStatus())
.reduce[Option[CaseImpactStatus.Type]] {
case (None, s) => s
case (s, None) => s
case (Some(CaseImpactStatus.NotApplicable), s) => s
case (s, Some(CaseImpactStatus.NotApplicable)) => s
case (Some(CaseImpactStatus.NoImpact), s) => s
case (s, Some(CaseImpactStatus.NoImpact)) => s
case (s, _) => s // WithImpact
case (None, s) ⇒ s
case (s, None) ⇒ s
case (Some(CaseImpactStatus.NotApplicable), s) ⇒ s
case (s, Some(CaseImpactStatus.NotApplicable)) ⇒ s
case (Some(CaseImpactStatus.NoImpact), s) ⇒ s
case (s, Some(CaseImpactStatus.NoImpact)) ⇒ s
case (s, _) ⇒ s // WithImpact
}
impactStatus.map(s => JsString(s.toString))
impactStatus.map(s ⇒ JsString(s.toString))
}

private[services] def mergeSummary(cases: Seq[Case]) = {
val summary = cases
.flatMap(c => c.summary().map(_ -> c.caseId()))
.flatMap(c ⇒ c.summary().map(_ -> c.caseId()))
.map {
case (summary, caseId) => s"#$caseId:$summary"
case (summary, caseId) ⇒ s"#$caseId:$summary"
}
if (summary.isEmpty)
None
@@ -85,13 +101,13 @@ class CaseMergeSrv @Inject() (caseSrv: CaseSrv,

private[services] def mergeMetrics(cases: Seq[Case]): JsObject = {
val metrics = for {
caze <- cases
metrics <- caze.metrics()
metricsObject <- metrics.asOpt[JsObject]
caze ← cases
metrics ← caze.metrics()
metricsObject ← metrics.asOpt[JsObject]
} yield metricsObject

val mergedMetrics: Seq[(String, JsValue)] = metrics.flatMap(_.keys).distinct.map { key =>
val metricValues = metrics.flatMap(m => (m \ key).asOpt[BigDecimal])
val mergedMetrics: Seq[(String, JsValue)] = metrics.flatMap(_.keys).distinct.map { key ⇒
val metricValues = metrics.flatMap(m ⇒ (m \ key).asOpt[BigDecimal])
if (metricValues.size != 1)
key -> JsNull
else
@@ -105,24 +121,30 @@ class CaseMergeSrv @Inject() (caseSrv: CaseSrv,

private[services] def mergeLogs(oldTask: Task, newTask: Task)(implicit authContext: AuthContext): Future[Done] = {
logSrv.find("_parent" ~= oldTask.id, Some("all"), Nil)._1
.mapAsyncUnordered(5) { log =>
.mapAsyncUnordered(5) { log ⇒
logSrv.create(newTask, baseFields(log))
}
.runWith(Sink.ignore)
}
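// Standalone sketch of the mergeMetrics rule above: a metric keeps its value only
// when exactly one of the merged cases defines it; any conflict collapses to
// JsNull so an analyst must re-evaluate it. (play-json assumed on the classpath.)
import play.api.libs.json.{ JsNull, JsNumber, JsObject, JsValue, Json }

object MergeMetricsSketch extends App {
  val metrics = Seq(Json.obj("alerts" -> 3, "hosts" -> 2), Json.obj("alerts" -> 5))

  val merged: Seq[(String, JsValue)] = metrics.flatMap(_.keys).distinct.map { key =>
    val values = metrics.flatMap(m => (m \ key).asOpt[BigDecimal])
    if (values.size != 1) key -> JsNull else key -> JsNumber(values.head)
  }
  println(JsObject(merged)) // {"alerts":null,"hosts":2}
}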

private[services] def mergeTasksAndLogs(newCase: Case, cases: Seq[Case])(implicit authContext: AuthContext): Future[Done] = {
taskSrv.find(or(cases.map("_parent" ~= _.id)), Some("all"), Nil)._1
.mapAsyncUnordered(5) { task =>
taskSrv.create(newCase, baseFields(task)).map(task -> _)
}
val (tasks, futureTaskCount) = taskSrv.find(and(parent("case", withId(cases.map(_.id): _*)), "status" ~!= TaskStatus.Cancel), Some("all"), Nil)
futureTaskCount.foreach(count ⇒ logger.info(s"Creating $count task(s):"))
tasks
.mapAsyncUnordered(5) { task ⇒ taskSrv.create(newCase, baseFields(task)).map(task -> _) }
.flatMapConcat {
case (oldTask, newTask) =>
logSrv.find("_parent" ~= oldTask.id, Some("all"), Nil)._1
.map(_ -> newTask)
case (oldTask, newTask) ⇒
logger.info(s"\ttask : ${oldTask.id} -> ${newTask.id} : ${newTask.title()}")
val (logs, futureLogCount) = logSrv.find(and(parent("case_task", withId(oldTask.id)), "status" ~!= LogStatus.Deleted), Some("all"), Nil)
futureLogCount.foreach { count ⇒ logger.info(s"Creating $count log(s) in task ${newTask.id}") }
logs.map(_ -> newTask)
}
.mapAsyncUnordered(5) {
case (log, task) => logSrv.create(task, baseFields(log))
case (log, task) ⇒
val fields = log.attachment().fold(baseFields(log)) { a ⇒
baseFields(log).set("attachment", AttachmentInputValue(a.name, a.hashes, a.size, a.contentType, a.id))
}
logSrv.create(task, fields)
}
.runWith(Sink.ignore)
}
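// Self-contained sketch of the copy pipeline above, stripped of TheHive types:
// stream the parent items, create copies asynchronously, then flatMapConcat to
// the children so each one is recreated under its new parent.
// (akka-stream 2.6+ assumed on the classpath; all names here are illustrative.)
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

object CopyPipelineSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  case class Task(id: String)
  case class Log(taskId: String, msg: String)

  val tasks = List(Task("t1"), Task("t2"))
  val logs  = Map("t1" -> List(Log("t1", "a"), Log("t1", "b")), "t2" -> List(Log("t2", "c")))

  def copyTask(t: Task): Future[Task]       = Future(Task(t.id + "-copy"))
  def copyLog(t: Task, l: Log): Future[Log] = Future(Log(t.id, l.msg))

  Source(tasks)
    .mapAsyncUnordered(5)(old => copyTask(old).map(old -> _))
    .flatMapConcat { case (old, fresh) => Source(logs(old.id)).map(_ -> fresh) }
    .mapAsyncUnordered(5) { case (log, task) => copyLog(task, log) }
    .runWith(Sink.foreach(l => println(s"log '${l.msg}' recreated under ${l.taskId}")))
    .onComplete(_ => system.terminate())
}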
@@ -131,71 +153,86 @@ class CaseMergeSrv @Inject() (caseSrv: CaseSrv,
val status = artifacts
.map(_.status())
.reduce[ArtifactStatus.Type] {
case (ArtifactStatus.Deleted, s) => s
case (s, _) => s
case (ArtifactStatus.Deleted, s) ⇒ s
case (s, _) ⇒ s
}
.toString
JsString(status)
}

private[services] def mergeJobs(newArtifact: Artifact, artifacts: Seq[Artifact])(implicit authContext: AuthContext): Future[Done] = {
jobSrv.find(and(or(artifacts.map("_parent" ~= _.id)), "status" ~= JobStatus.Success), Some("all"), Nil)._1
.mapAsyncUnordered(5) { job =>
jobSrv.find(and(parent("case_artifact", withId(artifacts.map(_.id): _*)), "status" ~= JobStatus.Success), Some("all"), Nil)._1
.mapAsyncUnordered(5) { job ⇒
jobSrv.create(newArtifact, baseFields(job))
}
.runWith(Sink.ignore)
}

private[services] def mergeArtifactsAndJobs(newCase: Case, cases: Seq[Case])(implicit authContext: AuthContext): Future[Done] = {
val caseMap = cases.map(c => c.id -> c).toMap
val caseFilter = or(cases.map("_parent" ~= _.id))
val caseMap = cases.map(c ⇒ c.id -> c).toMap
val caseFilter = and(parent("case", withId(cases.map(_.id): _*)), "status" ~= "Ok")
// Find artifacts hold by cases
artifactSrv.find(caseFilter, Some("all"), Nil)._1
.map { artifact =>
val (artifacts, futureArtifactCount) = artifactSrv.find(caseFilter, Some("all"), Nil)
futureArtifactCount.foreach { count ⇒ log.info(s"Found $count artifact(s) in merging cases") }
artifacts
.mapAsyncUnordered(5) { artifact ⇒
// For each artifact find similar artifacts
val dataFilter = artifact.data().map("data" ~= _) orElse artifact.attachment().map("attachment.id" ~= _.id)
val filter = and(caseFilter,
"status" ~= "Ok",
"dataType" ~= artifact.dataType(),
dataFilter.get)
artifactSrv.find(filter, Some("all"), Nil)._1
.runWith(Sink.seq)
.flatMap { sameArtifacts =>
// Same artifacts are merged
val firstArtifact = sameArtifacts.head
val fields = firstArtifact.attachment().fold(Fields.empty) { a =>
Fields.empty.set("attachment", AttachmentInputValue(a.name, a.hashes, a.size, a.contentType, a.id))
}
.set("data", firstArtifact.data().map(JsString))
.set("dataType", firstArtifact.dataType())
.set("message", concat[Artifact](sameArtifacts, "\n \n", a => caseMap(a.parentId.get).caseId(), _.message()))
.set("startDate", firstDate(sameArtifacts.map(_.startDate())))
.set("tlp", JsNumber(sameArtifacts.map(_.tlp()).min))
.set("tags", JsArray(sameArtifacts.flatMap(_.tags()).map(JsString)))
.set("ioc", JsBoolean(sameArtifacts.map(_.ioc()).reduce(_ || _)))
.set("status", mergeArtifactStatus(sameArtifacts))
// Merged artifact is created under new case
artifactSrv
.create(newCase, fields)
// Then jobs are imported
.flatMap { newArtifact =>
mergeJobs(newArtifact, sameArtifacts)
}
// Errors are logged and ignored (probably document already exists)
.recover {
case error =>
log.warn("Artifact creation fail", error)
Done
}

val (artifacts, futureArtifactCount) = artifactSrv.find(filter, Some("all"), Nil)
futureArtifactCount.foreach { count ⇒
logger.debug(s"${count} identical artifact(s) found (${artifact.dataType()}):${(artifact.data() orElse artifact.attachment().map(_.name)).get}")
}
artifacts.runWith(Sink.seq)
}
.mapAsync(5) { sameArtifacts ⇒
// Same artifacts are merged
val firstArtifact = sameArtifacts.head
val fields = firstArtifact.attachment().fold(Fields.empty) { a ⇒
Fields.empty.set("attachment", AttachmentInputValue(a.name, a.hashes, a.size, a.contentType, a.id))
}
.set("data", firstArtifact.data().map(JsString))
.set("dataType", firstArtifact.dataType())
.set("message", concat[Artifact](sameArtifacts, "\n \n", a caseMap(a.parentId.get).caseId(), _.message()))
.set("startDate", firstDate(sameArtifacts.map(_.startDate())))
.set("tlp", JsNumber(sameArtifacts.map(_.tlp()).min))
.set("tags", JsArray(sameArtifacts.flatMap(_.tags()).map(JsString)))
.set("ioc", JsBoolean(sameArtifacts.map(_.ioc()).reduce(_ || _)))
.set("status", mergeArtifactStatus(sameArtifacts))
// Merged artifact is created under new case
artifactSrv
.create(newCase, fields)
.map(a ⇒ List(a -> sameArtifacts))
// Errors are logged and ignored (probably document already exists)
.recover {
case e ⇒
logger.warn("Artifact creation fail", e)
Nil
}
}
.mapConcat(identity)
.mapAsyncUnordered(5) {
case (newArtifact, sameArtifacts) ⇒
// Then jobs are imported
mergeJobs(newArtifact, sameArtifacts)
.recover {
case error ⇒
logger.error("Log creation fail", error)
Done
}
}
.runWith(Sink.ignore)
}
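// Sketch of the field-merge rules applied above to a group of identical
// observables: lowest TLP wins, tags are unioned, ioc is a logical OR.
// (Hypothetical simplified model, not TheHive's Artifact class.)
object ArtifactMergeSketch extends App {
  case class Observable(data: String, dataType: String, tlp: Int, tags: Set[String], ioc: Boolean)

  val same = Seq(
    Observable("8.8.8.8", "ip", tlp = 2, tags = Set("dns"), ioc = false),
    Observable("8.8.8.8", "ip", tlp = 1, tags = Set("resolver"), ioc = true))

  val merged = Observable(
    data     = same.head.data,
    dataType = same.head.dataType,
    tlp      = same.map(_.tlp).min,
    tags     = same.flatMap(_.tags).toSet,
    ioc      = same.map(_.ioc).reduce(_ || _))

  println(merged) // e.g. Observable(8.8.8.8,ip,1,Set(dns, resolver),true)
}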

private[services] def mergeCases(cases: Seq[Case])(implicit authContext: AuthContext): Future[Case] = {
logger.info("Merging cases: " + cases.map(c s"#${c.caseId()}:${c.title()}").mkString(" / "))
val fields = Fields.empty
.set("title", concat[Case](cases, " / ", _.caseId(), _.title()))
.set("description", concat[Case](cases, "\n \n", _.caseId(), _.description()))
.set("description", concatCaseDescription(cases))
.set("severity", JsNumber(cases.map(_.severity()).max))
.set("startDate", firstDate(cases.map(_.startDate())))
.set("tags", JsArray(cases.flatMap(_.tags()).distinct.map(JsString)))
@@ -207,15 +244,30 @@ class CaseMergeSrv @Inject() (caseSrv: CaseSrv,
.set("resolutionStatus", mergeResolutionStatus(cases))
.set("impactStatus", mergeImpactStatus(cases))
.set("summary", mergeSummary(cases))
.set("mergeFrom", JsArray(cases.map(c JsString(c.id))))
caseSrv.create(fields)
}

def markCaseAsDuplicated(caseIds: Seq[String], mergeCaseId: String)(implicit authContext: AuthContext): Future[Unit] = {
caseSrv.bulkUpdate(caseIds, Fields.empty
.set("mergeInto", mergeCaseId)
.set("status", CaseStatus.Resolved.toString)
.set("resolutionStatus", CaseResolutionStatus.Duplicated.toString))
.map(_.foreach {
case Success(_) ⇒ Done
case Failure(error) ⇒
log.error("Case update fail", error)
Done
})
}
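// Sketch of the result handling above: bulkUpdate yields one Try per case, and
// failures are logged and swallowed so a single bad update cannot fail the batch.
import scala.util.{ Failure, Success, Try }

object BulkResultSketch extends App {
  val results: Seq[Try[String]] = Seq(Success("case-1"), Failure(new RuntimeException("version conflict")))

  results.foreach {
    case Success(id)    => println(s"updated $id")
    case Failure(error) => println(s"Case update fail: ${error.getMessage}")
  }
}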

def merge(caseIds: String*)(implicit authContext: AuthContext): Future[Case] = {
for {
cases <- Future.sequence(caseIds.map(caseSrv.get))
newCase <- mergeCases(cases)
_ <- mergeTasksAndLogs(newCase, cases)
_ <- mergeArtifactsAndJobs(newCase, cases)
cases ← Future.sequence(caseIds.map(caseSrv.get))
newCase ← mergeCases(cases)
_ ← mergeTasksAndLogs(newCase, cases)
_ ← mergeArtifactsAndJobs(newCase, cases)
_ ← markCaseAsDuplicated(caseIds, newCase.id)
} yield newCase
}
}
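The for-comprehension in merge sequences the whole operation: the merged case must exist before tasks, artifacts and duplicate markers are written under it. A minimal standalone sketch of that ordering with plain Futures (all names illustrative):

import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MergeFlowSketch extends App {
  def createMergedCase(ids: Seq[String]): Future[String]        = Future(ids.mkString("+"))
  def copyTasksAndLogs(c: String): Future[Unit]                 = Future(println(s"tasks copied into $c"))
  def copyArtifactsAndJobs(c: String): Future[Unit]             = Future(println(s"artifacts copied into $c"))
  def markDuplicated(ids: Seq[String], c: String): Future[Unit] = Future(println(s"${ids.mkString(",")} marked as merged into $c"))

  val ids = Seq("case-1", "case-2")
  val flow = for {
    newCase <- createMergedCase(ids)
    _       <- copyTasksAndLogs(newCase)
    _       <- copyArtifactsAndJobs(newCase)
    _       <- markDuplicated(ids, newCase)
  } yield newCase

  println(Await.result(flow, 5.seconds)) // case-1+case-2
}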
