Skip to content

Commit

Permalink
#609 Rename worker by responder
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jul 9, 2018
1 parent ac96186 commit f199ac6
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 70 deletions.
28 changes: 14 additions & 14 deletions thehive-cortex/app/connectors/cortex/controllers/CortexCtrl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import org.elastic4play.models.JsonFormat.baseModelEntityWrites
import org.elastic4play.services.{ Agg, AuxSrv, QueryDSL, QueryDef }
import org.elastic4play.services.JsonFormat.{ aggReads, queryReads }
import connectors.Connector
import connectors.cortex.models.JsonFormat.{ analyzerFormat, workerFormat }
import connectors.cortex.models.JsonFormat.{ analyzerFormat, responderFormat }
import connectors.cortex.services.{ CortexActionSrv, CortexAnalyzerSrv, CortexConfig }
import models.HealthStatus.Type
import models.{ HealthStatus, Roles }
Expand Down Expand Up @@ -76,10 +76,10 @@ class CortexCtrl @Inject() (
case GET(p"/analyzer/type/$dataType<[^/]*>") getAnalyzerFor(dataType)
case GET(p"/analyzer") listAnalyzer

case GET(p"/worker/$workerId<[^/]*>") getWorker(workerId)
case GET(p"/worker") findWorker
case POST(p"/worker/_search") findWorker
case GET(p"/worker/$entityType<[^/]*>/$entityId<[^/]*>") getWorkers(entityType, entityId)
case GET(p"/responder/$responderId<[^/]*>") getResponder(responderId)
case GET(p"/responder") findResponder
case POST(p"/responder/_search") findResponder
case GET(p"/responder/$entityType<[^/]*>/$entityId<[^/]*>") getResponders(entityType, entityId)

case POST(p"/action") createAction
case GET(p"/action") findAction
Expand Down Expand Up @@ -160,25 +160,25 @@ class CortexCtrl @Inject() (
}
}

def getWorker(workerId: String): Action[AnyContent] = authenticated(Roles.read).async { implicit request
cortexActionSrv.getWorkerById(workerId).map { worker
renderer.toOutput(OK, worker)
def getResponder(responderId: String): Action[AnyContent] = authenticated(Roles.read).async { implicit request
cortexActionSrv.getResponderById(responderId).map { responder
renderer.toOutput(OK, responder)
}
}

def getWorkers(entityType: String, entityId: String): Action[AnyContent] = authenticated(Roles.read).async { implicit request
cortexActionSrv.findWorkerFor(entityType, entityId).map { workers
renderer.toOutput(OK, workers)
def getResponders(entityType: String, entityId: String): Action[AnyContent] = authenticated(Roles.read).async { implicit request
cortexActionSrv.findResponderFor(entityType, entityId).map { responders
renderer.toOutput(OK, responders)
}
}

def findWorker: Action[Fields] = authenticated(Roles.read).async(fieldsBodyParser) { implicit request
def findResponder: Action[Fields] = authenticated(Roles.read).async(fieldsBodyParser) { implicit request
val query = request.body.getValue("query") match {
case Some(o: JsObject) o
case _ JsObject.empty
}
cortexActionSrv.findWorkers(query).map { workers
renderer.toOutput(OK, workers)
cortexActionSrv.findResponders(query).map { responders
renderer.toOutput(OK, responders)
}
}

Expand Down
6 changes: 3 additions & 3 deletions thehive-cortex/app/connectors/cortex/models/Action.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import connectors.cortex.models.JsonFormat.jobStatusFormat
import services.AuditedModel

trait ActionAttributes { _: AttributeDef
val workerId = attribute("workerId", F.stringFmt, "Analyzer", O.readonly)
val workerName = optionalAttribute("workerName", F.stringFmt, "Name of the worker", O.readonly)
val workerDefinition = optionalAttribute("workerDefinition", F.stringFmt, "Name of the worker definition", O.readonly)
val responderId = attribute("responderId", F.stringFmt, "Analyzer", O.readonly)
val responderName = optionalAttribute("responderName", F.stringFmt, "Name of the responder", O.readonly)
val responderDefinition = optionalAttribute("responderDefinition", F.stringFmt, "Name of the responder definition", O.readonly)
val status = attribute("status", F.enumFmt(JobStatus), "Status of the action job", JobStatus.InProgress)
val objectType = attribute("objectType", F.stringFmt, "Type of the object on which this job was executed")
val objectId = attribute("objectId", F.stringFmt, "Object ID on which this job was executed", O.readonly)
Expand Down
24 changes: 12 additions & 12 deletions thehive-cortex/app/connectors/cortex/models/JsonFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ object JsonFormat {

implicit val reportTypeFormat: Format[ReportType.Type] = enumFormat(ReportType)

private val workerWrites = Writes[Worker](worker Json.obj(
"id" worker.id,
"name" worker.name,
"version" worker.version,
"description" worker.description,
"dataTypeList" worker.dataTypeList,
"maxTlp" -> worker.maxTlp,
"maxPap" -> worker.maxPap,
"cortexIds" worker.cortexIds))
private val workerReads = Reads[Worker](json
private val responderWrites = Writes[Responder](responder Json.obj(
"id" responder.id,
"name" responder.name,
"version" responder.version,
"description" responder.description,
"dataTypeList" responder.dataTypeList,
"maxTlp" -> responder.maxTlp,
"maxPap" -> responder.maxPap,
"cortexIds" responder.cortexIds))
private val responderReads = Reads[Responder](json
for {
name (json \ "name").validate[String]
version (json \ "version").validate[String]
Expand All @@ -91,6 +91,6 @@ object JsonFormat {
dataTypeList (json \ "dataTypeList").validate[Seq[String]]
maxTlp (json \ "maxTlp").validateOpt[Long]
maxPap (json \ "maxPap").validateOpt[Long]
} yield Worker(id, renamed, version, description, dataTypeList, maxTlp, maxPap))
implicit val workerFormat: Format[Worker] = Format(workerReads, workerWrites)
} yield Responder(id, renamed, version, description, dataTypeList, maxTlp, maxPap))
implicit val responderFormat: Format[Responder] = Format(responderReads, responderWrites)
}
6 changes: 3 additions & 3 deletions thehive-cortex/app/connectors/cortex/models/Worker.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package connectors.cortex.models

case class Worker(
case class Responder(
id: String,
name: String,
version: String,
Expand All @@ -10,7 +10,7 @@ case class Worker(
maxPap: Option[Long],
cortexIds: List[String] = Nil) {

def addCortexId(cid: String): Worker = copy(cortexIds = cid :: cortexIds)
def addCortexId(cid: String): Responder = copy(cortexIds = cid :: cortexIds)

def join(worker: Worker): Worker = copy(cortexIds = cortexIds ::: worker.cortexIds)
def join(responder: Responder): Responder = copy(cortexIds = cortexIds ::: responder.cortexIds)
}
52 changes: 26 additions & 26 deletions thehive-cortex/app/connectors/cortex/services/CortexActionSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,45 +38,45 @@ class CortexActionSrv @Inject() (
implicit val mat: Materializer) {

lazy val logger = Logger(getClass)
lazy val workerIdRegex: Regex = "(.*)-(.*)".r
lazy val responderIdRegex: Regex = "(.*)-(.*)".r

def getWorkerById(id: String): Future[Worker] = {
def getResponderById(id: String): Future[Responder] = {
id match {
case workerIdRegex(instanceId, workerId) cortexConfig.instances.find(_.name == instanceId).map(_.getWorkerById(workerId)).getOrElse(Future.failed(NotFoundError(s"Worker $id not found")))
case _ Future.firstCompletedOf(cortexConfig.instances.map(_.getWorkerById(id)))
case responderIdRegex(instanceId, responderId) cortexConfig.instances.find(_.name == instanceId).map(_.getResponderById(responderId)).getOrElse(Future.failed(NotFoundError(s"Responder $id not found")))
case _ Future.firstCompletedOf(cortexConfig.instances.map(_.getResponderById(id)))
}
}

def askWorkersOnAllCortex(f: CortexClient Future[Seq[Worker]]): Future[Seq[Worker]] = {
def askRespondersOnAllCortex(f: CortexClient Future[Seq[Responder]]): Future[Seq[Responder]] = {
Future
.traverse(cortexConfig.instances) { cortex
f(cortex).recover { case NonFatal(t) logger.error("Request to Cortex fails", t); Nil }
}
.map(_.flatten)
}

def findWorkers(query: JsObject): Future[Seq[Worker]] = {
askWorkersOnAllCortex(_.findWorkers(query))
.map { workers
workers
def findResponders(query: JsObject): Future[Seq[Responder]] = {
askRespondersOnAllCortex(_.findResponders(query))
.map { responders
responders
.groupBy(_.name)
.values
.map(_.reduce(_ join _))
.toSeq
}
}

def findWorkerFor(entityType: String, entityId: String): Future[Seq[Worker]] = {
def findResponderFor(entityType: String, entityId: String): Future[Seq[Responder]] = {
for {
(tlp, pap) getEntity(entityType, entityId)
.flatMap(actionOperationSrv.findCaseEntity)
.map { caze (caze.tlp(), caze.pap()) }
.recover { case _ (0L, 0L) }
query = Json.obj(
"dataTypeList" -> s"thehive:$entityType")
workers findWorkers(query)
applicableWorkers = workers.filter(w w.maxTlp.fold(true)(_ >= tlp) && w.maxPap.fold(true)(_ >= pap))
} yield applicableWorkers
responders findResponders(query)
applicableResponders = responders.filter(w w.maxTlp.fold(true)(_ >= tlp) && w.maxPap.fold(true)(_ >= pap))
} yield applicableResponders
}

def find(queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[Action, NotUsed], Future[Long]) = {
Expand Down Expand Up @@ -150,33 +150,33 @@ class CortexActionSrv @Inject() (
}

def executeAction(fields: Fields)(implicit authContext: AuthContext): Future[Action] = {
def getWorker(cortexClient: CortexClient): Future[Worker] = {
fields.getString("workerId").map(cortexClient.getWorkerById) orElse
fields.getString("workerName").map(cortexClient.getWorkerByName) getOrElse
Future.failed(BadRequestError("worker is missing"))
def getResponder(cortexClient: CortexClient): Future[Responder] = {
fields.getString("responderId").map(cortexClient.getResponderById) orElse
fields.getString("responderName").map(cortexClient.getResponderByName) getOrElse
Future.failed(BadRequestError("responder is missing"))
}

def getCortexClient: Future[(CortexClient, Worker)] = {
def getCortexClient: Future[(CortexClient, Responder)] = {
fields.getString("cortexId")
.map { cortexId
cortexConfig
.instances
.find(_.name == cortexId)
.fold[Future[(CortexClient, Worker)]](Future.failed(NotFoundError(s"cortex $cortexId not found"))) { c
getWorker(c).map(c -> _)
.fold[Future[(CortexClient, Responder)]](Future.failed(NotFoundError(s"cortex $cortexId not found"))) { c
getResponder(c).map(c -> _)
}
}
.getOrElse {
Future.firstCompletedOf {
cortexConfig.instances.map(c getWorker(c).map(c -> _))
cortexConfig.instances.map(c getResponder(c).map(c -> _))
}
}
}

for {
objectType fields.getString("objectType").fold[Future[String]](Future.failed(MissingAttributeError("action.objectType")))(Future.successful)
objectId fields.getString("objectId").fold[Future[String]](Future.failed(MissingAttributeError("action.objectId")))(Future.successful)
(cortexClient, worker) getCortexClient
(cortexClient, responder) getCortexClient
tlp = fields.getLong("tlp").getOrElse(2L)
message = fields.getString("message").getOrElse("")
parameters = fields.getValue("parameters") match {
Expand All @@ -186,7 +186,7 @@ class CortexActionSrv @Inject() (
entity getEntity(objectType, objectId)
entityJson auxSrv(entity, 10, withStats = false, removeUnaudited = true)
jobJson cortexClient.execute(
worker.id,
responder.id,
s"thehive:$objectType",
entityJson,
tlp,
Expand All @@ -196,9 +196,9 @@ class CortexActionSrv @Inject() (
action createSrv[ActionModel, Action](actionModel, Fields.empty
.set("objectType", entity.model.modelName)
.set("objectId", entity.id)
.set("workerId", job.analyzerId)
.set("workerName", job.analyzerName)
.set("workerDefinition", job.analyzerDefinition)
.set("responderId", job.analyzerId)
.set("responderName", job.analyzerName)
.set("responderDefinition", job.analyzerDefinition)
//.set("status", JobStatus.InProgress)
.set("objectType", objectType)
.set("objectId", objectId)
Expand Down
24 changes: 12 additions & 12 deletions thehive-cortex/app/connectors/cortex/services/CortexClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,22 @@ class CortexClient(val name: String, baseUrl: String, authentication: Option[Cor
.recoverWith { case _ getAnalyzerByName(analyzerId) } // if get analyzer using cortex2 API fails, try using legacy API
}

def getWorkerById(workerId: String)(implicit ec: ExecutionContext): Future[Worker] = {
request(s"api/analyzer/$workerId", _.get, _.json.as[Worker]).map(_.addCortexId(name))
def getResponderById(responderId: String)(implicit ec: ExecutionContext): Future[Responder] = {
request(s"api/responder/$responderId", _.get, _.json.as[Responder]).map(_.addCortexId(name))
}

def getWorkerByName(workerName: String)(implicit ec: ExecutionContext): Future[Worker] = {
def getResponderByName(responderName: String)(implicit ec: ExecutionContext): Future[Responder] = {
val searchRequest = Json.obj(
"query" -> Json.obj(
"_field" -> "name",
"_value" -> workerName),
"_value" -> responderName),
"range" -> "0-1")
request(s"api/analyzer/_search", _.post(searchRequest),
_.json.as[Seq[Worker]])
request(s"api/responder/_search", _.post(searchRequest),
_.json.as[Seq[Responder]])
.flatMap { analyzers
analyzers.headOption
.fold[Future[Worker]](Future.failed(NotFoundError(s"worker $workerName not found"))) { worker
Future.successful(worker.addCortexId(name))
.fold[Future[Responder]](Future.failed(NotFoundError(s"responder $responderName not found"))) { responder
Future.successful(responder.addCortexId(name))
}
}
}
Expand All @@ -140,8 +140,8 @@ class CortexClient(val name: String, baseUrl: String, authentication: Option[Cor
request(s"api/analyzer?range=all", _.get, _.json.as[Seq[Analyzer]]).map(_.map(_.copy(cortexIds = List(name))))
}

def findWorkers(query: JsObject)(implicit ec: ExecutionContext): Future[Seq[Worker]] = {
request(s"api/analyzer/_search?range=all", _.post(Json.obj("query" -> query)), _.json.as[Seq[Worker]]).map(_.map(_.addCortexId(name)))
def findResponders(query: JsObject)(implicit ec: ExecutionContext): Future[Seq[Responder]] = {
request(s"api/responder/_search?range=all", _.post(Json.obj("query" -> query)), _.json.as[Seq[Responder]]).map(_.map(_.addCortexId(name)))
}

def analyze(analyzerId: String, artifact: CortexArtifact)(implicit ec: ExecutionContext): Future[JsValue] = {
Expand All @@ -157,7 +157,7 @@ class CortexClient(val name: String, baseUrl: String, authentication: Option[Cor
}

def execute(
workerId: String,
responderId: String,
dataType: String,
data: JsValue,
tlp: Long,
Expand All @@ -169,7 +169,7 @@ class CortexClient(val name: String, baseUrl: String, authentication: Option[Cor
"tlp" -> tlp,
"message" -> message,
"parameters" -> parameters)
request(s"api/analyzer/$workerId/run", _.post(body), _.json.as[JsObject])
request(s"api/analyzer/$responderId/run", _.post(body), _.json.as[JsObject])
}

def listAnalyzerForType(dataType: String)(implicit ec: ExecutionContext): Future[Seq[Analyzer]] = {
Expand Down

0 comments on commit f199ac6

Please sign in to comment.