From ecdb9e09b3863c88391d17fb33f341dccbcd9ffd Mon Sep 17 00:00:00 2001 From: To-om Date: Fri, 11 Jun 2021 10:20:51 +0200 Subject: [PATCH] #1970 #2061 Use an akka actor to generate case number --- .../app/org/thp/thehive/TheHiveModule.scala | 2 + .../org/thp/thehive/services/CaseNumber.scala | 76 +++++++++++++++++++ .../org/thp/thehive/services/CaseSrv.scala | 20 ++++- thehive/conf/play/reference-overrides.conf | 2 + 4 files changed, 98 insertions(+), 2 deletions(-) create mode 100644 thehive/app/org/thp/thehive/services/CaseNumber.scala diff --git a/thehive/app/org/thp/thehive/TheHiveModule.scala b/thehive/app/org/thp/thehive/TheHiveModule.scala index e9922041eb..6667d840b2 100644 --- a/thehive/app/org/thp/thehive/TheHiveModule.scala +++ b/thehive/app/org/thp/thehive/TheHiveModule.scala @@ -1,6 +1,7 @@ package org.thp.thehive import akka.actor.ActorRef +import akka.actor.typed.{ActorRef => TypedActorRef} import com.google.inject.AbstractModule import net.codingwell.scalaguice.{ScalaModule, ScalaMultibinder} import org.thp.scalligraph.SingleInstance @@ -106,6 +107,7 @@ class TheHiveModule(environment: Environment, configuration: Configuration) exte integrityCheckOpsBindings.addBinding.to[ObservableIntegrityCheckOps] integrityCheckOpsBindings.addBinding.to[LogIntegrityCheckOps] bind[ActorRef].annotatedWithName("integrity-check-actor").toProvider[IntegrityCheckActorProvider] + bind[TypedActorRef[CaseNumberActor.Request]].annotatedWithName("case-number-actor").toProvider[CaseNumberActorProvider] bind[ActorRef].annotatedWithName("flow-actor").toProvider[FlowActorProvider] diff --git a/thehive/app/org/thp/thehive/services/CaseNumber.scala b/thehive/app/org/thp/thehive/services/CaseNumber.scala new file mode 100644 index 0000000000..7366151eae --- /dev/null +++ b/thehive/app/org/thp/thehive/services/CaseNumber.scala @@ -0,0 +1,76 @@ +package org.thp.thehive.services + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps +import akka.actor.typed.{ActorRefResolver, Behavior, ActorRef => TypedActorRef} +import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.cluster.typed.{ClusterSingleton, SingletonActor} +import akka.serialization.Serializer +import org.thp.scalligraph.models.Database +import org.thp.scalligraph.traversal.TraversalOps._ +import org.thp.thehive.GuiceAkkaExtension +import org.thp.thehive.services.CaseOps._ + +import java.io.NotSerializableException +import javax.inject.{Inject, Provider, Singleton} + +object CaseNumberActor { + sealed trait Message + sealed trait Request extends Message + sealed trait Response extends Message + case class GetNextNumber(replyTo: TypedActorRef[Response]) extends Request + case class NextNumber(number: Int) extends Response + + val behavior: Behavior[Request] = Behaviors.setup[Request] { context => + val injector = GuiceAkkaExtension(context.system).injector + val db = injector.getInstance(classOf[Database]) + val caseSrv = injector.getInstance(classOf[CaseSrv]) + db.roTransaction { implicit graph => + caseNumberProvider(caseSrv.startTraversal.getLast.headOption.fold(0)(_.number) + 1) + } + } + + def caseNumberProvider(nextNumber: Int): Behavior[Request] = + Behaviors.receiveMessage { + case GetNextNumber(replyTo) => + replyTo ! NextNumber(nextNumber) + caseNumberProvider(nextNumber + 1) + } +} + +@Singleton +class CaseNumberActorProvider @Inject() (system: ActorSystem) extends Provider[TypedActorRef[CaseNumberActor.Request]] { + override lazy val get: TypedActorRef[CaseNumberActor.Request] = + ClusterSingleton(system.toTyped) + .init(SingletonActor(CaseNumberActor.behavior, "CaseNumberLeader")) +} + +class CaseNumberSerializer(system: ExtendedActorSystem) extends Serializer { + import CaseNumberActor._ + + private val actorRefResolver = ActorRefResolver(system.toTyped) + + override def identifier: Int = 9739323 + + override def toBinary(o: AnyRef): Array[Byte] = + o match { + case GetNextNumber(replyTo) => 0.toByte +: actorRefResolver.toSerializationFormat(replyTo).getBytes + case NextNumber(number) => + Array(1.toByte, ((number >> 24) % 0xff).toByte, ((number >> 16) % 0xff).toByte, ((number >> 8) % 0xff).toByte, (number % 0xff).toByte) + case _ => throw new NotSerializableException + } + + override def includeManifest: Boolean = false + + override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = + bytes(0) match { + case 0 => GetNextNumber(actorRefResolver.resolveActorRef(new String(bytes.tail))) + case 1 => + NextNumber( + (bytes(2) << 24) + + (bytes(3) << 16) + + (bytes(4) << 8) + + bytes(5) + ) + } +} diff --git a/thehive/app/org/thp/thehive/services/CaseSrv.scala b/thehive/app/org/thp/thehive/services/CaseSrv.scala index 17ecd57d73..a57cef3a73 100644 --- a/thehive/app/org/thp/thehive/services/CaseSrv.scala +++ b/thehive/app/org/thp/thehive/services/CaseSrv.scala @@ -1,6 +1,9 @@ package org.thp.thehive.services import akka.actor.ActorRef +import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.{Scheduler, ActorRef => TypedActorRef} +import akka.util.Timeout import org.apache.tinkerpop.gremlin.process.traversal.{Order, P} import org.apache.tinkerpop.gremlin.structure.Vertex import org.thp.scalligraph.auth.{AuthContext, Permission} @@ -30,6 +33,8 @@ import play.api.libs.json.{JsNull, JsObject, JsValue, Json} import java.lang.{Long => JLong} import java.util.{Date, List => JList, Map => JMap} import javax.inject.{Inject, Named, Provider, Singleton} +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.{Failure, Success, Try} @Singleton @@ -48,7 +53,10 @@ class CaseSrv @Inject() ( userSrv: UserSrv, alertSrvProvider: Provider[AlertSrv], @Named("integrity-check-actor") integrityCheckActor: ActorRef, - cache: SyncCacheApi + @Named("case-number-actor") caseNumberActor: TypedActorRef[CaseNumberActor.Request], + cache: SyncCacheApi, + implicit val ec: ExecutionContext, + implicit val scheduler: Scheduler ) extends VertexSrv[Case] { lazy val alertSrv: AlertSrv = alertSrvProvider.get @@ -124,7 +132,15 @@ class CaseSrv @Inject() ( .map { case (InputCustomFieldValue(name, value, _), i) => InputCustomFieldValue(name, value, Some(i)) } } - def nextCaseNumber(implicit graph: Graph): Int = startTraversal.getLast.headOption.fold(0)(_.number) + 1 + def nextCaseNumberAsync: Future[Int] = { + implicit val timeout: Timeout = Timeout(1.minute) + caseNumberActor.ask[CaseNumberActor.Response](replyTo => CaseNumberActor.GetNextNumber(replyTo)).map { + case CaseNumberActor.NextNumber(caseNumber) => caseNumber + } + } + + def nextCaseNumber: Int = + Await.result(nextCaseNumberAsync, 1.minute) override def exists(e: Case)(implicit graph: Graph): Boolean = startTraversal.getByNumber(e.number).exists diff --git a/thehive/conf/play/reference-overrides.conf b/thehive/conf/play/reference-overrides.conf index 1c0cb430ae..048f95b8d0 100644 --- a/thehive/conf/play/reference-overrides.conf +++ b/thehive/conf/play/reference-overrides.conf @@ -29,6 +29,7 @@ akka.actor { //thehive-schema-updater = "org.thp.thehive.models.SchemaUpdaterSerializer" flow = "org.thp.thehive.services.FlowSerializer" integrity = "org.thp.thehive.services.IntegrityCheckSerializer" + caseNumber = "org.thp.thehive.services.CaseNumberSerializer" } serialization-bindings { @@ -37,5 +38,6 @@ akka.actor { //"org.thp.thehive.models.SchemaUpdaterMessage" = thehive-schema-updater "org.thp.thehive.services.FlowMessage" = flow "org.thp.thehive.services.IntegrityCheckMessage" = integrity + "org.thp.thehive.services.CaseNumberActor$Message" = caseNumber } }