diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index 9415815f5..2979a794b 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -63,9 +63,9 @@ class DockerJobRunnerSrv( false }.get - def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit + def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext - ): Future[Unit] = { + ): Try[Unit] = { import scala.collection.JavaConverters._ if (autoUpdate) client.pull(dockerImage) // ContainerConfig.builder().addVolume() @@ -114,28 +114,30 @@ class DockerJobRunnerSrv( Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString) ) - val execution = Future { + val timeoutSched = timeout.map(to => + system.scheduler.scheduleOnce(to) { + logger.info("Timeout reached, stopping the container") + client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) + } + ) + val execution = Try { client.startContainer(containerCreation.id()) client.waitContainer(containerCreation.id()) () - }(jobExecutor).andThen { - case r => - val outputFile = jobDirectory.resolve("output").resolve("output.json") - if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { - logger.warn(s"The worker didn't generate output file, use output stream.") - val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully()) - .fold(e => s"Container logs can't be read (${e.getMessage})", identity) - val message = r.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output) + } + timeoutSched.foreach(_.cancel()) + val outputFile = jobDirectory.resolve("output").resolve("output.json") + if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { + logger.warn(s"The worker didn't 
generate output file.") + val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully()) + .fold(e => s"Container logs can't be read (${e.getMessage})", identity) + val message = execution.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output) - val report = Json.obj("success" -> false, "errorMessage" -> message) - Files.write(jobDirectory.resolve("output").resolve("output.json"), report.toString.getBytes(StandardCharsets.UTF_8)) - } + val report = Json.obj("success" -> false, "errorMessage" -> message) + Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8)) } - timeout - .fold(execution)(t => execution.withTimeout(t, client.stopContainer(containerCreation.id(), 3))) - .andThen { - case _ => client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) - } + client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) + execution } } diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala index e649470ec..1731eec4a 100644 --- a/app/org/thp/cortex/services/JobRunnerSrv.scala +++ b/app/org/thp/cortex/services/JobRunnerSrv.scala @@ -6,8 +6,8 @@ import java.nio.file._ import java.nio.file.attribute.BasicFileAttributes import java.util.Date import scala.concurrent.duration.DurationLong -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.{Failure, Success, Try} import play.api.libs.json._ import play.api.{Configuration, Logger} import akka.actor.ActorSystem @@ -87,7 +87,7 @@ class JobRunnerSrv @Inject() ( case t: Throwable => logger.warn(s"Fail to remove temporary files ($directory) : $t") } - private def prepareJobFolder(worker: Worker, job: Job): Future[Path] = { + private def prepareJobFolder(worker: Worker, job: Job): Try[Path] = { val jobFolder = 
Files.createTempDirectory(jobDirectory, s"cortex-job-${job.id}-") logger.debug(s"Job folder is $jobFolder") val inputJobFolder = Files.createDirectories(jobFolder.resolve("input")) @@ -97,12 +97,17 @@ class JobRunnerSrv @Inject() ( .attachment() .map { attachment => val attachmentFile = Files.createTempFile(inputJobFolder, "attachment", "") - attachmentSrv - .source(attachment.id) - .runWith(FileIO.toPath(attachmentFile)) + Try( + Await.result( + attachmentSrv + .source(attachment.id) + .runWith(FileIO.toPath(attachmentFile)), + 10.minutes + ) + ) .map(_ => Some(attachmentFile)) } - .getOrElse(Future.successful(None)) + .getOrElse(Success(None)) .map { case Some(file) => Json.obj("file" -> file.getFileName.toString, "filename" -> job.attachment().get.name, "contentType" -> job.attachment().get.contentType) @@ -145,7 +150,7 @@ class JobRunnerSrv @Inject() ( case error => if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) delete(jobFolder) - Future.failed(error) + Failure(error) } } @@ -200,64 +205,67 @@ class JobRunnerSrv @Inject() ( endJob(job, JobStatus.Failure, Some(s"no output")) } - def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = - prepareJobFolder(worker, job).flatMap { jobFolder => - val executionContext = worker.tpe() match { - case WorkerType.analyzer => analyzerExecutionContext - case WorkerType.responder => responderExecutionContext - } - val finishedJob = for { - _ <- startJob(job) - j <- - runners - .foldLeft[Option[Future[Unit]]](None) { - case (None, "docker") => - worker - .dockerImage() - .map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes), executionContext)) - .orElse { - logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") - None - } - case (None, "process") => - worker - .command() - .map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes), 
executionContext)) - .orElse { - logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)") - None - } - case (j: Some[_], _) => j - case (None, runner) => - logger.warn(s"Unknown job runner: $runner") - None + def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = { + val executionContext = worker.tpe() match { + case WorkerType.analyzer => analyzerExecutionContext + case WorkerType.responder => responderExecutionContext + } + var maybeJobFolder: Option[Path] = None - } - .getOrElse(Future.failed(BadRequestError("Worker cannot be run"))) - } yield j - finishedJob - .transformWith { - case _: Success[_] => - extractReport(jobFolder, job) - case Failure(error) => - endJob(job, JobStatus.Failure, Option(error.getMessage), Some(readFile(jobFolder.resolve("input").resolve("input.json")))) + Future { + syncStartJob(job).get + val jobFolder = prepareJobFolder(worker, job).get + maybeJobFolder = Some(jobFolder) + runners + .foldLeft[Option[Try[Unit]]](None) { + case (None, "docker") => + worker + .dockerImage() + .map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, worker.jobTimeout().map(_.minutes))) + .orElse { + logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") + None + } + case (None, "process") => + worker + .command() + .map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes))) + .orElse { + logger.warn(s"worker ${worker.id} can't be run with process (doesn't have command)") + None + } + case (j: Some[_], _) => j + case (None, runner) => + logger.warn(s"Unknown job runner: $runner") + None } - .andThen { - case _ => - if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) - delete(jobFolder) - } - } + .getOrElse(throw BadRequestError("Worker cannot be run")) + }(executionContext) + .transformWith { + case _: Success[_] => + extractReport(maybeJobFolder.get /* can't be none */, job) + case 
Failure(error) => + endJob(job, JobStatus.Failure, Option(error.getMessage), maybeJobFolder.map(jf => readFile(jf.resolve("input").resolve("input.json")))) + + } + .andThen { + case _ => + if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) + maybeJobFolder.foreach(delete) + } + } private def readFile(input: Path): String = new String(Files.readAllBytes(input), StandardCharsets.UTF_8) - private def startJob(job: Job)(implicit authContext: AuthContext): Future[Job] = { + private def syncStartJob(job: Job)(implicit authContext: AuthContext): Try[Job] = { val fields = Fields .empty .set("status", JobStatus.InProgress.toString) .set("startDate", Json.toJson(new Date)) - updateSrv(job, fields, ModifyConfig(retryOnConflict = 0)) + Try( + Await.result(updateSrv(job, fields, ModifyConfig(retryOnConflict = 0, seqNoAndPrimaryTerm = Some((job.seqNo, job.primaryTerm)))), 1.minute) + ) } private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)(implicit diff --git a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala index d7db36e48..5c33c79d8 100644 --- a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala @@ -32,9 +32,9 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { } }.getOrElse(None) - def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit + def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext - ): Future[Unit] = { + ): Try[Unit] = { val baseDirectory = Paths.get(command).getParent.getParent val output = mutable.StringBuilder.newBuilder logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}") @@ -45,33 +45,26 @@ class ProcessJobRunnerSrv @Inject() 
(implicit val system: ActorSystem) { logger.info(s" Job ${job.id}: $s") output ++= s }) - val execution = Future - .apply { - process.exitValue() - () - }(jobExecutor) - .map { _ => - val outputFile = jobDirectory.resolve("output").resolve("output.json") - if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { - val report = Json.obj("success" -> false, "errorMessage" -> output.toString) - Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8)) - } - () - } - .recoverWith { - case error => - logger.error(s"Execution of command $command failed", error) - Future.apply { - val report = Json.obj("success" -> false, "errorMessage" -> s"${error.getMessage}\n$output") - Files.write(jobDirectory.resolve("output").resolve("output.json"), report.toString.getBytes(StandardCharsets.UTF_8)) - () - } + val timeoutSched = timeout.map(to => + system.scheduler.scheduleOnce(to) { + logger.info("Timeout reached, killing process") + process.destroy() } - timeout.fold(execution)(t => execution.withTimeout(t, killProcess(process))) - } + ) - def killProcess(process: Process): Unit = { - logger.info("Timeout reached, killing process") - process.destroy() + val execution = Try { + process.exitValue() + () + } + timeoutSched.foreach(_.cancel()) + val outputFile = jobDirectory.resolve("output").resolve("output.json") + if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { + logger.warn(s"The worker didn't generate output file, use output stream.") + val message = execution.fold(e => s"Process execution error: ${e.getMessage}\n${output.result()}", _ => output.result()) + val report = Json.obj("success" -> false, "errorMessage" -> message) + Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8)) + } + execution } + }