From cfb2f31e32f68e26637fffb29021878dd1e4d2ea Mon Sep 17 00:00:00 2001 From: To-om Date: Fri, 18 Dec 2020 09:16:35 +0100 Subject: [PATCH] #1708 Improve cluster diagnostic logs --- .../app/org/thp/thehive/ClusterSetup.scala | 27 +++++++++++++++-- .../thehive/controllers/v1/StatusCtrl.scala | 30 +++++++++++++++++-- .../org/thp/thehive/services/FlowActor.scala | 1 + 3 files changed, 53 insertions(+), 5 deletions(-) diff --git a/thehive/app/org/thp/thehive/ClusterSetup.scala b/thehive/app/org/thp/thehive/ClusterSetup.scala index 308e0f6d8b..7a60a7f625 100644 --- a/thehive/app/org/thp/thehive/ClusterSetup.scala +++ b/thehive/app/org/thp/thehive/ClusterSetup.scala @@ -1,17 +1,20 @@ package org.thp.thehive -import akka.actor.ActorSystem +import akka.actor.{Actor, ActorSystem, Props} import akka.cluster.Cluster +import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberEvent, _} import com.google.inject.Injector -import javax.inject.{Inject, Singleton} import play.api.{Configuration, Logger} +import javax.inject.{Inject, Singleton} + @Singleton class ClusterSetup @Inject() ( configuration: Configuration, system: ActorSystem, injector: Injector ) { + system.actorOf(Props[ClusterListener]) if (configuration.get[Seq[String]]("akka.cluster.seed-nodes").isEmpty) { val logger: Logger = Logger(getClass) logger.info("Initialising cluster") @@ -19,4 +22,24 @@ class ClusterSetup @Inject() ( cluster.join(cluster.system.provider.getDefaultAddress) } GuiceAkkaExtension(system).set(injector) + +} + +class ClusterListener extends Actor { + val cluster: Cluster = Cluster(context.system) + val logger: Logger = Logger(getClass) + + override def preStart(): Unit = cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive: Receive = { + case MemberUp(member) => logger.info(s"Member is Up: ${member.address}") + case UnreachableMember(member) => logger.info(s"Member detected as unreachable: $member") + case MemberRemoved(member, previousStatus) => logger.info(s"Member is Removed: ${member.address} after $previousStatus") + case MemberJoined(member) => logger.debug(s"Member is joined: $member") + case MemberWeaklyUp(member) => logger.debug(s"Member is weaklyUp: $member") + case MemberLeft(member) => logger.debug(s"Member is left: $member") + case MemberExited(member) => logger.debug(s"Member is exited: $member") + case MemberDowned(member) => logger.debug(s"Member is downed: $member") + } } diff --git a/thehive/app/org/thp/thehive/controllers/v1/StatusCtrl.scala b/thehive/app/org/thp/thehive/controllers/v1/StatusCtrl.scala index 3930124ecd..e1602e9a5e 100644 --- a/thehive/app/org/thp/thehive/controllers/v1/StatusCtrl.scala +++ b/thehive/app/org/thp/thehive/controllers/v1/StatusCtrl.scala @@ -1,23 +1,46 @@ package org.thp.thehive.controllers.v1 +import akka.actor.ActorSystem +import akka.cluster.{Cluster, Member} +import akka.cluster.ClusterEvent.CurrentClusterState + import javax.inject.{Inject, Singleton} import org.thp.scalligraph.ScalligraphApplicationLoader import org.thp.scalligraph.auth.{AuthCapability, AuthSrv, MultiAuthSrv} import org.thp.scalligraph.controllers.Entrypoint import org.thp.scalligraph.services.config.{ApplicationConfig, ConfigItem} import org.thp.thehive.TheHiveModule -import play.api.libs.json.{JsObject, JsString, Json} +import play.api.libs.json.{JsObject, JsString, Json, Writes} import play.api.mvc.{AbstractController, Action, AnyContent, Results} import scala.util.Success @Singleton -class StatusCtrl @Inject() (entrypoint: Entrypoint, appConfig: ApplicationConfig, authSrv: AuthSrv) { +class StatusCtrl @Inject() (entrypoint: Entrypoint, appConfig: ApplicationConfig, authSrv: AuthSrv, system: ActorSystem) { private def getVersion(c: Class[_]): String = Option(c.getPackage.getImplementationVersion).getOrElse("SNAPSHOT") val passwordConfig: ConfigItem[String, String] = appConfig.item[String]("datastore.attachment.password", "Password used to protect attachment ZIP") def password: String = passwordConfig.get + val cluster: Cluster = Cluster(system) + + implicit val memberWrites: Writes[Member] = Writes[Member] { member => + Json.obj( + "address" -> member.uniqueAddress.address.toString, + "status" -> member.status.toString, + "roles" -> member.roles + ) + } + implicit val clusterStateWrites: Writes[CurrentClusterState] = Writes[CurrentClusterState] { state => + Json.obj( + "members" -> state.members, + "unreachable" -> state.unreachable, + "seenBy" -> state.seenBy.map(_.toString), + "leader" -> state.leader.map(_.toString), + "unreachableDataCenters" -> state.unreachableDataCenters + //"roleLeaderMap" -> state.roleLeaderMap, + ) + } def get: Action[AnyContent] = entrypoint("status") { _ => @@ -38,7 +61,8 @@ class StatusCtrl @Inject() (entrypoint: Entrypoint, appConfig: ApplicationConfig }), "capabilities" -> authSrv.capabilities.map(c => JsString(c.toString)), "ssoAutoLogin" -> authSrv.capabilities.contains(AuthCapability.sso) - ) + ), + "cluster" -> cluster.state ) ) ) diff --git a/thehive/app/org/thp/thehive/services/FlowActor.scala b/thehive/app/org/thp/thehive/services/FlowActor.scala index 8d023071cd..b92fa907ab 100644 --- a/thehive/app/org/thp/thehive/services/FlowActor.scala +++ b/thehive/app/org/thp/thehive/services/FlowActor.scala @@ -41,6 +41,7 @@ class FlowActor extends Actor { lazy val eventSrv: EventSrv = injector.getInstance(classOf[EventSrv]) override def preStart(): Unit = eventSrv.subscribe(StreamTopic(), self) + override def postStop(): Unit = eventSrv.unsubscribe(StreamTopic(), self) override def receive: Receive = { case flowId @ FlowId(organisation, caseId) => val auditIds = cache.getOrElseUpdate(flowId.toString) {