Skip to content

Commit

Permalink
#1272 Fix multiple Cortex request
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Mar 30, 2020
1 parent 39d8d2e commit 86ad7c5
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.Date

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Success
import scala.util.control.NonFatal
import scala.util.matching.Regex

Expand Down Expand Up @@ -42,7 +43,7 @@ class CortexActionSrv @Inject()(
implicit val mat: Materializer
) {

lazy val logger = Logger(getClass)
lazy val logger: Logger = Logger(getClass)
lazy val responderIdRegex: Regex = "(.*)-(.*)".r

def getResponderById(id: String): Future[Responder] =
Expand Down Expand Up @@ -221,9 +222,20 @@ class CortexActionSrv @Inject()(
}
}
.getOrElse {
Future.firstCompletedOf {
cortexConfig.instances.map(c getResponder(c).map(c _))
}
Future
.traverse(cortexConfig.instances) { c
getResponder(c)
.transform {
case Success(w) Success(Some(c w))
case _ Success(None)
}
}
.flatMap { responders
responders
.flatten
.headOption
.fold[Future[(CortexClient, Responder)]](Future.failed(NotFoundError(s"Responder not found")))(Future.successful)
}
}

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import java.util.Date

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
import scala.util.{Success, Try}
import scala.util.control.NonFatal

import play.api.Logger
import play.api.libs.json._
import play.api.libs.ws.WSClient

import akka.NotUsed
import akka.actor.{Actor, ActorSystem}
import akka.stream.Materializer
Expand All @@ -21,7 +19,6 @@ import connectors.cortex.models._
import javax.inject.{Inject, Singleton}
import models.{Artifact, Case}
import services.{UserSrv _, _}

import org.elastic4play.controllers.{Fields, FileInputValue}
import org.elastic4play.database.{DBRemove, ModifyConfig}
import org.elastic4play.services.JsonFormat.attachmentFormat
Expand Down Expand Up @@ -139,7 +136,7 @@ class CortexAnalyzerSrv @Inject()(
def realDeleteJob(job: Job): Future[Unit] =
dbRemove(job).map(_ ())

def stats(query: QueryDef, aggs: Seq[Agg]) = findSrv(jobModel, query, aggs: _*)
def stats(query: QueryDef, aggs: Seq[Agg]): Future[JsObject] = findSrv(jobModel, query, aggs: _*)

def getAnalyzer(analyzerId: String): Future[Analyzer] =
Future
Expand Down Expand Up @@ -368,9 +365,20 @@ class CortexAnalyzerSrv @Inject()(
}

case None
Future.firstCompletedOf {
cortexConfig.instances.map(c c.getAnalyzer(analyzerName).map(c _))
}
Future
.traverse(cortexConfig.instances) { c
c.getAnalyzer(analyzerName)
.transform {
case Success(w) Success(Some(c w))
case _ Success(None)
}
}
.flatMap { analyzers
analyzers
.flatten
.headOption
.fold[Future[(CortexClient, Analyzer)]](Future.failed(NotFoundError(s"Analyzer not found")))(Future.successful)
}
}

cortexClientAnalyzer.flatMap {
Expand Down

0 comments on commit 86ad7c5

Please sign in to comment.