Skip to content

Commit

Permalink
#2305 Migration input cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jan 22, 2022
1 parent ee21355 commit 672dadb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 144 deletions.
20 changes: 0 additions & 20 deletions migration/src/main/scala/org/thp/thehive/migration/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,16 @@ trait Input {
def countOrganisations(filter: Filter): Future[Long]
def listCases(filter: Filter): Source[Try[InputCase], NotUsed]
def countCases(filter: Filter): Future[Long]
def listCaseObservables(filter: Filter): Source[Try[(String, InputObservable)], NotUsed]
def countCaseObservables(filter: Filter): Future[Long]
def listCaseObservables(caseId: String): Source[Try[(String, InputObservable)], NotUsed]
def countCaseObservables(caseId: String): Future[Long]
def listCaseTasks(filter: Filter): Source[Try[(String, InputTask)], NotUsed]
def countCaseTasks(filter: Filter): Future[Long]
def listCaseTasks(caseId: String): Source[Try[(String, InputTask)], NotUsed]
def countCaseTasks(caseId: String): Future[Long]
def listCaseTaskLogs(filter: Filter): Source[Try[(String, InputLog)], NotUsed]
def countCaseTaskLogs(filter: Filter): Future[Long]
def listCaseTaskLogs(caseId: String): Source[Try[(String, InputLog)], NotUsed]
def countCaseTaskLogs(caseId: String): Future[Long]
def listAlerts(filter: Filter): Source[Try[InputAlert], NotUsed]
def countAlerts(filter: Filter): Future[Long]
def listAlertObservables(filter: Filter): Source[Try[(String, InputObservable)], NotUsed]
def countAlertObservables(filter: Filter): Future[Long]
def listAlertObservables(alertId: String): Source[Try[(String, InputObservable)], NotUsed]
def countAlertObservables(alertId: String): Future[Long]
def listUsers(filter: Filter): Source[Try[InputUser], NotUsed]
def countUsers(filter: Filter): Future[Long]
def listCustomFields(filter: Filter): Source[Try[InputCustomField], NotUsed]
Expand All @@ -121,25 +113,13 @@ trait Input {
def listCaseTemplate(filter: Filter): Source[Try[InputCaseTemplate], NotUsed]
def countCaseTemplate(filter: Filter): Future[Long]
def listCaseTemplateTask(caseTemplateId: String): Source[Try[(String, InputTask)], NotUsed]
def countCaseTemplateTask(caseTemplateId: String): Future[Long]
def listCaseTemplateTask(filter: Filter): Source[Try[(String, InputTask)], NotUsed]
def countCaseTemplateTask(filter: Filter): Future[Long]
def listJobs(caseId: String): Source[Try[(String, InputJob)], NotUsed]
def countJobs(caseId: String): Future[Long]
def listJobs(filter: Filter): Source[Try[(String, InputJob)], NotUsed]
def countJobs(filter: Filter): Future[Long]
def listJobObservables(filter: Filter): Source[Try[(String, InputObservable)], NotUsed]
def countJobObservables(filter: Filter): Future[Long]
def listJobObservables(caseId: String): Source[Try[(String, InputObservable)], NotUsed]
def countJobObservables(caseId: String): Future[Long]
def listAction(filter: Filter): Source[Try[(String, InputAction)], NotUsed]
def countAction(filter: Filter): Future[Long]
def listAction(entityId: String): Source[Try[(String, InputAction)], NotUsed]
def listActions(entityIds: Seq[String]): Source[Try[(String, InputAction)], NotUsed]
def countAction(entityId: String): Future[Long]
def listAudit(filter: Filter): Source[Try[(String, InputAudit)], NotUsed]
def countAudit(filter: Filter): Future[Long]
def listAudit(entityId: String, filter: Filter): Source[Try[(String, InputAudit)], NotUsed]
def listAudits(entityIds: Seq[String], filter: Filter): Source[Try[(String, InputAudit)], NotUsed]
def countAudit(entityId: String, filter: Filter): Future[Long]
}
132 changes: 8 additions & 124 deletions migration/src/main/scala/org/thp/thehive/migration/th3/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.thp.thehive.migration.th3
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.google.inject.Guice
Expand Down Expand Up @@ -55,7 +56,7 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient
source.map(json => parent(json).flatMap(p => Try(p -> json.as[A])))
}

def readAttachment(id: String): Source[ByteString, NotUsed] =
override def readAttachment(id: String): Source[ByteString, NotUsed] =
Source.unfoldAsync(0) { chunkNumber =>
elaticClient
.get("data", s"${id}_$chunkNumber")
Expand All @@ -66,15 +67,13 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient
}

override def listOrganisations(filter: Filter): Source[Try[InputOrganisation], NotUsed] =
Source(
List(
Success(InputOrganisation(MetaData(mainOrganisation, "system", new Date, None, None), Organisation(mainOrganisation, mainOrganisation)))
)
Source.single(
Success(InputOrganisation(MetaData(mainOrganisation, "system", new Date, None, None), Organisation(mainOrganisation, mainOrganisation)))
)

override def countOrganisations(filter: Filter): Future[Long] = Future.successful(1)

def caseFilter(filter: Filter): Seq[JsObject] = {
private def caseFilter(filter: Filter): Seq[JsObject] = {
val dateFilter =
if (filter.caseDateRange._1.isDefined || filter.caseDateRange._2.isDefined)
Seq(range("createdAt", filter.caseDateRange._1, filter.caseDateRange._2))
Expand All @@ -93,52 +92,29 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient
override def countCases(filter: Filter): Future[Long] =
elaticClient.count("case", searchQuery(bool(caseFilter(filter))))

override def listCaseObservables(filter: Filter): Source[Try[(String, InputObservable)], NotUsed] =
elaticClient("case_artifact", searchQuery(hasParentQuery("case", bool(caseFilter(filter)))))
.readWithParent[InputObservable](json => Try((json \ "_parent").as[String]))

override def countCaseObservables(filter: Filter): Future[Long] =
elaticClient.count("case_artifact", searchQuery(hasParentQuery("case", bool(caseFilter(filter)))))

override def listCaseObservables(caseId: String): Source[Try[(String, InputObservable)], NotUsed] =
elaticClient("case_artifact", searchQuery(hasParentQuery("case", idsQuery(caseId))))
.readWithParent[InputObservable](json => Try((json \ "_parent").as[String]))

override def countCaseObservables(caseId: String): Future[Long] =
elaticClient.count("case_artifact", searchQuery(hasParentQuery("case", idsQuery(caseId))))

override def listCaseTasks(filter: Filter): Source[Try[(String, InputTask)], NotUsed] =
elaticClient("case_task", searchQuery(hasParentQuery("case", bool(caseFilter(filter)))))
.readWithParent[InputTask](json => Try((json \ "_parent").as[String]))

override def countCaseTasks(filter: Filter): Future[Long] =
elaticClient.count("case_task", searchQuery(hasParentQuery("case", bool(caseFilter(filter)))))

override def listCaseTasks(caseId: String): Source[Try[(String, InputTask)], NotUsed] =
elaticClient("case_task", searchQuery(hasParentQuery("case", idsQuery(caseId))))
.readWithParent[InputTask](json => Try((json \ "_parent").as[String]))

override def countCaseTasks(caseId: String): Future[Long] =
elaticClient.count("case_task", searchQuery(hasParentQuery("case", idsQuery(caseId))))

override def listCaseTaskLogs(filter: Filter): Source[Try[(String, InputLog)], NotUsed] =
listCaseTaskLogs(bool(caseFilter(filter)))

override def countCaseTaskLogs(filter: Filter): Future[Long] =
countCaseTaskLogs(bool(caseFilter(filter)))

override def listCaseTaskLogs(caseId: String): Source[Try[(String, InputLog)], NotUsed] =
listCaseTaskLogs(idsQuery(caseId))

override def countCaseTaskLogs(caseId: String): Future[Long] =
countCaseTaskLogs(idsQuery(caseId))

private def listCaseTaskLogs(caseQuery: JsObject): Source[Try[(String, InputLog)], NotUsed] =
elaticClient(
"case_task_log",
searchQuery(
bool(
Seq(hasParentQuery("case_task", hasParentQuery("case", caseQuery))),
Seq(hasParentQuery("case_task", hasParentQuery("case", idsQuery(caseId)))),
Nil,
Seq(termQuery("status", "deleted"))
)
Expand All @@ -158,7 +134,7 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient
)
)

def alertFilter(filter: Filter): JsObject = {
private def alertFilter(filter: Filter): JsObject = {
val dateFilter =
if (filter.alertDateRange._1.isDefined || filter.alertDateRange._2.isDefined)
Seq(range("createdAt", filter.alertDateRange._1, filter.alertDateRange._2))
Expand All @@ -178,26 +154,6 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient
override def countAlerts(filter: Filter): Future[Long] =
elaticClient.count("alert", searchQuery(alertFilter(filter)))

override def listAlertObservables(filter: Filter): Source[Try[(String, InputObservable)], NotUsed] =
elaticClient("alert", searchQuery(alertFilter(filter)))
.map { json =>
for {
metaData <- json.validate[MetaData]
observablesJson <- (json \ "artifacts").validate[Seq[JsValue]]
} yield (metaData, observablesJson)
}
.mapConcat {
case JsSuccess(x, _) => List(x)
case JsError(errors) =>
val errorStr = errors.map(e => s"\n - ${e._1}: ${e._2.mkString(",")}")
logger.error(s"Alert observable read failure:$errorStr")
Nil
}
.mapConcat {
case (metaData, observablesJson) =>
observablesJson.map(observableJson => Try(metaData.id -> observableJson.as(alertObservableReads(metaData)))).toList
}

override def countAlertObservables(filter: Filter): Future[Long] = Future.failed(new NotImplementedError)

override def listAlertObservables(alertId: String): Source[Try[(String, InputObservable)], NotUsed] = {
Expand Down Expand Up @@ -262,26 +218,6 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient
override def countCaseTemplate(filter: Filter): Future[Long] =
elaticClient.count("caseTemplate", searchQuery(matchAll))

override def listCaseTemplateTask(filter: Filter): Source[Try[(String, InputTask)], NotUsed] =
elaticClient("caseTemplate", searchQuery(matchAll))
.map { json =>
for {
metaData <- json.validate[MetaData]
tasksJson <- (json \ "tasks").validateOpt[Seq[JsValue]]
} yield (metaData, tasksJson.getOrElse(Nil))
}
.mapConcat {
case JsSuccess(x, _) => List(x)
case JsError(errors) =>
val errorStr = errors.map(e => s"\n - ${e._1}: ${e._2.mkString(",")}")
logger.error(s"Case template task read failure:$errorStr")
Nil
}
.mapConcat {
case (metaData, tasksJson) =>
tasksJson.map(taskJson => Try(metaData.id -> taskJson.as(caseTemplateTaskReads(metaData)))).toList
}

override def countCaseTemplateTask(filter: Filter): Future[Long] = Future.failed(new NotImplementedError)

def listCaseTemplateTask(caseTemplateId: String): Source[Try[(String, InputTask)], NotUsed] =
Expand All @@ -301,41 +237,13 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient
}
.mapMaterializedValue(_ => NotUsed)

override def countCaseTemplateTask(caseTemplateId: String): Future[Long] = Future.failed(new NotImplementedError)

override def listJobs(filter: Filter): Source[Try[(String, InputJob)], NotUsed] =
elaticClient("case_artifact_job", searchQuery(hasParentQuery("case_artifact", hasParentQuery("case", bool(caseFilter(filter))))))
.readWithParent[InputJob](json => Try((json \ "_parent").as[String]))(jobReads, classTag[InputJob])

override def countJobs(filter: Filter): Future[Long] =
elaticClient.count("case_artifact_job", searchQuery(hasParentQuery("case_artifact", hasParentQuery("case", bool(caseFilter(filter))))))

override def listJobs(caseId: String): Source[Try[(String, InputJob)], NotUsed] =
elaticClient("case_artifact_job", searchQuery(hasParentQuery("case_artifact", hasParentQuery("case", idsQuery(caseId)))))
.readWithParent[InputJob](json => Try((json \ "_parent").as[String]))(jobReads, classTag[InputJob])

override def countJobs(caseId: String): Future[Long] =
elaticClient.count(
"case_artifact_job",
searchQuery(hasParentQuery("case_artifact", hasParentQuery("case", idsQuery(caseId))))
)

override def listJobObservables(filter: Filter): Source[Try[(String, InputObservable)], NotUsed] =
elaticClient(
"case_artifact_job",
searchQuery(hasParentQuery("case_artifact", hasParentQuery("case", bool(caseFilter(filter)))))
)
.map { json =>
Try {
val metaData = json.as[MetaData]
(json \ "artifacts").asOpt[Seq[JsValue]].getOrElse(Nil).map(o => Try(metaData.id -> o.as(jobObservableReads(metaData))))
}
}
.mapConcat {
case Success(o) => o.toList
case Failure(error) => List(Failure(error))
}

override def countJobObservables(filter: Filter): Future[Long] = Future.failed(new NotImplementedError)

override def listJobObservables(caseId: String): Source[Try[(String, InputObservable)], NotUsed] =
Expand All @@ -351,27 +259,14 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient
case Failure(error) => List(Failure(error))
}

override def countJobObservables(caseId: String): Future[Long] = Future.failed(new NotImplementedError)

override def listAction(filter: Filter): Source[Try[(String, InputAction)], NotUsed] =
elaticClient("action", searchQuery(matchAll))
.read[(String, InputAction)]

override def countAction(filter: Filter): Future[Long] =
elaticClient.count("action", searchQuery(matchAll))

override def listAction(entityId: String): Source[Try[(String, InputAction)], NotUsed] =
elaticClient("action", searchQuery(termQuery("objectId", entityId)))
.read[(String, InputAction)]

override def listActions(entityIds: Seq[String]): Source[Try[(String, InputAction)], NotUsed] =
elaticClient("action", searchQuery(termsQuery("objectId", entityIds)))
.read[(String, InputAction)]

override def countAction(entityId: String): Future[Long] =
elaticClient.count("action", searchQuery(idsQuery(entityId)))

def auditFilter(filter: Filter, objectIds: String*): JsObject = {
private def auditFilter(filter: Filter, objectIds: String*): JsObject = {
val dateFilter =
if (filter.auditDateRange._1.isDefined || filter.auditDateRange._2.isDefined)
Seq(range("createdAt", filter.auditDateRange._1, filter.auditDateRange._2))
Expand All @@ -388,21 +283,10 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient
bool(dateFilter ++ includeFilter ++ objectIdFilter, Nil, excludeFilter)
}

override def listAudit(filter: Filter): Source[Try[(String, InputAudit)], NotUsed] =
elaticClient("audit", searchQuery(auditFilter(filter)))
.read[(String, InputAudit)]

override def countAudit(filter: Filter): Future[Long] =
elaticClient.count("audit", searchQuery(auditFilter(filter)))

override def listAudit(entityId: String, filter: Filter): Source[Try[(String, InputAudit)], NotUsed] =
elaticClient("audit", searchQuery(auditFilter(filter, entityId)))
.read[(String, InputAudit)]

override def listAudits(entityIds: Seq[String], filter: Filter): Source[Try[(String, InputAudit)], NotUsed] =
elaticClient("audit", searchQuery(auditFilter(filter, entityIds: _*)))
.read[(String, InputAudit)]

def countAudit(entityId: String, filter: Filter): Future[Long] =
elaticClient.count("audit", searchQuery(auditFilter(filter, entityId)))
}

0 comments on commit 672dadb

Please sign in to comment.