Skip to content

Commit

Permalink
#2305 Use stream to read alert observables
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jan 22, 2022
1 parent 44dea68 commit 6d81903
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 41 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ lazy val thehiveMigration = (project in file("migration"))
resolvers += "elasticsearch-releases" at "https://artifacts.elastic.co/maven",
crossScalaVersions := Seq(scala212),
libraryDependencies ++= Seq(
alpakka,
ehcache,
scopt,
specs % Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import akka.NotUsed
import akka.actor.{ActorSystem, Scheduler}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.typesafe.sslconfig.ssl.{KeyManagerConfig, KeyStoreConfig, SSLConfigSettings, TrustManagerConfig, TrustStoreConfig}
import org.thp.client.{Authentication, NoAuthentication, PasswordAuthentication}
import org.thp.scalligraph.utils.Retry
import org.thp.scalligraph.{InternalError, NotFoundError}
import play.api.http.HeaderNames
import play.api.libs.json.{JsNumber, JsObject, JsValue, Json}
import play.api.libs.ws.ahc.{AhcWSClient, AhcWSClientConfig}
import play.api.libs.ws.{WSClient, WSClientConfig}
import play.api.libs.ws.{WSClient, WSClientConfig, WSResponse}
import play.api.{Configuration, Logger}

import java.net.{URI, URLEncoder}
Expand Down Expand Up @@ -141,7 +142,7 @@ class ElasticConfig(
lazy val logger: Logger = Logger(getClass)
def stripUrl(url: String): String = new URI(url).normalize().toASCIIString.replaceAll("/+$", "")

def post(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] = {
def post(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[WSResponse] = {
val encodedParams = params
.map(p => s"${URLEncoder.encode(p._1, "UTF-8")}=${URLEncoder.encode(p._2, "UTF-8")}")
.mkString("&")
Expand All @@ -152,13 +153,23 @@ class ElasticConfig(
.withHttpHeaders(HeaderNames.CONTENT_TYPE -> "application/json")
)
.post(body)
.map {
case response if response.status == 200 => response.json
case response => throw InternalError(s"Unexpected response from Elasticsearch: ${response.status} ${response.statusText}\n${response.body}")
}
}
}

def postJson(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] =
post(url, body, params: _*)
.map {
case response if response.status == 200 => response.json
case response => throw InternalError(s"Unexpected response from Elasticsearch: ${response.status} ${response.statusText}\n${response.body}")
}

def postRaw(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[ByteString] =
post(url, body, params: _*)
.map {
case response if response.status == 200 => response.bodyAsBytes
case response => throw InternalError(s"Unexpected response from Elasticsearch: ${response.status} ${response.statusText}\n${response.body}")
}

def delete(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] = {
val encodedParams = params
.map(p => s"${URLEncoder.encode(p._1, "UTF-8")}=${URLEncoder.encode(p._2, "UTF-8")}")
Expand Down Expand Up @@ -208,6 +219,7 @@ trait ElasticClient {
val pageSize: Int
val keepAlive: String
def search(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue]
def searchRaw(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[ByteString]
def scroll(scrollId: String, keepAlive: String)(implicit ec: ExecutionContext): Future[JsValue]
def clearScroll(scrollId: String)(implicit ec: ExecutionContext): Future[JsValue]

Expand All @@ -217,7 +229,13 @@ trait ElasticClient {
}

def count(docType: String, query: JsObject)(implicit ec: ExecutionContext): Future[Long] =
search(docType, query + ("size" -> JsNumber(0))).map(j => (j \ "hits" \ "total").as[Long])
search(docType, query + ("size" -> JsNumber(0)))
.map { j =>
(j \ "hits" \ "total")
.asOpt[Long]
.orElse((j \ "hits" \ "total" \ "value").asOpt[Long])
.getOrElse(-1)
}

def get(docType: String, id: String)(implicit ec: ExecutionContext, mat: Materializer): Future[JsValue] = {
import ElasticDsl._
Expand All @@ -229,9 +247,11 @@ class ElasticMultiTypeClient(elasticConfig: ElasticConfig, indexName: String) ex
override val pageSize: Int = elasticConfig.pageSize
override val keepAlive: String = elasticConfig.keepAlive
override def search(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] =
elasticConfig.post(s"/$indexName/$docType/_search", request, params: _*)
elasticConfig.postJson(s"/$indexName/$docType/_search", request, params: _*)
override def searchRaw(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[ByteString] =
elasticConfig.postRaw(s"/$indexName/$docType/_search", request, params: _*)
override def scroll(scrollId: String, keepAlive: String)(implicit ec: ExecutionContext): Future[JsValue] =
elasticConfig.post("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive))
elasticConfig.postJson("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive))
override def clearScroll(scrollId: String)(implicit ec: ExecutionContext): Future[JsValue] =
elasticConfig.delete("/_search/scroll", Json.obj("scroll_id" -> scrollId))
}
Expand All @@ -243,10 +263,16 @@ class ElasticSingleTypeClient(elasticConfig: ElasticConfig, indexName: String) e
import ElasticDsl._
val query = (request \ "query").as[JsObject]
val queryWithType = request + ("query" -> and(termQuery("relations", docType), query))
elasticConfig.post(s"/$indexName/_search", queryWithType, params: _*)
elasticConfig.postJson(s"/$indexName/_search", queryWithType, params: _*)
}
override def searchRaw(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[ByteString] = {
import ElasticDsl._
val query = (request \ "query").as[JsObject]
val queryWithType = request + ("query" -> and(termQuery("relations", docType), query))
elasticConfig.postRaw(s"/$indexName/_search", queryWithType, params: _*)
}
override def scroll(scrollId: String, keepAlive: String)(implicit ec: ExecutionContext): Future[JsValue] =
elasticConfig.post("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive))
elasticConfig.postJson("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive))
override def clearScroll(scrollId: String)(implicit ec: ExecutionContext): Future[JsValue] =
elasticConfig.delete("/_search/scroll", Json.obj("scroll_id" -> scrollId))
}
47 changes: 17 additions & 30 deletions migration/src/main/scala/org/thp/thehive/migration/th3/Input.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,37 +200,24 @@ class Input @Inject() (configuration: Configuration, elaticClient: ElasticClient

override def countAlertObservables(filter: Filter): Future[Long] = Future.failed(new NotImplementedError)

override def listAlertObservables(alertId: String): Source[Try[(String, InputObservable)], NotUsed] =
elaticClient("alert", searchQuery(idsQuery(alertId)))
.map { json =>
for {
metaData <- json.validate[MetaData]
observablesJson <- (json \ "artifacts").validate[Seq[JsValue]]
} yield (metaData, observablesJson)
}
.mapConcat {
case JsSuccess(x, _) => List(x)
case JsError(errors) =>
val errorStr = errors.map(e => s"\n - ${e._1}: ${e._2.mkString(",")}")
logger.error(s"Alert observable read failure:$errorStr")
Nil
}
.mapConcat {
case (metaData, observablesJson) =>
observablesJson.flatMap { observableJson =>
Try(metaData.id -> observableJson.as(alertObservableReads(metaData)))
.fold(
error =>
if ((observableJson \ "remoteAttachment").isDefined) {
logger.warn(s"Pre 2.13 file observables are ignored in MISP alert $alertId")
Nil
} else List(Failure(error)),
o => List(Success(o))
)
}.toList
override def listAlertObservables(alertId: String): Source[Try[(String, InputObservable)], NotUsed] = {
val dummyMetaData = MetaData("no-id", "init", new Date, None, None)
Source
.future(elaticClient.searchRaw("alert", searchQuery(idsQuery(alertId))))
.via(JsonReader.select("$.hits.hits[*]._source.artifacts[*]"))
.mapConcat { data =>
Try(Json.parse(data.toArray[Byte]))
.flatMap { j =>
Try(List(alertId -> j.as(alertObservableReads(dummyMetaData))))
.recover {
case _ if (j \ "remoteAttachment").isDefined =>
logger.warn(s"Pre 2.13 file observables are ignored in MISP alert $alertId")
Nil
}
}
.fold(error => List(Failure(error)), _.map(Success(_)))
}

override def countAlertObservables(alertId: String): Future[Long] = Future.failed(new NotImplementedError)
}

override def listUsers(filter: Filter): Source[Try[InputUser], NotUsed] =
elaticClient("user", searchQuery(matchAll))
Expand Down

0 comments on commit 6d81903

Please sign in to comment.