From 0e330c7b91d835c19ae6bab711c07b9dd9988632 Mon Sep 17 00:00:00 2001 From: To-om Date: Fri, 1 Jul 2022 13:53:05 +0200 Subject: [PATCH] #2400 Use dedicated stream topic for stream dispatcher subscription --- thehive/app/org/thp/thehive/services/AuditSrv.scala | 2 +- thehive/app/org/thp/thehive/services/FlowActor.scala | 4 ++-- thehive/app/org/thp/thehive/services/StreamSrv.scala | 7 ++++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/thehive/app/org/thp/thehive/services/AuditSrv.scala b/thehive/app/org/thp/thehive/services/AuditSrv.scala index bc86e266e8..9a37e28765 100644 --- a/thehive/app/org/thp/thehive/services/AuditSrv.scala +++ b/thehive/app/org/thp/thehive/services/AuditSrv.scala @@ -105,7 +105,7 @@ class AuditSrv @Inject() ( case Status.COMMIT => logger.debug("Sending audit to stream bus and to notification actor") val auditIds = ids.map(_._2) - eventSrv.publish(StreamTopic())(AuditStreamMessage(auditIds: _*)) + eventSrv.publish(StreamTopic.dispatcher)(AuditStreamMessage(auditIds: _*)) notificationActor ! AuditNotificationMessage(auditIds: _*) case _ => } diff --git a/thehive/app/org/thp/thehive/services/FlowActor.scala b/thehive/app/org/thp/thehive/services/FlowActor.scala index 9fbd9883dc..b98dbe0123 100644 --- a/thehive/app/org/thp/thehive/services/FlowActor.scala +++ b/thehive/app/org/thp/thehive/services/FlowActor.scala @@ -50,8 +50,8 @@ class FlowActor extends Actor { def fromDate: Date = new Date(System.currentTimeMillis() - maxAgeConfig.get.toMillis) lazy val eventSrv: EventSrv = injector.getInstance(classOf[EventSrv]) - override def preStart(): Unit = eventSrv.subscribe(StreamTopic(), self) - override def postStop(): Unit = eventSrv.unsubscribe(StreamTopic(), self) + override def preStart(): Unit = eventSrv.subscribe(StreamTopic.dispatcher, self) + override def postStop(): Unit = eventSrv.unsubscribe(StreamTopic.dispatcher, self) def flowQuery( caseId: Option[EntityIdOrName] diff --git a/thehive/app/org/thp/thehive/services/StreamSrv.scala b/thehive/app/org/thp/thehive/services/StreamSrv.scala index 88a73c5d51..d4edb3d84d 100644 --- a/thehive/app/org/thp/thehive/services/StreamSrv.scala +++ b/thehive/app/org/thp/thehive/services/StreamSrv.scala @@ -1,6 +1,6 @@ package org.thp.thehive.services -import akka.actor.{actorRef2Scala, Actor, ActorIdentity, ActorRef, ActorSystem, Cancellable, Identify, PoisonPill, Props} +import akka.actor.{Actor, ActorIdentity, ActorRef, ActorSystem, Cancellable, Identify, PoisonPill, Props} import akka.pattern.{ask, AskTimeoutException} import akka.serialization.Serializer import akka.util.Timeout @@ -26,7 +26,8 @@ import scala.util.{Random, Try} sealed trait StreamMessage extends Serializable object StreamTopic { - def apply(streamId: String = ""): String = if (streamId.isEmpty) "stream" else s"stream-$streamId" + def apply(streamId: String): String = s"stream-$streamId" + val dispatcher: String = "stream" } case class AuditStreamMessage(id: EntityId*) extends StreamMessage @@ -192,7 +193,7 @@ class StreamSrv @Inject() ( ) logger.debug(s"Register stream actor ${streamActor.path}") eventSrv.subscribe(StreamTopic(streamId), streamActor) - eventSrv.subscribe(StreamTopic(), streamActor) + eventSrv.subscribe(StreamTopic.dispatcher, streamActor) streamId }