From 2d0cd301614c8ef2aa35e160ef84e6aee23ff894 Mon Sep 17 00:00:00 2001
From: To-om
Date: Thu, 27 May 2021 11:39:29 +0200
Subject: [PATCH] #2042 Add cloner tool

---
 ScalliGraph                                        |   2 +-
 conf/cloner.conf                                   |  21 +++
 migration/src/main/resources/reference.conf        |  41 ++++
 .../scala/org/thp/thehive/cloner/Cloner.scala      | 177 ++++++++++++++++++
 .../thehive/cloner/IntegrityCheckApp.scala         |  99 ++++++++++
 .../org/thp/thehive/migration/Migrate.scala        |  14 +-
 6 files changed, 350 insertions(+), 4 deletions(-)
 create mode 100644 conf/cloner.conf
 create mode 100644 migration/src/main/scala/org/thp/thehive/cloner/Cloner.scala
 create mode 100644 migration/src/main/scala/org/thp/thehive/cloner/IntegrityCheckApp.scala

diff --git a/ScalliGraph b/ScalliGraph
index 8fefbc7ddb..7269094914 160000
--- a/ScalliGraph
+++ b/ScalliGraph
@@ -1 +1 @@
-Subproject commit 8fefbc7ddb9bf84835a6e4aa452026684ca66d8c
+Subproject commit 72690949146e846a51a405c9272699ec8a7eb86f
diff --git a/conf/cloner.conf b/conf/cloner.conf
new file mode 100644
index 0000000000..1488e458a6
--- /dev/null
+++ b/conf/cloner.conf
@@ -0,0 +1,21 @@
+from {
+  db.janusgraph {
+    storage {
+      backend: berkeleyje
+      directory: /data/thehive/support/db
+    }
+  }
+}
+to {
+  db.janusgraph {
+    storage {
+      backend: berkeleyje
+      directory: /data/thehive/support2/db
+    }
+    index.search {
+      backend: lucene
+      directory: /data/thehive/support2/index
+    }
+  }
+}
+batchSize: 100
diff --git a/migration/src/main/resources/reference.conf b/migration/src/main/resources/reference.conf
index b1f8f9e99a..90a131e6a0 100644
--- a/migration/src/main/resources/reference.conf
+++ b/migration/src/main/resources/reference.conf
@@ -76,3 +76,44 @@ output {
 #    }
   }
 }
+
+
+from {
+  db {
+    janusgraph {
+      connect {
+        maxAttempts = 10
+        minBackoff = 1 second
+        maxBackoff = 5 seconds
+        randomFactor = 0.2
+      }
+    }
+    onConflict {
+      maxAttempts = 6
+      minBackoff = 100 milliseconds
+      maxBackoff = 1 seconds
+      randomFactor = 0.2
+    }
+    chunkSize = 32k
+  }
+}
+to {
+  db {
+    janusgraph {
+      connect {
+        maxAttempts = 10
+        minBackoff = 1 second
+        maxBackoff = 5 seconds
+        randomFactor = 0.2
+      }
+    }
+    onConflict {
+      maxAttempts = 6
+      minBackoff = 100 milliseconds
+      maxBackoff = 1 seconds
+      randomFactor = 0.2
+    }
+    chunkSize = 32k
+  }
+}
+batchSize: 100
diff --git a/migration/src/main/scala/org/thp/thehive/cloner/Cloner.scala b/migration/src/main/scala/org/thp/thehive/cloner/Cloner.scala
new file mode 100644
index 0000000000..f281ddf3a7
--- /dev/null
+++ b/migration/src/main/scala/org/thp/thehive/cloner/Cloner.scala
@@ -0,0 +1,177 @@
+package org.thp.thehive.cloner
+
+import akka.actor.ActorSystem
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.tinkerpop.gremlin.structure.T
+import org.thp.scalligraph.SingleInstance
+import org.thp.scalligraph.janus.JanusDatabase
+import org.thp.scalligraph.models.Database
+import org.thp.scalligraph.traversal.TraversalOps._
+import org.thp.thehive.connector.cortex.models.CortexSchemaDefinition
+import org.thp.thehive.migration.Migrate.getVersion
+import org.thp.thehive.models.TheHiveSchemaDefinition
+import play.api.libs.logback.LogbackLoggerConfigurator
+import play.api.{Configuration, Environment}
+import scopt.OParser
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+import scala.util.Success
+
+object Cloner extends App with IntegrityCheckApp {
+  def getDatabase(configuration: Configuration)(implicit system: ActorSystem): Database = {
+    val janusDatabase = JanusDatabase.openDatabase(configuration, system)
+    new JanusDatabase(
+      janusDatabase,
+      configuration,
+      system,
+      new SingleInstance(true)
+    )
+  }
+
+  val defaultLoggerConfigFile = "/etc/thehive/logback-cloner.xml"
+  if (System.getProperty("logger.file") == null && Files.exists(Paths.get(defaultLoggerConfigFile)))
+    System.setProperty("logger.file", defaultLoggerConfigFile)
+
+  val builder = OParser.builder[Config]
+  val argParser = {
+    import builder._
+    OParser.sequence(
+      programName("clone"),
+      version('v', "version"),
+      help('h', "help"),
+      head("TheHive cloner tool", getVersion),
+      opt[File]('l', "logger-config")
+        .valueName("<file>")
+        .action { (f, c) =>
+          System.setProperty("logger.file", f.getAbsolutePath)
+          c
+        }
+        .text("logback configuration file"),
+      opt[File]('c', "config")
+        .valueName("<file>")
+        .required()
+        .action((f, c) => ConfigFactory.parseFileAnySyntax(f).withFallback(c))
+        .text("configuration file")
+    )
+  }
+  val defaultConfig =
+    ConfigFactory
+      .parseResources("play/reference-overrides.conf")
+      .withFallback(ConfigFactory.defaultReference())
+      .resolve()
+  OParser.parse(argParser, args, defaultConfig).foreach { config =>
+    implicit val actorSystem: ActorSystem = ActorSystem("TheHiveCloner", config)
+
+    try {
+      (new LogbackLoggerConfigurator).configure(Environment.simple(), Configuration.empty, Map.empty)
+
+      val sourceDatabase: Database = getDatabase(
+        Configuration(
+          config
+            .getConfig("from")
+            .withoutPath("db.janusgraph.index.search")
+        )
+      )
+
+      val thehiveSchema = new TheHiveSchemaDefinition
+      val cortexSchema  = new CortexSchemaDefinition
+      if (sourceDatabase.version(thehiveSchema.name) != thehiveSchema.operations.operations.length + 1) {
+        println(
+          "The schema of TheHive is not valid " +
+            s"(found ${sourceDatabase.version(thehiveSchema.name)}, expected ${thehiveSchema.operations.operations.length + 1})"
+        )
+        sys.exit(1)
+      }
+      if (sourceDatabase.version(cortexSchema.name) != cortexSchema.operations.operations.length + 1) {
+        println(
+          "The schema of Cortex is not valid " +
+            s"(found ${sourceDatabase.version(cortexSchema.name)}, expected ${cortexSchema.operations.operations.length + 1})"
+        )
+        sys.exit(1)
+      }
+
+      val destDatabase: Database = getDatabase(
+        Configuration(
+          config
+            .getConfig("to")
+        )
+      )
+      val nonEmpty: Boolean = destDatabase.roTransaction { implicit graph =>
+        graph.VV().limit(1).exists
+      }
+      if (nonEmpty) {
+        println("The destination database is not empty.")
+        sys.exit(1)
+      }
+
+      println("Setting up the database schema ...")
+      // don't create initial values
+      val models = destDatabase.extraModels ++ thehiveSchema.modelList ++ cortexSchema.modelList
+      destDatabase.createSchema(models)
+      destDatabase.setVersion(thehiveSchema.name, thehiveSchema.operations.operations.length + 1)
+      destDatabase.setVersion(cortexSchema.name, cortexSchema.operations.operations.length + 1)
+
+      val batchSize: Int = config.getInt("batchSize")
+
+      println("Copying vertices ...")
+      sourceDatabase.roTransaction { from =>
+        val vertexIdsBuilder = immutable.Map.newBuilder[AnyRef, AnyRef]
+
+        val allVertices = from.VV().toIterator
+        Iterator
+          .continually(allVertices)
+          .takeWhile(_.hasNext)
+          .map(_.take(batchSize))
+          .foreach { vv =>
+            destDatabase.tryTransaction { to =>
+              vv.foreach { vertex =>
+                val properties    = T.label +: vertex.label +: vertex.properties[AnyRef]().asScala.flatMap(p => Seq(p.key(), p.value())).toSeq
+                val createdVertex = to.underlying.addVertex(properties: _*)
+                vertexIdsBuilder += vertex.id() -> createdVertex.id()
+              }
+              Success(())
+            }
+          }
+
+        val vertexIds = vertexIdsBuilder.result()
+        println("Copying edges ...")
+        val allEdges = from.EE().toIterator
+        Iterator
+          .continually(allEdges)
+          .takeWhile(_.hasNext)
+          .map(_.take(batchSize))
+          .foreach { ee =>
+            destDatabase.tryTransaction { to =>
+              ee.foreach { edge =>
+                val createdEdge = for {
+                  inVertex  <- vertexIds.get(edge.inVertex().id()).toIterable.flatMap(to.traversal().V(_).asScala)
+                  outVertex <- vertexIds.get(edge.outVertex().id()).toIterable.flatMap(to.traversal().V(_).asScala)
+                  properties = edge.properties[AnyRef]().asScala.flatMap(p => Seq(p.key(), p.value())).toSeq
+                } yield outVertex.addEdge(edge.label, inVertex, properties: _*)
+                if (createdEdge.isEmpty)
+                  println(
+                    s"Edge ${edge.label()}:${edge.id()} " +
+                      s"from ${edge.outVertex().label()}:${edge.outVertex().id()} " +
+                      s"to ${edge.inVertex().label()}:${edge.inVertex().id()} " +
+                      s"cannot be created"
+                  )
+              }
+              Success(())
+            }
+          }
+      }
+      sourceDatabase.close()
+
+      println("Add indices ...")
+      destDatabase.addSchemaIndexes(models)
+      runChecks(destDatabase, Configuration(config))
+      destDatabase.close()
+    } finally {
+      actorSystem.terminate()
+      ()
+    }
+  }
+}
diff --git a/migration/src/main/scala/org/thp/thehive/cloner/IntegrityCheckApp.scala b/migration/src/main/scala/org/thp/thehive/cloner/IntegrityCheckApp.scala
new file mode 100644
index 0000000000..1224be9325
--- /dev/null
+++ b/migration/src/main/scala/org/thp/thehive/cloner/IntegrityCheckApp.scala
@@ -0,0 +1,99 @@
+package org.thp.thehive.cloner
+
+import akka.actor.ActorSystem
+import akka.stream.Materializer
+import com.google.inject.{Guice, Injector => GInjector}
+import net.codingwell.scalaguice.{ScalaModule, ScalaMultibinder}
+import org.thp.scalligraph.auth.{AuthContext, UserSrv => UserDB}
+import org.thp.scalligraph.models.Database
+import org.thp.scalligraph.services._
+import org.thp.thehive.migration.th4.DummyActor
+import org.thp.thehive.services._
+import play.api.cache.ehcache.EhCacheModule
+import play.api.inject.guice.GuiceInjector
+import play.api.inject.{ApplicationLifecycle, DefaultApplicationLifecycle, Injector}
+import play.api.libs.concurrent.AkkaGuiceSupport
+import play.api.{Configuration, Environment}
+
+import javax.inject.Inject
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+import scala.concurrent.ExecutionContext
+import scala.util.Success
+
+trait IntegrityCheckApp {
+  private def buildApp(configuration: Configuration, db: Database)(implicit actorSystem: ActorSystem): GInjector =
+    Guice
+      .createInjector(
+        (play.api.inject.guice.GuiceableModule.guiceable(new EhCacheModule).guiced(Environment.simple(), configuration, Set.empty) :+
+          new ScalaModule with AkkaGuiceSupport {
+            override def configure(): Unit = {
+              bind[Database].toInstance(db)
+
+              bind[Configuration].toInstance(configuration)
+              bind[ActorSystem].toInstance(actorSystem)
+              bind[Materializer].toInstance(Materializer(actorSystem))
+              bind[ExecutionContext].toInstance(actorSystem.dispatcher)
+              bind[Injector].to[GuiceInjector]
+              bind[UserDB].to[LocalUserSrv]
+              bindActor[DummyActor]("notification-actor")
+              bindActor[DummyActor]("config-actor")
+              bindActor[DummyActor]("cortex-actor")
+              bindActor[DummyActor]("integrity-check-actor")
+
+              val integrityCheckOpsBindings = ScalaMultibinder.newSetBinder[GenIntegrityCheckOps](binder)
+              integrityCheckOpsBindings.addBinding.to[ProfileIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[OrganisationIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[TagIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[UserIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[ImpactStatusIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[ResolutionStatusIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[ObservableTypeIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[CustomFieldIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[CaseTemplateIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[DataIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[CaseIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[AlertIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[TaskIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[ObservableIntegrityCheckOps]
+              integrityCheckOpsBindings.addBinding.to[LogIntegrityCheckOps]
+
+              bind[Environment].toInstance(Environment.simple())
+              bind[ApplicationLifecycle].to[DefaultApplicationLifecycle]
+              configuration.get[String]("storage.provider") match {
+                case "localfs"  => bind(classOf[StorageSrv]).to(classOf[LocalFileSystemStorageSrv])
+                case "database" => bind(classOf[StorageSrv]).to(classOf[DatabaseStorageSrv])
+                case "hdfs"     => bind(classOf[StorageSrv]).to(classOf[HadoopStorageSrv])
+                case "s3"       => bind(classOf[StorageSrv]).to(classOf[S3StorageSrv])
+              }
+              ()
+            }
+          }).asJava
+      )
+
+  def runChecks(db: Database, configuration: Configuration)(implicit actorSystem: ActorSystem): Unit =
+    buildApp(configuration, db).getInstance(classOf[IntegrityChecks]).runChecks()
+}
+
+class IntegrityChecks @Inject() (db: Database, checks: immutable.Set[GenIntegrityCheckOps], userSrv: UserDB) extends MapMerger {
+  def runChecks(): Unit = {
+    implicit val authContext: AuthContext = userSrv.getSystemAuthContext
+    checks.foreach { c =>
+      db.tryTransaction { implicit graph =>
+        println(s"Running check on ${c.name} ...")
+        c.initialCheck()
+        val stats = c.duplicationCheck() <+> c.globalCheck()
+        val statsStr = stats
+          .collect {
+            case (k, v) if v != 0 => s"$k:$v"
+          }
+          .mkString(" ")
+        if (statsStr.isEmpty)
+          println("  no change needed")
+        else
+          println(s"  $statsStr")
+        Success(())
+      }
+    }
+  }
+}
diff --git a/migration/src/main/scala/org/thp/thehive/migration/Migrate.scala b/migration/src/main/scala/org/thp/thehive/migration/Migrate.scala
index f36146f9d8..32d32e2267 100644
--- a/migration/src/main/scala/org/thp/thehive/migration/Migrate.scala
+++ b/migration/src/main/scala/org/thp/thehive/migration/Migrate.scala
@@ -8,14 +8,15 @@ import play.api.{Configuration, Environment}
 import scopt.OParser
 
 import java.io.File
+import java.nio.file.{Files, Paths}
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.{Duration, DurationInt}
 import scala.concurrent.{Await, ExecutionContext}
 
 object Migrate extends App with MigrationOps {
-  Option(System.getProperty("logger.file")).getOrElse {
-    System.setProperty("logger.file", "/etc/thehive/logback-migration.xml")
-  }
+  val defaultLoggerConfigFile = "/etc/thehive/logback-migration.xml"
+  if (System.getProperty("logger.file") == null && Files.exists(Paths.get(defaultLoggerConfigFile)))
+    System.setProperty("logger.file", defaultLoggerConfigFile)
 
   def getVersion: String = Option(getClass.getPackage.getImplementationVersion).getOrElse("SNAPSHOT")
 
@@ -30,6 +31,13 @@ object Migrate extends App with MigrationOps {
       version('v', "version"),
       help('h', "help"),
      head("TheHive migration tool", getVersion),
+      opt[File]('l', "logger-config")
+        .valueName("<file>")
+        .action { (f, c) =>
+          System.setProperty("logger.file", f.getAbsolutePath)
+          c
+        }
+        .text("logback configuration file"),
       opt[File]('c', "config")
         .valueName("<file>")
         .action((f, c) => ConfigFactory.parseFileAnySyntax(f).withFallback(c))