Skip to content

Commit

Permalink
#1731 Optimize audit queries for the use of the index
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jan 5, 2021
1 parent a5ef7e6 commit 480d81e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 43 deletions.
23 changes: 17 additions & 6 deletions thehive/app/org/thp/thehive/services/AuditSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ class AuditSrv @Inject() (
def delete(entity: E with Entity, context: Option[C with Entity])(implicit graph: Graph, authContext: AuthContext): Try[Unit] =
auditSrv.create(Audit(Audit.delete, entity, None), context, None)

def merge(entity: E with Entity, destination: C with Entity, details: Option[JsObject] = None)(implicit graph: Graph, authContext: AuthContext): Try[Unit] =
def merge(entity: E with Entity, destination: C with Entity, details: Option[JsObject] = None)(implicit
graph: Graph,
authContext: AuthContext
): Try[Unit] =
auditSrv.create(Audit(Audit.merge, destination, details.map(_.toString())), Some(destination), Some(destination))
}

Expand All @@ -186,7 +189,10 @@ class AuditSrv @Inject() (
if (details == JsObject.empty) Success(())
else auditSrv.create(Audit(Audit.update, entity, Some(details.toString)), Some(entity), Some(entity))

def delete(entity: E with Entity, context: Product with Entity, details: Option[JsObject] = None)(implicit graph: Graph, authContext: AuthContext): Try[Unit] =
def delete(entity: E with Entity, context: Product with Entity, details: Option[JsObject] = None)(implicit
graph: Graph,
authContext: AuthContext
): Try[Unit] =
auditSrv.create(Audit(Audit.delete, entity, details.map(_.toString())), Some(context), None)
}

Expand Down Expand Up @@ -276,8 +282,11 @@ class AuditSrv @Inject() (

object AuditOps {

implicit class AuditOpsDefs(traversal: Traversal.V[Audit]) {
implicit class VertexDefs(traversal: Traversal[Vertex, Vertex, IdentityConverter[Vertex]]) {
def share: Traversal.V[Share] = traversal.coalesceIdent(_.in[ShareObservable], _.in[ShareTask], _.in[ShareCase]).v[Share]
}

implicit class AuditOpsDefs(traversal: Traversal.V[Audit]) {
def auditContextObjectOrganisation
: Traversal[(Audit with Entity, Option[Entity], Option[Entity], Option[Organisation with Entity]), JMap[String, Any], Converter[
(Audit with Entity, Option[Entity], Option[Entity], Option[Organisation with Entity]),
Expand Down Expand Up @@ -332,17 +341,19 @@ object AuditOps {
def `case`: Traversal.V[Case] =
traversal
.out[AuditContext]
.coalesceIdent[Vertex](_.in().hasLabel("Share"), _.hasLabel("Share"))
.share
.out[ShareCase]
.v[Case]

def organisation: Traversal.V[Organisation] =
traversal
.out[AuditContext]
.coalesceIdent[Vertex](
_.share.in[OrganisationShare],
_.out[AlertOrganisation],
_.hasLabel("Organisation"),
_.in().hasLabel("Share").in[OrganisationShare],
_.both().hasLabel("Organisation")
_.out[CaseTemplateOrganisation],
_.in[OrganisationDashboard]
)
.v[Organisation]

Expand Down
101 changes: 65 additions & 36 deletions thehive/app/org/thp/thehive/services/FlowActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.thp.scalligraph.models.Database
import org.thp.scalligraph.services.EventSrv
import org.thp.scalligraph.services.config.ApplicationConfig.finiteDurationFormat
import org.thp.scalligraph.services.config.{ApplicationConfig, ConfigItem}
import org.thp.scalligraph.traversal.{Converter, Graph, GraphStrategy}
import org.thp.scalligraph.traversal.TraversalOps._
import org.thp.scalligraph.{EntityId, EntityIdOrName}
import org.thp.thehive.GuiceAkkaExtension
Expand All @@ -34,7 +35,7 @@ class FlowActor extends Actor {
lazy val cache: SyncCacheApi = injector.getInstance(classOf[SyncCacheApi])
lazy val auditSrv: AuditSrv = injector.getInstance(classOf[AuditSrv])
lazy val caseSrv: CaseSrv = injector.getInstance(classOf[CaseSrv])
lazy val db: Database = injector.getInstance(GuiceKey.get(classOf[Database], Names.named("with-thehive-schema")))
lazy val db: Database = injector.getInstance(classOf[Database])
lazy val appConfig: ApplicationConfig = injector.getInstance(classOf[ApplicationConfig])
lazy val maxAgeConfig: ConfigItem[FiniteDuration, FiniteDuration] =
appConfig.item[FiniteDuration]("flow.maxAge", "Max age of audit logs shown in initial flow")
Expand All @@ -43,46 +44,74 @@ class FlowActor extends Actor {
lazy val eventSrv: EventSrv = injector.getInstance(classOf[EventSrv])
override def preStart(): Unit = eventSrv.subscribe(StreamTopic(), self)
override def postStop(): Unit = eventSrv.unsubscribe(StreamTopic(), self)

def flowQuery(organisation: EntityIdOrName, caseId: Option[EntityIdOrName])(implicit graph: Graph) =
// caseId
// .fold(
// auditSrv
// .startTraversal(GraphStrategy.without[FilterRankingStrategy])
// .has(_.mainAction, true)
// // .has(_._createdAt, P.gt(fromDate))
// .sort(_.by("_createdAt", Order.desc))
// .visible(organisation)
// )(
// caseSrv.get(_).audits(organisation).sort(_.by("_createdAt", Order.desc))
// )
// .range(0, 10)
// ._id
auditSrv
.startTraversal //(GraphStrategy.without[FilterRankingStrategy]) // FIXME
.has(_.mainAction, true)
.sort(_.by("_createdAt", Order.desc))
.visible(organisation)
.range(0, 10)
._id

override def receive: Receive = {
case flowId @ FlowId(organisation, caseId) =>
val auditIds = cache.getOrElseUpdate(flowId.toString) {
// db.roTransaction { implicit graph =>
// flowQuery(organisation, caseId)
// .onRawMap[TraversalMetrics, TraversalMetrics, Converter.Identity[TraversalMetrics]](_.profile())(Converter.identity)
// .toIterator
// .foreach { metric =>
// logger.debug(s"Flow profile:\n$metric")
// }
// val explanation = flowQuery(organisation, caseId)
// .raw
// .explain()
// logger.debug(s"Flow explanation:\n$explanation")
// }
val auditIds = //cache.getOrElseUpdate(flowId.toString) {
db.roTransaction { implicit graph =>
caseId
.fold(auditSrv.startTraversal.has(_.mainAction, true).has(_._createdAt, P.gt(fromDate)).visible(organisation))(
caseSrv.get(_).audits(organisation)
)
.sort(_.by("_createdAt", Order.desc))
.range(0, 10)
._id
.toSeq
flowQuery(organisation, caseId).toSeq
}
}
//}
sender ! AuditIds(auditIds)
case AuditStreamMessage(ids @ _*) =>
db.roTransaction { implicit graph =>
auditSrv
.getByIds(ids: _*)
.has(_.mainAction, true)
.project(
_.by(_._id)
.by(_.organisation._id.fold)
.by(_.`case`._id.fold)
)
.toIterator
.foreach {
case (id, organisations, cases) =>
organisations.foreach { organisation =>
val cacheKey = FlowId(organisation, None).toString
val ids = cache.get[List[String]](cacheKey).getOrElse(Nil)
cache.set(cacheKey, (id :: ids).take(10))
cases.foreach { caseId =>
val cacheKey: String = FlowId(organisation, Some(caseId)).toString
val ids = cache.get[List[String]](cacheKey).getOrElse(Nil)
cache.set(cacheKey, (id :: ids).take(10))
}
}
}
}
// case AuditStreamMessage(ids @ _*) =>
// db.roTransaction { implicit graph =>
// auditSrv
// .getByIds(ids: _*)
// .has(_.mainAction, true)
// .project(
// _.by(_._id)
// .by(_.organisation._id.fold)
// .by(_.`case`._id.fold)
// )
// .toIterator
// .foreach {
// case (id, organisations, cases) =>
// organisations.foreach { organisation =>
// val cacheKey = FlowId(organisation, None).toString
// val ids = cache.get[List[String]](cacheKey).getOrElse(Nil)
// cache.set(cacheKey, (id :: ids).take(10))
// cases.foreach { caseId =>
// val cacheKey: String = FlowId(organisation, Some(caseId)).toString
// val ids = cache.get[List[String]](cacheKey).getOrElse(Nil)
// cache.set(cacheKey, (id :: ids).take(10))
// }
// }
// }
// }
case _ =>
}
}
Expand Down
2 changes: 1 addition & 1 deletion thehive/test/org/thp/thehive/services/AuditSrvTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class AuditSrvTest extends PlaySpecification with TestAppBuilder {
val c1 = app[Database]
.tryTransaction(implicit graph =>
app[CaseSrv].create(
Case(0, "case audit", "desc audit", 1, new Date(), None, flag = false, 1, 1, CaseStatus.Open, None),
Case(0, "case audit", "desc audit", 1, new Date(), None, flag = false, 1, 1, CaseStatus.Open, None, Seq(orgAdmin._id)),
None,
orgAdmin,
Set.empty,
Expand Down

0 comments on commit 480d81e

Please sign in to comment.