-
Notifications
You must be signed in to change notification settings - Fork 640
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing 6 changed files with 350 additions and 4 deletions.
There are no files selected for viewing
Submodule ScalliGraph
updated
1 file
+20 −16 | core/src/main/scala/org/thp/scalligraph/services/IntegrityCheckOps.scala |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from { | ||
db.janusgraph { | ||
storage { | ||
backend: berkeleyje | ||
directory: /data/thehive/support/db | ||
} | ||
} | ||
} | ||
to { | ||
db.janusgraph { | ||
storage { | ||
backend: berkeleyje | ||
directory: /data/thehive/support2/db | ||
} | ||
index.search { | ||
backend: lucene | ||
directory: /data/thehive/support2/index | ||
} | ||
} | ||
} | ||
batchSize: 100 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
177 changes: 177 additions & 0 deletions
177
migration/src/main/scala/org/thp/thehive/cloner/Cloner.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
package org.thp.thehive.cloner | ||
|
||
import akka.actor.ActorSystem | ||
import com.typesafe.config.{Config, ConfigFactory} | ||
import org.apache.tinkerpop.gremlin.structure.T | ||
import org.thp.scalligraph.SingleInstance | ||
import org.thp.scalligraph.janus.JanusDatabase | ||
import org.thp.scalligraph.models.Database | ||
import org.thp.scalligraph.traversal.TraversalOps._ | ||
import org.thp.thehive.connector.cortex.models.CortexSchemaDefinition | ||
import org.thp.thehive.migration.Migrate.getVersion | ||
import org.thp.thehive.models.TheHiveSchemaDefinition | ||
import play.api.libs.logback.LogbackLoggerConfigurator | ||
import play.api.{Configuration, Environment} | ||
import scopt.OParser | ||
|
||
import java.io.File | ||
import java.nio.file.{Files, Paths} | ||
import scala.collection.JavaConverters._ | ||
import scala.collection.immutable | ||
import scala.util.Success | ||
|
||
object Cloner extends App with IntegrityCheckApp { | ||
def getDatabase(configuration: Configuration)(implicit system: ActorSystem): Database = { | ||
val janusDatabase = JanusDatabase.openDatabase(configuration, system) | ||
new JanusDatabase( | ||
janusDatabase, | ||
configuration, | ||
system, | ||
new SingleInstance(true) | ||
) | ||
} | ||
|
||
val defaultLoggerConfigFile = "/etc/thehive/logback-cloner.xml" | ||
if (System.getProperty("logger.file") == null && Files.exists(Paths.get(defaultLoggerConfigFile))) | ||
System.setProperty("logger.file", defaultLoggerConfigFile) | ||
|
||
val builder = OParser.builder[Config] | ||
val argParser = { | ||
import builder._ | ||
OParser.sequence( | ||
programName("clone"), | ||
version('v', "version"), | ||
help('h', "help"), | ||
head("TheHive cloner tool", getVersion), | ||
opt[File]('l', "logger-config") | ||
.valueName("<file>") | ||
.action { (f, c) => | ||
System.setProperty("logger.file", f.getAbsolutePath) | ||
c | ||
} | ||
.text("logback configuration file"), | ||
opt[File]('c', "config") | ||
.valueName("<file>") | ||
.required() | ||
.action((f, c) => ConfigFactory.parseFileAnySyntax(f).withFallback(c)) | ||
.text("configuration file") | ||
) | ||
} | ||
val defaultConfig = | ||
ConfigFactory | ||
.parseResources("play/reference-overrides.conf") | ||
.withFallback(ConfigFactory.defaultReference()) | ||
.resolve() | ||
OParser.parse(argParser, args, defaultConfig).foreach { config => | ||
implicit val actorSystem: ActorSystem = ActorSystem("TheHiveCloner", config) | ||
|
||
try { | ||
(new LogbackLoggerConfigurator).configure(Environment.simple(), Configuration.empty, Map.empty) | ||
|
||
val sourceDatabase: Database = getDatabase( | ||
Configuration( | ||
config | ||
.getConfig("from") | ||
.withoutPath("db.janusgraph.index.search") | ||
) | ||
) | ||
|
||
val thehiveSchema = new TheHiveSchemaDefinition | ||
val cortexSchema = new CortexSchemaDefinition | ||
if (sourceDatabase.version(thehiveSchema.name) != thehiveSchema.operations.operations.length + 1) { | ||
println( | ||
"The schema of TheHive is not valid " + | ||
s"(found ${sourceDatabase.version(thehiveSchema.name)}, expected ${thehiveSchema.operations.operations.length + 1})" | ||
) | ||
sys.exit(1) | ||
} | ||
if (sourceDatabase.version(cortexSchema.name) != cortexSchema.operations.operations.length + 1) { | ||
println( | ||
"The schema of Cortex is not valid " + | ||
s"(found ${sourceDatabase.version(cortexSchema.name)}, expected ${cortexSchema.operations.operations.length + 1})" | ||
) | ||
sys.exit(1) | ||
} | ||
|
||
val destDatabase: Database = getDatabase( | ||
Configuration( | ||
config | ||
.getConfig("to") | ||
) | ||
) | ||
val nonEmpty: Boolean = destDatabase.roTransaction { implicit graph => | ||
graph.VV().limit(1).exists | ||
} | ||
if (nonEmpty) { | ||
println("The destination database is not empty.") | ||
sys.exit(1) | ||
} | ||
|
||
println("Setting up the database schema ...") | ||
// don't create initial values | ||
val models = destDatabase.extraModels ++ thehiveSchema.modelList ++ cortexSchema.modelList | ||
destDatabase.createSchema(models) | ||
destDatabase.setVersion(thehiveSchema.name, thehiveSchema.operations.operations.length + 1) | ||
destDatabase.setVersion(cortexSchema.name, cortexSchema.operations.operations.length + 1) | ||
|
||
val batchSize: Int = config.getInt("batchSize") | ||
|
||
println("Copying vertices ...") | ||
sourceDatabase.roTransaction { from => | ||
val vertexIdsBuilder = immutable.Map.newBuilder[AnyRef, AnyRef] | ||
|
||
val allVertices = from.VV().toIterator | ||
Iterator | ||
.continually(allVertices) | ||
.takeWhile(_.hasNext) | ||
.map(_.take(batchSize)) | ||
.foreach { vv => | ||
destDatabase.tryTransaction { to => | ||
vv.foreach { vertex => | ||
val properties = T.label +: vertex.label +: vertex.properties[AnyRef]().asScala.flatMap(p => Seq(p.key(), p.value())).toSeq | ||
val createdVertex = to.underlying.addVertex(properties: _*) | ||
vertexIdsBuilder += vertex.id() -> createdVertex.id() | ||
} | ||
Success(()) | ||
} | ||
} | ||
|
||
val vertexIds = vertexIdsBuilder.result() | ||
println("Copying edges ...") | ||
val allEdges = from.EE().toIterator | ||
Iterator | ||
.continually(allEdges) | ||
.takeWhile(_.hasNext) | ||
.map(_.take(batchSize)) | ||
.foreach { ee => | ||
destDatabase.tryTransaction { to => | ||
ee.foreach { edge => | ||
val createdEdge = for { | ||
inVertex <- vertexIds.get(edge.inVertex().id()).toIterable.flatMap(to.traversal().V(_).asScala) | ||
outVertex <- vertexIds.get(edge.outVertex().id()).toIterable.flatMap(to.traversal().V(_).asScala) | ||
properties = edge.properties[AnyRef]().asScala.flatMap(p => Seq(p.key(), p.value())).toSeq | ||
} yield outVertex.addEdge(edge.label, inVertex, properties: _*) | ||
if (createdEdge.isEmpty) | ||
println( | ||
s"Edge ${edge.label()}:${edge.id()} " + | ||
s"from ${edge.outVertex().label()}:${edge.outVertex().id()} " + | ||
s"to ${edge.inVertex().label()}:${edge.inVertex().id()} " + | ||
s"cannot be created" | ||
) | ||
} | ||
Success(()) | ||
} | ||
} | ||
} | ||
sourceDatabase.close() | ||
|
||
println("Add indices ...") | ||
destDatabase.addSchemaIndexes(models) | ||
runChecks(destDatabase, Configuration(config)) | ||
destDatabase.close() | ||
} finally { | ||
actorSystem.terminate() | ||
() | ||
} | ||
} | ||
} |
99 changes: 99 additions & 0 deletions
99
migration/src/main/scala/org/thp/thehive/cloner/IntegrityCheckApp.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package org.thp.thehive.cloner | ||
|
||
import akka.actor.ActorSystem | ||
import akka.stream.Materializer | ||
import com.google.inject.{Guice, Injector => GInjector} | ||
import net.codingwell.scalaguice.{ScalaModule, ScalaMultibinder} | ||
import org.thp.scalligraph.auth.{AuthContext, UserSrv => UserDB} | ||
import org.thp.scalligraph.models.Database | ||
import org.thp.scalligraph.services._ | ||
import org.thp.thehive.migration.th4.DummyActor | ||
import org.thp.thehive.services._ | ||
import play.api.cache.ehcache.EhCacheModule | ||
import play.api.inject.guice.GuiceInjector | ||
import play.api.inject.{ApplicationLifecycle, DefaultApplicationLifecycle, Injector} | ||
import play.api.libs.concurrent.AkkaGuiceSupport | ||
import play.api.{Configuration, Environment} | ||
|
||
import javax.inject.Inject | ||
import scala.collection.JavaConverters._ | ||
import scala.collection.immutable | ||
import scala.concurrent.ExecutionContext | ||
import scala.util.Success | ||
|
||
trait IntegrityCheckApp { | ||
private def buildApp(configuration: Configuration, db: Database)(implicit actorSystem: ActorSystem): GInjector = | ||
Guice | ||
.createInjector( | ||
(play.api.inject.guice.GuiceableModule.guiceable(new EhCacheModule).guiced(Environment.simple(), configuration, Set.empty) :+ | ||
new ScalaModule with AkkaGuiceSupport { | ||
override def configure(): Unit = { | ||
bind[Database].toInstance(db) | ||
|
||
bind[Configuration].toInstance(configuration) | ||
bind[ActorSystem].toInstance(actorSystem) | ||
bind[Materializer].toInstance(Materializer(actorSystem)) | ||
bind[ExecutionContext].toInstance(actorSystem.dispatcher) | ||
bind[Injector].to[GuiceInjector] | ||
bind[UserDB].to[LocalUserSrv] | ||
bindActor[DummyActor]("notification-actor") | ||
bindActor[DummyActor]("config-actor") | ||
bindActor[DummyActor]("cortex-actor") | ||
bindActor[DummyActor]("integrity-check-actor") | ||
|
||
val integrityCheckOpsBindings = ScalaMultibinder.newSetBinder[GenIntegrityCheckOps](binder) | ||
integrityCheckOpsBindings.addBinding.to[ProfileIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[OrganisationIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[TagIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[UserIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[ImpactStatusIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[ResolutionStatusIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[ObservableTypeIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[CustomFieldIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[CaseTemplateIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[DataIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[CaseIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[AlertIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[TaskIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[ObservableIntegrityCheckOps] | ||
integrityCheckOpsBindings.addBinding.to[LogIntegrityCheckOps] | ||
|
||
bind[Environment].toInstance(Environment.simple()) | ||
bind[ApplicationLifecycle].to[DefaultApplicationLifecycle] | ||
configuration.get[String]("storage.provider") match { | ||
case "localfs" => bind(classOf[StorageSrv]).to(classOf[LocalFileSystemStorageSrv]) | ||
case "database" => bind(classOf[StorageSrv]).to(classOf[DatabaseStorageSrv]) | ||
case "hdfs" => bind(classOf[StorageSrv]).to(classOf[HadoopStorageSrv]) | ||
case "s3" => bind(classOf[StorageSrv]).to(classOf[S3StorageSrv]) | ||
} | ||
() | ||
} | ||
}).asJava | ||
) | ||
|
||
def runChecks(db: Database, configuration: Configuration)(implicit actorSystem: ActorSystem): Unit = | ||
buildApp(configuration, db).getInstance(classOf[IntegrityChecks]).runChecks() | ||
} | ||
|
||
class IntegrityChecks @Inject() (db: Database, checks: immutable.Set[GenIntegrityCheckOps], userSrv: UserDB) extends MapMerger { | ||
def runChecks(): Unit = { | ||
implicit val authContext: AuthContext = userSrv.getSystemAuthContext | ||
checks.foreach { c => | ||
db.tryTransaction { implicit graph => | ||
println(s"Running check on ${c.name} ...") | ||
c.initialCheck() | ||
val stats = c.duplicationCheck() <+> c.globalCheck() | ||
val statsStr = stats | ||
.collect { | ||
case (k, v) if v != 0 => s"$k:$v" | ||
} | ||
.mkString(" ") | ||
if (statsStr.isEmpty) | ||
println(" no change needed") | ||
else | ||
println(s" $statsStr") | ||
Success(()) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters