Skip to content

Commit

Permalink
#1708 Improve cluster diagnostic logs
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Dec 18, 2020
1 parent b292389 commit cfb2f31
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 5 deletions.
27 changes: 25 additions & 2 deletions thehive/app/org/thp/thehive/ClusterSetup.scala
Original file line number Diff line number Diff line change
@@ -1,22 +1,45 @@
package org.thp.thehive

import akka.actor.ActorSystem
import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberEvent, _}
import com.google.inject.Injector
import javax.inject.{Inject, Singleton}
import play.api.{Configuration, Logger}

import javax.inject.{Inject, Singleton}

@Singleton
class ClusterSetup @Inject() (
configuration: Configuration,
system: ActorSystem,
injector: Injector
) {
system.actorOf(Props[ClusterListener])
if (configuration.get[Seq[String]]("akka.cluster.seed-nodes").isEmpty) {
val logger: Logger = Logger(getClass)
logger.info("Initialising cluster")
val cluster = Cluster(system)
cluster.join(cluster.system.provider.getDefaultAddress)
}
GuiceAkkaExtension(system).set(injector)

}

class ClusterListener extends Actor {
val cluster: Cluster = Cluster(context.system)
val logger: Logger = Logger(getClass)

override def preStart(): Unit = cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
override def postStop(): Unit = cluster.unsubscribe(self)

def receive: Receive = {
case MemberUp(member) => logger.info(s"Member is Up: ${member.address}")
case UnreachableMember(member) => logger.info(s"Member detected as unreachable: $member")
case MemberRemoved(member, previousStatus) => logger.info(s"Member is Removed: ${member.address} after $previousStatus")
case MemberJoined(member) => logger.debug(s"Member is joined: $member")
case MemberWeaklyUp(member) => logger.debug(s"Member is weaklyUp: $member")
case MemberLeft(member) => logger.debug(s"Member is left: $member")
case MemberExited(member) => logger.debug(s"Member is exited: $member")
case MemberDowned(member) => logger.debug(s"Member is downed: $member")
}
}
30 changes: 27 additions & 3 deletions thehive/app/org/thp/thehive/controllers/v1/StatusCtrl.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,46 @@
package org.thp.thehive.controllers.v1

import akka.actor.ActorSystem
import akka.cluster.{Cluster, Member}
import akka.cluster.ClusterEvent.CurrentClusterState

import javax.inject.{Inject, Singleton}
import org.thp.scalligraph.ScalligraphApplicationLoader
import org.thp.scalligraph.auth.{AuthCapability, AuthSrv, MultiAuthSrv}
import org.thp.scalligraph.controllers.Entrypoint
import org.thp.scalligraph.services.config.{ApplicationConfig, ConfigItem}
import org.thp.thehive.TheHiveModule
import play.api.libs.json.{JsObject, JsString, Json}
import play.api.libs.json.{JsObject, JsString, Json, Writes}
import play.api.mvc.{AbstractController, Action, AnyContent, Results}

import scala.util.Success

@Singleton
class StatusCtrl @Inject() (entrypoint: Entrypoint, appConfig: ApplicationConfig, authSrv: AuthSrv) {
class StatusCtrl @Inject() (entrypoint: Entrypoint, appConfig: ApplicationConfig, authSrv: AuthSrv, system: ActorSystem) {

private def getVersion(c: Class[_]): String = Option(c.getPackage.getImplementationVersion).getOrElse("SNAPSHOT")

val passwordConfig: ConfigItem[String, String] = appConfig.item[String]("datastore.attachment.password", "Password used to protect attachment ZIP")
def password: String = passwordConfig.get
val cluster: Cluster = Cluster(system)

implicit val memberWrites: Writes[Member] = Writes[Member] { member =>
Json.obj(
"address" -> member.uniqueAddress.address.toString,
"status" -> member.status.toString,
"roles" -> member.roles
)
}
implicit val clusterStateWrites: Writes[CurrentClusterState] = Writes[CurrentClusterState] { state =>
Json.obj(
"members" -> state.members,
"unreachable" -> state.unreachable,
"seenBy" -> state.seenBy.map(_.toString),
"leader" -> state.leader.map(_.toString),
"unreachableDataCenters" -> state.unreachableDataCenters
//"roleLeaderMap" -> state.roleLeaderMap,
)
}

def get: Action[AnyContent] =
entrypoint("status") { _ =>
Expand All @@ -38,7 +61,8 @@ class StatusCtrl @Inject() (entrypoint: Entrypoint, appConfig: ApplicationConfig
}),
"capabilities" -> authSrv.capabilities.map(c => JsString(c.toString)),
"ssoAutoLogin" -> authSrv.capabilities.contains(AuthCapability.sso)
)
),
"cluster" -> cluster.state
)
)
)
Expand Down
1 change: 1 addition & 0 deletions thehive/app/org/thp/thehive/services/FlowActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class FlowActor extends Actor {

lazy val eventSrv: EventSrv = injector.getInstance(classOf[EventSrv])
override def preStart(): Unit = eventSrv.subscribe(StreamTopic(), self)
override def postStop(): Unit = eventSrv.unsubscribe(StreamTopic(), self)
override def receive: Receive = {
case flowId @ FlowId(organisation, caseId) =>
val auditIds = cache.getOrElseUpdate(flowId.toString) {
Expand Down

0 comments on commit cfb2f31

Please sign in to comment.