Skip to content

Commit

Permalink
#1685 Fix MISP filters, improve sync
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Dec 17, 2020
1 parent 94609e4 commit 54596fe
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 300 deletions.
2 changes: 1 addition & 1 deletion ScalliGraph
23 changes: 17 additions & 6 deletions misp/client/src/main/scala/org/thp/misp/client/MispClient.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package org.thp.misp.client

import java.util.Date

import akka.NotUsed
import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.{JsonFraming, Source}
import akka.util.ByteString
import org.thp.client.{ApplicationError, Authentication, ProxyWS}
import org.thp.misp.dto.{Attribute, Event, Organisation, Tag, User}
import org.thp.misp.dto._
import org.thp.scalligraph.InternalError
import org.thp.scalligraph.utils.FunctionalCondition._
import play.api.Logger
import play.api.http.Status
import play.api.libs.json.{JsObject, JsString, JsValue, Json}
import play.api.libs.json._
import play.api.libs.ws.{WSClient, WSRequest}

import java.util.Date
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
Expand All @@ -29,6 +29,7 @@ class MispClient(
ws: WSClient,
maxAge: Option[Duration],
excludedOrganisations: Seq[String],
whitelistOrganisations: Seq[String],
excludedTags: Set[String],
whitelistTags: Set[String]
) {
Expand Down Expand Up @@ -148,7 +149,7 @@ class MispClient(

def getEvent(eventId: String)(implicit ec: ExecutionContext): Future[Event] = {
logger.debug(s"Get MISP event $eventId")
require(!eventId.isEmpty)
require(eventId.nonEmpty)
get(s"events/$eventId")
.map(e => (e \ "Event").as[Event])
}
Expand All @@ -163,7 +164,17 @@ class MispClient(
.recover { case _ => Json.obj("name" -> name, "version" -> "", "status" -> "ERROR", "url" -> baseUrl) }

def searchEvents(publishDate: Option[Date] = None)(implicit ec: ExecutionContext): Source[Event, NotUsed] = {
val query = publishDate.fold(JsObject.empty)(d => Json.obj("searchpublish_timestamp" -> ((d.getTime / 1000) + 1)))
val fromDate = (maxAge.map(a => System.currentTimeMillis() - a.toMillis).toSeq ++ publishDate.map(_.getTime))
.sorted(Ordering[Long].reverse)
.headOption
.map(d => "searchpublish_timestamp" -> JsNumber((d / 1000) + 1))
val tagFilter = (whitelistTags ++ excludedTags.map("!" + _)).map(JsString.apply)
val organisationFilter = (whitelistOrganisations ++ excludedOrganisations.map("!" + _)).map(JsString.apply)
val query = JsObject
.empty
.merge(fromDate)(_ + _)
.when(tagFilter.nonEmpty)(_ + ("searchtag" -> JsArray(tagFilter.toSeq)))
.when(organisationFilter.nonEmpty)(_ + ("searchorg" -> JsArray(organisationFilter)))
logger.debug("Search MISP events")
Source
.futureSource(postStream("events/index", query))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ import org.thp.scalligraph.auth.UserSrv
import play.api.Logger

import javax.inject.{Inject, Named, Provider}
import scala.concurrent.Future
import scala.util.{Failure, Success}

sealed trait MispMessage
case object Synchro extends MispMessage
case class EndOfSynchro(error: Option[String]) extends MispMessage
case object Synchro extends MispMessage

class MispActor @Inject() (
connector: Connector,
Expand All @@ -25,39 +22,22 @@ class MispActor @Inject() (
override def preStart(): Unit = {
super.preStart()
logger.info(s"[$self] Starting actor MISP")
context.become(waiting(context.system.scheduler.scheduleOnce(connector.syncInitialDelay, self, Synchro)))
context.become(receive(context.system.scheduler.scheduleOnce(connector.syncInitialDelay, self, Synchro)))
}

override def receive: Receive = {
case other => logger.warn(s"Unknown message $other (${other.getClass})")
}

def running: Receive = {
case Synchro => logger.info("MISP synchronisation is already in progress")
case EndOfSynchro(None) =>
logger.info("MISP synchronisation is complete")
context.become(waiting(context.system.scheduler.scheduleOnce(connector.syncInterval, self, Synchro)))
case EndOfSynchro(Some(error)) =>
logger.error(s"MISP synchronisation fails: $error")
context.become(waiting(context.system.scheduler.scheduleOnce(connector.syncInterval, self, Synchro)))
case other => logger.warn(s"Unknown message $other (${other.getClass})")
}

def waiting(scheduledSynchronisation: Cancellable): Receive = {
def receive(scheduledSynchronisation: Cancellable): Receive = {
case Synchro =>
scheduledSynchronisation.cancel()
context.become(running)
logger.info(s"Synchronising MISP events for ${connector.clients.map(_.name).mkString(",")}")
Future
.traverse(connector.clients.filter(_.canImport))(mispImportSrv.syncMispEvents(_)(userSrv.getSystemAuthContext))
.map(_ => ())
.onComplete {
case _: Success[_] => self ! EndOfSynchro(None)
case Failure(error) =>
logger.error("MISP synchronisation failure", error)
self ! EndOfSynchro(Some(error.toString))
}
case other => logger.warn(s"Unknown message $other (${other.getClass})")
connector.clients.filter(_.canImport).foreach { mispClient =>
mispImportSrv.syncMispEvents(mispClient)(userSrv.getSystemAuthContext)
}
logger.info("MISP synchronisation is complete")
context.become(receive(context.system.scheduler.scheduleOnce(connector.syncInterval, self, Synchro)))
}
}

Expand Down
Loading

0 comments on commit 54596fe

Please sign in to comment.