Skip to content

Commit

Permalink
#609 Put the end of job and worker operations in separated flow item
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jul 23, 2018
1 parent 8bbf148 commit 600cafa
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 100 deletions.
8 changes: 8 additions & 0 deletions thehive-backend/app/services/UserSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ class UserSrv @Inject() (
}
}

def extraAuthContext[A](block: AuthContext Future[A])(implicit authContext: AuthContext): Future[A] = {
val ac = AuthContextImpl(authContext.userId, authContext.userName, Instance.getInternalId, authContext.roles)
eventSrv.publish(InternalRequestProcessStart(authContext.requestId))
block(ac).andThen {
case _ eventSrv.publish(InternalRequestProcessEnd(authContext.requestId))
}
}

def create(fields: Fields)(implicit authContext: AuthContext): Future[User] = {
fields.getString("password") match {
case None createSrv[UserModel, User](userModel, fields)
Expand Down
91 changes: 47 additions & 44 deletions thehive-cortex/app/connectors/cortex/services/CortexActionSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,34 @@ package connectors.cortex.services

import java.util.Date

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import scala.util.matching.Regex
import scala.util.{ Failure, Success }

import play.api.Logger
import play.api.libs.json._

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }
import connectors.cortex.models._
import connectors.cortex.models.JsonFormat._
import connectors.cortex.models._
import javax.inject.{ Inject, Singleton }
import services.UserSrv

import org.elastic4play.{ BadRequestError, MissingAttributeError, NotFoundError }
import org.elastic4play.controllers.Fields
import org.elastic4play.database.ModifyConfig
import org.elastic4play.models.BaseEntity
import org.elastic4play.services._
import org.elastic4play.{ BadRequestError, MissingAttributeError, NotFoundError }

@Singleton
class CortexActionSrv @Inject() (
cortexConfig: CortexConfig,
actionModel: ActionModel,
actionOperationSrv: ActionOperationSrv,
userSrv: UserSrv,
getSrv: GetSrv,
createSrv: CreateSrv,
findSrv: FindSrv,
Expand Down Expand Up @@ -99,48 +100,48 @@ class CortexActionSrv @Inject() (
private def update(action: Action, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Action] =
updateSrv[Action](action, fields, modifyConfig)

def updateActionWithCortex(actionId: String, cortexJobId: String, entity: BaseEntity, cortex: CortexClient, maxError: Int = 3)(implicit authContext: AuthContext): Unit = {
def updateActionWithCortex(actionId: String, cortexJobId: String, entity: BaseEntity, cortex: CortexClient, maxError: Int = 3)(implicit authContext: AuthContext): Future[Action] = {
logger.debug(s"Requesting status of job $cortexJobId in cortex ${cortex.name} in order to update action $actionId")
cortex.waitReport(cortexJobId, 1.minute) andThen {
case Success(j)
val status = (j \ "status").asOpt[JobStatus.Type].getOrElse(JobStatus.Failure)
if (status == JobStatus.InProgress || status == JobStatus.Waiting)
updateActionWithCortex(actionId, cortexJobId, entity, cortex)
else {
val report = (j \ "report").asOpt[JsObject].getOrElse(JsObject.empty)
val operations = (report \ "operations").asOpt[Seq[ActionOperation]].getOrElse(Nil)
logger.debug(s"Job $cortexJobId in cortex ${cortex.name} has finished with status $status, updating action $actionId")
val updatedAction = for {
action getSrv[ActionModel, Action](actionModel, actionId)
updatedOperations Future.traverse(operations)(actionOperationSrv.execute(entity, _))
actionFields = Fields.empty
.set("status", status.toString)
.set("report", (report - "operations").toString)
.set("endDate", Json.toJson(new Date))
.set("operations", Json.toJson(updatedOperations).toString)
updatedAction update(action, actionFields)
} yield updatedAction
updatedAction.onComplete {
case Failure(e) logger.error(s"Update action fails", e)
case _
cortex.waitReport(cortexJobId, 1.minute).flatMap { j
val status = (j \ "status").asOpt[JobStatus.Type].getOrElse(JobStatus.Failure)
if (status == JobStatus.InProgress || status == JobStatus.Waiting)
updateActionWithCortex(actionId, cortexJobId, entity, cortex)
else {
val report = (j \ "report").asOpt[JsObject].getOrElse(JsObject.empty)
val operations = (report \ "operations").asOpt[Seq[ActionOperation]].getOrElse(Nil)
logger.debug(s"Job $cortexJobId in cortex ${cortex.name} has finished with status $status, updating action $actionId")
val updatedAction = for {
action getSrv[ActionModel, Action](actionModel, actionId)
updatedOperations Future.traverse(operations) { op
userSrv.extraAuthContext(ac actionOperationSrv.execute(entity, op)(ac))
}
}
case Failure(CortexError(404, _, _))
logger.debug(s"The job $cortexJobId not found")
val actionFields = Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date))
update(actionId, actionFields)
case _ if maxError > 0
logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...")
updateActionWithCortex(actionId, cortexJobId, entity, cortex, maxError - 1)
case _
logger.error(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails and the number of errors reaches the limit, aborting")
update(actionId, Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date)))
actionFields = Fields.empty
.set("status", status.toString)
.set("report", (report - "operations").toString)
.set("endDate", Json.toJson(new Date))
.set("operations", Json.toJson(updatedOperations).toString)
updatedAction update(action, actionFields)
} yield updatedAction
updatedAction.failed.foreach(logger.error(s"Update action fails", _))
updatedAction
}
}
()
.recoverWith {
case CortexError(404, _, _)
logger.debug(s"The job $cortexJobId not found")
val actionFields = Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date))
update(actionId, actionFields)
case _ if maxError > 0
logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...")
updateActionWithCortex(actionId, cortexJobId, entity, cortex, maxError - 1)
case _
logger.error(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails and the number of errors reaches the limit, aborting")
update(actionId, Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date)))
}
}

def getEntity(objectType: String, objectId: String): Future[BaseEntity] = {
Expand Down Expand Up @@ -213,7 +214,9 @@ class CortexActionSrv @Inject() (
// .set("startDate", Json.toJson(new Date()))
.set("cortexId", cortexClient.name)
.set("cortexJobId", job.id))
_ = updateActionWithCortex(action.id, job.id, entity, cortexClient)
_ = userSrv.extraAuthContext { extraAuthContext
updateActionWithCortex(action.id, job.id, entity, cortexClient)(extraAuthContext)
}
} yield action
}
}
112 changes: 56 additions & 56 deletions thehive-cortex/app/connectors/cortex/services/CortexAnalyzerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import java.util.Date

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }

import play.api.Logger
import play.api.libs.json._
Expand Down Expand Up @@ -214,64 +214,64 @@ class CortexAnalyzerSrv @Inject() (
} yield job + ("report" updatedReport)
}

def updateJobWithCortex(jobId: String, cortexJobId: String, cortex: CortexClient, maxError: Int = 3)(implicit authContext: AuthContext): Unit = {
logger.debug(s"Requesting status of job $cortexJobId in cortex ${cortex.name} in order to update job $jobId")
cortex.waitReport(cortexJobId, 1.minute) andThen {
case Success(j)
val status = (j \ "status").asOpt[JobStatus.Type].getOrElse(JobStatus.Failure)
if (status == JobStatus.InProgress || status == JobStatus.Waiting)
updateJobWithCortex(jobId, cortexJobId, cortex)
else {
val report = (j \ "report").asOpt[JsObject].getOrElse(JsObject.empty).toString
logger.debug(s"Job $cortexJobId in cortex ${cortex.name} has finished with status $status, updating job $jobId")
getSrv[JobModel, Job](jobModel, jobId)
.flatMap { job
val jobFields = Fields.empty
.set("status", status.toString)
.set("report", report)
.set("endDate", Json.toJson(new Date))
update(job, jobFields)
.andThen {
case _ if status == JobStatus.Success
val jobSummary = Try(Json.parse(report))
.toOption
.flatMap(r (r \ "summary").asOpt[JsObject])
.getOrElse(JsObject.empty)
RetryOnError() {
for {
artifact artifactSrv.get(job.artifactId())
reports = Try(Json.parse(artifact.reports()).asOpt[JsObject]).toOption.flatten.getOrElse(JsObject.empty)
newReports = reports + (job.analyzerDefinition().getOrElse(job.analyzerId()) jobSummary)
updatedArtifact artifactSrv.update(job.artifactId(), Fields.empty.set("reports", newReports.toString), ModifyConfig(retryOnConflict = 0, version = Some(artifact.version)))
} yield updatedArtifact
}
.recover {
case NonFatal(t) logger.warn(s"Unable to insert summary report in artifact", t)
}

}
}
.onComplete {
case Failure(e) logger.error(s"Update job fails", e)
case _
def updateJobWithCortex(jobId: String, cortexJobId: String, cortex: CortexClient, maxError: Int = 3)(implicit authContext: AuthContext): Future[Job] = {
def updateArtifactSummary(job: Job, report: String) = {
Try(Json.parse(report))
.toOption
.flatMap(r (r \ "summary").asOpt[JsObject])
.map { jobSummary
RetryOnError() {
for {
artifact artifactSrv.get(job.artifactId())
reports = Try(Json.parse(artifact.reports()).asOpt[JsObject]).toOption.flatten.getOrElse(JsObject.empty)
newReports = reports + (job.analyzerDefinition().getOrElse(job.analyzerId()) jobSummary)
_ artifactSrv.update(job.artifactId(), Fields.empty.set("reports", newReports.toString), ModifyConfig(retryOnConflict = 0, version = Some(artifact.version)))
} yield ()
}
.recover {
case NonFatal(t) logger.warn(s"Unable to insert summary report in artifact", t)
}
}
case Failure(CortexError(404, _, _))
logger.debug(s"The job $cortexJobId not found")
val jobFields = Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date))
update(jobId, jobFields)
case _ if maxError > 0
logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...")
updateJobWithCortex(jobId, cortexJobId, cortex, maxError - 1)
case _
logger.error(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails and the number of errors reaches the limit, aborting")
update(jobId, Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date)))
.getOrElse(Future.successful(()))
}

logger.debug(s"Requesting status of job $cortexJobId in cortex ${cortex.name} in order to update job $jobId")
cortex.waitReport(cortexJobId, 1.minute).flatMap { j
val status = (j \ "status").asOpt[JobStatus.Type].getOrElse(JobStatus.Failure)
if (status == JobStatus.InProgress || status == JobStatus.Waiting)
updateJobWithCortex(jobId, cortexJobId, cortex)
else {
val report = (j \ "report").asOpt[JsObject].getOrElse(JsObject.empty).toString
logger.debug(s"Job $cortexJobId in cortex ${cortex.name} has finished with status $status, updating job $jobId")
val updatedJob = for {
job getSrv[JobModel, Job](jobModel, jobId)
jobFields = Fields.empty
.set("status", status.toString)
.set("report", report)
.set("endDate", Json.toJson(new Date))
updatedJob update(job, jobFields)
_ if (status == JobStatus.Success) updateArtifactSummary(job, report) else Future.successful(())
} yield updatedJob
updatedJob.failed.foreach(logger.error(s"Update job fails", _))
updatedJob
}
}
()
.recoverWith {
case CortexError(404, _, _)
logger.debug(s"The job $cortexJobId not found")
val jobFields = Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date))
update(jobId, jobFields)
case _ if maxError > 0
logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...")
updateJobWithCortex(jobId, cortexJobId, cortex, maxError - 1)
case _
logger.error(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails and the number of errors reaches the limit, aborting")
update(jobId, Fields.empty
.set("status", JobStatus.Failure.toString)
.set("endDate", Json.toJson(new Date)))
}
}

def submitJob(cortexId: Option[String], analyzerName: String, artifactId: String)(implicit authContext: AuthContext): Future[Job] = {
Expand Down

0 comments on commit 600cafa

Please sign in to comment.