From ed3e769bc8154207c034637f1ffd4bc3f761b014 Mon Sep 17 00:00:00 2001 From: To-om Date: Mon, 25 May 2020 09:29:12 +0200 Subject: [PATCH] #1340 Add statistics on migration process --- .../org/thp/thehive/migration/Migrate.scala | 6 +- .../thp/thehive/migration/MigrationOps.scala | 302 ++++++++++++------ .../thp/thehive/migration/th4/Output.scala | 10 +- .../org/thp/thehive/services/UserSrv.scala | 4 +- 4 files changed, 213 insertions(+), 109 deletions(-) diff --git a/migration/src/main/scala/org/thp/thehive/migration/Migrate.scala b/migration/src/main/scala/org/thp/thehive/migration/Migrate.scala index 11c5e36a95..9ea609a428 100644 --- a/migration/src/main/scala/org/thp/thehive/migration/Migrate.scala +++ b/migration/src/main/scala/org/thp/thehive/migration/Migrate.scala @@ -90,8 +90,10 @@ object Migrate extends App with MigrationOps { val output = th4.Output(Configuration(config.getConfig("output").withFallback(config))) val filter = Filter.fromConfig(config.getConfig("input.filter")) - val process = migrate(input, output, filter) - Await.ready(process, Duration.Inf) + val process = migrate(input, output, filter) + val migrationStats = Await.result(process, Duration.Inf) + println("Migration finished") + println(migrationStats) System.exit(0) } } diff --git a/migration/src/main/scala/org/thp/thehive/migration/MigrationOps.scala b/migration/src/main/scala/org/thp/thehive/migration/MigrationOps.scala index c670b1628a..76161a88d9 100644 --- a/migration/src/main/scala/org/thp/thehive/migration/MigrationOps.scala +++ b/migration/src/main/scala/org/thp/thehive/migration/MigrationOps.scala @@ -1,15 +1,51 @@ package org.thp.thehive.migration -import scala.collection.{immutable, mutable} -import scala.concurrent.{ExecutionContext, Future} -import scala.reflect.{classTag, ClassTag} -import scala.util.{Failure, Success, Try} -import play.api.Logger import akka.NotUsed import akka.stream.Materializer -import akka.stream.scaladsl.{Sink, Source} +import akka.stream.scaladsl.Source import org.thp.scalligraph.{NotFoundError, RichOptionTry} import org.thp.thehive.migration.dto.{InputAlert, InputAudit, InputCase, InputCaseTemplate} +import play.api.Logger + +import scala.collection.{immutable, mutable} +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + +class MigrationStats(val stats: Map[String, (Int, Int)]) { + def +(other: MigrationStats): MigrationStats = + new MigrationStats((stats.keySet ++ other.stats.keySet).map { key => + val (s1, f1) = stats.getOrElse(key, 0 -> 0) + val (s2, f2) = other.stats.getOrElse(key, 0 -> 0) + key -> (s1 + s2, f1 + f2) + }.toMap) + + def success(name: String): MigrationStats = + new MigrationStats({ + val (s, f) = stats.getOrElse(name, 0 -> 0) + stats.updated(name, s + 1 -> f) + }) + def failure(name: String): MigrationStats = + new MigrationStats({ + val (s, f) = stats.getOrElse(name, 0 -> 0) + stats.updated(name, s -> (f + 1)) + }) + + override def toString: String = + stats + .map { + case (name, (success, failure)) => s"$name: $success/${success + failure}" + } + .toSeq + .sorted + .mkString("\n") +} + +object MigrationStats { + def empty = new MigrationStats(Map.empty) + def apply(name: String, successes: Int, failures: Int) = new MigrationStats(Map(name -> (successes -> failures))) + def success(name: String): MigrationStats = apply(name, 1, 0) + def failure(name: String): MigrationStats = apply(name, 0, 1) +} trait MigrationOps { lazy val logger: Logger = Logger(getClass) @@ -22,143 +58,186 @@ trait MigrationOps { .fold[Try[String]](Failure(NotFoundError(s"Id $id not found")))(m => Success(m.outputId)) } - def migrate[A: ClassTag](source: Source[A, NotUsed], create: A => Try[IdMapping])(implicit mat: Materializer): Future[Seq[IdMapping]] = + def migrate[A](name: String, source: Source[A, NotUsed], create: A => Try[IdMapping])( + implicit ec: ExecutionContext, + mat: Materializer + ): Future[(Seq[IdMapping], MigrationStats)] = source - .map(create) - .mapConcat { - case Success(idMapping) => List(idMapping) - case Failure(error) => - logger.error(s"${classTag[A].runtimeClass.getSimpleName} creation failure", error) - Nil + .map { a => + create(a) + .recoverWith { + case error => + logger.error(s"$name creation failure: $error") + Failure(error) + } + } + .runFold[(Seq[IdMapping], Int, Int)]((Seq.empty, 0, 0)) { + case ((idMappings, successes, failures), Success(idMapping)) => (idMappings :+ idMapping, successes + 1, failures) + case ((idMappings, successes, failures), _) => (idMappings, successes, failures + 1) + } + .map { + case (idMappings, successes, failures) => idMappings -> MigrationStats(name, successes, failures) } - .runWith(Sink.seq) - def migrateWithParent[A: ClassTag, I]( + def migrateWithParent[A]( + name: String, parentIds: Seq[IdMapping], source: Source[(String, A), NotUsed], - create: (String, A) => Try[I] - )(implicit mat: Materializer): Future[Seq[I]] = + create: (String, A) => Try[IdMapping] + )(implicit ec: ExecutionContext, mat: Materializer): Future[(Seq[IdMapping], MigrationStats)] = source .map { case (parentId, a) => - parentIds.fromInput(parentId).flatMap(create(_, a)) + parentIds.fromInput(parentId).flatMap(create(_, a)).recoverWith { + case error => + logger.error(s"$name creation failure: $error") + Failure(error) + } + } + .runFold[(Seq[IdMapping], Int, Int)]((Seq.empty, 0, 0)) { + case ((idMappings, successes, failures), Success(idMapping)) => (idMappings :+ idMapping, successes + 1, failures) + case ((idMappings, successes, failures), _) => (idMappings, successes, failures + 1) } - .mapConcat { - case Success(idMapping) => List(idMapping) - case Failure(error) => - logger.error(s"${classTag[A].runtimeClass.getSimpleName} creation failure", error) - Nil + .map { + case (idMappings, successes, failures) => idMappings -> MigrationStats(name, successes, failures) } - .runWith(Sink.seq) def migrateAudit(ids: Seq[IdMapping], source: Source[(String, InputAudit), NotUsed], create: (String, InputAudit) => Try[Unit])( implicit ec: ExecutionContext, mat: Materializer - ): Future[Unit] = + ): Future[MigrationStats] = source .map { case (contextId, inputAudit) => - for { + (for { cid <- ids.fromInput(contextId) objId = inputAudit.audit.objectId.map(ids.fromInput).flip.getOrElse { - logger.error(s"object Id not found in audit ${inputAudit.audit}") + logger.warn(s"object Id not found in audit ${inputAudit.audit}") None } _ <- create(cid, inputAudit.updateObjectId(objId)) - } yield () + } yield ()) + .recoverWith { + case error => + logger.error(s"Audit creation failure: $error") + Failure(error) + } + } + .runFold[(Int, Int)]((0, 0)) { + case ((successes, failures), Success(_)) => (successes + 1, failures) + case ((successes, failures), _) => (successes, failures + 1) } - .recover { - case error => - logger.error("Audit creation failure", error) - Nil + .map { + case (successes, failures) => MigrationStats("Audit", successes, failures) } - .runWith(Sink.ignore) - .map(_ => ()) def migrateAWholeCaseTemplate(input: Input, output: Output)( inputCaseTemplate: InputCaseTemplate - )(implicit ec: ExecutionContext, mat: Materializer): Future[Unit] = + )(implicit ec: ExecutionContext, mat: Materializer): Future[MigrationStats] = output.createCaseTemplate(inputCaseTemplate) match { case Success(caseTemplateId @ IdMapping(inputCaseTemplateId, _)) => - migrateWithParent(Seq(caseTemplateId), input.listCaseTemplateTask(inputCaseTemplateId), output.createCaseTemplateTask).map(_ => ()) + migrateWithParent("CaseTemplate/Task", Seq(caseTemplateId), input.listCaseTemplateTask(inputCaseTemplateId), output.createCaseTemplateTask) + .map(_._2.success("CaseTemplate")) case Failure(error) => - logger.error(s"Migration of case template ${inputCaseTemplate.caseTemplate.name} failed", error) - Future.successful(()) + logger.error(s"Migration of case template ${inputCaseTemplate.caseTemplate.name} failed: $error") + Future.successful(MigrationStats.failure("CaseTemplate")) } - def migrateWholeCaseTemplates(input: Input, output: Output, filter: Filter)(implicit ec: ExecutionContext, mat: Materializer): Future[Unit] = + def migrateWholeCaseTemplates(input: Input, output: Output, filter: Filter)( + implicit ec: ExecutionContext, + mat: Materializer + ): Future[MigrationStats] = input .listCaseTemplate(filter) .filterNot(output.caseTemplateExists) .mapAsync(1)(migrateAWholeCaseTemplate(input, output)) - .runWith(Sink.ignore) - .map(_ => ()) + .runFold(MigrationStats.empty)(_ + _) def migrateAWholeCase(input: Input, output: Output, filter: Filter)( inputCase: InputCase - )(implicit ec: ExecutionContext, mat: Materializer): Future[IdMapping] = + )(implicit ec: ExecutionContext, mat: Materializer): Future[(Option[IdMapping], MigrationStats)] = output - .createCase(inputCase) - .fold( - Future.failed, { - case caseId @ IdMapping(inputCaseId, _) => - for { - caseTaskIds <- migrateWithParent(Seq(caseId), input.listCaseTasks(inputCaseId), output.createCaseTask) - caseTaskLogIds <- migrateWithParent(caseTaskIds, input.listCaseTaskLogs(inputCaseId), output.createCaseTaskLog) - caseObservableIds <- migrateWithParent(Seq(caseId), input.listCaseObservables(inputCaseId), output.createCaseObservable) - jobIds <- migrateWithParent(caseObservableIds, input.listJobs(inputCaseId), output.createJob) - jobObservableIds <- migrateWithParent(jobIds, input.listJobObservables(inputCaseId), output.createJobObservable) - caseEntitiesIds = caseTaskIds ++ caseTaskLogIds ++ caseObservableIds ++ jobIds ++ jobObservableIds :+ caseId - actionSource = Source(caseEntitiesIds.to[immutable.Iterable]).flatMapConcat(id => input.listAction(id.inputId)) - actionIds <- migrateWithParent(caseEntitiesIds, actionSource, output.createAction) - caseEntitiesAuditIds = caseEntitiesIds ++ actionIds - auditSource = Source(caseEntitiesAuditIds.to[immutable.Iterable]).flatMapConcat(id => input.listAudit(id.inputId, filter)) - _ <- migrateAudit(caseEntitiesAuditIds, auditSource, output.createAudit) - } yield caseId - } - ) + .createCase(inputCase) match { + case Success(caseId @ IdMapping(inputCaseId, _)) => + for { + (caseTaskIds, caseTaskStats) <- migrateWithParent("Case/Task", Seq(caseId), input.listCaseTasks(inputCaseId), output.createCaseTask) + (caseTaskLogIds, caseTaskLogStats) <- migrateWithParent( + "Case/Task/Log", + caseTaskIds, + input.listCaseTaskLogs(inputCaseId), + output.createCaseTaskLog + ) + (caseObservableIds, caseObservableStats) <- migrateWithParent( + "Case/Observable", + Seq(caseId), + input.listCaseObservables(inputCaseId), + output.createCaseObservable + ) + (jobIds, jobStats) <- migrateWithParent("Case/Observable/Job", caseObservableIds, input.listJobs(inputCaseId), output.createJob) + (jobObservableIds, jobObservableStats) <- migrateWithParent( + "Case/Observable/Job/Observable", + jobIds, + input.listJobObservables(inputCaseId), + output.createJobObservable + ) + caseEntitiesIds = caseTaskIds ++ caseTaskLogIds ++ caseObservableIds ++ jobIds ++ jobObservableIds :+ caseId + actionSource = Source(caseEntitiesIds.to[immutable.Iterable]).flatMapConcat(id => input.listAction(id.inputId)) + (actionIds, actionStats) <- migrateWithParent("Action", caseEntitiesIds, actionSource, output.createAction) + caseEntitiesAuditIds = caseEntitiesIds ++ actionIds + auditSource = Source(caseEntitiesAuditIds.to[immutable.Iterable]).flatMapConcat(id => input.listAudit(id.inputId, filter)) + auditStats <- migrateAudit(caseEntitiesAuditIds, auditSource, output.createAudit) + } yield Some(caseId) -> (caseTaskStats + caseTaskLogStats + caseObservableStats + jobStats + jobObservableStats + actionStats + auditStats) + .success("Case") + case Failure(error) => + logger.error(s"Case creation failure, $error") + Future.successful(None -> MigrationStats.failure("Case")) + } - def migrateWholeCases(input: Input, output: Output, filter: Filter)(implicit ec: ExecutionContext, mat: Materializer): Future[Unit] = - input - .listCases(filter) - .filterNot(output.caseExists) - .mapAsync(1)(migrateAWholeCase(input, output, filter)) // TODO recover failed future - .runWith(Sink.ignore) - .map(_ => ()) +// def migrateWholeCases(input: Input, output: Output, filter: Filter)(implicit ec: ExecutionContext, mat: Materializer): Future[MigrationStats] = +// input +// .listCases(filter) +// .filterNot(output.caseExists) +// .mapAsync(1)(migrateAWholeCase(input, output, filter)) // TODO recover failed future +// .runFold(MigrationStats.empty)(_ + _) def migrateAWholeAlert(input: Input, output: Output, filter: Filter)( inputAlert: InputAlert - )(implicit ec: ExecutionContext, mat: Materializer): Future[Unit] = + )(implicit ec: ExecutionContext, mat: Materializer): Future[MigrationStats] = output.createAlert(inputAlert) match { case Success(alertId @ IdMapping(inputAlertId, _)) => for { - alertObservableIds <- migrateWithParent(Seq(alertId), input.listAlertObservables(inputAlertId), output.createAlertObservable) + (alertObservableIds, alertObservableStats) <- migrateWithParent( + "Alert/Observable", + Seq(alertId), + input.listAlertObservables(inputAlertId), + output.createAlertObservable + ) alertEntitiesIds = alertId +: alertObservableIds actionSource = Source(alertEntitiesIds.to[immutable.Iterable]).flatMapConcat(id => input.listAction(id.inputId)) - actionIds <- migrateWithParent(alertEntitiesIds, actionSource, output.createAction) + (actionIds, actionStats) <- migrateWithParent("Action", alertEntitiesIds, actionSource, output.createAction) alertEntitiesAuditIds = alertEntitiesIds ++ actionIds auditSource = Source(alertEntitiesAuditIds.to[immutable.Iterable]).flatMapConcat(id => input.listAudit(id.inputId, filter)) - _ <- migrateAudit(alertEntitiesAuditIds, auditSource, output.createAudit) - } yield () + auditStats <- migrateAudit(alertEntitiesAuditIds, auditSource, output.createAudit) + } yield (alertObservableStats + actionStats + auditStats).success("Alert") case Failure(error) => - logger.error(s"Migration of alert ${inputAlert.alert.`type`}:${inputAlert.alert.source}:${inputAlert.alert.sourceRef} failed", error) - Future.successful(()) + logger.error(s"Migration of alert ${inputAlert.alert.`type`}:${inputAlert.alert.source}:${inputAlert.alert.sourceRef} failed: $error") + Future.successful(MigrationStats.failure("Alert")) } - def migrateWholeAlerts(input: Input, output: Output, filter: Filter)(implicit ec: ExecutionContext, mat: Materializer): Future[Unit] = - input - .listAlerts(filter) - .filterNot(output.alertExists) - .mapAsync(1)(migrateAWholeAlert(input, output, filter)) - .runWith(Sink.ignore) - .map(_ => ()) +// def migrateWholeAlerts(input: Input, output: Output, filter: Filter)(implicit ec: ExecutionContext, mat: Materializer): Future[Unit] = +// input +// .listAlerts(filter) +// .filterNot(output.alertExists) +// .mapAsync(1)(migrateAWholeAlert(input, output, filter)) +// .runWith(Sink.ignore) +// .map(_ => ()) def migrate(input: Input, output: Output, filter: Filter)( implicit ec: ExecutionContext, mat: Materializer - ): Future[Unit] = { + ): Future[MigrationStats] = { val pendingAlertCase: mutable.Map[String, Seq[InputAlert]] = mutable.HashMap.empty[String, Seq[InputAlert]] - def migrateCasesAndAlerts(): Future[Unit] = { + def migrateCasesAndAlerts(): Future[MigrationStats] = { val ordering: Ordering[Either[InputAlert, InputCase]] = new Ordering[Either[InputAlert, InputCase]] { def createdAt(x: Either[InputAlert, InputCase]): Long = x.fold(_.metaData.createdAt.getTime, _.metaData.createdAt.getTime) override def compare(x: Either[InputAlert, InputCase], y: Either[InputAlert, InputCase]): Int = @@ -169,10 +248,11 @@ trait MigrationOps { val alertSource = input.listAlerts(filter).filterNot(output.alertExists).map(Left.apply) caseSource .mergeSorted(alertSource)(ordering) - .foldAsync(Seq.empty[IdMapping]) { - case (caseIds, Right(case0)) => - migrateAWholeCase(input, output, filter)(case0).transform(_.fold(_ => Success(caseIds), cid => Success(caseIds :+ cid))) - case (caseIds, Left(alert)) => + .runFoldAsync[(Seq[IdMapping], MigrationStats)](Seq.empty -> MigrationStats.empty) { + case ((caseIds, migrationStats), Right(case0)) => + migrateAWholeCase(input, output, filter)(case0) + .map { case (caseId, stats) => (caseIds ++ caseId, migrationStats + stats) } + case ((caseIds, migrationStats), Left(alert)) => alert .caseId .map { caseId => @@ -184,23 +264,41 @@ trait MigrationOps { } .flip .fold( - _ => Future.successful(caseIds), - caseId => migrateAWholeAlert(input, output, filter)(alert.updateCaseId(caseId)).map(_ => caseIds) + _ => Future.successful(caseIds -> migrationStats), + caseId => + migrateAWholeAlert(input, output, filter)(alert.updateCaseId(caseId)) + .map(stats => caseIds -> (migrationStats + stats)) ) } - }.runWith(Sink.ignore) - .map(_ => ()) + .map(_._2) + } for { - _ <- migrate(input.listProfiles(filter).filterNot(output.profileExists), output.createProfile) - _ <- migrate(input.listOrganisations(filter).filterNot(output.organisationExists), output.createOrganisation) - _ <- migrate(input.listUsers(filter).filterNot(output.userExists), output.createUser) - _ <- migrate(input.listImpactStatus(filter).filterNot(output.impactStatusExists), output.createImpactStatus) - _ <- migrate(input.listResolutionStatus(filter).filterNot(output.resolutionStatusExists), output.createResolutionStatus) - _ <- migrate(input.listCustomFields(filter).filterNot(output.customFieldExists), output.createCustomField) - _ <- migrate(input.listObservableTypes(filter).filterNot(output.observableTypeExists), output.createObservableTypes) - _ <- migrateWholeCaseTemplates(input, output, filter) - _ <- migrateCasesAndAlerts() - } yield () + (_, profileStats) <- migrate("Profile", input.listProfiles(filter).filterNot(output.profileExists), output.createProfile _) + (_, organisationStats) <- migrate( + "Organisation", + input.listOrganisations(filter).filterNot(output.organisationExists), + output.createOrganisation _ + ) + (_, userStats) <- migrate("User", input.listUsers(filter).filterNot(output.userExists), output.createUser _) + (_, impactStatuStats) <- migrate( + "ImpactStatus", + input.listImpactStatus(filter).filterNot(output.impactStatusExists), + output.createImpactStatus _ + ) + (_, resolutionStatuStats) <- migrate( + "ResolutionStatus", + input.listResolutionStatus(filter).filterNot(output.resolutionStatusExists), + output.createResolutionStatus _ + ) + (_, customFieldStats) <- migrate("CustomField", input.listCustomFields(filter).filterNot(output.customFieldExists), output.createCustomField _) + (_, observableTypeStats) <- migrate( + "ObservableType", + input.listObservableTypes(filter).filterNot(output.observableTypeExists), + output.createObservableTypes _ + ) + caseTemplateStats <- migrateWholeCaseTemplates(input, output, filter) + caseAndAlertSTats <- migrateCasesAndAlerts() + } yield profileStats + organisationStats + userStats + impactStatuStats + resolutionStatuStats + customFieldStats + observableTypeStats + caseTemplateStats + caseAndAlertSTats } } diff --git a/migration/src/main/scala/org/thp/thehive/migration/th4/Output.scala b/migration/src/main/scala/org/thp/thehive/migration/th4/Output.scala index 49a4933169..a738a6fbeb 100644 --- a/migration/src/main/scala/org/thp/thehive/migration/th4/Output.scala +++ b/migration/src/main/scala/org/thp/thehive/migration/th4/Output.scala @@ -77,6 +77,8 @@ object Output { bindActor[DummyActor]("notification-actor") bindActor[DummyActor]("config-actor") bindActor[DummyActor]("cortex-actor") + bindActor[DummyActor]("data-dedup-actor") + bindActor[DummyActor]("case-dedup-actor") bind[AuditSrv].to[NoAuditSrv] bind[Database].to[JanusDatabase] // bind[Database].to[OrientDatabase] @@ -206,7 +208,7 @@ class Output @Inject() ( for { organisation <- getOrganisation(organisationName) profile <- profileSrv.getOrFail(profileName) - _ <- userSrv.add(createdUser, organisation, profile) + _ <- userSrv.addUserToOrganisation(createdUser, organisation, profile) } yield () } } yield IdMapping(inputUser.metaData.id, createdUser._id) @@ -277,8 +279,10 @@ class Output @Inject() ( _ <- caseTemplateSrv.addTags(richCaseTemplate.caseTemplate, inputCaseTemplate.tags) _ = inputCaseTemplate.customFields.foreach { case (name, value, order) => - caseTemplateSrv.setOrCreateCustomField(richCaseTemplate.caseTemplate, name, value, order).getOrElse { - logger.warn(s"Add custom field $name:$value to case template ${richCaseTemplate.name} failure") + caseTemplateSrv.setOrCreateCustomField(richCaseTemplate.caseTemplate, name, value, order).recoverWith { + case error => + logger.warn(s"Add custom field `$name:${value.getOrElse("")}` to case template `${richCaseTemplate.name}` fails: $error") + Success(()) } } diff --git a/thehive/app/org/thp/thehive/services/UserSrv.scala b/thehive/app/org/thp/thehive/services/UserSrv.scala index c31e9b7578..8e452f84e3 100644 --- a/thehive/app/org/thp/thehive/services/UserSrv.scala +++ b/thehive/app/org/thp/thehive/services/UserSrv.scala @@ -57,7 +57,7 @@ class UserSrv @Inject() (configuration: Configuration, roleSrv: RoleSrv, auditSr else Failure(BadRequestError(s"User login is invalid, it must be an email address (found: ${user.login})")) } - def add(user: User with Entity, organisation: Organisation with Entity, profile: Profile with Entity)( + def addUserToOrganisation(user: User with Entity, organisation: Organisation with Entity, profile: Profile with Entity)( implicit graph: Graph, authContext: AuthContext ): Try[RichUser] = @@ -84,7 +84,7 @@ class UserSrv @Inject() (configuration: Configuration, roleSrv: RoleSrv, auditSr _ <- avatar.map(setAvatar(createdUser, _)).flip } yield createdUser } - .flatMap(add(_, organisation, profile)) + .flatMap(addUserToOrganisation(_, organisation, profile)) def canSetPassword(user: User with Entity)(implicit graph: Graph, authContext: AuthContext): Boolean = { val userOrganisations = get(user).organisations.name.toList.toSet