Skip to content

Commit

Permalink
#2134 Remove old observables based on MISP soft-delete query
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jul 23, 2021
1 parent b041e20 commit a451edc
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 84 deletions.
14 changes: 9 additions & 5 deletions misp/client/src/main/scala/org/thp/misp/client/MispClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,24 @@ class MispClient(
.mapMaterializedValue(_ => NotUsed)
}

def searchAttributes(eventId: String, publishDate: Option[Date])(implicit ec: ExecutionContext): Source[Attribute, NotUsed] = {
def searchAttributes(eventId: String, publishDate: Option[Date], deletedOnly: Boolean = false)(implicit
ec: ExecutionContext
): Source[Attribute, NotUsed] = {
logger.debug(s"Search MISP attributes for event #$eventId ${publishDate.fold("")("from " + _)}")
Source
.futureSource(
postStream(
"attributes/restSearch/json",
Json.obj("request" -> Json.obj("timestamp" -> publishDate.fold(0L)(_.getTime / 1000), "eventid" -> eventId))
Json.obj(
"request" ->
Json
.obj("timestamp" -> publishDate.fold(0L)(_.getTime / 1000), "eventid" -> eventId)
.when(deletedOnly)(_ + ("deleted" -> JsString("only")))
)
)
)
// add ("deleted" → 1) to see also deleted attributes
// add ("deleted" → "only") to see only deleted attributes
// .via(JsonFraming.objectScanner(Int.MaxValue))
.via(JsonReader.select("$.response.Attribute[*]"))
.mapConcat { data =>
val maybeAttribute = Try(Json.parse(data.toArray[Byte]).as[Attribute])
Expand All @@ -219,8 +225,6 @@ class MispClient(
.mapMaterializedValue(_ => NotUsed)
}

// .filter(_.date after refDate)

private val fileNameExtractor = """attachment; filename="(.*)"""".r

def downloadAttachment(attachmentId: String)(implicit ec: ExecutionContext): Future[(String, String, Source[ByteString, _])] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package org.thp.thehive.connector.misp.services
import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString
import org.apache.tinkerpop.gremlin.process.traversal.P
import org.thp.misp.dto.{Attribute, Event, Tag => MispTag}
import org.thp.scalligraph.auth.{AuthContext, UserSrv}
import org.thp.scalligraph.controllers.FFile
Expand Down Expand Up @@ -40,6 +39,7 @@ class MispImportSrv @Inject() (
db: Database,
auditSrv: AuditSrv,
userSrv: UserSrv,
attachmentSrv: AttachmentSrv,
implicit val ec: ExecutionContext,
implicit val mat: Materializer
) {
Expand Down Expand Up @@ -240,40 +240,41 @@ class MispImportSrv @Inject() (
src: Source[ByteString, _],
creation: Boolean
)(implicit graph: Graph, authContext: AuthContext): Try[Observable with Entity] = {
val existingObservable =
if (creation) None
else
alertSrv
.get(alert)
.observables
.filterOnType(observable.dataType)
.filterOnAttachmentName(filename)
.filterOnAttachmentName(contentType)
.richObservable
.headOption
existingObservable match {
case None =>
logger.debug(s"Observable ${observable.dataType}:$filename:$contentType doesn't exist, create it")
val file = Files.createTempFile("misp-attachment-", "")
Await.result(src.runWith(FileIO.toPath(file)), 1.hour)
val fFile = FFile(filename, file, contentType)
val res = alertSrv.createObservable(alert, observable, fFile).map(_.observable)
Files.delete(file)
res
case Some(richObservable) =>
logger.debug(s"Observable ${observable.dataType}:$filename:$contentType exists, update it")
for {
obs <-
observableSrv
.get(richObservable.observable)
.when(richObservable.message != observable.message)(_.update(_.message, observable.message))
.when(richObservable.tlp != observable.tlp)(_.update(_.tlp, observable.tlp))
.when(richObservable.ioc != observable.ioc)(_.update(_.ioc, observable.ioc))
.when(richObservable.sighted != observable.sighted)(_.update(_.sighted, observable.sighted))
.when(richObservable.tags.toSet != observable.tags.toSet)(_.update(_.tags, observable.tags))
.getOrFail("Observable")
} yield obs
}
val file = Files.createTempFile("misp-attachment-", "")
try {
Await.result(src.runWith(FileIO.toPath(file)), 1.hour)
val hash = attachmentSrv.hashers.fromPath(file).head.toString
val existingObservable =
if (creation) None
else
alertSrv
.get(alert)
.observables
.filterOnType(observable.dataType)
.filterOnAttachmentName(filename)
.filterOnAttachmentContentType(contentType)
.filterOnAttachmentHash(hash)
.richObservable
.headOption
existingObservable match {
case None =>
logger.debug(s"Observable ${observable.dataType}:$filename:$contentType doesn't exist, create it")
alertSrv.createObservable(alert, observable, FFile(filename, file, contentType)).map(_.observable)
case Some(richObservable) =>
logger.debug(s"Observable ${observable.dataType}:$filename:$contentType exists, update it")
for {
obs <-
observableSrv
.get(richObservable.observable)
.when(richObservable.message != observable.message)(_.update(_.message, observable.message))
.when(richObservable.tlp != observable.tlp)(_.update(_.tlp, observable.tlp))
.when(richObservable.ioc != observable.ioc)(_.update(_.ioc, observable.ioc))
.when(richObservable.sighted != observable.sighted)(_.update(_.sighted, observable.sighted))
.when(richObservable.tags.toSet != observable.tags.toSet)(_.update(_.tags, observable.tags))
.getOrFail("Observable")
} yield obs
}
} finally Files.delete(file)
}

def importAttributes(
Expand All @@ -286,8 +287,35 @@ class MispImportSrv @Inject() (
graph: Graph,
authContext: AuthContext
): Unit = {
logger.info("Removing old observables")
val deletedAttributes = client
.searchAttributes(event.id, lastSynchro, deletedOnly = true)
.mapConcat(attributeToObservable)
.runWith(Sink.queue[(Observable, Either[String, (String, String, Source[ByteString, _])])])

QueueIterator(deletedAttributes)
.flatMap {
case (observable, Left(data)) =>
observableSrv
.startTraversal
.has(_.relatedId, alert._id)
.filterOnType(observable.dataType)
.filterOnData(data)
.toIterator
case (observable, Right((filename, contentType, src))) =>
val hash = attachmentSrv.hashers.fromBinary(src).head.toString
observableSrv
.startTraversal
.has(_.relatedId, alert._id)
.filterOnType(observable.dataType)
.filterOnAttachmentContentType(contentType)
.filterOnAttachmentName(filename)
.filterOnAttachmentHash(hash)
.toIterator
}
.foreach(observableSrv.delete(_))

logger.debug(s"importAttributes ${client.name}#${event.id}")
val startSyncDate = new Date
val queue =
client
.searchAttributes(event.id, lastSynchro)
Expand Down Expand Up @@ -336,24 +364,6 @@ class MispImportSrv @Inject() (
case Failure(error) => logger.error(s"Unable to create observable $observable ($filename) on alert", error)
}
}

logger.info("Removing old observables")
alertSrv
.get(alert)
.observables
.filter(
_.or(
_.has(_._updatedAt, P.lt(startSyncDate)),
_.and(_.hasNot(_._updatedAt), _.has(_._createdAt, P.lt(startSyncDate)))
)
)
.toIterator
.foreach { obs =>
logger.debug(s"Delete $obs")
observableSrv.delete(obs).recover {
case error => logger.error(s"Fail to delete observable $obs", error)
}
}
}

// def convertTag(mispTag: MispTag): Tag = tagSrv.parseString(mispTag.name + mispTag.colour.fold("")(c => f"#$c%06X"))
Expand Down
25 changes: 0 additions & 25 deletions thehive/app/org/thp/thehive/services/AlertSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,31 +144,6 @@ class AlertSrv @Inject() (
): Try[RichObservable] =
attachmentSrv.create(file).flatMap(attachment => createObservable(alert, observable, attachment))

@deprecated("use createObservable", "0.2")
def addObservable(alert: Alert with Entity, richObservable: RichObservable)(implicit
graph: Graph,
authContext: AuthContext
): Try[Unit] = {
val maybeExistingObservable = richObservable.dataOrAttachment match {
case Left(data) => get(alert).observables.filterOnData(data)
case Right(attachment) => get(alert).observables.filterOnAttachmentId(attachment.attachmentId)
}
maybeExistingObservable
.richObservable
.headOption
.fold {
for {
_ <- alertObservableSrv.create(AlertObservable(), alert, richObservable.observable)
_ <- auditSrv.observableInAlert.create(richObservable.observable, alert, richObservable.toJson)
} yield ()
} { existingObservable =>
val tags = (existingObservable.tags ++ richObservable.tags).toSet
if ((tags -- existingObservable.tags).nonEmpty)
observableSrv.updateTags(existingObservable.observable, tags)
Success(())
}
}

def createCustomField(
alert: Alert with Entity,
inputCf: InputCustomFieldValue
Expand Down

0 comments on commit a451edc

Please sign in to comment.