Skip to content

Commit

Permalink
#82 Add support of ElasticSearch 6.x
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed May 10, 2019
1 parent d91c757 commit 61b536d
Show file tree
Hide file tree
Showing 38 changed files with 579 additions and 660 deletions.
15 changes: 6 additions & 9 deletions app/org/elastic4play/ErrorHandler.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package org.elastic4play

import java.net.ConnectException

import scala.concurrent.Future

import play.api.Logger
import play.api.http.{HttpErrorHandler, Status, Writeable}
import play.api.libs.json.{JsNull, JsValue, Json}
import play.api.mvc.{RequestHeader, ResponseHeader, Result, Results}

import org.elasticsearch.client.transport.NoNodeAvailableException
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.index.query.QueryShardException

import org.elastic4play.JsonFormat.attributeCheckingExceptionWrites

/**
Expand All @@ -35,10 +33,10 @@ class ErrorHandler extends HttpErrorHandler {
Some(Status.BAD_REQUEST Json.obj("type" "NumberFormatException", "message" ("Invalid format " + nfe.getMessage)))
case NotFoundError(message) Some(Status.NOT_FOUND Json.obj("type" "NotFoundError", "message" message))
case BadRequestError(message) Some(Status.BAD_REQUEST Json.obj("type" "BadRequest", "message" message))
case SearchError(message, cause) Some(Status.BAD_REQUEST Json.obj("type" "SearchError", "message" s"$message (${cause.getMessage})"))
case SearchError(message) Some(Status.BAD_REQUEST Json.obj("type" "SearchError", "message" s"$message"))
case ace: AttributeCheckingError Some(Status.BAD_REQUEST Json.toJson(ace))
case iae: IllegalArgumentException Some(Status.BAD_REQUEST Json.obj("type" "IllegalArgument", "message" iae.getMessage))
case _: NoNodeAvailableException
case _: ConnectException
Some(Status.INTERNAL_SERVER_ERROR Json.obj("type" "NoNodeAvailable", "message" "ElasticSearch cluster is unreachable"))
case CreateError(_, message, attributes)
Some(Status.INTERNAL_SERVER_ERROR Json.obj("type" "CreateError", "message" message, "object" attributes))
Expand All @@ -49,9 +47,8 @@ class ErrorHandler extends HttpErrorHandler {
case Some((_, j)) j
}
Some(Status.MULTI_STATUS Json.obj("type" "MultiError", "error" message, "suberrors" suberrors))
case _: IndexNotFoundException Some(520 JsNull)
case qse: QueryShardException Some(Status.BAD_REQUEST Json.obj("type" "Invalid search query", "message" qse.getMessage))
case t: Throwable Option(t.getCause).flatMap(toErrorResult)
case IndexNotFoundException Some(520 JsNull)
case t: Throwable Option(t.getCause).flatMap(toErrorResult)
}

def toResult[C](status: Int, c: C)(implicit writeable: Writeable[C]) = Result(header = ResponseHeader(status), body = writeable.toEntity(c))
Expand Down
3 changes: 2 additions & 1 deletion app/org/elastic4play/Errors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ case class NotFoundError(message: String)
case class GetError(message: String) extends Exception(message)
case class UpdateError(status: Option[String], message: String, attributes: JsObject) extends ErrorWithObject(message, attributes)
case class InternalError(message: String) extends Exception(message)
case class SearchError(message: String, cause: Throwable) extends Exception(message, cause)
case class SearchError(message: String) extends Exception(message)
case class AuthenticationError(message: String) extends Exception(message)
case class OAuth2Redirect(redirectUrl: String, params: Map[String, Seq[String]]) extends Exception(redirectUrl)
case class AuthorizationError(message: String) extends Exception(message)
case class MultiError(message: String, exceptions: Seq[Exception])
extends Exception(message + exceptions.map(_.getMessage).mkString(" :\n\t- ", "\n\t- ", ""))
case object IndexNotFoundException extends Exception

case class AttributeCheckingError(tableName: String, errors: Seq[AttributeError] = Nil) extends Exception(errors.mkString("[", "][", "]")) {
override def toString: String = errors.mkString("[", "][", "]")
Expand Down
293 changes: 122 additions & 171 deletions app/org/elastic4play/database/DBConfiguration.scala

Large diffs are not rendered by default.

62 changes: 23 additions & 39 deletions app/org/elastic4play/database/DBCreate.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
package org.elastic4play.database

import javax.inject.{Inject, Singleton}

import scala.concurrent.{ExecutionContext, Future}

import play.api.Logger
import play.api.libs.json.JsValue.jsValueToJsLookup
import play.api.libs.json._

import akka.stream.scaladsl.Sink
import com.sksamuel.elastic4s.ElasticDsl.indexInto
import com.sksamuel.elastic4s.indexes.IndexDefinition
import com.sksamuel.elastic4s.RefreshPolicy
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.indexes.IndexRequest
import com.sksamuel.elastic4s.streams.RequestBuilder
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy
import org.elasticsearch.index.engine.VersionConflictEngineException
import org.elasticsearch.transport.RemoteTransportException
import javax.inject.{Inject, Singleton}

import org.elastic4play.CreateError
import org.elastic4play.models.BaseEntity
import org.elastic4play.{ConflictError, CreateError, InternalError}

/**
* Service lass responsible for entity creation
Expand Down Expand Up @@ -58,69 +55,56 @@ class DBCreate @Inject()(db: DBConfiguration, implicit val ec: ExecutionContext)
.orElse(id)

// remove attributes that starts with "_" because we wan't permit to interfere with elasticsearch internal fields
val docSource = JsObject(attributes.fields.filterNot(_._1.startsWith("_"))).toString
val docSource = addParent(modelName, parent, JsObject(attributes.fields.filterNot(_._1.startsWith("_"))))

db.execute {
addId(id).andThen(addParent(parentId)).andThen(addRouting(routing)) {
indexInto(db.indexName, modelName).source(docSource).refresh(RefreshPolicy.WAIT_UNTIL)
addId(id).andThen(addRouting(routing)) {
indexInto(db.indexName / "doc").source(docSource.toString).refresh(RefreshPolicy.WAIT_UNTIL)
}
}
.transform(
.map(
indexResponse
attributes +
("_type" JsString(modelName)) +
("_id" JsString(indexResponse.id)) +
("_parent" parentId.fold[JsValue](JsNull)(JsString)) +
("_routing" JsString(routing.getOrElse(indexResponse.id))) +
("_version" JsNumber(indexResponse.version)),
convertError(attributes, _)
("_version" JsNumber(indexResponse.version))
)
}

private[database] def convertError(attributes: JsObject, error: Throwable): Throwable = error match {
case rte: RemoteTransportException convertError(attributes, rte.getCause)
case vcee: VersionConflictEngineException ConflictError(vcee.getMessage, attributes)
case other
logger.warn("create error", other)
CreateError(None, other.getMessage, attributes)
}

/**
* add id information in index definition
*/
private def addId(id: Option[String]): IndexDefinition IndexDefinition = id match {
private def addId(id: Option[String]): IndexRequest IndexRequest = id match {
case Some(i) _ id i createOnly true
case None identity
}

/**
* add parent information in index definition
*/
private def addParent(parent: Option[String]): IndexDefinition IndexDefinition = parent match {
case Some(p) _ parent p
case None identity
}

/**
* add routing information in index definition
*/
private def addRouting(routing: Option[String]): IndexDefinition IndexDefinition = routing match {
private def addRouting(routing: Option[String]): IndexRequest IndexRequest = routing match {
case Some(r) _ routing r
case None identity
}

private def addParent(modelName: String, parent: Option[BaseEntity], entity: JsObject): JsObject = parent match {
case Some(p) entity + ("relations" Json.obj("name" modelName, "parent" p.id))
case None entity + ("relations" JsString(modelName))
}

/**
* Class used to build index definition based on model name and attributes
* This class is used by sink (ElasticSearch reactive stream)
*/
private class AttributeRequestBuilder() extends RequestBuilder[JsObject] {
override def request(attributes: JsObject): IndexDefinition = {
val docSource = JsObject(attributes.fields.filterNot(_._1.startsWith("_"))).toString
override def request(attributes: JsObject): IndexRequest = {
val id = (attributes \ "_id").asOpt[String]
val parent = (attributes \ "_parent").asOpt[String]
val routing = (attributes \ "_routing").asOpt[String] orElse parent orElse id
val modelName = (attributes \ "_type").asOpt[String].getOrElse(throw InternalError("The entity doesn't contain _type attribute"))
addId(id).andThen(addParent(parent)).andThen(addRouting(routing)) {
indexInto(db.indexName, modelName).source(docSource)
val routing = (attributes \ "_routing").asOpt[String] orElse id
val docSource = JsObject(attributes.fields.filterNot(_._1.startsWith("_")))
addId(id).andThen(addRouting(routing)) {
indexInto(db.indexName, "doc").source(docSource.toString)
}
}
}
Expand Down
Loading

0 comments on commit 61b536d

Please sign in to comment.