Skip to content

Commit

Permalink
#2305 clear scroll, retry on failure, improve resume
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jan 7, 2022
1 parent 3979a54 commit be4009b
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 151 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Dependencies._
import com.typesafe.sbt.packager.Keys.bashScriptDefines
import org.thp.ghcl.Milestone

val thehiveVersion = "4.1.16-1"
val thehiveVersion = "4.1.17-RC1-1"
val scala212 = "2.12.13"
val scala213 = "2.13.1"
val supportedScalaVersions = List(scala212, scala213)
Expand Down
6 changes: 6 additions & 0 deletions migration/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ input {
keepalive: 10h
# Size of the page for scroll
pagesize: 10

maxAttempts = 5
minBackoff = 10 milliseconds
maxBackoff = 10 seconds
randomFactor = 0.2
}
filter {
maxCaseAge: 0
Expand Down Expand Up @@ -39,6 +44,7 @@ input {

output {
caseNumberShift: 0
resume: false
removeData: false
db {
provider: janusgraph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ object Migrate extends App with MigrationOps {
opt[Unit]('d', "drop-database")
.action((_, c) => addConfig(c, "output.dropDatabase", true))
.text("Drop TheHive4 database before migration"),
opt[Boolean]('r', "resume")
.action((_, c) => addConfig(c, "output.resume", true))
.text("Resume migration (or migrate on existing database)"),
opt[String]('m', "main-organisation")
.valueName("<organisation>")
.action((o, c) => addConfig(c, "input.mainOrganisation", o)),
Expand All @@ -75,6 +78,10 @@ object Migrate extends App with MigrationOps {
opt[Int]('p', "es-pagesize")
.text("TheHive3 ElasticSearch page size")
.action((p, c) => addConfig(c, "input.search.pagesize", p)),
opt[Boolean]('s', "es-single-type")
.valueName("<bool>")
.text("Elasticsearch single type")
.action((s, c) => addConfig(c, "search.singleType", s)),
/* case age */
opt[String]("max-case-age")
.valueName("<duration>")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package org.thp.thehive.migration.th3

import akka.NotUsed
import akka.actor.ActorSystem
import akka.actor.{ActorSystem, Scheduler}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.sslconfig.ssl.{KeyManagerConfig, KeyStoreConfig, SSLConfigSettings, TrustManagerConfig, TrustStoreConfig}
import org.thp.client.{Authentication, NoAuthentication, PasswordAuthentication}
import org.thp.scalligraph.utils.Retry
import org.thp.scalligraph.{InternalError, NotFoundError}
import play.api.http.HeaderNames
import play.api.libs.json.{JsNumber, JsObject, JsValue, Json}
Expand All @@ -15,7 +16,7 @@ import play.api.{Configuration, Logger}

import java.net.{URI, URLEncoder}
import javax.inject.{Inject, Provider, Singleton}
import scala.concurrent.duration.{Duration, DurationInt, DurationLong}
import scala.concurrent.duration.{Duration, DurationInt, DurationLong, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext, Future}

@Singleton
Expand Down Expand Up @@ -79,10 +80,26 @@ class ElasticClientProvider @Inject() (
} yield PasswordAuthentication(user, password))
.getOrElse(NoAuthentication)

val esUri = config.get[String]("search.uri")
val pageSize = config.get[Int]("search.pagesize")
val keepAlive = config.getMillis("search.keepalive").millis
val elasticConfig = new ElasticConfig(ws, authentication, esUri, pageSize, keepAlive.toMillis + "ms")
val esUri = config.get[String]("search.uri")
val pageSize = config.get[Int]("search.pagesize")
val keepAlive = config.getMillis("search.keepalive").millis
val maxAttempts = config.get[Int]("search.maxAttempts")
val minBackoff = config.get[FiniteDuration]("search.minBackoff")
val maxBackoff = config.get[FiniteDuration]("search.maxBackoff")
val randomFactor = config.get[Double]("search.randomFactor")

val elasticConfig = new ElasticConfig(
ws,
authentication,
esUri,
pageSize,
keepAlive.toMillis + "ms",
maxAttempts,
minBackoff,
maxBackoff,
randomFactor,
actorSystem.scheduler
)
val elasticVersion = elasticConfig.version
logger.info(s"Found ElasticSearch $elasticVersion")
lazy val indexName: String = {
Expand All @@ -102,14 +119,25 @@ class ElasticClientProvider @Inject() (
}
logger.info(s"Found Index $indexName")

val isSingleType = elasticConfig.isSingleType(indexName)
val isSingleType = config.getOptional[Boolean]("search.singleType").getOrElse(elasticConfig.isSingleType(indexName))
logger.info(s"Found index with ${if (isSingleType) "single type" else "multiple types"}")
if (elasticConfig.isSingleType(indexName)) new ElasticSingleTypeClient(elasticConfig, indexName)
if (isSingleType) new ElasticSingleTypeClient(elasticConfig, indexName)
else new ElasticMultiTypeClient(elasticConfig, indexName)
}
}

class ElasticConfig(ws: WSClient, authentication: Authentication, esUri: String, val pageSize: Int, val keepAlive: String) {
class ElasticConfig(
ws: WSClient,
authentication: Authentication,
esUri: String,
val pageSize: Int,
val keepAlive: String,
maxAttempts: Int,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
scheduler: Scheduler
) {
lazy val logger: Logger = Logger(getClass)
def stripUrl(url: String): String = new URI(url).normalize().toASCIIString.replaceAll("/+$", "")

Expand All @@ -118,13 +146,32 @@ class ElasticConfig(ws: WSClient, authentication: Authentication, esUri: String,
.map(p => s"${URLEncoder.encode(p._1, "UTF-8")}=${URLEncoder.encode(p._2, "UTF-8")}")
.mkString("&")
logger.debug(s"POST ${stripUrl(s"$esUri/$url?$encodedParams")}\n$body")
Retry(maxAttempts).withBackoff(minBackoff, maxBackoff, randomFactor)(scheduler, ec) {
authentication(
ws.url(stripUrl(s"$esUri/$url?$encodedParams"))
.withHttpHeaders(HeaderNames.CONTENT_TYPE -> "application/json")
)
.post(body)
.map {
case response if response.status == 200 => response.json
case response => throw InternalError(s"Unexpected response from Elasticsearch: ${response.status} ${response.statusText}\n${response.body}")
}
}
}

def delete(url: String, body: JsValue, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue] = {
val encodedParams = params
.map(p => s"${URLEncoder.encode(p._1, "UTF-8")}=${URLEncoder.encode(p._2, "UTF-8")}")
.mkString("&")
authentication(
ws.url(stripUrl(s"$esUri/$url?$encodedParams"))
ws
.url(stripUrl(s"$esUri/$url?$encodedParams"))
.withHttpHeaders(HeaderNames.CONTENT_TYPE -> "application/json")
)
.post(body)
.withBody(body)
.execute("DELETE")
.map {
case response if response.status == 200 => response.json
case response if response.status == 200 => response.body[JsValue]
case response => throw InternalError(s"Unexpected response from Elasticsearch: ${response.status} ${response.statusText}\n${response.body}")
}
}
Expand Down Expand Up @@ -162,6 +209,7 @@ trait ElasticClient {
val keepAlive: String
def search(docType: String, request: JsObject, params: (String, String)*)(implicit ec: ExecutionContext): Future[JsValue]
def scroll(scrollId: String, keepAlive: String)(implicit ec: ExecutionContext): Future[JsValue]
def clearScroll(scrollId: String)(implicit ec: ExecutionContext): Future[JsValue]

def apply(docType: String, query: JsObject)(implicit ec: ExecutionContext): Source[JsValue, NotUsed] = {
val searchWithScroll = new SearchWithScroll(this, docType, query + ("size" -> JsNumber(pageSize)), keepAlive)
Expand All @@ -184,7 +232,10 @@ class ElasticMultiTypeClient(elasticConfig: ElasticConfig, indexName: String) ex
elasticConfig.post(s"/$indexName/$docType/_search", request, params: _*)
override def scroll(scrollId: String, keepAlive: String)(implicit ec: ExecutionContext): Future[JsValue] =
elasticConfig.post("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive))
override def clearScroll(scrollId: String)(implicit ec: ExecutionContext): Future[JsValue] =
elasticConfig.delete("/_search/scroll", Json.obj("scroll_id" -> scrollId))
}

class ElasticSingleTypeClient(elasticConfig: ElasticConfig, indexName: String) extends ElasticClient {
override val pageSize: Int = elasticConfig.pageSize
override val keepAlive: String = elasticConfig.keepAlive
Expand All @@ -196,4 +247,6 @@ class ElasticSingleTypeClient(elasticConfig: ElasticConfig, indexName: String) e
}
override def scroll(scrollId: String, keepAlive: String)(implicit ec: ExecutionContext): Future[JsValue] =
elasticConfig.post("/_search/scroll", Json.obj("scroll_id" -> scrollId, "scroll" -> keepAlive))
override def clearScroll(scrollId: String)(implicit ec: ExecutionContext): Future[JsValue] =
elasticConfig.delete("/_search/scroll", Json.obj("scroll_id" -> scrollId))
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,8 @@ class SearchWithScroll(client: ElasticClient, docType: String, query: JsObject,
else firstResults.onComplete(firstCallback.invoke)
}
)

override def postStop(): Unit =
scrollId.foreach(client.clearScroll(_))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import org.janusgraph.core.JanusGraph
import org.thp.scalligraph.SingleInstance
import org.thp.scalligraph.janus.JanusDatabase
import org.thp.scalligraph.models.{Database, UpdatableSchema}
import org.thp.thehive.services.LocalUserSrv
import play.api.Configuration

import javax.inject.{Inject, Provider, Singleton}
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.util.Success

@Singleton
class JanusDatabaseProvider @Inject() (configuration: Configuration, system: ActorSystem, schemas: immutable.Set[UpdatableSchema])
Expand All @@ -36,11 +34,7 @@ class JanusDatabaseProvider @Inject() (configuration: Configuration, system: Act
system,
new SingleInstance(true)
)
db.createSchema(schemas.flatMap(_.modelList).toSeq)
db.tryTransaction { graph =>
schemas.flatMap(_.initialValues).foreach(_.create()(graph, LocalUserSrv.getSystemAuthContext))
Success(())
}
db.createSchema(schemas.flatMap(_.modelList).toSeq).get
db
}
}
Loading

0 comments on commit be4009b

Please sign in to comment.