From 6d8190321008cbaa0af4a719ce643a73d58d87e6 Mon Sep 17 00:00:00 2001 From: To-om Date: Sat, 22 Jan 2022 09:34:31 +0100 Subject: [PATCH] #2305 Use stream to read alert observables --- build.sbt | 1 + .../thehive/migration/th3/ElasticClient.scala | 48 ++++++++++++++----- .../org/thp/thehive/migration/th3/Input.scala | 47 +++++++----------- 3 files changed, 55 insertions(+), 41 deletions(-) diff --git a/build.sbt b/build.sbt index ba3be03915..b1459358ea 100644 --- a/build.sbt +++ b/build.sbt @@ -342,6 +342,7 @@ lazy val thehiveMigration = (project in file("migration")) resolvers += "elasticsearch-releases" at "https://artifacts.elastic.co/maven", crossScalaVersions := Seq(scala212), libraryDependencies ++= Seq( + alpakka, ehcache, scopt, specs % Test diff --git a/migration/src/main/scala/org/thp/thehive/migration/th3/ElasticClient.scala b/migration/src/main/scala/org/thp/thehive/migration/th3/ElasticClient.scala index a7b619fb0e..64c0798c05 100644 --- a/migration/src/main/scala/org/thp/thehive/migration/th3/ElasticClient.scala +++ b/migration/src/main/scala/org/thp/thehive/migration/th3/ElasticClient.scala @@ -4,6 +4,7 @@ import akka.NotUsed import akka.actor.{ActorSystem, Scheduler} import akka.stream.Materializer import akka.stream.scaladsl.{Sink, Source} +import akka.util.ByteString import com.typesafe.sslconfig.ssl.{KeyManagerConfig, KeyStoreConfig, SSLConfigSettings, TrustManagerConfig, TrustStoreConfig} import org.thp.client.{Authentication, NoAuthentication, PasswordAuthentication} import org.thp.scalligraph.utils.Retry @@ -11,7 +12,7 @@ import org.thp.scalligraph.{InternalError, NotFoundError} import play.api.http.HeaderNames import play.api.libs.json.{JsNumber, JsObject, JsValue, Json} import play.api.libs.ws.ahc.{AhcWSClient, AhcWSClientConfig} -import play.api.libs.ws.{WSClient, WSClientConfig} +import play.api.libs.ws.{WSClient, WSClientConfig, WSResponse} import play.api.{Configuration, Logger} import java.net.{URI, URLEncoder} @@ -141,7 +142,7 @@ class ElasticConfig( lazy val logger: Logger = Logger(getClass) def stripUrl(url: String): String = new URI(url).normalize().toASCIIString.replaceAll("/+$", "") - def post(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] = { + def post(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[WSResponse] = { val encodedParams = params .map(p => s"${URLEncoder.encode(p._1, "UTF-8")}=${URLEncoder.encode(p._2, "UTF-8")}") .mkString("&") @@ -152,13 +153,23 @@ class ElasticConfig( .withHttpHeaders(HeaderNames.CONTENT_TYPE -> "application/json") ) .post(body) - .map { - case response if response.status == 200 => response.json - case response => throw InternalError(s"Unexpected response from Elasticsearch: ${response.status} ${response.statusText}\n${response.body}") - } } } + def postJson(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] = + post(url, body, params: _*) + .map { + case response if response.status == 200 => response.json + case response => throw InternalError(s"Unexpected response from Elasticsearch: ${response.status} ${response.statusText}\n${response.body}") + } + + def postRaw(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[ByteString] = + post(url, body, params: _*) + .map { + case response if response.status == 200 => response.bodyAsBytes + case response => throw InternalError(s"Unexpected response from Elasticsearch: ${response.status} ${response.statusText}\n${response.body}") + } + def delete(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] = { val encodedParams = params .map(p => s"${URLEncoder.encode(p._1, "UTF-8")}=${URLEncoder.encode(p._2, "UTF-8")}") @@ -208,6 +219,7 @@ trait ElasticClient { val pageSize: Int val keepAlive: String def search(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] + def searchRaw(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[ByteString] def scroll(scrollId: String, keepAlive: String)(implicit ec: ExecutionContext): Future[JsValue] def clearScroll(scrollId: String)(implicit ec: ExecutionContext): Future[JsValue] @@ -217,7 +229,13 @@ trait ElasticClient { } def count(docType: String, query: JsObject)(implicit ec: ExecutionContext): Future[Long] = - search(docType, query + ("size" -> JsNumber(0))).map(j => (j \ "hits" \ "total").as[Long]) + search(docType, query + ("size" -> JsNumber(0))) + .map { j => + (j \ "hits" \ "total") + .asOpt[Long] + .orElse((j \ "hits" \ "total" \ "value").asOpt[Long]) + .getOrElse(-1) + } def get(docType: String, id: String)(implicit ec: ExecutionContext, mat: Materializer): Future[JsValue] = { import ElasticDsl._ @@ -229,9 +247,11 @@ class ElasticMultiTypeClient(elasticConfig: ElasticConfig, indexName: String) ex override val pageSize: Int = elasticConfig.pageSize override val keepAlive: String = elasticConfig.keepAlive override def search(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] = - elasticConfig.post(s"/$indexName/$docType/_search", request, params: _*) + elasticConfig.postJson(s"/$indexName/$docType/_search", request, params: _*) + override def searchRaw(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[ByteString] = + elasticConfig.postRaw(s"/$indexName/$docType/_search", request, params: _*) override def scroll(scrollId: String, keepAlive: String)(implicit ec: ExecutionContext): Future[JsValue] = - elasticConfig.post("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive)) + elasticConfig.postJson("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive)) override def clearScroll(scrollId: String)(implicit ec: ExecutionContext): Future[JsValue] = elasticConfig.delete("/_search/scroll", Json.obj("scroll_id" -> scrollId)) } @@ -243,10 +263,16 @@ class ElasticSingleTypeClient(elasticConfig: ElasticConfig, indexName: String) e import ElasticDsl._ val query = (request \ "query").as[JsObject] val queryWithType = request + ("query" -> and(termQuery("relations", docType), query)) - elasticConfig.post(s"/$indexName/_search", queryWithType, params: _*) + elasticConfig.postJson(s"/$indexName/_search", queryWithType, params: _*) + } + override def searchRaw(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[ByteString] = { + import ElasticDsl._ + val query = (request \ "query").as[JsObject] + val queryWithType = request + ("query" -> and(termQuery("relations", docType), query)) + elasticConfig.postRaw(s"/$indexName/_search", queryWithType, params: _*) } override def scroll(scrollId: String, keepAlive: String)(implicit ec: ExecutionContext): Future[JsValue] = - elasticConfig.post("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive)) + elasticConfig.postJson("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive)) override def clearScroll(scrollId: String)(implicit ec: ExecutionContext): Future[JsValue] = elasticConfig.delete("/_search/scroll", Json.obj("scroll_id" -> scrollId)) } diff --git a/migration/src/main/scala/org/thp/thehive/migration/th3/Input.scala b/migration/src/main/scala/org/thp/thehive/migration/th3/Input.scala index 7347913c49..5b3c08eb0b 100644 --- a/migration/src/main/scala/org/thp/thehive/migration/th3/Input.scala +++ b/migration/src/main/scala/org/thp/thehive/migration/th3/Input.scala @@ -200,37 +200,24 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient override def countAlertObservables(filter: Filter): Future[Long] = Future.failed(new NotImplementedError) - override def listAlertObservables(alertId: String): Source[Try[(String, InputObservable)], NotUsed] = - elaticClient("alert", searchQuery(idsQuery(alertId))) - .map { json => - for { - metaData <- json.validate[MetaData] - observablesJson <- (json \ "artifacts").validate[Seq[JsValue]] - } yield (metaData, observablesJson) - } - .mapConcat { - case JsSuccess(x, _) => List(x) - case JsError(errors) => - val errorStr = errors.map(e => s"\n - ${e._1}: ${e._2.mkString(",")}") - logger.error(s"Alert observable read failure:$errorStr") - Nil - } - .mapConcat { - case (metaData, observablesJson) => - observablesJson.flatMap { observableJson => - Try(metaData.id -> observableJson.as(alertObservableReads(metaData))) - .fold( - error => - if ((observableJson \ "remoteAttachment").isDefined) { - logger.warn(s"Pre 2.13 file observables are ignored in MISP alert $alertId") - Nil - } else List(Failure(error)), - o => List(Success(o)) - ) - }.toList + override def listAlertObservables(alertId: String): Source[Try[(String, InputObservable)], NotUsed] = { + val dummyMetaData = MetaData("no-id", "init", new Date, None, None) + Source + .future(elaticClient.searchRaw("alert", searchQuery(idsQuery(alertId)))) + .via(JsonReader.select("$.hits.hits[*]._source.artifacts[*]")) + .mapConcat { data => + Try(Json.parse(data.toArray[Byte])) + .flatMap { j => + Try(List(alertId -> j.as(alertObservableReads(dummyMetaData)))) + .recover { + case _ if (j \ "remoteAttachment").isDefined => + logger.warn(s"Pre 2.13 file observables are ignored in MISP alert $alertId") + Nil + } + } + .fold(error => List(Failure(error)), _.map(Success(_))) } - - override def countAlertObservables(alertId: String): Future[Long] = Future.failed(new NotImplementedError) + } override def listUsers(filter: Filter): Source[Try[InputUser], NotUsed] = elaticClient("user", searchQuery(matchAll))