From 0942846b38e2fea3bbea6bd23f8917b40d62d6a1 Mon Sep 17 00:00:00 2001 From: To-om Date: Thu, 2 May 2019 11:31:57 +0200 Subject: [PATCH] #182 Add cacheTag attribute --- app/org/thp/cortex/models/Job.scala | 1 + app/org/thp/cortex/services/JobSrv.scala | 30 ++++++++++++------------ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/app/org/thp/cortex/models/Job.scala b/app/org/thp/cortex/models/Job.scala index 183949676..9dd9a2978 100644 --- a/app/org/thp/cortex/models/Job.scala +++ b/app/org/thp/cortex/models/Job.scala @@ -36,6 +36,7 @@ trait JobAttributes { val fromCache = optionalAttribute("fromCache", F.booleanFmt, "Indicates if cache is used", O.form) val tpe = attribute("type", F.enumFmt(WorkerType), "", O.readonly) val lbel = optionalAttribute("label", F.stringFmt, "Label of the job") + val cacheTag = optionalAttribute("cacheTag", F.stringFmt, "hash of job discriminant, used for cache", O.readonly) } @Singleton diff --git a/app/org/thp/cortex/services/JobSrv.scala b/app/org/thp/cortex/services/JobSrv.scala index fe568c2c5..7d6074c5e 100644 --- a/app/org/thp/cortex/services/JobSrv.scala +++ b/app/org/thp/cortex/services/JobSrv.scala @@ -20,6 +20,7 @@ import org.thp.cortex.models._ import org.elastic4play._ import org.elastic4play.controllers._ import org.elastic4play.services._ +import org.elastic4play.utils.Hasher @Singleton class JobSrv( @@ -243,11 +244,10 @@ class JobSrv( parameters: JsObject, label: Option[String], force: Boolean)(implicit authContext: AuthContext): Future[Job] = { - val previousJob = if (force) Future.successful(None) - else findSimilarJob(worker, dataType, dataAttachment, tlp, parameters) + val previousJob = findSimilarJob(worker, dataType, dataAttachment, tlp, parameters, force) previousJob.flatMap { - case Some(job) ⇒ Future.successful(job) - case None ⇒ isUnderRateLimit(worker).flatMap { + case Right(job) ⇒ Future.successful(job) + case Left(cacheTag) ⇒ isUnderRateLimit(worker).flatMap { case true ⇒ val fields = Fields(Json.obj( "workerDefinitionId" → worker.workerDefinitionId(), @@ -260,7 +260,8 @@ class JobSrv( "pap" → pap, "message" → message, "parameters" → parameters.toString, - "type" → worker.tpe())) + "type" → worker.tpe(), + "cacheTag" → cacheTag)) .set("label", label.map(JsString.apply)) val fieldWithData = dataAttachment match { case Left(data) ⇒ fields.set("data", data) @@ -298,28 +299,27 @@ class JobSrv( .getOrElse(Future.successful(true)) } - def findSimilarJob(worker: Worker, dataType: String, dataAttachment: Either[String, Attachment], tlp: Long, parameters: JsObject): Future[Option[Job]] = { - val cache = worker.jobCache().fold(jobCache)(_.minutes) - if (cache.length == 0 || worker.tpe() == WorkerType.responder) { + def findSimilarJob(worker: Worker, dataType: String, dataAttachment: Either[String, Attachment], tlp: Long, parameters: JsObject, force: Boolean): Future[Either[String, Job]] = { + val cacheTag = Hasher("MD5").fromString(s"${worker.id}|$dataType|$tlp|${dataAttachment.fold(data ⇒ data, attachment ⇒ attachment.id)}|$parameters").head.toString() + lazy val cache = worker.jobCache().fold(jobCache)(_.minutes) + if (force || cache.length == 0 || worker.tpe() == WorkerType.responder) { logger.info("Job cache is disabled") - Future.successful(None) + Future.successful(Left(cacheTag)) } else { import org.elastic4play.services.QueryDSL._ logger.info(s"Looking for similar job in the last ${cache.toMinutes} minutes (worker=${worker.id}, dataType=$dataType, data=$dataAttachment, tlp=$tlp, parameters=$parameters)") + val now = new Date().getTime find(and( - "workerId" ~= worker.id, + "cacheTag" ~= cacheTag, "status" ~!= JobStatus.Failure, "status" ~!= JobStatus.Deleted, - "startDate" ~>= (now - cache.toMillis), - "dataType" ~= dataType, - "tlp" ~= tlp, - dataAttachment.fold(data ⇒ "data" ~= data, attachment ⇒ "attachment.id" ~= attachment.id), - "parameters" ~= parameters.toString), Some("0-1"), Seq("-createdAt")) + "startDate" ~>= (now - cache.toMillis)), Some("0-1"), Seq("-createdAt")) ._1 .map(j ⇒ new Job(jobModel, j.attributes + ("fromCache" → JsBoolean(true)))) .runWith(Sink.headOption) + .map(_.toRight(cacheTag)) } }