Skip to content

Commit

Permalink
#236 Add API to resynchronize all MISP alerts. This API also fix aler…
Browse files Browse the repository at this point in the history
…t status name (d451ed0)
  • Loading branch information
To-om committed Jun 13, 2017
1 parent 75b17bb commit 064b7e0
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 33 deletions.
3 changes: 3 additions & 0 deletions thehive-backend/app/services/AlertSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class AlertSrv(
def update(id: String, fields: Fields)(implicit authContext: AuthContext): Future[Alert] =
updateSrv[AlertModel, Alert](alertModel, id, fields)

def update(alert: Alert, fields: Fields)(implicit authContext: AuthContext): Future[Alert] =
updateSrv(alert, fields)

def bulkUpdate(ids: Seq[String], fields: Fields)(implicit authContext: AuthContext): Future[Seq[Try[Alert]]] = {
updateSrv[AlertModel, Alert](alertModel, ids, fields)
}
Expand Down
22 changes: 11 additions & 11 deletions thehive-backend/app/services/StreamSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object StreamActor {
/* Ask messages, wait if there is no ready messages*/
case object GetOperations
/* Pending messages must be sent to sender */
case class Submit(senderRef: ActorRef)
case object Submit
/* List of ready messages */
case class StreamMessages(messages: Seq[JsObject])
case object StreamNotFound
Expand All @@ -70,25 +70,25 @@ class StreamActor(
def this(senderRef: ActorRef) = this(
senderRef,
FakeCancellable,
context.system.scheduler.scheduleOnce(refresh, self, Submit(senderRef)),
context.system.scheduler.scheduleOnce(refresh, self, Submit),
false)

/**
* Renew timers
*/
def renew(): WaitingRequest = {
def renew: WaitingRequest = {
if (itemCancellable.cancel()) {
if (!hasResult && globalCancellable.cancel()) {
new WaitingRequest(
senderRef,
context.system.scheduler.scheduleOnce(nextItemMaxWait, self, Submit(senderRef)),
context.system.scheduler.scheduleOnce(globalMaxWait, self, Submit(senderRef)),
context.system.scheduler.scheduleOnce(nextItemMaxWait, self, Submit),
context.system.scheduler.scheduleOnce(globalMaxWait, self, Submit),
true)
}
else
new WaitingRequest(
senderRef,
context.system.scheduler.scheduleOnce(nextItemMaxWait, self, Submit(senderRef)),
context.system.scheduler.scheduleOnce(nextItemMaxWait, self, Submit),
globalCancellable,
true)
}
Expand Down Expand Up @@ -162,7 +162,7 @@ class StreamActor(
aog :+ operation
case _
logger.debug("Impossible")
???
sys.error("")
}
context.become(receiveWithState(waitingRequest.map(_.renew), currentMessages + (requestId Some(updatedOperationGroup))))

Expand All @@ -174,7 +174,7 @@ class StreamActor(
}
context.become(receiveWithState(Some(new WaitingRequest(sender)), currentMessages))

case Submit(senderRef)
case Submit
waitingRequest match {
case Some(wr)
val (readyMessages, pendingMessages) = currentMessages.partition(_._2.fold(false)(_.isReady))
Expand All @@ -184,9 +184,9 @@ class StreamActor(
logger.error("No request to submit !")
}

case Initialize(requestId) context.become(receiveWithState(waitingRequest, currentMessages + (requestId None)))
case operation: AuditOperation
case message logger.warn(s"Unexpected message $message (${message.getClass})")
case Initialize(requestId) context.become(receiveWithState(waitingRequest, currentMessages + (requestId None)))
case _: AuditOperation
case message logger.warn(s"Unexpected message $message (${message.getClass})")
}

def receive: Receive = receiveWithState(None, Map.empty[String, Option[StreamMessageGroup[_]]])
Expand Down
7 changes: 7 additions & 0 deletions thehive-misp/app/connectors/misp/MispCtrl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class MispCtrl @Inject() (
private[MispCtrl] lazy val logger = Logger(getClass)
val router = SimpleRouter {
case GET(p"/_syncAlerts") syncAlerts
case GET(p"/_syncAllAlerts") syncAllAlerts
case GET(p"/_syncArtifacts") syncArtifacts
case r throw NotFoundError(s"${r.uri} not found")
}
Expand All @@ -41,6 +42,12 @@ class MispCtrl @Inject() (
.map { m Ok(Json.toJson(m)) }
}

@Timed
def syncAllAlerts: Action[AnyContent] = authenticated(Role.admin).async { implicit request
mispSrv.fullSynchronize()
.map { m Ok(Json.toJson(m)) }
}

@Timed
def syncArtifacts: Action[AnyContent] = authenticated(Role.admin) {
eventSrv.publish(UpdateMispAlertArtifact())
Expand Down
82 changes: 60 additions & 22 deletions thehive-misp/app/connectors/misp/MispSrv.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package connectors.misp

import java.text.SimpleDateFormat
import java.util.Date
import javax.inject.{ Inject, Provider, Singleton }

Expand Down Expand Up @@ -157,46 +156,86 @@ class MispSrv @Inject() (
// for each MISP server
Source(mispConfig.connections.toList)
// get last synchronization
.mapAsyncUnordered(1) { mcfg
alertSrv.stats(and("type" ~= "misp", "source" ~= mcfg.name), Seq(selectMax("lastSyncDate")))
.map { maxLastSyncDate mcfg new Date((maxLastSyncDate \ "max_lastSyncDate").as[Long]) }
.recover { case _ mcfg new Date(0) }
.mapAsyncUnordered(1) { mispConnection
alertSrv.stats(and("type" ~= "misp", "source" ~= mispConnection.name), Seq(selectMax("lastSyncDate")))
.map { maxLastSyncDate mispConnection new Date((maxLastSyncDate \ "max_lastSyncDate").as[Long]) }
.recover { case _ mispConnection new Date(0) }
}
// get events that have been published after the last synchronization
.flatMapConcat {
case (mcfg, lastSyncDate)
getEventsFromDate(mcfg, lastSyncDate)
.map((mcfg, _))
case (mispConnection, lastSyncDate)
synchronize(mispConnection, lastSyncDate)
}
.runWith(Sink.seq)
}

def fullSynchronize()(implicit authContext: AuthContext) = {
import org.elastic4play.services.QueryDSL._

val updatedStatusFields = Fields.empty.set("status", "Updated")
val (updateAlerts, updateAlertCount) = alertSrv.find("status" ~= "Update", Some("all"), Nil)
updateAlertCount.foreach(c logger.info(s"Updating $c alert with Update status"))
updateAlerts
.mapAsyncUnordered(1) { alert
logger.debug(s"Updating alert ${alert.id} (status: Update -> Updated")
alertSrv.update(alert, updatedStatusFields)
.recover {
case error logger.warn(s"""Fail to set "Updated" status to alert ${alert.id}""", error)
}
}
.runWith(Sink.ignore)

val ignoredStatusFields = Fields.empty.set("status", "Ignored")
val (ignoreAlerts, ignoreAlertCount) = alertSrv.find("status" ~= "Ignore", Some("all"), Nil)
ignoreAlertCount.foreach(c logger.info(s"Updating $c alert with Ignore status"))
ignoreAlerts
.mapAsyncUnordered(1) { alert
logger.debug(s"Updating alert ${alert.id} (status: Ignore -> Ignored")
alertSrv.update(alert, ignoredStatusFields)
.recover {
case error logger.warn(s"""Fail to set "Ignored" status to alert ${alert.id}""", error)
}
}
.runWith(Sink.ignore)

Source(mispConfig.connections.toList)
.flatMapConcat(mispConnection synchronize(mispConnection, new Date(0)))
.runWith(Sink.seq)
}

def synchronize(mispConnection: MispConnection, lastSyncDate: Date)(implicit authContext: AuthContext): Source[Try[Alert], NotUsed] = {
logger.info(s"Synchronize MISP ${mispConnection.name} from $lastSyncDate")
val fullSynchro = if (lastSyncDate.getTime == 0) Some(lastSyncDate) else None
// get events that have been published after the last synchronization
getEventsFromDate(mispConnection, lastSyncDate)
// get related alert
.mapAsyncUnordered(1) {
case (mcfg, event)
case event
logger.trace(s"Looking for alert misp:${event.source}:${event.sourceRef}")
alertSrv.get("misp", event.source, event.sourceRef)
.map((mcfg, event, _))
.map((event, _))
}
.mapAsyncUnordered(1) {
case (mcfg, event, alert)
logger.trace(s"MISP synchro ${mcfg.name}, event ${event.sourceRef}, alert ${alert.fold("no alert")(a "alert " + a.alertId() + "last sync at " + a.lastSyncDate())}")
logger.info(s"getting MISP event ${event.sourceRef}")
getAttributes(mcfg, event.sourceRef, alert.map(_.lastSyncDate()))
.map((mcfg, event, alert, _))
case (event, alert)
logger.trace(s"MISP synchro ${mispConnection.name}, event ${event.sourceRef}, alert ${alert.fold("no alert")(a "alert " + a.alertId() + "last sync at " + a.lastSyncDate())}")
logger.info(s"getting MISP event ${event.source}:${event.sourceRef}")
getAttributes(mispConnection, event.sourceRef, fullSynchro.orElse(alert.map(_.lastSyncDate())))
.map((event, alert, _))
}
.mapAsyncUnordered(1) {
// if there is no related alert, create a new one
case (mcfg, event, None, attrs)
logger.info(s"MISP event ${event.sourceRef} has no related alert, create it with ${attrs.size} observable(s)")
case (event, None, attrs)
logger.info(s"MISP event ${event.source}:${event.sourceRef} has no related alert, create it with ${attrs.size} observable(s)")
val alertJson = Json.toJson(event).as[JsObject] +
("type" JsString("misp")) +
("caseTemplate" mcfg.caseTemplate.fold[JsValue](JsNull)(JsString)) +
("caseTemplate" mispConnection.caseTemplate.fold[JsValue](JsNull)(JsString)) +
("artifacts" JsArray(attrs))
alertSrv.create(Fields(alertJson))
.map(Success(_))
.recover { case t Failure(t) }

// if a related alert exists, update it
case (_, event, Some(alert), attrs)
logger.info(s"MISP event ${event.sourceRef} has related alert, update it with ${attrs.size} observable(s)")
case (event, Some(alert), attrs)
logger.info(s"MISP event ${event.source}:${event.sourceRef} has related alert, update it with ${attrs.size} observable(s)")
val alertJson = Json.toJson(event).as[JsObject] -
"type" -
"source" -
Expand All @@ -223,7 +262,6 @@ class MispSrv @Inject() (
.map(Success(_))
.recover { case t Failure(t) }
}
.runWith(Sink.seq)
}

def getEventsFromDate(mispConnection: MispConnection, fromDate: Date): Source[MispAlert, NotUsed] = {
Expand Down

0 comments on commit 064b7e0

Please sign in to comment.