diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index b92d1dfe6..fe92ab568 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -62,7 +62,9 @@ class DockerJobRunnerSrv( false }.get - def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = { + def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit + ec: ExecutionContext + ): Future[Unit] = { import scala.collection.JavaConverters._ if (autoUpdate) client.pull(dockerImage) // ContainerConfig.builder().addVolume() @@ -113,7 +115,7 @@ class DockerJobRunnerSrv( client.startContainer(containerCreation.id()) client.waitContainer(containerCreation.id()) () - }.andThen { + }(jobExecutor).andThen { case r => val outputFile = jobDirectory.resolve("output").resolve("output.json") if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala index 018215d43..6913e8050 100644 --- a/app/org/thp/cortex/services/JobRunnerSrv.scala +++ b/app/org/thp/cortex/services/JobRunnerSrv.scala @@ -191,12 +191,10 @@ class JobRunnerSrv @Inject() ( case Failure(e) => endJob(job, JobStatus.Failure, Some(s"Report creation failure: $e")) case _ => endJob(job, JobStatus.Success) } - } else { + } else endJob(job, JobStatus.Failure, (report \ "errorMessage").asOpt[String], (report \ "input").asOpt[JsValue].map(_.toString)) - } - } else { + } else endJob(job, JobStatus.Failure, Some(s"no output")) - } } def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = @@ -207,31 +205,32 @@ class JobRunnerSrv @Inject() ( } val finishedJob = for { _ <- startJob(job) - j <- runners - .foldLeft[Option[Future[Unit]]](None) { - case (None, "docker") => - worker - .dockerImage() - .map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes))(executionContext)) - .orElse { - logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") - None - } - case (None, "process") => - worker - .command() - .map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes))(executionContext)) - .orElse { - logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)") - None - } - case (j: Some[_], _) => j - case (None, runner) => - logger.warn(s"Unknown job runner: $runner") - None + j <- + runners + .foldLeft[Option[Future[Unit]]](None) { + case (None, "docker") => + worker + .dockerImage() + .map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes), executionContext)) + .orElse { + logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") + None + } + case (None, "process") => + worker + .command() + .map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes), executionContext)) + .orElse { + logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)") + None + } + case (j: Some[_], _) => j + case (None, runner) => + logger.warn(s"Unknown job runner: $runner") + None - } - .getOrElse(Future.failed(BadRequestError("Worker cannot be run"))) + } + .getOrElse(Future.failed(BadRequestError("Worker cannot be run"))) } yield j finishedJob .transformWith { r => @@ -253,8 +252,8 @@ class JobRunnerSrv @Inject() ( updateSrv(job, fields, ModifyConfig(retryOnConflict = 0)) } - private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)( - implicit authContext: AuthContext + private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)(implicit + authContext: AuthContext ): Future[Job] = { val fields = Fields .empty diff --git a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala index ec981a358..0030805d7 100644 --- a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala @@ -31,7 +31,10 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { } }.getOrElse(None) - def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = { + def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)( + implicit + ec: ExecutionContext + ): Future[Unit] = { val baseDirectory = Paths.get(command).getParent.getParent val output = StringBuilder.newBuilder logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}") @@ -46,7 +49,7 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { .apply { process.exitValue() () - } + }(jobExecutor) .map { _ => val outputFile = jobDirectory.resolve("output").resolve("output.json") if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {