Skip to content

Commit

Permalink
Add custom HTTP client. It accepts proxy, ssl and timeout configurati…
Browse files Browse the repository at this point in the history
…on, with stackable config.

Use custom HTTP client in MISP connector (#143) and Cortex connector (#147)
  • Loading branch information
To-om committed Apr 25, 2017
1 parent 106a91a commit 2d846d2
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 159 deletions.
88 changes: 88 additions & 0 deletions thehive-backend/app/services/CustomWSAPI.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package services

import javax.inject.Inject

import akka.stream.Materializer
import play.api.inject.ApplicationLifecycle
import play.api.libs.ws.ahc.{ AhcWSAPI, AhcWSClient, AhcWSClientConfig, AhcWSClientConfigParser }
import play.api.libs.ws.ssl.TrustStoreConfig
import play.api.libs.ws._
import play.api.{ Configuration, Environment, Logger }

object CustomWSAPI {
private[CustomWSAPI] lazy val logger = Logger(getClass)

def parseWSConfig(config: Configuration, environment: Environment): AhcWSClientConfig = {
new AhcWSClientConfigParser(
new WSConfigParser(config, environment).parse(),
config,
environment).parse()
}

def parseProxyConfig(config: Configuration): Option[WSProxyServer] = for {
proxyConfig config.getConfig("play.ws.proxy")
proxyHost proxyConfig.getString("host")
proxyPort proxyConfig.getInt("port")
proxyProtocol = proxyConfig.getString("protocol")
proxyPrincipal = proxyConfig.getString("user")
proxyPassword = proxyConfig.getString("password")
proxyNtlmDomain = proxyConfig.getString("ntlmDomain")
proxyEncoding = proxyConfig.getString("encoding")
proxyNonProxyHosts = proxyConfig.getStringSeq("nonProxyHosts")
} yield DefaultWSProxyServer(proxyHost, proxyPort, proxyProtocol, proxyPrincipal, proxyPassword, proxyNtlmDomain, proxyEncoding, proxyNonProxyHosts)

def getWS(config: Configuration, environment: Environment, lifecycle: ApplicationLifecycle, mat: Materializer): AhcWSAPI = {
val clientConfig = parseWSConfig(config, environment)
val clientConfigWithTruststore = config.getString("play.cert") match {
case Some(p)
logger.warn(
"""Use of "cert" parameter in configuration file is deprecated. Please use:
| ws.ssl {
| trustManager = {
| stores = [
| { type = "PEM", path = "/path/to/cacert.crt" },
| { type = "JKS", path = "/path/to/truststore.jks" }
| ]
| }
| }
""".stripMargin)
clientConfig.copy(
wsClientConfig = clientConfig.wsClientConfig.copy(
ssl = clientConfig.wsClientConfig.ssl.copy(
trustManagerConfig = clientConfig.wsClientConfig.ssl.trustManagerConfig.copy(
trustStoreConfigs = clientConfig.wsClientConfig.ssl.trustManagerConfig.trustStoreConfigs :+ TrustStoreConfig(filePath = Some(p.toString), data = None)))))
case None clientConfig
}
new AhcWSAPI(environment, clientConfigWithTruststore, lifecycle)(mat)
}

def getConfig(config: Configuration, path: String): Configuration = {
Configuration(
config.getConfig(s"play.$path").getOrElse(Configuration.empty).underlying.withFallback(
config.getConfig(path).getOrElse(Configuration.empty).underlying))
}
}

class CustomWSAPI(ws: AhcWSAPI, val proxy: Option[WSProxyServer], config: Configuration, environment: Environment, lifecycle: ApplicationLifecycle, mat: Materializer) extends WSAPI {
private[CustomWSAPI] lazy val logger = Logger(getClass)

@Inject() def this(config: Configuration, environment: Environment, lifecycle: ApplicationLifecycle, mat: Materializer) =
this(
CustomWSAPI.getWS(config, environment, lifecycle, mat),
CustomWSAPI.parseProxyConfig(config),
config, environment, lifecycle, mat)

override def url(url: String): WSRequest = {
val req = ws.url(url)
proxy.fold(req)(req.withProxyServer)
}

override def client: AhcWSClient = ws.client

def withConfig(subConfig: Configuration): CustomWSAPI = {
logger.debug(s"Override WS configuration using $subConfig")
new CustomWSAPI(
Configuration(subConfig.underlying.atKey("play").withFallback(config.underlying)),
environment, lifecycle, mat)
}
}
67 changes: 31 additions & 36 deletions thehive-cortex/app/connectors/cortex/services/CortexClient.scala
Original file line number Diff line number Diff line change
@@ -1,77 +1,72 @@
package connectors.cortex.services

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration

import akka.stream.scaladsl.Source

import play.api.libs.json.{ JsObject, Json }
import play.api.libs.ws.{ WSClient, WSRequest, WSResponse }
import play.api.mvc.MultipartFormData.{ DataPart, FilePart }

import org.elastic4play.NotFoundError

import connectors.cortex.models.{ Analyzer, CortexArtifact, DataArtifact, FileArtifact }
import connectors.cortex.models.JsonFormat._
import play.api.Logger
import play.api.libs.json.{ JsObject, JsValue, Json }
import play.api.libs.ws.{ WSRequest, WSResponse }
import play.api.mvc.MultipartFormData.{ DataPart, FilePart }
import services.CustomWSAPI

import scala.concurrent.duration.Duration
import scala.concurrent.{ ExecutionContext, Future }

class CortexClient(val name: String, baseUrl: String, key: String) {
class CortexClient(val name: String, baseUrl: String, key: String, ws: CustomWSAPI) {

lazy val logger = Logger(getClass)

logger.info(s"new Cortex($name, $baseUrl, $key)")
def request[A](uri: String, f: WSRequest Future[WSResponse], t: WSResponse A)(implicit ws: WSClient, ec: ExecutionContext): Future[A] = {
val url = (baseUrl + uri)
logger.info(s"Requesting Cortex $url")
f(ws.url(url).withHeaders("auth" key)).map {
def request[A](uri: String, f: WSRequest Future[WSResponse], t: WSResponse A)(implicit ec: ExecutionContext): Future[A] = {
logger.info(s"Requesting Cortex $baseUrl")
f(ws.url(s"$baseUrl/$uri").withHeaders("auth" key)).map {
case response if response.status / 100 == 2 t(response)
case error
logger.error(s"Cortex error on $url (${error.status}) \n${error.body}")
logger.error(s"Cortex error on $baseUrl (${error.status}) \n${error.body}")
sys.error("")
}
}

def getAnalyzer(analyzerId: String)(implicit ws: WSClient, ec: ExecutionContext): Future[Analyzer] = {
request(s"/api/analyzer/$analyzerId", _.get, _.json.as[Analyzer]).map(_.copy(cortexIds = List(name)))
def getAnalyzer(analyzerId: String)(implicit ec: ExecutionContext): Future[Analyzer] = {
request(s"api/analyzer/$analyzerId", _.get, _.json.as[Analyzer]).map(_.copy(cortexIds = List(name)))
}

def listAnalyzer(implicit ws: WSClient, ec: ExecutionContext): Future[Seq[Analyzer]] = {
request(s"/api/analyzer", _.get, _.json.as[Seq[Analyzer]]).map(_.map(_.copy(cortexIds = List(name))))
def listAnalyzer(implicit ec: ExecutionContext): Future[Seq[Analyzer]] = {
request(s"api/analyzer", _.get, _.json.as[Seq[Analyzer]]).map(_.map(_.copy(cortexIds = List(name))))
}

def analyze(analyzerId: String, artifact: CortexArtifact)(implicit ws: WSClient, ec: ExecutionContext) = {
def analyze(analyzerId: String, artifact: CortexArtifact)(implicit ec: ExecutionContext): Future[JsValue] = {
artifact match {
case FileArtifact(data, attributes)
val body = Source(List(
FilePart("data", (attributes \ "attachment" \ "name").asOpt[String].getOrElse("noname"), None, data),
DataPart("_json", attributes.toString)))
request(s"/api/analyzer/$analyzerId/run", _.post(body), _.json)
request(s"api/analyzer/$analyzerId/run", _.post(body), _.json)
case a: DataArtifact
request(s"/api/analyzer/$analyzerId/run", _.post(Json.toJson(a)), _.json.as[JsObject])
request(s"api/analyzer/$analyzerId/run", _.post(Json.toJson(a)), _.json.as[JsObject])
}
}

def listAnalyzerForType(dataType: String)(implicit ws: WSClient, ec: ExecutionContext): Future[Seq[Analyzer]] = {
request(s"/api/analyzer/type/$dataType", _.get, _.json.as[Seq[Analyzer]]).map(_.map(_.copy(cortexIds = List(name))))
def listAnalyzerForType(dataType: String)(implicit ec: ExecutionContext): Future[Seq[Analyzer]] = {
request(s"api/analyzer/type/$dataType", _.get, _.json.as[Seq[Analyzer]]).map(_.map(_.copy(cortexIds = List(name))))
}

def listJob(implicit ws: WSClient, ec: ExecutionContext) = {
request(s"/api/job", _.get, _.json.as[Seq[JsObject]])
def listJob(implicit ec: ExecutionContext): Future[Seq[JsObject]] = {
request(s"api/job", _.get, _.json.as[Seq[JsObject]])
}

def getJob(jobId: String)(implicit ws: WSClient, ec: ExecutionContext) = {
request(s"/api/job/$jobId", _.get, _.json.as[JsObject])
def getJob(jobId: String)(implicit ec: ExecutionContext): Future[JsObject] = {
request(s"api/job/$jobId", _.get, _.json.as[JsObject])
}

def removeJob(jobId: String)(implicit ws: WSClient, ec: ExecutionContext) = {
request(s"/api/job/$jobId", _.delete, _ ())
def removeJob(jobId: String)(implicit ec: ExecutionContext): Future[Unit] = {
request(s"api/job/$jobId", _.delete, _ ())
}

def report(jobId: String)(implicit ws: WSClient, ec: ExecutionContext) = {
request(s"/api/job/$jobId/report", _.get, r r.json.as[JsObject])
def report(jobId: String)(implicit ec: ExecutionContext): Future[JsObject] = {
request(s"api/job/$jobId/report", _.get, r r.json.as[JsObject])
}

def waitReport(jobId: String, atMost: Duration)(implicit ws: WSClient, ec: ExecutionContext) = {
request(s"/api/job/$jobId/waitreport", _.withQueryString("atMost" atMost.toString).get, r r.json.as[JsObject])
def waitReport(jobId: String, atMost: Duration)(implicit ec: ExecutionContext): Future[JsObject] = {
request(s"api/job/$jobId/waitreport", _.withQueryString("atMost" atMost.toString).get, r r.json.as[JsObject])
}
}
79 changes: 40 additions & 39 deletions thehive-cortex/app/connectors/cortex/services/CortexSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,36 @@ package connectors.cortex.services

import java.nio.file.{ Path, Paths }
import java.util.Date

import javax.inject.{ Inject, Singleton }

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.DurationInt
import scala.language.implicitConversions
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal

import akka.NotUsed
import akka.actor.ActorDSL.{ Act, actor }
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }

import play.api.{ Configuration, Logger }
import connectors.cortex.models.JsonFormat._
import connectors.cortex.models._
import models.Artifact
import org.elastic4play.controllers.Fields
import org.elastic4play.services._
import org.elastic4play.{ InternalError, NotFoundError }
import play.api.libs.json.{ JsObject, Json }
import play.api.libs.ws.WSClient
import play.api.{ Configuration, Logger }
import services.{ ArtifactSrv, CustomWSAPI, MergeArtifact }

import org.elastic4play.{ InternalError, NotFoundError }
import org.elastic4play.controllers.Fields
import org.elastic4play.services.{ AttachmentSrv, AuthContext, CreateSrv, EventSrv, FindSrv, GetSrv, QueryDef, UpdateSrv }

import connectors.cortex.models.{ Analyzer, CortexJob, DataArtifact, FileArtifact, Job, JobModel, JobStatus }
import connectors.cortex.models.JsonFormat.{ cortexJobFormat, jobStatusFormat }
import models.Artifact
import services.{ ArtifactSrv, MergeArtifact }
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ ExecutionContext, Future }
import scala.language.implicitConversions
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }

object CortexConfig {
def getCortexClient(name: String, configuration: Configuration): Option[CortexClient] = {
def getCortexClient(name: String, configuration: Configuration, ws: CustomWSAPI): Option[CortexClient] = {
try {
val url = configuration.getString("url").getOrElse(sys.error("url is missing")).replaceFirst("/*$", "")
val key = "" // configuration.getString("key").getOrElse(sys.error("key is missing"))
Some(new CortexClient(name, url, key))
Some(new CortexClient(name, url, key, ws))
}
catch {
case NonFatal(e)
Expand All @@ -44,22 +40,26 @@ object CortexConfig {
}
}

def getInstances(configuration: Configuration): Seq[CortexClient] = {
def getInstances(configuration: Configuration, globalWS: CustomWSAPI): Seq[CortexClient] = {
for {
cfg configuration.getConfig("cortex").toSeq
cortexWS = globalWS.withConfig(cfg)
key cfg.subKeys
if key != "ws"
c cfg.getConfig(key)
cic getCortexClient(key, c)
instanceWS = cortexWS.withConfig(c)
cic getCortexClient(key, c, instanceWS)
} yield cic
}
}

@Singleton
case class CortexConfig(truststore: Option[Path], instances: Seq[CortexClient]) {

@Inject
def this(configuration: Configuration) = this(
def this(configuration: Configuration, globalWS: CustomWSAPI) = this(
configuration.getString("cortex.cert").map(p Paths.get(p)),
CortexConfig.getInstances(configuration))
CortexConfig.getInstances(configuration, globalWS))
}

@Singleton
Expand All @@ -80,7 +80,7 @@ class CortexSrv @Inject() (

lazy val logger = Logger(getClass)

val mergeActor = actor(new Act {
private[CortexSrv] val mergeActor = actor(new Act {
become {
case MergeArtifact(newArtifact, artifacts, authContext)
import org.elastic4play.services.QueryDSL._
Expand Down Expand Up @@ -109,30 +109,31 @@ class CortexSrv @Inject() (

def getAnalyzer(analyzerId: String): Future[Analyzer] = {
Future
.traverse(cortexConfig.instances) {
case cortex cortex.getAnalyzer(analyzerId).map(Some(_)).fallbackTo(Future.successful(None))
.traverse(cortexConfig.instances) { cortex
cortex.getAnalyzer(analyzerId).map(Some(_)).fallbackTo(Future.successful(None))
}
.map { analyzers
analyzers.foldLeft[Option[Analyzer]](None) {
case (Some(analyzer1), Some(analyzer2)) Some(analyzer1.copy(cortexIds = analyzer1.cortexIds ++ analyzer2.cortexIds))
case (maybeAnalyzer1, maybeAnalyzer2) maybeAnalyzer1 orElse maybeAnalyzer2
}
analyzers
.foldLeft[Option[Analyzer]](None) {
case (Some(analyzer1), Some(analyzer2)) Some(analyzer1.copy(cortexIds = analyzer1.cortexIds ++ analyzer2.cortexIds))
case (maybeAnalyzer1, maybeAnalyzer2) maybeAnalyzer1 orElse maybeAnalyzer2
}
.getOrElse(throw NotFoundError(s"Analyzer $analyzerId not found"))
}
}

def askAnalyzersOnAllCortex(f: CortexClient Future[Seq[Analyzer]]): Future[Seq[Analyzer]] = {
Future
.traverse(cortexConfig.instances) {
case cortex f(cortex)
.traverse(cortexConfig.instances) { cortex
f(cortex)
}
.map(_.flatten)
}

def getAnalyzersFor(dataType: String): Future[Seq[Analyzer]] = {
Future
.traverse(cortexConfig.instances) {
case cortex cortex.listAnalyzerForType(dataType)
.traverse(cortexConfig.instances) { cortex
cortex.listAnalyzerForType(dataType)
}
.map { listOfListOfAnalyzers
val analysers = listOfListOfAnalyzers.flatten
Expand All @@ -146,8 +147,8 @@ class CortexSrv @Inject() (

def listAnalyzer: Future[Seq[Analyzer]] = {
Future
.traverse(cortexConfig.instances) {
case cortex cortex.listAnalyzer
.traverse(cortexConfig.instances) { cortex
cortex.listAnalyzer
}
.map { listOfListOfAnalyzers
val analysers = listOfListOfAnalyzers.flatten
Expand All @@ -164,15 +165,15 @@ class CortexSrv @Inject() (
}

def updateJobWithCortex(jobId: String, cortexJobId: String, cortex: CortexClient)(implicit authContext: AuthContext): Unit = {
logger.debug(s"Requesting status of job $cortexJobId in cortex ${cortex.name} in order to update job ${jobId}")
logger.debug(s"Requesting status of job $cortexJobId in cortex ${cortex.name} in order to update job $jobId")
cortex.waitReport(cortexJobId, 1.minute) andThen {
case Success(j)
val status = (j \ "status").asOpt[JobStatus.Type].getOrElse(JobStatus.Failure)
if (status == JobStatus.InProgress)
updateJobWithCortex(jobId, cortexJobId, cortex)
else {
val report = (j \ "report").asOpt[JsObject].getOrElse(JsObject(Nil)).toString
logger.debug(s"Job $cortexJobId in cortex ${cortex.name} has finished with status $status, updating job ${jobId}")
logger.debug(s"Job $cortexJobId in cortex ${cortex.name} has finished with status $status, updating job $jobId")
getSrv[JobModel, Job](jobModel, jobId)
.flatMap { job
if (status == JobStatus.Success) {
Expand Down Expand Up @@ -201,7 +202,7 @@ class CortexSrv @Inject() (
case _
}
}
case Failure(e)
case Failure(_)
logger.debug(s"Request of status of job $cortexJobId in cortex ${cortex.name} fails, restarting ...")
updateJobWithCortex(jobId, cortexJobId, cortex)
}
Expand Down
Loading

0 comments on commit 2d846d2

Please sign in to comment.