From 5c682c8a7f42f146cefe10a70c0f5f51fbc4ff90 Mon Sep 17 00:00:00 2001 From: To-om Date: Mon, 20 Jan 2020 13:47:33 +0100 Subject: [PATCH] #234 Fix worker refresh --- app/org/thp/cortex/services/WorkerSrv.scala | 41 +++++++++++---------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/app/org/thp/cortex/services/WorkerSrv.scala b/app/org/thp/cortex/services/WorkerSrv.scala index 2a89cd28c..719d22375 100644 --- a/app/org/thp/cortex/services/WorkerSrv.scala +++ b/app/org/thp/cortex/services/WorkerSrv.scala @@ -3,25 +3,27 @@ package org.thp.cortex.services import java.net.URL import java.nio.file.{Files, Path, Paths} +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} +import scala.io.Codec +import scala.util.{Failure, Success, Try} + +import play.api.libs.json.{JsArray, JsObject, JsString, Json} +import play.api.libs.ws.WSClient +import play.api.{Configuration, Logger} + import akka.NotUsed import akka.stream.Materializer import akka.stream.scaladsl.{Sink, Source} import javax.inject.{Inject, Provider, Singleton} -import org.elastic4play._ -import org.elastic4play.controllers.{Fields, StringInputValue} -import org.elastic4play.database.ModifyConfig -import org.elastic4play.services._ import org.scalactic.Accumulation._ import org.scalactic._ import org.thp.cortex.models._ -import play.api.libs.json.{JsArray, JsObject, JsString, Json} -import play.api.libs.ws.WSClient -import play.api.{Configuration, Logger} -import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} -import scala.io.Codec -import scala.util.{Failure, Success, Try} +import org.elastic4play._ +import org.elastic4play.controllers.{Fields, StringInputValue} +import org.elastic4play.database.ModifyConfig +import org.elastic4play.services._ @Singleton class WorkerSrv @Inject()( @@ -131,18 +133,19 @@ class WorkerSrv @Inject()( scan( analyzersURLs.map(_ → WorkerType.analyzer) ++ respondersURLs.map(_ → WorkerType.responder) - ) - userSrv.inInitAuthContext { implicit authContext ⇒ - find(any, Some("all"), Nil)._1.runForeach { worker ⇒ - workerMap.get(worker.workerDefinitionId()) match { - case Some(wd) ⇒ update(worker, Fields.empty.set("dataTypeList", Json.toJson(wd.dataTypeList))) - case None ⇒ update(worker, Fields.empty.set("dataTypeList", JsArray.empty)) + ).onComplete { _ ⇒ + userSrv.inInitAuthContext { implicit authContext ⇒ + find(any, Some("all"), Nil)._1.runForeach { worker ⇒ + workerMap.get(worker.workerDefinitionId()) match { + case Some(wd) ⇒ update(worker, Fields.empty.set("dataTypeList", Json.toJson(wd.dataTypeList))) + case None ⇒ update(worker, Fields.empty.set("dataTypeList", JsArray.empty)) + } } } } } - def scan(workerUrls: Seq[(String, WorkerType.Type)]): Unit = { + def scan(workerUrls: Seq[(String, WorkerType.Type)]): Future[Unit] = { def readUrl(url: URL, workerType: WorkerType.Type): Future[Seq[WorkerDefinition]] = url.getProtocol match { case "file" ⇒ Future.successful(readFile(Paths.get(url.toURI), workerType)) @@ -206,7 +209,7 @@ class WorkerSrv @Inject()( } } } - .foreach { worker ⇒ + .map { worker ⇒ val wmap = worker.flatten.map(w ⇒ w.id → w).toMap workerMapLock.synchronized(workerMap = wmap) logger.info(s"New worker list:\n\n\t${workerMap.values.map(a ⇒ s"${a.name} ${a.version}").mkString("\n\t")}\n")