From fea8bee31931b9ff38e7d67718ff0c996e38d7d5 Mon Sep 17 00:00:00 2001 From: To-om Date: Mon, 26 Oct 2020 11:44:17 +0100 Subject: [PATCH] #1416 Use dedicated queues for long tasks --- .scalafmt.conf | 6 +- project/Dependencies.scala | 25 +- project/build.properties | 2 +- project/plugins.sbt | 4 +- .../app/controllers/AlertCtrl.scala | 278 ++++++++++-------- .../app/controllers/AttachmentCtrl.scala | 111 +++---- thehive-backend/app/global/TheHive.scala | 2 +- thehive-backend/app/services/AlertSrv.scala | 136 ++++----- .../app/services/ArtifactSrv.scala | 31 +- thehive-backend/app/services/AuditSrv.scala | 22 +- .../app/services/CaseMergeSrv.scala | 13 +- thehive-backend/app/services/CaseSrv.scala | 29 +- .../app/services/CaseTemplateSrv.scala | 15 +- .../app/services/DashboardSrv.scala | 21 +- thehive-backend/app/services/LogSrv.scala | 19 +- thehive-backend/app/services/TaskSrv.scala | 29 +- .../app/services/TheHiveAuthSrv.scala | 1 - thehive-backend/app/services/UserSrv.scala | 30 +- thehive-backend/conf/reference.conf | 15 + .../app/connectors/misp/MispCtrl.scala | 4 +- .../app/connectors/misp/MispSrv.scala | 28 +- 21 files changed, 426 insertions(+), 395 deletions(-) diff --git a/.scalafmt.conf b/.scalafmt.conf index 4885c26f65..bd4784d1f3 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -20,7 +20,7 @@ includeCurlyBraceInSelectChains = true includeNoParensInSelectChains = true rewriteTokens { - "=>" : "⇒" - "<-" : "←" - "->": "→" + "⇒": "=>" + "←": "<-" + "→": "->" } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7be462cf0c..1b63d94b04 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,27 +1,28 @@ import sbt._ object Dependencies { - val scalaVersion = "2.12.6" + val scalaVersion = "2.12.8" object Library { + val akkaVersion = "2.5.26" object Play { val version = play.core.PlayVersion.current - val ws = "com.typesafe.play" %% "play-ws" % version - val ahc = "com.typesafe.play" %% "play-ahc-ws" % version - val cache = "com.typesafe.play" %% "play-ehcache" % version - val test = "com.typesafe.play" %% "play-test" % version - val specs2 = "com.typesafe.play" %% "play-specs2" % version + val ws = "com.typesafe.play" %% "play-ws" % version + val ahc = "com.typesafe.play" %% "play-ahc-ws" % version + val cache = "com.typesafe.play" %% "play-ehcache" % version + val test = "com.typesafe.play" %% "play-test" % version + val specs2 = "com.typesafe.play" %% "play-specs2" % version val filters = "com.typesafe.play" %% "filters-helpers" % version - val guice = "com.typesafe.play" %% "play-guice" % version + val guice = "com.typesafe.play" %% "play-guice" % version } val scalaGuice = "net.codingwell" %% "scala-guice" % "4.2.3" - val reflections = "org.reflections" % "reflections" % "0.9.11" - val zip4j = "net.lingala.zip4j" % "zip4j" % "1.3.2" - val elastic4play = "org.thehive-project" %% "elastic4play" % "1.11.5" - val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % "2.5.21" - val akkaClusterTools = "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.21" + val reflections = "org.reflections" % "reflections" % "0.9.11" + val zip4j = "net.lingala.zip4j" % "zip4j" % "1.3.2" + val elastic4play = "org.thehive-project" %% "elastic4play" % "1.11.6" + val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % akkaVersion + val akkaClusterTools = "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion } } diff --git a/project/build.properties b/project/build.properties index 080a737edb..a919a9b5f4 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.0 +sbt.version=1.3.8 diff --git a/project/plugins.sbt b/project/plugins.sbt index 8069c5c167..21176e1829 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,7 @@ // Comment to get more information during initialization logLevel := Level.Info -addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.23") +addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.25") addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.1") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.0") addSbtPlugin("org.thehive-project" % "sbt-github-changelog" % "0.3.0") diff --git a/thehive-backend/app/controllers/AlertCtrl.scala b/thehive-backend/app/controllers/AlertCtrl.scala index 9e87c3fb0e..8f6e22f56d 100644 --- a/thehive-backend/app/controllers/AlertCtrl.scala +++ b/thehive-backend/app/controllers/AlertCtrl.scala @@ -1,24 +1,22 @@ package controllers -import scala.concurrent.{ExecutionContext, Future} -import scala.util.Try - -import play.api.Logger -import play.api.http.Status -import play.api.libs.json.{JsArray, JsObject, Json} -import play.api.mvc._ - import akka.stream.Materializer import javax.inject.{Inject, Singleton} import models.Roles -import services.JsonFormat.caseSimilarityWrites -import services.{AlertSrv, CaseSrv} - import org.elastic4play.controllers.{Authenticated, Fields, FieldsBodyParser, Renderer} import org.elastic4play.models.JsonFormat.baseModelEntityWrites import org.elastic4play.services.JsonFormat.{aggReads, queryReads} import org.elastic4play.services._ import org.elastic4play.{BadRequestError, Timed} +import play.api.Logger +import play.api.http.Status +import play.api.libs.json.{JsArray, JsObject, Json} +import play.api.mvc._ +import services.JsonFormat.caseSimilarityWrites +import services.{AlertSrv, CaseSrv} + +import scala.concurrent.Future +import scala.util.Try @Singleton class AlertCtrl @Inject()( @@ -29,7 +27,7 @@ class AlertCtrl @Inject()( renderer: Renderer, components: ControllerComponents, fieldsBodyParser: FieldsBodyParser, - implicit val ec: ExecutionContext, + executionContextSrv: ExecutionContextSrv, implicit val mat: Materializer ) extends AbstractController(components) with Status { @@ -38,179 +36,213 @@ class AlertCtrl @Inject()( @Timed def create(): Action[Fields] = authenticated(Roles.alert).async(fieldsBodyParser) { implicit request ⇒ - alertSrv - .create( - request - .body - .unset("lastSyncDate") - .unset("case") - .unset("status") - .unset("follow") - ) - .map(alert ⇒ renderer.toOutput(CREATED, alert)) + executionContextSrv.withDefault { implicit ec ⇒ + alertSrv + .create( + request + .body + .unset("lastSyncDate") + .unset("case") + .unset("status") + .unset("follow") + ) + .map(alert ⇒ renderer.toOutput(CREATED, alert)) + } } @Timed def mergeWithCase(alertId: String, caseId: String): Action[Fields] = authenticated(Roles.write).async(fieldsBodyParser) { implicit request ⇒ - for { - alert ← alertSrv.get(alertId) - caze ← caseSrv.get(caseId) - updatedCaze ← alertSrv.mergeWithCase(alert, caze) - } yield renderer.toOutput(CREATED, updatedCaze) + executionContextSrv.withCustom("longTask") { implicit ec ⇒ + for { + alert ← alertSrv.get(alertId) + caze ← caseSrv.get(caseId) + updatedCaze ← alertSrv.mergeWithCase(alert, caze) + } yield renderer.toOutput(CREATED, updatedCaze) + } } @Timed def bulkMergeWithCase: Action[Fields] = authenticated(Roles.write).async(fieldsBodyParser) { implicit request ⇒ - val caseId = request.body.getString("caseId").getOrElse(throw BadRequestError("Parameter \"caseId\" is missing")) - val alertIds = request.body.getStrings("alertIds").getOrElse(throw BadRequestError("Parameter \"alertIds\" is missing")) - for { - alerts ← Future.traverse(alertIds)(alertSrv.get) - caze ← caseSrv.get(caseId) - updatedCaze ← alertSrv.bulkMergeWithCase(alerts, caze) - } yield renderer.toOutput(CREATED, updatedCaze) + executionContextSrv.withCustom("longTask") { implicit ec ⇒ + val caseId = request.body.getString("caseId").getOrElse(throw BadRequestError("Parameter \"caseId\" is missing")) + val alertIds = request.body.getStrings("alertIds").getOrElse(throw BadRequestError("Parameter \"alertIds\" is missing")) + for { + alerts ← Future.traverse(alertIds)(alertSrv.get) + caze ← caseSrv.get(caseId) + updatedCaze ← alertSrv.bulkMergeWithCase(alerts, caze) + } yield renderer.toOutput(CREATED, updatedCaze) + } } @Timed def get(id: String): Action[AnyContent] = authenticated(Roles.read).async { implicit request ⇒ - val withStats = request - .queryString - .get("nstats") - .flatMap(_.headOption) - .exists(v ⇒ Try(v.toBoolean).getOrElse(v == "1")) - - val withSimilarity = request - .queryString - .get("similarity") - .flatMap(_.headOption) - .exists(v ⇒ Try(v.toBoolean).getOrElse(v == "1")) - - for { - alert ← alertSrv.get(id) - alertsWithStats ← auxSrv.apply(alert, 0, withStats, removeUnaudited = false) - similarCases ← if (withSimilarity) - alertSrv - .similarCases(alert) - .map(sc ⇒ Json.obj("similarCases" → Json.toJson(sc))) - else Future.successful(JsObject.empty) - similarArtifacts ← if (withSimilarity) - alertSrv - .alertArtifactsWithSeen(alert) - .map(aws ⇒ Json.obj("artifacts" → aws)) - else Future.successful(JsObject.empty) - } yield { - renderer.toOutput(OK, alertsWithStats ++ similarCases ++ similarArtifacts) + executionContextSrv.withDefault { implicit ec ⇒ + val withStats = request + .queryString + .get("nstats") + .flatMap(_.headOption) + .exists(v ⇒ Try(v.toBoolean).getOrElse(v == "1")) + + val withSimilarity = request + .queryString + .get("similarity") + .flatMap(_.headOption) + .exists(v ⇒ Try(v.toBoolean).getOrElse(v == "1")) + + for { + alert ← alertSrv.get(id) + alertsWithStats ← auxSrv.apply(alert, 0, withStats, removeUnaudited = false) + similarCases ← if (withSimilarity) + alertSrv + .similarCases(alert) + .map(sc ⇒ Json.obj("similarCases" → Json.toJson(sc))) + else Future.successful(JsObject.empty) + similarArtifacts ← if (withSimilarity) + alertSrv + .alertArtifactsWithSeen(alert) + .map(aws ⇒ Json.obj("artifacts" → aws)) + else Future.successful(JsObject.empty) + } yield { + renderer.toOutput(OK, alertsWithStats ++ similarCases ++ similarArtifacts) + } } } @Timed def update(id: String): Action[Fields] = authenticated(Roles.write).async(fieldsBodyParser) { implicit request ⇒ - alertSrv - .update(id, request.body) - .map { alert ⇒ - renderer.toOutput(OK, alert) - } + executionContextSrv.withDefault { implicit ec ⇒ + alertSrv + .update(id, request.body) + .map { alert ⇒ + renderer.toOutput(OK, alert) + } + } } @Timed def bulkUpdate(): Action[Fields] = authenticated(Roles.write).async(fieldsBodyParser) { implicit request ⇒ - request.body.getStrings("ids").fold(Future.successful(Ok(JsArray()))) { ids ⇒ - alertSrv.bulkUpdate(ids, request.body.unset("ids")).map(multiResult ⇒ renderer.toMultiOutput(OK, multiResult)) + executionContextSrv.withCustom("longTask") { implicit ec ⇒ + request.body.getStrings("ids").fold(Future.successful(Ok(JsArray()))) { ids ⇒ + alertSrv.bulkUpdate(ids, request.body.unset("ids")).map(multiResult ⇒ renderer.toMultiOutput(OK, multiResult)) + } } } @Timed def delete(id: String, force: Option[Boolean]): Action[AnyContent] = authenticated(Roles.write).async { implicit request ⇒ - alertSrv - .delete(id, force.getOrElse(false)) - .map(_ ⇒ NoContent) + executionContextSrv.withDefault { implicit ec ⇒ + alertSrv + .delete(id, force.getOrElse(false)) + .map(_ ⇒ NoContent) + } } @Timed def bulkDelete(): Action[Fields] = authenticated(Roles.admin).async(fieldsBodyParser) { implicit request ⇒ - request.body.getStrings("ids").fold(Future.successful(NoContent)) { ids ⇒ - Future - .traverse(ids)(alertSrv.delete(_, request.body.getBoolean("force").getOrElse(false))) - .map(_ ⇒ NoContent) + executionContextSrv.withCustom("longTask") { implicit ec ⇒ + request.body.getStrings("ids").fold(Future.successful(NoContent)) { ids ⇒ + Future + .traverse(ids)(alertSrv.delete(_, request.body.getBoolean("force").getOrElse(false))) + .map(_ ⇒ NoContent) + } } } @Timed def find(): Action[Fields] = authenticated(Roles.read).async(fieldsBodyParser) { implicit request ⇒ - val query = request.body.getValue("query").fold[QueryDef](QueryDSL.any)(_.as[QueryDef]) - val range = request.body.getString("range") - val sort = request.body.getStrings("sort").getOrElse(Nil) - val nparent = request.body.getLong("nparent").getOrElse(0L).toInt - val withStats = request.body.getBoolean("nstats").getOrElse(false) - - val (alerts, total) = alertSrv.find(query, range, sort) - val alertsWithStats = auxSrv.apply(alerts, nparent, withStats, removeUnaudited = false) - renderer.toOutput(OK, alertsWithStats, total) + executionContextSrv.withDefault { implicit ec ⇒ + val query = request.body.getValue("query").fold[QueryDef](QueryDSL.any)(_.as[QueryDef]) + val range = request.body.getString("range") + val sort = request.body.getStrings("sort").getOrElse(Nil) + val nparent = request.body.getLong("nparent").getOrElse(0L).toInt + val withStats = request.body.getBoolean("nstats").getOrElse(false) + + val (alerts, total) = alertSrv.find(query, range, sort) + val alertsWithStats = auxSrv.apply(alerts, nparent, withStats, removeUnaudited = false) + renderer.toOutput(OK, alertsWithStats, total) + } } @Timed def stats(): Action[Fields] = authenticated(Roles.read).async(fieldsBodyParser) { implicit request ⇒ - val query = request - .body - .getValue("query") - .fold[QueryDef](QueryDSL.any)(_.as[QueryDef]) - val aggs = request - .body - .getValue("stats") - .getOrElse(throw BadRequestError("Parameter \"stats\" is missing")) - .as[Seq[Agg]] - alertSrv.stats(query, aggs).map(s ⇒ Ok(s)) + executionContextSrv.withDefault { implicit ec ⇒ + val query = request + .body + .getValue("query") + .fold[QueryDef](QueryDSL.any)(_.as[QueryDef]) + val aggs = request + .body + .getValue("stats") + .getOrElse(throw BadRequestError("Parameter \"stats\" is missing")) + .as[Seq[Agg]] + alertSrv.stats(query, aggs).map(s ⇒ Ok(s)) + } } @Timed def markAsRead(id: String): Action[AnyContent] = authenticated(Roles.write).async { implicit request ⇒ - for { - alert ← alertSrv.get(id) - updatedAlert ← alertSrv.markAsRead(alert) - } yield renderer.toOutput(OK, updatedAlert) + executionContextSrv.withDefault { implicit ec ⇒ + for { + alert ← alertSrv.get(id) + updatedAlert ← alertSrv.markAsRead(alert) + } yield renderer.toOutput(OK, updatedAlert) + } } @Timed def markAsUnread(id: String): Action[AnyContent] = authenticated(Roles.write).async { implicit request ⇒ - for { - alert ← alertSrv.get(id) - updatedAlert ← alertSrv.markAsUnread(alert) - } yield renderer.toOutput(OK, updatedAlert) + executionContextSrv.withDefault { implicit ec ⇒ + for { + alert ← alertSrv.get(id) + updatedAlert ← alertSrv.markAsUnread(alert) + } yield renderer.toOutput(OK, updatedAlert) + } + } @Timed def createCase(id: String): Action[Fields] = authenticated(Roles.write).async(fieldsBodyParser) { implicit request ⇒ - for { - alert ← alertSrv.get(id) - customCaseTemplate = request - .body - .getString("caseTemplate") - .orElse(alert.caseTemplate()) - caze ← alertSrv.createCase(alert, customCaseTemplate) - } yield renderer.toOutput(CREATED, caze) + executionContextSrv.withDefault { implicit ec ⇒ + for { + alert ← alertSrv.get(id) + customCaseTemplate = request + .body + .getString("caseTemplate") + .orElse(alert.caseTemplate()) + caze ← alertSrv.createCase(alert, customCaseTemplate) + } yield renderer.toOutput(CREATED, caze) + } } @Timed def followAlert(id: String): Action[AnyContent] = authenticated(Roles.write).async { implicit request ⇒ - alertSrv - .setFollowAlert(id, follow = true) - .map { alert ⇒ - renderer.toOutput(OK, alert) - } + executionContextSrv.withDefault { implicit ec ⇒ + alertSrv + .setFollowAlert(id, follow = true) + .map { alert ⇒ + renderer.toOutput(OK, alert) + } + } } @Timed def unfollowAlert(id: String): Action[AnyContent] = authenticated(Roles.write).async { implicit request ⇒ - alertSrv - .setFollowAlert(id, follow = false) - .map { alert ⇒ - renderer.toOutput(OK, alert) - } + executionContextSrv.withDefault { implicit ec ⇒ + alertSrv + .setFollowAlert(id, follow = false) + .map { alert ⇒ + renderer.toOutput(OK, alert) + } + } } @Timed def fixStatus(): Action[AnyContent] = authenticated(Roles.admin).async { implicit request ⇒ - alertSrv - .fixStatus() - .map(_ ⇒ NoContent) + executionContextSrv.withDefault { implicit ec ⇒ + alertSrv + .fixStatus() + .map(_ ⇒ NoContent) + } } + } diff --git a/thehive-backend/app/controllers/AttachmentCtrl.scala b/thehive-backend/app/controllers/AttachmentCtrl.scala index 46dce09a56..19eb13c0ef 100644 --- a/thehive-backend/app/controllers/AttachmentCtrl.scala +++ b/thehive-backend/app/controllers/AttachmentCtrl.scala @@ -1,23 +1,21 @@ package controllers import java.nio.file.Files -import javax.inject.{Inject, Singleton} - -import play.api.http.HttpEntity -import play.api.libs.Files.DefaultTemporaryFileCreator -import play.api.mvc._ -import play.api.{mvc, Configuration} import akka.stream.scaladsl.FileIO +import javax.inject.{Inject, Singleton} +import models.Roles import net.lingala.zip4j.core.ZipFile import net.lingala.zip4j.model.ZipParameters import net.lingala.zip4j.util.Zip4jConstants -import models.Roles - import org.elastic4play.Timed -import org.elastic4play.controllers.{Authenticated, Renderer} +import org.elastic4play.controllers.Authenticated import org.elastic4play.models.AttachmentAttributeFormat -import org.elastic4play.services.AttachmentSrv +import org.elastic4play.services.{AttachmentSrv, ExecutionContextSrv} +import play.api.http.HttpEntity +import play.api.libs.Files.DefaultTemporaryFileCreator +import play.api.mvc._ +import play.api.{mvc, Configuration} /** * Controller used to access stored attachments (plain or zipped) @@ -29,7 +27,7 @@ class AttachmentCtrl( attachmentSrv: AttachmentSrv, authenticated: Authenticated, components: ControllerComponents, - renderer: Renderer + executionContextSrv: ExecutionContextSrv ) extends AbstractController(components) { @Inject() def this( @@ -38,9 +36,16 @@ class AttachmentCtrl( attachmentSrv: AttachmentSrv, authenticated: Authenticated, components: ControllerComponents, - renderer: Renderer + executionContextSrv: ExecutionContextSrv ) = - this(configuration.get[String]("datastore.attachment.password"), tempFileCreator, attachmentSrv, authenticated, components, renderer) + this( + configuration.get[String]("datastore.attachment.password"), + tempFileCreator, + attachmentSrv, + authenticated, + components, + executionContextSrv + ) /** * Download an attachment, identified by its hash, in plain format @@ -49,18 +54,20 @@ class AttachmentCtrl( */ @Timed("controllers.AttachmentCtrl.download") def download(hash: String, name: Option[String]): Action[AnyContent] = authenticated(Roles.read) { implicit request ⇒ - if (hash.startsWith("{{")) // angularjs hack - NoContent - else if (!name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).isEmpty) - mvc.Results.BadRequest("File name is invalid") - else - Result( - header = ResponseHeader( - 200, - Map("Content-Disposition" → s"""attachment; filename="${name.getOrElse(hash)}"""", "Content-Transfer-Encoding" → "binary") - ), - body = HttpEntity.Streamed(attachmentSrv.source(hash), None, None) - ) + executionContextSrv.withDefault { implicit ec ⇒ + if (hash.startsWith("{{")) // angularjs hack + NoContent + else if (!name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).isEmpty) + mvc.Results.BadRequest("File name is invalid") + else + Result( + header = ResponseHeader( + 200, + Map("Content-Disposition" → s"""attachment; filename="${name.getOrElse(hash)}"""", "Content-Transfer-Encoding" → "binary") + ), + body = HttpEntity.Streamed(attachmentSrv.source(hash), None, None) + ) + } } /** @@ -70,33 +77,35 @@ class AttachmentCtrl( */ @Timed("controllers.AttachmentCtrl.downloadZip") def downloadZip(hash: String, name: Option[String]): Action[AnyContent] = authenticated(Roles.read) { implicit request ⇒ - if (!name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).isEmpty) - BadRequest("File name is invalid") - else { - val f = tempFileCreator.create("zip", hash).path - Files.delete(f) - val zipFile = new ZipFile(f.toFile) - val zipParams = new ZipParameters - zipParams.setCompressionLevel(Zip4jConstants.DEFLATE_LEVEL_FASTEST) - zipParams.setEncryptFiles(true) - zipParams.setEncryptionMethod(Zip4jConstants.ENC_METHOD_STANDARD) - zipParams.setPassword(password) - zipParams.setFileNameInZip(name.getOrElse(hash)) - zipParams.setSourceExternalStream(true) - zipFile.addStream(attachmentSrv.stream(hash), zipParams) + executionContextSrv.withDefault { implicit ec ⇒ + if (!name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).isEmpty) + BadRequest("File name is invalid") + else { + val f = tempFileCreator.create("zip", hash).path + Files.delete(f) + val zipFile = new ZipFile(f.toFile) + val zipParams = new ZipParameters + zipParams.setCompressionLevel(Zip4jConstants.DEFLATE_LEVEL_FASTEST) + zipParams.setEncryptFiles(true) + zipParams.setEncryptionMethod(Zip4jConstants.ENC_METHOD_STANDARD) + zipParams.setPassword(password) + zipParams.setFileNameInZip(name.getOrElse(hash)) + zipParams.setSourceExternalStream(true) + zipFile.addStream(attachmentSrv.stream(hash), zipParams) - Result( - header = ResponseHeader( - 200, - Map( - "Content-Disposition" → s"""attachment; filename="${name.getOrElse(hash)}.zip"""", - "Content-Type" → "application/zip", - "Content-Transfer-Encoding" → "binary", - "Content-Length" → Files.size(f).toString - ) - ), - body = HttpEntity.Streamed(FileIO.fromPath(f), Some(Files.size(f)), Some("application/zip")) - ) + Result( + header = ResponseHeader( + 200, + Map( + "Content-Disposition" → s"""attachment; filename="${name.getOrElse(hash)}.zip"""", + "Content-Type" → "application/zip", + "Content-Transfer-Encoding" → "binary", + "Content-Length" → Files.size(f).toString + ) + ), + body = HttpEntity.Streamed(FileIO.fromPath(f), Some(Files.size(f)), Some("application/zip")) + ) + } } } } diff --git a/thehive-backend/app/global/TheHive.scala b/thehive-backend/app/global/TheHive.scala index 28d2a421f6..276aa661eb 100644 --- a/thehive-backend/app/global/TheHive.scala +++ b/thehive-backend/app/global/TheHive.scala @@ -79,7 +79,7 @@ class TheHive(environment: Environment, val configuration: Configuration) extend bind[AuthSrv].to[TheHiveAuthSrv] bind[UserMapper].to[MultiUserMapperSrv] - bindActor[AuditActor]("AuditActor") + bindActor[AuditActor]("AuditActor", props = _.withDispatcher("auditTask")) bindActor[LocalStreamActor]("localStreamActor") if (environment.mode == Mode.Prod) diff --git a/thehive-backend/app/services/AlertSrv.scala b/thehive-backend/app/services/AlertSrv.scala index 79755cd5c3..9d65b98af5 100644 --- a/thehive-backend/app/services/AlertSrv.scala +++ b/thehive-backend/app/services/AlertSrv.scala @@ -2,21 +2,12 @@ package services import java.nio.file.Files -import scala.collection.immutable -import scala.concurrent.{ExecutionContext, Future} -import scala.util.matching.Regex -import scala.util.{Failure, Success, Try} - -import play.api.libs.json._ -import play.api.{Configuration, Logger} - import akka.NotUsed import akka.stream.Materializer import akka.stream.scaladsl.{Sink, Source} import connectors.ConnectorRouter import javax.inject.{Inject, Singleton} import models._ - import org.elastic4play.controllers.{Fields, FileInputValue} import org.elastic4play.database.ModifyConfig import org.elastic4play.services.JsonFormat.attachmentFormat @@ -24,11 +15,18 @@ import org.elastic4play.services.QueryDSL.{groupByField, parent, selectCount, wi import org.elastic4play.services._ import org.elastic4play.utils.Collection import org.elastic4play.{ConflictError, InternalError} +import play.api.Logger +import play.api.libs.json._ + +import scala.collection.immutable +import scala.concurrent.{ExecutionContext, Future} +import scala.util.matching.Regex +import scala.util.{Failure, Success, Try} trait AlertTransformer { - def createCase(alert: Alert, customCaseTemplate: Option[String])(implicit authContext: AuthContext): Future[Case] + def createCase(alert: Alert, customCaseTemplate: Option[String])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] - def mergeWithCase(alert: Alert, caze: Case)(implicit authContext: AuthContext): Future[Case] + def mergeWithCase(alert: Alert, caze: Case)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] } case class CaseSimilarity(caze: Case, similarIOCCount: Int, iocCount: Int, similarArtifactCount: Int, artifactCount: Int) @@ -38,8 +36,7 @@ object AlertSrv { } @Singleton -class AlertSrv( - templates: Map[String, String], +class AlertSrv @Inject()( alertModel: AlertModel, createSrv: CreateSrv, getSrv: GetSrv, @@ -51,50 +48,14 @@ class AlertSrv( caseTemplateSrv: CaseTemplateSrv, attachmentSrv: AttachmentSrv, connectors: ConnectorRouter, - hashAlg: Seq[String], - implicit val ec: ExecutionContext, implicit val mat: Materializer ) extends AlertTransformer { - @Inject() def this( - configuration: Configuration, - alertModel: AlertModel, - createSrv: CreateSrv, - getSrv: GetSrv, - updateSrv: UpdateSrv, - deleteSrv: DeleteSrv, - findSrv: FindSrv, - caseSrv: CaseSrv, - artifactSrv: ArtifactSrv, - caseTemplateSrv: CaseTemplateSrv, - attachmentSrv: AttachmentSrv, - connectors: ConnectorRouter, - ec: ExecutionContext, - mat: Materializer - ) = - this( - Map.empty[String, String], - alertModel: AlertModel, - createSrv, - getSrv, - updateSrv, - deleteSrv, - findSrv, - caseSrv, - artifactSrv, - caseTemplateSrv, - attachmentSrv, - connectors, - (configuration.get[String]("datastore.hash.main") +: configuration.get[Seq[String]]("datastore.hash.extra")).distinct, - ec, - mat - ) - private[AlertSrv] lazy val logger = Logger(getClass) import AlertSrv._ - def create(fields: Fields)(implicit authContext: AuthContext): Future[Alert] = { + def create(fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Alert] = { val artifactsFields = Future.traverse(fields.getValues("artifacts")) { @@ -139,32 +100,32 @@ class AlertSrv( } } - def bulkCreate(fieldSet: Seq[Fields])(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] = + def bulkCreate(fieldSet: Seq[Fields])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Seq[Try[Alert]]] = createSrv[AlertModel, Alert](alertModel, fieldSet) - def get(id: String): Future[Alert] = + def get(id: String)(implicit ec: ExecutionContext): Future[Alert] = getSrv[AlertModel, Alert](alertModel, id) - def get(tpe: String, source: String, sourceRef: String): Future[Option[Alert]] = { + def get(tpe: String, source: String, sourceRef: String)(implicit ec: ExecutionContext): Future[Option[Alert]] = { import org.elastic4play.services.QueryDSL._ findSrv[AlertModel, Alert](alertModel, and("type" ~= tpe, "source" ~= source, "sourceRef" ~= sourceRef), Some("0-1"), Nil) ._1 .runWith(Sink.headOption) } - def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Alert] = + def update(id: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Alert] = update(id, fields, ModifyConfig.default) - def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Alert] = + def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Alert] = for { alert ← get(id) updatedAlert ← update(alert, fields, modifyConfig) } yield updatedAlert - def update(alert: Alert, fields: Fields)(implicit authContext: AuthContext): Future[Alert] = + def update(alert: Alert, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Alert] = update(alert, fields, ModifyConfig.default) - def update(alert: Alert, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Alert] = { + def update(alert: Alert, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Alert] = { val follow = fields.getBoolean("follow").getOrElse(alert.follow()) val newStatus = if (follow && alert.status() != AlertStatus.New) AlertStatus.Updated else alert.status() val updatedAlert = updateSrv(alert, fields.set("status", Json.toJson(newStatus)), modifyConfig) @@ -180,31 +141,43 @@ class AlertSrv( } } - def bulkUpdate(ids: Seq[String], fields: Fields)(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] = + def bulkUpdate(ids: Seq[String], fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Seq[Try[Alert]]] = bulkUpdate(ids, fields, ModifyConfig.default) - def bulkUpdate(ids: Seq[String], fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] = + def bulkUpdate(ids: Seq[String], fields: Fields, modifyConfig: ModifyConfig)( + implicit authContext: AuthContext, + ec: ExecutionContext + ): Future[Seq[Try[Alert]]] = updateSrv[AlertModel, Alert](alertModel, ids, fields, modifyConfig) - def bulkUpdate(updates: Seq[(Alert, Fields)])(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] = + def bulkUpdate(updates: Seq[(Alert, Fields)])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Seq[Try[Alert]]] = bulkUpdate(updates, ModifyConfig.default) - def bulkUpdate(updates: Seq[(Alert, Fields)], modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] = + def bulkUpdate( + updates: Seq[(Alert, Fields)], + modifyConfig: ModifyConfig + )(implicit authContext: AuthContext, ec: ExecutionContext): Future[Seq[Try[Alert]]] = updateSrv[Alert](updates, modifyConfig) - def markAsRead(alert: Alert, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Alert] = + def markAsRead( + alert: Alert, + modifyConfig: ModifyConfig = ModifyConfig.default + )(implicit authContext: AuthContext, ec: ExecutionContext): Future[Alert] = alert.caze() match { case Some(_) ⇒ updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "Imported"), modifyConfig) case None ⇒ updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "Ignored"), modifyConfig) } - def markAsUnread(alert: Alert, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Alert] = + def markAsUnread( + alert: Alert, + modifyConfig: ModifyConfig = ModifyConfig.default + )(implicit authContext: AuthContext, ec: ExecutionContext): Future[Alert] = alert.caze() match { case Some(_) ⇒ updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "Updated"), modifyConfig) case None ⇒ updateSrv[AlertModel, Alert](alertModel, alert.id, Fields.empty.set("status", "New"), modifyConfig) } - def getCaseTemplate(alert: Alert, customCaseTemplate: Option[String]): Future[Option[CaseTemplate]] = + def getCaseTemplate(alert: Alert, customCaseTemplate: Option[String])(implicit ec: ExecutionContext): Future[Option[CaseTemplate]] = customCaseTemplate.fold[Future[Option[CaseTemplate]]](Future.successful(None)) { templateName ⇒ caseTemplateSrv .getByName(templateName) @@ -214,7 +187,7 @@ class AlertSrv( .recover { case _ ⇒ None } } - def createCase(alert: Alert, customCaseTemplate: Option[String])(implicit authContext: AuthContext): Future[Case] = + def createCase(alert: Alert, customCaseTemplate: Option[String])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = alert.caze() match { case Some(id) ⇒ caseSrv.get(id) case None ⇒ @@ -246,7 +219,7 @@ class AlertSrv( } } - override def mergeWithCase(alert: Alert, caze: Case)(implicit authContext: AuthContext): Future[Case] = + override def mergeWithCase(alert: Alert, caze: Case)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = alert.caze() match { case Some(id) ⇒ caseSrv.get(id) case None ⇒ @@ -274,7 +247,7 @@ class AlertSrv( } } - def bulkMergeWithCase(alerts: Seq[Alert], caze: Case)(implicit authContext: AuthContext): Future[Case] = + def bulkMergeWithCase(alerts: Seq[Alert], caze: Case)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = Future .traverse(alerts) { alert ⇒ for { @@ -296,7 +269,7 @@ class AlertSrv( ) } - def importArtifacts(alert: Alert, caze: Case)(implicit authContext: AuthContext): Future[Case] = { + def importArtifacts(alert: Alert, caze: Case)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = { val artifactsFields = alert .artifacts() .flatMap { artifact ⇒ @@ -386,10 +359,16 @@ class AlertSrv( updatedCase } - def setCase(alert: Alert, caze: Case, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Alert] = + def setCase(alert: Alert, caze: Case, modifyConfig: ModifyConfig = ModifyConfig.default)( + implicit authContext: AuthContext, + ec: ExecutionContext + ): Future[Alert] = updateSrv(alert, Fields(Json.obj("case" → caze.id, "status" → AlertStatus.Imported)), modifyConfig) - def unsetCase(alert: Alert, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Alert] = { + def unsetCase( + alert: Alert, + modifyConfig: ModifyConfig = ModifyConfig.default + )(implicit authContext: AuthContext, ec: ExecutionContext): Future[Alert] = { val status = alert.status match { case AlertStatus.New ⇒ AlertStatus.New case AlertStatus.Updated ⇒ AlertStatus.New @@ -400,21 +379,22 @@ class AlertSrv( updateSrv(alert, Fields(Json.obj("case" → JsNull, "status" → status)), modifyConfig) } - def delete(id: String, force: Boolean)(implicit authContext: AuthContext): Future[Unit] = + def delete(id: String, force: Boolean)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Unit] = if (force) deleteSrv.realDelete[AlertModel, Alert](alertModel, id) else get(id).flatMap(alert ⇒ markAsUnread(alert)).map(_ ⇒ ()) - def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[Alert, NotUsed], Future[Long]) = + def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[Alert, NotUsed], Future[Long]) = findSrv[AlertModel, Alert](alertModel, queryDef, range, sortBy) - def stats(queryDef: QueryDef, aggs: Seq[Agg]): Future[JsObject] = findSrv(alertModel, queryDef, aggs: _*) + def stats(queryDef: QueryDef, aggs: Seq[Agg])(implicit ec: ExecutionContext): Future[JsObject] = findSrv(alertModel, queryDef, aggs: _*) def setFollowAlert(alertId: String, follow: Boolean, modifyConfig: ModifyConfig = ModifyConfig.default)( - implicit authContext: AuthContext + implicit authContext: AuthContext, + ec: ExecutionContext ): Future[Alert] = updateSrv[AlertModel, Alert](alertModel, alertId, Fields(Json.obj("follow" → follow)), modifyConfig) - def similarCases(alert: Alert): Future[Seq[CaseSimilarity]] = { + def similarCases(alert: Alert)(implicit ec: ExecutionContext): Future[Seq[CaseSimilarity]] = { def similarArtifacts(artifact: JsObject): Option[Source[Artifact, NotUsed]] = for { dataType ← (artifact \ "dataType").asOpt[String] @@ -457,7 +437,7 @@ class AlertSrv( .runWith(Sink.seq) } - def getArtifactSeen(artifact: JsObject): Future[Long] = { + def getArtifactSeen(artifact: JsObject)(implicit ec: ExecutionContext): Future[Long] = { val maybeArtifactSeen = for { dataType ← (artifact \ "dataType").asOpt[String] data ← dataType match { @@ -469,12 +449,12 @@ class AlertSrv( maybeArtifactSeen.getOrElse(Future.successful(0L)) } - def alertArtifactsWithSeen(alert: Alert): Future[Seq[JsObject]] = + def alertArtifactsWithSeen(alert: Alert)(implicit ec: ExecutionContext): Future[Seq[JsObject]] = Future.traverse(alert.artifacts()) { artifact ⇒ getArtifactSeen(artifact).map(seen ⇒ artifact + ("seen" → JsNumber(seen))) } - def fixStatus()(implicit authContext: AuthContext): Future[Unit] = { + def fixStatus()(implicit authContext: AuthContext, ec: ExecutionContext): Future[Unit] = { import org.elastic4play.services.QueryDSL._ val updatedStatusFields = Fields.empty.set("status", "Updated") diff --git a/thehive-backend/app/services/ArtifactSrv.scala b/thehive-backend/app/services/ArtifactSrv.scala index d012dd2610..2f9ccff25e 100644 --- a/thehive-backend/app/services/ArtifactSrv.scala +++ b/thehive-backend/app/services/ArtifactSrv.scala @@ -35,26 +35,25 @@ class ArtifactSrv @Inject()( findSrv: FindSrv, fieldsSrv: FieldsSrv, dbRemove: DBRemove, - implicit val ec: ExecutionContext, implicit val mat: Materializer ) { private[ArtifactSrv] lazy val logger = Logger(getClass) - def create(caseId: String, fields: Fields)(implicit authContext: AuthContext): Future[Artifact] = + def create(caseId: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Artifact] = getSrv[CaseModel, Case](caseModel, caseId) .flatMap { caze ⇒ create(caze, fields) } - def create(caze: Case, fields: Fields)(implicit authContext: AuthContext): Future[Artifact] = + def create(caze: Case, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Artifact] = createSrv[ArtifactModel, Artifact, Case](artifactModel, caze, fields) .recoverWith { case _: ConflictError ⇒ updateIfDeleted(caze, fields) // if the artifact already exists, search it and update it } private def updateIfDeleted(caze: Case, fields: Fields, modifyConfig: ModifyConfig = ModifyConfig.default)( - implicit authContext: AuthContext + implicit authContext: AuthContext, ec: ExecutionContext ): Future[Artifact] = fieldsSrv.parse(fields, artifactModel).toFuture.flatMap { attrs ⇒ val updatedArtifact = for { @@ -76,13 +75,13 @@ class ArtifactSrv @Inject()( } } - def create(caseId: String, fieldSet: Seq[Fields])(implicit authContext: AuthContext): Future[Seq[Try[Artifact]]] = + def create(caseId: String, fieldSet: Seq[Fields])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Seq[Try[Artifact]]] = getSrv[CaseModel, Case](caseModel, caseId) .flatMap { caze ⇒ create(caze, fieldSet) } - def create(caze: Case, fieldSet: Seq[Fields])(implicit authContext: AuthContext): Future[Seq[Try[Artifact]]] = + def create(caze: Case, fieldSet: Seq[Fields])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Seq[Try[Artifact]]] = createSrv[ArtifactModel, Artifact, Case](artifactModel, fieldSet.map(caze → _)) .flatMap { // if there is failure @@ -94,21 +93,21 @@ class ArtifactSrv @Inject()( case t ⇒ Future.successful(t) } - def get(id: String): Future[Artifact] = + def get(id: String)(implicit ec: ExecutionContext): Future[Artifact] = getSrv[ArtifactModel, Artifact](artifactModel, id) - def update(id: String, fields: Fields, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext): Future[Artifact] = + def update(id: String, fields: Fields, modifyConfig: ModifyConfig = ModifyConfig.default)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Artifact] = updateSrv[ArtifactModel, Artifact](artifactModel, id, fields, modifyConfig) def bulkUpdate(ids: Seq[String], fields: Fields, modifyConfig: ModifyConfig = ModifyConfig.default)( - implicit authContext: AuthContext + implicit authContext: AuthContext, ec: ExecutionContext ): Future[Seq[Try[Artifact]]] = updateSrv.apply[ArtifactModel, Artifact](artifactModel, ids, fields, modifyConfig) - def delete(id: String)(implicit authContext: AuthContext): Future[Artifact] = + def delete(id: String)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Artifact] = deleteSrv[ArtifactModel, Artifact](artifactModel, id) - def realDelete(artifact: Artifact): Future[Unit] = + def realDelete(artifact: Artifact)(implicit ec: ExecutionContext): Future[Unit] = for { _ ← auditSrv .findFor(artifact, Some("all"), Nil) @@ -119,19 +118,19 @@ class ArtifactSrv @Inject()( _ ← dbRemove(artifact) } yield () - def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[Artifact, NotUsed], Future[Long]) = + def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[Artifact, NotUsed], Future[Long]) = findSrv[ArtifactModel, Artifact](artifactModel, queryDef, range, sortBy) - def stats(queryDef: QueryDef, aggs: Seq[Agg]): Future[JsObject] = findSrv(artifactModel, queryDef, aggs: _*) + def stats(queryDef: QueryDef, aggs: Seq[Agg])(implicit ec: ExecutionContext): Future[JsObject] = findSrv(artifactModel, queryDef, aggs: _*) - def isSeen(artifact: Artifact): Future[Long] = { + def isSeen(artifact: Artifact)(implicit ec: ExecutionContext): Future[Long] = { import org.elastic4play.services.QueryDSL._ findSrv(artifactModel, similarArtifactFilter(artifact), selectCount).map { stats ⇒ (stats \ "count").asOpt[Long].getOrElse(1L) } } - def findSimilar(artifact: Artifact, range: Option[String], sortBy: Seq[String]): (Source[Artifact, NotUsed], Future[Long]) = + def findSimilar(artifact: Artifact, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[Artifact, NotUsed], Future[Long]) = find(similarArtifactFilter(artifact), range, sortBy) def findSimilar( @@ -140,7 +139,7 @@ class ArtifactSrv @Inject()( filter: Option[QueryDef], range: Option[String], sortBy: Seq[String] - ): (Source[Artifact, NotUsed], Future[Long]) = + )(implicit ec: ExecutionContext): (Source[Artifact, NotUsed], Future[Long]) = find(similarArtifactFilter(dataType, data, filter.getOrElse(org.elastic4play.services.QueryDSL.any)), range, sortBy) private[services] def similarArtifactFilter(artifact: Artifact): QueryDef = { diff --git a/thehive-backend/app/services/AuditSrv.scala b/thehive-backend/app/services/AuditSrv.scala index bd4c147c24..d4344eebba 100644 --- a/thehive-backend/app/services/AuditSrv.scala +++ b/thehive-backend/app/services/AuditSrv.scala @@ -24,7 +24,7 @@ trait AuditedModel { self: BaseModelDef ⇒ lazy val auditedAttributes: Map[String, Attribute[_]] = attributes.collect { case a if !a.isUnaudited ⇒ a.attributeName → a }.toMap - def selectAuditedAttributes(attrs: JsObject) = JsObject { + def selectAuditedAttributes(attrs: JsObject): JsObject = JsObject { attrs.fields.flatMap { case (attrName, value) ⇒ val attrNames = attrName.split("\\.").toSeq @@ -43,12 +43,11 @@ class AuditSrv @Inject()( auxSrv: AuxSrv, dBRemove: DBRemove, findSrv: FindSrv, - implicit val ec: ExecutionContext ) { private[AuditSrv] lazy val logger = Logger(getClass) - def apply(rootId: Option[String], count: Int): (Source[JsObject, NotUsed], Future[Long]) = { + def apply(rootId: Option[String], count: Int)(implicit ec: ExecutionContext): (Source[JsObject, NotUsed], Future[Long]) = { import org.elastic4play.services.QueryDSL._ val streamableEntities = modelSrv.list.collect { @@ -83,26 +82,25 @@ class AuditSrv @Inject()( (entities, total) } - def realDelete(audit: Audit): Future[Unit] = + def realDelete(audit: Audit)(implicit ec: ExecutionContext): Future[Unit] = dBRemove(audit).map(_ ⇒ ()) - def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[Audit, NotUsed], Future[Long]) = + def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[Audit, NotUsed], Future[Long]) = findSrv[AuditModel, Audit](auditModel, queryDef, range, sortBy) - def stats(queryDef: QueryDef, aggs: Seq[Agg]): Future[JsObject] = findSrv(auditModel, queryDef, aggs: _*) + def stats(queryDef: QueryDef, aggs: Seq[Agg])(implicit ec: ExecutionContext): Future[JsObject] = findSrv(auditModel, queryDef, aggs: _*) - def findFor(entity: BaseEntity, range: Option[String], sortBy: Seq[String]): (Source[Audit, NotUsed], Future[Long]) = { + def findFor(entity: BaseEntity, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[Audit, NotUsed], Future[Long]) = { import org.elastic4play.services.QueryDSL._ findSrv[AuditModel, Audit](auditModel, and("objectId" ~= entity.id, "objectType" ~= entity.model.modelName), range, sortBy) } } @Singleton -class AuditActor @Inject()(auditModel: AuditModel, createSrv: CreateSrv, eventSrv: EventSrv, webHooks: WebHooks, implicit val ec: ExecutionContext) - extends Actor { +class AuditActor @Inject()(auditModel: AuditModel, createSrv: CreateSrv, eventSrv: EventSrv, webHooks: WebHooks) extends Actor { object EntityExtractor { - def unapply(e: BaseEntity) = Some((e.model, e.id, e.routing)) + def unapply(e: BaseEntity): Option[(BaseModelDef, String, String)] = Some((e.model, e.id, e.routing)) } var currentRequestIds = Set.empty[String] private[AuditActor] lazy val logger = Logger(getClass) @@ -133,9 +131,9 @@ class AuditActor @Inject()(auditModel: AuditModel, createSrv: CreateSrv, eventSr "requestId" → requestId ) - createSrv[AuditModel, Audit](auditModel, Fields(audit))(authContext) + createSrv[AuditModel, Audit](auditModel, Fields(audit))(authContext, context.dispatcher) .failed - .foreach(t ⇒ logger.error("Audit error", t)) + .foreach(t ⇒ logger.error("Audit error", t))(context.dispatcher) currentRequestIds = currentRequestIds + requestId webHooks.send(audit) diff --git a/thehive-backend/app/services/CaseMergeSrv.scala b/thehive-backend/app/services/CaseMergeSrv.scala index 896e377b5a..5aec65817c 100644 --- a/thehive-backend/app/services/CaseMergeSrv.scala +++ b/thehive-backend/app/services/CaseMergeSrv.scala @@ -29,7 +29,6 @@ class CaseMergeSrv @Inject()( logSrv: LogSrv, artifactSrv: ArtifactSrv, eventSrv: EventSrv, - implicit val ec: ExecutionContext, implicit val mat: Materializer ) { @@ -136,7 +135,7 @@ class CaseMergeSrv @Inject()( private[services] def baseFields(entity: BaseEntity): Fields = Fields(entity.attributes - "_id" - "_routing" - "_parent" - "_type" - "_version" - "createdBy" - "createdAt" - "updatedBy" - "updatedAt" - "user") - private[services] def mergeLogs(oldTask: Task, newTask: Task)(implicit authContext: AuthContext): Future[Done] = + private[services] def mergeLogs(oldTask: Task, newTask: Task)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Done] = logSrv .find(withParent("case_task", oldTask.id), Some("all"), Nil) ._1 @@ -145,7 +144,7 @@ class CaseMergeSrv @Inject()( } .runWith(Sink.ignore) - private[services] def mergeTasksAndLogs(newCase: Case, cases: Seq[Case])(implicit authContext: AuthContext): Future[Done] = { + private[services] def mergeTasksAndLogs(newCase: Case, cases: Seq[Case])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Done] = { val (tasks, futureTaskCount) = taskSrv.find( and(parent("case", withId(cases.map(_.id): _*)), "status" ~!= TaskStatus.Cancel, "status" ~!= TaskStatus.Waiting), Some("all"), @@ -209,7 +208,7 @@ class CaseMergeSrv @Inject()( JsString(status) } - private[services] def mergeArtifactsAndJobs(newCase: Case, cases: Seq[Case])(implicit authContext: AuthContext): Future[Done] = { + private[services] def mergeArtifactsAndJobs(newCase: Case, cases: Seq[Case])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Done] = { val caseMap = cases.map(c ⇒ c.id → c).toMap val caseFilter = and(parent("case", withId(cases.map(_.id): _*)), "status" ~= "Ok") // Find artifacts hold by cases @@ -268,7 +267,7 @@ class CaseMergeSrv @Inject()( } } - private[services] def mergeCases(cases: Seq[Case])(implicit authContext: AuthContext): Future[Case] = { + private[services] def mergeCases(cases: Seq[Case])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = { logger.info("Merging cases: " + cases.map(c ⇒ s"#${c.caseId()}:${c.title()}").mkString(" / ")) val fields = Fields .empty @@ -289,7 +288,7 @@ class CaseMergeSrv @Inject()( caseSrv.create(fields) } - def markCaseAsDuplicated(cases: Seq[Case], mergeCase: Case)(implicit authContext: AuthContext): Future[Done] = + def markCaseAsDuplicated(cases: Seq[Case], mergeCase: Case)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Done] = Future .traverse(cases) { caze ⇒ val s = s"Merge into : ${mergeCase.title()} ([#${mergeCase.caseId()}](#/case/${mergeCase.id}/details))" @@ -311,7 +310,7 @@ class CaseMergeSrv @Inject()( Done } - def merge(caseIds: String*)(implicit authContext: AuthContext): Future[Case] = + def merge(caseIds: String*)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = for { cases ← Future.sequence(caseIds.map(caseSrv.get)) newCase ← mergeCases(cases) diff --git a/thehive-backend/app/services/CaseSrv.scala b/thehive-backend/app/services/CaseSrv.scala index 5c0ca170da..a2237005f1 100644 --- a/thehive-backend/app/services/CaseSrv.scala +++ b/thehive-backend/app/services/CaseSrv.scala @@ -31,7 +31,6 @@ class CaseSrv @Inject()( updateSrv: UpdateSrv, deleteSrv: DeleteSrv, findSrv: FindSrv, - implicit val ec: ExecutionContext, implicit val mat: Materializer ) { @@ -59,7 +58,7 @@ class CaseSrv @Inject()( .set("customFields", customFields) } - def create(fields: Fields, template: Option[CaseTemplate] = None)(implicit authContext: AuthContext): Future[Case] = { + def create(fields: Fields, template: Option[CaseTemplate] = None)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = { val fieldsWithOwner = fields.getString("owner") match { case None ⇒ fields.set("owner", authContext.userId) case Some(_) ⇒ fields @@ -79,33 +78,33 @@ class CaseSrv @Inject()( } } - def get(id: String): Future[Case] = + def get(id: String)(implicit ec: ExecutionContext): Future[Case] = getSrv[CaseModel, Case](caseModel, id) - def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Case] = + def update(id: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = update(id, fields, ModifyConfig.default) - def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Case] = + def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = updateSrv[CaseModel, Case](caseModel, id, fields, modifyConfig) - def update(caze: Case, fields: Fields)(implicit authContext: AuthContext): Future[Case] = + def update(caze: Case, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = update(caze, fields, ModifyConfig.default) - def update(caze: Case, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Case] = + def update(caze: Case, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = updateSrv(caze, fields, modifyConfig) def bulkUpdate(ids: Seq[String], fields: Fields, modifyConfig: ModifyConfig = ModifyConfig.default)( - implicit authContext: AuthContext + implicit authContext: AuthContext, ec: ExecutionContext ): Future[Seq[Try[Case]]] = updateSrv[CaseModel, Case](caseModel, ids, fields, modifyConfig) - def delete(id: String)(implicit authContext: AuthContext): Future[Case] = + def delete(id: String)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = deleteSrv[CaseModel, Case](caseModel, id) - def realDelete(id: String)(implicit authContext: AuthContext): Future[Unit] = + def realDelete(id: String)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Unit] = get(id).flatMap(realDelete) - def realDelete(caze: Case)(implicit authContext: AuthContext): Future[Unit] = { + def realDelete(caze: Case)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Unit] = { import org.elastic4play.services.QueryDSL._ for { _ ← taskSrv @@ -131,12 +130,12 @@ class CaseSrv @Inject()( } yield () } - def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[Case, NotUsed], Future[Long]) = + def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[Case, NotUsed], Future[Long]) = findSrv[CaseModel, Case](caseModel, queryDef, range, sortBy) - def stats(queryDef: QueryDef, aggs: Seq[Agg]): Future[JsObject] = findSrv(caseModel, queryDef, aggs: _*) + def stats(queryDef: QueryDef, aggs: Seq[Agg])(implicit ec: ExecutionContext): Future[JsObject] = findSrv(caseModel, queryDef, aggs: _*) - def getStats(id: String): Future[JsObject] = { + def getStats(id: String)(implicit ec: ExecutionContext): Future[JsObject] = { import org.elastic4play.services.QueryDSL._ for { taskStats ← taskSrv.stats( @@ -147,7 +146,7 @@ class CaseSrv @Inject()( } yield Json.obj(("tasks", taskStats), ("artifacts", artifactStats)) } - def linkedCases(id: String): Source[(Case, Seq[Artifact]), NotUsed] = { + def linkedCases(id: String)(implicit ec: ExecutionContext): Source[(Case, Seq[Artifact]), NotUsed] = { import org.elastic4play.services.QueryDSL._ findSrv[ArtifactModel, Artifact]( artifactModel, diff --git a/thehive-backend/app/services/CaseTemplateSrv.scala b/thehive-backend/app/services/CaseTemplateSrv.scala index ef7830f8e3..c9876da2f0 100644 --- a/thehive-backend/app/services/CaseTemplateSrv.scala +++ b/thehive-backend/app/services/CaseTemplateSrv.scala @@ -22,17 +22,16 @@ class CaseTemplateSrv @Inject()( updateSrv: UpdateSrv, deleteSrv: DeleteSrv, findSrv: FindSrv, - implicit val ec: ExecutionContext, implicit val mat: Materializer ) { - def create(fields: Fields)(implicit authContext: AuthContext): Future[CaseTemplate] = + def create(fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[CaseTemplate] = createSrv[CaseTemplateModel, CaseTemplate](caseTemplateModel, fields) - def get(id: String): Future[CaseTemplate] = + def get(id: String)(implicit ec: ExecutionContext): Future[CaseTemplate] = getSrv[CaseTemplateModel, CaseTemplate](caseTemplateModel, id) - def getByName(name: String): Future[CaseTemplate] = { + def getByName(name: String)(implicit ec: ExecutionContext): Future[CaseTemplate] = { import org.elastic4play.services.QueryDSL._ findSrv[CaseTemplateModel, CaseTemplate](caseTemplateModel, "name" ~= name, Some("0-1"), Nil) ._1 @@ -40,15 +39,15 @@ class CaseTemplateSrv @Inject()( .map(_.getOrElse(throw NotFoundError(s"Case template $name not found"))) } - def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[CaseTemplate] = + def update(id: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[CaseTemplate] = update(id, fields, ModifyConfig.default) - def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[CaseTemplate] = + def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[CaseTemplate] = updateSrv[CaseTemplateModel, CaseTemplate](caseTemplateModel, id, fields, modifyConfig) - def delete(id: String)(implicit authContext: AuthContext): Future[Unit] = + def delete(id: String)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Unit] = deleteSrv.realDelete[CaseTemplateModel, CaseTemplate](caseTemplateModel, id) - def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[CaseTemplate, NotUsed], Future[Long]) = + def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[CaseTemplate, NotUsed], Future[Long]) = findSrv[CaseTemplateModel, CaseTemplate](caseTemplateModel, queryDef, range, sortBy) } diff --git a/thehive-backend/app/services/DashboardSrv.scala b/thehive-backend/app/services/DashboardSrv.scala index 34f654bd68..710da6a14b 100644 --- a/thehive-backend/app/services/DashboardSrv.scala +++ b/thehive-backend/app/services/DashboardSrv.scala @@ -22,35 +22,34 @@ class DashboardSrv @Inject()( getSrv: GetSrv, updateSrv: UpdateSrv, deleteSrv: DeleteSrv, - findSrv: FindSrv, - implicit val ec: ExecutionContext + findSrv: FindSrv ) { private[DashboardSrv] lazy val logger = Logger(getClass) - def create(fields: Fields)(implicit authContext: AuthContext): Future[Dashboard] = + def create(fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Dashboard] = createSrv[DashboardModel, Dashboard](dashboardModel, fields) - def get(id: String): Future[Dashboard] = + def get(id: String)(implicit ec: ExecutionContext): Future[Dashboard] = getSrv[DashboardModel, Dashboard](dashboardModel, id) - def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Dashboard] = + def update(id: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Dashboard] = update(id, fields, ModifyConfig.default) - def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Dashboard] = + def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Dashboard] = updateSrv[DashboardModel, Dashboard](dashboardModel, id, fields, modifyConfig) - def update(dashboard: Dashboard, fields: Fields)(implicit authContext: AuthContext): Future[Dashboard] = + def update(dashboard: Dashboard, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Dashboard] = update(dashboard, fields, ModifyConfig.default) - def update(dashboard: Dashboard, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Dashboard] = + def update(dashboard: Dashboard, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Dashboard] = updateSrv(dashboard, fields, modifyConfig) - def delete(id: String)(implicit authContext: AuthContext): Future[Dashboard] = + def delete(id: String)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Dashboard] = deleteSrv[DashboardModel, Dashboard](dashboardModel, id) - def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[Dashboard, NotUsed], Future[Long]) = + def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[Dashboard, NotUsed], Future[Long]) = findSrv[DashboardModel, Dashboard](dashboardModel, queryDef, range, sortBy) - def stats(queryDef: QueryDef, aggs: Seq[Agg]): Future[JsObject] = findSrv(dashboardModel, queryDef, aggs: _*) + def stats(queryDef: QueryDef, aggs: Seq[Agg])(implicit ec: ExecutionContext): Future[JsObject] = findSrv(dashboardModel, queryDef, aggs: _*) } diff --git a/thehive-backend/app/services/LogSrv.scala b/thehive-backend/app/services/LogSrv.scala index 114669b709..214c383cdc 100644 --- a/thehive-backend/app/services/LogSrv.scala +++ b/thehive-backend/app/services/LogSrv.scala @@ -27,36 +27,35 @@ class LogSrv @Inject()( dbRemove: DBRemove, attachmentSrv: AttachmentSrv, findSrv: FindSrv, - implicit val ec: ExecutionContext, implicit val mat: Materializer ) { lazy val taskSrv: TaskSrv = taskSrvProvider.get - def create(taskId: String, fields: Fields)(implicit authContext: AuthContext): Future[Log] = + def create(taskId: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Log] = getSrv[TaskModel, Task](taskModel, taskId) .flatMap { task ⇒ create(task, fields) } - def create(task: Task, fields: Fields)(implicit authContext: AuthContext): Future[Log] = { + def create(task: Task, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Log] = { if (task.status() == TaskStatus.Waiting) taskSrv.update(task, Fields.empty.set("status", TaskStatus.InProgress.toString)) createSrv[LogModel, Log, Task](logModel, task, fields.addIfAbsent("owner", authContext.userId)) } - def get(id: String): Future[Log] = + def get(id: String)(implicit ec: ExecutionContext): Future[Log] = getSrv[LogModel, Log](logModel, id) - def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Log] = + def update(id: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Log] = update(id, fields, ModifyConfig.default) - def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Log] = + def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Log] = updateSrv[LogModel, Log](logModel, id, fields, modifyConfig) - def delete(id: String)(implicit authContext: AuthContext): Future[Log] = + def delete(id: String)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Log] = deleteSrv[LogModel, Log](logModel, id) - def realDelete(log: Log): Future[Unit] = + def realDelete(log: Log)(implicit ec: ExecutionContext): Future[Unit] = for { _ ← auditSrv .findFor(log, Some("all"), Nil) @@ -72,8 +71,8 @@ class LogSrv @Inject()( _ ← dbRemove(log) } yield () - def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[Log, NotUsed], Future[Long]) = + def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[Log, NotUsed], Future[Long]) = findSrv[LogModel, Log](logModel, queryDef, range, sortBy) - def stats(queryDef: QueryDef, agg: Seq[Agg]): Future[JsObject] = findSrv(logModel, queryDef, agg: _*) + def stats(queryDef: QueryDef, agg: Seq[Agg])(implicit ec: ExecutionContext): Future[JsObject] = findSrv(logModel, queryDef, agg: _*) } diff --git a/thehive-backend/app/services/TaskSrv.scala b/thehive-backend/app/services/TaskSrv.scala index 015d7e3235..d9c8f755ae 100644 --- a/thehive-backend/app/services/TaskSrv.scala +++ b/thehive-backend/app/services/TaskSrv.scala @@ -28,44 +28,43 @@ class TaskSrv @Inject()( dbRemove: DBRemove, findSrv: FindSrv, logSrv: LogSrv, - implicit val ec: ExecutionContext, implicit val mat: Materializer ) { - def create(caseId: String, fields: Fields)(implicit authContext: AuthContext): Future[Task] = + def create(caseId: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Task] = getSrv[CaseModel, Case](caseModel, caseId) .flatMap { caze ⇒ create(caze, fields) } - def create(caze: Case, fields: Fields)(implicit authContext: AuthContext): Future[Task] = + def create(caze: Case, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Task] = createSrv[TaskModel, Task, Case](taskModel, caze, fields) - def create(caseId: String, fields: Seq[Fields])(implicit authContext: AuthContext): Future[Seq[Try[Task]]] = + def create(caseId: String, fields: Seq[Fields])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Seq[Try[Task]]] = getSrv[CaseModel, Case](caseModel, caseId) .flatMap { caze ⇒ create(caze, fields) } - def create(caze: Case, fields: Seq[Fields])(implicit authContext: AuthContext): Future[Seq[Try[Task]]] = + def create(caze: Case, fields: Seq[Fields])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Seq[Try[Task]]] = createSrv[TaskModel, Task, Case](taskModel, fields.map(caze → _)) - def get(id: String): Future[Task] = + def get(id: String)(implicit ec: ExecutionContext): Future[Task] = getSrv[TaskModel, Task](taskModel, id) - def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Task] = + def update(id: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Task] = update(id, fields, ModifyConfig.default) - def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Task] = + def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Task] = getSrv[TaskModel, Task](taskModel, id) .flatMap { task ⇒ update(task, fields, modifyConfig) } - def update(task: Task, fields: Fields)(implicit authContext: AuthContext): Future[Task] = + def update(task: Task, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Task] = update(task, fields, ModifyConfig.default) - def update(task: Task, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[Task] = { + def update(task: Task, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Task] = { // if update status from waiting to something else and owner is not set, then set owner to user val f = if (task.status() == TaskStatus.Waiting && @@ -77,7 +76,7 @@ class TaskSrv @Inject()( updateSrv(task, f, modifyConfig) } - def closeTasksOfCase(caseIds: String*)(implicit authContext: AuthContext): Future[Seq[Try[Task]]] = { + def closeTasksOfCase(caseIds: String*)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Seq[Try[Task]]] = { import org.elastic4play.services.QueryDSL._ val filter = and(parent("case", withId(caseIds: _*)), "status" in (TaskStatus.Waiting.toString, TaskStatus.InProgress.toString)) val range = Some("all") @@ -96,10 +95,10 @@ class TaskSrv @Inject()( } } - def delete(id: String)(implicit authContext: AuthContext): Future[Task] = + def delete(id: String)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Task] = deleteSrv[TaskModel, Task](taskModel, id) - def realDelete(task: Task): Future[Unit] = { + def realDelete(task: Task)(implicit ec: ExecutionContext): Future[Unit] = { import org.elastic4play.services.QueryDSL._ for { _ ← auditSrv @@ -116,8 +115,8 @@ class TaskSrv @Inject()( } yield () } - def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[Task, NotUsed], Future[Long]) = + def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[Task, NotUsed], Future[Long]) = findSrv[TaskModel, Task](taskModel, queryDef, range, sortBy) - def stats(queryDef: QueryDef, aggs: Seq[Agg]): Future[JsObject] = findSrv(taskModel, queryDef, aggs: _*) + def stats(queryDef: QueryDef, aggs: Seq[Agg])(implicit ec: ExecutionContext): Future[JsObject] = findSrv(taskModel, queryDef, aggs: _*) } diff --git a/thehive-backend/app/services/TheHiveAuthSrv.scala b/thehive-backend/app/services/TheHiveAuthSrv.scala index e66c783cdc..a49224aedb 100644 --- a/thehive-backend/app/services/TheHiveAuthSrv.scala +++ b/thehive-backend/app/services/TheHiveAuthSrv.scala @@ -29,7 +29,6 @@ object TheHiveAuthSrv { class TheHiveAuthSrv @Inject()( configuration: Configuration, authModules: immutable.Set[AuthSrv], - userSrv: UserSrv, implicit override val ec: ExecutionContext ) extends MultiAuthSrv( TheHiveAuthSrv.getAuthSrv(configuration.getDeprecated[Option[Seq[String]]]("auth.provider", "auth.type").getOrElse(Seq("local")), authModules), diff --git a/thehive-backend/app/services/UserSrv.scala b/thehive-backend/app/services/UserSrv.scala index 342271b290..6c6573726b 100644 --- a/thehive-backend/app/services/UserSrv.scala +++ b/thehive-backend/app/services/UserSrv.scala @@ -26,16 +26,16 @@ class UserSrv @Inject()( eventSrv: EventSrv, authSrv: Provider[AuthSrv], dbIndex: DBIndex, - implicit val ec: ExecutionContext + defaultExecutionContext: ExecutionContext ) extends EUserSrv { private case class AuthContextImpl(userId: String, userName: String, requestId: String, roles: Seq[Role], authMethod: String) extends AuthContext override def getFromId(request: RequestHeader, userId: String, authMethod: String): Future[AuthContext] = - getSrv[UserModel, User](userModel, userId) + getSrv[UserModel, User](userModel, userId)(defaultExecutionContext) .flatMap { user ⇒ getFromUser(request, user, authMethod) - } + }(defaultExecutionContext) override def getFromUser(request: RequestHeader, user: EUser, authMethod: String): Future[AuthContext] = user match { @@ -45,17 +45,17 @@ class UserSrv @Inject()( } override def getInitialUser(request: RequestHeader): Future[AuthContext] = - dbIndex.getSize(userModel.modelName).map { + dbIndex.getSize(userModel.modelName)(defaultExecutionContext).map { case size if size > 0 ⇒ throw AuthenticationError(s"Use of initial user is forbidden because users exist in database") case _ ⇒ AuthContextImpl("init", "", Instance.getRequestId(request), Seq(Roles.admin, Roles.read, Roles.alert), "init") - } + }(defaultExecutionContext) override def inInitAuthContext[A](block: AuthContext ⇒ Future[A]): Future[A] = { val authContext = AuthContextImpl("init", "", Instance.getInternalId, Seq(Roles.admin, Roles.read, Roles.alert), "init") eventSrv.publish(InternalRequestProcessStart(authContext.requestId)) block(authContext).andThen { case _ ⇒ eventSrv.publish(InternalRequestProcessEnd(authContext.requestId)) - } + }(defaultExecutionContext) } def extraAuthContext[A](block: AuthContext ⇒ Future[A])(implicit authContext: AuthContext): Future[A] = { @@ -63,10 +63,10 @@ class UserSrv @Inject()( eventSrv.publish(InternalRequestProcessStart(ac.requestId)) block(ac).andThen { case _ ⇒ eventSrv.publish(InternalRequestProcessEnd(ac.requestId)) - } + }(defaultExecutionContext) } - def create(fields: Fields)(implicit authContext: AuthContext): Future[User] = + def create(fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[User] = fields.getString("password") match { case None ⇒ createSrv[UserModel, User](userModel, fields) case Some(password) ⇒ @@ -75,23 +75,23 @@ class UserSrv @Inject()( } } - override def get(id: String): Future[User] = getSrv[UserModel, User](userModel, id.toLowerCase) + override def get(id: String): Future[User] = getSrv[UserModel, User](userModel, id.toLowerCase)(defaultExecutionContext) - def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[User] = + def update(id: String, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[User] = update(id, fields, ModifyConfig.default) - def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[User] = + def update(id: String, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[User] = updateSrv[UserModel, User](userModel, id, fields, modifyConfig) - def update(user: User, fields: Fields)(implicit authContext: AuthContext): Future[User] = + def update(user: User, fields: Fields)(implicit authContext: AuthContext, ec: ExecutionContext): Future[User] = update(user, fields, ModifyConfig.default) - def update(user: User, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext): Future[User] = + def update(user: User, fields: Fields, modifyConfig: ModifyConfig)(implicit authContext: AuthContext, ec: ExecutionContext): Future[User] = updateSrv(user, fields, modifyConfig) - def delete(id: String)(implicit authContext: AuthContext): Future[User] = + def delete(id: String)(implicit authContext: AuthContext, ec: ExecutionContext): Future[User] = deleteSrv[UserModel, User](userModel, id) - def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[User, NotUsed], Future[Long]) = + def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String])(implicit ec: ExecutionContext): (Source[User, NotUsed], Future[Long]) = findSrv[UserModel, User](userModel, queryDef, range, sortBy) } diff --git a/thehive-backend/conf/reference.conf b/thehive-backend/conf/reference.conf index 344e3dba8f..273eb109bc 100644 --- a/thehive-backend/conf/reference.conf +++ b/thehive-backend/conf/reference.conf @@ -136,6 +136,21 @@ akka { } } } + +longTask { + fork-join-executor { + parallelism-min = 1 + parallelism-factor = 1.0 + parallelism-max = 2 + } +} +auditTask { + fork-join-executor { + parallelism-min = 1 + parallelism-factor = 1.0 + parallelism-max = 2 + } +} migration { // stream { // # Initial size of buffers used in stream elements diff --git a/thehive-misp/app/connectors/misp/MispCtrl.scala b/thehive-misp/app/connectors/misp/MispCtrl.scala index 6e5b2456ee..eaa7d19953 100644 --- a/thehive-misp/app/connectors/misp/MispCtrl.scala +++ b/thehive-misp/app/connectors/misp/MispCtrl.scala @@ -161,9 +161,9 @@ class MispCtrl( } } - override def createCase(alert: Alert, customCaseTemplate: Option[String])(implicit authContext: AuthContext): Future[Case] = + override def createCase(alert: Alert, customCaseTemplate: Option[String])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = mispSrv.createCase(alert, customCaseTemplate) - override def mergeWithCase(alert: Alert, caze: Case)(implicit authContext: AuthContext): Future[Case] = + override def mergeWithCase(alert: Alert, caze: Case)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = mispSrv.mergeWithCase(alert, caze) } diff --git a/thehive-misp/app/connectors/misp/MispSrv.scala b/thehive-misp/app/connectors/misp/MispSrv.scala index d2254fea26..635da3bf64 100644 --- a/thehive-misp/app/connectors/misp/MispSrv.scala +++ b/thehive-misp/app/connectors/misp/MispSrv.scala @@ -33,7 +33,6 @@ class MispSrv @Inject()( caseSrv: CaseSrv, artifactSrv: ArtifactSrv, tempSrv: TempSrv, - implicit val ec: ExecutionContext, implicit val mat: Materializer ) extends MispConverter { @@ -48,7 +47,7 @@ class MispSrv @Inject()( Future.successful(instanceConfig) } - def getEvent(mispConnection: MispConnection, eventId: String): Future[MispAlert] = { + def getEvent(mispConnection: MispConnection, eventId: String)(implicit ec: ExecutionContext): Future[MispAlert] = { logger.debug(s"Get MISP event $eventId") require(!eventId.isEmpty) mispConnection(s"events/$eventId") @@ -96,7 +95,7 @@ class MispSrv @Inject()( } } - def getAttributesFromCase(caze: Case): Future[Seq[ExportedMispAttribute]] = { + def getAttributesFromCase(caze: Case)(implicit ec: ExecutionContext): Future[Seq[ExportedMispAttribute]] = { import org.elastic4play.services.QueryDSL._ artifactSrv .find(and(withParent(caze), "status" ~= "Ok", "ioc" ~= true), Some("all"), Nil) @@ -115,7 +114,9 @@ class MispSrv @Inject()( .runWith(Sink.seq) } - def getAttributesFromMisp(mispConnection: MispConnection, eventId: String, fromDate: Option[Date]): Future[Seq[MispArtifact]] = { + def getAttributesFromMisp(mispConnection: MispConnection, eventId: String, fromDate: Option[Date])( + implicit ec: ExecutionContext + ): Future[Seq[MispArtifact]] = { val date = fromDate.fold(0L)(_.getTime / 1000) @@ -151,7 +152,8 @@ class MispSrv @Inject()( } def attributeToArtifact(mispConnection: MispConnection, attr: JsObject, defaultTlp: Long)( - implicit authContext: AuthContext + implicit authContext: AuthContext, + ec: ExecutionContext ): Option[Future[Fields]] = (for { dataType ← (attr \ "dataType").validate[String] @@ -203,7 +205,7 @@ class MispSrv @Inject()( None } - def createCase(alert: Alert, customCaseTemplate: Option[String])(implicit authContext: AuthContext): Future[Case] = + def createCase(alert: Alert, customCaseTemplate: Option[String])(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = alert.caze() match { case Some(id) ⇒ caseSrv.get(id) case None ⇒ @@ -214,21 +216,21 @@ class MispSrv @Inject()( } yield caze } - def importArtifacts(alert: Alert, caze: Case)(implicit authContext: AuthContext): Future[Case] = + def importArtifacts(alert: Alert, caze: Case)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = for { instanceConfig ← getInstanceConfig(alert.source()) artifacts ← Future.sequence(alert.artifacts().flatMap(attributeToArtifact(instanceConfig, _, alert.tlp()))) _ ← artifactSrv.create(caze, artifacts) } yield caze - def mergeWithCase(alert: Alert, caze: Case)(implicit authContext: AuthContext): Future[Case] = + def mergeWithCase(alert: Alert, caze: Case)(implicit authContext: AuthContext, ec: ExecutionContext): Future[Case] = for { _ ← importArtifacts(alert, caze) description = caze.description() + s"\n \n#### Merged with MISP event ${alert.title()}\n\n${alert.description().trim}" updatedCase ← caseSrv.update(caze, Fields.empty.set("description", description)) } yield updatedCase - def updateMispAlertArtifact()(implicit authContext: AuthContext): Future[Unit] = { + def updateMispAlertArtifact()(implicit authContext: AuthContext, ec: ExecutionContext): Future[Unit] = { import org.elastic4play.services.QueryDSL._ logger.info("Update MISP attributes in alerts") val (alerts, _) = alertSrv.find("type" ~= "misp", Some("all"), Nil) @@ -311,7 +313,10 @@ class MispSrv @Inject()( private[MispSrv] val fileNameExtractor = """attachment; filename="(.*)"""".r - def downloadAttachment(mispConnection: MispConnection, attachmentId: String)(implicit authContext: AuthContext): Future[FileInputValue] = + def downloadAttachment( + mispConnection: MispConnection, + attachmentId: String + )(implicit authContext: AuthContext, ec: ExecutionContext): Future[FileInputValue] = mispConnection(s"attributes/download/$attachmentId") .withMethod("GET") .stream() @@ -326,8 +331,7 @@ class MispSrv @Inject()( .bodyAsSource .runWith(FileIO.toPath(tempFile)) .map { ioResult ⇒ - if (!ioResult.wasSuccessful) // throw an exception if transfer failed - throw ioResult.getError + ioResult.status.failed.foreach(throw _) // throw an exception if transfer failed val contentType = response.headers.getOrElse("Content-Type", Seq("application/octet-stream")).head val filename = response .headers