diff --git a/app/org/thp/cortex/controllers/AnalyzerCtrl.scala b/app/org/thp/cortex/controllers/AnalyzerCtrl.scala index bb636f65a..d79d2db9b 100644 --- a/app/org/thp/cortex/controllers/AnalyzerCtrl.scala +++ b/app/org/thp/cortex/controllers/AnalyzerCtrl.scala @@ -1,13 +1,13 @@ package org.thp.cortex.controllers -import javax.inject.{ Inject, Singleton } -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.ExecutionContext import play.api.libs.json.{ JsNumber, JsObject, JsString, Json } import play.api.mvc.{ AbstractController, Action, AnyContent, ControllerComponents } import akka.stream.Materializer import akka.stream.scaladsl.Sink +import javax.inject.{ Inject, Singleton } import org.thp.cortex.models.{ Roles, Worker, WorkerDefinition } import org.thp.cortex.services.{ UserSrv, WorkerSrv } @@ -33,15 +33,13 @@ class AnalyzerCtrl @Inject() ( val sort = request.body.getStrings("sort").getOrElse(Nil) val isAdmin = request.roles.contains(Roles.orgAdmin) val (analyzers, analyzerTotal) = workerSrv.findAnalyzersForUser(request.userId, query, range, sort) - val enrichedAnalyzers = analyzers.mapAsync(2)(analyzerJson(isAdmin)) - renderer.toOutput(OK, enrichedAnalyzers, analyzerTotal) + renderer.toOutput(OK, analyzers.map(analyzerJson(isAdmin)), analyzerTotal) } def get(analyzerId: String): Action[AnyContent] = authenticated(Roles.read).async { request ⇒ val isAdmin = request.roles.contains(Roles.orgAdmin) workerSrv.getForUser(request.userId, analyzerId) - .flatMap(analyzerJson(isAdmin)) - .map(renderer.toOutput(OK, _)) + .map(a => renderer.toOutput(OK, analyzerJson(isAdmin)(a))) } private val emptyAnalyzerDefinitionJson = Json.obj( @@ -66,14 +64,11 @@ class AnalyzerCtrl @Inject() ( } + ("analyzerDefinitionId" → JsString(analyzer.workerDefinitionId())) // For compatibility reason } - private def analyzerJson(isAdmin: Boolean)(analyzer: Worker): Future[JsObject] = { - workerSrv.getDefinition(analyzer.workerDefinitionId()) - .map(analyzerDefinition ⇒ analyzerJson(analyzer, Some(analyzerDefinition))) - .recover { case _ ⇒ analyzerJson(analyzer, None) } - .map { - case a if isAdmin ⇒ a + ("configuration" → Json.parse(analyzer.configuration())) - case a ⇒ a - } + private def analyzerJson(isAdmin: Boolean)(analyzer: Worker): JsObject = { + if (isAdmin) + analyzer.toJson + ("configuration" → Json.parse(analyzer.configuration())) + else + analyzer.toJson } def listForType(dataType: String): Action[AnyContent] = authenticated(Roles.read).async { request ⇒ @@ -117,7 +112,6 @@ class AnalyzerCtrl @Inject() ( for { analyzer ← workerSrv.getForUser(request.userId, analyzerId) updatedAnalyzer ← workerSrv.update(analyzer, request.body) - updatedAnalyzerJson ← analyzerJson(isAdmin = true)(updatedAnalyzer) - } yield renderer.toOutput(OK, updatedAnalyzerJson) + } yield renderer.toOutput(OK, analyzerJson(isAdmin = true)(updatedAnalyzer)) } } \ No newline at end of file diff --git a/app/org/thp/cortex/models/Worker.scala b/app/org/thp/cortex/models/Worker.scala index 5caf6d679..50d6a8903 100644 --- a/app/org/thp/cortex/models/Worker.scala +++ b/app/org/thp/cortex/models/Worker.scala @@ -28,10 +28,17 @@ object WorkerType extends Enumeration with HiveEnumeration { trait WorkerAttributes { _: AttributeDef ⇒ val workerId = attribute("_id", F.stringFmt, "Worker id", O.model) val name = attribute("name", F.stringFmt, "Worker name") + val vers = attribute("version", F.stringFmt, "Worker version") val workerDefinitionId = attribute("workerDefinitionId", F.stringFmt, "Worker definition id", O.readonly) val description = attribute("description", F.textFmt, "Worker description") + val author = attribute("author", F.textFmt, "Worker author") + val url = attribute("url", F.textFmt, "Worker url") + val license = attribute("license", F.textFmt, "Worker license") + val command = optionalAttribute("command", F.textFmt, "Worker command") + val dockerImage = optionalAttribute("dockerImage", F.textFmt, "Worker docker image") val dataTypeList = multiAttribute("dataTypeList", F.stringFmt, "List of data type this worker can manage") val configuration = attribute("configuration", F.textFmt, "Configuration of the worker", O.sensitive) + val baseConfig = attribute("baseConfig", F.stringFmt, description="Base configuration key") val rate = optionalAttribute("rate", F.numberFmt, "Number ") val rateUnit = optionalAttribute("rateUnit", F.enumFmt(RateUnit), "") val jobCache = optionalAttribute("jobCache", F.numberFmt, "") diff --git a/app/org/thp/cortex/models/WorkerDefinition.scala b/app/org/thp/cortex/models/WorkerDefinition.scala index afced08eb..fafecc456 100644 --- a/app/org/thp/cortex/models/WorkerDefinition.scala +++ b/app/org/thp/cortex/models/WorkerDefinition.scala @@ -1,8 +1,6 @@ package org.thp.cortex.models -import java.nio.file.Path - -import scala.util.{ Failure, Success, Try } +import java.nio.file.{ Path, Paths } import play.api.Logger import play.api.libs.functional.syntax._ @@ -19,7 +17,7 @@ import org.elastic4play.{ AttributeError, InvalidFormatAttributeError, MissingAt object WorkerConfigItemType extends Enumeration with HiveEnumeration { type Type = Value - val string, number, boolean = Value + val text, string, number, boolean = Value implicit val reads: Format[WorkerConfigItemType.Type] = enumFormat(this) } @@ -37,11 +35,11 @@ case class ConfigurationDefinitionItem( private def check(v: JsValue): JsValue Or Every[AttributeError] = { import WorkerConfigItemType._ v match { - case _: JsString if `type` == string ⇒ Good(v) - case _: JsNumber if `type` == number ⇒ Good(v) - case _: JsBoolean if `type` == boolean ⇒ Good(v) - case JsNull if !isRequired ⇒ Good(v) - case _ ⇒ Bad(One(InvalidFormatAttributeError(s"$name[]", `type`.toString, JsonInputValue(v)))) + case _: JsString if `type` == string || `type` == text ⇒ Good(v) + case _: JsNumber if `type` == number ⇒ Good(v) + case _: JsBoolean if `type` == boolean ⇒ Good(v) + case JsNull if !isRequired ⇒ Good(v) + case _ ⇒ Bad(One(InvalidFormatAttributeError(s"$name[]", `type`.toString, JsonInputValue(v)))) } } @@ -80,8 +78,8 @@ case class WorkerDefinition( author: String, url: String, license: String, - baseDirectory: Path, - command: String, + image: Option[String], + command: Option[Path], baseConfiguration: Option[String], configurationItems: Seq[ConfigurationDefinitionItem], configuration: JsObject, @@ -94,28 +92,7 @@ case class WorkerDefinition( object WorkerDefinition { lazy val logger = Logger(getClass) - def fromPath(definitionFile: Path, workerType: WorkerType.Type): Try[WorkerDefinition] = { - readJsonFile(definitionFile) - .recoverWith { - case error ⇒ - logger.warn(s"Load of worker $definitionFile fails", error) - Failure(error) - } - .map(_.validate(WorkerDefinition.reads(definitionFile.getParent.getParent, workerType))) - .flatMap { - case JsSuccess(workerDefinition, _) ⇒ Success(workerDefinition) - case JsError(errors) ⇒ sys.error(s"Json description file $definitionFile is invalid: $errors") - } - } - - private def readJsonFile(file: Path): Try[JsObject] = { - val source = scala.io.Source.fromFile(file.toFile) - val json = Try(Json.parse(source.mkString).as[JsObject]) - source.close() - json - } - - def reads(path: Path, workerType: WorkerType.Type): Reads[WorkerDefinition] = ( + def singleReads(workerType: WorkerType.Type): Reads[WorkerDefinition] = ( (JsPath \ "name").read[String] and (JsPath \ "version").read[String] and (JsPath \ "description").read[String] and @@ -123,12 +100,18 @@ object WorkerDefinition { (JsPath \ "author").read[String] and (JsPath \ "url").read[String] and (JsPath \ "license").read[String] and - Reads.pure(path) and - (JsPath \ "command").read[String] and + (JsPath \ "image").readNullable[String] and + (JsPath \ "command").readNullable[String].map(_.map(Paths.get(_))) and (JsPath \ "baseConfig").readNullable[String] and (JsPath \ "configurationItems").read[Seq[ConfigurationDefinitionItem]].orElse(Reads.pure(Nil)) and (JsPath \ "config").read[JsObject].orElse(Reads.pure(JsObject.empty)) and Reads.pure(workerType))(WorkerDefinition.apply _) + + def reads(workerType: WorkerType.Type): Reads[List[WorkerDefinition]] = { + val reads = singleReads(workerType) + reads.map(List(_)) orElse Reads.list(reads) + } + implicit val writes: Writes[WorkerDefinition] = Writes[WorkerDefinition] { workerDefinition ⇒ Json.obj( "id" → workerDefinition.id, diff --git a/app/org/thp/cortex/services/WorkerSrv.scala b/app/org/thp/cortex/services/WorkerSrv.scala index 31f906248..813ff9a49 100644 --- a/app/org/thp/cortex/services/WorkerSrv.scala +++ b/app/org/thp/cortex/services/WorkerSrv.scala @@ -1,13 +1,15 @@ package org.thp.cortex.services +import java.net.URL import java.nio.file.{ Files, Path, Paths } -import javax.inject.{ Inject, Singleton } +import javax.inject.{ Inject, Singleton } import scala.collection.JavaConverters._ import scala.concurrent.{ ExecutionContext, Future } import scala.util.Try -import play.api.libs.json.{ JsObject, JsString } +import play.api.libs.json.{ JsObject, JsString, Json } +import play.api.libs.ws.WSClient import play.api.{ Configuration, Logger } import akka.NotUsed @@ -24,9 +26,8 @@ import org.scalactic.Accumulation._ import org.elastic4play.database.ModifyConfig @Singleton -class WorkerSrv( - analyzersPaths: Seq[Path], - respondersPaths: Seq[Path], +class WorkerSrv @Inject() ( + config: Configuration, workerModel: WorkerModel, organizationSrv: OrganizationSrv, userSrv: UserSrv, @@ -35,35 +36,17 @@ class WorkerSrv( updateSrv: UpdateSrv, deleteSrv: DeleteSrv, findSrv: FindSrv, + ws: WSClient, implicit val ec: ExecutionContext, implicit val mat: Materializer) { - @Inject() def this( - config: Configuration, - workerModel: WorkerModel, - organizationSrv: OrganizationSrv, - userSrv: UserSrv, - createSrv: CreateSrv, - getSrv: GetSrv, - updateSrv: UpdateSrv, - deleteSrv: DeleteSrv, - findSrv: FindSrv, - ec: ExecutionContext, - mat: Materializer) = this( - config.get[Seq[String]]("analyzer.path").map(p ⇒ Paths.get(p)), - config.get[Seq[String]]("responder.path").map(p ⇒ Paths.get(p)), - workerModel, - organizationSrv, - userSrv, - createSrv, - getSrv, - updateSrv, - deleteSrv, - findSrv, - ec, - mat) - private lazy val logger = Logger(getClass) + private val analyzersURLs: Seq[String] = config.getDeprecated[Seq[String]]("analyzer.url", "analyzer.path") + private val respondersURLs: Seq[String] = config.getDeprecated[Seq[String]]("responder.url", "responder.path") + private val runners: Seq[String] = config.getOptional[Seq[String]]("runners").getOrElse(Seq("docker", "process")) + private val dockerRunnerEnabled: Boolean = runners.contains("docker") + private val processRunnerEnabled: Boolean = runners.contains("process") + private var workerMap = Map.empty[String, WorkerDefinition] private object workerMapLock @@ -135,32 +118,80 @@ class WorkerSrv( } def rescan(): Unit = { - scan(analyzersPaths.map(_ → WorkerType.analyzer) ++ - respondersPaths.map(_ → WorkerType.responder)) + scan(analyzersURLs.map(_ → WorkerType.analyzer) ++ + respondersURLs.map(_ → WorkerType.responder)) } - def scan(workerPaths: Seq[(Path, WorkerType.Type)]): Unit = { - val workers = (for { - (workerPath, workerType) ← workerPaths - workerDir ← Try(Files.newDirectoryStream(workerPath).asScala).getOrElse { - logger.warn(s"Worker directory ($workerPath) is not found") - Nil + def scan(workerUrls: Seq[(String, WorkerType.Type)]): Unit = { + def readUrl(url: URL, workerType: WorkerType.Type): Future[Seq[WorkerDefinition]] = { + url.getProtocol match { + case "file" ⇒ Future.successful(readFile(Paths.get(url.toURI), workerType)) + case "http" | "https" if !dockerRunnerEnabled ⇒ + logger.warn(s"$workerType URL $url ignored because Docker runner is disabled") + Future.successful(Nil) + case "http" | "https" ⇒ + val reads = WorkerDefinition.reads(workerType) + ws.url(url.toString).get().map(response ⇒ response.json.as(reads)) + .map(_.filterNot(_.command.isDefined)) + } + } + + def readFile(path: Path, workerType: WorkerType.Type): Seq[WorkerDefinition] = { + val reads = WorkerDefinition.reads(workerType) + val source = scala.io.Source.fromFile(path.toFile) + lazy val basePath = path.getParent.getParent + val workerDefinitions = + for { + w ← Try(source.mkString).map(Json.parse(_).as(reads)).getOrElse { + logger.error(s"File $path has invalid format") + Nil + } + command = w.command.map(cmd ⇒ basePath.resolve(cmd)) + if command.isEmpty || command.exists(_.normalize().startsWith(basePath)) + } yield w.copy(command = command) + source.close() + workerDefinitions.filter { + case w if w.command.isDefined && processRunnerEnabled ⇒ true + case w if w.image.isDefined && dockerRunnerEnabled ⇒ true + case w ⇒ + val reason = if (w.command.isDefined) "process runner is disabled" + else if (w.image.isDefined) "Docker runner is disabled" + else "it doesn't have image nor command" + + logger.warn(s"$workerType ${w.name} is disabled because $reason") + false } - if Files.isDirectory(workerDir) - infoFile ← Files.newDirectoryStream(workerDir, "*.json").asScala - workerDefinition ← WorkerDefinition.fromPath(infoFile, workerType).fold( - error ⇒ { - logger.warn("Worker definition file read error", error) - Nil - }, - ad ⇒ Seq(ad)) - } yield workerDefinition.id → workerDefinition) - .toMap - - workerMapLock.synchronized { - workerMap = workers } - logger.info(s"New worker list:\n\n\t${workerMap.values.map(a ⇒ s"${a.name} ${a.version}").mkString("\n\t")}\n") + + def readDirectory(path: Path, workerType: WorkerType.Type): Seq[WorkerDefinition] = { + for { + workerDir ← Files.newDirectoryStream(path).asScala.toSeq + infoFile ← Files.newDirectoryStream(workerDir, "*.json").asScala + workerDefinition ← readFile(infoFile, workerType) + } yield workerDefinition + } + + Future + .traverse(workerUrls) { + case (workerUrl, workerType) ⇒ + Future(new URL(workerUrl)).flatMap(readUrl(_, workerType)) + .recover { + case _ ⇒ + val path = Paths.get(workerUrl) + if (Files.isRegularFile(path)) readFile(path, workerType) + else if (Files.isDirectory(path)) readDirectory(path, workerType) + else { + logger.warn(s"Worker path ($workerUrl) is not found") + Nil + } + } + } + .foreach { worker ⇒ + val wmap = worker.flatten.map(w ⇒ w.id -> w).toMap + workerMapLock.synchronized(workerMap = wmap) + logger.info(s"New worker list:\n\n\t${workerMap.values.map(a ⇒ s"${a.name} ${a.version}").mkString("\n\t")}\n") + } + } def create(organization: Organization, workerDefinition: WorkerDefinition, workerFields: Fields)(implicit authContext: AuthContext): Future[Worker] = { @@ -181,6 +212,13 @@ class WorkerSrv( createSrv[WorkerModel, Worker, Organization](workerModel, organization, workerFields .set("workerDefinitionId", workerDefinition.id) .set("description", workerDefinition.description) + .set("author", workerDefinition.author) + .set("version", workerDefinition.version) + .set("dockerImage", workerDefinition.image.map(JsString)) + .set("command", workerDefinition.command.map(p ⇒ JsString(p.toString))) + .set("url", workerDefinition.url) + .set("license", workerDefinition.license) + .set("baseConfig", workerDefinition.baseConfiguration.map(JsString.apply)) .set("configuration", cfg.toString) .set("type", workerDefinition.tpe.toString) .addIfAbsent("dataTypeList", StringInputValue(workerDefinition.dataTypeList)))