Skip to content

Commit

Permalink
#410 Check the job timeout from the main threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed May 24, 2022
1 parent e6fada5 commit 7d02023
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 34 deletions.
6 changes: 4 additions & 2 deletions app/org/thp/cortex/services/DockerJobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ class DockerJobRunnerSrv(
false
}.get

def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = {
def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit
ec: ExecutionContext
): Future[Unit] = {
import scala.collection.JavaConverters._
if (autoUpdate) client.pull(dockerImage)
// ContainerConfig.builder().addVolume()
Expand Down Expand Up @@ -113,7 +115,7 @@ class DockerJobRunnerSrv(
client.startContainer(containerCreation.id())
client.waitContainer(containerCreation.id())
()
}.andThen {
}(jobExecutor).andThen {
case r =>
val outputFile = jobDirectory.resolve("output").resolve("output.json")
if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {
Expand Down
59 changes: 29 additions & 30 deletions app/org/thp/cortex/services/JobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,10 @@ class JobRunnerSrv @Inject() (
case Failure(e) => endJob(job, JobStatus.Failure, Some(s"Report creation failure: $e"))
case _ => endJob(job, JobStatus.Success)
}
} else {
} else
endJob(job, JobStatus.Failure, (report \ "errorMessage").asOpt[String], (report \ "input").asOpt[JsValue].map(_.toString))
}
} else {
} else
endJob(job, JobStatus.Failure, Some(s"no output"))
}
}

def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] =
Expand All @@ -207,31 +205,32 @@ class JobRunnerSrv @Inject() (
}
val finishedJob = for {
_ <- startJob(job)
j <- runners
.foldLeft[Option[Future[Unit]]](None) {
case (None, "docker") =>
worker
.dockerImage()
.map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes))(executionContext))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)")
None
}
case (None, "process") =>
worker
.command()
.map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes))(executionContext))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)")
None
}
case (j: Some[_], _) => j
case (None, runner) =>
logger.warn(s"Unknown job runner: $runner")
None
j <-
runners
.foldLeft[Option[Future[Unit]]](None) {
case (None, "docker") =>
worker
.dockerImage()
.map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes), executionContext))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)")
None
}
case (None, "process") =>
worker
.command()
.map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes), executionContext))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)")
None
}
case (j: Some[_], _) => j
case (None, runner) =>
logger.warn(s"Unknown job runner: $runner")
None

}
.getOrElse(Future.failed(BadRequestError("Worker cannot be run")))
}
.getOrElse(Future.failed(BadRequestError("Worker cannot be run")))
} yield j
finishedJob
.transformWith { r =>
Expand All @@ -253,8 +252,8 @@ class JobRunnerSrv @Inject() (
updateSrv(job, fields, ModifyConfig(retryOnConflict = 0))
}

private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)(
implicit authContext: AuthContext
private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)(implicit
authContext: AuthContext
): Future[Job] = {
val fields = Fields
.empty
Expand Down
7 changes: 5 additions & 2 deletions app/org/thp/cortex/services/ProcessJobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) {
}
}.getOrElse(None)

def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = {
def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(
implicit
ec: ExecutionContext
): Future[Unit] = {
val baseDirectory = Paths.get(command).getParent.getParent
val output = StringBuilder.newBuilder
logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}")
Expand All @@ -46,7 +49,7 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) {
.apply {
process.exitValue()
()
}
}(jobExecutor)
.map { _ =>
val outputFile = jobDirectory.resolve("output").resolve("output.json")
if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {
Expand Down

0 comments on commit 7d02023

Please sign in to comment.