From a451edc0a09d1bd598f8d9d6034b8974ef2e001b Mon Sep 17 00:00:00 2001 From: To-om Date: Fri, 23 Jul 2021 11:01:42 +0200 Subject: [PATCH] #2134 Remove old observables based on MISP soft-delete query --- .../org/thp/misp/client/MispClient.scala | 14 ++- .../misp/services/MispImportSrv.scala | 118 ++++++++++-------- .../org/thp/thehive/services/AlertSrv.scala | 25 ---- 3 files changed, 73 insertions(+), 84 deletions(-) diff --git a/misp/client/src/main/scala/org/thp/misp/client/MispClient.scala b/misp/client/src/main/scala/org/thp/misp/client/MispClient.scala index 8f23e59300..5aba625329 100644 --- a/misp/client/src/main/scala/org/thp/misp/client/MispClient.scala +++ b/misp/client/src/main/scala/org/thp/misp/client/MispClient.scala @@ -189,18 +189,24 @@ class MispClient( .mapMaterializedValue(_ => NotUsed) } - def searchAttributes(eventId: String, publishDate: Option[Date])(implicit ec: ExecutionContext): Source[Attribute, NotUsed] = { + def searchAttributes(eventId: String, publishDate: Option[Date], deletedOnly: Boolean = false)(implicit + ec: ExecutionContext + ): Source[Attribute, NotUsed] = { logger.debug(s"Search MISP attributes for event #$eventId ${publishDate.fold("")("from " + _)}") Source .futureSource( postStream( "attributes/restSearch/json", - Json.obj("request" -> Json.obj("timestamp" -> publishDate.fold(0L)(_.getTime / 1000), "eventid" -> eventId)) + Json.obj( + "request" -> + Json + .obj("timestamp" -> publishDate.fold(0L)(_.getTime / 1000), "eventid" -> eventId) + .when(deletedOnly)(_ + ("deleted" -> JsString("only"))) + ) ) ) // add ("deleted" → 1) to see also deleted attributes // add ("deleted" → "only") to see only deleted attributes - // .via(JsonFraming.objectScanner(Int.MaxValue)) .via(JsonReader.select("$.response.Attribute[*]")) .mapConcat { data => val maybeAttribute = Try(Json.parse(data.toArray[Byte]).as[Attribute]) @@ -219,8 +225,6 @@ class MispClient( .mapMaterializedValue(_ => NotUsed) } - // .filter(_.date after refDate) - private val fileNameExtractor = """attachment; filename="(.*)"""".r def downloadAttachment(attachmentId: String)(implicit ec: ExecutionContext): Future[(String, String, Source[ByteString, _])] = diff --git a/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispImportSrv.scala b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispImportSrv.scala index 96d80dd87f..e63afa5c8e 100644 --- a/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispImportSrv.scala +++ b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispImportSrv.scala @@ -3,7 +3,6 @@ package org.thp.thehive.connector.misp.services import akka.stream.Materializer import akka.stream.scaladsl.{FileIO, Sink, Source} import akka.util.ByteString -import org.apache.tinkerpop.gremlin.process.traversal.P import org.thp.misp.dto.{Attribute, Event, Tag => MispTag} import org.thp.scalligraph.auth.{AuthContext, UserSrv} import org.thp.scalligraph.controllers.FFile @@ -40,6 +39,7 @@ class MispImportSrv @Inject() ( db: Database, auditSrv: AuditSrv, userSrv: UserSrv, + attachmentSrv: AttachmentSrv, implicit val ec: ExecutionContext, implicit val mat: Materializer ) { @@ -240,40 +240,41 @@ class MispImportSrv @Inject() ( src: Source[ByteString, _], creation: Boolean )(implicit graph: Graph, authContext: AuthContext): Try[Observable with Entity] = { - val existingObservable = - if (creation) None - else - alertSrv - .get(alert) - .observables - .filterOnType(observable.dataType) - .filterOnAttachmentName(filename) - .filterOnAttachmentName(contentType) - .richObservable - .headOption - existingObservable match { - case None => - logger.debug(s"Observable ${observable.dataType}:$filename:$contentType doesn't exist, create it") - val file = Files.createTempFile("misp-attachment-", "") - Await.result(src.runWith(FileIO.toPath(file)), 1.hour) - val fFile = FFile(filename, file, contentType) - val res = alertSrv.createObservable(alert, observable, fFile).map(_.observable) - Files.delete(file) - res - case Some(richObservable) => - logger.debug(s"Observable ${observable.dataType}:$filename:$contentType exists, update it") - for { - obs <- - observableSrv - .get(richObservable.observable) - .when(richObservable.message != observable.message)(_.update(_.message, observable.message)) - .when(richObservable.tlp != observable.tlp)(_.update(_.tlp, observable.tlp)) - .when(richObservable.ioc != observable.ioc)(_.update(_.ioc, observable.ioc)) - .when(richObservable.sighted != observable.sighted)(_.update(_.sighted, observable.sighted)) - .when(richObservable.tags.toSet != observable.tags.toSet)(_.update(_.tags, observable.tags)) - .getOrFail("Observable") - } yield obs - } + val file = Files.createTempFile("misp-attachment-", "") + try { + Await.result(src.runWith(FileIO.toPath(file)), 1.hour) + val hash = attachmentSrv.hashers.fromPath(file).head.toString + val existingObservable = + if (creation) None + else + alertSrv + .get(alert) + .observables + .filterOnType(observable.dataType) + .filterOnAttachmentName(filename) + .filterOnAttachmentContentType(contentType) + .filterOnAttachmentHash(hash) + .richObservable + .headOption + existingObservable match { + case None => + logger.debug(s"Observable ${observable.dataType}:$filename:$contentType doesn't exist, create it") + alertSrv.createObservable(alert, observable, FFile(filename, file, contentType)).map(_.observable) + case Some(richObservable) => + logger.debug(s"Observable ${observable.dataType}:$filename:$contentType exists, update it") + for { + obs <- + observableSrv + .get(richObservable.observable) + .when(richObservable.message != observable.message)(_.update(_.message, observable.message)) + .when(richObservable.tlp != observable.tlp)(_.update(_.tlp, observable.tlp)) + .when(richObservable.ioc != observable.ioc)(_.update(_.ioc, observable.ioc)) + .when(richObservable.sighted != observable.sighted)(_.update(_.sighted, observable.sighted)) + .when(richObservable.tags.toSet != observable.tags.toSet)(_.update(_.tags, observable.tags)) + .getOrFail("Observable") + } yield obs + } + } finally Files.delete(file) } def importAttributes( @@ -286,8 +287,35 @@ class MispImportSrv @Inject() ( graph: Graph, authContext: AuthContext ): Unit = { + logger.info("Removing old observables") + val deletedAttributes = client + .searchAttributes(event.id, lastSynchro, deletedOnly = true) + .mapConcat(attributeToObservable) + .runWith(Sink.queue[(Observable, Either[String, (String, String, Source[ByteString, _])])]) + + QueueIterator(deletedAttributes) + .flatMap { + case (observable, Left(data)) => + observableSrv + .startTraversal + .has(_.relatedId, alert._id) + .filterOnType(observable.dataType) + .filterOnData(data) + .toIterator + case (observable, Right((filename, contentType, src))) => + val hash = attachmentSrv.hashers.fromBinary(src).head.toString + observableSrv + .startTraversal + .has(_.relatedId, alert._id) + .filterOnType(observable.dataType) + .filterOnAttachmentContentType(contentType) + .filterOnAttachmentName(filename) + .filterOnAttachmentHash(hash) + .toIterator + } + .foreach(observableSrv.delete(_)) + logger.debug(s"importAttributes ${client.name}#${event.id}") - val startSyncDate = new Date val queue = client .searchAttributes(event.id, lastSynchro) @@ -336,24 +364,6 @@ class MispImportSrv @Inject() ( case Failure(error) => logger.error(s"Unable to create observable $observable ($filename) on alert", error) } } - - logger.info("Removing old observables") - alertSrv - .get(alert) - .observables - .filter( - _.or( - _.has(_._updatedAt, P.lt(startSyncDate)), - _.and(_.hasNot(_._updatedAt), _.has(_._createdAt, P.lt(startSyncDate))) - ) - ) - .toIterator - .foreach { obs => - logger.debug(s"Delete $obs") - observableSrv.delete(obs).recover { - case error => logger.error(s"Fail to delete observable $obs", error) - } - } } // def convertTag(mispTag: MispTag): Tag = tagSrv.parseString(mispTag.name + mispTag.colour.fold("")(c => f"#$c%06X")) diff --git a/thehive/app/org/thp/thehive/services/AlertSrv.scala b/thehive/app/org/thp/thehive/services/AlertSrv.scala index 8d91fd568f..0e10da152b 100644 --- a/thehive/app/org/thp/thehive/services/AlertSrv.scala +++ b/thehive/app/org/thp/thehive/services/AlertSrv.scala @@ -144,31 +144,6 @@ class AlertSrv @Inject() ( ): Try[RichObservable] = attachmentSrv.create(file).flatMap(attachment => createObservable(alert, observable, attachment)) - @deprecated("use createObservable", "0.2") - def addObservable(alert: Alert with Entity, richObservable: RichObservable)(implicit - graph: Graph, - authContext: AuthContext - ): Try[Unit] = { - val maybeExistingObservable = richObservable.dataOrAttachment match { - case Left(data) => get(alert).observables.filterOnData(data) - case Right(attachment) => get(alert).observables.filterOnAttachmentId(attachment.attachmentId) - } - maybeExistingObservable - .richObservable - .headOption - .fold { - for { - _ <- alertObservableSrv.create(AlertObservable(), alert, richObservable.observable) - _ <- auditSrv.observableInAlert.create(richObservable.observable, alert, richObservable.toJson) - } yield () - } { existingObservable => - val tags = (existingObservable.tags ++ richObservable.tags).toSet - if ((tags -- existingObservable.tags).nonEmpty) - observableSrv.updateTags(existingObservable.observable, tags) - Success(()) - } - } - def createCustomField( alert: Alert with Entity, inputCf: InputCustomFieldValue