diff --git a/app/org/thp/cortex/models/BaseConfig.scala b/app/org/thp/cortex/models/BaseConfig.scala index 9b752dafd..389ea2c25 100644 --- a/app/org/thp/cortex/models/BaseConfig.scala +++ b/app/org/thp/cortex/models/BaseConfig.scala @@ -1,5 +1,8 @@ package org.thp.cortex.models +import scala.concurrent.duration.Duration + +import play.api.Configuration import play.api.libs.json._ import org.elastic4play.utils.Collection.distinctBy @@ -16,16 +19,18 @@ object BaseConfig { "configurationItems" → baseConfig.items, "config" → baseConfig.config.fold(JsObject.empty)(_.jsonConfig)) } - def global(tpe: WorkerType.Type): BaseConfig = { + def global(tpe: WorkerType.Type, configuration: Configuration): BaseConfig = { val typedItems = tpe match { case WorkerType.responder ⇒ Nil case WorkerType.analyzer ⇒ Seq( - ConfigurationDefinitionItem("auto_extract_artifacts", "extract artifacts from full report automatically", WorkerConfigItemType.boolean, multi = false, required = false, Some(JsFalse))) + ConfigurationDefinitionItem("auto_extract_artifacts", "extract artifacts from full report automatically", WorkerConfigItemType.boolean, multi = false, required = false, Some(JsFalse)), + ConfigurationDefinitionItem("jobCache", "maximum time, in minutes, previous result is used if similar job is requested", WorkerConfigItemType.number, multi = false, required = false, configuration.getOptional[Duration]("cache.job").map(d ⇒ JsNumber(d.toMinutes)))) } BaseConfig("global", Nil, typedItems ++ Seq( ConfigurationDefinitionItem("proxy_http", "url of http proxy", WorkerConfigItemType.string, multi = false, required = false, None), ConfigurationDefinitionItem("proxy_https", "url of https proxy", WorkerConfigItemType.string, multi = false, required = false, None), - ConfigurationDefinitionItem("cacerts", "Certificate authotities", WorkerConfigItemType.text, multi = false, required = false, None)), + ConfigurationDefinitionItem("cacerts", "certificate authorities", WorkerConfigItemType.text, multi = false, required = false, None), + ConfigurationDefinitionItem("jobTimeout", "maximum allowed job execution time (in minutes)", WorkerConfigItemType.number, multi = false, required = false, configuration.getOptional[Duration]("job.timeout").map(d ⇒ JsNumber(d.toMinutes)))), None) } val tlp = BaseConfig("tlp", Nil, Seq( diff --git a/app/org/thp/cortex/models/Worker.scala b/app/org/thp/cortex/models/Worker.scala index 50d6a8903..9ebc32393 100644 --- a/app/org/thp/cortex/models/Worker.scala +++ b/app/org/thp/cortex/models/Worker.scala @@ -38,10 +38,11 @@ trait WorkerAttributes { _: AttributeDef ⇒ val dockerImage = optionalAttribute("dockerImage", F.textFmt, "Worker docker image") val dataTypeList = multiAttribute("dataTypeList", F.stringFmt, "List of data type this worker can manage") val configuration = attribute("configuration", F.textFmt, "Configuration of the worker", O.sensitive) - val baseConfig = attribute("baseConfig", F.stringFmt, description="Base configuration key") + val baseConfig = attribute("baseConfig", F.stringFmt, "Base configuration key") val rate = optionalAttribute("rate", F.numberFmt, "Number ") val rateUnit = optionalAttribute("rateUnit", F.enumFmt(RateUnit), "") val jobCache = optionalAttribute("jobCache", F.numberFmt, "") + val jobTimeout = optionalAttribute("jobTimeout", F.numberFmt, "") val tpe = attribute("type", F.enumFmt(WorkerType), "", O.readonly) } diff --git a/app/org/thp/cortex/services/AnalyzerConfigSrv.scala b/app/org/thp/cortex/services/AnalyzerConfigSrv.scala index fbc1d2fd4..ca81b8385 100644 --- a/app/org/thp/cortex/services/AnalyzerConfigSrv.scala +++ b/app/org/thp/cortex/services/AnalyzerConfigSrv.scala @@ -2,6 +2,8 @@ package org.thp.cortex.services import scala.concurrent.{ ExecutionContext, Future } +import play.api.Configuration + import akka.stream.Materializer import javax.inject.{ Inject, Singleton } import org.thp.cortex.models.{ BaseConfig, WorkerConfigModel, WorkerType } @@ -10,6 +12,7 @@ import org.elastic4play.services.{ CreateSrv, FindSrv, UpdateSrv } @Singleton class AnalyzerConfigSrv @Inject() ( + val configuration: Configuration, val workerConfigModel: WorkerConfigModel, val userSrv: UserSrv, val organizationSrv: OrganizationSrv, diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index e1193b885..a59bb1248 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -2,23 +2,25 @@ package org.thp.cortex.services import java.nio.file._ +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ ExecutionContext, Future } import play.api.{ Configuration, Logger } +import akka.actor.ActorSystem import com.spotify.docker.client.messages.HostConfig.Bind import com.spotify.docker.client.messages.{ ContainerConfig, HostConfig } import com.spotify.docker.client.{ DefaultDockerClient, DockerClient } import javax.inject.{ Inject, Singleton } import org.thp.cortex.models._ -@Singleton -class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean) { +import org.elastic4play.utils.RichFuture - def this() = this(DefaultDockerClient.fromEnv().build(), false) +@Singleton +class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean, implicit val system: ActorSystem) { @Inject() - def this(config: Configuration) = this( + def this(config: Configuration, system: ActorSystem) = this( new DefaultDockerClient.Builder() .apiVersion(config.getOptional[String]("docker.version").orNull) .connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100)) @@ -29,11 +31,12 @@ class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean) { .uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock")) .useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false)) .build(), - config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true)) + config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true), + system: ActorSystem) lazy val logger = Logger(getClass) - def run(jobDirectory: Path, dockerImage: String, job: Job)(implicit ec: ExecutionContext): Future[Unit] = { + def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = { import scala.collection.JavaConverters._ // client.pull(dockerImage) // ContainerConfig.builder().addVolume() @@ -55,16 +58,21 @@ class DockerJobRunnerSrv(client: DockerClient, autoUpdate: Boolean) { val containerCreation = client.createContainer(containerConfig) // Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn) logger.info(s"Execute container ${containerCreation.id()}\n" + - s" image : $dockerImage\n" + - s" volume: ${jobDirectory.toAbsolutePath}:/job" + - Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString)) + s" timeout: ${timeout.fold("none")(_.toString)}\n" + + s" image : $dockerImage\n" + + s" volume : ${jobDirectory.toAbsolutePath}:/job" + + Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString)) client.startContainer(containerCreation.id()) - Future { + val execution = Future { client.waitContainer(containerCreation.id()) () } + timeout.fold(execution)(t ⇒ execution.withTimeout(t, client.stopContainer(containerCreation.id(), 3))) + .andThen { + case _ ⇒ client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) + } } } diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala index d809571ab..3f363d9e4 100644 --- a/app/org/thp/cortex/services/JobRunnerSrv.scala +++ b/app/org/thp/cortex/services/JobRunnerSrv.scala @@ -6,6 +6,7 @@ import java.nio.file._ import java.nio.file.attribute.BasicFileAttributes import java.util.Date +import scala.concurrent.duration.DurationLong import scala.concurrent.{ ExecutionContext, Future } import scala.util.Failure @@ -195,7 +196,7 @@ class JobRunnerSrv @Inject() ( .foldLeft[Option[Future[Unit]]](None) { case (None, "docker") ⇒ worker.dockerImage() - .map(dockerImage ⇒ dockerJobRunnerSrv.run(jobFolder, dockerImage, job)(executionContext)) + .map(dockerImage ⇒ dockerJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes))(executionContext)) .orElse { logger.warn(s"worker ${worker.id} can't be run with docker (doesn't have image)") None @@ -203,7 +204,7 @@ class JobRunnerSrv @Inject() ( case (None, "process") ⇒ worker.command() - .map(command ⇒ processJobRunnerSrv.run(jobFolder, command, job)(executionContext)) + .map(command ⇒ processJobRunnerSrv.run(jobFolder, command, job, worker.jobTimeout().map(_.minutes))(executionContext)) .orElse { logger.warn(s"worker ${worker.id} can't be run with process (doesn't have image)") None diff --git a/app/org/thp/cortex/services/JobSrv.scala b/app/org/thp/cortex/services/JobSrv.scala index 711bbf544..fe568c2c5 100644 --- a/app/org/thp/cortex/services/JobSrv.scala +++ b/app/org/thp/cortex/services/JobSrv.scala @@ -306,7 +306,7 @@ class JobSrv( } else { import org.elastic4play.services.QueryDSL._ - logger.info(s"Looking for similar job (worker=${worker.id}, dataType=$dataType, data=$dataAttachment, tlp=$tlp, parameters=$parameters") + logger.info(s"Looking for similar job in the last ${cache.toMinutes} minutes (worker=${worker.id}, dataType=$dataType, data=$dataAttachment, tlp=$tlp, parameters=$parameters)") val now = new Date().getTime find(and( "workerId" ~= worker.id, diff --git a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala index ad842cfdc..6fd8596cf 100644 --- a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala @@ -2,26 +2,37 @@ package org.thp.cortex.services import java.nio.file.{ Path, Paths } +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ ExecutionContext, Future } import scala.sys.process.{ Process, ProcessLogger } import play.api.Logger -import javax.inject.Singleton +import akka.actor.ActorSystem +import javax.inject.{ Inject, Singleton } import org.thp.cortex.models._ +import org.elastic4play.utils.RichFuture + @Singleton -class ProcessJobRunnerSrv { +class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { lazy val logger = Logger(getClass) - def run(jobDirectory: Path, command: String, job: Job)(implicit ec: ExecutionContext): Future[Unit] = { - Future { - val baseDirectory = Paths.get(command).getParent.getParent - logger.info(s"Execute $command in $baseDirectory") - Process(Seq(command, jobDirectory.toString), baseDirectory.toFile) - .run(ProcessLogger(s ⇒ logger.info(s" Job ${job.id}: $s"))).exitValue() + def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = { + val baseDirectory = Paths.get(command).getParent.getParent + logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}") + val process = Process(Seq(command, jobDirectory.toString), baseDirectory.toFile) + .run(ProcessLogger(s ⇒ logger.info(s" Job ${job.id}: $s"))) + val execution = Future { + process.exitValue() () } + timeout.fold(execution)(t ⇒ execution.withTimeout(t, killProcess(process))) + } + + def killProcess(process: Process): Unit = { + logger.info("Timeout reached, killing process") + process.destroy() } } diff --git a/app/org/thp/cortex/services/ResponderConfigSrv.scala b/app/org/thp/cortex/services/ResponderConfigSrv.scala index 50712a4ff..aa3e6baba 100644 --- a/app/org/thp/cortex/services/ResponderConfigSrv.scala +++ b/app/org/thp/cortex/services/ResponderConfigSrv.scala @@ -2,6 +2,8 @@ package org.thp.cortex.services import scala.concurrent.{ ExecutionContext, Future } +import play.api.Configuration + import akka.stream.Materializer import javax.inject.{ Inject, Singleton } import org.thp.cortex.models.{ BaseConfig, WorkerConfigModel, WorkerType } @@ -10,6 +12,7 @@ import org.elastic4play.services.{ CreateSrv, FindSrv, UpdateSrv } @Singleton class ResponderConfigSrv @Inject() ( + val configuration: Configuration, val workerConfigModel: WorkerConfigModel, val userSrv: UserSrv, val organizationSrv: OrganizationSrv, diff --git a/app/org/thp/cortex/services/WorkerConfigSrv.scala b/app/org/thp/cortex/services/WorkerConfigSrv.scala index 87484767d..f09906433 100644 --- a/app/org/thp/cortex/services/WorkerConfigSrv.scala +++ b/app/org/thp/cortex/services/WorkerConfigSrv.scala @@ -2,6 +2,7 @@ package org.thp.cortex.services import scala.concurrent.{ ExecutionContext, Future } +import play.api.Configuration import play.api.libs.json._ import akka.NotUsed @@ -16,6 +17,7 @@ import org.elastic4play.database.ModifyConfig import org.elastic4play.services._ trait WorkerConfigSrv { + val configuration: Configuration val userSrv: UserSrv val createSrv: CreateSrv val updateSrv: UpdateSrv @@ -41,7 +43,7 @@ trait WorkerConfigSrv { .mapMaterializedValue(_ ⇒ NotUsed) .runWith(Sink.seq) .map { baseConfigs ⇒ - (BaseConfig.global(workerType) +: baseConfigs) + (BaseConfig.global(workerType, configuration) +: baseConfigs) .map(c ⇒ c.name → c) .toMap } diff --git a/app/org/thp/cortex/services/WorkerSrv.scala b/app/org/thp/cortex/services/WorkerSrv.scala index 813ff9a49..9207770cf 100644 --- a/app/org/thp/cortex/services/WorkerSrv.scala +++ b/app/org/thp/cortex/services/WorkerSrv.scala @@ -196,7 +196,7 @@ class WorkerSrv @Inject() ( def create(organization: Organization, workerDefinition: WorkerDefinition, workerFields: Fields)(implicit authContext: AuthContext): Future[Worker] = { val rawConfig = workerFields.getValue("configuration").fold(JsObject.empty)(_.as[JsObject]) - val configItems = workerDefinition.configurationItems ++ BaseConfig.global(workerDefinition.tpe).items ++ BaseConfig.tlp.items ++ BaseConfig.pap.items + val configItems = workerDefinition.configurationItems ++ BaseConfig.global(workerDefinition.tpe, config).items ++ BaseConfig.tlp.items ++ BaseConfig.pap.items val configOrErrors = configItems .validatedBy(_.read(rawConfig)) .map(JsObject.apply) diff --git a/conf/reference.conf b/conf/reference.conf index 7e65ca6f2..d624ff9d9 100644 --- a/conf/reference.conf +++ b/conf/reference.conf @@ -9,6 +9,7 @@ cache { organization = 5 minutes } +job.timeout = 30 minutes # HTTP filters play.filters { diff --git a/www/src/app/pages/admin/organizations/components/analyzers/analyzer-config-form.html b/www/src/app/pages/admin/organizations/components/analyzers/analyzer-config-form.html index b6e26444b..a907ccf10 100644 --- a/www/src/app/pages/admin/organizations/components/analyzers/analyzer-config-form.html +++ b/www/src/app/pages/admin/organizations/components/analyzers/analyzer-config-form.html @@ -1,6 +1,6 @@

Base details

- +
@@ -18,7 +18,7 @@

Apply defaults

- +
@@ -30,7 +30,7 @@

Apply defaults - +

@@ -43,9 +43,9 @@

-

+ - +
@@ -64,7 +64,7 @@

-
+

@@ -78,6 +78,18 @@

+
+ +
+ +
+
+
+ +
+ +
+
@@ -89,35 +101,19 @@

Set to True to enable automatic observables extraction from analysis reports.

-

+
-
+
-
-
-

Define the maximum number of requests and the associated unit if applicable.

-
-
-
- -
-
- - -
-
- - -
-

Define the number minutes for analysis report caching, or use the globally defined value.

+

Define the maximum number of requests and the associated unit if applicable.

- \ No newline at end of file + diff --git a/www/src/app/pages/admin/organizations/components/analyzers/analyzer.edit.controller.js b/www/src/app/pages/admin/organizations/components/analyzers/analyzer.edit.controller.js index 7f5ed4ea8..3fe66ad97 100644 --- a/www/src/app/pages/admin/organizations/components/analyzers/analyzer.edit.controller.js +++ b/www/src/app/pages/admin/organizations/components/analyzers/analyzer.edit.controller.js @@ -32,7 +32,8 @@ export default class AnalyzerEditController { configuration: {}, rate: undefined, rateUnit: undefined, - jobCache: null + jobCache: undefined, + jobTimeout: undefined }; _.forEach(this.definition.configurationItems, item => { @@ -74,6 +75,14 @@ export default class AnalyzerEditController { analyzer.configuration.max_pap = 2; } + if (analyzer.jobCache === undefined) { + analyzer.jobCache = this.globalConfig.config.jobCache; + } + + if (analyzer.jobTimeout === undefined) { + analyzer.jobTimeout = this.globalConfig.config.jobTimeout; + } + this.analyzer = analyzer; } } @@ -84,4 +93,4 @@ export default class AnalyzerEditController { cancel() { this.$uibModalInstance.dismiss('cancel'); } -} \ No newline at end of file +} diff --git a/www/src/app/pages/admin/organizations/components/analyzers/analyzer.edit.modal.html b/www/src/app/pages/admin/organizations/components/analyzers/analyzer.edit.modal.html index c65f05f27..d94c3c6f6 100644 --- a/www/src/app/pages/admin/organizations/components/analyzers/analyzer.edit.modal.html +++ b/www/src/app/pages/admin/organizations/components/analyzers/analyzer.edit.modal.html @@ -3,7 +3,7 @@
- \ No newline at end of file + diff --git a/www/src/app/pages/admin/organizations/components/analyzers/analyzers-list.controller.js b/www/src/app/pages/admin/organizations/components/analyzers/analyzers-list.controller.js index 1a2e0d44e..cf801c118 100644 --- a/www/src/app/pages/admin/organizations/components/analyzers/analyzers-list.controller.js +++ b/www/src/app/pages/admin/organizations/components/analyzers/analyzers-list.controller.js @@ -102,7 +102,8 @@ export default class OrganizationAnalyzersController { 'rate', 'rateUnit', 'name', - 'jobCache' + 'jobCache', + 'jobTimeout' ) ); } diff --git a/www/src/app/pages/admin/organizations/components/responders/responder.edit.controller.js b/www/src/app/pages/admin/organizations/components/responders/responder.edit.controller.js index 548a77dce..9d21e0772 100644 --- a/www/src/app/pages/admin/organizations/components/responders/responder.edit.controller.js +++ b/www/src/app/pages/admin/organizations/components/responders/responder.edit.controller.js @@ -48,7 +48,9 @@ export default class ResponderEditController { const globalConfig = [ 'proxy_http', 'proxy_https', - 'cacerts' + 'cacerts', + 'jobCache', + 'jobTimeout' ]; _.forEach(globalConfig, cnf => { if (responder.configuration[cnf] === undefined) { @@ -82,4 +84,4 @@ export default class ResponderEditController { cancel() { this.$uibModalInstance.dismiss('cancel'); } -} \ No newline at end of file +}