diff --git a/thehive-cortex/app/connectors/cortex/services/CortexClient.scala b/thehive-cortex/app/connectors/cortex/services/CortexClient.scala index 400ef60e56..1a8c0d0ab1 100644 --- a/thehive-cortex/app/connectors/cortex/services/CortexClient.scala +++ b/thehive-cortex/app/connectors/cortex/services/CortexClient.scala @@ -12,17 +12,15 @@ import services.CustomWSAPI import scala.concurrent.duration.Duration import scala.concurrent.{ ExecutionContext, Future } +case class CortexError(status: Int, requestUrl: String, message: String) extends Exception(s"Cortex error on $requestUrl ($status) \n$message") class CortexClient(val name: String, baseUrl: String, key: String, ws: CustomWSAPI) { - lazy val logger = Logger(getClass) + private[CortexClient] lazy val logger = Logger(getClass) def request[A](uri: String, f: WSRequest ⇒ Future[WSResponse], t: WSResponse ⇒ A)(implicit ec: ExecutionContext): Future[A] = { - logger.info(s"Requesting Cortex $baseUrl") f(ws.url(s"$baseUrl/$uri").withHeaders("auth" → key)).map { case response if response.status / 100 == 2 ⇒ t(response) - case error ⇒ - logger.error(s"Cortex error on $baseUrl (${error.status}) \n${error.body}") - sys.error("") + case error ⇒ throw CortexError(error.status, s"$baseUrl/$uri", error.body) } } diff --git a/thehive-cortex/app/connectors/cortex/services/CortexSrv.scala b/thehive-cortex/app/connectors/cortex/services/CortexSrv.scala index 22f32183ce..9394c2240a 100644 --- a/thehive-cortex/app/connectors/cortex/services/CortexSrv.scala +++ b/thehive-cortex/app/connectors/cortex/services/CortexSrv.scala @@ -1,6 +1,5 @@ package connectors.cortex.services -import java.nio.file.{ Path, Paths } import java.util.Date import javax.inject.{ Inject, Singleton } @@ -54,11 +53,10 @@ object CortexConfig { } @Singleton -case class CortexConfig(truststore: Option[Path], instances: Seq[CortexClient]) { +case class CortexConfig(instances: Seq[CortexClient]) { @Inject def this(configuration: Configuration, globalWS: CustomWSAPI) = this( - configuration.getString("cortex.cert").map(p ⇒ Paths.get(p)), CortexConfig.getInstances(configuration, globalWS)) } @@ -72,13 +70,35 @@ class CortexSrv @Inject() ( createSrv: CreateSrv, updateSrv: UpdateSrv, findSrv: FindSrv, + userSrv: UserSrv, eventSrv: EventSrv, implicit val ws: WSClient, implicit val ec: ExecutionContext, implicit val system: ActorSystem, implicit val mat: Materializer) { - lazy val logger = Logger(getClass) + private[CortexSrv] lazy val logger = Logger(getClass) + + userSrv.inInitAuthContext { implicit authContext ⇒ + import org.elastic4play.services.QueryDSL._ + logger.info(s"Search for unfinished job ...") + val (jobs, total) = find("status" ~= "InProgress", Some("all"), Nil) + total.foreach(t ⇒ logger.info(s"$t jobs found")) + jobs + .runForeach { job ⇒ + logger.info(s"Found job in progress, request its status to Cortex") + (for { + cortexJobId ← job.cortexJobId() + cortexClient ← cortexConfig.instances.find(_.name == job.cortexId) + } yield updateJobWithCortex(job.id, cortexJobId, cortexClient)) + .getOrElse { + val jobFields = Fields.empty + .set("status", JobStatus.Failure.toString) + .set("endDate", Json.toJson(new Date)) + update(job.id, jobFields) + } + } + } private[CortexSrv] val mergeActor = actor(new Act { become { @@ -99,7 +119,11 @@ class CortexSrv @Inject() ( createSrv[JobModel, Job, Artifact](jobModel, artifact, fields.set("artifactId", artifact.id)) } - private[CortexSrv] def update(job: Job, fields: Fields)(implicit Context: AuthContext) = { + private[CortexSrv] def update(jobId: String, fields: Fields)(implicit Context: AuthContext): Future[Job] = { + getJob(jobId).flatMap(job ⇒ update(job, fields)) + } + + private[CortexSrv] def update(job: Job, fields: Fields)(implicit Context: AuthContext): Future[Job] = { updateSrv[Job](job, fields) } @@ -202,7 +226,13 @@ class CortexSrv @Inject() ( case _ ⇒ } } - case Failure(_) ⇒ + case Failure(CortexError(404, _, _)) ⇒ + logger.debug(s"The job $cortexJobId not found") + val jobFields = Fields.empty + .set("status", JobStatus.Failure.toString) + .set("endDate", Json.toJson(new Date)) + update(jobId, jobFields) + case _ ⇒ logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...") updateJobWithCortex(jobId, cortexJobId, cortex) }