Skip to content

Commit

Permalink
#236 Wait end of alert status fix before starting full synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jun 14, 2017
1 parent 3c5f0c9 commit 689801f
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions thehive-misp/app/connectors/misp/MispSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import play.api.libs.json._
import play.api.{ Configuration, Environment, Logger }
import services._

import scala.collection.immutable
import scala.concurrent.duration.{ DurationInt, DurationLong, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
Expand Down Expand Up @@ -168,13 +169,13 @@ class MispSrv @Inject() (
.runWith(Sink.seq)
}

def fullSynchronize()(implicit authContext: AuthContext) = {
def fullSynchronize()(implicit authContext: AuthContext): Future[immutable.Seq[Try[Alert]]] = {
import org.elastic4play.services.QueryDSL._

val updatedStatusFields = Fields.empty.set("status", "Updated")
val (updateAlerts, updateAlertCount) = alertSrv.find("status" ~= "Update", Some("all"), Nil)
updateAlertCount.foreach(c logger.info(s"Updating $c alert with Update status"))
updateAlerts
val updateAlertProcess = updateAlerts
.mapAsyncUnordered(1) { alert
logger.debug(s"Updating alert ${alert.id} (status: Update -> Updated")
alertSrv.update(alert, updatedStatusFields)
Expand All @@ -187,7 +188,7 @@ class MispSrv @Inject() (
val ignoredStatusFields = Fields.empty.set("status", "Ignored")
val (ignoreAlerts, ignoreAlertCount) = alertSrv.find("status" ~= "Ignore", Some("all"), Nil)
ignoreAlertCount.foreach(c logger.info(s"Updating $c alert with Ignore status"))
ignoreAlerts
val ignoreAlertProcess = ignoreAlerts
.mapAsyncUnordered(1) { alert
logger.debug(s"Updating alert ${alert.id} (status: Ignore -> Ignored")
alertSrv.update(alert, ignoredStatusFields)
Expand All @@ -197,9 +198,13 @@ class MispSrv @Inject() (
}
.runWith(Sink.ignore)

Source(mispConfig.connections.toList)
.flatMapConcat(mispConnection synchronize(mispConnection, new Date(0)))
.runWith(Sink.seq)
updateAlertProcess
.flatMap(_ ignoreAlertProcess)
.flatMap { _
Source(mispConfig.connections.toList)
.flatMapConcat(mispConnection synchronize(mispConnection, new Date(0)))
.runWith(Sink.seq)
}
}

def synchronize(mispConnection: MispConnection, lastSyncDate: Date)(implicit authContext: AuthContext): Source[Try[Alert], NotUsed] = {
Expand All @@ -208,11 +213,10 @@ class MispSrv @Inject() (
// get events that have been published after the last synchronization
getEventsFromDate(mispConnection, lastSyncDate)
// get related alert
.mapAsyncUnordered(1) {
case event
logger.trace(s"Looking for alert misp:${event.source}:${event.sourceRef}")
alertSrv.get("misp", event.source, event.sourceRef)
.map((event, _))
.mapAsyncUnordered(1) { event
logger.trace(s"Looking for alert misp:${event.source}:${event.sourceRef}")
alertSrv.get("misp", event.source, event.sourceRef)
.map((event, _))
}
.mapAsyncUnordered(1) {
case (event, alert)
Expand Down Expand Up @@ -248,6 +252,7 @@ class MispSrv @Inject() (
case AlertStatus.New Json.toJson(AlertStatus.New)
case _ Json.toJson(AlertStatus.Updated)
}))
logger.debug(s"Update alert ${alert.id} with\n$alertJson")
val fAlert = alertSrv.update(alert.id, Fields(alertJson))
// if a case have been created, update it
(alert.caze() match {
Expand Down

0 comments on commit 689801f

Please sign in to comment.