Skip to content

Commit

Permalink
#609 add operations as result of active response
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jul 6, 2018
1 parent 9d5c3bb commit 6faceec
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 33 deletions.
2 changes: 1 addition & 1 deletion thehive-cortex/app/connectors/cortex/models/Action.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ trait ActionAttributes { _: AttributeDef ⇒
val report = optionalAttribute("report", F.textFmt, "Action output", O.unaudited)
val cortexId = optionalAttribute("cortexId", F.stringFmt, "Id of cortex where the job is run", O.readonly)
val cortexJobId = optionalAttribute("cortexJobId", F.stringFmt, "Id of job in cortex", O.readonly)
val operations = multiAttribute("operations", F.textFmt, "Update operations applied at the end of the job", O.readonly)
val operations = attribute("operations", F.textFmt, "Update operations applied at the end of the job", "[]", O.unaudited)
}
@Singleton
class ActionModel @Inject() extends ModelDef[ActionModel, Action]("action", "Action", "/connector/cortex/action") with ActionAttributes with AuditedModel {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package connectors.cortex.services

import scala.concurrent.{ ExecutionContext, Future }

import play.api.libs.json._

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import javax.inject.{ Inject, Singleton }
import models.{ Alert, Case }
import services.CaseSrv

import org.elastic4play.controllers.Fields
import org.elastic4play.database.ModifyConfig
import org.elastic4play.models.{ BaseEntity, ChildModelDef, HiveEnumeration }
import org.elastic4play.services.{ AuthContext, FindSrv }
import org.elastic4play.{ BadRequestError, InternalError }

object ActionOperationStatus extends Enumeration with HiveEnumeration {
type Type = Value
val Waiting, Success, Failure = Value
}

trait ActionOperation {
val status: ActionOperationStatus.Type
val message: String

def updateStatus(newStatus: ActionOperationStatus.Type, newMessage: String): ActionOperation
}

case class AddTagToCase(tag: String, status: ActionOperationStatus.Type = ActionOperationStatus.Waiting, message: String = "") extends ActionOperation {
def updateStatus(newStatus: ActionOperationStatus.Type, newMessage: String): AddTagToCase = copy(status = newStatus, message = newMessage)
}

case class CreateTask(fields: JsObject, status: ActionOperationStatus.Type = ActionOperationStatus.Waiting, message: String = "") extends ActionOperation {
def updateStatus(newStatus: ActionOperationStatus.Type, newMessage: String): CreateTask = copy(status = newStatus, message = newMessage)
}

object ActionOperation {
val addTagToCaseWrites = Json.writes[AddTagToCase]
val createTaskWrites = Json.writes[CreateTask]
implicit val actionOperationReads: Reads[ActionOperation] = Reads[ActionOperation](json
(json \ "type").asOpt[String].fold[JsResult[ActionOperation]](JsError("type is missing in action operation")) {
case "AddTagToCase" (json \ "tag").validate[String].map(tag AddTagToCase(tag))
case "CreateTask" JsSuccess(CreateTask(json.as[JsObject] - "type"))
case other JsError(s"Unknown operation $other")
})
implicit val actionOperationWrites: Writes[ActionOperation] = Writes[ActionOperation] {
case a: AddTagToCase addTagToCaseWrites.writes(a)
case a: CreateTask createTaskWrites.writes(a)
case a Json.obj("unsupported operation" -> a.toString)
}
}

@Singleton
class ActionOperationSrv @Inject() (
caseSrv: CaseSrv,
findSrv: FindSrv,
implicit val system: ActorSystem,
implicit val ec: ExecutionContext,
implicit val mat: Materializer) {

def findCaseEntity(entity: BaseEntity): Future[Case] = {
import org.elastic4play.services.QueryDSL._

(entity, entity.model) match {
case (c: Case, _) Future.successful(c)
case (a: Alert, _) a.caze().fold(Future.failed[Case](BadRequestError("Alert hasn't been imported to case")))(caseSrv.get)
case (_, model: ChildModelDef[_, _, _, _])
findSrv(model.parentModel, "_id" ~= entity.parentId.getOrElse(throw InternalError(s"Child entity $entity has no parent ID")), Some("0-1"), Nil)
._1.runWith(Sink.head).flatMap(findCaseEntity)
case _ Future.failed(BadRequestError("Case not found"))
}
}

def execute(entity: BaseEntity, operation: ActionOperation)(implicit authContext: AuthContext): Future[ActionOperation] = {
if (operation.status == ActionOperationStatus.Waiting) {
val updatedOperation = operation match {
case AddTagToCase(tag, _, _)
RetryOnError() { // FIXME find the right exception
for {
caze findCaseEntity(entity)
_ caseSrv.update(caze, Fields.empty.set("tags", Json.toJson(caze.tags() :+ tag)), ModifyConfig(retryOnConflict = 0, version = Some(caze.version)))
} yield operation.updateStatus(ActionOperationStatus.Success, "")
}
case _ Future.successful(operation)
}
updatedOperation.recover { case error operation.updateStatus(ActionOperationStatus.Failure, error.getMessage) }
}
else Future.successful(operation)
}
}
38 changes: 21 additions & 17 deletions thehive-cortex/app/connectors/cortex/services/CortexActionSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.elastic4play.services._
class CortexActionSrv @Inject() (
cortexConfig: CortexConfig,
actionModel: ActionModel,
actionOperationSrv: ActionOperationSrv,
getSrv: GetSrv,
createSrv: CreateSrv,
findSrv: FindSrv,
Expand Down Expand Up @@ -85,28 +86,31 @@ class CortexActionSrv @Inject() (
private def update(action: Action, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Action] =
updateSrv[Action](action, fields, modifyConfig)

def updateActionWithCortex(actionId: String, cortexJobId: String, cortex: CortexClient)(implicit authContext: AuthContext): Unit = {
def updateActionWithCortex(actionId: String, cortexJobId: String, entity: BaseEntity, cortex: CortexClient)(implicit authContext: AuthContext): Unit = {
logger.debug(s"Requesting status of job $cortexJobId in cortex ${cortex.name} in order to update action $actionId")
cortex.waitReport(cortexJobId, 1.minute) andThen {
case Success(j)
val status = (j \ "status").asOpt[JobStatus.Type].getOrElse(JobStatus.Failure)
if (status == JobStatus.InProgress || status == JobStatus.Waiting)
updateActionWithCortex(actionId, cortexJobId, cortex)
updateActionWithCortex(actionId, cortexJobId, entity, cortex)
else {
val report = (j \ "report").asOpt[JsObject].getOrElse(JsObject.empty).toString
val report = (j \ "report").asOpt[JsObject].getOrElse(JsObject.empty)
val operations = (report \ "operations").asOpt[Seq[ActionOperation]].getOrElse(Nil)
logger.debug(s"Job $cortexJobId in cortex ${cortex.name} has finished with status $status, updating action $actionId")
getSrv[ActionModel, Action](actionModel, actionId)
.flatMap { action
val actionFields = Fields.empty
.set("status", status.toString)
.set("report", report)
.set("endDate", Json.toJson(new Date))
update(action, actionFields)
}
.onComplete {
case Failure(e) logger.error(s"Update action fails", e)
case _
}
val updatedAction = for {
action getSrv[ActionModel, Action](actionModel, actionId)
updatedOperations Future.traverse(operations)(actionOperationSrv.execute(entity, _))
actionFields = Fields.empty
.set("status", status.toString)
.set("report", (report - "operations").toString)
.set("endDate", Json.toJson(new Date))
.set("operations", Json.toJson(updatedOperations).toString)
updatedAction update(action, actionFields)
} yield updatedAction
updatedAction.onComplete {
case Failure(e) logger.error(s"Update action fails", e)
case _
}
}
case Failure(CortexError(404, _, _))
logger.debug(s"The job $cortexJobId not found")
Expand All @@ -116,7 +120,7 @@ class CortexActionSrv @Inject() (
update(actionId, actionFields)
case _
logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...")
updateActionWithCortex(actionId, cortexJobId, cortex)
updateActionWithCortex(actionId, cortexJobId, entity, cortex)
}
()
}
Expand Down Expand Up @@ -188,7 +192,7 @@ class CortexActionSrv @Inject() (
// .set("startDate", Json.toJson(new Date()))
.set("cortexId", cortexClient.name)
.set("cortexJobId", job.id))
_ = updateActionWithCortex(action.id, job.id, cortexClient)
_ = updateActionWithCortex(action.id, job.id, entity, cortexClient)
} yield action
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package connectors.cortex.services

import java.util.Date

import scala.concurrent.duration.{ DurationInt, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }

Expand Down Expand Up @@ -79,7 +79,7 @@ class CortexAnalyzerSrv @Inject() (
findSrv: FindSrv,
dbRemove: DBRemove,
userSrv: UserSrv,
system: ActorSystem,
implicit val system: ActorSystem,
implicit val ws: WSClient,
implicit val ec: ExecutionContext,
implicit val mat: Materializer) {
Expand Down Expand Up @@ -214,17 +214,6 @@ class CortexAnalyzerSrv @Inject() (
} yield job + ("report" -> updatedReport)
}

def retryOnError[A](cond: Throwable Boolean = _ true, maxRetry: Int = 5, initialDelay: FiniteDuration = 1.second)(body: Future[A]): Future[A] = {
body.recoverWith {
case e if maxRetry > 0 && cond(e)
val resultPromise = Promise[A]
system.scheduler.scheduleOnce(initialDelay) {
resultPromise.completeWith(retryOnError(cond, maxRetry - 1, initialDelay * 2)(body))
}
resultPromise.future
}
}

def updateJobWithCortex(jobId: String, cortexJobId: String, cortex: CortexClient)(implicit authContext: AuthContext): Unit = {
logger.debug(s"Requesting status of job $cortexJobId in cortex ${cortex.name} in order to update job $jobId")
cortex.waitReport(cortexJobId, 1.minute) andThen {
Expand All @@ -248,7 +237,7 @@ class CortexAnalyzerSrv @Inject() (
.toOption
.flatMap(r (r \ "summary").asOpt[JsObject])
.getOrElse(JsObject.empty)
retryOnError() {
RetryOnError() {
for {
artifact artifactSrv.get(job.artifactId())
reports = Try(Json.parse(artifact.reports()).asOpt[JsObject]).toOption.flatten.getOrElse(JsObject.empty)
Expand Down
19 changes: 19 additions & 0 deletions thehive-cortex/app/connectors/cortex/services/RetryOnError.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package connectors.cortex.services

import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.{ DurationInt, FiniteDuration }

import akka.actor.ActorSystem

object RetryOnError {
def apply[A](cond: Throwable Boolean = _ true, maxRetry: Int = 5, initialDelay: FiniteDuration = 1.second)(body: Future[A])(implicit system: ActorSystem, ec: ExecutionContext): Future[A] = {
body.recoverWith {
case e if maxRetry > 0 && cond(e)
val resultPromise = Promise[A]
system.scheduler.scheduleOnce(initialDelay) {
resultPromise.completeWith(apply(cond, maxRetry - 1, initialDelay * 2)(body))
}
resultPromise.future
}
}
}

0 comments on commit 6faceec

Please sign in to comment.