Skip to content

Commit

Permalink
#1340 Improve migration
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jun 25, 2020
1 parent 71266e1 commit 67b0344
Show file tree
Hide file tree
Showing 10 changed files with 605 additions and 350 deletions.
2 changes: 1 addition & 1 deletion conf/migration-logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
converterClass="play.api.libs.logback.ColoredLevel"/>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${application.home:-.}/logs/application.log</file>
<file>./logs/migration.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${application.home:-.}/logs/application.%i.log.zip</fileNamePattern>
<minIndex>1</minIndex>
Expand Down
53 changes: 43 additions & 10 deletions migration/src/main/scala/org/thp/thehive/migration/Input.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,58 @@
package org.thp.thehive.migration

import java.text.{ParseException, SimpleDateFormat}
import java.util.Date

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.typesafe.config.Config
import org.thp.thehive.migration.dto._

import scala.concurrent.Future
import scala.util.Try
import scala.util.{Failure, Try}

case class Filter(caseFromDate: Long, alertFromDate: Long, auditFromDate: Long)
case class Filter(
caseDateRange: (Option[Long], Option[Long]),
caseNumberRange: (Option[Int], Option[Int]),
alertDateRange: (Option[Long], Option[Long]),
auditDateRange: (Option[Long], Option[Long])
)

object Filter {
def fromConfig(config: Config): Filter = {
val now = System.currentTimeMillis()
val maxCaseAge = config.getDuration("maxCaseAge")
val caseFromDate = if (maxCaseAge.isZero) 0L else now - maxCaseAge.getSeconds * 1000
val maxAlertAge = config.getDuration("maxAlertAge")
val alertFromDate = if (maxAlertAge.isZero) 0L else now - maxAlertAge.getSeconds * 1000
val maxAuditAge = config.getDuration("maxAuditAge")
val auditFromDate = if (maxAuditAge.isZero) 0L else now - maxAuditAge.getSeconds * 1000
Filter(caseFromDate, alertFromDate, auditFromDate)
val now = System.currentTimeMillis()
lazy val dateFormats = Seq(
new SimpleDateFormat("yyyyMMddHHmmss"),
new SimpleDateFormat("yyyyMMddHHmm"),
new SimpleDateFormat("yyyyMMddHH"),
new SimpleDateFormat("yyyyMMdd"),
new SimpleDateFormat("MMdd")
)
def parseDate(s: String): Try[Date] =
dateFormats.foldLeft[Try[Date]](Failure(new ParseException(s"Unparseable date: $s", 0))) { (acc, format) =>
acc.recoverWith { case _ => Try(format.parse(s)) }
}
def readDate(dateConfigName: String, ageConfigName: String) =
Try(config.getString(dateConfigName))
.flatMap(parseDate)
.map(d => d.getTime)
.toOption
.orElse {
Try {
val age = config.getDuration(ageConfigName)
if (age.isZero) None else Some(now - age.getSeconds * 1000)
}.toOption.flatten
}
val caseFromDate = readDate("caseFromDate", "maxCaseAge")
val caseUntilDate = readDate("caseUntilDate", "minCaseAge")
val caseFromNumber = Try(config.getInt("caseFromNumber")).toOption
val caseUntilNumber = Try(config.getInt("caseUntilNumber")).toOption
val alertFromDate = readDate("alertFromDate", "maxAlertAge")
val alertUntilDate = readDate("alertUntilDate", "minAlertAge")
val auditFromDate = readDate("auditFromDate", "maxAuditAge")
val auditUntilDate = readDate("auditUntilDate", "minAuditAge")

Filter(caseFromDate -> caseUntilDate, caseFromNumber -> caseUntilNumber, alertFromDate -> alertUntilDate, auditFromDate -> auditUntilDate)
}
}

Expand Down
114 changes: 92 additions & 22 deletions migration/src/main/scala/org/thp/thehive/migration/Migrate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@ package org.thp.thehive.migration

import java.io.File

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
import scala.collection.JavaConverters._

import play.api.libs.logback.LogbackLoggerConfigurator
import play.api.{Configuration, Environment}

import akka.actor.ActorSystem
import akka.stream.Materializer
import com.typesafe.config.{Config, ConfigFactory}
import play.api.libs.logback.LogbackLoggerConfigurator
import play.api.{Configuration, Environment}
import scopt.OParser
import scala.concurrent.duration.DurationInt

import scala.collection.JavaConverters._
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, ExecutionContext}

object Migrate extends App with MigrationOps {
def getVersion: String = Option(getClass.getPackage.getImplementationVersion).getOrElse("SNAPSHOT")
Expand Down Expand Up @@ -61,18 +59,76 @@ object Migrate extends App with MigrationOps {
opt[Int]('p', "es-pagesize")
.text("TheHive3 ElasticSearch page size")
.action((p, c) => addConfig(c, "input.search.pagesize" -> p)),
/* case age */
opt[Duration]("max-case-age")
.valueName("<duration>")
.text("migrate only recent cases")
.text("migrate only cases whose age is less than <duration>")
.action((v, c) => addConfig(c, "input.filter.maxCaseAge" -> v.toString)),
opt[Duration]("max-case-alert")
opt[Duration]("min-case-age")
.valueName("<duration>")
.text("migrate only cases whose age is greater than <duration>")
.action((v, c) => addConfig(c, "input.filter.minCaseAge" -> v.toString)),
opt[Duration]("case-from-date")
.valueName("<date>")
.text("migrate only cases created from <date>")
.action((v, c) => addConfig(c, "input.filter.caseFromDate" -> v.toString)),
opt[Duration]("case-until-date")
.valueName("<date>")
.text("migrate only cases created until <date>")
.action((v, c) => addConfig(c, "input.filter.caseUntilDate" -> v.toString)),
/* case number */
opt[Duration]("case-from-number")
.valueName("<number>")
.text("migrate only cases from this case number")
.action((v, c) => addConfig(c, "input.filter.caseFromNumber" -> v.toString)),
opt[Duration]("case-until-number")
.valueName("<number>")
.text("migrate only cases until this case number")
.action((v, c) => addConfig(c, "input.filter.caseUntilNumber" -> v.toString)),
/* alert age */
opt[Duration]("max-alert-age")
.valueName("<duration>")
.text("migrate only recent alerts")
.text("migrate only alerts whose age is less than <duration>")
.action((v, c) => addConfig(c, "input.filter.maxAlertAge" -> v.toString)),
opt[Duration]("min-alert-age")
.valueName("<duration>")
.text("migrate only alerts whose age is greater than <duration>")
.action((v, c) => addConfig(c, "input.filter.minAlertAge" -> v.toString)),
opt[Duration]("alert-from-date")
.valueName("<date>")
.text("migrate only alerts created from <date>")
.action((v, c) => addConfig(c, "input.filter.alertFromDate" -> v.toString)),
opt[Duration]("alert-until-date")
.valueName("<date>")
.text("migrate only alerts created until <date>")
.action((v, c) => addConfig(c, "input.filter.alertUntilDate" -> v.toString)),
/* audit age */
opt[Duration]("max-audit-age")
.valueName("<duration>")
.text("migrate only recent audits")
.action((v, c) => addConfig(c, "input.filter.maxAuditAge" -> v.toString))
.text("migrate only audits whose age is less than <duration>")
.action((v, c) => addConfig(c, "input.filter.minAuditAge" -> v.toString)),
opt[Duration]("min-audit-age")
.valueName("<duration>")
.text("migrate only audits whose age is greater than <duration>")
.action((v, c) => addConfig(c, "input.filter.maxAuditAge" -> v.toString)),
opt[Duration]("audit-from-date")
.valueName("<date>")
.text("migrate only audits created from <date>")
.action((v, c) => addConfig(c, "input.filter.auditFromDate" -> v.toString)),
opt[Duration]("audit-until-date")
.valueName("<date>")
.text("migrate only audits created until <date>")
.action((v, c) => addConfig(c, "input.filter.auditUntilDate" -> v.toString)),
note("Accepted date formats are \"yyyyMMdd[HH[mm[ss]]]\" and \"MMdd\""),
note(
"The Format for duration is: <length> <unit>.\n" +
"Accepted units are:\n" +
" DAY: d, day\n" +
" HOUR: h, hr, hour\n" +
" MINUTE: m, min, minute\n" +
" SECOND: s, sec, second\n" +
" MILLISECOND: ms, milli, millisecond"
)
)
}
val defaultConfig =
Expand All @@ -87,19 +143,33 @@ object Migrate extends App with MigrationOps {

(new LogbackLoggerConfigurator).configure(Environment.simple(), Configuration.empty, Map.empty)

val input = th3.Input(Configuration(config.getConfig("input").withFallback(config)))
val output = th4.Output(Configuration(config.getConfig("output").withFallback(config)))
val filter = Filter.fromConfig(config.getConfig("input.filter"))

val process = migrate(input, output, filter)
actorSystem.scheduler.scheduleAtFixedRate(10.seconds, 10.seconds) { () =>
val timer = actorSystem.scheduler.scheduleAtFixedRate(10.seconds, 10.seconds) { () =>
logger.info(migrationStats.showStats())
migrationStats.flush()
}
Await.result(process, Duration.Inf)
logger.info("Migration finished")

val returnStatus =
try {
val input = th3.Input(Configuration(config.getConfig("input").withFallback(config)))
val output = th4.Output(Configuration(config.getConfig("output").withFallback(config)))
val filter = Filter.fromConfig(config.getConfig("input.filter"))

val process = migrate(input, output, filter)

Await.result(process, Duration.Inf)
logger.info("Migration finished")
0
} catch {
case e: Throwable =>
logger.error(s"Migration failed", e)
1
} finally {
timer.cancel()
Await.ready(actorSystem.terminate(), 1.minute)
()
}
migrationStats.flush()
logger.info(migrationStats.toString)
System.exit(0)
System.exit(returnStatus)
}
}
Loading

0 comments on commit 67b0344

Please sign in to comment.