diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala
new file mode 100644
index 000000000..e1193b885
--- /dev/null
+++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala
@@ -0,0 +1,70 @@
+package org.thp.cortex.services
+
+import java.nio.file._
+
+import scala.concurrent.{ ExecutionContext, Future }
+
+import play.api.{ Configuration, Logger }
+
+import com.spotify.docker.client.messages.HostConfig.Bind
+import com.spotify.docker.client.messages.{ ContainerConfig, HostConfig }
+import com.spotify.docker.client.{ DefaultDockerClient, DockerClient }
+import javax.inject.{ Inject, Singleton }
+import org.thp.cortex.models._
+
+@Singleton
+class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean) {
+
+  def this() = this(DefaultDockerClient.fromEnv().build(), false)
+
+  @Inject()
+  def this(config: Configuration) = this(
+    new DefaultDockerClient.Builder()
+      .apiVersion(config.getOptional[String]("docker.version").orNull)
+      .connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100))
+      .connectTimeoutMillis(config.getOptional[Long]("docker.connectTimeoutMillis").getOrElse(5000))
+      //.dockerCertificates()
+      .readTimeoutMillis(config.getOptional[Long]("docker.readTimeoutMillis").getOrElse(30000))
+      //.registryAuthSupplier()
+      .uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock"))
+      .useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false))
+      .build(),
+    config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true))
+
+  lazy val logger = Logger(getClass)
+
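+  // The job folder is bind-mounted into the container at /job: the worker reads
+  // /job/input/input.json and writes its result to /job/output/output.json.
+  // When a cacerts file is present in the input folder, REQUESTS_CA_BUNDLE is
+  // exported so that workers based on Python requests use it for TLS verification.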
+  def run(jobDirectory: Path, dockerImage: String, job: Job)(implicit ec: ExecutionContext): Future[Unit] = {
+    import scala.collection.JavaConverters._
+    // client.pull(dockerImage)
+    // ContainerConfig.builder().addVolume()
+    val hostConfig = HostConfig.builder()
+      .appendBinds(Bind.from(jobDirectory.toAbsolutePath.toString)
+        .to("/job")
+        .readOnly(false)
+        .build())
+      .build()
+    val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
+    val containerConfigBuilder = ContainerConfig
+      .builder()
+      .hostConfig(hostConfig)
+      .image(dockerImage)
+      .cmd("/job")
+
+    val containerConfig = if (Files.exists(cacertsFile)) containerConfigBuilder.env("REQUESTS_CA_BUNDLE=/job/input/cacerts").build()
+    else containerConfigBuilder.build()
+    val containerCreation = client.createContainer(containerConfig)
+    // Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn)
+    logger.info(s"Execute container ${containerCreation.id()}\n" +
+      s"  image : $dockerImage\n" +
+      s"  volume: ${jobDirectory.toAbsolutePath}:/job" +
+      Option(containerConfig.env()).fold("")(_.asScala.map("\n  env   : " + _).mkString))
+
+    client.startContainer(containerCreation.id())
+
+    Future {
+      client.waitContainer(containerCreation.id())
+      ()
+    }
+  }
+
+}
diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala
new file mode 100644
index 000000000..d809571ab
--- /dev/null
+++ b/app/org/thp/cortex/services/JobRunnerSrv.scala
@@ -0,0 +1,246 @@
+package org.thp.cortex.services
+
+import java.io.IOException
+import java.nio.charset.StandardCharsets
+import java.nio.file._
+import java.nio.file.attribute.BasicFileAttributes
+import java.util.Date
+
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.Failure
+
+import play.api.libs.json._
+import play.api.{ Configuration, Logger }
+
+import akka.actor.ActorSystem
+import akka.stream.Materializer
+import akka.stream.scaladsl.FileIO
+import javax.inject.Inject
+import org.thp.cortex.models._
+
+import org.elastic4play.BadRequestError
+import org.elastic4play.controllers.{ Fields, FileInputValue }
+import org.elastic4play.database.ModifyConfig
+import org.elastic4play.services.{ AttachmentSrv, AuthContext, CreateSrv, UpdateSrv }
+
+class JobRunnerSrv @Inject() (
+    config: Configuration,
+    reportModel: ReportModel,
+    artifactModel: ArtifactModel,
+    processJobRunnerSrv: ProcessJobRunnerSrv,
+    dockerJobRunnerSrv: DockerJobRunnerSrv,
+    workerSrv: WorkerSrv,
+    createSrv: CreateSrv,
+    updateSrv: UpdateSrv,
+    attachmentSrv: AttachmentSrv,
+    akkaSystem: ActorSystem,
+    implicit val ec: ExecutionContext,
+    implicit val mat: Materializer) {
+
+  val logger = Logger(getClass)
+  lazy val analyzerExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("analyzer")
+  lazy val responderExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("responder")
+
+  private val runners: Seq[String] = config
+    .getOptional[Seq[String]]("runners")
+    .getOrElse(Seq("docker", "process"))
+    .map(_.toLowerCase)
+
+  private object deleteVisitor extends SimpleFileVisitor[Path] {
+    override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+      Files.delete(file)
+      FileVisitResult.CONTINUE
+    }
+
+    override def postVisitDirectory(dir: Path, e: IOException): FileVisitResult = {
+      Files.delete(dir)
+      FileVisitResult.CONTINUE
+    }
+  }
+
+  private def delete(directory: Path): Unit = try {
+    if (Files.exists(directory))
+      Files.walkFileTree(directory, deleteVisitor)
+    ()
+  }
+  catch {
+    case t: Throwable ⇒ logger.warn(s"Failed to remove temporary files ($directory): $t")
+  }
+
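+  // Layout of the temporary job folder (cortex-job-<id>-XXXX): input/ holds
+  // input.json, the observable file (if any) and an optional cacerts bundle;
+  // output/ is where the worker must write output.json and extracted files.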
+  private def prepareJobFolder(worker: Worker, job: Job): Future[Path] = {
+    val jobFolder = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), s"cortex-job-${job.id}-")
+    val inputJobFolder = Files.createDirectories(jobFolder.resolve("input"))
+    Files.createDirectories(jobFolder.resolve("output"))
+
+    job.attachment()
+      .map { attachment ⇒
+        val attachmentFile = Files.createTempFile(inputJobFolder, "attachment", "")
+        attachmentSrv.source(attachment.id).runWith(FileIO.toPath(attachmentFile))
+          .flatMap {
+            case ioresult if ioresult.status.isSuccess ⇒ Future.successful(Some(attachmentFile))
+            case ioresult                              ⇒ Future.failed(ioresult.getError)
+          }
+      }
+      .getOrElse(Future.successful(None))
+      .map {
+        case Some(file) ⇒
+          Json.obj(
+            "file" → file.toString, // FIXME set only the temporary file name
+            "filename" → job.attachment().get.name,
+            "contentType" → job.attachment().get.contentType)
+        case None if job.data().nonEmpty && job.tpe() == WorkerType.responder ⇒
+          Json.obj("data" → Json.parse(job.data().get))
+        case None if job.data().nonEmpty && job.tpe() == WorkerType.analyzer ⇒
+          Json.obj("data" → job.data().get)
+      }
+      .map { artifact ⇒
+        val proxy_http = (worker.config \ "proxy_http").asOpt[String].fold(JsObject.empty) { proxy ⇒ Json.obj("proxy" → Json.obj("http" → proxy)) }
+        val proxy_https = (worker.config \ "proxy_https").asOpt[String].fold(JsObject.empty) { proxy ⇒ Json.obj("proxy" → Json.obj("https" → proxy)) }
+        val config = worker.config.deepMerge(proxy_http).deepMerge(proxy_https)
+        (worker.config \ "cacerts").asOpt[String].foreach { cacerts ⇒
+          val cacertsFile = jobFolder.resolve("input").resolve("cacerts")
+          Files.write(cacertsFile, cacerts.getBytes)
+        }
+        artifact +
+          ("dataType" → JsString(job.dataType())) +
+          ("tlp" → JsNumber(job.tlp())) +
+          ("pap" → JsNumber(job.pap())) +
+          ("message" → JsString(job.message().getOrElse(""))) +
+          ("parameters" → job.params) +
+          ("config" → config)
+      }
+      .map { input ⇒
+        Files.write(inputJobFolder.resolve("input.json"), input.toString.getBytes(StandardCharsets.UTF_8))
+        jobFolder
+      }
+      .recoverWith {
+        case error ⇒
+          delete(jobFolder)
+          Future.failed(error)
+      }
+  }
+
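+  // On success, "full" and "summary" become the report, "artifacts" are imported
+  // (file artifacts must resolve inside the output folder) and "operations" are
+  // stored with the report; otherwise the job fails with the reported error.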
+  private def extractReport(jobFolder: Path, job: Job)(implicit authContext: AuthContext) = {
+    val outputFile = jobFolder.resolve("output").resolve("output.json")
+    if (Files.exists(outputFile)) {
+      val is = Files.newInputStream(outputFile)
+      val report = Json.parse(is)
+      is.close()
+
+      val success = (report \ "success").asOpt[Boolean].getOrElse(false)
+      if (success) {
+        val fullReport = (report \ "full").as[JsObject].toString
+        val summaryReport = (report \ "summary").asOpt[JsObject].getOrElse(JsObject.empty).toString
+        val artifacts = (report \ "artifacts").asOpt[Seq[JsObject]].getOrElse(Nil)
+        val operations = (report \ "operations").asOpt[Seq[JsObject]].getOrElse(Nil)
+        val reportFields = Fields.empty
+          .set("full", fullReport)
+          .set("summary", summaryReport)
+          .set("operations", JsArray(operations).toString)
+        createSrv[ReportModel, Report, Job](reportModel, job, reportFields)
+          .flatMap { report ⇒
+            Future.sequence {
+              for {
+                artifact ← artifacts
+                dataType ← (artifact \ "dataType").asOpt[String]
+                fields ← dataType match {
+                  case "file" ⇒
+                    for {
+                      name ← (artifact \ "filename").asOpt[String]
+                      file ← (artifact \ "file").asOpt[String]
+                      path = jobFolder.resolve("output").resolve(file)
+                      if Files.exists(path) && path.getParent == jobFolder.resolve("output")
+                      contentType = (artifact \ "contentType").asOpt[String].getOrElse("application/octet-stream")
+                      fiv = FileInputValue(name, path, contentType)
+                    } yield Fields(artifact - "filename" - "file" - "contentType").set("attachment", fiv)
+                  case _ ⇒ Some(Fields(artifact))
+                }
+              } yield createSrv[ArtifactModel, Artifact, Report](artifactModel, report, fields)
+            }
+          }
+          .transformWith {
+            case Failure(e) ⇒ endJob(job, JobStatus.Failure, Some(s"Report creation failure: $e"))
+            case _          ⇒ endJob(job, JobStatus.Success)
+          }
+      }
+      else {
+        endJob(job, JobStatus.Failure,
+          (report \ "errorMessage").asOpt[String],
+          (report \ "input").asOpt[JsValue].map(_.toString))
+      }
+    }
+    else {
+      endJob(job, JobStatus.Failure, Some("no output"))
+    }
+  }
+
+  //  private def fixArtifact(artifact: Fields): Fields = {
+  //    def rename(oldName: String, newName: String): Fields ⇒ Fields = fields ⇒
+  //      fields.getValue(oldName).fold(fields)(v ⇒ fields.unset(oldName).set(newName, v))
+  //
+  //    rename("value", "data").andThen(
+  //      rename("type", "dataType"))(artifact)
+  //  }
+
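+  // Runners are tried in the order given by the "runners" configuration key
+  // (docker then process by default): docker requires the worker to define an
+  // image, process requires a command; the first applicable runner is used.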
+  def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = {
+    prepareJobFolder(worker, job).flatMap { jobFolder ⇒
+      val finishedJob = for {
+        workerDefinition ← workerSrv.getDefinition(worker.workerDefinitionId())
+        executionContext = workerDefinition.tpe match {
+          case WorkerType.analyzer  ⇒ analyzerExecutionContext
+          case WorkerType.responder ⇒ responderExecutionContext
+        }
+        _ ← startJob(job)
+        j ← runners
+          .foldLeft[Option[Future[Unit]]](None) {
+            case (None, "docker") ⇒
+              worker.dockerImage()
+                .map(dockerImage ⇒ dockerJobRunnerSrv.run(jobFolder, dockerImage, job)(executionContext))
+                .orElse {
+                  logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)")
+                  None
+                }
+            case (None, "process") ⇒
+              worker.command()
+                .map(command ⇒ processJobRunnerSrv.run(jobFolder, command, job)(executionContext))
+                .orElse {
+                  logger.warn(s"worker ${worker.id} can't be run with process (doesn't have command)")
+                  None
+                }
+            case (j: Some[_], _) ⇒ j
+            case (None, runner) ⇒
+              logger.warn(s"Unknown job runner: $runner")
+              None
+          }
+          .getOrElse(Future.failed(BadRequestError("Worker cannot be run")))
+      } yield j
+      finishedJob
+        .transformWith { r ⇒
+          r.fold(
+            error ⇒ endJob(job, JobStatus.Failure, Option(error.getMessage), Some(readFile(jobFolder.resolve("input").resolve("input.json")))),
+            _ ⇒ extractReport(jobFolder, job))
+        }
+      //.andThen { case _ ⇒ delete(jobFolder) }
+    }
+  }
+
+  private def readFile(input: Path): String = new String(Files.readAllBytes(input), StandardCharsets.UTF_8)
+
+  private def startJob(job: Job)(implicit authContext: AuthContext): Future[Job] = {
+    val fields = Fields.empty
+      .set("status", JobStatus.InProgress.toString)
+      .set("startDate", Json.toJson(new Date))
+    updateSrv(job, fields, ModifyConfig(retryOnConflict = 0))
+  }
+
+  private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)(implicit authContext: AuthContext): Future[Job] = {
+    val fields = Fields.empty
+      .set("status", status.toString)
+      .set("endDate", Json.toJson(new Date))
+      .set("input", input.map(JsString.apply))
+      .set("message", errorMessage.map(JsString.apply))
+    updateSrv(job, fields, ModifyConfig.default)
+  }
+}
\ No newline at end of file
diff --git a/app/org/thp/cortex/services/JobSrv.scala b/app/org/thp/cortex/services/JobSrv.scala
index 829771955..711bbf544 100644
--- a/app/org/thp/cortex/services/JobSrv.scala
+++ b/app/org/thp/cortex/services/JobSrv.scala
@@ -1,29 +1,25 @@
 package org.thp.cortex.services
 
-import java.io.{ ByteArrayOutputStream, InputStream }
-import java.nio.file.{ Files, Paths }
 import java.util.Date
 
-import javax.inject.{ Inject, Singleton }
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.{ Failure, Success }
+
+import play.api.libs.json._
+import play.api.{ Configuration, Logger }
+
 import akka.NotUsed
-import akka.actor.ActorSystem
 import akka.stream.Materializer
-import akka.stream.scaladsl.{ FileIO, Sink, Source }
+import akka.stream.scaladsl.{ Sink, Source }
+import javax.inject.{ Inject, Singleton }
+import org.scalactic.Accumulation._
+import org.scalactic.{ Bad, Good, One, Or }
+import org.thp.cortex.models._
 
 import org.elastic4play._
 import org.elastic4play.controllers._
-import org.elastic4play.database.ModifyConfig
 import org.elastic4play.services._
-import org.scalactic.Accumulation._
-import org.scalactic.{ Bad, Good, One, Or }
-import org.thp.cortex.models._
-import play.api.libs.json._
-import play.api.{ Configuration, Logger }
-import scala.concurrent.duration._
-import scala.concurrent.{ ExecutionContext, Future }
-import scala.sys.process.{ Process, ProcessIO }
-import scala.util.control.NonFatal
-import scala.util.{ Failure, Success, Try }
 
 @Singleton
 class JobSrv(
@@ -33,13 +29,11 @@ class JobSrv(
     artifactModel: ArtifactModel,
     workerSrv: WorkerSrv,
     userSrv: UserSrv,
-    getSrv: GetSrv,
+    jobRunnerSrv: JobRunnerSrv,
     createSrv: CreateSrv,
-    updateSrv: UpdateSrv,
     findSrv: FindSrv,
    deleteSrv: DeleteSrv,
     attachmentSrv: AttachmentSrv,
-    akkaSystem: ActorSystem,
     implicit val ec: ExecutionContext,
     implicit val mat: Materializer) {
 
@@ -50,13 +44,11 @@ class JobSrv(
       artifactModel: ArtifactModel,
       workerSrv: WorkerSrv,
       userSrv: UserSrv,
-      getSrv: GetSrv,
+      jobRunnerSrv: JobRunnerSrv,
       createSrv: CreateSrv,
-      updateSrv: UpdateSrv,
       findSrv: FindSrv,
       deleteSrv: DeleteSrv,
       attachmentSrv: AttachmentSrv,
-      akkaSystem: ActorSystem,
       ec: ExecutionContext,
       mat: Materializer) = this(
     configuration.getOptional[Duration]("cache.job").getOrElse(Duration.Zero),
@@ -65,23 +57,15 @@ class JobSrv(
     artifactModel,
     workerSrv,
     userSrv,
-    getSrv,
+    jobRunnerSrv,
     createSrv,
-    updateSrv,
     findSrv,
     deleteSrv,
     attachmentSrv,
-    akkaSystem,
-    ec, mat)
+    ec,
+    mat)
 
   private lazy val logger = Logger(getClass)
-  private lazy val analyzerExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("analyzer")
-  private lazy val responderExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("responder")
-  private val osexec =
-    if (System.getProperty("os.name").toLowerCase.contains("win"))
-      (c: String) ⇒ s"""cmd /c $c"""
-    else
-      (c: String) ⇒ c
 
   runPreviousJobs()
 
@@ -93,14 +77,11 @@ class JobSrv(
       .runForeach { job ⇒
         (for {
           worker ← workerSrv.get(job.workerId())
-          workerDefinition ← workerSrv.getDefinition(job.workerId())
-          updatedJob ← run(workerDefinition, worker, job)
+          updatedJob ← jobRunnerSrv.run(worker, job)
         } yield updatedJob)
           .onComplete {
             case Success(j) ⇒ logger.info(s"Job ${job.id} has finished with status ${j.status()}")
-            case Failure(e) ⇒
-              endJob(job, JobStatus.Failure, Some(e.getMessage), None)
-              logger.error(s"Job ${job.id} has failed", e)
+            case Failure(e) ⇒ logger.error(s"Job ${job.id} has failed", e)
           }
       }
   }
@@ -285,15 +266,13 @@ class JobSrv(
           case Left(data)        ⇒ fields.set("data", data)
           case Right(attachment) ⇒ fields.set("attachment", AttachmentInputValue(attachment))
         }
-        workerSrv.getDefinition(worker.workerDefinitionId()).flatMap { workerDefinition ⇒
-          createSrv[JobModel, Job](jobModel, fieldWithData).andThen {
-            case Success(job) ⇒
-              run(workerDefinition, worker, job)
-                .onComplete {
-                  case Success(j) ⇒ logger.info(s"Job ${job.id} has finished with status ${j.status()}")
-                  case Failure(e) ⇒ logger.error(s"Job ${job.id} has failed", e)
-                }
-          }
+        createSrv[JobModel, Job](jobModel, fieldWithData).andThen {
+          case Success(job) ⇒
+            jobRunnerSrv.run(worker, job)
+              .onComplete {
+                case Success(j) ⇒ logger.info(s"Job ${job.id} has finished with status ${j.status()}")
+                case Failure(e) ⇒ logger.error(s"Job ${job.id} has failed", e)
+              }
         }
       case false ⇒
         Future.failed(RateLimitExceeded(worker))
@@ -344,73 +323,6 @@ class JobSrv(
     }
   }
 
-  private def fixArtifact(artifact: Fields): Fields = {
-    def rename(oldName: String, newName: String): Fields ⇒ Fields = fields ⇒
-      fields.getValue(oldName).fold(fields)(v ⇒ fields.unset(oldName).set(newName, v))
-
-    rename("value", "data").andThen(
-      rename("type", "dataType"))(artifact)
-  }
-
-  def run(workerDefinition: WorkerDefinition, worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = {
-    val executionContext = workerDefinition.tpe match {
-      case WorkerType.analyzer  ⇒ analyzerExecutionContext
-      case WorkerType.responder ⇒ responderExecutionContext
-    }
-    buildInput(workerDefinition, worker, job)
-      .flatMap { input ⇒
-        startJob(job)
-        var output = ""
-        var error = ""
-        try {
-          logger.info(s"Execute ${osexec(workerDefinition.command)} in ${workerDefinition.baseDirectory}")
-          Process(osexec(workerDefinition.command), workerDefinition.baseDirectory.toFile).run(
-            new ProcessIO(
-              { stdin ⇒ Try(stdin.write(input.toString.getBytes("UTF-8"))); stdin.close() },
-              { stdout ⇒ output = readStream(stdout) },
-              { stderr ⇒ error = readStream(stderr) }))
-            .exitValue()
-          val report = Json.parse(output).as[JsObject]
"success").asOpt[Boolean].getOrElse(false) - if (success) { - val fullReport = (report \ "full").as[JsObject].toString - val summaryReport = (report \ "summary").asOpt[JsObject].getOrElse(JsObject.empty).toString - val artifacts = (report \ "artifacts").asOpt[Seq[JsObject]].getOrElse(Nil) - val operations = (report \ "operations").asOpt[Seq[JsObject]].getOrElse(Nil) - val reportFields = Fields.empty - .set("full", fullReport) - .set("summary", summaryReport) - .set("operations", JsArray(operations).toString) - createSrv[ReportModel, Report, Job](reportModel, job, reportFields) - .flatMap { report ⇒ - Future.traverse(artifacts) { artifact ⇒ - createSrv[ArtifactModel, Artifact, Report](artifactModel, report, fixArtifact(Fields(artifact))) - } - } - .transformWith { - case Failure(e) ⇒ endJob(job, JobStatus.Failure, Some(s"Report creation failure: $e")) - case _ ⇒ endJob(job, JobStatus.Success) - } - } - else { - endJob(job, JobStatus.Failure, - (report \ "errorMessage").asOpt[String], - (report \ "input").asOpt[JsValue].map(_.toString)) - } - } - catch { - case NonFatal(_) ⇒ - val errorMessage = (error + output).take(8192) - endJob(job, JobStatus.Failure, Some(s"Invalid output\n$errorMessage")) - } - finally { - (input \ "file").asOpt[String].foreach { filename ⇒ - Files.deleteIfExists(Paths.get(filename)) - } - } - }(executionContext) - } - def getReport(jobId: String)(implicit authContext: AuthContext): Future[Report] = getForUser(authContext.userId, jobId).flatMap(getReport) def getReport(job: Job): Future[Report] = { @@ -419,73 +331,4 @@ class JobSrv( .runWith(Sink.headOption) .map(_.getOrElse(throw NotFoundError(s"Job ${job.id} has no report"))) } - - private def buildInput(workerDefinition: WorkerDefinition, worker: Worker, job: Job): Future[JsObject] = { - job.attachment() - .map { attachment ⇒ - val tempFile = Files.createTempFile(s"cortex-job-${job.id}-", "") - attachmentSrv.source(attachment.id).runWith(FileIO.toPath(tempFile)) - .flatMap { - case ioresult if ioresult.status.isSuccess ⇒ Future.successful(Some(tempFile)) - case ioresult ⇒ Future.failed(ioresult.getError) - } - } - .getOrElse(Future.successful(None)) - .map { - case Some(file) ⇒ - Json.obj( - "file" → file.toString, - "filename" → job.attachment().get.name, - "contentType" → job.attachment().get.contentType) - case None if job.data().nonEmpty && job.tpe() == WorkerType.responder ⇒ - Json.obj("data" → Json.parse(job.data().get)) - case None if job.data().nonEmpty && job.tpe() == WorkerType.analyzer ⇒ - Json.obj("data" → job.data().get) - } - .map { artifact ⇒ - (BaseConfig.global(worker.tpe()).items ++ BaseConfig.tlp.items ++ BaseConfig.pap.items ++ workerDefinition.configurationItems) - .validatedBy(_.read(worker.config)) - .map(cfg ⇒ Json.obj("config" → JsObject(cfg).deepMerge(workerDefinition.configuration))) - .map { cfg ⇒ - val proxy_http = (cfg \ "config" \ "proxy_http").asOpt[String].fold(JsObject.empty) { proxy ⇒ Json.obj("proxy" → Json.obj("http" → proxy)) } - val proxy_https = (cfg \ "config" \ "proxy_https").asOpt[String].fold(JsObject.empty) { proxy ⇒ Json.obj("proxy" → Json.obj("https" → proxy)) } - cfg.deepMerge(Json.obj("config" → proxy_http.deepMerge(proxy_https))) - } - .map(_ deepMerge artifact + - ("dataType" → JsString(job.dataType())) + - ("tlp" → JsNumber(job.tlp())) + - ("pap" → JsNumber(job.pap())) + - ("message" → JsString(job.message().getOrElse(""))) + - ("parameters" → job.params)) - .badMap(e ⇒ AttributeCheckingError("job", e.toSeq)) - .toTry - } - .flatMap(Future.fromTry) - } - - // - 
+  def run(jobDirectory: Path, command: String, job: Job)(implicit ec: ExecutionContext): Future[Unit] = {
+    Future {
+      val baseDirectory = Paths.get(command).getParent.getParent
+      logger.info(s"Execute $command in $baseDirectory")
+      Process(Seq(command, jobDirectory.toString), baseDirectory.toFile)
+        .run(ProcessLogger(s ⇒ logger.info(s"  Job ${job.id}: $s"))).exitValue()
+      ()
+    }
+  }
+}
diff --git a/test/resources/analyzers/echoAnalyzer/Dockerfile b/test/resources/analyzers/echoAnalyzer/Dockerfile
new file mode 100644
index 000000000..f2ae0eb3e
--- /dev/null
+++ b/test/resources/analyzers/echoAnalyzer/Dockerfile
@@ -0,0 +1,6 @@
+FROM debian:latest
+
+WORKDIR /analyzer
+RUN apt update && apt install -y jq
+COPY echoAnalyzer.sh echoAnalyzer/echoAnalyzer.sh
+ENTRYPOINT ["echoAnalyzer/echoAnalyzer.sh"]
\ No newline at end of file
diff --git a/test/resources/analyzers/echoAnalyzer/echoAnalyzer.json b/test/resources/analyzers/echoAnalyzer/echoAnalyzer.json
index 98c3117ff..4c7024114 100644
--- a/test/resources/analyzers/echoAnalyzer/echoAnalyzer.json
+++ b/test/resources/analyzers/echoAnalyzer/echoAnalyzer.json
@@ -55,5 +55,6 @@
   ],
   "description": "Fake analyzer used for functional tests",
   "dataTypeList": ["domain", "thehive:case", "thehive:case_task", "thehive:case_artifact", "thehive:alert", "thehive:case_task_log"],
-  "command": "echoAnalyzer/echoAnalyzer.sh"
+  "command": "echoAnalyzer/echoAnalyzer.sh",
+  "image": "echo_analyzer"
 }
diff --git a/test/resources/analyzers/echoAnalyzer/echoAnalyzer.sh b/test/resources/analyzers/echoAnalyzer/echoAnalyzer.sh
index 39e505a92..0ccbdcc23 100755
--- a/test/resources/analyzers/echoAnalyzer/echoAnalyzer.sh
+++ b/test/resources/analyzers/echoAnalyzer/echoAnalyzer.sh
@@ -1,21 +1,40 @@
-#!/usr/bin/env bash
+#!/bin/bash
 
-ARTIFACT=$(cat)
-DATA=$(jq .data <<< ${ARTIFACT})
-DATATYPE=$(jq .dataType <<< ${ARTIFACT})
+set -x
 
-cat << EOF
-{
-  "success": true,
-  "summary": {
-    "taxonomies": [
-      { "namespace": "test", "predicate": "data", "value": "echo", "level": "info" }
-    ]
-  },
-  "full": ${ARTIFACT},
-  "operations": [
-    { "type": "AddTagToCase", "tag": "From Action Operation" },
-    { "type": "CreateTask", "title": "task created by action", "description": "yop !" }
-  ]
-}
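+# Each argument is a job directory: read input/input.json from it and write
+# the result to output/output.json.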
+echo starting with parameters: $*
+for JOB
+do
+    echo executing $JOB
+    if [[ -d "${JOB}" ]]; then
+        echo directory $JOB exists
+        if [[ -r "${JOB}/input/input.json" ]]; then
+            INPUT=$(cat ${JOB}/input/input.json)
+        else
+            INPUT="{}"
+        fi
+        echo input is $INPUT
+        DATA=$(jq .data <<< ${INPUT})
+        DATATYPE=$(jq .dataType <<< ${INPUT})
+
+        echo building output
+        mkdir -p "${JOB}/output"
+        cat > "${JOB}/output/output.json" <<- EOF
+        {
+          "success": true,
+          "summary": {
+            "taxonomies": [
+              { "namespace": "test", "predicate": "data", "value": "echo", "level": "info" }
+            ]
+          },
+          "full": ${INPUT},
+          "operations": [
+            { "type": "AddTagToCase", "tag": "From Action Operation" },
+            { "type": "CreateTask", "title": "task created by action", "description": "yop !" }
+          ]
+        }
 EOF
+        echo output is:
+        cat "${JOB}/output/output.json"
+    fi
+done
diff --git a/test/resources/analyzers/testAnalyzer/testAnalyzer.json b/test/resources/analyzers/testAnalyzer/testAnalyzer.json
new file mode 100644
index 000000000..a74e9d57d
--- /dev/null
+++ b/test/resources/analyzers/testAnalyzer/testAnalyzer.json
@@ -0,0 +1,13 @@
+{
+  "name": "testAnalyzer",
+  "version": "1.0",
+  "author": "TheHive-Project",
+  "url": "https://github.com/thehive-project/thehive",
+  "license": "AGPL-V3",
+  "baseConfig": "testAnalyzer",
+  "config": {},
+  "configurationItems": [],
+  "description": "Fake analyzer used for functional tests",
+  "dataTypeList": ["domain"],
+  "command": "testAnalyzer/testAnalyzer.py"
+}
diff --git a/test/resources/analyzers/testAnalyzer/testAnalyzer.py b/test/resources/analyzers/testAnalyzer/testAnalyzer.py
new file mode 100755
index 000000000..e1d8df86e
--- /dev/null
+++ b/test/resources/analyzers/testAnalyzer/testAnalyzer.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python3
+# encoding: utf-8
+
+from cortexutils.analyzer import Analyzer
+from cortexutils import runner
+
+
+class TestAnalyzer(Analyzer):
+
+    def artifacts(self, raw):
+        return [
+            self.build_artifact("ip", "127.0.0.1", tags=["localhost"]),
+            self.build_artifact("file", "/etc/passwd", tlp=3)
+        ]
+
+    def summary(self, raw):
+        return {"taxonomies": [self.build_taxonomy("info", "test", "data", "test")]}
+
+    def run(self):
+        Analyzer.run(self)
+
+        self.report({
+            'data': self.get_data(),
+            'input': self._input
+        })
+
+
+if __name__ == '__main__':
+    runner(TestAnalyzer)