#982 Download file observable from Cortex analyzer reports
To-om committed May 28, 2019
1 parent 9d8eb63 commit 901469f
Showing 2 changed files with 81 additions and 25 deletions.
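In outline, the change makes TheHive import file observables produced by Cortex analyzers: when a finished job report lists an artifact of dataType "file", the attachment it references is downloaded from the Cortex datastore and saved through the attachment service, and report artifacts are matched against existing case observables by data or attachment id. As orientation, here is a minimal sketch of the report shape the updated code reads; the field names follow the diff below, while the values are invented for illustration.

// Illustrative only: one plain artifact and one file artifact, shaped the way
// addImportFieldInArtifacts and the download traversal expect them (values are made up).
import play.api.libs.json.Json

val sampleReport = Json.parse("""
  {
    "summary": {},
    "artifacts": [
      { "dataType": "ip",   "data": "203.0.113.7" },
      { "dataType": "file",
        "attachment": { "id": "abc123", "name": "sample.bin", "contentType": "application/octet-stream" } }
    ]
  }
""")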
89 changes: 70 additions & 19 deletions thehive-cortex/app/connectors/cortex/services/CortexAnalyzerSrv.scala
@@ -1,5 +1,6 @@
package connectors.cortex.services

import java.nio.file.Files
import java.util.Date

import scala.concurrent.duration.FiniteDuration
@@ -14,14 +15,14 @@ import play.api.libs.ws.WSClient
import akka.NotUsed
import akka.actor.{Actor, ActorSystem}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.scaladsl.{FileIO, Sink, Source}
import connectors.cortex.models.JsonFormat._
import connectors.cortex.models._
import javax.inject.{Inject, Singleton}
import models.Artifact
import models.{Artifact, Case}
import services.{UserSrv ⇒ _, _}

import org.elastic4play.controllers.Fields
import org.elastic4play.controllers.{Fields, FileInputValue}
import org.elastic4play.database.{DBRemove, ModifyConfig}
import org.elastic4play.services.JsonFormat.attachmentFormat
import org.elastic4play.services._
@@ -194,6 +195,19 @@ class CortexAnalyzerSrv @Inject()(

def addImportFieldInArtifacts(job: JsObject): Future[JsObject] = {
import org.elastic4play.services.QueryDSL._
def findArtifactId(caze: Case, dataType: String, data: Option[String], attachmentId: Option[String]) = {
val criteria: Seq[QueryDef] = Seq(
"dataType" ~= dataType,
withParent(caze)
) ++ data.map("data" ~= _) ++ attachmentId.map("attachment.id" ~= _)
artifactSrv
.find(and(criteria: _*), Some("0-1"), Nil)
._1
.runWith(Sink.headOption)
.map(_.fold[JsValue](JsNull)(a ⇒ JsString(a.id)))
.recover { case _ ⇒ JsNull }
}

for {
caze ← caseSrv.find(child("case_artifact", withId((job \ "_parent").as[String])), Some("0-1"), Nil)._1.runWith(Sink.headOption)
updatedReport ← (job \ "report")
@@ -202,13 +216,9 @@ class CortexAnalyzerSrv @Inject()(
val artifacts = for {
artifact ← (report \ "artifacts").asOpt[Seq[JsObject]].getOrElse(Nil)
dataType ← (artifact \ "dataType").asOpt[String]
data ← (artifact \ "data").asOpt[String]
foundArtifactId = artifactSrv
.find(and("data" ~= data, "dataType" ~= dataType, withParent(caze.get)), Some("0-1"), Nil)
._1
.runWith(Sink.headOption)
.map(_.fold[JsValue](JsNull)(a ⇒ JsString(a.id)))
.recover { case _ ⇒ JsNull }
data = (artifact \ "data").asOpt[String]
attachmentId = (artifact \ "attachment" \ "id").asOpt[String]
foundArtifactId = findArtifactId(caze.get, dataType, data, attachmentId)
} yield foundArtifactId.map(faid ⇒ artifact + ("id" → faid))
Future.sequence(artifacts).map(a ⇒ report + ("artifacts" → JsArray(a)))
}
@@ -223,10 +233,10 @@
retryDelay: FiniteDuration = cortexConfig.refreshDelay,
maxRetryOnError: Int = cortexConfig.maxRetryOnError
)(implicit authContext: AuthContext): Future[Job] = {
def updateArtifactSummary(job: Job, report: String) =
Try(Json.parse(report))
.toOption
.flatMap(r ⇒ (r \ "summary").asOpt[JsObject])

def updateArtifactSummary(job: Job, report: JsObject): Future[Unit] =
(report \ "summary")
.asOpt[JsObject]
.map { jobSummary ⇒
Retry()(classOf[Exception]) {
for {
@@ -240,11 +250,27 @@
)
} yield ()
}.recover {
case NonFatal(t) ⇒ logger.warn(s"Unable to insert summary report in artifact", t)
}
case NonFatal(t) ⇒ logger.warn(s"Unable to insert summary report in artifact", t)
}
}
.getOrElse(Future.successful(()))

def downloadAndSaveAttachment(artifact: JsObject, id: String): Future[Option[JsObject]] = {
val file = Files.createTempFile(s"job-$jobId-cortex-$cortexJobId-$id", "")
val fiv = FileInputValue(
(artifact \ "attachment" \ "name").asOpt[String].getOrElse("noname"),
file,
(artifact \ "attachment" \ "contentType").asOpt[String].getOrElse("application/octet-stream"))
cortex
.getAttachment(id)
.flatMap(src ⇒ src.runWith(FileIO.toPath(file)))
.flatMap(ioResult ⇒ Future.fromTry(ioResult.status))
.flatMap(_ ⇒ attachmentSrv.save(fiv))
.andThen { case _ ⇒ Files.delete(file) }
.map(a ⇒ Some(artifact + ("attachment" -> Json.toJson(a))))
.recover { case _ ⇒ None }
}

logger.debug(s"Requesting status of job $cortexJobId in cortex ${cortex.name} in order to update job $jobId")
cortex
.waitReport(cortexJobId, retryDelay)
@@ -253,14 +279,39 @@
if (status == JobStatus.InProgress || status == JobStatus.Waiting)
updateJobWithCortex(jobId, cortexJobId, cortex)
else {
val report = (j \ "report").asOpt[JsObject].getOrElse(JsObject.empty).toString
val report = (j \ "report").asOpt[JsObject].getOrElse(JsObject.empty)
logger.debug(s"Job $cortexJobId in cortex ${cortex.name} has finished with status $status, updating job $jobId")

val reportWithDownloadedArtifacts = Future
.traverse(
(report \ "artifacts")
.asOpt[Seq[JsObject]]
.getOrElse(Nil)
) { artifact ⇒
(artifact \ "dataType")
.asOpt[String]
.flatMap {
case "file" ⇒
(artifact \ "attachment" \ "id").asOpt[String].map { id ⇒
downloadAndSaveAttachment(artifact, id)
.andThen {
case attachmentArtifact => logger.debug(s"Download attachment $artifact => $attachmentArtifact")
}
}
case _ ⇒ Some(Future.successful(Some(artifact)))

}
.getOrElse(Future.successful(None))
}
.map(a ⇒ report + ("artifacts" → JsArray(a.flatten)))

val updatedJob = for {
job ← getSrv[JobModel, Job](jobModel, jobId)
job ← getSrv[JobModel, Job](jobModel, jobId)
newReport ← reportWithDownloadedArtifacts
jobFields = Fields
.empty
.set("status", status.toString)
.set("report", report)
.set("report", newReport.toString)
.set("endDate", Json.toJson(new Date))
updatedJob ← update(job, jobFields)
_ ← if (status == JobStatus.Success) updateArtifactSummary(job, report) else Future.successful(())
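The artifact traversal above nests Option and Future in a way that can be hard to read; the sketch below restates the per-artifact decision it makes, with the download step passed in as a parameter. The function name and its download parameter are illustrative assumptions, not part of the commit.

// Sketch of the per-artifact decision made while rebuilding the report: file artifacts
// are downloaded first, other artifacts pass through, malformed entries are dropped.
import scala.concurrent.Future
import play.api.libs.json.JsObject

def resolveArtifact(artifact: JsObject, download: (JsObject, String) ⇒ Future[Option[JsObject]]): Future[Option[JsObject]] =
  (artifact \ "dataType").asOpt[String] match {
    case Some("file") ⇒
      (artifact \ "attachment" \ "id").asOpt[String] match {
        case Some(id) ⇒ download(artifact, id)        // fetch the file from Cortex, then keep the artifact
        case None     ⇒ Future.successful(None)       // file artifact without an attachment id: dropped
      }
    case Some(_) ⇒ Future.successful(Some(artifact))  // non-file artifacts are kept unchanged
    case None    ⇒ Future.successful(None)            // artifact without a dataType: dropped
  }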
17 changes: 11 additions & 6 deletions thehive-cortex/app/connectors/cortex/services/CortexClient.scala
@@ -1,19 +1,20 @@
package connectors.cortex.services

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

import play.api.{Configuration, Logger}
import play.api.{ Configuration, Logger }
import play.api.http.HeaderNames
import play.api.libs.json.{JsObject, JsValue, Json}
import play.api.libs.ws.{WSAuthScheme, WSRequest, WSResponse}
import play.api.mvc.MultipartFormData.{DataPart, FilePart}
import play.api.libs.json.{ JsObject, JsValue, Json }
import play.api.libs.ws.{ WSAuthScheme, WSRequest, WSResponse }
import play.api.mvc.MultipartFormData.{ DataPart, FilePart }

import akka.stream.scaladsl.Source
import akka.util.ByteString
import connectors.cortex.models.JsonFormat._
import connectors.cortex.models._
import javax.inject.{Inject, Singleton}
import javax.inject.{ Inject, Singleton }
import models.HealthStatus
import services.CustomWSAPI

@@ -200,4 +201,8 @@ class CortexClient(val name: String, baseUrl: String, authentication: Option[Cor
case None ⇒ HealthStatus.Error
case _ ⇒ HealthStatus.Ok
}

def getAttachment(id: String)(implicit ec: ExecutionContext): Future[Source[ByteString, _]] = {
request(s"api/datastore/$id", _.get, _.bodyAsSource)
}
}
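A hedged usage sketch for the new getAttachment wrapper: it exposes the datastore entry as an Akka Streams source that the caller drains, for example into a file with FileIO.toPath as the analyzer service does above. Apart from getAttachment and FileIO, the names here (fetchAttachmentToDisk, the client value, the target path) are assumptions for illustration.

// Sketch: stream a Cortex datastore entry to disk.
import java.nio.file.Paths
import scala.concurrent.{ ExecutionContext, Future }
import akka.stream.{ IOResult, Materializer }
import akka.stream.scaladsl.FileIO

def fetchAttachmentToDisk(client: CortexClient, attachmentId: String)
                         (implicit ec: ExecutionContext, mat: Materializer): Future[IOResult] =
  client
    .getAttachment(attachmentId)                                         // Future[Source[ByteString, _]]
    .flatMap(_.runWith(FileIO.toPath(Paths.get(s"/tmp/$attachmentId")))) // write the stream to a file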
