Skip to content

Commit

Permalink
#1731 Add API for index status
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Feb 12, 2021
1 parent fabe574 commit 61e66e6
Show file tree
Hide file tree
Showing 15 changed files with 460 additions and 81 deletions.
92 changes: 92 additions & 0 deletions thehive/app/org/thp/thehive/controllers/v1/AdminCtrl.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.thp.thehive.controllers.v1

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import org.thp.scalligraph.controllers.Entrypoint
import org.thp.scalligraph.models.Database
import org.thp.scalligraph.services.GenIntegrityCheckOps
import org.thp.scalligraph.traversal.TraversalOps._
import org.thp.thehive.models.Permissions
import org.thp.thehive.services.{CheckState, CheckStats, GetCheckStats, GlobalCheckRequest}
import play.api.Logger
import play.api.libs.json.{JsObject, Json, OWrites}
import play.api.mvc.{Action, AnyContent, Results}

import javax.inject.{Inject, Named, Singleton}
import scala.collection.immutable
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

@Singleton
class AdminCtrl @Inject() (
entrypoint: Entrypoint,
@Named("integrity-check-actor") integrityCheckActor: ActorRef,
integrityCheckOps: immutable.Set[GenIntegrityCheckOps],
db: Database,
implicit val ec: ExecutionContext
) {

implicit val timeout: Timeout = Timeout(5.seconds)
implicit val checkStatsWrites: OWrites[CheckStats] = Json.writes[CheckStats]
implicit val checkStateWrites: OWrites[CheckState] = OWrites[CheckState] { state =>
Json.obj(
"needCheck" -> state.needCheck,
"duplicateTimer" -> state.duplicateTimer.isDefined,
"duplicateStats" -> state.duplicateStats,
"globalStats" -> state.globalStats,
"globalCheckRequestTime" -> state.globalCheckRequestTime
)
}
lazy val logger: Logger = Logger(getClass)

def triggerCheck(name: String): Action[AnyContent] =
entrypoint("Trigger check")
.authPermitted(Permissions.managePlatform) { _ =>
integrityCheckActor ! GlobalCheckRequest(name)
Success(Results.NoContent)
}

def checkStats: Action[AnyContent] =
entrypoint("Get check stats")
.asyncAuthPermitted(Permissions.managePlatform) { _ =>
Future
.traverse(integrityCheckOps.toSeq) { c =>
(integrityCheckActor ? GetCheckStats(c.name))
.mapTo[CheckState]
.recover {
case error =>
logger.error(s"Fail to get check stats of ${c.name}", error)
CheckState.empty
}
.map(c.name -> _)
}
.map { results =>
Results.Ok(JsObject(results.map(r => r._1 -> Json.toJson(r._2))))
}
}

private val indexedModels = Seq("Alert", "Attachment", "Audit", "Case", "Log", "Observable", "Tag", "Task")
def indexStatus: Action[AnyContent] =
entrypoint("Get index status")
.authPermittedRoTransaction(db, Permissions.managePlatform) { _ => graph =>
val status = indexedModels.map { label =>
val mixedCount = graph.V(label).getCount
val compositeCount = graph.underlying.traversal().V().has("_label", label).count().next().toLong
label -> Json.obj(
"mixedCount" -> mixedCount,
"compositeCount" -> compositeCount,
"status" -> (if (mixedCount == compositeCount) "OK" else "Error")
)
}
Success(Results.Ok(JsObject(status)))
}

def reindex(label: String): Action[AnyContent] =
entrypoint("Reindex data")
.authPermitted(Permissions.managePlatform) { _ =>
Future(db.reindexData(label))
Success(Results.NoContent)
}
}
7 changes: 7 additions & 0 deletions thehive/app/org/thp/thehive/controllers/v1/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import javax.inject.{Inject, Singleton}

@Singleton
class Router @Inject() (
adminCtrl: AdminCtrl,
authenticationCtrl: AuthenticationCtrl,
alertCtrl: AlertCtrl,
// attachmentCtrl: AttachmentCtrl,
Expand Down Expand Up @@ -40,6 +41,12 @@ class Router @Inject() (
override def routes: Routes = {
case GET(p"/status") => statusCtrl.get
// GET /health controllers.StatusCtrl.health

case GET(p"/admin/check/stats") => adminCtrl.checkStats
case GET(p"/admin/check/$name/trigger") => adminCtrl.triggerCheck(name)
case GET(p"/admin/index/status") => adminCtrl.indexStatus
case GET(p"/admin/index/$name/reindex") => adminCtrl.reindex(name)

// GET /logout controllers.AuthenticationCtrl.logout()
case GET(p"/logout") => authenticationCtrl.logout
case POST(p"/logout") => authenticationCtrl.logout
Expand Down
2 changes: 1 addition & 1 deletion thehive/app/org/thp/thehive/models/Alert.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ case class AlertCaseTemplate()
case class AlertTag()

@BuildVertexEntity
@DefineIndex(IndexType.basic, "type", "source", "sourceRef")
@DefineIndex(IndexType.unique, "type", "source", "sourceRef", "organisationId")
@DefineIndex(IndexType.standard, "type")
@DefineIndex(IndexType.standard, "source")
@DefineIndex(IndexType.standard, "sourceRef")
Expand Down
1 change: 1 addition & 0 deletions thehive/app/org/thp/thehive/models/Permissions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ object Permissions extends Perms {
lazy val manageTaxonomy: PermissionDesc = PermissionDesc("manageTaxonomy", "Manage taxonomies", "admin")
lazy val manageTask: PermissionDesc = PermissionDesc("manageTask", "Manage tasks", "organisation")
lazy val manageUser: PermissionDesc = PermissionDesc("manageUser", "Manage users", "organisation", "admin")
lazy val managePlatform: PermissionDesc = PermissionDesc("managePlatform", "Manage TheHive platform", "admin")

lazy val list: Set[PermissionDesc] =
Set(
Expand Down
10 changes: 7 additions & 3 deletions thehive/app/org/thp/thehive/models/TheHiveSchemaDefinition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -357,18 +357,22 @@ class TheHiveSchemaDefinition @Inject() extends Schema with UpdatableSchema {
.project(
_.by
.by(_.out("TaskUser").property("login", UMapping.string).option)
.by(_.coalesceIdent(_.in("ShareTask").out("ShareCase"), _.in("CaseTemplateTask"))._id)
.by(_.coalesceIdent(_.in("ShareTask").out("ShareCase"), _.in("CaseTemplateTask"))._id.option)
.by(_.coalesceIdent(_.in("ShareTask").in("OrganisationShare"), _.in("CaseTemplateTask").out("CaseTemplateOrganisation"))._id.fold)
)
.foreach {
case (vertex, assignee, relatedId, organisationIds) =>
case (vertex, assignee, Some(relatedId), organisationIds) =>
assignee.foreach(vertex.property("assignee", _))
vertex.property("relatedId", relatedId.value)
organisationIds.foreach(id => vertex.property(Cardinality.list, "organisationIds", id.value))
case _ =>
}
Success(())
}
.rebuildIndexes
.updateGraph("Add managePlatform permission to admin profile", "Profile") { traversal =>
traversal.unsafeHas("name", "admin").raw.property("permissions", "managePlatform").iterate()
Success(())
}

val reflectionClasses = new Reflections(
new ConfigurationBuilder()
Expand Down
119 changes: 108 additions & 11 deletions thehive/app/org/thp/thehive/services/AlertSrv.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.thp.thehive.services

import akka.actor.ActorRef
import org.apache.tinkerpop.gremlin.process.traversal.P
import org.apache.tinkerpop.gremlin.process.traversal.{Order, P}
import org.thp.scalligraph.auth.{AuthContext, Permission}
import org.thp.scalligraph.controllers.FFile
import org.thp.scalligraph.models._
Expand Down Expand Up @@ -586,17 +586,114 @@ object AlertOps {
implicit class AlertCustomFieldsOpsDefs(traversal: Traversal.E[AlertCustomField]) extends CustomFieldValueOpsDefs(traversal)
}

class AlertIntegrityCheckOps @Inject() (val db: Database, val service: AlertSrv) extends IntegrityCheckOps[Alert] {
override def check(): Unit = {
db.tryTransaction { implicit graph =>
service
class AlertIntegrityCheckOps @Inject() (val db: Database, val service: AlertSrv, organisationSrv: OrganisationSrv) extends IntegrityCheckOps[Alert] {

override def resolve(entities: Seq[Alert with Entity])(implicit graph: Graph): Try[Unit] = {
val (imported, notImported) = entities.partition(_.caseId.isDefined)
if (imported.nonEmpty && notImported.nonEmpty)
// Remove all non imported alerts
service.getByIds(notImported.map(_._id): _*).remove()
// Keep the last created alert
lastCreatedEntity(entities).foreach(e => service.getByIds(e._2.map(_._id): _*).remove())
Success(())
}

override def globalCheck(): Map[String, Long] = {
val metrics = super.globalCheck()
implicit val authContext: AuthContext = LocalUserSrv.getSystemAuthContext
val multiImport = db.tryTransaction { implicit graph =>
// Remove extra link with case
val linkIds = service
.startTraversal
.flatMap(_.outE[AlertCase].range(1, 100))
.remove()
Success(())
.flatMap(_.outE[AlertCase].range(1, 100)._id)
.toSeq
if (linkIds.nonEmpty)
graph.E[AlertCase](linkIds: _*).remove()
Success(linkIds.length.toLong)
}
()
}

override def resolve(entities: Seq[Alert with Entity])(implicit graph: Graph): Try[Unit] = Success(())
val orgMetrics: Map[String, Long] = db
.tryTransaction { implicit graph =>
// Check links with organisation
Success {
service
.startTraversal
.project(
_.by
.by(_.organisation._id.fold)
)
.toIterator
.flatMap {
case (alert, Seq(organisationId)) if alert.organisationId == organisationId => None // It's OK

case (alert, Seq(organisationId)) =>
logger.warn(
s"Invalid organisationId in alert ${alert._id}(${alert.`type`}:${alert.source}:${alert.sourceRef}), " +
s"got ${alert.organisationId}, should be $organisationId. Fixing it."
)
service.get(alert).update(_.organisationId, organisationId).iterate()
Some("invalid")

case (alert, organisationIds) if organisationIds.isEmpty =>
organisationSrv.getOrFail(alert.organisationId) match {
case Success(organisation) =>
logger.warn(
s"Link between alert ${alert._id}(${alert.`type`}:${alert.source}:${alert.sourceRef}) and " +
s"organisation ${alert.organisationId} has disappeared. Fixing it."
)
service.alertOrganisationSrv.create(AlertOrganisation(), alert, organisation).failed.foreach { error =>
logger.error(
s"Fail to create link between alert ${alert._id}(${alert.`type`}:${alert.source}:${alert.sourceRef}) " +
s"and organisation ${alert.organisationId}",
error
)
}
Some("missing")
case _ =>
logger.warn(
s"Alert ${alert._id}(${alert.`type`}:${alert.source}:${alert.sourceRef}) is not linked to " +
s"existing organisation. Fixing it."
)
service.get(alert).remove()
Some("missingAndFail")
}

case (alert, organisationIds) if organisationIds.contains(alert.organisationId) =>
val (extraLinks, extraOrganisationIds) = organisationIds.partition(_ == alert.organisationId)
if (extraOrganisationIds.nonEmpty) {
logger.warn(
s"Alert ${alert._id}(${alert.`type`}:${alert.source}:${alert.sourceRef}) is not linked to " +
s"extra organisation(s): ${extraOrganisationIds.mkString(",")}. Fixing it."
)
service.get(alert).outE[AlertOrganisation].filter(_.inV.hasId(extraOrganisationIds: _*)).remove()
}
if (extraLinks.length > 1) {
logger.warn(
s"Alert ${alert._id}(${alert.`type`}:${alert.source}:${alert.sourceRef}) is linked more than once to " +
s"organisation: ${alert.organisationId}. Fixing it."
)
service.get(alert).flatMap(_.outE[AlertOrganisation].range(1, 100)).remove()
}
Some("extraLink")

case (alert, organisationIds) =>
logger.warn(
s"Alert ${alert._id}(${alert.`type`}:${alert.source}:${alert.sourceRef}) has inconsistent organisation links: " +
s"organisation is ${alert.organisationId} but links are ${organisationIds.mkString(",")}. Fixing it."
)
service.get(alert).flatMap(_.outE[AlertOrganisation].sort(_.by("_createdAt", Order.asc)).range(1, 100)).remove()
service.get(alert).organisation._id.getOrFail("Organisation").foreach { organisationId =>
service.get(alert).update(_.organisationId, organisationId).iterate()
}
Some("incoherent")
}
.toSeq
}
}
.getOrElse(Seq("globalFailure"))
.groupBy(identity)
.mapValues(_.size.toLong)

orgMetrics ++ metrics + ("multiImport" -> multiImport.getOrElse(0L))
}
}
2 changes: 1 addition & 1 deletion thehive/app/org/thp/thehive/services/CaseSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ object CaseOps {

class CaseIntegrityCheckOps @Inject() (val db: Database, val service: CaseSrv) extends IntegrityCheckOps[Case] {
def removeDuplicates(): Unit =
duplicateEntities
findDuplicates()
.foreach { entities =>
db.tryTransaction { implicit graph =>
resolve(entities)
Expand Down
7 changes: 6 additions & 1 deletion thehive/app/org/thp/thehive/services/CaseTemplateSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ class CaseTemplateIntegrityCheckOps @Inject() (
val service: CaseTemplateSrv,
organisationSrv: OrganisationSrv
) extends IntegrityCheckOps[CaseTemplate] {
override def duplicateEntities: Seq[Seq[CaseTemplate with Entity]] =
override def findDuplicates: Seq[Seq[CaseTemplate with Entity]] =
db.roTransaction { implicit graph =>
organisationSrv
.startTraversal
Expand All @@ -237,4 +237,9 @@ class CaseTemplateIntegrityCheckOps @Inject() (
Success(())
case _ => Success(())
}

override def findOrphans(): Seq[CaseTemplate with Entity] =
db.roTransaction { implicit graph =>
service.startTraversal.filterNot(_.organisation).toSeq
}
}
Loading

0 comments on commit 61e66e6

Please sign in to comment.