Skip to content

Commit

Permalink
#175 Include worker definition data in database
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Mar 11, 2019
1 parent c0353f1 commit cbe7a69
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 103 deletions.
26 changes: 10 additions & 16 deletions app/org/thp/cortex/controllers/AnalyzerCtrl.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package org.thp.cortex.controllers

import javax.inject.{ Inject, Singleton }
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.ExecutionContext

import play.api.libs.json.{ JsNumber, JsObject, JsString, Json }
import play.api.mvc.{ AbstractController, Action, AnyContent, ControllerComponents }

import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import javax.inject.{ Inject, Singleton }
import org.thp.cortex.models.{ Roles, Worker, WorkerDefinition }
import org.thp.cortex.services.{ UserSrv, WorkerSrv }

Expand All @@ -33,15 +33,13 @@ class AnalyzerCtrl @Inject() (
val sort = request.body.getStrings("sort").getOrElse(Nil)
val isAdmin = request.roles.contains(Roles.orgAdmin)
val (analyzers, analyzerTotal) = workerSrv.findAnalyzersForUser(request.userId, query, range, sort)
val enrichedAnalyzers = analyzers.mapAsync(2)(analyzerJson(isAdmin))
renderer.toOutput(OK, enrichedAnalyzers, analyzerTotal)
renderer.toOutput(OK, analyzers.map(analyzerJson(isAdmin)), analyzerTotal)
}

def get(analyzerId: String): Action[AnyContent] = authenticated(Roles.read).async { request
val isAdmin = request.roles.contains(Roles.orgAdmin)
workerSrv.getForUser(request.userId, analyzerId)
.flatMap(analyzerJson(isAdmin))
.map(renderer.toOutput(OK, _))
.map(a => renderer.toOutput(OK, analyzerJson(isAdmin)(a)))
}

private val emptyAnalyzerDefinitionJson = Json.obj(
Expand All @@ -66,14 +64,11 @@ class AnalyzerCtrl @Inject() (
} + ("analyzerDefinitionId" JsString(analyzer.workerDefinitionId())) // For compatibility reason
}

private def analyzerJson(isAdmin: Boolean)(analyzer: Worker): Future[JsObject] = {
workerSrv.getDefinition(analyzer.workerDefinitionId())
.map(analyzerDefinition analyzerJson(analyzer, Some(analyzerDefinition)))
.recover { case _ analyzerJson(analyzer, None) }
.map {
case a if isAdmin a + ("configuration" Json.parse(analyzer.configuration()))
case a a
}
private def analyzerJson(isAdmin: Boolean)(analyzer: Worker): JsObject = {
if (isAdmin)
analyzer.toJson + ("configuration" Json.parse(analyzer.configuration()))
else
analyzer.toJson
}

def listForType(dataType: String): Action[AnyContent] = authenticated(Roles.read).async { request
Expand Down Expand Up @@ -117,7 +112,6 @@ class AnalyzerCtrl @Inject() (
for {
analyzer workerSrv.getForUser(request.userId, analyzerId)
updatedAnalyzer workerSrv.update(analyzer, request.body)
updatedAnalyzerJson analyzerJson(isAdmin = true)(updatedAnalyzer)
} yield renderer.toOutput(OK, updatedAnalyzerJson)
} yield renderer.toOutput(OK, analyzerJson(isAdmin = true)(updatedAnalyzer))
}
}
7 changes: 7 additions & 0 deletions app/org/thp/cortex/models/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@ object WorkerType extends Enumeration with HiveEnumeration {
trait WorkerAttributes { _: AttributeDef
val workerId = attribute("_id", F.stringFmt, "Worker id", O.model)
val name = attribute("name", F.stringFmt, "Worker name")
val vers = attribute("version", F.stringFmt, "Worker version")
val workerDefinitionId = attribute("workerDefinitionId", F.stringFmt, "Worker definition id", O.readonly)
val description = attribute("description", F.textFmt, "Worker description")
val author = attribute("author", F.textFmt, "Worker author")
val url = attribute("url", F.textFmt, "Worker url")
val license = attribute("license", F.textFmt, "Worker license")
val command = optionalAttribute("command", F.textFmt, "Worker command")
val dockerImage = optionalAttribute("dockerImage", F.textFmt, "Worker docker image")
val dataTypeList = multiAttribute("dataTypeList", F.stringFmt, "List of data type this worker can manage")
val configuration = attribute("configuration", F.textFmt, "Configuration of the worker", O.sensitive)
val baseConfig = attribute("baseConfig", F.stringFmt, description="Base configuration key")
val rate = optionalAttribute("rate", F.numberFmt, "Number ")
val rateUnit = optionalAttribute("rateUnit", F.enumFmt(RateUnit), "")
val jobCache = optionalAttribute("jobCache", F.numberFmt, "")
Expand Down
53 changes: 18 additions & 35 deletions app/org/thp/cortex/models/WorkerDefinition.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.thp.cortex.models

import java.nio.file.Path

import scala.util.{ Failure, Success, Try }
import java.nio.file.{ Path, Paths }

import play.api.Logger
import play.api.libs.functional.syntax._
Expand All @@ -19,7 +17,7 @@ import org.elastic4play.{ AttributeError, InvalidFormatAttributeError, MissingAt

object WorkerConfigItemType extends Enumeration with HiveEnumeration {
type Type = Value
val string, number, boolean = Value
val text, string, number, boolean = Value
implicit val reads: Format[WorkerConfigItemType.Type] = enumFormat(this)
}

Expand All @@ -37,11 +35,11 @@ case class ConfigurationDefinitionItem(
private def check(v: JsValue): JsValue Or Every[AttributeError] = {
import WorkerConfigItemType._
v match {
case _: JsString if `type` == string Good(v)
case _: JsNumber if `type` == number Good(v)
case _: JsBoolean if `type` == boolean Good(v)
case JsNull if !isRequired Good(v)
case _ Bad(One(InvalidFormatAttributeError(s"$name[]", `type`.toString, JsonInputValue(v))))
case _: JsString if `type` == string || `type` == text Good(v)
case _: JsNumber if `type` == number Good(v)
case _: JsBoolean if `type` == boolean Good(v)
case JsNull if !isRequired Good(v)
case _ Bad(One(InvalidFormatAttributeError(s"$name[]", `type`.toString, JsonInputValue(v))))
}
}

Expand Down Expand Up @@ -80,8 +78,8 @@ case class WorkerDefinition(
author: String,
url: String,
license: String,
baseDirectory: Path,
command: String,
image: Option[String],
command: Option[Path],
baseConfiguration: Option[String],
configurationItems: Seq[ConfigurationDefinitionItem],
configuration: JsObject,
Expand All @@ -94,41 +92,26 @@ case class WorkerDefinition(
object WorkerDefinition {
lazy val logger = Logger(getClass)

def fromPath(definitionFile: Path, workerType: WorkerType.Type): Try[WorkerDefinition] = {
readJsonFile(definitionFile)
.recoverWith {
case error
logger.warn(s"Load of worker $definitionFile fails", error)
Failure(error)
}
.map(_.validate(WorkerDefinition.reads(definitionFile.getParent.getParent, workerType)))
.flatMap {
case JsSuccess(workerDefinition, _) Success(workerDefinition)
case JsError(errors) sys.error(s"Json description file $definitionFile is invalid: $errors")
}
}

private def readJsonFile(file: Path): Try[JsObject] = {
val source = scala.io.Source.fromFile(file.toFile)
val json = Try(Json.parse(source.mkString).as[JsObject])
source.close()
json
}

def reads(path: Path, workerType: WorkerType.Type): Reads[WorkerDefinition] = (
def singleReads(workerType: WorkerType.Type): Reads[WorkerDefinition] = (
(JsPath \ "name").read[String] and
(JsPath \ "version").read[String] and
(JsPath \ "description").read[String] and
(JsPath \ "dataTypeList").read[Seq[String]].orElse(Reads.pure(Nil)) and
(JsPath \ "author").read[String] and
(JsPath \ "url").read[String] and
(JsPath \ "license").read[String] and
Reads.pure(path) and
(JsPath \ "command").read[String] and
(JsPath \ "image").readNullable[String] and
(JsPath \ "command").readNullable[String].map(_.map(Paths.get(_))) and
(JsPath \ "baseConfig").readNullable[String] and
(JsPath \ "configurationItems").read[Seq[ConfigurationDefinitionItem]].orElse(Reads.pure(Nil)) and
(JsPath \ "config").read[JsObject].orElse(Reads.pure(JsObject.empty)) and
Reads.pure(workerType))(WorkerDefinition.apply _)

def reads(workerType: WorkerType.Type): Reads[List[WorkerDefinition]] = {
val reads = singleReads(workerType)
reads.map(List(_)) orElse Reads.list(reads)
}

implicit val writes: Writes[WorkerDefinition] = Writes[WorkerDefinition] { workerDefinition
Json.obj(
"id" workerDefinition.id,
Expand Down
142 changes: 90 additions & 52 deletions app/org/thp/cortex/services/WorkerSrv.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package org.thp.cortex.services

import java.net.URL
import java.nio.file.{ Files, Path, Paths }
import javax.inject.{ Inject, Singleton }

import javax.inject.{ Inject, Singleton }
import scala.collection.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

import play.api.libs.json.{ JsObject, JsString }
import play.api.libs.json.{ JsObject, JsString, Json }
import play.api.libs.ws.WSClient
import play.api.{ Configuration, Logger }

import akka.NotUsed
Expand All @@ -24,9 +26,8 @@ import org.scalactic.Accumulation._
import org.elastic4play.database.ModifyConfig

@Singleton
class WorkerSrv(
analyzersPaths: Seq[Path],
respondersPaths: Seq[Path],
class WorkerSrv @Inject() (
config: Configuration,
workerModel: WorkerModel,
organizationSrv: OrganizationSrv,
userSrv: UserSrv,
Expand All @@ -35,35 +36,17 @@ class WorkerSrv(
updateSrv: UpdateSrv,
deleteSrv: DeleteSrv,
findSrv: FindSrv,
ws: WSClient,
implicit val ec: ExecutionContext,
implicit val mat: Materializer) {

@Inject() def this(
config: Configuration,
workerModel: WorkerModel,
organizationSrv: OrganizationSrv,
userSrv: UserSrv,
createSrv: CreateSrv,
getSrv: GetSrv,
updateSrv: UpdateSrv,
deleteSrv: DeleteSrv,
findSrv: FindSrv,
ec: ExecutionContext,
mat: Materializer) = this(
config.get[Seq[String]]("analyzer.path").map(p Paths.get(p)),
config.get[Seq[String]]("responder.path").map(p Paths.get(p)),
workerModel,
organizationSrv,
userSrv,
createSrv,
getSrv,
updateSrv,
deleteSrv,
findSrv,
ec,
mat)

private lazy val logger = Logger(getClass)
private val analyzersURLs: Seq[String] = config.getDeprecated[Seq[String]]("analyzer.url", "analyzer.path")
private val respondersURLs: Seq[String] = config.getDeprecated[Seq[String]]("responder.url", "responder.path")
private val runners: Seq[String] = config.getOptional[Seq[String]]("runners").getOrElse(Seq("docker", "process"))
private val dockerRunnerEnabled: Boolean = runners.contains("docker")
private val processRunnerEnabled: Boolean = runners.contains("process")

private var workerMap = Map.empty[String, WorkerDefinition]

private object workerMapLock
Expand Down Expand Up @@ -135,32 +118,80 @@ class WorkerSrv(
}

def rescan(): Unit = {
scan(analyzersPaths.map(_ WorkerType.analyzer) ++
respondersPaths.map(_ WorkerType.responder))
scan(analyzersURLs.map(_ WorkerType.analyzer) ++
respondersURLs.map(_ WorkerType.responder))
}

def scan(workerPaths: Seq[(Path, WorkerType.Type)]): Unit = {
val workers = (for {
(workerPath, workerType) workerPaths
workerDir Try(Files.newDirectoryStream(workerPath).asScala).getOrElse {
logger.warn(s"Worker directory ($workerPath) is not found")
Nil
def scan(workerUrls: Seq[(String, WorkerType.Type)]): Unit = {
def readUrl(url: URL, workerType: WorkerType.Type): Future[Seq[WorkerDefinition]] = {
url.getProtocol match {
case "file" Future.successful(readFile(Paths.get(url.toURI), workerType))
case "http" | "https" if !dockerRunnerEnabled
logger.warn(s"$workerType URL $url ignored because Docker runner is disabled")
Future.successful(Nil)
case "http" | "https"
val reads = WorkerDefinition.reads(workerType)
ws.url(url.toString).get().map(response response.json.as(reads))
.map(_.filterNot(_.command.isDefined))
}
}

def readFile(path: Path, workerType: WorkerType.Type): Seq[WorkerDefinition] = {
val reads = WorkerDefinition.reads(workerType)
val source = scala.io.Source.fromFile(path.toFile)
lazy val basePath = path.getParent.getParent
val workerDefinitions =
for {
w Try(source.mkString).map(Json.parse(_).as(reads)).getOrElse {
logger.error(s"File $path has invalid format")
Nil
}
command = w.command.map(cmd basePath.resolve(cmd))
if command.isEmpty || command.exists(_.normalize().startsWith(basePath))
} yield w.copy(command = command)
source.close()
workerDefinitions.filter {
case w if w.command.isDefined && processRunnerEnabled true
case w if w.image.isDefined && dockerRunnerEnabled true
case w
val reason = if (w.command.isDefined) "process runner is disabled"
else if (w.image.isDefined) "Docker runner is disabled"
else "it doesn't have image nor command"

logger.warn(s"$workerType ${w.name} is disabled because $reason")
false
}
if Files.isDirectory(workerDir)
infoFile Files.newDirectoryStream(workerDir, "*.json").asScala
workerDefinition WorkerDefinition.fromPath(infoFile, workerType).fold(
error {
logger.warn("Worker definition file read error", error)
Nil
},
ad Seq(ad))
} yield workerDefinition.id workerDefinition)
.toMap

workerMapLock.synchronized {
workerMap = workers
}
logger.info(s"New worker list:\n\n\t${workerMap.values.map(a s"${a.name} ${a.version}").mkString("\n\t")}\n")

def readDirectory(path: Path, workerType: WorkerType.Type): Seq[WorkerDefinition] = {
for {
workerDir Files.newDirectoryStream(path).asScala.toSeq
infoFile Files.newDirectoryStream(workerDir, "*.json").asScala
workerDefinition readFile(infoFile, workerType)
} yield workerDefinition
}

Future
.traverse(workerUrls) {
case (workerUrl, workerType)
Future(new URL(workerUrl)).flatMap(readUrl(_, workerType))
.recover {
case _
val path = Paths.get(workerUrl)
if (Files.isRegularFile(path)) readFile(path, workerType)
else if (Files.isDirectory(path)) readDirectory(path, workerType)
else {
logger.warn(s"Worker path ($workerUrl) is not found")
Nil
}
}
}
.foreach { worker
val wmap = worker.flatten.map(w w.id -> w).toMap
workerMapLock.synchronized(workerMap = wmap)
logger.info(s"New worker list:\n\n\t${workerMap.values.map(a s"${a.name} ${a.version}").mkString("\n\t")}\n")
}

}

def create(organization: Organization, workerDefinition: WorkerDefinition, workerFields: Fields)(implicit authContext: AuthContext): Future[Worker] = {
Expand All @@ -181,6 +212,13 @@ class WorkerSrv(
createSrv[WorkerModel, Worker, Organization](workerModel, organization, workerFields
.set("workerDefinitionId", workerDefinition.id)
.set("description", workerDefinition.description)
.set("author", workerDefinition.author)
.set("version", workerDefinition.version)
.set("dockerImage", workerDefinition.image.map(JsString))
.set("command", workerDefinition.command.map(p JsString(p.toString)))
.set("url", workerDefinition.url)
.set("license", workerDefinition.license)
.set("baseConfig", workerDefinition.baseConfiguration.map(JsString.apply))
.set("configuration", cfg.toString)
.set("type", workerDefinition.tpe.toString)
.addIfAbsent("dataTypeList", StringInputValue(workerDefinition.dataTypeList)))
Expand Down

0 comments on commit cbe7a69

Please sign in to comment.