Skip to content

Commit

Permalink
#170 Add job timeout parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Mar 12, 2019
1 parent 66ca457 commit ed207e6
Show file tree
Hide file tree
Showing 16 changed files with 103 additions and 60 deletions.
11 changes: 8 additions & 3 deletions app/org/thp/cortex/models/BaseConfig.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.thp.cortex.models

import scala.concurrent.duration.Duration

import play.api.Configuration
import play.api.libs.json._

import org.elastic4play.utils.Collection.distinctBy
Expand All @@ -16,16 +19,18 @@ object BaseConfig {
"configurationItems" baseConfig.items,
"config" baseConfig.config.fold(JsObject.empty)(_.jsonConfig))
}
def global(tpe: WorkerType.Type): BaseConfig = {
def global(tpe: WorkerType.Type, configuration: Configuration): BaseConfig = {
val typedItems = tpe match {
case WorkerType.responder Nil
case WorkerType.analyzer Seq(
ConfigurationDefinitionItem("auto_extract_artifacts", "extract artifacts from full report automatically", WorkerConfigItemType.boolean, multi = false, required = false, Some(JsFalse)))
ConfigurationDefinitionItem("auto_extract_artifacts", "extract artifacts from full report automatically", WorkerConfigItemType.boolean, multi = false, required = false, Some(JsFalse)),
ConfigurationDefinitionItem("jobCache", "maximum time, in minutes, previous result is used if similar job is requested", WorkerConfigItemType.number, multi = false, required = false, configuration.getOptional[Duration]("cache.job").map(d JsNumber(d.toMinutes))))
}
BaseConfig("global", Nil, typedItems ++ Seq(
ConfigurationDefinitionItem("proxy_http", "url of http proxy", WorkerConfigItemType.string, multi = false, required = false, None),
ConfigurationDefinitionItem("proxy_https", "url of https proxy", WorkerConfigItemType.string, multi = false, required = false, None),
ConfigurationDefinitionItem("cacerts", "Certificate authotities", WorkerConfigItemType.text, multi = false, required = false, None)),
ConfigurationDefinitionItem("cacerts", "certificate authorities", WorkerConfigItemType.text, multi = false, required = false, None),
ConfigurationDefinitionItem("jobTimeout", "maximum allowed job execution time (in minutes)", WorkerConfigItemType.number, multi = false, required = false, configuration.getOptional[Duration]("job.timeout").map(d JsNumber(d.toMinutes)))),
None)
}
val tlp = BaseConfig("tlp", Nil, Seq(
Expand Down
3 changes: 2 additions & 1 deletion app/org/thp/cortex/models/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ trait WorkerAttributes { _: AttributeDef ⇒
val dockerImage = optionalAttribute("dockerImage", F.textFmt, "Worker docker image")
val dataTypeList = multiAttribute("dataTypeList", F.stringFmt, "List of data type this worker can manage")
val configuration = attribute("configuration", F.textFmt, "Configuration of the worker", O.sensitive)
val baseConfig = attribute("baseConfig", F.stringFmt, description="Base configuration key")
val baseConfig = attribute("baseConfig", F.stringFmt, "Base configuration key")
val rate = optionalAttribute("rate", F.numberFmt, "Number ")
val rateUnit = optionalAttribute("rateUnit", F.enumFmt(RateUnit), "")
val jobCache = optionalAttribute("jobCache", F.numberFmt, "")
val jobTimeout = optionalAttribute("jobTimeout", F.numberFmt, "")
val tpe = attribute("type", F.enumFmt(WorkerType), "", O.readonly)
}

Expand Down
3 changes: 3 additions & 0 deletions app/org/thp/cortex/services/AnalyzerConfigSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package org.thp.cortex.services

import scala.concurrent.{ ExecutionContext, Future }

import play.api.Configuration

import akka.stream.Materializer
import javax.inject.{ Inject, Singleton }
import org.thp.cortex.models.{ BaseConfig, WorkerConfigModel, WorkerType }
Expand All @@ -10,6 +12,7 @@ import org.elastic4play.services.{ CreateSrv, FindSrv, UpdateSrv }

@Singleton
class AnalyzerConfigSrv @Inject() (
val configuration: Configuration,
val workerConfigModel: WorkerConfigModel,
val userSrv: UserSrv,
val organizationSrv: OrganizationSrv,
Expand Down
28 changes: 18 additions & 10 deletions app/org/thp/cortex/services/DockerJobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package org.thp.cortex.services

import java.nio.file._

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }

import play.api.{ Configuration, Logger }

import akka.actor.ActorSystem
import com.spotify.docker.client.messages.HostConfig.Bind
import com.spotify.docker.client.messages.{ ContainerConfig, HostConfig }
import com.spotify.docker.client.{ DefaultDockerClient, DockerClient }
import javax.inject.{ Inject, Singleton }
import org.thp.cortex.models._

@Singleton
class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean) {
import org.elastic4play.utils.RichFuture

def this() = this(DefaultDockerClient.fromEnv().build(), false)
@Singleton
class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean, implicit val system: ActorSystem) {

@Inject()
def this(config: Configuration) = this(
def this(config: Configuration, system: ActorSystem) = this(
new DefaultDockerClient.Builder()
.apiVersion(config.getOptional[String]("docker.version").orNull)
.connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100))
Expand All @@ -29,11 +31,12 @@ class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean) {
.uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock"))
.useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false))
.build(),
config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true))
config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true),
system: ActorSystem)

lazy val logger = Logger(getClass)

def run(jobDirectory: Path, dockerImage: String, job: Job)(implicit ec: ExecutionContext): Future[Unit] = {
def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = {
import scala.collection.JavaConverters._
// client.pull(dockerImage)
// ContainerConfig.builder().addVolume()
Expand All @@ -55,16 +58,21 @@ class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean) {
val containerCreation = client.createContainer(containerConfig)
// Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn)
logger.info(s"Execute container ${containerCreation.id()}\n" +
s" image : $dockerImage\n" +
s" volume: ${jobDirectory.toAbsolutePath}:/job" +
Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString))
s" timeout: ${timeout.fold("none")(_.toString)}\n" +
s" image : $dockerImage\n" +
s" volume : ${jobDirectory.toAbsolutePath}:/job" +
Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString))

client.startContainer(containerCreation.id())

Future {
val execution = Future {
client.waitContainer(containerCreation.id())
()
}
timeout.fold(execution)(t execution.withTimeout(t, client.stopContainer(containerCreation.id(), 3)))
.andThen {
case _ client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
}
}

}
5 changes: 3 additions & 2 deletions app/org/thp/cortex/services/JobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.util.Date

import scala.concurrent.duration.DurationLong
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Failure

Expand Down Expand Up @@ -195,15 +196,15 @@ class JobRunnerSrv @Inject() (
.foldLeft[Option[Future[Unit]]](None) {
case (None, "docker")
worker.dockerImage()
.map(dockerImage dockerJobRunnerSrv.run(jobFolder, dockerImage, job)(executionContext))
.map(dockerImage dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes))(executionContext))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)")
None
}
case (None, "process")

worker.command()
.map(command processJobRunnerSrv.run(jobFolder, command, job)(executionContext))
.map(command processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes))(executionContext))
.orElse {
logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)")
None
Expand Down
2 changes: 1 addition & 1 deletion app/org/thp/cortex/services/JobSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ class JobSrv(
}
else {
import org.elastic4play.services.QueryDSL._
logger.info(s"Looking for similar job (worker=${worker.id}, dataType=$dataType, data=$dataAttachment, tlp=$tlp, parameters=$parameters")
logger.info(s"Looking for similar job in the last ${cache.toMinutes} minutes (worker=${worker.id}, dataType=$dataType, data=$dataAttachment, tlp=$tlp, parameters=$parameters)")
val now = new Date().getTime
find(and(
"workerId" ~= worker.id,
Expand Down
27 changes: 19 additions & 8 deletions app/org/thp/cortex/services/ProcessJobRunnerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,37 @@ package org.thp.cortex.services

import java.nio.file.{ Path, Paths }

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }
import scala.sys.process.{ Process, ProcessLogger }

import play.api.Logger

import javax.inject.Singleton
import akka.actor.ActorSystem
import javax.inject.{ Inject, Singleton }
import org.thp.cortex.models._

import org.elastic4play.utils.RichFuture

@Singleton
class ProcessJobRunnerSrv {
class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) {

lazy val logger = Logger(getClass)

def run(jobDirectory: Path, command: String, job: Job)(implicit ec: ExecutionContext): Future[Unit] = {
Future {
val baseDirectory = Paths.get(command).getParent.getParent
logger.info(s"Execute $command in $baseDirectory")
Process(Seq(command, jobDirectory.toString), baseDirectory.toFile)
.run(ProcessLogger(s logger.info(s" Job ${job.id}: $s"))).exitValue()
def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = {
val baseDirectory = Paths.get(command).getParent.getParent
logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}")
val process = Process(Seq(command, jobDirectory.toString), baseDirectory.toFile)
.run(ProcessLogger(s logger.info(s" Job ${job.id}: $s")))
val execution = Future {
process.exitValue()
()
}
timeout.fold(execution)(t execution.withTimeout(t, killProcess(process)))
}

def killProcess(process: Process): Unit = {
logger.info("Timeout reached, killing process")
process.destroy()
}
}
3 changes: 3 additions & 0 deletions app/org/thp/cortex/services/ResponderConfigSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package org.thp.cortex.services

import scala.concurrent.{ ExecutionContext, Future }

import play.api.Configuration

import akka.stream.Materializer
import javax.inject.{ Inject, Singleton }
import org.thp.cortex.models.{ BaseConfig, WorkerConfigModel, WorkerType }
Expand All @@ -10,6 +12,7 @@ import org.elastic4play.services.{ CreateSrv, FindSrv, UpdateSrv }

@Singleton
class ResponderConfigSrv @Inject() (
val configuration: Configuration,
val workerConfigModel: WorkerConfigModel,
val userSrv: UserSrv,
val organizationSrv: OrganizationSrv,
Expand Down
4 changes: 3 additions & 1 deletion app/org/thp/cortex/services/WorkerConfigSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.thp.cortex.services

import scala.concurrent.{ ExecutionContext, Future }

import play.api.Configuration
import play.api.libs.json._

import akka.NotUsed
Expand All @@ -16,6 +17,7 @@ import org.elastic4play.database.ModifyConfig
import org.elastic4play.services._

trait WorkerConfigSrv {
val configuration: Configuration
val userSrv: UserSrv
val createSrv: CreateSrv
val updateSrv: UpdateSrv
Expand All @@ -41,7 +43,7 @@ trait WorkerConfigSrv {
.mapMaterializedValue(_ NotUsed)
.runWith(Sink.seq)
.map { baseConfigs
(BaseConfig.global(workerType) +: baseConfigs)
(BaseConfig.global(workerType, configuration) +: baseConfigs)
.map(c c.name c)
.toMap
}
Expand Down
2 changes: 1 addition & 1 deletion app/org/thp/cortex/services/WorkerSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class WorkerSrv @Inject() (

def create(organization: Organization, workerDefinition: WorkerDefinition, workerFields: Fields)(implicit authContext: AuthContext): Future[Worker] = {
val rawConfig = workerFields.getValue("configuration").fold(JsObject.empty)(_.as[JsObject])
val configItems = workerDefinition.configurationItems ++ BaseConfig.global(workerDefinition.tpe).items ++ BaseConfig.tlp.items ++ BaseConfig.pap.items
val configItems = workerDefinition.configurationItems ++ BaseConfig.global(workerDefinition.tpe, config).items ++ BaseConfig.tlp.items ++ BaseConfig.pap.items
val configOrErrors = configItems
.validatedBy(_.read(rawConfig))
.map(JsObject.apply)
Expand Down
1 change: 1 addition & 0 deletions conf/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ cache {
organization = 5 minutes
}

job.timeout = 30 minutes

# HTTP filters
play.filters {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<section>
<h4>Base details</h4>

<div class="form-group">
<label class="col-sm-3 control-label">Name</label>
<div class="col-sm-9">
Expand All @@ -18,7 +18,7 @@ <h4>
<i class="fa fa-copy"></i> Apply defaults</a>
</small>
</div>
</h4>
</h4>
<configuration-form items="$ctrl.definition.configurationItems" configuration="$ctrl.analyzer.configuration"></configuration-form>
</section>

Expand All @@ -30,7 +30,7 @@ <h4>
<a href ng-click="$ctrl.applyGlobalConfig()">
<i class="fa fa-copy"></i> Apply defaults</a>
</small>
</div>
</div>
</h4>
<div class="form-group">
<label class="col-sm-3 control-label">Enable TLP check</label>
Expand All @@ -43,9 +43,9 @@ <h4>
<label>Max TLP</label>
<select class="form-control" style="width: 200px;" ng-model="$ctrl.analyzer.configuration.max_tlp"
ng-options="tlp.value as tlp.key for tlp in $ctrl.Tlps" ng-required="$ctrl.analyzer.configuration.check_tlp"></select>
</div>
</div>
</div>
</div>
</div>
<div class="form-group">
<label class="col-sm-3 control-label">Enable PAP check</label>
<div class="col-sm-9 form-inline">
Expand All @@ -64,7 +64,7 @@ <h4>
<label class="col-sm-3 control-label">HTTP Proxy</label>
<div class="col-sm-9">
<input class="form-control" type="text" ng-model="$ctrl.analyzer.configuration.proxy_http">
</div>
</div>
</div>
<div class="form-group">
<label class="col-sm-3 control-label">HTTPS Proxy</label>
Expand All @@ -78,6 +78,18 @@ <h4>
<textarea class="form-control vertical" rows="5" ng-model="$ctrl.analyzer.configuration.cacerts"></textarea>
</div>
</div>
<div class="form-group">
<label class="col-sm-3 control-label">Job cache</label>
<div class="col-sm-9">
<input class="form-control" type="text" ng-model="$ctrl.analyzer.jobCache" ui-number-mask="0">
</div>
</div>
<div class="form-group">
<label class="col-sm-3 control-label">Job timeout</label>
<div class="col-sm-9">
<input class="form-control" type="text" ng-model="$ctrl.analyzer.jobTimeout" ui-number-mask="0">
</div>
</div>
<div class="form-group">
<label class="col-sm-3 control-label">Extract observables</label>
<div class="col-sm-9">
Expand All @@ -89,35 +101,19 @@ <h4>
<div class="col-sm-offset-3 col-sm-9">
<p class="help-block">Set to True to enable automatic observables extraction from analysis reports.</p>
</div>
</div>
</div>
<div class="form-group">
<label class="col-sm-3 control-label">Rate Limiting</label>
<div class="col-sm-4">
<input class="form-control" type="text" ng-model="$ctrl.analyzer.rate" ui-number-mask="0" ui-hide-group-sep>
</div>
<div class="col-sm-5">
<div class="col-sm-5">
<select class="form-control" ng-model="$ctrl.analyzer.rateUnit" ng-options="unit for unit in $ctrl.rateUnits">
<option value="">-- choose unit --</option>
</select>
</div>
<div class="col-sm-offset-3 col-sm-9">
<p class="help-block">Define the maximum number of requests and the associated unit if applicable.</p>
</div>
</div>
<div class="form-group">
<label class="col-sm-3 control-label">Cache</label>
<div class="col-sm-9 form-inline">
<div class="btn-group">
<label uib-btn-radio="true" class="btn btn-default" ng-model="$ctrl.useGlobalCache" ng-click="$ctrl.analyzer.jobCache = null">Use global</label>
<label uib-btn-radio="false" class="btn btn-default" ng-model="$ctrl.useGlobalCache">Custom</label>
</div>
<div class="form-group" style="margin-left: 30px;" ng-show="$ctrl.useGlobalCache === false">
<label>Number of minutes</label>
<input class="form-control" style="width: 200px;" type="text" ng-model="$ctrl.analyzer.jobCache" ui-number-mask="0" ui-hide-group-sep>
</div>
</div>
<div class="col-sm-offset-3 col-sm-9">
<p class="help-block">Define the number minutes for analysis report caching, or use the globally defined value.</p>
<p class="help-block">Define the maximum number of requests and the associated unit if applicable.</p>
</div>
</div>
</section>
</section>
Loading

0 comments on commit ed207e6

Please sign in to comment.