Skip to content

Commit

Permalink
#171 Don't leave job with status InProgress
Browse files Browse the repository at this point in the history
When TheHive starts, retrieve report of all job in progress
If Cortex returns an error (job not found for example), mark job as
failed.
  • Loading branch information
To-om committed May 8, 2017
1 parent e6544d4 commit 6e712b6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ import services.CustomWSAPI
import scala.concurrent.duration.Duration
import scala.concurrent.{ ExecutionContext, Future }

case class CortexError(status: Int, requestUrl: String, message: String) extends Exception(s"Cortex error on $requestUrl ($status) \n$message")
class CortexClient(val name: String, baseUrl: String, key: String, ws: CustomWSAPI) {

lazy val logger = Logger(getClass)
private[CortexClient] lazy val logger = Logger(getClass)

def request[A](uri: String, f: WSRequest Future[WSResponse], t: WSResponse A)(implicit ec: ExecutionContext): Future[A] = {
logger.info(s"Requesting Cortex $baseUrl")
f(ws.url(s"$baseUrl/$uri").withHeaders("auth" key)).map {
case response if response.status / 100 == 2 t(response)
case error
logger.error(s"Cortex error on $baseUrl (${error.status}) \n${error.body}")
sys.error("")
case error throw CortexError(error.status, s"$baseUrl/$uri", error.body)
}
}

Expand Down
42 changes: 36 additions & 6 deletions thehive-cortex/app/connectors/cortex/services/CortexSrv.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package connectors.cortex.services

import java.nio.file.{ Path, Paths }
import java.util.Date
import javax.inject.{ Inject, Singleton }

Expand Down Expand Up @@ -54,11 +53,10 @@ object CortexConfig {
}

@Singleton
case class CortexConfig(truststore: Option[Path], instances: Seq[CortexClient]) {
case class CortexConfig(instances: Seq[CortexClient]) {

@Inject
def this(configuration: Configuration, globalWS: CustomWSAPI) = this(
configuration.getString("cortex.cert").map(p Paths.get(p)),
CortexConfig.getInstances(configuration, globalWS))
}

Expand All @@ -72,13 +70,35 @@ class CortexSrv @Inject() (
createSrv: CreateSrv,
updateSrv: UpdateSrv,
findSrv: FindSrv,
userSrv: UserSrv,
eventSrv: EventSrv,
implicit val ws: WSClient,
implicit val ec: ExecutionContext,
implicit val system: ActorSystem,
implicit val mat: Materializer) {

lazy val logger = Logger(getClass)
private[CortexSrv] lazy val logger = Logger(getClass)

userSrv.inInitAuthContext { implicit authContext
import org.elastic4play.services.QueryDSL._
logger.info(s"Search for unfinished job ...")
val (jobs, total) = find("status" ~= "InProgress", Some("all"), Nil)
total.foreach(t logger.info(s"$t jobs found"))
jobs
.runForeach { job
logger.info(s"Found job in progress, request its status to Cortex")
(for {
cortexJobId job.cortexJobId()
cortexClient cortexConfig.instances.find(_.name == job.cortexId)
} yield updateJobWithCortex(job.id, cortexJobId, cortexClient))
.getOrElse {
val jobFields = Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date))
update(job.id, jobFields)
}
}
}

private[CortexSrv] val mergeActor = actor(new Act {
become {
Expand All @@ -99,7 +119,11 @@ class CortexSrv @Inject() (
createSrv[JobModel, Job, Artifact](jobModel, artifact, fields.set("artifactId", artifact.id))
}

private[CortexSrv] def update(job: Job, fields: Fields)(implicit Context: AuthContext) = {
private[CortexSrv] def update(jobId: String, fields: Fields)(implicit Context: AuthContext): Future[Job] = {
getJob(jobId).flatMap(job update(job, fields))
}

private[CortexSrv] def update(job: Job, fields: Fields)(implicit Context: AuthContext): Future[Job] = {
updateSrv[Job](job, fields)
}

Expand Down Expand Up @@ -202,7 +226,13 @@ class CortexSrv @Inject() (
case _
}
}
case Failure(_)
case Failure(CortexError(404, _, _))
logger.debug(s"The job $cortexJobId not found")
val jobFields = Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date))
update(jobId, jobFields)
case _
logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...")
updateJobWithCortex(jobId, cortexJobId, cortex)
}
Expand Down

0 comments on commit 6e712b6

Please sign in to comment.