#175 #176 Add support of Docker job runner
To-om committed Mar 11, 2019
1 parent 2556e0b commit c0353f1
Showing 9 changed files with 456 additions and 202 deletions.
app/org/thp/cortex/services/DockerJobRunnerSrv.scala (70 additions, 0 deletions)
@@ -0,0 +1,70 @@
package org.thp.cortex.services

import java.nio.file._

import scala.concurrent.{ ExecutionContext, Future }

import play.api.{ Configuration, Logger }

import com.spotify.docker.client.messages.HostConfig.Bind
import com.spotify.docker.client.messages.{ ContainerConfig, HostConfig }
import com.spotify.docker.client.{ DefaultDockerClient, DockerClient }
import javax.inject.{ Inject, Singleton }
import org.thp.cortex.models._

@Singleton
class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean) {

def this() = this(DefaultDockerClient.fromEnv().build(), false)

@Inject()
def this(config: Configuration) = this(
new DefaultDockerClient.Builder()
.apiVersion(config.getOptional[String]("docker.version").orNull)
.connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100))
.connectTimeoutMillis(config.getOptional[Long]("docker.connectTimeoutMillis").getOrElse(5000))
//.dockerCertificates()
.readTimeoutMillis(config.getOptional[Long]("docker.readTimeoutMillis").getOrElse(30000))
//.registryAuthSupplier()
.uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock"))
.useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false))
.build(),
config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true))

lazy val logger = Logger(getClass)

def run(jobDirectory: Path, dockerImage: String, job: Job)(implicit ec: ExecutionContext): Future[Unit] = {
import scala.collection.JavaConverters._
// client.pull(dockerImage)
// ContainerConfig.builder().addVolume()
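    // Bind-mount the job folder read-write at /job; the worker reads /job/input and writes /job/output.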
val hostConfig = HostConfig.builder()
.appendBinds(Bind.from(jobDirectory.toAbsolutePath.toString)
.to("/job")
.readOnly(false)
.build())
.build()
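    // If the job supplies a CA bundle, expose it to the worker through the REQUESTS_CA_BUNDLE
    // environment variable (honored by Python's requests library).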
val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
val containerConfigBuilder = ContainerConfig
.builder()
.hostConfig(hostConfig)
.image(dockerImage)
.cmd("/job")

    val containerConfig =
      if (Files.exists(cacertsFile)) containerConfigBuilder.env("REQUESTS_CA_BUNDLE=/job/input/cacerts").build()
      else containerConfigBuilder.build()
val containerCreation = client.createContainer(containerConfig)
// Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn)
logger.info(s"Execute container ${containerCreation.id()}\n" +
s" image : $dockerImage\n" +
s" volume: ${jobDirectory.toAbsolutePath}:/job" +
Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString))

client.startContainer(containerCreation.id())

Future {
client.waitContainer(containerCreation.id())
()
}
}

}
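For reference, all of the settings above come from the standard Play configuration. A minimal application.conf sketch, assuming the defaults hard-coded in the injected constructor (every key is optional; the key names are taken from the code above and the values shown are the built-in fallbacks):

docker {
  uri = "unix:///var/run/docker.sock"   # Docker daemon endpoint
  connectionPoolSize = 100
  connectTimeoutMillis = 5000
  readTimeoutMillis = 30000
  useProxy = false
  autoUpdate = true                     # flag is wired in but not yet used by run()
  # version = "..."                     # optionally pin the Docker API version
}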
app/org/thp/cortex/services/JobRunnerSrv.scala (246 additions, 0 deletions)
@@ -0,0 +1,246 @@
package org.thp.cortex.services

import java.io.IOException
import java.nio.charset.StandardCharsets
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.util.Date

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Failure

import play.api.libs.json._
import play.api.{ Configuration, Logger }

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.FileIO
import javax.inject.Inject
import org.thp.cortex.models._

import org.elastic4play.BadRequestError
import org.elastic4play.controllers.{ Fields, FileInputValue }
import org.elastic4play.database.ModifyConfig
import org.elastic4play.services.{ AttachmentSrv, AuthContext, CreateSrv, UpdateSrv }

class JobRunnerSrv @Inject() (
config: Configuration,
reportModel: ReportModel,
artifactModel: ArtifactModel,
processJobRunnerSrv: ProcessJobRunnerSrv,
dockerJobRunnerSrv: DockerJobRunnerSrv,
workerSrv: WorkerSrv,
createSrv: CreateSrv,
updateSrv: UpdateSrv,
attachmentSrv: AttachmentSrv,
akkaSystem: ActorSystem,
implicit val ec: ExecutionContext,
implicit val mat: Materializer) {

val logger = Logger(getClass)
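  // Analyzer and responder jobs run on dedicated Akka dispatchers, looked up by name below.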
lazy val analyzerExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("analyzer")
lazy val responderExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("responder")

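  // Runner preference order: the first runner in this list that the worker supports is used (see run()).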
private val runners: Seq[String] = config
.getOptional[Seq[String]]("runners")
.getOrElse(Seq("docker", "process"))
.map(_.toLowerCase)

private object deleteVisitor extends SimpleFileVisitor[Path] {
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
Files.delete(file)
FileVisitResult.CONTINUE
}

override def postVisitDirectory(dir: Path, e: IOException): FileVisitResult = {
Files.delete(dir)
FileVisitResult.CONTINUE
}
}

private def delete(directory: Path): Unit = try {
if (Files.exists(directory))
Files.walkFileTree(directory, deleteVisitor)
()
}
catch {
    case t: Throwable ⇒ logger.warn(s"Failed to remove temporary files ($directory): $t")
}

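  // Creates a per-job working directory: input/ receives input.json and the attachment (if any);
  // output/ is where the worker writes its report.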
private def prepareJobFolder(worker: Worker, job: Job): Future[Path] = {
val jobFolder = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), s"cortex-job-${job.id}-")
val inputJobFolder = Files.createDirectories(jobFolder.resolve("input"))
Files.createDirectories(jobFolder.resolve("output"))

    job.attachment()
      .map { attachment ⇒
        val attachmentFile = Files.createTempFile(inputJobFolder, "attachment", "")
        attachmentSrv.source(attachment.id).runWith(FileIO.toPath(attachmentFile))
          .flatMap {
            case ioresult if ioresult.status.isSuccess ⇒ Future.successful(Some(attachmentFile))
            case ioresult                              ⇒ Future.failed(ioresult.getError)
          }
      }
      .getOrElse(Future.successful(None))
      .map {
        case Some(file) ⇒
          Json.obj(
            "file" → file.toString, // FIXME set only the temporary file name
            "filename" → job.attachment().get.name,
            "contentType" → job.attachment().get.contentType)
        case None if job.data().nonEmpty && job.tpe() == WorkerType.responder ⇒
          Json.obj("data" → Json.parse(job.data().get))
        case None if job.data().nonEmpty && job.tpe() == WorkerType.analyzer ⇒
          Json.obj("data" → job.data().get)
      }
      .map { artifact ⇒
        val proxy_http = (worker.config \ "proxy_http").asOpt[String].fold(JsObject.empty) { proxy ⇒ Json.obj("proxy" → Json.obj("http" → proxy)) }
        val proxy_https = (worker.config \ "proxy_https").asOpt[String].fold(JsObject.empty) { proxy ⇒ Json.obj("proxy" → Json.obj("https" → proxy)) }
        val config = worker.config.deepMerge(proxy_http).deepMerge(proxy_https)
        (worker.config \ "cacerts").asOpt[String].foreach { cacerts ⇒
          val cacertsFile = jobFolder.resolve("input").resolve("cacerts")
          Files.write(cacertsFile, cacerts.getBytes)
        }
        artifact +
          ("dataType" → JsString(job.dataType())) +
          ("tlp" → JsNumber(job.tlp())) +
          ("pap" → JsNumber(job.pap())) +
          ("message" → JsString(job.message().getOrElse(""))) +
          ("parameters" → job.params) +
          ("config" → config)
      }
      .map { input ⇒
        Files.write(inputJobFolder.resolve("input.json"), input.toString.getBytes(StandardCharsets.UTF_8))
        jobFolder
      }
      .recoverWith {
        case error ⇒
          delete(jobFolder)
          Future.failed(error)
      }
}

private def extractReport(jobFolder: Path, job: Job)(implicit authContext: AuthContext) = {
val outputFile = jobFolder.resolve("output").resolve("output.json")
if (Files.exists(outputFile)) {
val is = Files.newInputStream(outputFile)
val report = Json.parse(is)
is.close()

val success = (report \ "success").asOpt[Boolean].getOrElse(false)
if (success) {
val fullReport = (report \ "full").as[JsObject].toString
val summaryReport = (report \ "summary").asOpt[JsObject].getOrElse(JsObject.empty).toString
val artifacts = (report \ "artifacts").asOpt[Seq[JsObject]].getOrElse(Nil)
val operations = (report \ "operations").asOpt[Seq[JsObject]].getOrElse(Nil)
val reportFields = Fields.empty
.set("full", fullReport)
.set("summary", summaryReport)
.set("operations", JsArray(operations).toString)
createSrv[ReportModel, Report, Job](reportModel, job, reportFields)
          .flatMap { report ⇒
            Future.sequence {
              for {
                artifact ← artifacts
                dataType ← (artifact \ "dataType").asOpt[String]
                fields ← dataType match {
                  case "file" ⇒
                    for {
                      name ← (artifact \ "filename").asOpt[String]
                      file ← (artifact \ "file").asOpt[String]
                      path = jobFolder.resolve("output").resolve(file)
                      if Files.exists(path) && path.getParent == jobFolder.resolve("output")
                      contentType = (artifact \ "contentType").asOpt[String].getOrElse("application/octet-stream")
                      fiv = FileInputValue(name, path, contentType)
                    } yield Fields(artifact - "filename" - "file" - "contentType").set("attachment", fiv)
                  case _ ⇒ Some(Fields(artifact))
                }
              } yield createSrv[ArtifactModel, Artifact, Report](artifactModel, report, fields)
}
}
.transformWith {
            case Failure(e) ⇒ endJob(job, JobStatus.Failure, Some(s"Report creation failure: $e"))
            case _          ⇒ endJob(job, JobStatus.Success)
}
}
else {
endJob(job, JobStatus.Failure,
(report \ "errorMessage").asOpt[String],
(report \ "input").asOpt[JsValue].map(_.toString))
}
}
else {
      endJob(job, JobStatus.Failure, Some("worker produced no output.json"))
}
}

// private def fixArtifact(artifact: Fields): Fields = {
// def rename(oldName: String, newName: String): Fields ⇒ Fields = fields ⇒
// fields.getValue(oldName).fold(fields)(v ⇒ fields.unset(oldName).set(newName, v))
//
// rename("value", "data").andThen(
// rename("type", "dataType"))(artifact)
// }

def run(worker: Worker, job: Job)(implicit authContext: AuthContext): Future[Job] = {
    prepareJobFolder(worker, job).flatMap { jobFolder ⇒
      val finishedJob = for {
        workerDefinition ← workerSrv.getDefinition(worker.workerDefinitionId())
        executionContext = workerDefinition.tpe match {
          case WorkerType.analyzer  ⇒ analyzerExecutionContext
          case WorkerType.responder ⇒ responderExecutionContext
        }
        _ ← startJob(job)
        // Try each configured runner in order and keep the first one the worker supports.
        j ← runners
          .foldLeft[Option[Future[Unit]]](None) {
            case (None, "docker") ⇒
              worker.dockerImage()
                .map(dockerImage ⇒ dockerJobRunnerSrv.run(jobFolder, dockerImage, job)(executionContext))
                .orElse {
                  logger.warn(s"Worker ${worker.id} can't be run with docker (it doesn't define a docker image)")
                  None
                }
            case (None, "process") ⇒
              worker.command()
                .map(command ⇒ processJobRunnerSrv.run(jobFolder, command, job)(executionContext))
                .orElse {
                  logger.warn(s"Worker ${worker.id} can't be run with process (it doesn't define a command)")
                  None
                }
            case (j: Some[_], _) ⇒ j
            case (None, runner) ⇒
              logger.warn(s"Unknown job runner: $runner")
              None
          }
          .getOrElse(Future.failed(BadRequestError("Worker cannot be run")))
      } yield j
      finishedJob
        .transformWith { r ⇒
          r.fold(
            error ⇒ endJob(job, JobStatus.Failure, Option(error.getMessage), Some(readFile(jobFolder.resolve("input").resolve("input.json")))),
            _ ⇒ extractReport(jobFolder, job))
        }
//.andThen { case _ ⇒ delete(jobFolder) }
}
}

private def readFile(input: Path): String = new String(Files.readAllBytes(input), StandardCharsets.UTF_8)

private def startJob(job: Job)(implicit authContext: AuthContext): Future[Job] = {
val fields = Fields.empty
.set("status", JobStatus.InProgress.toString)
.set("startDate", Json.toJson(new Date))
updateSrv(job, fields, ModifyConfig(retryOnConflict = 0))
}

private def endJob(job: Job, status: JobStatus.Type, errorMessage: Option[String] = None, input: Option[String] = None)(implicit authContext: AuthContext): Future[Job] = {
val fields = Fields.empty
.set("status", status.toString)
.set("endDate", Json.toJson(new Date))
.set("input", input.map(JsString.apply))
.set("message", errorMessage.map(JsString.apply))
updateSrv(job, fields, ModifyConfig.default)
}
}
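Taken together, prepareJobFolder and extractReport define the on-disk contract between Cortex and a worker: Cortex writes input/input.json, runs the worker against the job folder, then parses output/output.json. A minimal sketch of both documents for a hypothetical plain-data analyzer job (the field names are the ones read and written by the code above; the concrete values are made up for illustration):

import play.api.libs.json.Json

// What Cortex writes to <jobFolder>/input/input.json (see prepareJobFolder).
val sampleInput = Json.obj(
  "data"       → "8.8.8.8",
  "dataType"   → "ip",
  "tlp"        → 2,
  "pap"        → 2,
  "message"    → "",
  "parameters" → Json.obj(),
  "config"     → Json.obj())

// What the worker is expected to write to <jobFolder>/output/output.json
// (see extractReport): a success flag, full and summary reports, and optional
// extracted artifacts and operations.
val sampleOutput = Json.parse("""
  {
    "success": true,
    "full": { "verdict": "safe" },
    "summary": {},
    "artifacts": [ { "dataType": "domain", "data": "example.com" } ],
    "operations": []
  }""")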