From 61b536dba876dc1e4a2b55d496f5e8bc01c31db7 Mon Sep 17 00:00:00 2001 From: To-om Date: Fri, 10 May 2019 17:15:57 +0200 Subject: [PATCH] #82 Add support of ElasticSearch 6.x --- app/org/elastic4play/ErrorHandler.scala | 15 +- app/org/elastic4play/Errors.scala | 3 +- .../database/DBConfiguration.scala | 293 ++++++++---------- app/org/elastic4play/database/DBCreate.scala | 62 ++-- app/org/elastic4play/database/DBFind.scala | 125 ++++---- app/org/elastic4play/database/DBGet.scala | 5 +- app/org/elastic4play/database/DBIndex.scala | 72 +++-- app/org/elastic4play/database/DBModify.scala | 18 +- app/org/elastic4play/database/DBRemove.scala | 17 +- .../elastic4play/database/DBSequence.scala | 19 +- app/org/elastic4play/database/DBUtils.scala | 42 ++- .../models/AttachmentAttributeFormat.scala | 6 +- app/org/elastic4play/models/Attributes.scala | 12 +- .../models/BinaryAttributeFormat.scala | 6 +- .../models/BooleanAttributeFormat.scala | 6 +- .../models/CustomAttributeFormat.scala | 12 +- .../models/DateAttributeFormat.scala | 6 +- .../models/EnumerationAttributeFormat.scala | 6 +- .../models/HashAttributeFormat.scala | 6 +- .../ListEnumerationAttributeFormat.scala | 6 +- .../models/MetricsAttributeFormat.scala | 10 +- .../models/MultiAttributeFormat.scala | 4 +- .../models/NumberAttributeFormat.scala | 6 +- .../models/ObjectAttributeFormat.scala | 10 +- .../models/OptionalAttributeFormat.scala | 4 +- .../models/RawAttributeFormat.scala | 6 +- .../models/StringAttributeFormat.scala | 6 +- .../models/TextAttributeFormat.scala | 6 +- .../models/UUIDAttributeFormat.scala | 6 +- .../elastic4play/services/Aggregations.scala | 213 +++++++------ .../elastic4play/services/AttachmentSrv.scala | 22 +- app/org/elastic4play/services/CreateSrv.scala | 3 +- app/org/elastic4play/services/FindSrv.scala | 32 +- .../elastic4play/services/MigrationSrv.scala | 55 +++- app/org/elastic4play/services/QueryDSL.scala | 22 +- build.sbt | 11 +- .../elastic4play/database/DBCreateSpec.scala | 21 +- .../elastic4play/database/DBFindSpec.scala | 65 ++-- 38 files changed, 579 insertions(+), 660 deletions(-) diff --git a/app/org/elastic4play/ErrorHandler.scala b/app/org/elastic4play/ErrorHandler.scala index 6bd9e80..08e02c4 100644 --- a/app/org/elastic4play/ErrorHandler.scala +++ b/app/org/elastic4play/ErrorHandler.scala @@ -1,5 +1,7 @@ package org.elastic4play +import java.net.ConnectException + import scala.concurrent.Future import play.api.Logger @@ -7,10 +9,6 @@ import play.api.http.{HttpErrorHandler, Status, Writeable} import play.api.libs.json.{JsNull, JsValue, Json} import play.api.mvc.{RequestHeader, ResponseHeader, Result, Results} -import org.elasticsearch.client.transport.NoNodeAvailableException -import org.elasticsearch.index.IndexNotFoundException -import org.elasticsearch.index.query.QueryShardException - import org.elastic4play.JsonFormat.attributeCheckingExceptionWrites /** @@ -35,10 +33,10 @@ class ErrorHandler extends HttpErrorHandler { Some(Status.BAD_REQUEST → Json.obj("type" → "NumberFormatException", "message" → ("Invalid format " + nfe.getMessage))) case NotFoundError(message) ⇒ Some(Status.NOT_FOUND → Json.obj("type" → "NotFoundError", "message" → message)) case BadRequestError(message) ⇒ Some(Status.BAD_REQUEST → Json.obj("type" → "BadRequest", "message" → message)) - case SearchError(message, cause) ⇒ Some(Status.BAD_REQUEST → Json.obj("type" → "SearchError", "message" → s"$message (${cause.getMessage})")) + case SearchError(message) ⇒ Some(Status.BAD_REQUEST → Json.obj("type" → "SearchError", "message" → s"$message")) case ace: AttributeCheckingError ⇒ Some(Status.BAD_REQUEST → Json.toJson(ace)) case iae: IllegalArgumentException ⇒ Some(Status.BAD_REQUEST → Json.obj("type" → "IllegalArgument", "message" → iae.getMessage)) - case _: NoNodeAvailableException ⇒ + case _: ConnectException ⇒ Some(Status.INTERNAL_SERVER_ERROR → Json.obj("type" → "NoNodeAvailable", "message" → "ElasticSearch cluster is unreachable")) case CreateError(_, message, attributes) ⇒ Some(Status.INTERNAL_SERVER_ERROR → Json.obj("type" → "CreateError", "message" → message, "object" → attributes)) @@ -49,9 +47,8 @@ class ErrorHandler extends HttpErrorHandler { case Some((_, j)) ⇒ j } Some(Status.MULTI_STATUS → Json.obj("type" → "MultiError", "error" → message, "suberrors" → suberrors)) - case _: IndexNotFoundException ⇒ Some(520 → JsNull) - case qse: QueryShardException ⇒ Some(Status.BAD_REQUEST → Json.obj("type" → "Invalid search query", "message" → qse.getMessage)) - case t: Throwable ⇒ Option(t.getCause).flatMap(toErrorResult) + case IndexNotFoundException ⇒ Some(520 → JsNull) + case t: Throwable ⇒ Option(t.getCause).flatMap(toErrorResult) } def toResult[C](status: Int, c: C)(implicit writeable: Writeable[C]) = Result(header = ResponseHeader(status), body = writeable.toEntity(c)) diff --git a/app/org/elastic4play/Errors.scala b/app/org/elastic4play/Errors.scala index 7ed591e..28ecbaf 100644 --- a/app/org/elastic4play/Errors.scala +++ b/app/org/elastic4play/Errors.scala @@ -17,12 +17,13 @@ case class NotFoundError(message: String) case class GetError(message: String) extends Exception(message) case class UpdateError(status: Option[String], message: String, attributes: JsObject) extends ErrorWithObject(message, attributes) case class InternalError(message: String) extends Exception(message) -case class SearchError(message: String, cause: Throwable) extends Exception(message, cause) +case class SearchError(message: String) extends Exception(message) case class AuthenticationError(message: String) extends Exception(message) case class OAuth2Redirect(redirectUrl: String, params: Map[String, Seq[String]]) extends Exception(redirectUrl) case class AuthorizationError(message: String) extends Exception(message) case class MultiError(message: String, exceptions: Seq[Exception]) extends Exception(message + exceptions.map(_.getMessage).mkString(" :\n\t- ", "\n\t- ", "")) +case object IndexNotFoundException extends Exception case class AttributeCheckingError(tableName: String, errors: Seq[AttributeError] = Nil) extends Exception(errors.mkString("[", "][", "]")) { override def toString: String = errors.mkString("[", "][", "]") diff --git a/app/org/elastic4play/database/DBConfiguration.scala b/app/org/elastic4play/database/DBConfiguration.scala index 07a51e5..6d413e1 100644 --- a/app/org/elastic4play/database/DBConfiguration.scala +++ b/app/org/elastic4play/database/DBConfiguration.scala @@ -1,39 +1,32 @@ package org.elastic4play.database -import javax.inject.{Inject, Named, Singleton} +import java.nio.file.{Files, Paths} +import java.security.KeyStore + +import scala.collection.JavaConverters._ import scala.concurrent.duration.DurationInt import scala.concurrent.{ExecutionContext, Future, Promise} import play.api.inject.ApplicationLifecycle +import play.api.libs.json.JsObject import play.api.{Configuration, Logger} import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl.{Sink, Source} -import com.sksamuel.elastic4s.ElasticDsl._ -import com.sksamuel.elastic4s.admin.IndexExistsDefinition -import com.sksamuel.elastic4s.bulk.RichBulkItemResponse -import com.sksamuel.elastic4s.cluster.{ClusterHealthDefinition, ClusterStatsDefinition} -import com.sksamuel.elastic4s.delete.DeleteByIdDefinition -import com.sksamuel.elastic4s.get.{GetDefinition, RichGetResponse} -import com.sksamuel.elastic4s.index.RichIndexResponse -import com.sksamuel.elastic4s.indexes.{CreateIndexDefinition, IndexDefinition} +import com.sksamuel.elastic4s.http._ +import com.sksamuel.elastic4s.http.bulk.BulkResponseItem +import com.sksamuel.elastic4s.http.search.SearchHit import com.sksamuel.elastic4s.searches._ import com.sksamuel.elastic4s.streams.ReactiveElastic.ReactiveElastic import com.sksamuel.elastic4s.streams.{RequestBuilder, ResponseListener} -import com.sksamuel.elastic4s.update.{RichUpdateResponse, UpdateDefinition} -import com.sksamuel.elastic4s.{ElasticsearchClientUri, TcpClient} -import com.sksamuel.elastic4s.xpack.security.XPackElasticClient -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse -import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse -import org.elasticsearch.action.delete.DeleteResponse -import org.elasticsearch.common.settings.Settings -import com.floragunn.searchguard.ssl.SearchGuardSSLPlugin -import com.floragunn.searchguard.ssl.util.SSLConfigConstants - -import org.elastic4play.Timed +import javax.inject.{Inject, Named, Singleton} +import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory} +import org.apache.http.client.config.RequestConfig +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder +import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback} + +import org.elastic4play.{ConflictError, IndexNotFoundException, InternalError, SearchError} /** * This class is a wrapper of ElasticSearch client from Elastic4s @@ -41,160 +34,126 @@ import org.elastic4play.Timed * It add timed annotation in order to measure storage metrics */ @Singleton -class DBConfiguration( - searchHost: Seq[String], - searchCluster: String, - baseIndexName: String, - xpackUsername: Option[String], - xpackPassword: Option[String], - xpackSSL: Boolean, - xpackCAPath: Option[String], - xpackCertificatePath: Option[String], - xpackKeyPath: Option[String], - sgKeystorePath: Option[String], - sgTruststorePath: Option[String], - sgKeystorePassword: Option[String], - sgTruststorePassword: Option[String], - sgHostVerification: Boolean, - sgHostVerificationResolveHostname: Boolean, +class DBConfiguration @Inject()( + config: Configuration, lifecycle: ApplicationLifecycle, - val version: Int, + @Named("databaseVersion") val version: Int, implicit val ec: ExecutionContext, implicit val actorSystem: ActorSystem ) { + private[DBConfiguration] lazy val logger = Logger(getClass) - @Inject() def this( - configuration: Configuration, - lifecycle: ApplicationLifecycle, - @Named("databaseVersion") version: Int, - ec: ExecutionContext, - actorSystem: ActorSystem - ) = { - this( - configuration.get[Seq[String]]("search.host"), - configuration.get[String]("search.cluster"), - configuration.get[String]("search.index"), - configuration.getOptional[String]("search.username"), - configuration.getOptional[String]("search.password"), - configuration.getOptional[Boolean]("search.ssl.enabled").getOrElse(false), - configuration.getOptional[String]("search.ssl.ca"), - configuration.getOptional[String]("search.ssl.certificate"), - configuration.getOptional[String]("search.ssl.key"), - configuration.getOptional[String]("search.guard.keyStore.path"), - configuration.getOptional[String]("search.guard.trustStore.path"), - configuration.getOptional[String]("search.guard.keyStore.password"), - configuration.getOptional[String]("search.guard.trustStore.password"), - configuration.getOptional[Boolean]("search.guard.hostVerification").getOrElse(false), - configuration.getOptional[Boolean]("search.guard.hostVerificationResolveHostname").getOrElse(false), - lifecycle, - version, - ec, - actorSystem - ) + def requestConfig: RequestConfigCallback = (requestConfigBuilder: RequestConfig.Builder) ⇒ { + config.getOptional[Boolean]("search.authenticationEnabled").foreach(requestConfigBuilder.setAuthenticationEnabled) + config.getOptional[Boolean]("search.circularRedirectsAllowed").foreach(requestConfigBuilder.setCircularRedirectsAllowed) + config.getOptional[Int]("search.connectionRequestTimeout").foreach(requestConfigBuilder.setConnectionRequestTimeout) + config.getOptional[Int]("search.connectTimeout").foreach(requestConfigBuilder.setConnectTimeout) + config.getOptional[Boolean]("search.contentCompressionEnabled").foreach(requestConfigBuilder.setContentCompressionEnabled) + config.getOptional[String]("search.cookieSpec").foreach(requestConfigBuilder.setCookieSpec) + config.getOptional[Boolean]("search.expectContinueEnabled").foreach(requestConfigBuilder.setExpectContinueEnabled) + // config.getOptional[InetAddress]("search.localAddress").foreach(requestConfigBuilder.setLocalAddress) + config.getOptional[Int]("search.maxRedirects").foreach(requestConfigBuilder.setMaxRedirects) + // config.getOptional[Boolean]("search.proxy").foreach(requestConfigBuilder.setProxy) + config.getOptional[Seq[String]]("search.proxyPreferredAuthSchemes").foreach(v ⇒ requestConfigBuilder.setProxyPreferredAuthSchemes(v.asJava)) + config.getOptional[Boolean]("search.redirectsEnabled").foreach(requestConfigBuilder.setRedirectsEnabled) + config.getOptional[Boolean]("search.relativeRedirectsAllowed").foreach(requestConfigBuilder.setRelativeRedirectsAllowed) + config.getOptional[Int]("search.socketTimeout").foreach(requestConfigBuilder.setSocketTimeout) + config.getOptional[Seq[String]]("search.targetPreferredAuthSchemes").foreach(v ⇒ requestConfigBuilder.setTargetPreferredAuthSchemes(v.asJava)) + requestConfigBuilder } - private[DBConfiguration] lazy val logger = Logger(getClass) - - private def xpackConnect(uri: ElasticsearchClientUri, settings: Settings.Builder): Option[TcpClient] = - for { - username ← xpackUsername - if username.nonEmpty - password ← xpackPassword - if password.nonEmpty - } yield { - settings.put("xpack.security.user", s"$username:$password") - if (xpackSSL) { - settings.put("xpack.security.transport.ssl.enabled", "true") - xpackCAPath.foreach(ca ⇒ settings.put("xpack.ssl.certificate_authorities", ca)) - xpackCertificatePath.foreach(cp ⇒ settings.put("xpack.ssl.certificate", cp)) - xpackKeyPath.foreach(k ⇒ settings.put("xpack.ssl.key", k)) + def httpClientConfig: HttpClientConfigCallback = (httpClientBuilder: HttpAsyncClientBuilder) ⇒ { + config.getOptional[String]("search.keyStore.path").map { keyStore ⇒ + val keyStorePath = Paths.get(keyStore) + val keyStoreType = config.getOptional[String]("search.keyStore.type").getOrElse(KeyStore.getDefaultType) + val keyStorePassword = config.getOptional[String]("search.keyStore.password").getOrElse("").toCharArray + val keyInputStream = Files.newInputStream(keyStorePath) + val keyManagers = try { + val keyStore = KeyStore.getInstance(keyStoreType) + keyStore.load(keyInputStream, keyStorePassword) + val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + kmf.init(keyStore, keyStorePassword) + kmf.getKeyManagers + } finally { + keyInputStream.close() } - XPackElasticClient(settings.build(), uri) - } - private def sgConnect(uri: ElasticsearchClientUri, settings: Settings.Builder): Option[TcpClient] = - for { - keystorePath ← sgKeystorePath - truststorePath ← sgTruststorePath - keystorePassword ← sgKeystorePassword - truststorePassword ← sgTruststorePassword - } yield { - settings.put(SSLConfigConstants.SEARCHGUARD_SSL_TRANSPORT_KEYSTORE_FILEPATH, keystorePath) - settings.put(SSLConfigConstants.SEARCHGUARD_SSL_TRANSPORT_TRUSTSTORE_FILEPATH, truststorePath) - settings.put(SSLConfigConstants.SEARCHGUARD_SSL_TRANSPORT_KEYSTORE_PASSWORD, keystorePassword) - settings.put(SSLConfigConstants.SEARCHGUARD_SSL_TRANSPORT_TRUSTSTORE_PASSWORD, truststorePassword) - settings.put(SSLConfigConstants.SEARCHGUARD_SSL_TRANSPORT_ENFORCE_HOSTNAME_VERIFICATION, sgHostVerification) - settings.put(SSLConfigConstants.SEARCHGUARD_SSL_TRANSPORT_ENFORCE_HOSTNAME_VERIFICATION_RESOLVE_HOST_NAME, sgHostVerificationResolveHostname) - TcpClient.transport(settings.build(), uri, classOf[SearchGuardSSLPlugin]) + val trustManagers = config + .getOptional[String]("search.trustStore.path") + .map { trustStorePath ⇒ + val keyStoreType = config.getOptional[String]("search.keyStore.type").getOrElse(KeyStore.getDefaultType) + val trustStorePassword = config.getOptional[String]("search.trustStore.password").getOrElse("").toCharArray + val trustInputStream = Files.newInputStream(Paths.get(trustStorePath)) + try { + val keyStore = KeyStore.getInstance(keyStoreType) + keyStore.load(trustInputStream, trustStorePassword) + val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + tmf.init(keyStore) + tmf.getTrustManagers + } finally { + trustInputStream.close() + } + } + .getOrElse(Array.empty) + + // Configure the SSL context to use TLS + val sslContext = SSLContext.getInstance("TLS") + sslContext.init(keyManagers, trustManagers, null) + + httpClientBuilder.setSSLContext(sslContext) } - - private def connect(): TcpClient = { - val uri = ElasticsearchClientUri(s"elasticsearch://${searchHost.mkString(",")}") - val settings = Settings.builder() - settings.put("cluster.name", searchCluster) - - xpackConnect(uri, settings) - .orElse(sgConnect(uri, settings)) - .getOrElse(TcpClient.transport(settings.build(), uri)) + httpClientBuilder } /** * Underlying ElasticSearch client */ - private[database] val client = connect() + private[database] val client = ElasticClient(ElasticProperties(config.get[String]("search.uri")), requestConfig, httpClientConfig) // when application close, close also ElasticSearch connection lifecycle.addStopHook { () ⇒ - Future { client.close() } + Future { + client.close() + } } - @Timed("database.index") - def execute(indexDefinition: IndexDefinition): Future[RichIndexResponse] = client.execute(indexDefinition) - - @Timed("database.search") - def execute(searchDefinition: SearchDefinition): Future[RichSearchResponse] = client.execute(searchDefinition) - - @Timed("database.create") - def execute(createIndexDefinition: CreateIndexDefinition): Future[CreateIndexResponse] = client.execute(createIndexDefinition) - - @Timed("database.update") - def execute(updateDefinition: UpdateDefinition): Future[RichUpdateResponse] = client.execute(updateDefinition) - - @Timed("database.search_scroll") - def execute(searchScrollDefinition: SearchScrollDefinition): Future[RichSearchResponse] = client.execute(searchScrollDefinition) - - @Timed("database.index_exists") - def execute(indexExistsDefinition: IndexExistsDefinition): Future[IndicesExistsResponse] = client.execute(indexExistsDefinition) - - @Timed("database.delete") - def execute(deleteByIdDefinition: DeleteByIdDefinition): Future[DeleteResponse] = client.execute(deleteByIdDefinition) - - @Timed("database.get") - def execute(getDefinition: GetDefinition): Future[RichGetResponse] = client.execute(getDefinition) - - @Timed("database.clear_scroll") - def execute(clearScrollDefinition: ClearScrollDefinition): Future[ClearScrollResult] = client.execute(clearScrollDefinition) - - @Timed("database.cluster_health") - def execute(clusterHealthDefinition: ClusterHealthDefinition): Future[ClusterHealthResponse] = client.execute(clusterHealthDefinition) - - @Timed("database.cluster_stats") - def execute(clusterStatsDefinition: ClusterStatsDefinition): Future[ClusterStatsResponse] = client.execute(clusterStatsDefinition) + def execute[T, U](t: T)( + implicit + handler: Handler[T, U], + manifest: Manifest[U] + ): Future[U] = { + logger.debug(s"Elasticsearch request: ${client.show(t)}") + client.execute(t).flatMap { + case RequestSuccess(_, _, _, r) ⇒ Future.successful(r) + case RequestFailure(_, _, _, error) ⇒ + val exception = error.`type` match { + case "index_not_found_exception" ⇒ IndexNotFoundException + case "version_conflict_engine_exception" ⇒ ConflictError(error.reason, JsObject.empty) + case "search_phase_execution_exception" ⇒ SearchError(error.reason) + case _ ⇒ InternalError(s"Unknown error: $error") + } + exception match { + case _: ConflictError ⇒ + case _ ⇒ logger.error(s"ElasticSearch request failure: ${client.show(t)}\n => $error") + } + Future.failed(exception) + } + } /** * Creates a Source (akka stream) from the result of the search */ - def source(searchDefinition: SearchDefinition): Source[RichSearchHit, NotUsed] = Source.fromPublisher(client.publisher(searchDefinition)) - - private lazy val sinkListener = new ResponseListener { - override def onAck(resp: RichBulkItemResponse): Unit = () - override def onFailure(resp: RichBulkItemResponse): Unit = - logger.warn(s"Document index failure ${resp.id}: ${resp.failureMessage}") - } + def source(searchRequest: SearchRequest): Source[SearchHit, NotUsed] = Source.fromPublisher(client.publisher(searchRequest)) /** * Create a Sink (akka stream) that create entity in ElasticSearch */ def sink[T](implicit builder: RequestBuilder[T]): Sink[T, Future[Unit]] = { + val sinkListener = new ResponseListener[T] { + override def onAck(resp: BulkResponseItem, original: T): Unit = () + + override def onFailure(resp: BulkResponseItem, original: T): Unit = + logger.warn(s"Document index failure ${resp.id}: ${resp.error.fold("unexpected")(_.toString)}\n$original") + } val end = Promise[Unit] val complete = () ⇒ { if (!end.isCompleted) @@ -207,7 +166,19 @@ class DBConfiguration( } Sink .fromSubscriber( - client.subscriber[T](failureWait = 1.second, maxAttempts = 10, listener = sinkListener, completionFn = complete, errorFn = failure) + client.subscriber( + batchSize = 100, + concurrentRequests = 5, + refreshAfterOp = false, + listener = sinkListener, + typedListener = ResponseListener.noop, + completionFn = complete, + errorFn = failure, + flushInterval = None, + flushAfter = None, + failureWait = 2.seconds, + maxAttempts = 10 + ) ) .mapMaterializedValue { _ ⇒ end.future @@ -217,31 +188,11 @@ class DBConfiguration( /** * Name of the index, suffixed by the current version */ - val indexName: String = baseIndexName + "_" + version + val indexName: String = config.get[String]("search.index") + "_" + version /** * return a new instance of DBConfiguration that points to the previous version of the index schema */ def previousVersion: DBConfiguration = - new DBConfiguration( - searchHost, - searchCluster, - baseIndexName, - xpackUsername, - xpackPassword, - xpackSSL, - xpackCAPath, - xpackCertificatePath, - xpackKeyPath, - sgKeystorePath, - sgTruststorePath, - sgKeystorePassword, - sgTruststorePassword, - sgHostVerification, - sgHostVerificationResolveHostname, - lifecycle, - version - 1, - ec, - actorSystem - ) + new DBConfiguration(config, lifecycle, version - 1, ec, actorSystem) } diff --git a/app/org/elastic4play/database/DBCreate.scala b/app/org/elastic4play/database/DBCreate.scala index 3c31769..6732a89 100644 --- a/app/org/elastic4play/database/DBCreate.scala +++ b/app/org/elastic4play/database/DBCreate.scala @@ -1,7 +1,5 @@ package org.elastic4play.database -import javax.inject.{Inject, Singleton} - import scala.concurrent.{ExecutionContext, Future} import play.api.Logger @@ -9,15 +7,14 @@ import play.api.libs.json.JsValue.jsValueToJsLookup import play.api.libs.json._ import akka.stream.scaladsl.Sink -import com.sksamuel.elastic4s.ElasticDsl.indexInto -import com.sksamuel.elastic4s.indexes.IndexDefinition +import com.sksamuel.elastic4s.RefreshPolicy +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.indexes.IndexRequest import com.sksamuel.elastic4s.streams.RequestBuilder -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy -import org.elasticsearch.index.engine.VersionConflictEngineException -import org.elasticsearch.transport.RemoteTransportException +import javax.inject.{Inject, Singleton} +import org.elastic4play.CreateError import org.elastic4play.models.BaseEntity -import org.elastic4play.{ConflictError, CreateError, InternalError} /** * Service lass responsible for entity creation @@ -58,69 +55,56 @@ class DBCreate @Inject()(db: DBConfiguration, implicit val ec: ExecutionContext) .orElse(id) // remove attributes that starts with "_" because we wan't permit to interfere with elasticsearch internal fields - val docSource = JsObject(attributes.fields.filterNot(_._1.startsWith("_"))).toString + val docSource = addParent(modelName, parent, JsObject(attributes.fields.filterNot(_._1.startsWith("_")))) + db.execute { - addId(id).andThen(addParent(parentId)).andThen(addRouting(routing)) { - indexInto(db.indexName, modelName).source(docSource).refresh(RefreshPolicy.WAIT_UNTIL) + addId(id).andThen(addRouting(routing)) { + indexInto(db.indexName / "doc").source(docSource.toString).refresh(RefreshPolicy.WAIT_UNTIL) } } - .transform( + .map( indexResponse ⇒ attributes + ("_type" → JsString(modelName)) + ("_id" → JsString(indexResponse.id)) + ("_parent" → parentId.fold[JsValue](JsNull)(JsString)) + ("_routing" → JsString(routing.getOrElse(indexResponse.id))) + - ("_version" → JsNumber(indexResponse.version)), - convertError(attributes, _) + ("_version" → JsNumber(indexResponse.version)) ) } - private[database] def convertError(attributes: JsObject, error: Throwable): Throwable = error match { - case rte: RemoteTransportException ⇒ convertError(attributes, rte.getCause) - case vcee: VersionConflictEngineException ⇒ ConflictError(vcee.getMessage, attributes) - case other ⇒ - logger.warn("create error", other) - CreateError(None, other.getMessage, attributes) - } - /** * add id information in index definition */ - private def addId(id: Option[String]): IndexDefinition ⇒ IndexDefinition = id match { + private def addId(id: Option[String]): IndexRequest ⇒ IndexRequest = id match { case Some(i) ⇒ _ id i createOnly true case None ⇒ identity } - /** - * add parent information in index definition - */ - private def addParent(parent: Option[String]): IndexDefinition ⇒ IndexDefinition = parent match { - case Some(p) ⇒ _ parent p - case None ⇒ identity - } - /** * add routing information in index definition */ - private def addRouting(routing: Option[String]): IndexDefinition ⇒ IndexDefinition = routing match { + private def addRouting(routing: Option[String]): IndexRequest ⇒ IndexRequest = routing match { case Some(r) ⇒ _ routing r case None ⇒ identity } + private def addParent(modelName: String, parent: Option[BaseEntity], entity: JsObject): JsObject = parent match { + case Some(p) ⇒ entity + ("relations" → Json.obj("name" → modelName, "parent" → p.id)) + case None ⇒ entity + ("relations" → JsString(modelName)) + } + /** * Class used to build index definition based on model name and attributes * This class is used by sink (ElasticSearch reactive stream) */ private class AttributeRequestBuilder() extends RequestBuilder[JsObject] { - override def request(attributes: JsObject): IndexDefinition = { - val docSource = JsObject(attributes.fields.filterNot(_._1.startsWith("_"))).toString + override def request(attributes: JsObject): IndexRequest = { val id = (attributes \ "_id").asOpt[String] - val parent = (attributes \ "_parent").asOpt[String] - val routing = (attributes \ "_routing").asOpt[String] orElse parent orElse id - val modelName = (attributes \ "_type").asOpt[String].getOrElse(throw InternalError("The entity doesn't contain _type attribute")) - addId(id).andThen(addParent(parent)).andThen(addRouting(routing)) { - indexInto(db.indexName, modelName).source(docSource) + val routing = (attributes \ "_routing").asOpt[String] orElse id + val docSource = JsObject(attributes.fields.filterNot(_._1.startsWith("_"))) + addId(id).andThen(addRouting(routing)) { + indexInto(db.indexName, "doc").source(docSource.toString) } } } diff --git a/app/org/elastic4play/database/DBFind.scala b/app/org/elastic4play/database/DBFind.scala index 690feee..86c8048 100644 --- a/app/org/elastic4play/database/DBFind.scala +++ b/app/org/elastic4play/database/DBFind.scala @@ -1,23 +1,22 @@ package org.elastic4play.database +import akka.NotUsed +import akka.stream.scaladsl.Source +import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler} +import akka.stream.{Attributes, Materializer, Outlet, SourceShape} +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.http.search.{SearchHit, SearchResponse} +import com.sksamuel.elastic4s.searches.SearchRequest import javax.inject.{Inject, Singleton} +import org.elastic4play.{IndexNotFoundException, SearchError} +import play.api.libs.json._ +import play.api.{Configuration, Logger} import scala.collection.mutable import scala.concurrent.duration.{DurationLong, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} -import play.api.libs.json._ -import play.api.{Configuration, Logger} - -import akka.NotUsed -import akka.stream.{Attributes, Materializer, Outlet, SourceShape} -import akka.stream.scaladsl.Source -import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler} -import com.sksamuel.elastic4s.searches.{RichSearchHit, RichSearchResponse, SearchBuilderFn, SearchDefinition} - -import org.elastic4play.SearchError - /** * Service class responsible for entity search */ @@ -27,8 +26,8 @@ class DBFind(pageSize: Int, keepAlive: FiniteDuration, db: DBConfiguration, impl @Inject def this(configuration: Configuration, db: DBConfiguration, ec: ExecutionContext, mat: Materializer) = this(configuration.get[Int]("search.pagesize"), configuration.getMillis("search.keepalive").millis, db, ec, mat) - private[database] val keepAliveStr = keepAlive.toMillis + "ms" - private[DBFind] lazy val logger = Logger(getClass) + val keepAliveStr = keepAlive.toMillis + "ms" + private[DBFind] lazy val logger = Logger(getClass) /** * return a new instance of DBFind but using another DBConfiguration @@ -57,29 +56,21 @@ class DBFind(pageSize: Int, keepAlive: FiniteDuration, db: DBConfiguration, impl /** * Execute the search definition using scroll */ - private[database] def searchWithScroll( - searchDefinition: SearchDefinition, - offset: Int, - limit: Int - ): (Source[RichSearchHit, NotUsed], Future[Long]) = { - val searchWithScroll = new SearchWithScroll(db, searchDefinition, keepAliveStr, offset, limit) + private[database] def searchWithScroll(searchRequest: SearchRequest, offset: Int, limit: Int): (Source[SearchHit, NotUsed], Future[Long]) = { + val searchWithScroll = new SearchWithScroll(db, searchRequest, keepAliveStr, offset, limit) (Source.fromGraph(searchWithScroll), searchWithScroll.totalHits) } /** * Execute the search definition */ - private[database] def searchWithoutScroll( - searchDefinition: SearchDefinition, - offset: Int, - limit: Int - ): (Source[RichSearchHit, NotUsed], Future[Long]) = { - val resp = db.execute(searchDefinition.start(offset).limit(limit)) + private[database] def searchWithoutScroll(searchRequest: SearchRequest, offset: Int, limit: Int): (Source[SearchHit, NotUsed], Future[Long]) = { + val resp = db.execute(searchRequest.start(offset).limit(limit)) val total = resp.map(_.totalHits) val src = Source .fromFuture(resp) .mapConcat { resp ⇒ - resp.hits.toList + resp.hits.hits.toList } (src, total) } @@ -90,21 +81,21 @@ class DBFind(pageSize: Int, keepAlive: FiniteDuration, db: DBConfiguration, impl * @param range first and last entities to retrieve, for example "23-42" (default value is "0-10") * @param sortBy define order of the entities by specifying field names used in sort. Fields can be prefixed by * "-" for descendant or "+" for ascendant sort (ascendant by default). - * @param query a function that build a SearchDefinition using the index name + * @param query a function that build a SearchRequest using the index name * @return Source (akka stream) of JsObject. The source is materialized as future of long that contains the total number of entities. */ - def apply(range: Option[String], sortBy: Seq[String])(query: (String) ⇒ SearchDefinition): (Source[JsObject, NotUsed], Future[Long]) = { - val (offset, limit) = getOffsetAndLimitFromRange(range) - val sortDef = DBUtils.sortDefinition(sortBy) - val searchDefinition = query(db.indexName).storedFields("_source", "_routing", "_parent").start(offset).sortBy(sortDef).version(true) + def apply(range: Option[String], sortBy: Seq[String])(query: String ⇒ SearchRequest): (Source[JsObject, NotUsed], Future[Long]) = { + val (offset, limit) = getOffsetAndLimitFromRange(range) + val sortDef = DBUtils.sortDefinition(sortBy) + val searchRequest = query(db.indexName).start(offset).sortBy(sortDef).version(true) logger.debug( - s"search in ${searchDefinition.indexesTypes.indexes.mkString(",")} / ${searchDefinition.indexesTypes.types.mkString(",")} ${SearchBuilderFn(db.client.java, searchDefinition)}" + s"search in ${searchRequest.indexesTypes.indexes.mkString(",")} / ${searchRequest.indexesTypes.types.mkString(",")} ${db.client.show(searchRequest)}" ) val (src, total) = if (limit > 2 * pageSize) { - searchWithScroll(searchDefinition, offset, limit) + searchWithScroll(searchRequest, offset, limit) } else { - searchWithoutScroll(searchDefinition, offset, limit) + searchWithoutScroll(searchRequest, offset, limit) } (src.map(DBUtils.hit2json), total) @@ -114,36 +105,37 @@ class DBFind(pageSize: Int, keepAlive: FiniteDuration, db: DBConfiguration, impl * Execute the search definition * This function is used to run aggregations */ - def apply(query: (String) ⇒ SearchDefinition): Future[RichSearchResponse] = { - val searchDefinition = query(db.indexName) + def apply(query: String ⇒ SearchRequest): Future[SearchResponse] = { + val searchRequest = query(db.indexName) logger.debug( - s"search in ${searchDefinition.indexesTypes.indexes.mkString(",")} / ${searchDefinition.indexesTypes.types.mkString(",")} ${SearchBuilderFn(db.client.java, searchDefinition)}" + s"search in ${searchRequest.indexesTypes.indexes.mkString(",")} / ${searchRequest.indexesTypes.types.mkString(",")} ${db.client.show(searchRequest)}" ) - db.execute(searchDefinition) + db.execute(searchRequest) .recoverWith { - case t if DBUtils.isIndexMissing(t) ⇒ Future.failed(t) - case t ⇒ Future.failed(SearchError("Invalid search query", t)) + case t if t == IndexNotFoundException ⇒ Future.failed(t) + case t ⇒ Future.failed(SearchError("Invalid search query", t)) } } } -class SearchWithScroll(db: DBConfiguration, searchDefinition: SearchDefinition, keepAliveStr: String, offset: Int, max: Int)( - implicit ec: ExecutionContext -) extends GraphStage[SourceShape[RichSearchHit]] { +class SearchWithScroll(db: DBConfiguration, SearchRequest: SearchRequest, keepAliveStr: String, offset: Int, max: Int)( + implicit + ec: ExecutionContext +) extends GraphStage[SourceShape[SearchHit]] { - private[SearchWithScroll] lazy val logger = Logger(getClass) - val out: Outlet[RichSearchHit] = Outlet[RichSearchHit]("searchHits") - val shape: SourceShape[RichSearchHit] = SourceShape.of(out) - val firstResults: Future[RichSearchResponse] = db.execute(searchDefinition.scroll(keepAliveStr)) - val totalHits: Future[Long] = firstResults.map(_.totalHits) + private[SearchWithScroll] lazy val logger = Logger(getClass) + val out: Outlet[SearchHit] = Outlet[SearchHit]("searchHits") + val shape: SourceShape[SearchHit] = SourceShape.of(out) + val firstResults: Future[SearchResponse] = db.execute(SearchRequest.scroll(keepAliveStr)) + val totalHits: Future[Long] = firstResults.map(_.totalHits) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - var processed: Long = 0 - var skip: Int = offset - val queue: mutable.Queue[RichSearchHit] = mutable.Queue.empty - var scrollId: Future[String] = firstResults.map(_.scrollId) - var firstResultProcessed = false + var processed: Long = 0 + var skip: Long = offset + val queue: mutable.Queue[SearchHit] = mutable.Queue.empty + var scrollId: Future[String] = firstResults.map(_.scrollId.get) + var firstResultProcessed = false setHandler( out, @@ -157,52 +149,51 @@ class SearchWithScroll(db: DBConfiguration, searchDefinition: SearchDefinition, } } - val firstCallback: AsyncCallback[RichSearchResponse] = getAsyncCallback[RichSearchResponse] { + val firstCallback: AsyncCallback[SearchResponse] = getAsyncCallback[SearchResponse] { case searchResponse if skip > 0 ⇒ - if (searchResponse.hits.length <= skip) - skip -= searchResponse.hits.length + if (searchResponse.hits.size <= skip) + skip -= searchResponse.hits.size else { - queue ++= searchResponse.hits.drop(skip) + queue ++= searchResponse.hits.hits.drop(skip.toInt) skip = 0 } firstResultProcessed = true onPull() case searchResponse ⇒ - queue ++= searchResponse.hits + queue ++= searchResponse.hits.hits firstResultProcessed = true onPull() } override def onPull(): Unit = if (firstResultProcessed) { - import com.sksamuel.elastic4s.ElasticDsl.searchScroll if (processed >= max) completeStage() if (queue.isEmpty) { - val callback = getAsyncCallback[Try[RichSearchResponse]] { + val callback = getAsyncCallback[Try[SearchResponse]] { case Success(searchResponse) if searchResponse.isTimedOut ⇒ logger.warn("Search timeout") failStage(SearchError("Request terminated early or timed out", null)) case Success(searchResponse) if searchResponse.isEmpty ⇒ completeStage() case Success(searchResponse) if skip > 0 ⇒ - if (searchResponse.hits.length <= skip) { - skip -= searchResponse.hits.length + if (searchResponse.hits.size <= skip) { + skip -= searchResponse.hits.size onPull() } else { - queue ++= searchResponse.hits.drop(skip) + queue ++= searchResponse.hits.hits.drop(skip.toInt) skip = 0 pushNextHit() } case Success(searchResponse) ⇒ - queue ++= searchResponse.hits + queue ++= searchResponse.hits.hits pushNextHit() case Failure(error) ⇒ logger.warn("Search error", error) failStage(SearchError("Request terminated early or timed out", error)) } val futureSearchResponse = scrollId.flatMap(s ⇒ db.execute(searchScroll(s).keepAlive(keepAliveStr))) - scrollId = futureSearchResponse.map(_.scrollId) + scrollId = futureSearchResponse.map(_.scrollId.get) futureSearchResponse.onComplete(callback.invoke) } else { pushNextHit() @@ -210,11 +201,9 @@ class SearchWithScroll(db: DBConfiguration, searchDefinition: SearchDefinition, } else firstResults.foreach(firstCallback.invoke) } ) - override def postStop(): Unit = { - import com.sksamuel.elastic4s.ElasticDsl.clearScroll + override def postStop(): Unit = scrollId.foreach { s ⇒ db.execute(clearScroll(s)) } - } } } diff --git a/app/org/elastic4play/database/DBGet.scala b/app/org/elastic4play/database/DBGet.scala index 583aa89..67da587 100644 --- a/app/org/elastic4play/database/DBGet.scala +++ b/app/org/elastic4play/database/DBGet.scala @@ -6,7 +6,7 @@ import scala.concurrent.{ExecutionContext, Future} import play.api.libs.json.JsObject -import com.sksamuel.elastic4s.ElasticDsl.{idsQuery, search} +import com.sksamuel.elastic4s.http.ElasticDsl._ import org.elastic4play.NotFoundError @@ -25,12 +25,13 @@ class DBGet @Inject()(db: DBConfiguration, implicit val ec: ExecutionContext) { db.execute { // Search by id is not possible on child entity without routing information ⇒ id query search(db.indexName) - .query(idsQuery(id).types(modelName)) + .query(idsQuery(id) /*.types(modelName)*/ ) .size(1) .version(true) } .map { searchResponse ⇒ searchResponse + .hits .hits .headOption .fold[JsObject](throw NotFoundError(s"$modelName $id not found")) { hit ⇒ diff --git a/app/org/elastic4play/database/DBIndex.scala b/app/org/elastic4play/database/DBIndex.scala index afcedaf..897b434 100644 --- a/app/org/elastic4play/database/DBIndex.scala +++ b/app/org/elastic4play/database/DBIndex.scala @@ -1,17 +1,15 @@ package org.elastic4play.database -import javax.inject.{Inject, Singleton} - -import scala.collection.JavaConverters.asScalaSetConverter import scala.concurrent.{blocking, ExecutionContext, Future} import play.api.{Configuration, Logger} -import com.sksamuel.elastic4s.ElasticDsl.{clusterHealth, index, mapping, search, RichFuture} -import com.sksamuel.elastic4s.cluster.ClusterStatsDefinition -import com.sksamuel.elastic4s.indexes.CreateIndexDefinition +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.indexes.CreateIndexRequest +import javax.inject.{Inject, Singleton} -import org.elastic4play.models.{ChildModelDef, ModelAttributes, ModelDef} +import org.elastic4play.models.{ChildModelDef, ModelAttributes} +import org.elastic4play.utils.Collection @Singleton class DBIndex(db: DBConfiguration, nbShards: Int, nbReplicas: Int, settings: Map[String, Any], implicit val ec: ExecutionContext) { @@ -41,33 +39,32 @@ class DBIndex(db: DBConfiguration, nbShards: Int, nbReplicas: Int, settings: Map * @return a future which is completed when index creation is finished */ def createIndex(models: Iterable[ModelAttributes]): Future[Unit] = { - val modelsMapping = models.map { - case model: ModelDef[_, _] ⇒ - mapping(model.modelName) - .fields(model.attributes.filterNot(_.attributeName == "_id").map(_.elasticMapping)) - .dateDetection(false) - .numericDetection(false) - .templates(model.attributes.flatMap(_.elasticTemplate())) - case model: ChildModelDef[_, _, _, _] ⇒ - mapping(model.modelName) - .fields(model.attributes.filterNot(_.attributeName == "_id").map(_.elasticMapping)) - .parent(model.parentModel.modelName) - .dateDetection(false) - .numericDetection(false) - .templates(model.attributes.flatMap(_.elasticTemplate())) - }.toSeq + val mappingTemplates = Collection.distinctBy(models.flatMap(_.attributes).flatMap(_.elasticTemplate()))(_.name) + val fields = models.flatMap(_.attributes.filterNot(_.attributeName == "_id").map(_.elasticMapping)).toSeq + val relationsField = models + .map { + case child: ChildModelDef[_, _, _, _] ⇒ child.parentModel.modelName → Seq(child.modelName) + case model ⇒ model.modelName → Nil + } + .groupBy(_._1) + .foldLeft(joinField("relations")) { + case (join, (parent, child)) ⇒ join.relation(parent, child.flatMap(_._2).toSeq) + } + val modelMapping = mapping("doc") + .fields(fields :+ relationsField) + .dateDetection(false) + .numericDetection(false) + .templates(mappingTemplates) db.execute { - val createIndexDefinition = CreateIndexDefinition(db.indexName) - .mappings(modelsMapping) + val createIndexDefinition = CreateIndexRequest(db.indexName) + .mappings(modelMapping) .shards(nbShards) .replicas(nbReplicas) - settings.foldLeft(createIndexDefinition) { + settings.foldLeft(createIndexDefinition.indexSetting("mapping.single_type", true)) { case (cid, (key, value)) ⇒ cid.indexSetting(key, value) } } - .map { _ ⇒ - () - } + .map(_ ⇒ ()) } /** @@ -77,7 +74,7 @@ class DBIndex(db: DBConfiguration, nbShards: Int, nbReplicas: Int, settings: Map */ def getIndexStatus: Future[Boolean] = db.execute { - index.exists(db.indexName) + indexExists(db.indexName) } .map { _.isExists @@ -100,7 +97,7 @@ class DBIndex(db: DBConfiguration, nbShards: Int, nbReplicas: Int, settings: Map */ def getSize(modelName: String): Future[Long] = db.execute { - search(db.indexName → modelName).matchAllQuery().size(0) + search(db.indexName).matchQuery("relations", modelName).size(0) } .map { _.totalHits @@ -120,7 +117,14 @@ class DBIndex(db: DBConfiguration, nbShards: Int, nbReplicas: Int, settings: Map clusterHealth(db.indexName) } .map { - _.getStatus.value().toInt + _.status match { + case "green" ⇒ 0 + case "yellow" ⇒ 1 + case "red" ⇒ 2 + case status ⇒ + logger.error(s"unknown cluster status: $status") + 2 + } } .recover { case _ ⇒ 2 } @@ -138,10 +142,4 @@ class DBIndex(db: DBConfiguration, nbShards: Int, nbReplicas: Int, settings: Map def clusterStatusName: String = blocking { getClusterStatusName.await } - - def clusterVersions: Future[Seq[String]] = - db.execute(ClusterStatsDefinition()) - .map { clusterStatsResponse ⇒ - clusterStatsResponse.getNodesStats.getVersions.asScala.toSeq.map(_.toString) - } } diff --git a/app/org/elastic4play/database/DBModify.scala b/app/org/elastic4play/database/DBModify.scala index 5ae3530..8a0dba4 100644 --- a/app/org/elastic4play/database/DBModify.scala +++ b/app/org/elastic4play/database/DBModify.scala @@ -1,20 +1,18 @@ package org.elastic4play.database import javax.inject.{Inject, Singleton} - import scala.concurrent.{ExecutionContext, Future} - import play.api.Logger import play.api.libs.json._ - -import com.sksamuel.elastic4s.ElasticDsl.{script, update} -import com.sksamuel.elastic4s.script.ScriptDefinition -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy - +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.script.Script import org.elastic4play.models.BaseEntity + import scala.collection.JavaConverters._ import java.util.{Map ⇒ JMap} +import com.sksamuel.elastic4s.RefreshPolicy + case class ModifyConfig(retryOnConflict: Int = 5, refreshPolicy: RefreshPolicy = RefreshPolicy.WAIT_UNTIL, version: Option[Long] = None) object ModifyConfig { @@ -50,7 +48,7 @@ class DBModify @Inject()(db: DBConfiguration, implicit val ec: ExecutionContext) * Sub attribute can be updated using dot notation ("attr.subattribute"). * @return ElasticSearch update script */ - private[database] def buildScript(entity: BaseEntity, updateAttributes: JsObject): ScriptDefinition = { + private[database] def buildScript(entity: BaseEntity, updateAttributes: JsObject): Script = { val attrs = updateAttributes.fields.zipWithIndex val updateScript = attrs.map { case ((name, JsArray(Seq())), _) ⇒ @@ -81,7 +79,7 @@ class DBModify @Inject()(db: DBConfiguration, implicit val ec: ExecutionContext) def apply(entity: BaseEntity, updateAttributes: JsObject, modifyConfig: ModifyConfig): Future[BaseEntity] = db.execute { val updateDefinition = update(entity.id) - .in(db.indexName → entity.model.modelName) + .in(db.indexName / "doc") .routing(entity.routing) .script(buildScript(entity, updateAttributes)) .fetchSource(true) @@ -91,7 +89,7 @@ class DBModify @Inject()(db: DBConfiguration, implicit val ec: ExecutionContext) } .map { updateResponse ⇒ entity.model( - Json.parse(updateResponse.get.sourceAsString).as[JsObject] + + Json.parse(updateResponse.result).as[JsObject] + ("_type" → JsString(entity.model.modelName)) + ("_id" → JsString(entity.id)) + ("_routing" → JsString(entity.routing)) + diff --git a/app/org/elastic4play/database/DBRemove.scala b/app/org/elastic4play/database/DBRemove.scala index 3835d0c..1bba3b5 100644 --- a/app/org/elastic4play/database/DBRemove.scala +++ b/app/org/elastic4play/database/DBRemove.scala @@ -1,16 +1,13 @@ package org.elastic4play.database +import com.sksamuel.elastic4s.RefreshPolicy +import com.sksamuel.elastic4s.http.ElasticDsl._ import javax.inject.{Inject, Singleton} - -import scala.concurrent.{ExecutionContext, Future} - +import org.elastic4play.models.BaseEntity import play.api.Logger -import com.sksamuel.elastic4s.ElasticDsl.{delete, RichString} -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy -import org.elasticsearch.rest.RestStatus - -import org.elastic4play.models.BaseEntity +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Success @Singleton class DBRemove @Inject()(db: DBConfiguration, implicit val ec: ExecutionContext) { @@ -25,8 +22,6 @@ class DBRemove @Inject()(db: DBConfiguration, implicit val ec: ExecutionContext) .routing(entity.routing) .refresh(RefreshPolicy.WAIT_UNTIL) } - .map { deleteResponse ⇒ - deleteResponse.status != RestStatus.NOT_FOUND - } + .transform(r ⇒ Success(r.isSuccess)) } } diff --git a/app/org/elastic4play/database/DBSequence.scala b/app/org/elastic4play/database/DBSequence.scala index 28b4e06..a018550 100644 --- a/app/org/elastic4play/database/DBSequence.scala +++ b/app/org/elastic4play/database/DBSequence.scala @@ -1,16 +1,14 @@ package org.elastic4play.database +import com.sksamuel.elastic4s.RefreshPolicy import javax.inject.{Inject, Singleton} import scala.concurrent.{ExecutionContext, Future} - -import com.sksamuel.elastic4s.ElasticDsl.update -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy - +import com.sksamuel.elastic4s.http.ElasticDsl._ import org.elastic4play.models.{ModelAttributes, AttributeFormat ⇒ F, AttributeOption ⇒ O} class SequenceModel extends ModelAttributes("sequence") { - val counter = attribute("sequence", F.numberFmt, "Value of the sequence", O.model) + val counter = attribute("sequenceCounter", F.numberFmt, "Value of the sequence", O.model) } @Singleton @@ -18,15 +16,14 @@ class DBSequence @Inject()(db: DBConfiguration, implicit val ec: ExecutionContex def apply(seqId: String): Future[Int] = db.execute { - update(seqId) - .in(db.indexName → "sequence") - .upsert("counter" → 1) - .script("ctx._source.counter += 1") + update(s"sequence_$seqId") + .in(db.indexName / "doc") + .upsert("sequenceCounter" → 1, "relations" → "sequence") + .script("ctx._source.sequenceCounter += 1") .retryOnConflict(5) - //.fetchSource(Seq("counter"), Nil) // doesn't work any longer .fetchSource(true) .refresh(RefreshPolicy.WAIT_UNTIL) } map { updateResponse ⇒ - updateResponse.get.sourceAsMap().get("counter").asInstanceOf[Int] + updateResponse.source("sequenceCounter").asInstanceOf[Int] } } diff --git a/app/org/elastic4play/database/DBUtils.scala b/app/org/elastic4play/database/DBUtils.scala index 324bd04..129fcb8 100644 --- a/app/org/elastic4play/database/DBUtils.scala +++ b/app/org/elastic4play/database/DBUtils.scala @@ -1,22 +1,18 @@ package org.elastic4play.database -import scala.annotation.tailrec - import play.api.libs.json._ -import com.sksamuel.elastic4s.ElasticDsl.fieldSort -import com.sksamuel.elastic4s.searches.RichSearchHit -import com.sksamuel.elastic4s.searches.sort.FieldSortDefinition -import org.elasticsearch.index.IndexNotFoundException -import org.elasticsearch.transport.RemoteTransportException -import org.elasticsearch.search.sort.SortOrder.{ASC, DESC} +import com.sksamuel.elastic4s.http.ElasticDsl.fieldSort +import com.sksamuel.elastic4s.http.search.SearchHit +import com.sksamuel.elastic4s.searches.sort.Sort +import com.sksamuel.elastic4s.searches.sort.SortOrder.{ASC, DESC} import org.elastic4play.utils object DBUtils { - def sortDefinition(sortBy: Seq[String]): Seq[FieldSortDefinition] = { - val byFieldList: Seq[(String, FieldSortDefinition)] = sortBy + def sortDefinition(sortBy: Seq[String]): Seq[Sort] = { + val byFieldList: Seq[(String, Sort)] = sortBy .map { case f if f.startsWith("+") ⇒ f.drop(1) → fieldSort(f.drop(1)).order(ASC) case f if f.startsWith("-") ⇒ f.drop(1) → fieldSort(f.drop(1)).order(DESC) @@ -27,27 +23,25 @@ object DBUtils { utils .Collection .distinctBy(byFieldList)(_._1) - .map(_._2) :+ fieldSort("_uid").order(DESC) + .map(_._2) :+ fieldSort("_id").order(DESC) } /** * Transform search hit into JsObject * This function parses hit source add _type, _routing, _parent, _id and _version attributes */ - def hit2json(hit: RichSearchHit) = { - val id = JsString(hit.id) - Json.parse(hit.sourceAsString).as[JsObject] + - ("_type" → JsString(hit.`type`)) + - ("_routing" → hit.fields.get("_routing").map(r ⇒ JsString(r.java.getValue[String])).getOrElse(id)) + - ("_parent" → hit.fields.get("_parent").map(r ⇒ JsString(r.java.getValue[String])).getOrElse(JsNull)) + + def hit2json(hit: SearchHit) = { + val id = JsString(hit.id) + val body = Json.parse(hit.sourceAsString).as[JsObject] + val (parent, model) = (body \ "relations" \ "parent").asOpt[JsString] match { + case Some(p) ⇒ p → (body \ "relations" \ "name").as[JsString] + case None ⇒ JsNull → (body \ "relations").as[JsString] + } + body - "relations" + + ("_type" → model) + + ("_routing" → hit.routing.fold(id)(JsString.apply)) + + ("_parent" → parent) + ("_id" → id) + ("_version" → JsNumber(hit.version)) } - - @tailrec - def isIndexMissing(t: Throwable): Boolean = t match { - case t: RemoteTransportException ⇒ isIndexMissing(t.getCause) - case _: IndexNotFoundException ⇒ true - case _ ⇒ false - } } diff --git a/app/org/elastic4play/models/AttachmentAttributeFormat.scala b/app/org/elastic4play/models/AttachmentAttributeFormat.scala index 815158b..59489b5 100644 --- a/app/org/elastic4play/models/AttachmentAttributeFormat.scala +++ b/app/org/elastic4play/models/AttachmentAttributeFormat.scala @@ -3,8 +3,8 @@ package org.elastic4play.models import play.api.Logger import play.api.libs.json.{JsValue, Json} -import com.sksamuel.elastic4s.ElasticDsl.{keywordField, longField, nestedField} -import com.sksamuel.elastic4s.mappings.NestedFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.{keywordField, longField, nestedField} +import com.sksamuel.elastic4s.mappings.NestedField import org.scalactic._ import org.elastic4play.controllers.JsonFormat._ @@ -54,7 +54,7 @@ object AttachmentAttributeFormat extends AttributeFormat[Attachment]("attachment result } - override def elasticType(attributeName: String): NestedFieldDefinition = + override def elasticType(attributeName: String): NestedField = nestedField(attributeName).fields( keywordField("name"), keywordField("hashes"), diff --git a/app/org/elastic4play/models/Attributes.scala b/app/org/elastic4play/models/Attributes.scala index ed026fa..e7b237a 100644 --- a/app/org/elastic4play/models/Attributes.scala +++ b/app/org/elastic4play/models/Attributes.scala @@ -3,8 +3,8 @@ package org.elastic4play.models import play.api.Logger import play.api.libs.json.{Format, JsArray, JsNull, JsValue} -import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateDefinition -import com.sksamuel.elastic4s.mappings.{BasicFieldDefinition, FieldDefinition} +import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateRequest +import com.sksamuel.elastic4s.mappings.{BasicField, FieldDefinition} import org.scalactic._ import org.elastic4play.controllers.InputValue @@ -29,7 +29,7 @@ abstract class AttributeFormat[T](val name: String)(implicit val jsFormat: Forma def elasticType(attributeName: String): FieldDefinition - def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateDefinition] = Nil + def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateRequest] = Nil protected def formatError(value: InputValue) = Bad(One(InvalidFormatAttributeError("", name, value))) @@ -94,11 +94,11 @@ case class Attribute[T]( } def elasticMapping: FieldDefinition = format.elasticType(attributeName) match { - case a: BasicFieldDefinition if isSensitive && a.`type` == "String" ⇒ a.index("no") - case a ⇒ a + case a: BasicField if isSensitive && a.`type` == "String" ⇒ a.index(false) + case a ⇒ a } - def elasticTemplate(attributePath: Seq[String] = Nil): Seq[DynamicTemplateDefinition] = + def elasticTemplate(attributePath: Seq[String] = Nil): Seq[DynamicTemplateRequest] = format.elasticTemplate(attributePath :+ attributeName) def validateForCreation(value: Option[JsValue]): Option[JsValue] Or Every[AttributeError] = { diff --git a/app/org/elastic4play/models/BinaryAttributeFormat.scala b/app/org/elastic4play/models/BinaryAttributeFormat.scala index 2f8ad78..598cea4 100644 --- a/app/org/elastic4play/models/BinaryAttributeFormat.scala +++ b/app/org/elastic4play/models/BinaryAttributeFormat.scala @@ -2,8 +2,8 @@ package org.elastic4play.models import play.api.libs.json.JsValue -import com.sksamuel.elastic4s.ElasticDsl.binaryField -import com.sksamuel.elastic4s.mappings.BasicFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.binaryField +import com.sksamuel.elastic4s.mappings.BasicField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue} @@ -16,7 +16,7 @@ class BinaryAttributeFormat extends AttributeFormat[Array[Byte]]("binary")(binar override def fromInputValue(subNames: Seq[String], value: InputValue): Array[Byte] Or Every[AttributeError] = formatError(value) - override def elasticType(attributeName: String): BasicFieldDefinition = binaryField(attributeName) + override def elasticType(attributeName: String): BasicField = binaryField(attributeName) override def definition(dblists: DBLists, attribute: Attribute[Array[Byte]]): Seq[AttributeDefinition] = Nil } diff --git a/app/org/elastic4play/models/BooleanAttributeFormat.scala b/app/org/elastic4play/models/BooleanAttributeFormat.scala index 4566d04..3343d7e 100644 --- a/app/org/elastic4play/models/BooleanAttributeFormat.scala +++ b/app/org/elastic4play/models/BooleanAttributeFormat.scala @@ -2,8 +2,8 @@ package org.elastic4play.models import play.api.libs.json.{JsBoolean, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.booleanField -import com.sksamuel.elastic4s.mappings.BasicFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.booleanField +import com.sksamuel.elastic4s.mappings.BasicField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue, StringInputValue} @@ -30,7 +30,7 @@ class BooleanAttributeFormat extends AttributeFormat[Boolean]("boolean") { case _ ⇒ formatError(value) } - override def elasticType(attributeName: String): BasicFieldDefinition = booleanField(attributeName) + override def elasticType(attributeName: String): BasicField = booleanField(attributeName) } object BooleanAttributeFormat extends BooleanAttributeFormat diff --git a/app/org/elastic4play/models/CustomAttributeFormat.scala b/app/org/elastic4play/models/CustomAttributeFormat.scala index 5e6d7b5..c747500 100644 --- a/app/org/elastic4play/models/CustomAttributeFormat.scala +++ b/app/org/elastic4play/models/CustomAttributeFormat.scala @@ -3,10 +3,10 @@ package org.elastic4play.models import play.api.Logger import play.api.libs.json._ -import com.sksamuel.elastic4s.ElasticDsl.{booleanField, dateField, keywordField, longField, nestedField} -import com.sksamuel.elastic4s.mappings.NestedFieldDefinition -import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateDefinition -import com.sksamuel.elastic4s.ElasticDsl._ +import com.sksamuel.elastic4s.http.ElasticDsl.{booleanField, dateField, keywordField, longField, nestedField} +import com.sksamuel.elastic4s.mappings.NestedField +import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateRequest +import com.sksamuel.elastic4s.http.ElasticDsl._ import org.scalactic._ import org.elastic4play.AttributeError @@ -64,10 +64,10 @@ class CustomAttributeFormat extends AttributeFormat[JsValue]("custom") { case _ ⇒ formatError(value) } - override def elasticType(attributeName: String): NestedFieldDefinition = + override def elasticType(attributeName: String): NestedField = nestedField(attributeName) - override def elasticTemplate(attributePath: Seq[String] = Nil): Seq[DynamicTemplateDefinition] = + override def elasticTemplate(attributePath: Seq[String] = Nil): Seq[DynamicTemplateRequest] = dynamicTemplate(attributePath.mkString("_")) .mapping( dynamicNestedField().fields( diff --git a/app/org/elastic4play/models/DateAttributeFormat.scala b/app/org/elastic4play/models/DateAttributeFormat.scala index 0e9f2d3..5f5afe8 100644 --- a/app/org/elastic4play/models/DateAttributeFormat.scala +++ b/app/org/elastic4play/models/DateAttributeFormat.scala @@ -6,8 +6,8 @@ import scala.util.Try import play.api.libs.json.{JsNumber, JsString, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.dateField -import com.sksamuel.elastic4s.mappings.BasicFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.dateField +import com.sksamuel.elastic4s.mappings.BasicField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue, StringInputValue} @@ -42,7 +42,7 @@ class DateAttributeFormat extends AttributeFormat[Date]("date") { } } - override def elasticType(attributeName: String): BasicFieldDefinition = dateField(attributeName).format("epoch_millis||basic_date_time_no_millis") + override def elasticType(attributeName: String): BasicField = dateField(attributeName).format("epoch_millis||basic_date_time_no_millis") } object DateAttributeFormat extends DateAttributeFormat diff --git a/app/org/elastic4play/models/EnumerationAttributeFormat.scala b/app/org/elastic4play/models/EnumerationAttributeFormat.scala index 44cdf33..1144ad0 100644 --- a/app/org/elastic4play/models/EnumerationAttributeFormat.scala +++ b/app/org/elastic4play/models/EnumerationAttributeFormat.scala @@ -2,8 +2,8 @@ package org.elastic4play.models import play.api.libs.json.{Format, JsString, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.keywordField -import com.sksamuel.elastic4s.mappings.KeywordFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.keywordField +import com.sksamuel.elastic4s.mappings.KeywordField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue, StringInputValue} @@ -42,7 +42,7 @@ case class EnumerationAttributeFormat[T <: Enumeration](enum: T)(implicit format case _ ⇒ formatError(value) } - override def elasticType(attributeName: String): KeywordFieldDefinition = keywordField(attributeName) + override def elasticType(attributeName: String): KeywordField = keywordField(attributeName) override def definition(dblists: DBLists, attribute: Attribute[T#Value]): Seq[AttributeDefinition] = Seq(AttributeDefinition(attribute.attributeName, name, attribute.description, enum.values.map(v ⇒ JsString(v.toString)).toSeq, Nil)) diff --git a/app/org/elastic4play/models/HashAttributeFormat.scala b/app/org/elastic4play/models/HashAttributeFormat.scala index 6375faa..ebc1a15 100644 --- a/app/org/elastic4play/models/HashAttributeFormat.scala +++ b/app/org/elastic4play/models/HashAttributeFormat.scala @@ -2,8 +2,8 @@ package org.elastic4play.models import play.api.libs.json.{JsString, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.keywordField -import com.sksamuel.elastic4s.mappings.KeywordFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.keywordField +import com.sksamuel.elastic4s.mappings.KeywordField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue, StringInputValue} @@ -27,5 +27,5 @@ object HashAttributeFormat extends AttributeFormat[String]("hash") { case _ ⇒ formatError(value) } - override def elasticType(attributeName: String): KeywordFieldDefinition = keywordField(attributeName) + override def elasticType(attributeName: String): KeywordField = keywordField(attributeName) } diff --git a/app/org/elastic4play/models/ListEnumerationAttributeFormat.scala b/app/org/elastic4play/models/ListEnumerationAttributeFormat.scala index 2593fe7..3d6aa1b 100644 --- a/app/org/elastic4play/models/ListEnumerationAttributeFormat.scala +++ b/app/org/elastic4play/models/ListEnumerationAttributeFormat.scala @@ -2,8 +2,8 @@ package org.elastic4play.models import play.api.libs.json.{JsString, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.keywordField -import com.sksamuel.elastic4s.mappings.KeywordFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.keywordField +import com.sksamuel.elastic4s.mappings.KeywordField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue, StringInputValue} @@ -27,7 +27,7 @@ case class ListEnumerationAttributeFormat(enumerationName: String)(dblists: DBLi case _ ⇒ formatError(value) } - override def elasticType(attributeName: String): KeywordFieldDefinition = keywordField(attributeName) + override def elasticType(attributeName: String): KeywordField = keywordField(attributeName) override def definition(dblists: DBLists, attribute: Attribute[String]): Seq[AttributeDefinition] = Seq(AttributeDefinition(attribute.attributeName, name, attribute.description, items.map(JsString.apply).toSeq, Nil)) diff --git a/app/org/elastic4play/models/MetricsAttributeFormat.scala b/app/org/elastic4play/models/MetricsAttributeFormat.scala index a3ed8f3..b7a7677 100644 --- a/app/org/elastic4play/models/MetricsAttributeFormat.scala +++ b/app/org/elastic4play/models/MetricsAttributeFormat.scala @@ -2,9 +2,9 @@ package org.elastic4play.models import play.api.libs.json._ -import com.sksamuel.elastic4s.ElasticDsl.{dynamicLongField, dynamicTemplate, nestedField} -import com.sksamuel.elastic4s.mappings.NestedFieldDefinition -import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.{dynamicLongField, dynamicTemplate, nestedField} +import com.sksamuel.elastic4s.mappings.NestedField +import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateRequest import org.scalactic.Accumulation._ import org.scalactic._ @@ -32,9 +32,9 @@ class MetricsAttributeFormat extends AttributeFormat[JsValue]("metrics") { OptionalAttributeFormat(NumberAttributeFormat).inputValueToJson(subNames.tail, value) //.map(v ⇒ JsObject(Seq(subNames.head → v))) } - override def elasticType(attributeName: String): NestedFieldDefinition = nestedField(attributeName) + override def elasticType(attributeName: String): NestedField = nestedField(attributeName) - override def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateDefinition] = + override def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateRequest] = dynamicTemplate(attributePath.mkString("_")) .mapping(dynamicLongField()) .pathMatch(attributePath.mkString(".") + ".*") :: Nil diff --git a/app/org/elastic4play/models/MultiAttributeFormat.scala b/app/org/elastic4play/models/MultiAttributeFormat.scala index 3234fae..2021ce8 100644 --- a/app/org/elastic4play/models/MultiAttributeFormat.scala +++ b/app/org/elastic4play/models/MultiAttributeFormat.scala @@ -3,7 +3,7 @@ package org.elastic4play.models import play.api.libs.json.{JsArray, JsValue} import com.sksamuel.elastic4s.mappings.FieldDefinition -import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateDefinition +import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateRequest import org.scalactic.Accumulation._ import org.scalactic._ @@ -45,7 +45,7 @@ case class MultiAttributeFormat[T](attributeFormat: AttributeFormat[T]) override def elasticType(attributeName: String): FieldDefinition = attributeFormat.elasticType(attributeName) - override def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateDefinition] = attributeFormat.elasticTemplate(attributePath) + override def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateRequest] = attributeFormat.elasticTemplate(attributePath) override def definition(dblists: DBLists, attribute: Attribute[Seq[T]]): Seq[AttributeDefinition] = attributeFormat.definition(dblists, attribute.asInstanceOf[Attribute[T]]) diff --git a/app/org/elastic4play/models/NumberAttributeFormat.scala b/app/org/elastic4play/models/NumberAttributeFormat.scala index 5b023b4..94f8b48 100644 --- a/app/org/elastic4play/models/NumberAttributeFormat.scala +++ b/app/org/elastic4play/models/NumberAttributeFormat.scala @@ -2,8 +2,8 @@ package org.elastic4play.models import play.api.libs.json.{JsNumber, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.longField -import com.sksamuel.elastic4s.mappings.BasicFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.longField +import com.sksamuel.elastic4s.mappings.BasicField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue, StringInputValue} @@ -30,7 +30,7 @@ class NumberAttributeFormat extends AttributeFormat[Long]("number") { case _ ⇒ formatError(value) } - override def elasticType(attributeName: String): BasicFieldDefinition = longField(attributeName) + override def elasticType(attributeName: String): BasicField = longField(attributeName) } diff --git a/app/org/elastic4play/models/ObjectAttributeFormat.scala b/app/org/elastic4play/models/ObjectAttributeFormat.scala index 5366ddb..a4a5466 100644 --- a/app/org/elastic4play/models/ObjectAttributeFormat.scala +++ b/app/org/elastic4play/models/ObjectAttributeFormat.scala @@ -3,9 +3,9 @@ package org.elastic4play.models import play.api.Logger import play.api.libs.json._ -import com.sksamuel.elastic4s.ElasticDsl.nestedField -import com.sksamuel.elastic4s.mappings.NestedFieldDefinition -import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.nestedField +import com.sksamuel.elastic4s.mappings.NestedField +import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateRequest import org.scalactic.Accumulation._ import org.scalactic._ @@ -96,9 +96,9 @@ case class ObjectAttributeFormat(subAttributes: Seq[Attribute[_]]) extends Attri result } - override def elasticType(attributeName: String): NestedFieldDefinition = nestedField(attributeName).fields(subAttributes.map(_.elasticMapping)) + override def elasticType(attributeName: String): NestedField = nestedField(attributeName).fields(subAttributes.map(_.elasticMapping)) - override def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateDefinition] = + override def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateRequest] = subAttributes.flatMap(_.elasticTemplate(attributePath)) override def definition(dblists: DBLists, attribute: Attribute[JsObject]): Seq[AttributeDefinition] = diff --git a/app/org/elastic4play/models/OptionalAttributeFormat.scala b/app/org/elastic4play/models/OptionalAttributeFormat.scala index 6305b90..bb60a03 100644 --- a/app/org/elastic4play/models/OptionalAttributeFormat.scala +++ b/app/org/elastic4play/models/OptionalAttributeFormat.scala @@ -3,7 +3,7 @@ package org.elastic4play.models import play.api.libs.json.{JsNull, JsValue} import com.sksamuel.elastic4s.mappings.FieldDefinition -import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateDefinition +import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicTemplateRequest import org.scalactic._ import org.elastic4play.AttributeError @@ -30,7 +30,7 @@ case class OptionalAttributeFormat[T](attributeFormat: AttributeFormat[T]) override def elasticType(attributeName: String): FieldDefinition = attributeFormat.elasticType(attributeName) - override def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateDefinition] = attributeFormat.elasticTemplate(attributePath) + override def elasticTemplate(attributePath: Seq[String]): Seq[DynamicTemplateRequest] = attributeFormat.elasticTemplate(attributePath) override def definition(dblists: DBLists, attribute: Attribute[Option[T]]): Seq[AttributeDefinition] = attributeFormat.definition(dblists, attribute.asInstanceOf[Attribute[T]]) diff --git a/app/org/elastic4play/models/RawAttributeFormat.scala b/app/org/elastic4play/models/RawAttributeFormat.scala index 64b6b5b..24af6f3 100644 --- a/app/org/elastic4play/models/RawAttributeFormat.scala +++ b/app/org/elastic4play/models/RawAttributeFormat.scala @@ -2,8 +2,8 @@ package org.elastic4play.models import play.api.libs.json.{JsString, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.binaryField -import com.sksamuel.elastic4s.mappings.BasicFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.binaryField +import com.sksamuel.elastic4s.mappings.BasicField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue} @@ -20,7 +20,7 @@ class RawAttributeFormat extends AttributeFormat[String]("raw") { case other ⇒ other } - override def elasticType(attributeName: String): BasicFieldDefinition = binaryField(attributeName) + override def elasticType(attributeName: String): BasicField = binaryField(attributeName) } object RawAttributeFormat extends RawAttributeFormat diff --git a/app/org/elastic4play/models/StringAttributeFormat.scala b/app/org/elastic4play/models/StringAttributeFormat.scala index a2f126a..9c0ef10 100644 --- a/app/org/elastic4play/models/StringAttributeFormat.scala +++ b/app/org/elastic4play/models/StringAttributeFormat.scala @@ -2,8 +2,8 @@ package org.elastic4play.models import play.api.libs.json.{JsString, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.keywordField -import com.sksamuel.elastic4s.mappings.KeywordFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.keywordField +import com.sksamuel.elastic4s.mappings.KeywordField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue} @@ -21,7 +21,7 @@ class StringAttributeFormat extends AttributeFormat[String]("string") { case other ⇒ other } - override def elasticType(attributeName: String): KeywordFieldDefinition = keywordField(attributeName) + override def elasticType(attributeName: String): KeywordField = keywordField(attributeName) } object StringAttributeFormat extends StringAttributeFormat diff --git a/app/org/elastic4play/models/TextAttributeFormat.scala b/app/org/elastic4play/models/TextAttributeFormat.scala index 68d6250..469f60b 100644 --- a/app/org/elastic4play/models/TextAttributeFormat.scala +++ b/app/org/elastic4play/models/TextAttributeFormat.scala @@ -2,8 +2,8 @@ package org.elastic4play.models import play.api.libs.json.{JsString, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.textField -import com.sksamuel.elastic4s.mappings.TextFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.textField +import com.sksamuel.elastic4s.mappings.TextField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue, StringInputValue} @@ -25,7 +25,7 @@ class TextAttributeFormat extends AttributeFormat[String]("text") { case _ ⇒ formatError(value) } - override def elasticType(attributeName: String): TextFieldDefinition = textField(attributeName).fielddata(true) + override def elasticType(attributeName: String): TextField = textField(attributeName).fielddata(true) } object TextAttributeFormat extends TextAttributeFormat diff --git a/app/org/elastic4play/models/UUIDAttributeFormat.scala b/app/org/elastic4play/models/UUIDAttributeFormat.scala index ea4eaf4..354c894 100644 --- a/app/org/elastic4play/models/UUIDAttributeFormat.scala +++ b/app/org/elastic4play/models/UUIDAttributeFormat.scala @@ -4,8 +4,8 @@ import java.util.UUID import play.api.libs.json.{JsString, JsValue} -import com.sksamuel.elastic4s.ElasticDsl.keywordField -import com.sksamuel.elastic4s.mappings.KeywordFieldDefinition +import com.sksamuel.elastic4s.http.ElasticDsl.keywordField +import com.sksamuel.elastic4s.mappings.KeywordField import org.scalactic._ import org.elastic4play.controllers.{InputValue, JsonInputValue, StringInputValue} @@ -42,7 +42,7 @@ class UUIDAttributeFormat extends AttributeFormat[UUID]("uuid") { case _ ⇒ formatError(value) } - override def elasticType(attributeName: String): KeywordFieldDefinition = keywordField(attributeName) + override def elasticType(attributeName: String): KeywordField = keywordField(attributeName) } object UUIDAttributeFormat extends UUIDAttributeFormat diff --git a/app/org/elastic4play/services/Aggregations.scala b/app/org/elastic4play/services/Aggregations.scala index c488eec..a42943b 100644 --- a/app/org/elastic4play/services/Aggregations.scala +++ b/app/org/elastic4play/services/Aggregations.scala @@ -1,11 +1,10 @@ package org.elastic4play.services import scala.util.Try -import scala.collection.JavaConverters._ -import play.api.libs.json.{JsArray, JsNumber, JsObject} +import play.api.libs.json._ -import com.sksamuel.elastic4s.ElasticDsl.{ +import com.sksamuel.elastic4s.http.ElasticDsl.{ avgAggregation, dateHistogramAggregation, filterAggregation, @@ -17,53 +16,43 @@ import com.sksamuel.elastic4s.ElasticDsl.{ termsAggregation, topHitsAggregation } -import com.sksamuel.elastic4s.script.ScriptDefinition -import com.sksamuel.elastic4s.searches.RichSearchHit +import com.sksamuel.elastic4s.http.search.HasAggregations +import com.sksamuel.elastic4s.json.JacksonSupport +import com.sksamuel.elastic4s.script.Script +import com.sksamuel.elastic4s.searches.DateHistogramInterval import com.sksamuel.elastic4s.searches.aggs._ -import org.elasticsearch.search.aggregations.bucket.filter.Filter -import org.elasticsearch.search.aggregations.bucket.filters.Filters -import org.elasticsearch.search.aggregations.bucket.histogram.{DateHistogramInterval, Histogram} -import org.elasticsearch.search.aggregations.bucket.terms.Terms -import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order -import org.elasticsearch.search.aggregations.bucket.nested.Nested -import org.elasticsearch.search.aggregations.metrics.avg.Avg -import org.elasticsearch.search.aggregations.metrics.max.Max -import org.elasticsearch.search.aggregations.metrics.min.Min -import org.elasticsearch.search.aggregations.metrics.sum.Sum -import org.elasticsearch.search.aggregations.metrics.tophits.TopHits -import org.joda.time.DateTime import org.elastic4play.BadRequestError import org.elastic4play.database.DBUtils import org.elastic4play.models.BaseModelDef abstract class Agg(val aggregationName: String) { - def apply(model: BaseModelDef): Seq[AggregationDefinition] + def apply(model: BaseModelDef): Seq[Aggregation] - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject } abstract class FieldAgg(val fieldName: String, aggregationName: String, query: Option[QueryDef]) extends Agg(aggregationName) { - def script(s: String): AggregationDefinition + def script(s: String): Aggregation - def field(f: String): AggregationDefinition + def field(f: String): Aggregation - def getAggregation(fieldName: String, aggregations: RichAggregations, query: Option[QueryDef]): RichAggregations = { + def getAggregation(fieldName: String, aggregations: HasAggregations, query: Option[QueryDef]): HasAggregations = { val agg = query match { case None ⇒ aggregations - case _ ⇒ RichAggregations(aggregations.aggregations.get[Filter](aggregationName).getAggregations) + case _ ⇒ aggregations.filter(aggregationName) } if (fieldName.startsWith("computed")) agg else { fieldName.split("\\.").init.foldLeft(agg) { (a, _) ⇒ - RichAggregations(a.getAs[Nested](aggregationName).getAggregations) + a.nested(aggregationName) } } } - def apply(model: BaseModelDef): Seq[AggregationDefinition] = { + def apply(model: BaseModelDef): Seq[Aggregation] = { val aggs = fieldName.split("\\.") match { case Array("computed", c) ⇒ val s = model.computedMetrics.getOrElse(c, throw BadRequestError(s"Field $fieldName is unknown in ${model.modelName}")) @@ -81,7 +70,7 @@ abstract class FieldAgg(val fieldName: String, aggregationName: String, query: O .inits .toSeq .init - .foldLeft[AggregationDefinition](field(fieldName)) { (agg, f) ⇒ + .foldLeft[Aggregation](field(fieldName)) { (agg, f) ⇒ nestedAggregation(aggregationName, f.mkString(".")).subaggs(agg) } ) @@ -94,91 +83,103 @@ abstract class FieldAgg(val fieldName: String, aggregationName: String, query: O } class SelectAvg(aggregationName: String, fieldName: String, query: Option[QueryDef]) extends FieldAgg(fieldName, aggregationName, query) { - def script(s: String): AggregationDefinition = avgAggregation(aggregationName).script(ScriptDefinition(s).lang("groovy")) + def script(s: String): Aggregation = avgAggregation(aggregationName).script(Script(s).lang("groovy")) - def field(f: String): AggregationDefinition = avgAggregation(aggregationName).field(f) + def field(f: String): Aggregation = avgAggregation(aggregationName).field(f) - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject = { - val avg = getAggregation(fieldName, aggregations, query).getAs[Avg](aggregationName) - val value = Try(JsNumber(avg.getValue)).toOption.getOrElse(JsNumber(0)) - JsObject(Seq(avg.getName → value)) + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject = { + val avg = getAggregation(fieldName, aggregations, query).avg(aggregationName) + val value = Try(JsNumber(avg.value)).toOption.getOrElse(JsNumber(0)) + JsObject(Seq(avg.name → value)) } } class SelectMin(aggregationName: String, fieldName: String, query: Option[QueryDef]) extends FieldAgg(fieldName, aggregationName, query) { - def script(s: String): AggregationDefinition = minAggregation(aggregationName).script(ScriptDefinition(s).lang("groovy")) + def script(s: String): Aggregation = minAggregation(aggregationName).script(Script(s).lang("groovy")) - def field(f: String): AggregationDefinition = minAggregation(aggregationName).field(f) + def field(f: String): Aggregation = minAggregation(aggregationName).field(f) - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject = { - val min = getAggregation(fieldName, aggregations, query).getAs[Min](aggregationName) - val value = Try(JsNumber(min.getValue)).toOption.getOrElse(JsNumber(0)) - JsObject(Seq(min.getName → value)) + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject = { + val min = getAggregation(fieldName, aggregations, query).min(aggregationName) + val value = min.value.fold(JsNumber(0))(m ⇒ JsNumber(m)) + JsObject(Seq(min.name → value)) } } class SelectMax(aggregationName: String, fieldName: String, query: Option[QueryDef]) extends FieldAgg(fieldName, aggregationName, query) { - def script(s: String): AggregationDefinition = maxAggregation(aggregationName).script(ScriptDefinition(s).lang("groovy")) + def script(s: String): Aggregation = maxAggregation(aggregationName).script(Script(s).lang("groovy")) - def field(f: String): AggregationDefinition = maxAggregation(aggregationName).field(f) + def field(f: String): Aggregation = maxAggregation(aggregationName).field(f) - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject = { - val max = getAggregation(fieldName, aggregations, query).getAs[Max](aggregationName) - val value = Try(JsNumber(max.getValue)).toOption.getOrElse(JsNumber(0)) - JsObject(Seq(max.getName → value)) + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject = { + val max = getAggregation(fieldName, aggregations, query).max(aggregationName) + val value = max.value.fold(JsNumber(0))(m ⇒ JsNumber(m)) + JsObject(Seq(max.name → value)) } } class SelectSum(aggregationName: String, fieldName: String, query: Option[QueryDef]) extends FieldAgg(fieldName, aggregationName, query) { - def script(s: String): AggregationDefinition = sumAggregation(aggregationName).script(ScriptDefinition(s).lang("groovy")) + def script(s: String): Aggregation = sumAggregation(aggregationName).script(Script(s).lang("groovy")) - def field(f: String): AggregationDefinition = sumAggregation(aggregationName).field(f) + def field(f: String): Aggregation = sumAggregation(aggregationName).field(f) - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject = { - val sum = getAggregation(fieldName, aggregations, query).getAs[Sum](aggregationName) - val value = Try(JsNumber(sum.getValue)).toOption.getOrElse(JsNumber(0)) - JsObject(Seq(sum.getName → value)) + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject = { + val sum = getAggregation(fieldName, aggregations, query).sum(aggregationName) + val value = JsNumber(sum.value) + JsObject(Seq(sum.name → value)) } } class SelectCount(aggregationName: String, query: Option[QueryDef]) extends FieldAgg("", aggregationName, query) { - def script(s: String): AggregationDefinition = ??? + def script(s: String): Aggregation = ??? - def field(f: String): AggregationDefinition = filterAggregation(aggregationName).query(matchAllQuery) + def field(f: String): Aggregation = filterAggregation(aggregationName).query(matchAllQuery) - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject = { - val count = aggregations.getAs[Filter](aggregationName) - JsObject(Seq(count.getName → JsNumber(count.getDocCount))) + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject = { + val count = aggregations.filter(aggregationName) + JsObject(Seq(count.name → JsNumber(count.docCount))) } } class SelectTop(aggregationName: String, size: Int, sortBy: Seq[String], query: Option[QueryDef] = None) extends FieldAgg("", aggregationName, query) { - def script(s: String): AggregationDefinition = ??? + def script(s: String): Aggregation = ??? - def field(f: String): AggregationDefinition = topHitsAggregation(aggregationName).size(size).sortBy(DBUtils.sortDefinition(sortBy)) + def field(f: String): Aggregation = topHitsAggregation(aggregationName).size(size).sortBy(DBUtils.sortDefinition(sortBy)) - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject = { - val top = aggregations.getAs[TopHits](aggregationName) - JsObject(Seq("top" → JsArray(top.getHits.getHits.map(h ⇒ DBUtils.hit2json(RichSearchHit(h)))))) + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject = { + val hits = aggregations.tophits(aggregationName).hits.map { hit ⇒ + val id = JsString(hit.id) + val body = Json.parse(JacksonSupport.mapper.writeValueAsString(hit.source)).as[JsObject] + val (parent, model) = (body \ "relations" \ "parent").asOpt[JsString] match { + case Some(p) ⇒ p → (body \ "relations" \ "name").as[JsString] + case None ⇒ JsNull → (body \ "relations").as[JsString] + } + body - "relations" + + ("_type" → model) + + ("_parent" → parent) + + ("_id" → id) + + } + Json.obj("top" → hits) } } class GroupByCategory(aggregationName: String, categories: Map[String, QueryDef], subAggs: Seq[Agg]) extends Agg(aggregationName) { - def apply(model: BaseModelDef): Seq[KeyedFiltersAggregationDefinition] = { + def apply(model: BaseModelDef): Seq[KeyedFiltersAggregation] = { val filters = categories.mapValues(_.query) val subAggregations = subAggs.flatMap(_.apply(model)) - Seq(KeyedFiltersAggregationDefinition(aggregationName, filters).subAggregations(subAggregations)) + Seq(KeyedFiltersAggregation(aggregationName, filters).subAggregations(subAggregations)) } - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject = { - val filters = aggregations.getAs[Filters](aggregationName) + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject = { + val filters = aggregations.keyedFilters(aggregationName) JsObject { categories.keys.toSeq.map { cat ⇒ - val subAggResults = filters.getBucketByKey(cat).getAggregations + val subAggResults = filters.aggResults(cat) cat → subAggs - .map(_.processResult(model, RichAggregations(subAggResults))) + .map(_.processResult(model, subAggResults)) .reduceOption(_ ++ _) .getOrElse(JsObject.empty) } @@ -188,11 +189,11 @@ class GroupByCategory(aggregationName: String, categories: Map[String, QueryDef] class GroupByTime(aggregationName: String, fields: Seq[String], interval: String, subAggs: Seq[Agg]) extends Agg(aggregationName) { - def apply(model: BaseModelDef): Seq[AggregationDefinition] = + def apply(model: BaseModelDef): Seq[Aggregation] = fields.map { fieldName ⇒ val dateHistoAgg = dateHistogramAggregation(s"${aggregationName}_$fieldName") .field(fieldName) - .interval(new DateHistogramInterval(interval)) + .interval(DateHistogramInterval.fromString(interval)) .subAggregations(subAggs.flatMap(_.apply(model))) fieldName .split("\\.") @@ -201,29 +202,26 @@ class GroupByTime(aggregationName: String, fields: Seq[String], interval: String .inits .toSeq .init - .foldLeft[AggregationDefinition](dateHistoAgg) { (agg, f) ⇒ + .foldLeft[Aggregation](dateHistoAgg) { (agg, f) ⇒ nestedAggregation(aggregationName, f.mkString(".")).subaggs(agg) } } - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject = { + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject = { val aggs = fields.map { fieldName ⇒ val agg = fieldName.split("\\.").init.foldLeft(aggregations) { (a, _) ⇒ - RichAggregations(a.getAs[Nested](aggregationName).getAggregations) + a.nested(aggregationName) } - val buckets = agg.getAs[Histogram](s"${aggregationName}_$fieldName").getBuckets - fieldName → buckets - .asScala - .map { bucket ⇒ - val results = subAggs - .map(_.processResult(model, RichAggregations(bucket.getAggregations))) - .reduceOption(_ ++ _) - .getOrElse(JsObject.empty) - // date → obj(key{avg, min} → value) - bucket.getKey.asInstanceOf[DateTime].getMillis.toString → results - } - .toMap + val buckets = agg.histogram(s"${aggregationName}_$fieldName").buckets + fieldName → buckets.map { bucket ⇒ + val results = subAggs + .map(_.processResult(model, bucket)) + .reduceOption(_ ++ _) + .getOrElse(JsObject.empty) + // date → obj(key{avg, min} → value) + bucket.key → results + }.toMap }.toMap val keys = aggs.values.flatMap(_.keys).toSet JsObject { @@ -239,28 +237,28 @@ class GroupByTime(aggregationName: String, fields: Seq[String], interval: String class GroupByField(aggregationName: String, fieldName: String, size: Option[Int], sortBy: Seq[String], subAggs: Seq[Agg]) extends Agg(aggregationName) { - private def setSize(agg: TermsAggregationDefinition): TermsAggregationDefinition = + private def setSize(agg: TermsAggregation): TermsAggregation = size.fold(agg)(s ⇒ agg.size(s)) - private def setOrder(agg: TermsAggregationDefinition): TermsAggregationDefinition = { + private def setOrder(agg: TermsAggregation): TermsAggregation = { val sortDefinition = sortBy .flatMap { - case "_count" | "+_count" ⇒ Seq(Order.count(true)) - case "-_count" ⇒ Seq(Order.count(false)) - case "_term" | "+_term" ⇒ Seq(Order.term(true)) - case "-_term" ⇒ Seq(Order.term(false)) - case f if f.startsWith("+") ⇒ Seq(Order.aggregation(f.drop(1), true)) - case f if f.startsWith("-") ⇒ Seq(Order.aggregation(f.drop(1), false)) - case f if f.length() > 0 ⇒ Seq(Order.aggregation(f, true)) + case "_count" | "+_count" ⇒ Seq(TermsOrder("_count", true)) + case "-_count" ⇒ Seq(TermsOrder("_count", false)) + case "_term" | "+_term" ⇒ Seq(TermsOrder("_key", true)) + case "-_term" ⇒ Seq(TermsOrder("_key", false)) + case f if f.startsWith("+") ⇒ Seq(TermsOrder(f.drop(1), true)) + case f if f.startsWith("-") ⇒ Seq(TermsOrder(f.drop(1), false)) + case f if f.length() > 0 ⇒ Seq(TermsOrder(f, true)) case _ ⇒ Nil } if (sortDefinition.nonEmpty) - agg.order(Order.compound(sortDefinition.asJava)) + agg.order(sortDefinition) else agg } - def apply(model: BaseModelDef): Seq[AggregationDefinition] = { + def apply(model: BaseModelDef): Seq[Aggregation] = { val agg = setSize(setOrder(termsAggregation(s"${aggregationName}_$fieldName").field(fieldName).subAggregations(subAggs.flatMap(_.apply(model))))) Seq( fieldName @@ -270,32 +268,29 @@ class GroupByField(aggregationName: String, fieldName: String, size: Option[Int] .inits .toSeq .init - .foldLeft[AggregationDefinition](agg) { (agg, f) ⇒ + .foldLeft[Aggregation](agg) { (agg, f) ⇒ nestedAggregation(aggregationName, f.mkString(".")).subaggs(agg) } ) } - def processResult(model: BaseModelDef, aggregations: RichAggregations): JsObject = { + def processResult(model: BaseModelDef, aggregations: HasAggregations): JsObject = { val buckets = fieldName .split("\\.") .init .foldLeft(aggregations) { (a, _) ⇒ - RichAggregations(a.getAs[Nested](aggregationName).getAggregations) + a.nested(aggregationName) } - .getAs[Terms](s"${aggregationName}_$fieldName") - .getBuckets + .terms(s"${aggregationName}_$fieldName") + .buckets JsObject { - buckets - .asScala - .map { bucket ⇒ - val results = subAggs - .map(_.processResult(model, RichAggregations(bucket.getAggregations))) - .reduceOption(_ ++ _) - .getOrElse(JsObject.empty) - bucket.getKeyAsString → results - } - .toMap + buckets.map { bucket ⇒ + val results = subAggs + .map(_.processResult(model, bucket)) + .reduceOption(_ ++ _) + .getOrElse(JsObject.empty) + bucket.key → results + }.toMap } } } diff --git a/app/org/elastic4play/services/AttachmentSrv.scala b/app/org/elastic4play/services/AttachmentSrv.scala index 3aee3e6..443ed8c 100644 --- a/app/org/elastic4play/services/AttachmentSrv.scala +++ b/app/org/elastic4play/services/AttachmentSrv.scala @@ -3,21 +3,13 @@ package org.elastic4play.services import java.io.InputStream import java.nio.file.Files -import scala.concurrent.duration.DurationInt -import scala.concurrent.{ExecutionContext, Future} - -import play.api.Configuration -import play.api.libs.json.JsValue.jsValueToJsLookup -import play.api.libs.json.Json.toJsFieldJsValueWrapper -import play.api.libs.json._ - import akka.NotUsed import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.{FileIO, Sink, Source, StreamConverters} import akka.util.ByteString +import com.sksamuel.elastic4s.http.ElasticDsl.search import javax.inject.{Inject, Singleton} - import org.elastic4play.controllers.JsonFormat.{attachmentInputValueReads, fileInputValueFormat} import org.elastic4play.controllers.{AttachmentInputValue, FileInputValue, JsonInputValue} import org.elastic4play.database.{DBCreate, DBFind, DBRemove} @@ -25,6 +17,12 @@ import org.elastic4play.models.{AttributeDef, BaseModelDef, EntityDef, ModelDef, import org.elastic4play.services.JsonFormat.attachmentFormat import org.elastic4play.utils.{Hash, Hasher, Retry} import org.elastic4play.{AttributeCheckingError, InvalidFormatAttributeError, MissingAttributeError} +import play.api.Configuration +import play.api.libs.json.JsValue.jsValueToJsLookup +import play.api.libs.json.Json.toJsFieldJsValueWrapper +import play.api.libs.json._ +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ExecutionContext, Future} case class Attachment(name: String, hashes: Seq[Hash], size: Long, contentType: String, id: String) @@ -220,9 +218,8 @@ class AttachmentSrv( removeChunks().recover { case _ ⇒ () } } - def cleanup: Future[Unit] = { - import com.sksamuel.elastic4s.ElasticDsl.{search, RichString} - dbFind(Some("all"), Nil)(index ⇒ search(index / attachmentModel.modelName).fetchSource(false)) + def cleanup: Future[Unit] = + dbFind(Some("all"), Nil)(index ⇒ search(index).matchQuery("relations", attachmentModel.modelName).fetchSource(false)) ._1 .mapConcat(o ⇒ (o \ "_id").asOpt[String].toList) .collect { case id if id.endsWith("_0") ⇒ id.dropRight(2) } @@ -235,5 +232,4 @@ class AttachmentSrv( } .runWith(Sink.ignore) .map(_ ⇒ ()) - } } diff --git a/app/org/elastic4play/services/CreateSrv.scala b/app/org/elastic4play/services/CreateSrv.scala index 6723de4..033199a 100644 --- a/app/org/elastic4play/services/CreateSrv.scala +++ b/app/org/elastic4play/services/CreateSrv.scala @@ -1,7 +1,6 @@ package org.elastic4play.services import java.util.Date -import javax.inject.{Inject, Singleton} import scala.concurrent.{ExecutionContext, Future} import scala.util.Try @@ -10,6 +9,7 @@ import play.api.libs.json.JsValue.jsValueToJsLookup import play.api.libs.json.Json.toJsFieldJsValueWrapper import play.api.libs.json.{JsNull, JsObject, JsValue, Json} +import javax.inject.{Inject, Singleton} import org.scalactic.Accumulation.convertGenTraversableOnceToValidatable import org.scalactic.Every.everyToGenTraversableOnce import org.scalactic.{Bad, One} @@ -46,6 +46,7 @@ class CreateSrv @Inject()( case (name, Some(value)) ⇒ name → value }) .fold(attrs ⇒ Future.successful(JsObject(attrs.toSeq)), errors ⇒ Future.failed(AttributeCheckingError(model.modelName, errors))) + private[services] def processAttributes(model: BaseModelDef, parent: Option[BaseEntity], attributes: JsObject)( implicit authContext: AuthContext ): Future[JsObject] = diff --git a/app/org/elastic4play/services/FindSrv.scala b/app/org/elastic4play/services/FindSrv.scala index 1eb1dfc..d17e6b4 100644 --- a/app/org/elastic4play/services/FindSrv.scala +++ b/app/org/elastic4play/services/FindSrv.scala @@ -1,7 +1,5 @@ package org.elastic4play.services -import javax.inject.{Inject, Singleton} - import scala.concurrent.{ExecutionContext, Future} import play.api.libs.json.JsObject @@ -9,13 +7,15 @@ import play.api.libs.json.JsValue.jsValueToJsLookup import akka.NotUsed import akka.stream.scaladsl.Source -import com.sksamuel.elastic4s.ElasticDsl._ -import com.sksamuel.elastic4s.searches.queries.QueryDefinition +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.searches.queries.Query +import javax.inject.{Inject, Singleton} import org.elastic4play.database.{DBConfiguration, DBFind} import org.elastic4play.models.{AbstractModelDef, BaseEntity, BaseModelDef} +import org.elastic4play.services.QueryDSL._ -case class QueryDef(query: QueryDefinition) +case class QueryDef(query: Query) @Singleton class FindSrv @Inject()(dbfind: DBFind, modelSrv: ModelSrv, implicit val ec: ExecutionContext) { @@ -28,7 +28,8 @@ class FindSrv @Inject()(dbfind: DBFind, modelSrv: ModelSrv, implicit val ec: Exe range: Option[String], sortBy: Seq[String] ): (Source[BaseEntity, NotUsed], Future[Long]) = { - val (src, total) = dbfind(range, sortBy)(indexName ⇒ modelName.fold(search(indexName))(m ⇒ search(indexName → m)).query(queryDef.query)) + val query = modelName.fold(queryDef)(m ⇒ and("relations" ~= m, queryDef)).query + val (src, total) = dbfind(range, sortBy)(indexName ⇒ search(indexName).query(query)) val entities = src.map { attrs ⇒ modelName match { //case Some("audit") ⇒ auditModel.get()(attrs) @@ -43,7 +44,7 @@ class FindSrv @Inject()(dbfind: DBFind, modelSrv: ModelSrv, implicit val ec: Exe } def apply(model: BaseModelDef, queryDef: QueryDef, range: Option[String], sortBy: Seq[String]): (Source[BaseEntity, NotUsed], Future[Long]) = { - val (src, total) = dbfind(range, sortBy)(indexName ⇒ search(indexName → model.modelName).query(queryDef.query)) + val (src, total) = dbfind(range, sortBy)(indexName ⇒ search(indexName).query(and("relations" ~= model.modelName, queryDef).query)) val entities = src.map(attrs ⇒ model(attrs)) (entities, total) } @@ -54,17 +55,18 @@ class FindSrv @Inject()(dbfind: DBFind, modelSrv: ModelSrv, implicit val ec: Exe range: Option[String], sortBy: Seq[String] ): (Source[E, NotUsed], Future[Long]) = { - val (src, total) = dbfind(range, sortBy)(indexName ⇒ search(indexName → model.modelName).query(queryDef.query)) + val (src, total) = dbfind(range, sortBy)(indexName ⇒ search(indexName).query(and("relations" ~= model.modelName, queryDef).query)) val entities = src.map(attrs ⇒ model(attrs)) (entities, total) } def apply(model: BaseModelDef, queryDef: QueryDef, aggs: Agg*): Future[JsObject] = - dbfind(indexName ⇒ search(indexName → model.modelName).query(queryDef.query).aggregations(aggs.flatMap(_.apply(model))).size(0)) - .map { searchResponse ⇒ - aggs - .map(_.processResult(model, searchResponse.aggregations)) - .reduceOption(_ ++ _) - .getOrElse(JsObject.empty) - } + dbfind( + indexName ⇒ search(indexName).query(and("relations" ~= model.modelName, queryDef).query).aggregations(aggs.flatMap(_.apply(model))).size(0) + ).map { searchResponse ⇒ + aggs + .map(_.processResult(model, searchResponse.aggregations)) + .reduceOption(_ ++ _) + .getOrElse(JsObject.empty) + } } diff --git a/app/org/elastic4play/services/MigrationSrv.scala b/app/org/elastic4play/services/MigrationSrv.scala index 5148701..5164734 100644 --- a/app/org/elastic4play/services/MigrationSrv.scala +++ b/app/org/elastic4play/services/MigrationSrv.scala @@ -14,7 +14,7 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer} import akka.stream.scaladsl.{Sink, Source} -import com.sksamuel.elastic4s.ElasticDsl.search +import com.sksamuel.elastic4s.http.ElasticDsl._ import com.typesafe.config.Config import org.elastic4play.InternalError @@ -23,10 +23,15 @@ import org.elastic4play.database._ case class MigrationEvent(modelName: String, current: Long, total: Long) extends EventMessage case object EndOfMigrationEvent extends EventMessage +object IndexType extends Enumeration { + val indexWithMappingTypes, indexWithoutMappingTypes = Value +} + trait MigrationOperations { val operations: PartialFunction[DatabaseState, Seq[Operation]] def beginMigration(version: Int): Future[Unit] def endMigration(version: Int): Future[Unit] + def indexType(version: Int): IndexType.Value } /* DatabaseState is the state of a specific version of the database. @@ -80,16 +85,56 @@ class MigrationSrv @Inject()( } /* Last version of database */ - case class OriginState(db: DBConfiguration) extends DatabaseState { + class OriginStateES6(db: DBConfiguration) extends DatabaseState { private val currentdbfind = dbfind.switchTo(db) private lazy val currentdbget = new DBGet(db, ec) override def version: Int = db.version override def source(tableName: String): Source[JsObject, NotUsed] = - currentdbfind.apply(Some("all"), Nil)(indexName ⇒ search(indexName → tableName).matchAllQuery)._1 + currentdbfind.apply(Some("all"), Nil)(indexName ⇒ search(indexName).matchQuery("relations", tableName))._1 override def count(tableName: String): Future[Long] = new DBIndex(db, 0, 0, Map.empty, ec).getSize(tableName) override def getEntity(tableName: String, entityId: String): Future[JsObject] = currentdbget(tableName, entityId) } + class OriginStateES5(db: DBConfiguration) extends DatabaseState { + private val currentdbfind = dbfind.switchTo(db) + private lazy val currentdbget = new DBGet(db, ec) + override def version: Int = db.version + override def source(tableName: String): Source[JsObject, NotUsed] = { + val searchQuery = search(db.indexName / tableName).storedFields("_source", "_routing", "_parent").start(0).version(true) + Source.fromGraph(new SearchWithScroll(db, searchQuery, currentdbfind.keepAliveStr, 0, Int.MaxValue)).map { hit ⇒ + val id = JsString(hit.id) + Json.parse(hit.sourceAsString).as[JsObject] + + ("_type" → JsString(hit.`type`)) + + ("_routing" → hit + .routing + .fold(id)(JsString.apply)) + //fields.routget("_routing").map(r ⇒ JsString(r.java.getValue[String])).getOrElse(id)) + + ("_parent" → hit + .parent + .fold[JsValue](JsNull)(JsString.apply)) + //hit.fields.get("_parent").map(r ⇒ JsString(r.java.getValue[String])).getOrElse(JsNull)) + + ("_id" → id) + + ("_version" → JsNumber(hit.version)) + } + } + override def count(tableName: String): Future[Long] = + db.execute { + search(db.indexName / tableName).matchAllQuery().size(0) + } + .map { + _.totalHits + } + .recover { case _ ⇒ 0L } + override def getEntity(tableName: String, entityId: String): Future[JsObject] = currentdbget(tableName, entityId) + } + + object OriginState { + + def apply(db: DBConfiguration) = + migration.indexType(db.version) match { + case IndexType.`indexWithoutMappingTypes` ⇒ new OriginStateES6(db) + case IndexType.indexWithMappingTypes ⇒ new OriginStateES5(db) + } + } + /* If there is no database, use empty one */ object EmptyState extends DatabaseState { override def version = 1 @@ -101,7 +146,7 @@ class MigrationSrv @Inject()( def migrationPath(db: DBConfiguration): Future[(Int, DatabaseState)] = new DBIndex(db, 0, 0, Map.empty, ec).getIndexStatus.flatMap { case true ⇒ - logger.info(s"Initiate database migration from version ${db.version}") + logger.info(s"Initiate database migration from version ${db.version} (${migration.indexType(db.version)})") Future.successful(db.version → OriginState(db)) case false if db.version == 1 ⇒ logger.info("Create a new empty database") @@ -149,7 +194,7 @@ class MigrationSrv @Inject()( val models = modelSrv.list migrationProcess = migrationPath(db) .flatMap { mig ⇒ - dbindex.createIndex(models).map(_ ⇒ mig) + dbindex.createIndex(new SequenceModel +: models).map(_ ⇒ mig) } .flatMap { versionMig ⇒ migration.beginMigration(versionMig._1).map(_ ⇒ versionMig) diff --git a/app/org/elastic4play/services/QueryDSL.scala b/app/org/elastic4play/services/QueryDSL.scala index c452103..0c53d72 100644 --- a/app/org/elastic4play/services/QueryDSL.scala +++ b/app/org/elastic4play/services/QueryDSL.scala @@ -1,6 +1,6 @@ package org.elastic4play.services -import com.sksamuel.elastic4s.ElasticDsl.{ +import com.sksamuel.elastic4s.http.ElasticDsl.{ boolQuery, existsQuery, hasChildQuery, @@ -15,8 +15,9 @@ import com.sksamuel.elastic4s.ElasticDsl.{ termsQuery, wildcardQuery } -import com.sksamuel.elastic4s.searches.queries.{BuildableTermsQueryImplicits, QueryDefinition} -import org.apache.lucene.search.join.ScoreMode +import com.sksamuel.elastic4s.searches.ScoreMode +import com.sksamuel.elastic4s.searches.queries.Query +import com.sksamuel.elastic4s.searches.queries.term.{BuildableTermsQuery, TermsQuery} import org.elastic4play.models.BaseEntity @@ -58,7 +59,7 @@ object QueryDSL { def groupByCaterogy(aggregationName: Option[String], categories: Map[String, QueryDef], selectables: Agg*) = new GroupByCategory(aggregationName.getOrElse("categories"), categories, selectables) - private def nestedField(field: String, q: (String) ⇒ QueryDefinition) = + private def nestedField(field: String, q: String ⇒ Query) = field .split("\\.") .init @@ -69,7 +70,10 @@ object QueryDSL { case (queryDef, subName) ⇒ nestedQuery(subName.mkString(".")).query(queryDef).scoreMode(ScoreMode.None) } - implicit class SearchField(field: String) extends BuildableTermsQueryImplicits { + implicit class SearchField(field: String) /*extends BuildableTermsQueryImplicits*/ { + implicit val stringTermsQueryBuilder = new BuildableTermsQuery[String] { + override def build(q: TermsQuery[String]): Any = q + } private def convertValue(value: Any): Any = value match { case _: Enumeration#Value ⇒ value.toString case bd: BigDecimal ⇒ bd.toDouble @@ -85,7 +89,7 @@ object QueryDSL { def ~>=(value: Any) = QueryDef(nestedField(field, rangeQuery(_).gte(value.toString))) def ~<>(value: (Any, Any)) = QueryDef(nestedField(field, rangeQuery(_).gt(value._1.toString).lt(value._2.toString))) def ~=<>=(value: (Any, Any)) = QueryDef(nestedField(field, rangeQuery(_).gte(value._1.toString).lte(value._2.toString))) - def in(values: AnyRef*) = QueryDef(nestedField(field, termsQuery(_, values))) + def in(values: String*) = QueryDef(nestedField(field, termsQuery(_, values))) } def ofType(value: String) = QueryDef(termQuery("_type", value)) @@ -97,11 +101,11 @@ object QueryDSL { def and(queries: QueryDef*): QueryDef = QueryDef(boolQuery().must(queries.map(_.query))) def and(queries: Iterable[QueryDef]): QueryDef = QueryDef(boolQuery().must(queries.map(_.query))) def not(query: QueryDef): QueryDef = QueryDef(boolQuery.not(query.query)) - def child(childType: String, query: QueryDef): QueryDef = QueryDef(hasChildQuery(childType).query(query.query).scoreMode(ScoreMode.None)) - def parent(parentType: String, query: QueryDef): QueryDef = QueryDef(hasParentQuery(parentType).query(query.query).scoreMode(false)) + def child(childType: String, query: QueryDef): QueryDef = QueryDef(hasChildQuery(childType, query.query, ScoreMode.None)) + def parent(parentType: String, query: QueryDef): QueryDef = QueryDef(hasParentQuery(parentType, query.query, false)) def withParent(parent: BaseEntity): QueryDef = withParent(parent.model.modelName, parent.id) def withParent(parentType: String, parentId: String): QueryDef = - QueryDef(hasParentQuery(parentType).query(idsQuery(parentId).types(parentType)).scoreMode(false)) // QueryDef(ParentIdQueryDefinition(parentType, parentId)) FIXME doesn't work yet + QueryDef(hasParentQuery(parentType, idsQuery(parentId).types(parentType), false)) def string(queryString: String): QueryDef = QueryDef(query(queryString)) } diff --git a/build.sbt b/build.sbt index eac4c5a..e4e6cb8 100644 --- a/build.sbt +++ b/build.sbt @@ -17,16 +17,15 @@ scalaVersion := "2.12.7" resolvers += "elasticsearch-releases" at "https://artifacts.elastic.co/maven" +val elastic4sVersion = "6.5.1" libraryDependencies ++= Seq( cacheApi, - "com.sksamuel.elastic4s" %% "elastic4s-core" % "5.6.6", - "com.sksamuel.elastic4s" %% "elastic4s-streams" % "5.6.6", - "com.sksamuel.elastic4s" %% "elastic4s-tcp" % "5.6.6", - "com.sksamuel.elastic4s" %% "elastic4s-xpack-security" % "5.6.6", - "com.typesafe.akka" %% "akka-stream-testkit" % "2.5.10" % Test, + "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion, + "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sVersion, + "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion, + "com.typesafe.akka" %% "akka-stream-testkit" % "2.5.19" % Test, "org.scalactic" %% "scalactic" % "3.0.5", "org.bouncycastle" % "bcprov-jdk15on" % "1.58", - "com.floragunn" % "search-guard-ssl" % "5.6.9-23", specs2 % Test ) diff --git a/test/org/elastic4play/database/DBCreateSpec.scala b/test/org/elastic4play/database/DBCreateSpec.scala index a571297..455321c 100644 --- a/test/org/elastic4play/database/DBCreateSpec.scala +++ b/test/org/elastic4play/database/DBCreateSpec.scala @@ -6,8 +6,9 @@ import scala.concurrent.Future import play.api.libs.json.{JsObject, JsString, Json} import play.api.test.PlaySpecification -import com.sksamuel.elastic4s.index.RichIndexResponse -import com.sksamuel.elastic4s.indexes.IndexDefinition +import com.sksamuel.elastic4s.http.index.IndexResponse +import com.sksamuel.elastic4s.http.ElasticDsl.IndexHandler +import com.sksamuel.elastic4s.indexes.IndexRequest import common.{Fabricator ⇒ F} import org.junit.runner.RunWith import org.specs2.mock.Mockito @@ -26,22 +27,22 @@ class DBCreateSpec extends PlaySpecification with Mockito { val db: DBConfiguration = mock[DBConfiguration] val dbcreate = new DBCreate(db, ec) - def apply(modelName: String, attributes: JsObject): (JsObject, IndexDefinition) = { - val indexResponse = mock[RichIndexResponse] + def apply(modelName: String, attributes: JsObject): (JsObject, IndexRequest) = { + val indexResponse = mock[IndexResponse] indexResponse.id returns (attributes \ "_id").asOpt[String].getOrElse(defaultEntityId) - db.execute(any[IndexDefinition]) returns Future.successful(indexResponse) + db.execute(any[IndexRequest]) returns Future.successful(indexResponse) val attrs = dbcreate(modelName, attributes).await - val captor = capture[IndexDefinition] + val captor = capture[IndexRequest] there was one(db).execute(captor.capture) (attrs, captor.value) } - def apply(parent: BaseEntity, attributes: JsObject): (JsObject, IndexDefinition) = { - val indexResponse = mock[RichIndexResponse] + def apply(parent: BaseEntity, attributes: JsObject): (JsObject, IndexRequest) = { + val indexResponse = mock[IndexResponse] indexResponse.id returns (attributes \ "_id").asOpt[String].getOrElse(defaultEntityId) - db.execute(any[IndexDefinition]) returns Future.successful(indexResponse) + db.execute(any[IndexRequest]) returns Future.successful(indexResponse) val attrs = dbcreate(modelName, Some(parent), attributes).await - val captor = capture[IndexDefinition] + val captor = capture[IndexRequest] there was one(db).execute(captor.capture) (attrs, captor.value) } diff --git a/test/org/elastic4play/database/DBFindSpec.scala b/test/org/elastic4play/database/DBFindSpec.scala index 2d11877..5afad47 100644 --- a/test/org/elastic4play/database/DBFindSpec.scala +++ b/test/org/elastic4play/database/DBFindSpec.scala @@ -6,15 +6,15 @@ import scala.concurrent.duration._ import play.api.Application import play.api.inject.guice.GuiceApplicationBuilder -import play.api.libs.json.{JsNumber, JsString, Json} import play.api.test.PlaySpecification import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.testkit.scaladsl.TestSink +import com.sksamuel.elastic4s.http.search.{SearchHit, SearchHits, SearchResponse} +import com.sksamuel.elastic4s.http.ElasticDsl.{SearchHandler, SearchScrollHandler} import com.sksamuel.elastic4s.searches._ import common.{Fabricator ⇒ F} -import org.elasticsearch.search.SearchHitField import org.junit.runner.RunWith import org.specs2.mock.Mockito import org.specs2.runner.JUnitRunner @@ -81,33 +81,33 @@ class DBFindSpec extends PlaySpecification with Mockito { "execute search using scroll" in { val db = mock[DBConfiguration] val dbfind = new DBFind(pageSize, keepAlive, db, ec, mat) - val searchDef = mock[SearchDefinition] + val searchDef = mock[SearchRequest] searchDef.limit(pageSize) returns searchDef searchDef.scroll(dbfind.keepAliveStr) returns searchDef - val firstPageResult = mock[RichSearchResponse] + val firstPageResult = mock[SearchResponse] val scrollId = F.string("scrollId") val hits = Range(0, 24).map { i ⇒ - val m = mock[RichSearchHit] + val m = mock[SearchHit] m.toString returns s"MockResult-$i" m }.toArray - firstPageResult.scrollIdOpt returns Some(scrollId) + firstPageResult.scrollId returns Some(scrollId) firstPageResult.totalHits returns hits.length.toLong firstPageResult.isTimedOut returns false firstPageResult.isEmpty returns false - firstPageResult.hits returns hits.take(5) + firstPageResult.hits returns SearchHits(24, 0, hits.take(5)) db.execute(searchDef) returns Future.successful(firstPageResult) - val secondPageResult = mock[RichSearchResponse] - secondPageResult.scrollIdOpt returns Some(scrollId) + val secondPageResult = mock[SearchResponse] + secondPageResult.scrollId returns Some(scrollId) secondPageResult.isTimedOut returns false secondPageResult.isEmpty returns false - secondPageResult.hits returns hits.drop(5) - db.execute(any[SearchScrollDefinition]) returns Future.successful(secondPageResult) + secondPageResult.hits returns SearchHits(24, 0, hits.drop(5)) + db.execute(any[SearchScrollRequest]) returns Future.successful(secondPageResult) val (src, total) = dbfind.searchWithScroll(searchDef, 8, 10) src - .runWith(TestSink.probe[RichSearchHit]) + .runWith(TestSink.probe[SearchHit]) .request(2) .expectNextN(hits.slice(8, 10).toList) .request(5) @@ -118,7 +118,7 @@ class DBFindSpec extends PlaySpecification with Mockito { total.await must_== hits.length there was one(db).execute(searchDef) - there was one(db).execute(any[SearchScrollDefinition]) + there was one(db).execute(any[SearchScrollRequest]) // FIXME there was one(db).execute(any[ClearScrollDefinition]) } @@ -128,19 +128,19 @@ class DBFindSpec extends PlaySpecification with Mockito { val dbfind = new DBFind(pageSize, keepAlive, db, ec, mat) val limit = 24 val offset = 3 - val hits = Array.fill(limit)(mock[RichSearchHit]) - val searchDef = mock[SearchDefinition] + val hits = Array.fill(limit)(mock[SearchHit]) + val searchDef = mock[SearchRequest] searchDef.limit(limit) returns searchDef searchDef.start(offset) returns searchDef - val results = mock[RichSearchResponse] + val results = mock[SearchResponse] //db.execute(searchDef) returns Future.successful(results) doReturn(Future.successful(results)).when(db).execute(searchDef) results.totalHits returns 42 - results.hits returns hits + results.hits returns SearchHits(24, 0, hits) val (src, total) = dbfind.searchWithoutScroll(searchDef, offset, limit) src - .runWith(TestSink.probe[RichSearchHit]) + .runWith(TestSink.probe[SearchHit]) .request(2) .expectNextN(hits.take(2).toList) .request(10) @@ -152,34 +152,5 @@ class DBFindSpec extends PlaySpecification with Mockito { total.await must_== 42 there was one(db).execute(searchDef) } - - "convert hit to json" in { - val hit = mock[RichSearchHit] - val routing = F.string("routing") - val routingField = mock[SearchHitField] - routingField.getValue[String] returns routing - val parent = F.string("parent") - val parentField = mock[SearchHitField] - parentField.getValue[String] returns parent - val fields = Map("_routing" → RichSearchHitField(routingField), "_parent" → RichSearchHitField(parentField)) - hit.fields returns fields - val id = F.string("id") - hit.id returns id - val doc = Json.obj("magic-number" → 42, "text" → "blah", "really-good" → true) - hit.sourceAsString returns doc.toString - val tpe = "some-object" - hit.`type` returns tpe - val version = 12L - hit.version returns version - - // val db = mock[DBConfiguration] - // val dbfind = new DBFind(pageSize, keepAlive, db, ec, mat) - DBUtils.hit2json(hit) must_== (doc + - ("_id" → JsString(id)) + - ("_parent" → JsString(parent)) + - ("_routing" → JsString(routing)) + - ("_type" → JsString(tpe)) + - ("_version" → JsNumber(version))) - } } }