From 01c6b2ade264e231b754eaba5256f652a7162fcc Mon Sep 17 00:00:00 2001 From: To-om Date: Wed, 25 May 2022 16:55:24 +0200 Subject: [PATCH] #412 Improve logs --- .../thp/cortex/controllers/AnalyzerCtrl.scala | 4 +- .../cortex/controllers/AttachmentCtrl.scala | 13 +-- .../cortex/controllers/ResponderCtrl.scala | 2 +- .../thp/cortex/controllers/StreamCtrl.scala | 10 +- app/org/thp/cortex/models/Audit.scala | 15 +-- .../thp/cortex/services/AccessLogFilter.scala | 32 ++++++ app/org/thp/cortex/services/CustomWSAPI.scala | 22 ++-- .../cortex/services/DockerJobRunnerSrv.scala | 6 +- .../thp/cortex/services/ErrorHandler.scala | 103 ++++++++---------- .../thp/cortex/services/JobRunnerSrv.scala | 44 +++++--- app/org/thp/cortex/services/JobSrv.scala | 3 +- app/org/thp/cortex/services/KeyAuthSrv.scala | 19 ++-- .../thp/cortex/services/LocalAuthSrv.scala | 8 +- app/org/thp/cortex/services/OAuth2Srv.scala | 17 +-- .../cortex/services/ProcessJobRunnerSrv.scala | 12 +- app/org/thp/cortex/services/WorkerSrv.scala | 25 +++-- app/org/thp/cortex/util/JsonConfig.scala | 2 +- conf/reference.conf | 4 +- 18 files changed, 185 insertions(+), 156 deletions(-) create mode 100644 app/org/thp/cortex/services/AccessLogFilter.scala diff --git a/app/org/thp/cortex/controllers/AnalyzerCtrl.scala b/app/org/thp/cortex/controllers/AnalyzerCtrl.scala index 644f2c180..9a82029e2 100644 --- a/app/org/thp/cortex/controllers/AnalyzerCtrl.scala +++ b/app/org/thp/cortex/controllers/AnalyzerCtrl.scala @@ -44,7 +44,9 @@ class AnalyzerCtrl @Inject() ( private def analyzerJson(isAdmin: Boolean)(analyzer: Worker): JsObject = if (isAdmin) - analyzer.toJson + ("configuration" -> Json.parse(analyzer.configuration())) + ("analyzerDefinitionId" -> JsString(analyzer.workerDefinitionId())) + analyzer.toJson + ("configuration" -> Json.parse(analyzer.configuration())) + ("analyzerDefinitionId" -> JsString( + analyzer.workerDefinitionId() + )) else analyzer.toJson + ("analyzerDefinitionId" -> JsString(analyzer.workerDefinitionId())) diff --git a/app/org/thp/cortex/controllers/AttachmentCtrl.scala b/app/org/thp/cortex/controllers/AttachmentCtrl.scala index 2f849db8e..2aeaf0270 100644 --- a/app/org/thp/cortex/controllers/AttachmentCtrl.scala +++ b/app/org/thp/cortex/controllers/AttachmentCtrl.scala @@ -18,8 +18,7 @@ import play.api.libs.Files.DefaultTemporaryFileCreator import play.api.mvc._ import play.api.{mvc, Configuration} -/** - * Controller used to access stored attachments (plain or zipped) +/** Controller used to access stored attachments (plain or zipped) */ @Singleton class AttachmentCtrl( @@ -41,8 +40,7 @@ class AttachmentCtrl( ) = this(configuration.get[String]("datastore.attachment.password"), tempFileCreator, attachmentSrv, authenticated, components, executionContextSrv) - /** - * Download an attachment, identified by its hash, in plain format + /** Download an attachment, identified by its hash, in plain format * File name can be specified. This method is not protected : browser will * open the document directly. It must be used only for safe file */ @@ -51,7 +49,7 @@ class AttachmentCtrl( executionContextSrv.withDefault { implicit ec => if (hash.startsWith("{{")) // angularjs hack NoContent - else if (!name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).isEmpty) + else if (name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).nonEmpty) mvc.Results.BadRequest("File name is invalid") else Result( @@ -69,15 +67,14 @@ class AttachmentCtrl( } } - /** - * Download an attachment, identified by its hash, in zip format. + /** Download an attachment, identified by its hash, in zip format. * Zip file is protected by the password "malware" * File name can be specified (zip extension is append) */ @Timed("controllers.AttachmentCtrl.downloadZip") def downloadZip(hash: String, name: Option[String]): Action[AnyContent] = authenticated(Roles.read) { _ => executionContextSrv.withDefault { implicit ec => - if (!name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).isEmpty) + if (name.getOrElse("").intersect(AttachmentAttributeFormat.forbiddenChar).nonEmpty) BadRequest("File name is invalid") else { val f = tempFileCreator.create("zip", hash).path diff --git a/app/org/thp/cortex/controllers/ResponderCtrl.scala b/app/org/thp/cortex/controllers/ResponderCtrl.scala index e74648061..89dd52268 100644 --- a/app/org/thp/cortex/controllers/ResponderCtrl.scala +++ b/app/org/thp/cortex/controllers/ResponderCtrl.scala @@ -69,7 +69,7 @@ class ResponderCtrl @Inject() ( def listForType(dataType: String): Action[AnyContent] = authenticated(Roles.read).async { request => import org.elastic4play.services.QueryDSL._ val (responderList, responderCount) = workerSrv.findRespondersForUser(request.userId, "dataTypeList" ~= dataType, Some("all"), Nil) - renderer.toOutput(OK, responderList.map(responderJson(false)), responderCount) + renderer.toOutput(OK, responderList.map(responderJson(isAdmin = false)), responderCount) } def create(responderDefinitionId: String): Action[Fields] = authenticated(Roles.orgAdmin).async(fieldsBodyParser) { implicit request => diff --git a/app/org/thp/cortex/controllers/StreamCtrl.scala b/app/org/thp/cortex/controllers/StreamCtrl.scala index 2a0450f5c..0aa42ab86 100644 --- a/app/org/thp/cortex/controllers/StreamCtrl.scala +++ b/app/org/thp/cortex/controllers/StreamCtrl.scala @@ -70,8 +70,7 @@ class StreamCtrl( ) private[StreamCtrl] lazy val logger = Logger(getClass) - /** - * Create a new stream entry with the event head + /** Create a new stream entry with the event head */ @Timed("controllers.StreamCtrl.create") def create: Action[AnyContent] = authenticated(Roles.read) { @@ -85,8 +84,7 @@ class StreamCtrl( private[controllers] def isValidStreamId(streamId: String): Boolean = streamId.length == 10 && streamId.forall(alphanumeric.contains) - /** - * Get events linked to the identified stream entry + /** Get events linked to the identified stream entry * This call waits up to "refresh", if there is no event, return empty response */ @Timed("controllers.StreamCtrl.get") @@ -113,10 +111,10 @@ class StreamCtrl( } @Timed("controllers.StreamCtrl.status") - def status = Action { implicit request => + def status: Action[AnyContent] = Action { implicit request => val status = authenticated.expirationStatus(request) match { case ExpirationWarning(duration) => Json.obj("remaining" -> duration.toSeconds, "warning" -> true) - case ExpirationError => Json.obj("remaining" -> 0, "warning" -> true) + case ExpirationError => Json.obj("remaining" -> 0, "warning" -> true) case ExpirationOk(duration) => Json.obj("remaining" -> duration.toSeconds, "warning" -> false) } Ok(status) diff --git a/app/org/thp/cortex/models/Audit.scala b/app/org/thp/cortex/models/Audit.scala index cf94b1eca..4d8dfd99b 100644 --- a/app/org/thp/cortex/models/Audit.scala +++ b/app/org/thp/cortex/models/Audit.scala @@ -70,13 +70,14 @@ class AuditModel(auditName: String, auditedModels: immutable.Set[AuditedModel]) def mergeAttributeFormat(context: String, format1: AttributeFormat[_], format2: AttributeFormat[_]): Option[AttributeFormat[_]] = (format1, format2) match { - case (OptionalAttributeFormat(f1), f2) => mergeAttributeFormat(context, f1, f2) - case (f1, OptionalAttributeFormat(f2)) => mergeAttributeFormat(context, f1, f2) - case (MultiAttributeFormat(f1), MultiAttributeFormat(f2)) => mergeAttributeFormat(context, f1, f2).map(MultiAttributeFormat(_)) - case (f1, EnumerationAttributeFormat(_) | ListEnumerationAttributeFormat(_)) => mergeAttributeFormat(context, f1, StringAttributeFormat) - case (EnumerationAttributeFormat(_) | ListEnumerationAttributeFormat(_), f2) => mergeAttributeFormat(context, StringAttributeFormat, f2) - case (ObjectAttributeFormat(subAttributes1), ObjectAttributeFormat(subAttributes2)) => mergeAttributes(context, subAttributes1 ++ subAttributes2) - case (f1, f2) if f1 == f2 => Some(f1) + case (OptionalAttributeFormat(f1), f2) => mergeAttributeFormat(context, f1, f2) + case (f1, OptionalAttributeFormat(f2)) => mergeAttributeFormat(context, f1, f2) + case (MultiAttributeFormat(f1), MultiAttributeFormat(f2)) => mergeAttributeFormat(context, f1, f2).map(MultiAttributeFormat(_)) + case (f1, EnumerationAttributeFormat(_) | ListEnumerationAttributeFormat(_)) => mergeAttributeFormat(context, f1, StringAttributeFormat) + case (EnumerationAttributeFormat(_) | ListEnumerationAttributeFormat(_), f2) => mergeAttributeFormat(context, StringAttributeFormat, f2) + case (ObjectAttributeFormat(subAttributes1), ObjectAttributeFormat(subAttributes2)) => + mergeAttributes(context, subAttributes1 ++ subAttributes2) + case (f1, f2) if f1 == f2 => Some(f1) case (f1, f2) => logger.warn(s"Attribute $f1 != $f2") None diff --git a/app/org/thp/cortex/services/AccessLogFilter.scala b/app/org/thp/cortex/services/AccessLogFilter.scala new file mode 100644 index 000000000..af0d0ef06 --- /dev/null +++ b/app/org/thp/cortex/services/AccessLogFilter.scala @@ -0,0 +1,32 @@ +package org.thp.cortex.services + +import play.api.Logger +import play.api.http.HttpErrorHandler +import play.api.mvc.{EssentialAction, EssentialFilter, RequestHeader} + +import javax.inject.Inject +import scala.concurrent.ExecutionContext + +class AccessLogFilter @Inject() (errorHandler: HttpErrorHandler)(implicit ec: ExecutionContext) extends EssentialFilter { + + val logger: Logger = Logger(getClass) + + override def apply(next: EssentialAction): EssentialAction = + (requestHeader: RequestHeader) => { + val startTime = System.currentTimeMillis + next(requestHeader) + .recoverWith { case error => errorHandler.onServerError(requestHeader, error) } + .map { result => + val endTime = System.currentTimeMillis + val requestTime = endTime - startTime + + logger.info( + s"${requestHeader.remoteAddress} ${requestHeader.method} ${requestHeader.uri} took ${requestTime}ms and returned ${result.header.status} ${result + .body + .contentLength + .fold("")(b => s"$b bytes")}" + ) + result.withHeaders("Request-Time" -> requestTime.toString) + } + } +} diff --git a/app/org/thp/cortex/services/CustomWSAPI.scala b/app/org/thp/cortex/services/CustomWSAPI.scala index 9452066cf..f59053eed 100644 --- a/app/org/thp/cortex/services/CustomWSAPI.scala +++ b/app/org/thp/cortex/services/CustomWSAPI.scala @@ -19,7 +19,7 @@ object CustomWSAPI { .parse() def parseProxyConfig(config: Configuration): Option[WSProxyServer] = - config.getOptional[Configuration]("play.ws.proxy").map { proxyConfig ⇒ + config.getOptional[Configuration]("play.ws.proxy").map { proxyConfig => DefaultWSProxyServer( proxyConfig.get[String]("host"), proxyConfig.get[Int]("port"), @@ -35,7 +35,7 @@ object CustomWSAPI { def getWS(config: Configuration)(implicit mat: Materializer): AhcWSClient = { val clientConfig = parseWSConfig(config) val clientConfigWithTruststore = config.getOptional[String]("play.cert") match { - case Some(p) ⇒ + case Some(p) => logger.warn("""Use of "cert" parameter in configuration file is deprecated. Please use: | ws.ssl { | trustManager = { @@ -67,7 +67,7 @@ object CustomWSAPI { ) ) ) - case None ⇒ clientConfig + case None => clientConfig } AhcWSClient(clientConfigWithTruststore, None) } @@ -84,13 +84,13 @@ object CustomWSAPI { @Singleton class CustomWSAPI( - ws: AhcWSClient, - val proxy: Option[WSProxyServer], - config: Configuration, - environment: Environment, - lifecycle: ApplicationLifecycle, - mat: Materializer - ) extends WSClient { + ws: AhcWSClient, + val proxy: Option[WSProxyServer], + config: Configuration, + environment: Environment, + lifecycle: ApplicationLifecycle, + mat: Materializer +) extends WSClient { private[CustomWSAPI] lazy val logger = Logger(getClass) @Inject() def this(config: Configuration, environment: Environment, lifecycle: ApplicationLifecycle, mat: Materializer) = @@ -110,7 +110,7 @@ class CustomWSAPI( try { new CustomWSAPI(Configuration(subConfig.underlying.atKey("play").withFallback(config.underlying)), environment, lifecycle, mat) } catch { - case NonFatal(e) ⇒ + case NonFatal(e) => logger.error(s"WSAPI configuration error, use default values", e) this } diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index fe92ab568..9415815f5 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -50,10 +50,11 @@ class DockerJobRunnerSrv( system: ActorSystem ) - lazy val logger = Logger(getClass) + lazy val logger: Logger = Logger(getClass) lazy val isAvailable: Boolean = Try { + logger.debug(s"Retrieve docker information ...") logger.info(s"Docker is available:\n${client.info()}") true }.recover { @@ -103,6 +104,8 @@ class DockerJobRunnerSrv( else containerConfigBuilder.build() val containerCreation = client.createContainer(containerConfig) // Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn) + + logger.debug(s"Container configuration: $containerConfig") logger.info( s"Execute container ${containerCreation.id()}\n" + s" timeout: ${timeout.fold("none")(_.toString)}\n" + @@ -119,6 +122,7 @@ class DockerJobRunnerSrv( case r => val outputFile = jobDirectory.resolve("output").resolve("output.json") if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { + logger.warn(s"The worker didn't generate output file, use output stream.") val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully()) .fold(e => s"Container logs can't be read (${e.getMessage})", identity) val message = r.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output) diff --git a/app/org/thp/cortex/services/ErrorHandler.scala b/app/org/thp/cortex/services/ErrorHandler.scala index fefa006b4..5fb22295b 100644 --- a/app/org/thp/cortex/services/ErrorHandler.scala +++ b/app/org/thp/cortex/services/ErrorHandler.scala @@ -1,82 +1,75 @@ package org.thp.cortex.services -import java.net.ConnectException - -import scala.concurrent.Future - +import org.elastic4play.JsonFormat.attributeCheckingExceptionWrites +import org.elastic4play._ import org.thp.cortex.models.{JobNotFoundError, RateLimitExceeded, WorkerNotFoundError} import play.api.Logger +import play.api.http.Status.{BAD_REQUEST, FORBIDDEN, NOT_FOUND} import play.api.http.{HttpErrorHandler, Status, Writeable} -import play.api.libs.json.{JsNull, JsValue, Json} -import play.api.mvc.{RequestHeader, ResponseHeader, Result, Results} +import play.api.libs.json.{JsNull, JsString, JsValue, Json} +import play.api.mvc.{RequestHeader, ResponseHeader, Result} -import org.elastic4play.{ - AttributeCheckingError, - AuthenticationError, - AuthorizationError, - BadRequestError, - CreateError, - ErrorWithObject, - GetError, - IndexNotFoundException, - InternalError, - MultiError, - NotFoundError, - SearchError, - UpdateError -} -import org.elastic4play.JsonFormat.attributeCheckingExceptionWrites +import java.net.ConnectException +import scala.concurrent.Future -/** - * This class handles errors. It traverses all causes of exception to find known error and shows the appropriate message +/** This class handles errors. It traverses all causes of exception to find known error and shows the appropriate message */ class ErrorHandler extends HttpErrorHandler { private[ErrorHandler] lazy val logger = Logger(getClass) - def onClientError(request: RequestHeader, statusCode: Int, message: String): Future[Result] = Future.successful { - Results.Status(statusCode)(s"A client error occurred on ${request.method} ${request.uri} : $message") + def onClientError(request: RequestHeader, statusCode: Int, message: String): Future[Result] = { + val tpe = statusCode match { + case BAD_REQUEST => "BadRequest" + case FORBIDDEN => "Forbidden" + case NOT_FOUND => "NotFound" + case _ => "Unknown" + } + Future.successful(toResult(statusCode, Json.obj("type" -> tpe, "message" -> message))) } - def toErrorResult(ex: Throwable): Option[(Int, JsValue)] = + def toErrorResult(ex: Throwable): (Int, JsValue) = ex match { - case AuthenticationError(message) => Some(Status.UNAUTHORIZED -> Json.obj("type" -> "AuthenticationError", "message" -> message)) - case AuthorizationError(message) => Some(Status.FORBIDDEN -> Json.obj("type" -> "AuthorizationError", "message" -> message)) + case AuthenticationError(message) => Status.UNAUTHORIZED -> Json.obj("type" -> "AuthenticationError", "message" -> message) + case AuthorizationError(message) => Status.FORBIDDEN -> Json.obj("type" -> "AuthorizationError", "message" -> message) case UpdateError(_, message, attributes) => - Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "UpdateError", "message" -> message, "object" -> attributes)) - case rle: RateLimitExceeded => Some(Status.TOO_MANY_REQUESTS -> Json.obj("type" -> "RateLimitExceeded", "message" -> rle.getMessage)) - case InternalError(message) => Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "InternalError", "message" -> message)) + Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "UpdateError", "message" -> message, "object" -> attributes) + case rle: RateLimitExceeded => Status.TOO_MANY_REQUESTS -> Json.obj("type" -> "RateLimitExceeded", "message" -> rle.getMessage) + case InternalError(message) => Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "InternalError", "message" -> message) case nfe: NumberFormatException => - Some(Status.BAD_REQUEST -> Json.obj("type" -> "NumberFormatException", "message" -> ("Invalid format " + nfe.getMessage))) - case NotFoundError(message) => Some(Status.NOT_FOUND -> Json.obj("type" -> "NotFoundError", "message" -> message)) - case BadRequestError(message) => Some(Status.BAD_REQUEST -> Json.obj("type" -> "BadRequest", "message" -> message)) - case SearchError(message) => Some(Status.BAD_REQUEST -> Json.obj("type" -> "SearchError", "message" -> s"$message")) - case ace: AttributeCheckingError => Some(Status.BAD_REQUEST -> Json.toJson(ace)) - case iae: IllegalArgumentException => Some(Status.BAD_REQUEST -> Json.obj("type" -> "IllegalArgument", "message" -> iae.getMessage)) + Status.BAD_REQUEST -> Json.obj("type" -> "NumberFormatException", "message" -> ("Invalid format " + nfe.getMessage)) + case NotFoundError(message) => Status.NOT_FOUND -> Json.obj("type" -> "NotFoundError", "message" -> message) + case BadRequestError(message) => Status.BAD_REQUEST -> Json.obj("type" -> "BadRequest", "message" -> message) + case SearchError(message) => Status.BAD_REQUEST -> Json.obj("type" -> "SearchError", "message" -> s"$message") + case ace: AttributeCheckingError => Status.BAD_REQUEST -> Json.toJson(ace) + case iae: IllegalArgumentException => Status.BAD_REQUEST -> Json.obj("type" -> "IllegalArgument", "message" -> iae.getMessage) case _: ConnectException => - Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "NoNodeAvailable", "message" -> "ElasticSearch cluster is unreachable")) + Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "NoNodeAvailable", "message" -> "ElasticSearch cluster is unreachable") case CreateError(_, message, attributes) => - Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "CreateError", "message" -> message, "object" -> attributes)) - case ErrorWithObject(tpe, message, obj) => Some(Status.BAD_REQUEST -> Json.obj("type" -> tpe, "message" -> message, "object" -> obj)) - case GetError(message) => Some(Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "GetError", "message" -> message)) + Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "CreateError", "message" -> message, "object" -> attributes) + case ErrorWithObject(tpe, message, obj) => Status.BAD_REQUEST -> Json.obj("type" -> tpe, "message" -> message, "object" -> obj) + case GetError(message) => Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> "GetError", "message" -> message) case MultiError(message, exceptions) => - val suberrors = exceptions.map(e => toErrorResult(e)).collect { - case Some((_, j)) => j - } - Some(Status.MULTI_STATUS -> Json.obj("type" -> "MultiError", "error" -> message, "suberrors" -> suberrors)) - case JobNotFoundError(jobId) => Some(Status.NOT_FOUND -> Json.obj("type" -> "JobNotFoundError", "message" -> s"Job $jobId not found")) + val suberrors = exceptions.map(e => toErrorResult(e)) + Status.MULTI_STATUS -> Json.obj("type" -> "MultiError", "error" -> message, "suberrors" -> suberrors) + case JobNotFoundError(jobId) => Status.NOT_FOUND -> Json.obj("type" -> "JobNotFoundError", "message" -> s"Job $jobId not found") case WorkerNotFoundError(analyzerId) => - Some(Status.NOT_FOUND -> Json.obj("type" -> "AnalyzerNotFoundError", "message" -> s"analyzer $analyzerId not found")) - case IndexNotFoundException => Some(520 -> JsNull) - case t: Throwable => Option(t.getCause).flatMap(toErrorResult) + Status.NOT_FOUND -> Json.obj("type" -> "AnalyzerNotFoundError", "message" -> s"analyzer $analyzerId not found") + case IndexNotFoundException => 520 -> JsNull + case _ if Option(ex.getCause).isDefined => toErrorResult(ex.getCause) + case _ => + logger.error("Internal error", ex) + val json = Json.obj("type" -> ex.getClass.getName, "message" -> ex.getMessage) + Status.INTERNAL_SERVER_ERROR -> (if (ex.getCause == null) json else json + ("cause" -> JsString(ex.getCause.getMessage))) } - def toResult[C](status: Int, c: C)(implicit writeable: Writeable[C]) = Result(header = ResponseHeader(status), body = writeable.toEntity(c)) + def toResult[C](status: Int, c: C)(implicit writeable: Writeable[C]): Result = + Result(header = ResponseHeader(status), body = writeable.toEntity(c)) def onServerError(request: RequestHeader, exception: Throwable): Future[Result] = { - val (status, body) = toErrorResult(exception).getOrElse( - Status.INTERNAL_SERVER_ERROR -> Json.obj("type" -> exception.getClass.getName, "message" -> exception.getMessage) - ) - logger.info(s"${request.method} ${request.uri} returned $status", exception) + val (status, body) = toErrorResult(exception) + if (!exception.isInstanceOf[AuthenticationError]) + if (logger.isDebugEnabled) logger.warn(s"${request.method} ${request.uri} returned $status", exception) + else logger.warn(s"${request.method} ${request.uri} returned $status") Future.successful(toResult(status, body)) } } diff --git a/app/org/thp/cortex/services/JobRunnerSrv.scala b/app/org/thp/cortex/services/JobRunnerSrv.scala index 6913e8050..e649470ec 100644 --- a/app/org/thp/cortex/services/JobRunnerSrv.scala +++ b/app/org/thp/cortex/services/JobRunnerSrv.scala @@ -5,20 +5,17 @@ import java.nio.charset.StandardCharsets import java.nio.file._ import java.nio.file.attribute.BasicFileAttributes import java.util.Date - import scala.concurrent.duration.DurationLong import scala.concurrent.{ExecutionContext, Future} -import scala.util.Failure - +import scala.util.{Failure, Success} import play.api.libs.json._ import play.api.{Configuration, Logger} - import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.FileIO + import javax.inject.Inject import org.thp.cortex.models._ - import org.elastic4play.BadRequestError import org.elastic4play.controllers.{Fields, FileInputValue} import org.elastic4play.database.ModifyConfig @@ -39,10 +36,11 @@ class JobRunnerSrv @Inject() ( implicit val mat: Materializer ) { - val logger = Logger(getClass) + val logger: Logger = Logger(getClass) lazy val analyzerExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("analyzer") lazy val responderExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("responder") val jobDirectory: Path = Paths.get(config.get[String]("job.directory")) + private val globalKeepJobFolder: Boolean = config.get[Boolean]("job.keepJobFolder") private val runners: Seq[String] = config .getOptional[Seq[String]]("job.runners") @@ -90,7 +88,8 @@ class JobRunnerSrv @Inject() ( } private def prepareJobFolder(worker: Worker, job: Job): Future[Path] = { - val jobFolder = Files.createTempDirectory(jobDirectory, s"cortex-job-${job.id}-") + val jobFolder = Files.createTempDirectory(jobDirectory, s"cortex-job-${job.id}-") + logger.debug(s"Job folder is $jobFolder") val inputJobFolder = Files.createDirectories(jobFolder.resolve("input")) Files.createDirectories(jobFolder.resolve("output")) @@ -138,22 +137,26 @@ class JobRunnerSrv @Inject() ( ("config" -> config) } .map { input => + logger.debug(s"Write worker input: $input") Files.write(inputJobFolder.resolve("input.json"), input.toString.getBytes(StandardCharsets.UTF_8)) jobFolder } .recoverWith { case error => - delete(jobFolder) + if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) + delete(jobFolder) Future.failed(error) } } - private def extractReport(jobFolder: Path, job: Job)(implicit authContext: AuthContext) = { + private def extractReport(jobFolder: Path, job: Job)(implicit authContext: AuthContext): Future[Job] = { val outputFile = jobFolder.resolve("output").resolve("output.json") if (Files.exists(outputFile)) { - val is = Files.newInputStream(outputFile) - val report = Json.parse(is) - is.close() + logger.debug(s"Job output: ${new String(Files.readAllBytes(outputFile))}") + val is = Files.newInputStream(outputFile) + val report = + try Json.parse(is) + finally is.close() val success = (report \ "success").asOpt[Boolean].getOrElse(false) if (success) { @@ -233,13 +236,18 @@ class JobRunnerSrv @Inject() ( .getOrElse(Future.failed(BadRequestError("Worker cannot be run"))) } yield j finishedJob - .transformWith { r => - r.fold( - error => endJob(job, JobStatus.Failure, Option(error.getMessage), Some(readFile(jobFolder.resolve("input").resolve("input.json")))), - _ => extractReport(jobFolder, job) - ) + .transformWith { + case _: Success[_] => + extractReport(jobFolder, job) + case Failure(error) => + endJob(job, JobStatus.Failure, Option(error.getMessage), Some(readFile(jobFolder.resolve("input").resolve("input.json")))) + + } + .andThen { + case _ => + if (!(job.params \ "keepJobFolder").asOpt[Boolean].contains(true) || globalKeepJobFolder) + delete(jobFolder) } - .andThen { case _ => delete(jobFolder) } } private def readFile(input: Path): String = new String(Files.readAllBytes(input), StandardCharsets.UTF_8) diff --git a/app/org/thp/cortex/services/JobSrv.scala b/app/org/thp/cortex/services/JobSrv.scala index cbf470bdb..915e16ca4 100644 --- a/app/org/thp/cortex/services/JobSrv.scala +++ b/app/org/thp/cortex/services/JobSrv.scala @@ -190,7 +190,8 @@ class JobSrv( case (dt, Right(fiv)) => dt -> attachmentSrv.save(fiv).map(Right.apply) case (dt, Left(data)) => dt -> Future.successful(Left(data)) }.fold( - typeDataAttachment => typeDataAttachment._2.flatMap(da => create(worker, typeDataAttachment._1, da, tlp, pap, message, parameters, label, force)), + typeDataAttachment => + typeDataAttachment._2.flatMap(da => create(worker, typeDataAttachment._1, da, tlp, pap, message, parameters, label, force)), errors => { val attributeError = AttributeCheckingError("job", errors) logger.error("legacy job create fails", attributeError) diff --git a/app/org/thp/cortex/services/KeyAuthSrv.scala b/app/org/thp/cortex/services/KeyAuthSrv.scala index a8dc8e130..62eb29417 100644 --- a/app/org/thp/cortex/services/KeyAuthSrv.scala +++ b/app/org/thp/cortex/services/KeyAuthSrv.scala @@ -1,20 +1,17 @@ package org.thp.cortex.services -import java.util.Base64 -import javax.inject.{Inject, Singleton} - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.Random - -import play.api.libs.json.JsArray -import play.api.mvc.RequestHeader - import akka.stream.Materializer import akka.stream.scaladsl.Sink - import org.elastic4play.controllers.Fields import org.elastic4play.services.{AuthCapability, AuthContext, AuthSrv} import org.elastic4play.{AuthenticationError, BadRequestError} +import play.api.libs.json.JsArray +import play.api.mvc.RequestHeader + +import java.util.Base64 +import javax.inject.{Inject, Singleton} +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Random @Singleton class KeyAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContext, implicit val mat: Materializer) extends AuthSrv { @@ -26,7 +23,7 @@ class KeyAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContext, Base64.getEncoder.encodeToString(bytes) } - override val capabilities = Set(AuthCapability.authByKey) + override val capabilities: Set[AuthCapability.Type] = Set(AuthCapability.authByKey) override def authenticate(key: String)(implicit request: RequestHeader): Future[AuthContext] = { import org.elastic4play.services.QueryDSL._ diff --git a/app/org/thp/cortex/services/LocalAuthSrv.scala b/app/org/thp/cortex/services/LocalAuthSrv.scala index ae1af07b6..d45b9425d 100644 --- a/app/org/thp/cortex/services/LocalAuthSrv.scala +++ b/app/org/thp/cortex/services/LocalAuthSrv.scala @@ -1,15 +1,11 @@ package org.thp.cortex.services import javax.inject.{Inject, Singleton} - import scala.concurrent.{ExecutionContext, Future} import scala.util.Random - import play.api.mvc.RequestHeader - import akka.stream.Materializer import org.thp.cortex.models.User - import org.elastic4play.controllers.Fields import org.elastic4play.services.{AuthCapability, AuthContext, AuthSrv} import org.elastic4play.utils.Hasher @@ -18,8 +14,8 @@ import org.elastic4play.{AuthenticationError, AuthorizationError} @Singleton class LocalAuthSrv @Inject() (userSrv: UserSrv, implicit val ec: ExecutionContext, implicit val mat: Materializer) extends AuthSrv { - val name = "local" - override val capabilities = Set(AuthCapability.changePassword, AuthCapability.setPassword) + val name: String = "local" + override val capabilities: Set[AuthCapability.Type] = Set(AuthCapability.changePassword, AuthCapability.setPassword) private[services] def doAuthenticate(user: User, password: String): Boolean = user.password().map(_.split(",", 2)).fold(false) { diff --git a/app/org/thp/cortex/services/OAuth2Srv.scala b/app/org/thp/cortex/services/OAuth2Srv.scala index 27ea7f9b6..dfbed6d1d 100644 --- a/app/org/thp/cortex/services/OAuth2Srv.scala +++ b/app/org/thp/cortex/services/OAuth2Srv.scala @@ -87,7 +87,7 @@ class OAuth2Srv( if (!isSecuredAuthCode(request)) { logger.debug("Code or state is not provided, redirect to authorizationUrl") Future.successful(Left(authRedirect(oauth2Config))) - } else { + } else (for { token <- getToken(oauth2Config, request) userData <- getUserData(oauth2Config, token) @@ -95,14 +95,12 @@ class OAuth2Srv( } yield Right(authContext)).recoverWith { case error => Future.failed(AuthenticationError(s"OAuth2 authentication failure: ${error.getMessage}")) } - } } private def isSecuredAuthCode(request: RequestHeader): Boolean = request.queryString.contains("code") && request.queryString.contains("state") - /** - * Filter checking whether we initiate the OAuth2 process + /** Filter checking whether we initiate the OAuth2 process * and redirecting to OAuth2 server if necessary * @return */ @@ -122,12 +120,11 @@ class OAuth2Srv( .withSession("state" -> state) } - /** - * Enriching the initial request with OAuth2 token gotten + /** Enriching the initial request with OAuth2 token gotten * from OAuth2 code * @return */ - private def getToken[A](oauth2Config: OAuth2Config, request: RequestHeader): Future[String] = { + private def getToken(oauth2Config: OAuth2Config, request: RequestHeader): Future[String] = { val token = for { state <- request.session.get("state") @@ -147,8 +144,7 @@ class OAuth2Srv( token.getOrElse(Future.failed(BadRequestError("OAuth2 states mismatch"))) } - /** - * Querying the OAuth2 server for a token + /** Querying the OAuth2 server for a token * @param code the previously obtained code * @return */ @@ -181,8 +177,7 @@ class OAuth2Srv( } } - /** - * Client query for user data with OAuth2 token + /** Client query for user data with OAuth2 token * @param token the token * @return */ diff --git a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala index 0030805d7..d7db36e48 100644 --- a/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/ProcessJobRunnerSrv.scala @@ -2,14 +2,15 @@ package org.thp.cortex.services import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths} - import akka.actor.ActorSystem + import javax.inject.{Inject, Singleton} import org.elastic4play.utils.RichFuture import org.thp.cortex.models._ import play.api.Logger import play.api.libs.json.Json +import scala.collection.mutable import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} import scala.sys.process.{Process, ProcessLogger, _} @@ -18,9 +19,9 @@ import scala.util.Try @Singleton class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { - lazy val logger = Logger(getClass) + lazy val logger: Logger = Logger(getClass) - private val pythonPackageVersionRegex = "^Version: ([0-9]*)\\.([0-9]*)\\.([0-9]*)".r + private val pythonPackageVersionRegex = "^Version: (\\d*)\\.(\\d*)\\.(\\d*)".r def checkCortexUtilsVersion(pythonVersion: String): Option[(Int, Int, Int)] = Try { @@ -31,12 +32,11 @@ class ProcessJobRunnerSrv @Inject() (implicit val system: ActorSystem) { } }.getOrElse(None) - def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)( - implicit + def run(jobDirectory: Path, command: String, job: Job, timeout: Option[FiniteDuration], jobExecutor: ExecutionContext)(implicit ec: ExecutionContext ): Future[Unit] = { val baseDirectory = Paths.get(command).getParent.getParent - val output = StringBuilder.newBuilder + val output = mutable.StringBuilder.newBuilder logger.info(s"Execute $command in $baseDirectory, timeout is ${timeout.fold("none")(_.toString)}") val cacertsFile = jobDirectory.resolve("input").resolve("cacerts") val env = if (Files.exists(cacertsFile)) Seq("REQUESTS_CA_BUNDLE" -> cacertsFile.toString) else Nil diff --git a/app/org/thp/cortex/services/WorkerSrv.scala b/app/org/thp/cortex/services/WorkerSrv.scala index 690a99e41..8b4a37cdd 100644 --- a/app/org/thp/cortex/services/WorkerSrv.scala +++ b/app/org/thp/cortex/services/WorkerSrv.scala @@ -50,10 +50,11 @@ class WorkerSrv @Inject() ( rescan() - def getDefinition(workerId: String): Try[WorkerDefinition] = workerMap.get(workerId) match { - case Some(worker) => Success(worker) - case None => Failure(NotFoundError(s"Worker $workerId not found")) - } + def getDefinition(workerId: String): Try[WorkerDefinition] = + workerMap.get(workerId) match { + case Some(worker) => Success(worker) + case None => Failure(NotFoundError(s"Worker $workerId not found")) + } // def listDefinitions: (Source[WorkerDefinition, NotUsed], Future[Long]) = Source(workerMap.values.toList) → Future.successful(workerMap.size.toLong) @@ -150,8 +151,9 @@ class WorkerSrv @Inject() ( case "file" => Future.successful(readFile(Paths.get(url.toURI), workerType)) case "http" | "https" => val reads = WorkerDefinition.reads(workerType) - ws.url(url.toString) - .get() + val query = ws.url(url.toString).get() + logger.debug(s"Read catalog using query $query") + query .map(response => response.json.as(reads)) .map(_.filterNot(_.command.isDefined)) } @@ -198,12 +200,12 @@ class WorkerSrv @Inject() ( Future(new URL(workerUrl)) .flatMap(readUrl(_, workerType)) .recover { - case _ => + case error => val path = Paths.get(workerUrl) if (Files.isRegularFile(path)) readFile(path, workerType) else if (Files.isDirectory(path)) readDirectory(path, workerType) else { - logger.warn(s"Worker path ($workerUrl) is not found") + logger.warn(s"Worker path ($workerUrl) is not found", error) Nil } } @@ -216,8 +218,8 @@ class WorkerSrv @Inject() ( } - def create(organization: Organization, workerDefinition: WorkerDefinition, workerFields: Fields)( - implicit authContext: AuthContext + def create(organization: Organization, workerDefinition: WorkerDefinition, workerFields: Fields)(implicit + authContext: AuthContext ): Future[Worker] = { val rawConfig = workerFields.getValue("configuration").fold(JsObject.empty)(_.as[JsObject]) val configItems = workerDefinition.configurationItems ++ BaseConfig.global(workerDefinition.tpe, config).items ++ BaseConfig @@ -252,7 +254,8 @@ class WorkerSrv @Inject() ( .set("configuration", cfg.toString) .set("type", workerDefinition.tpe.toString) .addIfAbsent("dataTypeList", StringInputValue(workerDefinition.dataTypeList)) - ), { + ), + { case One(e) => Future.failed(e) case Every(es @ _*) => Future.failed(AttributeCheckingError(s"worker(${workerDefinition.name}).configuration", es)) } diff --git a/app/org/thp/cortex/util/JsonConfig.scala b/app/org/thp/cortex/util/JsonConfig.scala index 33bce0f5b..382b682fd 100644 --- a/app/org/thp/cortex/util/JsonConfig.scala +++ b/app/org/thp/cortex/util/JsonConfig.scala @@ -19,7 +19,7 @@ object JsonConfig { } ) - implicit def configWrites = OWrites { (cfg: Configuration) => + implicit def configWrites: OWrites[Configuration] = OWrites { (cfg: Configuration) => JsObject(cfg.subKeys.map(key => key -> configValueWrites.writes(cfg.underlying.getValue(key))).toSeq) } } diff --git a/conf/reference.conf b/conf/reference.conf index 7f457257b..a96f0f1e1 100644 --- a/conf/reference.conf +++ b/conf/reference.conf @@ -14,6 +14,7 @@ job { runners = [docker, process] directory = ${java.io.tmpdir} dockerDirectory = ${job.directory} + keepJobFolder = false } # HTTP filters @@ -26,7 +27,8 @@ play.filters { enabled = [ org.thp.cortex.services.StreamFilter, org.elastic4play.services.TempFilter, - org.thp.cortex.services.CSRFFilter + org.thp.cortex.services.CSRFFilter, + org.thp.cortex.services.AccessLogFilter ] }