Skip to content

Commit

Permalink
#410 Handle job in one thread
Browse files · Browse the repository at this point in the history
  • Loading branch information
To-om committed May 30, 2022
1 parent 01c6b2a commit b6925bd
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 103 deletions.
40 changes: 21 additions & 19 deletions app/org/thp/cortex/services/DockerJobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class DockerJobRunnerSrv(
false
}.get

def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit
def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit
ec: ExecutionContext
): Future[Unit] = {
): Try[Unit] = {
import scala.collection.JavaConverters._
if (autoUpdate) client.pull(dockerImage)
// ContainerConfig.builder().addVolume()
Expand Down Expand Up @@ -114,28 +114,30 @@ class DockerJobRunnerSrv(
Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString)
)

val execution = Future {
val timeoutSched = timeout.map(to =>
system.scheduler.scheduleOnce(to) {
logger.info("Timeout reached, stopping the container")
client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
}
)
val execution = Try {
client.startContainer(containerCreation.id())
client.waitContainer(containerCreation.id())
()
}(jobExecutor).andThen {
case r =>
val outputFile = jobDirectory.resolve("output").resolve("output.json")
if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {
logger.warn(s"The worker didn't generate output file, use output stream.")
val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully())
.fold(e => s"Container logs can't be read (${e.getMessage})", identity)
val message = r.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output)
}
timeoutSched.foreach(_.cancel())
val outputFile = jobDirectory.resolve("output").resolve("output.json")
if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {
logger.warn(s"The worker didn't generate output file.")
val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully())
.fold(e => s"Container logs can't be read (${e.getMessage})", identity)
val message = execution.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output)

val report = Json.obj("success" -> false, "errorMessage" -> message)
Files.write(jobDirectory.resolve("output").resolve("output.json"), report.toString.getBytes(StandardCharsets.UTF_8))
}
val report = Json.obj("success" -> false, "errorMessage" -> message)
Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8))
}
timeout
.fold(execution)(t => execution.withTimeout(t, client.stopContainer(containerCreation.id(), 3)))
.andThen {
case _ => client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
}
client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
execution
}

}
120 changes: 64 additions & 56 deletions app/org/thp/cortex/services/JobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.util.Date
import scala.concurrent.duration.DurationLong
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import play.api.libs.json._
import play.api.{Configuration, Logger}
import akka.actor.ActorSystem
Expand Down Expand Up @@ -87,7 +87,7 @@ class JobRunnerSrv @Inject() (
case t: Throwable => logger.warn(s"Fail to remove temporary files ($directory) : $t")
}

private def prepareJobFolder(worker: Worker, job: Job): Future[Path] = {
private def prepareJobFolder(worker: Worker, job: Job): Try[Path] = {
val jobFolder = Files.createTempDirectory(jobDirectory, s"cortex-job-${job.id}-")
logger.debug(s"Job folder is $jobFolder")
val inputJobFolder = Files.createDirectories(jobFolder.resolve("input"))
Expand All @@ -97,12 +97,17 @@ class JobRunnerSrv @Inject() (
.attachment()
.map { attachment =>
val attachmentFile = Files.createTempFile(inputJobFolder, "attachment", "")
attachmentSrv
.source(attachment.id)
.runWith(FileIO.toPath(attachmentFile))
Try(
Await.result(
attachmentSrv
.source(attachment.id)
.runWith(FileIO.toPath(attachmentFile)),
10.minutes
)
)
.map(_ => Some(attachmentFile))
}
.getOrElse(Future.successful(None))
.getOrElse(Success(None))
.map {
case Some(file) =>
Json.obj("file" -> file.getFileName.toString, "filename" -> job.attachment().get.name, "contentType" -> job.attachment().get.contentType)
Expand Down Expand Up @@ -145,7 +150,7 @@ class JobRunnerSrv @Inject() (
case error =>
if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder)
delete(jobFolder)
Future.failed(error)
Failure(error)
}
}

Expand Down Expand Up @@ -200,64 +205,67 @@ class JobRunnerSrv @Inject() (
endJob(job, JobStatus.Failure, Some(s"no output"))
}

def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] =
prepareJobFolder(worker, job).flatMap { jobFolder =>
val executionContext = worker.tpe() match {
case WorkerType.analyzer => analyzerExecutionContext
case WorkerType.responder => responderExecutionContext
}
val finishedJob = for {
_ <- startJob(job)
j <-
runners
.foldLeft[Option[Future[Unit]]](None) {
case (None, "docker") =>
worker
.dockerImage()
.map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes), executionContext))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)")
None
}
case (None, "process") =>
worker
.command()
.map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes), executionContext))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)")
None
}
case (j: Some[_], _) => j
case (None, runner) =>
logger.warn(s"Unknown job runner: $runner")
None
def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = {
val executionContext = worker.tpe() match {
case WorkerType.analyzer => analyzerExecutionContext
case WorkerType.responder => responderExecutionContext
}
var maybeJobFolder: Option[Path] = None

}
.getOrElse(Future.failed(BadRequestError("Worker cannot be run")))
} yield j
finishedJob
.transformWith {
case _: Success[_] =>
extractReport(jobFolder, job)
case Failure(error) =>
endJob(job, JobStatus.Failure, Option(error.getMessage), Some(readFile(jobFolder.resolve("input").resolve("input.json"))))
Future {
syncStartJob(job).get
val jobFolder = prepareJobFolder(worker, job).get
maybeJobFolder = Some(jobFolder)
runners
.foldLeft[Option[Try[Unit]]](None) {
case (None, "docker") =>
worker
.dockerImage()
.map(dockerImage => dockerJobRunnerSrv.run(jobFolder, dockerImage, worker.jobTimeout().map(_.minutes)))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)")
None
}
case (None, "process") =>
worker
.command()
.map(command => processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes)))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with process (doesn't have command)")
None
}
case (j: Some[_], _) => j
case (None, runner) =>
logger.warn(s"Unknown job runner: $runner")
None

}
.andThen {
case _ =>
if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder)
delete(jobFolder)
}
}
.getOrElse(throw BadRequestError("Worker cannot be run"))
}(executionContext)
.transformWith {
case _: Success[_] =>
extractReport(maybeJobFolder.get /* can't be none */, job)
case Failure(error) =>
endJob(job, JobStatus.Failure, Option(error.getMessage), maybeJobFolder.map(jf => readFile(jf.resolve("input").resolve("input.json"))))

}
.andThen {
case _ =>
if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder)
maybeJobFolder.foreach(delete)
}
}

private def readFile(input: Path): String = new String(Files.readAllBytes(input), StandardCharsets.UTF_8)

private def startJob(job: Job)(implicit authContext: AuthContext): Future[Job] = {
private def syncStartJob(job: Job)(implicit authContext: AuthContext): Try[Job] = {
val fields = Fields
.empty
.set("status", JobStatus.InProgress.toString)
.set("startDate", Json.toJson(new Date))
updateSrv(job, fields, ModifyConfig(retryOnConflict = 0))
Try(
Await.result(updateSrv(job, fields, ModifyConfig(retryOnConflict = 0, seqNoAndPrimaryTerm = Some((job.seqNo, job.primaryTerm)))), 1.minute)
)
}

private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)(implicit
Expand Down
49 changes: 21 additions & 28 deletions app/org/thp/cortex/services/ProcessJobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) {
}
}.getOrElse(None)

def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit
def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])(implicit
ec: ExecutionContext
): Future[Unit] = {
): Try[Unit] = {
val baseDirectory = Paths.get(command).getParent.getParent
val output = mutable.StringBuilder.newBuilder
logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}")
Expand All @@ -45,33 +45,26 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) {
logger.info(s" Job ${job.id}: $s")
output ++= s
})
val execution = Future
.apply {
process.exitValue()
()
}(jobExecutor)
.map { _ =>
val outputFile = jobDirectory.resolve("output").resolve("output.json")
if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {
val report = Json.obj("success" -> false, "errorMessage" -> output.toString)
Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8))
}
()
}
.recoverWith {
case error =>
logger.error(s"Execution of command $command failed", error)
Future.apply {
val report = Json.obj("success" -> false, "errorMessage" -> s"${error.getMessage}\n$output")
Files.write(jobDirectory.resolve("output").resolve("output.json"), report.toString.getBytes(StandardCharsets.UTF_8))
()
}
val timeoutSched = timeout.map(to =>
system.scheduler.scheduleOnce(to) {
logger.info("Timeout reached, killing process")
process.destroy()
}
timeout.fold(execution)(t => execution.withTimeout(t, killProcess(process)))
}
)

def killProcess(process: Process): Unit = {
logger.info("Timeout reached, killing process")
process.destroy()
val execution = Try {
process.exitValue()
()
}
timeoutSched.foreach(_.cancel())
val outputFile = jobDirectory.resolve("output").resolve("output.json")
if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {
logger.warn(s"The worker didn't generate output file, use output stream.")
val message = execution.fold(e => s"Process execution error: ${e.getMessage}\n$output.result()", _ => output.result())
val report = Json.obj("success" -> false, "errorMessage" -> message)
Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8))
}
execution
}

}

0 comments on commit b6925bd

Please sign in to comment.