diff --git a/ScalliGraph b/ScalliGraph index 7f2f8c4fd0..ddbc847ef3 160000 --- a/ScalliGraph +++ b/ScalliGraph @@ -1 +1 @@ -Subproject commit 7f2f8c4fd04f254f76545090350fb80917582d00 +Subproject commit ddbc847ef30f2507e1287d894ad2191d873a0a87 diff --git a/misp/client/src/main/scala/org/thp/misp/client/MispClient.scala b/misp/client/src/main/scala/org/thp/misp/client/MispClient.scala index edae588bc5..8917e681f1 100644 --- a/misp/client/src/main/scala/org/thp/misp/client/MispClient.scala +++ b/misp/client/src/main/scala/org/thp/misp/client/MispClient.scala @@ -1,19 +1,19 @@ package org.thp.misp.client -import java.util.Date - import akka.NotUsed import akka.stream.alpakka.json.scaladsl.JsonReader import akka.stream.scaladsl.{JsonFraming, Source} import akka.util.ByteString import org.thp.client.{ApplicationError, Authentication, ProxyWS} -import org.thp.misp.dto.{Attribute, Event, Organisation, Tag, User} +import org.thp.misp.dto._ import org.thp.scalligraph.InternalError +import org.thp.scalligraph.utils.FunctionalCondition._ import play.api.Logger import play.api.http.Status -import play.api.libs.json.{JsObject, JsString, JsValue, Json} +import play.api.libs.json._ import play.api.libs.ws.{WSClient, WSRequest} +import java.util.Date import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.{Failure, Success, Try} @@ -29,6 +29,7 @@ class MispClient( ws: WSClient, maxAge: Option[Duration], excludedOrganisations: Seq[String], + whitelistOrganisations: Seq[String], excludedTags: Set[String], whitelistTags: Set[String] ) { @@ -148,7 +149,7 @@ class MispClient( def getEvent(eventId: String)(implicit ec: ExecutionContext): Future[Event] = { logger.debug(s"Get MISP event $eventId") - require(!eventId.isEmpty) + require(eventId.nonEmpty) get(s"events/$eventId") .map(e => (e \ "Event").as[Event]) } @@ -163,7 +164,17 @@ class MispClient( .recover { case _ => Json.obj("name" -> name, "version" -> "", "status" -> "ERROR", "url" -> baseUrl) } def searchEvents(publishDate: Option[Date] = None)(implicit ec: ExecutionContext): Source[Event, NotUsed] = { - val query = publishDate.fold(JsObject.empty)(d => Json.obj("searchpublish_timestamp" -> ((d.getTime / 1000) + 1))) + val fromDate = (maxAge.map(a => System.currentTimeMillis() - a.toMillis).toSeq ++ publishDate.map(_.getTime)) + .sorted(Ordering[Long].reverse) + .headOption + .map(d => "searchpublish_timestamp" -> JsNumber((d / 1000) + 1)) + val tagFilter = (whitelistTags ++ excludedTags.map("!" + _)).map(JsString.apply) + val organisationFilter = (whitelistOrganisations ++ excludedOrganisations.map("!" + _)).map(JsString.apply) + val query = JsObject + .empty + .merge(fromDate)(_ + _) + .when(tagFilter.nonEmpty)(_ + ("searchtag" -> JsArray(tagFilter.toSeq))) + .when(organisationFilter.nonEmpty)(_ + ("searchorg" -> JsArray(organisationFilter))) logger.debug("Search MISP events") Source .futureSource(postStream("events/index", query)) diff --git a/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispActor.scala b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispActor.scala index dfd4d06694..cc9e0b8236 100644 --- a/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispActor.scala +++ b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispActor.scala @@ -6,12 +6,9 @@ import org.thp.scalligraph.auth.UserSrv import play.api.Logger import javax.inject.{Inject, Named, Provider} -import scala.concurrent.Future -import scala.util.{Failure, Success} sealed trait MispMessage -case object Synchro extends MispMessage -case class EndOfSynchro(error: Option[String]) extends MispMessage +case object Synchro extends MispMessage class MispActor @Inject() ( connector: Connector, @@ -25,39 +22,22 @@ class MispActor @Inject() ( override def preStart(): Unit = { super.preStart() logger.info(s"[$self] Starting actor MISP") - context.become(waiting(context.system.scheduler.scheduleOnce(connector.syncInitialDelay, self, Synchro))) + context.become(receive(context.system.scheduler.scheduleOnce(connector.syncInitialDelay, self, Synchro))) } override def receive: Receive = { case other => logger.warn(s"Unknown message $other (${other.getClass})") } - def running: Receive = { - case Synchro => logger.info("MISP synchronisation is already in progress") - case EndOfSynchro(None) => - logger.info("MISP synchronisation is complete") - context.become(waiting(context.system.scheduler.scheduleOnce(connector.syncInterval, self, Synchro))) - case EndOfSynchro(Some(error)) => - logger.error(s"MISP synchronisation fails: $error") - context.become(waiting(context.system.scheduler.scheduleOnce(connector.syncInterval, self, Synchro))) - case other => logger.warn(s"Unknown message $other (${other.getClass})") - } - - def waiting(scheduledSynchronisation: Cancellable): Receive = { + def receive(scheduledSynchronisation: Cancellable): Receive = { case Synchro => scheduledSynchronisation.cancel() - context.become(running) logger.info(s"Synchronising MISP events for ${connector.clients.map(_.name).mkString(",")}") - Future - .traverse(connector.clients.filter(_.canImport))(mispImportSrv.syncMispEvents(_)(userSrv.getSystemAuthContext)) - .map(_ => ()) - .onComplete { - case _: Success[_] => self ! EndOfSynchro(None) - case Failure(error) => - logger.error("MISP synchronisation failure", error) - self ! EndOfSynchro(Some(error.toString)) - } - case other => logger.warn(s"Unknown message $other (${other.getClass})") + connector.clients.filter(_.canImport).foreach { mispClient => + mispImportSrv.syncMispEvents(mispClient)(userSrv.getSystemAuthContext) + } + logger.info("MISP synchronisation is complete") + context.become(receive(context.system.scheduler.scheduleOnce(connector.syncInterval, self, Synchro))) } } diff --git a/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispImportSrv.scala b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispImportSrv.scala index 4318dd1ad4..7de5bebf28 100644 --- a/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispImportSrv.scala +++ b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/MispImportSrv.scala @@ -1,27 +1,31 @@ package org.thp.thehive.connector.misp.services -import java.nio.file.Files -import java.util.Date - import akka.stream.Materializer import akka.stream.scaladsl.{FileIO, Sink, Source} import akka.util.ByteString -import javax.inject.{Inject, Named, Singleton} import org.apache.tinkerpop.gremlin.process.traversal.P +import org.apache.tinkerpop.gremlin.structure.Graph import org.thp.misp.dto.{Attribute, Event, Tag => MispTag} import org.thp.scalligraph.auth.AuthContext import org.thp.scalligraph.controllers.FFile import org.thp.scalligraph.models._ import org.thp.scalligraph.traversal.TraversalOps._ +import org.thp.scalligraph.utils.FunctionalCondition._ import org.thp.scalligraph.{EntityName, RichSeq} +import org.thp.thehive.controllers.v1.Conversion._ import org.thp.thehive.models._ import org.thp.thehive.services.AlertOps._ import org.thp.thehive.services.ObservableOps._ import org.thp.thehive.services.OrganisationOps._ import org.thp.thehive.services._ import play.api.Logger +import play.api.libs.json._ -import scala.concurrent.{ExecutionContext, Future} +import java.nio.file.Files +import java.util.Date +import javax.inject.{Inject, Named, Singleton} +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, ExecutionContext} import scala.util.{Failure, Success, Try} @Singleton @@ -33,6 +37,7 @@ class MispImportSrv @Inject() ( observableTypeSrv: ObservableTypeSrv, attachmentSrv: AttachmentSrv, caseTemplateSrv: CaseTemplateSrv, + auditSrv: AuditSrv, @Named("with-thehive-schema") db: Database, implicit val ec: ExecutionContext, implicit val mat: Materializer @@ -69,27 +74,25 @@ class MispImportSrv @Inject() ( ) } - def convertAttributeType(attributeCategory: String, attributeType: String): Try[(ObservableType with Entity, Seq[String])] = { + def convertAttributeType(attributeCategory: String, attributeType: String)(implicit + graph: Graph + ): Try[(ObservableType with Entity, Seq[String])] = { val obsTypeFromConfig = connector .attributeConverter(attributeCategory, attributeType) .flatMap { attrConv => - db.roTransaction { implicit graph => - observableTypeSrv - .get(attrConv.`type`) - .headOption - .map(_ -> attrConv.tags) - } + observableTypeSrv + .get(attrConv.`type`) + .headOption + .map(_ -> attrConv.tags) } - db.roTransaction { implicit graph => - obsTypeFromConfig - .orElse(observableTypeSrv.get(EntityName(attributeType)).headOption.map(_ -> Nil)) - .fold(observableTypeSrv.getOrFail(EntityName("other")).map(_ -> Seq.empty[String]))(Success(_)) - } + obsTypeFromConfig + .orElse(observableTypeSrv.get(EntityName(attributeType)).headOption.map(_ -> Nil)) + .fold(observableTypeSrv.getOrFail(EntityName("other")).map(_ -> Seq.empty[String]))(Success(_)) } def attributeToObservable( attribute: Attribute - ): List[(Observable, ObservableType with Entity, Set[String], Either[String, (String, String, Source[ByteString, _])])] = + )(implicit graph: Graph): List[(Observable, ObservableType with Entity, Set[String], Either[String, (String, String, Source[ByteString, _])])] = attribute .`type` .split('|') @@ -152,23 +155,23 @@ class MispImportSrv @Inject() ( Nil } - def getLastSyncDate(client: TheHiveMispClient, mispOrganisation: String, organisations: Seq[Organisation with Entity]): Option[Date] = { - val lastOrgSynchro = db.roTransaction { implicit graph => - client - .organisationFilter(organisationSrv.startTraversal) - .group( - _.by, - _.by( - _.alerts - .filterBySource(mispOrganisation) - .filterByType("misp") - .value(a => a.lastSyncDate) - .max - ) + def getLastSyncDate(client: TheHiveMispClient, mispOrganisation: String, organisations: Seq[Organisation with Entity])(implicit + graph: Graph + ): Option[Date] = { + val lastOrgSynchro = client + .organisationFilter(organisationSrv.startTraversal) + .group( + _.by, + _.by( + _.alerts + .filterBySource(mispOrganisation) + .filterByType("misp") + .value(a => a.lastSyncDate) + .max ) - .head - }.values -// .asInstanceOf[Seq[Date]] + ) + .head + .values if (lastOrgSynchro.size == organisations.size && organisations.nonEmpty) Some(lastOrgSynchro.min) else None @@ -179,37 +182,42 @@ class MispImportSrv @Inject() ( observable: Observable, observableType: ObservableType with Entity, data: String, - tags: Set[String] - )(implicit authContext: AuthContext): Try[Observable with Entity] = - db.tryTransaction { implicit graph => - alertSrv - .get(alert) - .observables - .filterOnType(observableType.name) - .filterOnData(data) - .richObservable - .headOption match { - case None => - logger.debug(s"Observable ${observableType.name}:$data doesn't exist, create it") - for { - richObservable <- observableSrv.create(observable, observableType, data, tags, Nil) - _ <- alertSrv.addObservable(alert, richObservable) - } yield richObservable.observable - case Some(richObservable) => - logger.debug(s"Observable ${observableType.name}:$data exists, update it") - for { - updatedObservable <- - Some(observableSrv.get(richObservable.observable)) - .map(t => if (richObservable.message != observable.message) t.update(_.message, observable.message) else t) - .map(t => if (richObservable.tlp != observable.tlp) t.update(_.tlp, observable.tlp) else t) - .map(t => if (richObservable.ioc != observable.ioc) t.update(_.ioc, observable.ioc) else t) - .map(t => if (richObservable.sighted != observable.sighted) t.update(_.sighted, observable.sighted) else t) - .get - .getOrFail("Observable") - _ <- observableSrv.updateTagNames(updatedObservable, tags) - } yield updatedObservable - } + tags: Set[String], + creation: Boolean + )(implicit graph: Graph, authContext: AuthContext): Try[Unit] = { + + val existingObservable = + if (creation) None + else + alertSrv + .get(alert) + .observables + .filterOnType(observableType.name) + .filterOnData(data) + .richObservable + .headOption + existingObservable match { + case None => + logger.debug(s"Observable ${observableType.name}:$data doesn't exist, create it") + for { + richObservable <- observableSrv.create(observable, observableType, data, tags, Nil) + _ <- alertSrv.addObservable(alert, richObservable) + } yield () + case Some(richObservable) => + logger.debug(s"Observable ${observableType.name}:$data exists, update it") + for { + updatedObservable <- + observableSrv + .get(richObservable.observable) + .when(richObservable.message != observable.message)(_.update(_.message, observable.message)) + .when(richObservable.tlp != observable.tlp)(_.update(_.tlp, observable.tlp)) + .when(richObservable.ioc != observable.ioc)(_.update(_.ioc, observable.ioc)) + .when(richObservable.sighted != observable.sighted)(_.update(_.sighted, observable.sighted)) + .getOrFail("Observable") + _ <- observableSrv.updateTagNames(updatedObservable, tags) + } yield () } + } def updateOrCreateObservable( alert: Alert with Entity, @@ -218,115 +226,101 @@ class MispImportSrv @Inject() ( filename: String, contentType: String, src: Source[ByteString, _], - tags: Set[String] - )(implicit authContext: AuthContext): Future[Observable with Entity] = - db.roTransaction { implicit graph => - alertSrv - .get(alert) - .observables - .filterOnType(observableType.name) - .filterOnAttachmentName(filename) - .filterOnAttachmentName(contentType) - .richObservable - .headOption - } match { + tags: Set[String], + creation: Boolean + )(implicit graph: Graph, authContext: AuthContext): Try[Unit] = { + val existingObservable = + if (creation) None + else + alertSrv + .get(alert) + .observables + .filterOnType(observableType.name) + .filterOnAttachmentName(filename) + .filterOnAttachmentName(contentType) + .richObservable + .headOption + existingObservable match { case None => logger.debug(s"Observable ${observableType.name}:$filename:$contentType doesn't exist, create it") val file = Files.createTempFile("misp-attachment-", "") - (for { - _ <- src.runWith(FileIO.toPath(file)) - fFile = FFile(filename, file, contentType) - createdObservable <- Future.fromTry { - db.tryTransaction { implicit graph => - for { - createdAttachment <- attachmentSrv.create(fFile) - richObservable <- observableSrv.create(observable, observableType, createdAttachment, tags, Nil) - _ <- alertSrv.addObservable(alert, richObservable) - } yield richObservable - } - } - } yield createdObservable.observable) - .andThen { case _ => Files.delete(file) } + Await.result(src.runWith(FileIO.toPath(file)), 1.hour) + val fFile = FFile(filename, file, contentType) + for { + createdAttachment <- attachmentSrv.create(fFile) + richObservable <- observableSrv.create(observable, observableType, createdAttachment, tags, Nil) + _ <- alertSrv.addObservable(alert, richObservable) + _ = Files.delete(file) + } yield () case Some(richObservable) => logger.debug(s"Observable ${observableType.name}:$filename:$contentType exists, update it") - Future.fromTry { - db.tryTransaction { implicit graph => - for { - updatedObservable <- - Some(observableSrv.get(richObservable.observable)) - .map(t => if (richObservable.message != observable.message) t.update(_.message, observable.message) else t) - .map(t => if (richObservable.tlp != observable.tlp) t.update(_.tlp, observable.tlp) else t) - .map(t => if (richObservable.ioc != observable.ioc) t.update(_.ioc, observable.ioc) else t) - .map(t => if (richObservable.sighted != observable.sighted) t.update(_.sighted, observable.sighted) else t) - .get - .getOrFail("Observable") - _ <- observableSrv.updateTagNames(updatedObservable, tags) - } yield updatedObservable - } - } + for { + updatedObservable <- + observableSrv + .get(richObservable.observable) + .when(richObservable.message != observable.message)(_.update(_.message, observable.message)) + .when(richObservable.tlp != observable.tlp)(_.update(_.tlp, observable.tlp)) + .when(richObservable.ioc != observable.ioc)(_.update(_.ioc, observable.ioc)) + .when(richObservable.sighted != observable.sighted)(_.update(_.sighted, observable.sighted)) + .getOrFail("Observable") + _ <- observableSrv.updateTagNames(updatedObservable, tags) + } yield () } + } def importAttibutes(client: TheHiveMispClient, event: Event, alert: Alert with Entity, lastSynchro: Option[Date])(implicit + graph: Graph, authContext: AuthContext - ): Future[Unit] = { - logger.debug(s"importAttibutes ${client.name}#${event.id}") + ): Unit = { + logger.debug(s"importAttributes ${client.name}#${event.id}") val startSyncDate = new Date - client - .searchAttributes(event.id, lastSynchro) - .mapConcat(attributeToObservable) - .fold( - Map.empty[ - (String, String), - (Observable, ObservableType with Entity, Set[String], Either[String, (String, String, Source[ByteString, _])]) - ] - ) { - case (distinctMap, data @ (_, t, _, Left(d))) => distinctMap + ((t.name, d) -> data) - case (distinctMap, data @ (_, t, _, Right((n, _, _)))) => distinctMap + ((t.name, n) -> data) - } - .mapConcat { m => - m.values.toList - } - .runWith(Sink.foreachAsync(1) { - case (observable, observableType, tags, Left(data)) => - updateOrCreateObservable(alert, observable, observableType, data, tags) - .fold( - error => { - logger.error(s"Unable to create observable $observable ${observableType.name}:$data", error) - Future.failed(error) - }, - _ => Future.successful(()) - ) - case (observable, observableType, tags, Right((filename, contentType, src))) => - updateOrCreateObservable(alert, observable, observableType, filename, contentType, src, tags) - .transform { - case Success(_) => Success(()) - case Failure(error) => - logger.error( - s"Unable to create observable $observable ${observableType.name}:$filename", - error - ) - Success(()) - } - }) - .flatMap { _ => - Future.fromTry { - logger.info("Removing old observables") - db.tryTransaction { implicit graph => - alertSrv - .get(alert) - .observables - .filter( - _.or( - _.has(_._updatedAt, P.lt(startSyncDate)), - _.and(_.hasNot(_._updatedAt), _.has(_._createdAt, P.lt(startSyncDate))) - ) - ) - .toIterator - .toTry { obs => - logger.info(s"Remove $obs") - observableSrv.remove(obs) - } - }.map(_ => ()) + val queue = + client + .searchAttributes(event.id, lastSynchro) + .mapConcat(attributeToObservable) + .fold( + Map.empty[ + (String, String), + (Observable, ObservableType with Entity, Set[String], Either[String, (String, String, Source[ByteString, _])]) + ] + ) { + case (distinctMap, data @ (_, t, _, Left(d))) => distinctMap + ((t.name, d) -> data) + case (distinctMap, data @ (_, t, _, Right((n, _, _)))) => distinctMap + ((t.name, n) -> data) + } + .mapConcat { m => + m.values.toList + } + .runWith(Sink.queue[(Observable, ObservableType with Entity, Set[String], Either[String, (String, String, Source[ByteString, _])])]) + QueueIterator(queue).foreach { + case (observable, observableType, tags, Left(data)) => + updateOrCreateObservable(alert, observable, observableType, data, tags, lastSynchro.isEmpty) + .recover { + case error => + logger.error(s"Unable to create observable $observable ${observableType.name}:$data", error) + } + case (observable, observableType, tags, Right((filename, contentType, src))) => + updateOrCreateObservable(alert, observable, observableType, filename, contentType, src, tags, lastSynchro.isEmpty) + .recover { + case error => + logger.error(s"Unable to create observable $observable ${observableType.name}:$filename", error) + } + } + + logger.info("Removing old observables") + alertSrv + .get(alert) + .observables + .filter( + _.or( + _.has(_._updatedAt, P.lt(startSyncDate)), + _.and(_.hasNot(_._updatedAt), _.has(_._createdAt, P.lt(startSyncDate))) + ) + ) + .toIterator + .foreach { obs => + logger.debug(s"Delete $obs") + observableSrv.remove(obs).recover { + case error => logger.error(s"Fail to delete observable $obs", error) } } } @@ -339,78 +333,99 @@ class MispImportSrv @Inject() ( mispOrganisation: String, event: Event, caseTemplate: Option[CaseTemplate with Entity] - )(implicit - authContext: AuthContext - ): Try[Alert with Entity] = { + )(implicit graph: Graph, authContext: AuthContext): Try[(Alert with Entity, JsObject)] = { logger.debug(s"updateOrCreateAlert ${client.name}#${event.id} for organisation ${organisation.name}") eventToAlert(client, event).flatMap { alert => - db.tryTransaction { implicit graph => - organisationSrv - .get(organisation) - .alerts - .getBySourceId("misp", mispOrganisation, event.id) - .richAlert - .headOption match { - case None => // if the related alert doesn't exist, create it - logger.debug(s"Event ${client.name}#${event.id} has no related alert for organisation ${organisation.name}") - alertSrv - .create(alert, organisation, event.tags.map(_.name).toSet, Seq(), caseTemplate) - .map(_.alert) - case Some(richAlert) => - logger.debug(s"Event ${client.name}#${event.id} have already been imported for organisation ${organisation.name}, updating the alert") - for { - updatedAlert <- - Some(alertSrv.get(richAlert.alert)) - .map(t => if (richAlert.title != alert.title) t.update(_.title, alert.title) else t) - .map(t => if (richAlert.lastSyncDate != alert.lastSyncDate) t.update(_.lastSyncDate, alert.lastSyncDate) else t) - .map(t => if (richAlert.description != alert.description) t.update(_.description, alert.description) else t) - .map(t => if (richAlert.severity != alert.severity) t.update(_.severity, alert.severity) else t) - .map(t => if (richAlert.date != alert.date) t.update(_.date, alert.date) else t) - .map(t => if (richAlert.tlp != alert.tlp) t.update(_.tlp, alert.tlp) else t) - .map(t => if (richAlert.pap != alert.pap) t.update(_.pap, alert.pap) else t) - .map(t => if (richAlert.externalLink != alert.externalLink) t.update(_.externalLink, alert.externalLink) else t) - .get - .getOrFail("Alert") - _ <- alertSrv.updateTagNames(updatedAlert, event.tags.map(_.name).toSet) - } yield updatedAlert - } + organisationSrv + .get(organisation) + .alerts + .getBySourceId("misp", mispOrganisation, event.id) + .richAlert + .headOption match { + case None => // if the related alert doesn't exist, create it + logger.debug(s"Event ${client.name}#${event.id} has no related alert for organisation ${organisation.name}") + alertSrv + .create(alert, organisation, event.tags.map(_.name).toSet, Seq(), caseTemplate) + .map(ra => ra.alert -> ra.toJson.asInstanceOf[JsObject]) + case Some(richAlert) => + logger.debug(s"Event ${client.name}#${event.id} have already been imported for organisation ${organisation.name}, updating the alert") + val (updatedAlertTraversal, updatedFields) = (alertSrv.get(richAlert.alert), JsObject.empty) + .when(richAlert.title != alert.title)(_.update(_.title, alert.title), _ + ("title" -> JsString(alert.title))) + .when(richAlert.lastSyncDate != alert.lastSyncDate)( + _.update(_.lastSyncDate, alert.lastSyncDate), + _ + ("lastSyncDate" -> JsNumber(alert.lastSyncDate.getTime)) + ) + .when(richAlert.description != alert.description)( + _.update(_.description, alert.description), + _ + ("description" -> JsString(alert.description)) + ) + .when(richAlert.severity != alert.severity)(_.update(_.severity, alert.severity), _ + ("severity" -> JsNumber(alert.severity))) + .when(richAlert.date != alert.date)(_.update(_.date, alert.date), _ + ("date" -> JsNumber(alert.date.getTime))) + .when(richAlert.tlp != alert.tlp)(_.update(_.tlp, alert.tlp), _ + ("tlp" -> JsNumber(alert.tlp))) + .when(richAlert.pap != alert.pap)(_.update(_.pap, alert.pap), _ + ("pap" -> JsNumber(alert.pap))) + .when(richAlert.externalLink != alert.externalLink)( + _.update(_.externalLink, alert.externalLink), + _ + ("externalLink" -> alert.externalLink.fold[JsValue](JsNull)(JsString.apply)) + ) + val tags = event.tags.map(_.name) + for { + (addedTags, removedTags) <- alertSrv.updateTagNames(richAlert.alert, tags.toSet) + updatedAlert <- updatedAlertTraversal.getOrFail("Alert") + updatedFieldWithTags = + if (addedTags.nonEmpty || removedTags.nonEmpty) updatedFields + ("tags" -> JsArray(tags.map(JsString))) else updatedFields + } yield (updatedAlert, updatedFieldWithTags) } } } - def syncMispEvents(client: TheHiveMispClient)(implicit authContext: AuthContext): Future[Unit] = - Future.fromTry(client.currentOrganisationName).flatMap { mispOrganisation => - lazy val caseTemplate = client.caseTemplate.flatMap { caseTemplateName => - db.roTransaction { implicit graph => - caseTemplateSrv.get(EntityName(caseTemplateName)).headOption - } - } - logger.debug(s"Get eligible organisations") - val organisations = db.roTransaction { implicit graph => - client.organisationFilter(organisationSrv.startTraversal).toSeq - } - val lastSynchro = getLastSyncDate(client, mispOrganisation, organisations) - logger.debug(s"Last synchronisation is $lastSynchro") - client - .searchEvents(publishDate = lastSynchro) - .runWith(Sink.foreachAsync(1) { event => - logger.debug(s"Importing event ${client.name}#${event.id} in organisation(s): ${organisations.mkString(",")}") - Future - .traverse(organisations) { organisation => - Future - .fromTry(updateOrCreateAlert(client, organisation, mispOrganisation, event, caseTemplate)) - .flatMap(alert => importAttibutes(client, event, alert, lastSynchro)) - .recover { - case error => - logger.warn(s"Unable to create alert from MISP event ${client.name}#${event.id}", error) - } + def syncMispEvents(client: TheHiveMispClient)(implicit authContext: AuthContext): Unit = + client + .currentOrganisationName + .fold( + error => logger.error("Unable to get MISP organisation", error), + mispOrganisation => { + + val caseTemplate = client.caseTemplate.flatMap { caseTemplateName => + db.roTransaction { implicit graph => + caseTemplateSrv.get(EntityName(caseTemplateName)).headOption } - .map(_ => ()) - .recover { - case error => - logger.warn(s"Unable to create alert from MISP event ${client.name}#${event.id}", error) + } + + logger.debug(s"Get eligible organisations") + val organisations = db.roTransaction { implicit graph => + client.organisationFilter(organisationSrv.startTraversal).toSeq + } + val lastSynchro = db.roTransaction { implicit graph => + getLastSyncDate(client, mispOrganisation, organisations) + } + + logger.debug(s"Last synchronisation is $lastSynchro") + val queue = client + .searchEvents(publishDate = lastSynchro) + .runWith(Sink.queue[Event]) + QueueIterator(queue).foreach { event => + logger.debug(s"Importing event ${client.name}#${event.id} in organisation(s): ${organisations.mkString(",")}") + organisations.foreach { organisation => + db.tryTransaction { implicit graph => + auditSrv.mergeAudits { + updateOrCreateAlert(client, organisation, mispOrganisation, event, caseTemplate) + .map { + case (alert, updatedFields) => + importAttibutes(client, event, alert, if (alert._updatedBy.isEmpty) None else lastSynchro) + (alert, updatedFields) + } + .recoverWith { + case error => + logger.warn(s"Unable to create alert from MISP event ${client.name}#${event.id}", error) + Failure(error) + } + } { + case (alert, updatedFields) if alert._updatedBy.isDefined => auditSrv.alert.update(alert, updatedFields) + case (alert, updatedFields) => auditSrv.alert.create(alert, updatedFields) + } + } } - }) - .map(_ => ()) - } + } + } + ) } diff --git a/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/QueueIterator.scala b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/QueueIterator.scala new file mode 100644 index 0000000000..d117f39932 --- /dev/null +++ b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/QueueIterator.scala @@ -0,0 +1,49 @@ +package org.thp.thehive.connector.misp.services + +import akka.stream.scaladsl.SinkQueueWithCancel +import play.api.Logger + +import java.util.NoSuchElementException +import scala.concurrent.Await +import scala.concurrent.duration.{Duration, DurationInt} +import scala.util.control.NonFatal + +class QueueIterator[T](queue: SinkQueueWithCancel[T], readTimeout: Duration) extends Iterator[T] { + lazy val logger: Logger = Logger(getClass) + + private var nextValue: Option[T] = None + private var isFinished: Boolean = false + def getNextValue(): Unit = + try nextValue = Await.result(queue.pull(), readTimeout) + catch { + case NonFatal(e) => + logger.error("Stream fails", e) + isFinished = true + nextValue = None + } + override def hasNext: Boolean = + if (isFinished) false + else { + if (nextValue.isEmpty) + getNextValue() + nextValue.isDefined + } + + override def next(): T = + nextValue match { + case Some(v) => + nextValue = None + v + case _ if !isFinished => + getNextValue() + nextValue.getOrElse { + isFinished = true + throw new NoSuchElementException + } + case _ => throw new NoSuchElementException + } +} + +object QueueIterator { + def apply[T](queue: SinkQueueWithCancel[T], readTimeout: Duration = 1.minute) = new QueueIterator[T](queue, readTimeout) +} diff --git a/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/TheHiveMispClient.scala b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/TheHiveMispClient.scala index 50f0bc2dfb..000c48fe51 100644 --- a/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/TheHiveMispClient.scala +++ b/misp/connector/src/main/scala/org/thp/thehive/connector/misp/services/TheHiveMispClient.scala @@ -23,6 +23,7 @@ case class TheHiveMispClientConfig( wsConfig: ProxyWSConfig = ProxyWSConfig(AhcWSClientConfig(), None), maxAge: Option[Duration], excludedOrganisations: Seq[String] = Nil, + whitelistOrganisations: Seq[String] = Nil, excludedTags: Set[String] = Set.empty, whitelistTags: Set[String] = Set.empty, purpose: MispPurpose.Value = MispPurpose.ImportAndExport, @@ -44,6 +45,7 @@ object TheHiveMispClientConfig { wsConfig <- (JsPath \ "wsConfig").readWithDefault[ProxyWSConfig](ProxyWSConfig(AhcWSClientConfig(), None)) maxAge <- (JsPath \ "maxAge").readNullable[Duration] excludedOrganisations <- (JsPath \ "exclusion" \ "organisations").readWithDefault[Seq[String]](Nil) + whitelistOrganisations <- (JsPath \ "whitelist" \ "organisations").readWithDefault[Seq[String]](Nil) excludedTags <- (JsPath \ "exclusion" \ "tags").readWithDefault[Set[String]](Set.empty) whitelistTags <- (JsPath \ "whitelist" \ "tags").readWithDefault[Set[String]](Set.empty) purpose <- (JsPath \ "purpose").readWithDefault[MispPurpose.Value](MispPurpose.ImportAndExport) @@ -60,6 +62,7 @@ object TheHiveMispClientConfig { wsConfig, maxAge, excludedOrganisations, + whitelistOrganisations, excludedTags, whitelistTags, purpose, @@ -98,6 +101,7 @@ class TheHiveMispClient( ws: WSClient, maxAge: Option[Duration], excludedOrganisations: Seq[String], + whitelistOrganisations: Seq[String], excludedTags: Set[String], whitelistTags: Set[String], purpose: MispPurpose.Value, @@ -114,6 +118,7 @@ class TheHiveMispClient( ws, maxAge, excludedOrganisations, + whitelistOrganisations, excludedTags, whitelistTags ) { @@ -126,6 +131,7 @@ class TheHiveMispClient( new ProxyWS(config.wsConfig, mat), config.maxAge, config.excludedOrganisations, + config.whitelistOrganisations, config.excludedTags, config.whitelistTags, config.purpose, diff --git a/misp/connector/src/test/scala/org/thp/thehive/connector/misp/services/MispImportSrvTest.scala b/misp/connector/src/test/scala/org/thp/thehive/connector/misp/services/MispImportSrvTest.scala index 915ef429c0..21b5d8c86e 100644 --- a/misp/connector/src/test/scala/org/thp/thehive/connector/misp/services/MispImportSrvTest.scala +++ b/misp/connector/src/test/scala/org/thp/thehive/connector/misp/services/MispImportSrvTest.scala @@ -73,9 +73,8 @@ class MispImportSrvTest(implicit ec: ExecutionContext) extends PlaySpecification "MISP service" should { "import events" in testApp { app => - await(app[MispImportSrv].syncMispEvents(app[TheHiveMispClient])(authContext))(1.minute) - app[Database].roTransaction { implicit graph => + app[MispImportSrv].syncMispEvents(app[TheHiveMispClient]) app[AlertSrv].startTraversal.getBySourceId("misp", "ORGNAME", "1").visible.getOrFail("Alert") } must beSuccessfulTry( Alert( diff --git a/misp/connector/src/test/scala/org/thp/thehive/connector/misp/services/TestMispClientProvider.scala b/misp/connector/src/test/scala/org/thp/thehive/connector/misp/services/TestMispClientProvider.scala index a881cc7298..02b3dc36d0 100644 --- a/misp/connector/src/test/scala/org/thp/thehive/connector/misp/services/TestMispClientProvider.scala +++ b/misp/connector/src/test/scala/org/thp/thehive/connector/misp/services/TestMispClientProvider.scala @@ -59,6 +59,7 @@ class TestMispClientProvider @Inject() (Action: DefaultActionBuilder, implicit v ws = ws, maxAge = None, excludedOrganisations = Nil, + whitelistOrganisations = Nil, excludedTags = Set.empty, whitelistTags = Set.empty, purpose = MispPurpose.ImportAndExport, diff --git a/thehive/app/org/thp/thehive/services/AlertSrv.scala b/thehive/app/org/thp/thehive/services/AlertSrv.scala index 49c91f328d..0d7de34c4b 100644 --- a/thehive/app/org/thp/thehive/services/AlertSrv.scala +++ b/thehive/app/org/thp/thehive/services/AlertSrv.scala @@ -1,9 +1,5 @@ package org.thp.thehive.services -import java.lang.{Long => JLong} -import java.util.{Date, List => JList, Map => JMap} - -import javax.inject.{Inject, Named, Singleton} import org.apache.tinkerpop.gremlin.process.traversal.P import org.apache.tinkerpop.gremlin.structure.Graph import org.thp.scalligraph.auth.{AuthContext, Permission} @@ -24,6 +20,9 @@ import org.thp.thehive.services.ObservableOps._ import org.thp.thehive.services.OrganisationOps._ import play.api.libs.json.{JsObject, Json} +import java.lang.{Long => JLong} +import java.util.{Date, List => JList, Map => JMap} +import javax.inject.{Inject, Named, Singleton} import scala.util.{Failure, Success, Try} @Singleton @@ -101,7 +100,10 @@ class AlertSrv @Inject() ( .flatMap(auditSrv.alert.update(_, updatedFields)) } - def updateTags(alert: Alert with Entity, tags: Set[Tag with Entity])(implicit graph: Graph, authContext: AuthContext): Try[Unit] = { + def updateTags(alert: Alert with Entity, tags: Set[Tag with Entity])(implicit + graph: Graph, + authContext: AuthContext + ): Try[(Set[Tag with Entity], Set[Tag with Entity])] = { val (tagsToAdd, tagsToRemove) = get(alert) .tags .toIterator @@ -114,11 +116,14 @@ class AlertSrv @Inject() ( _ <- tagsToAdd.toTry(alertTagSrv.create(AlertTag(), alert, _)) _ = get(alert).removeTags(tagsToRemove) _ <- auditSrv.alert.update(alert, Json.obj("tags" -> tags.map(_.toString))) - } yield () + } yield (tagsToAdd, tagsToRemove) } - def updateTagNames(alert: Alert with Entity, tags: Set[String])(implicit graph: Graph, authContext: AuthContext): Try[Unit] = + def updateTagNames(alert: Alert with Entity, tags: Set[String])(implicit + graph: Graph, + authContext: AuthContext + ): Try[(Set[Tag with Entity], Set[Tag with Entity])] = tags.toTry(tagSrv.getOrCreate).flatMap(t => updateTags(alert, t.toSet)) def addTags(alert: Alert with Entity, tags: Set[String])(implicit graph: Graph, authContext: AuthContext): Try[Unit] = { @@ -237,34 +242,36 @@ class AlertSrv @Inject() ( graph: Graph, authContext: AuthContext ): Try[RichCase] = - get(alert.alert).`case`.richCase.getOrFail("Case").orElse { - for { - caseTemplate <- - alert - .caseTemplate - .map(ct => caseTemplateSrv.get(EntityIdOrName(ct)).richCaseTemplate.getOrFail("CaseTemplate")) - .flip - customField = alert.customFields.map(f => InputCustomFieldValue(f.name, f.value, f.order)) - case0 = Case( - number = 0, - title = caseTemplate.flatMap(_.titlePrefix).getOrElse("") + alert.title, - description = alert.description, - severity = alert.severity, - startDate = new Date, - endDate = None, - flag = false, - tlp = alert.tlp, - pap = alert.pap, - status = CaseStatus.Open, - summary = None - ) + auditSrv.mergeAudits { + get(alert.alert).`case`.richCase.getOrFail("Case").orElse { + for { + caseTemplate <- + alert + .caseTemplate + .map(ct => caseTemplateSrv.get(EntityIdOrName(ct)).richCaseTemplate.getOrFail("CaseTemplate")) + .flip + customField = alert.customFields.map(f => InputCustomFieldValue(f.name, f.value, f.order)) + case0 = Case( + number = 0, + title = caseTemplate.flatMap(_.titlePrefix).getOrElse("") + alert.title, + description = alert.description, + severity = alert.severity, + startDate = new Date, + endDate = None, + flag = false, + tlp = alert.tlp, + pap = alert.pap, + status = CaseStatus.Open, + summary = None + ) - createdCase <- caseSrv.create(case0, user, organisation, alert.tags.toSet, customField, caseTemplate, Nil) - _ <- importObservables(alert.alert, createdCase.`case`) - _ <- alertCaseSrv.create(AlertCase(), alert.alert, createdCase.`case`) - _ <- markAsRead(alert._id) - } yield createdCase - } + createdCase <- caseSrv.create(case0, user, organisation, alert.tags.toSet, customField, caseTemplate, Nil) + _ <- importObservables(alert.alert, createdCase.`case`) + _ <- alertCaseSrv.create(AlertCase(), alert.alert, createdCase.`case`) + _ <- markAsRead(alert._id) + } yield createdCase + } + }(richCase => auditSrv.`case`.create(richCase.`case`, richCase.toJson)) def mergeInCase(alertId: EntityIdOrName, caseId: EntityIdOrName)(implicit graph: Graph, authContext: AuthContext): Try[Case with Entity] = for {