diff --git a/thehive/app/org/thp/thehive/services/StreamSrv.scala b/thehive/app/org/thp/thehive/services/StreamSrv.scala index e3a89c3e22..230041f4c2 100644 --- a/thehive/app/org/thp/thehive/services/StreamSrv.scala +++ b/thehive/app/org/thp/thehive/services/StreamSrv.scala @@ -1,23 +1,23 @@ package org.thp.thehive.services -import java.io.NotSerializableException - import akka.actor.{actorRef2Scala, Actor, ActorIdentity, ActorRef, ActorSystem, Cancellable, Identify, PoisonPill, Props} import akka.pattern.{ask, AskTimeoutException} import akka.serialization.Serializer import akka.util.Timeout -import javax.inject.{Inject, Named, Singleton} import org.thp.scalligraph.auth.AuthContext import org.thp.scalligraph.models.Database import org.thp.scalligraph.services.EventSrv import org.thp.scalligraph.services.config.ApplicationConfig.finiteDurationFormat import org.thp.scalligraph.services.config.{ApplicationConfig, ConfigItem} import org.thp.scalligraph.traversal.TraversalOps._ +import org.thp.scalligraph.utils.Retry import org.thp.scalligraph.{EntityId, NotFoundError} import org.thp.thehive.services.AuditOps._ import play.api.Logger import play.api.libs.json.Json +import java.io.NotSerializableException +import javax.inject.{Inject, Named, Singleton} import scala.collection.immutable import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} @@ -161,6 +161,22 @@ class StreamSrv @Inject() ( appConfig.item[FiniteDuration]("stream.longPolling.keepAlive", "Remove the stream after this time of inactivity") val keepAlive: FiniteDuration = keepAliveConfig.get + val maxAttemptsConfig: ConfigItem[Int, Int] = + appConfig.item[Int]("stream.get.maxAttempts", "How many attempts to get stream") + def maxAttempts: Int = maxAttemptsConfig.get + + val minBackoffConfig: ConfigItem[FiniteDuration, FiniteDuration] = + appConfig.item[FiniteDuration]("stream.get.minBackoff", "Minimum backoff for get stream attempts") + def minBackoff: FiniteDuration = minBackoffConfig.get + + val maxBackoffConfig: ConfigItem[FiniteDuration, FiniteDuration] = + appConfig.item[FiniteDuration]("stream.get.maxBackoff", "Maximum backoff for get stream attempts") + def maxBackoff: FiniteDuration = maxBackoffConfig.get + + val randomFactorConfig: ConfigItem[Double, Double] = + appConfig.item[Double]("stream.get.maxBackoff", "Random factor for stream attempts backoff") + def randomFactor: Double = randomFactorConfig.get + def generateStreamId(): String = Seq.fill(streamLength)(alphanumeric(Random.nextInt(alphanumeric.size))).mkString def isValidStreamId(streamId: String): Boolean = streamId.length == streamLength && streamId.forall(alphanumeric.contains) @@ -180,23 +196,25 @@ class StreamSrv @Inject() ( def get(streamId: String): Future[Seq[EntityId]] = { implicit val timeout: Timeout = Timeout(refresh + 1.second) - // Check if stream actor exists - eventSrv - .publishAsk(StreamTopic(streamId))(Identify(1))(Timeout(2.seconds)) - // .ask(s"/user/stream-$streamId", Identify(1))(Timeout(2.seconds)) - .flatMap { - case ActorIdentity(1, Some(streamActor)) => - logger.debug(s"Stream actor found for stream $streamId") - (streamActor ? GetStreamMessages) - .map { - case AuditStreamMessage(ids @ _*) => ids - case _ => Nil - } - case other => Future.failed(NotFoundError(s"Stream $streamId doesn't exist: $other")) - } - .recoverWith { - case _: AskTimeoutException => Future.failed(NotFoundError(s"Stream $streamId doesn't exist")) - } + Retry(maxAttempts).withBackoff(minBackoff, maxBackoff, randomFactor)(system) { + // Check if stream actor exists + eventSrv + .publishAsk(StreamTopic(streamId))(Identify(1))(Timeout(2.seconds)) + // .ask(s"/user/stream-$streamId", Identify(1))(Timeout(2.seconds)) + .flatMap { + case ActorIdentity(1, Some(streamActor)) => + logger.debug(s"Stream actor found for stream $streamId") + (streamActor ? GetStreamMessages) + .map { + case AuditStreamMessage(ids @ _*) => ids + case _ => Nil + } + case other => Future.failed(NotFoundError(s"Stream $streamId doesn't exist: $other")) + } + .recoverWith { + case _: AskTimeoutException => Future.failed(NotFoundError(s"Stream $streamId doesn't exist")) + } + } } } diff --git a/thehive/conf/reference.conf b/thehive/conf/reference.conf index 523fb7d7d1..cfab71532e 100644 --- a/thehive/conf/reference.conf +++ b/thehive/conf/reference.conf @@ -36,12 +36,21 @@ datastore.attachment.password: "malware" describe.cache.expire: 10 minutes -stream.longPolling { - refresh: 1 minute - maxWait: 1 second - graceDuration: 100 milliseconds - keepAlive: 5 minutes - pollingDuration: 1 second +stream { + longPolling { + refresh: 1 minute + maxWait: 1 second + graceDuration: 100 milliseconds + keepAlive: 5 minutes + pollingDuration: 1 second + + } + get { + maxAttempts = 5 + minBackoff = 10 milliseconds + maxBackoff = 50 milliseconds + randomFactor = 0.2 + } } tags {