Skip to content

Commit

Permalink
#2182 Reload last case number from database periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Aug 26, 2021
1 parent aa6d529 commit 35f8db6
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 27 deletions.
2 changes: 1 addition & 1 deletion ScalliGraph
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.thp.thehive.cloner

import akka.actor.ActorSystem
import akka.actor.typed.ActorRef
import akka.stream.Materializer
import com.google.inject.{Guice, Injector => GInjector}
import net.codingwell.scalaguice.{ScalaModule, ScalaMultibinder}
Expand Down Expand Up @@ -40,7 +41,7 @@ trait IntegrityCheckApp {
bindActor[DummyActor]("config-actor")
bindActor[DummyActor]("cortex-actor")
bindActor[DummyActor]("integrity-check-actor")
bindTypedActor(CaseNumberActor.behavior, "case-number-actor")
bind[ActorRef[CaseNumberActor.Request]].toProvider[CaseNumberActorProvider]

val integrityCheckOpsBindings = ScalaMultibinder.newSetBinder[GenIntegrityCheckOps](binder)
integrityCheckOpsBindings.addBinding.to[ProfileIntegrityCheckOps]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.thp.thehive.migration.th4

import akka.actor.ActorSystem
import akka.actor.typed.Scheduler
import akka.actor.typed.{ActorRef, Scheduler}
import akka.stream.Materializer
import com.google.inject.{Guice, Injector => GInjector}
import net.codingwell.scalaguice.{ScalaModule, ScalaMultibinder}
Expand Down Expand Up @@ -52,7 +52,7 @@ object Output {
bindActor[DummyActor]("config-actor")
bindActor[DummyActor]("cortex-actor")
bindActor[DummyActor]("integrity-check-actor")
bindTypedActor(CaseNumberActor.behavior, "case-number-actor")
bind[ActorRef[CaseNumberActor.Request]].toProvider[CaseNumberActorProvider]

val schemaBindings = ScalaMultibinder.newSetBinder[UpdatableSchema](binder)
schemaBindings.addBinding.to[TheHiveSchemaDefinition]
Expand Down
49 changes: 27 additions & 22 deletions thehive/app/org/thp/thehive/services/CaseNumber.scala
Original file line number Diff line number Diff line change
@@ -1,59 +1,61 @@
package org.thp.thehive.services

import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.actor.typed.{ActorRefResolver, Behavior, ActorRef => TypedActorRef}
import akka.actor.{ActorSystem, ExtendedActorSystem}
import akka.cluster.typed.{ClusterSingleton, SingletonActor}
import akka.serialization.Serializer
import org.thp.scalligraph.models.Database
import org.thp.scalligraph.services.config.ApplicationConfig
import org.thp.scalligraph.services.config.ApplicationConfig.finiteDurationFormat
import org.thp.scalligraph.traversal.TraversalOps._
import org.thp.thehive.GuiceAkkaExtension
import org.thp.thehive.services.CaseOps._

import java.io.NotSerializableException
import java.nio.ByteBuffer
import javax.inject.{Inject, Provider, Singleton}
import scala.concurrent.duration.FiniteDuration

object CaseNumberActor {
sealed trait Message
sealed trait Request extends Message
sealed trait Response extends Message
case class GetNextNumber(replyTo: TypedActorRef[Response]) extends Request
case class NextNumber(number: Int) extends Response
case object ReloadFromDatabase extends Request

val behavior: Behavior[Request] = Behaviors.setup[Request](context => waitFirstRequest(context))

def getNextCaseNumber(context: ActorContext[Request]): Int = {
val injector = GuiceAkkaExtension(context.system).injector
val db = injector.getInstance(classOf[Database])
val caseSrv = injector.getInstance(classOf[CaseSrv])
db.roTransaction { implicit graph =>
caseSrv.startTraversal.getLast.headOption.fold(0)(_.number) + 1
def behavior(db: Database, appConfig: ApplicationConfig, caseSrvProvider: Provider[CaseSrv]): Behavior[Request] = {
lazy val caseSrv: CaseSrv = caseSrvProvider.get
val caseNumberReloadIntervalConfig =
appConfig.item[FiniteDuration]("caseNumber.reload.interval", "Reload last case number from database interval")
Behaviors.withTimers { timers =>
val reloadTimer: () => Unit = () => timers.startSingleTimer(ReloadFromDatabase, caseNumberReloadIntervalConfig.get)
val getNextNumber: () => Int = () =>
db.roTransaction { implicit graph =>
caseSrv.startTraversal.getLast.headOption.fold(0)(_.number) + 1
}
caseNumberProvider(getNextNumber, reloadTimer, getNextNumber())
}
}

def waitFirstRequest(context: ActorContext[Request]): Behaviors.Receive[Request] =
Behaviors.receiveMessage {
case GetNextNumber(replyTo) =>
val nextNumber = getNextCaseNumber(context)
replyTo ! NextNumber(nextNumber)
caseNumberProvider(nextNumber + 1)
}

def caseNumberProvider(nextNumber: Int): Behavior[Request] =
def caseNumberProvider(getNextNumber: () => Int, reloadTimer: () => Unit, nextNumber: Int): Behavior[Request] =
Behaviors.receiveMessage {
case GetNextNumber(replyTo) =>
replyTo ! NextNumber(nextNumber)
caseNumberProvider(nextNumber + 1)
caseNumberProvider(getNextNumber, reloadTimer, nextNumber + 1)
case ReloadFromDatabase =>
reloadTimer()
caseNumberProvider(getNextNumber, reloadTimer, Math.max(getNextNumber(), nextNumber))
}
}

@Singleton
class CaseNumberActorProvider @Inject() (system: ActorSystem) extends Provider[TypedActorRef[CaseNumberActor.Request]] {
class CaseNumberActorProvider @Inject() (system: ActorSystem, db: Database, appConfig: ApplicationConfig, caseSrvProvider: Provider[CaseSrv])
extends Provider[TypedActorRef[CaseNumberActor.Request]] {
override lazy val get: TypedActorRef[CaseNumberActor.Request] =
ClusterSingleton(system.toTyped)
.init(SingletonActor(CaseNumberActor.behavior, "CaseNumberLeader"))
.init(SingletonActor(CaseNumberActor.behavior(db, appConfig, caseSrvProvider), "CaseNumberLeader"))
}

class CaseNumberSerializer(system: ExtendedActorSystem) extends Serializer {
Expand All @@ -67,6 +69,7 @@ class CaseNumberSerializer(system: ExtendedActorSystem) extends Serializer {
o match {
case GetNextNumber(replyTo) => 0.toByte +: actorRefResolver.toSerializationFormat(replyTo).getBytes
case NextNumber(number) => ByteBuffer.allocate(5).put(1.toByte).putInt(number).array()
case ReloadFromDatabase => Array(2)
case _ => throw new NotSerializableException
}

Expand All @@ -76,5 +79,7 @@ class CaseNumberSerializer(system: ExtendedActorSystem) extends Serializer {
bytes(0) match {
case 0 => GetNextNumber(actorRefResolver.resolveActorRef(new String(bytes.tail)))
case 1 => NextNumber(ByteBuffer.wrap(bytes).getInt(1))
case 2 => ReloadFromDatabase
case _ => throw new NotSerializableException
}
}
2 changes: 2 additions & 0 deletions thehive/conf/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ stream {
}
}

caseNumber.reload.interval: 10 minutes

tags.freeTagColour: "#000000"

user {
Expand Down
4 changes: 3 additions & 1 deletion thehive/test/org/thp/thehive/TestAppBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,7 @@ class BasicDatabaseProvider @Inject() (database: Database) extends Provider[Data

class TestNumberActorProvider @Inject() (actorSystem: ActorSystem) extends Provider[TypedActorRef[CaseNumberActor.Request]] {
override def get: TypedActorRef[CaseNumberActor.Request] =
actorSystem.toTyped.systemActorOf(CaseNumberActor.caseNumberProvider(36), "case-number")
actorSystem
.toTyped
.systemActorOf(CaseNumberActor.caseNumberProvider(getNextNumber = () => 36, reloadTimer = () => (), nextNumber = 36), "case-number")
}

0 comments on commit 35f8db6

Please sign in to comment.