Skip to content

Commit

Permalink
core: aggregate errors that happen too often during infra loading
Browse files Browse the repository at this point in the history
Signed-off-by: Eloi Charpentier <[email protected]>
  • Loading branch information
eckter committed Jan 15, 2025
1 parent 675d279 commit a450ed4
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,11 @@ private fun parseRjsTrackSection(
}
}

fun parseRjsElectrification(builder: RawInfraBuilder, electrification: RJSElectrification) {
fun parseRjsElectrification(
builder: RawInfraBuilder,
electrification: RJSElectrification,
electrificationConflictAggregator: LogAggregator
) {
for (electrificationRange in electrification.trackRanges) {
val applyElectrificationForChunkBetween =
{ chunk: TrackChunkDescriptor, chunkLower: Distance, chunkUpper: Distance ->
Expand All @@ -414,7 +418,7 @@ fun parseRjsElectrification(builder: RawInfraBuilder, electrification: RJSElectr
previousElectrification.value != electrification.voltage &&
previousElectrification.value != ""
) {
logger.warn(
electrificationConflictAggregator.registerError(
"Electrification conflict on track-range ${electrificationRange.trackSectionID}" +
"[${previousElectrification.lower + chunk.offset.distance}, " +
"${previousElectrification.upper + chunk.offset.distance}]: " +
Expand Down Expand Up @@ -756,9 +760,11 @@ fun parseRJSInfra(rjsInfra: RJSInfra): RawInfra {
}

// Parse electrifications
val electrificationConflictAggregator = LogAggregator({ logger.warn(it) })
for (electrification in rjsInfra.electrifications) {
parseRjsElectrification(builder, electrification)
parseRjsElectrification(builder, electrification, electrificationConflictAggregator)
}
electrificationConflictAggregator.logAggregatedSummary()

for (neutralSection in rjsInfra.neutralSections) {
parseNeutralRanges(builder, false, neutralSection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fr.sncf.osrd.signaling.impl
import fr.sncf.osrd.sim_infra.api.*
import fr.sncf.osrd.sim_infra.impl.BlockInfraBuilder
import fr.sncf.osrd.sim_infra.impl.blockInfraBuilder
import fr.sncf.osrd.utils.LogAggregator
import fr.sncf.osrd.utils.indexing.IdxMap
import fr.sncf.osrd.utils.indexing.MutableStaticIdxArrayList
import fr.sncf.osrd.utils.units.*
Expand All @@ -18,86 +19,87 @@ internal fun internalBuildBlocks(
// Step 1) associate DirDetectorIds to a list of delimiting logical signals
val signalDelimiters = findSignalDelimiters(rawSignalingInfra, loadedSignalInfra)
val detectorEntrySignals = makeDetectorEntrySignals(loadedSignalInfra, signalDelimiters)
return blockInfraBuilder(loadedSignalInfra, rawSignalingInfra) {
// Step 2) iterate on zone paths along the route path.
// - maintain a list of currently active blocks
// - At each signal, add it to compatible current blocks.
// - if the signal is delimiting, stop and create the block (deduplicate it too)
for (route in rawSignalingInfra.routes) {
val routeEntryDet = rawSignalingInfra.getRouteEntry(route)
val routeExitDet = rawSignalingInfra.getRouteExit(route)
val entrySignals = detectorEntrySignals[routeEntryDet]
var currentBlocks =
getInitPartialBlocks(
sigModuleManager,
rawSignalingInfra,
loadedSignalInfra,
entrySignals,
routeEntryDet
)
// while inside the route, we maintain a list of currently active blocks.
// each block either expect any signaling system (when starting from a buffer stop or
// wildcard
// signal),
// or expects a given signaling system. blocks can therefore tell whether a signal
// belongs
// there.
// if a signal is not part of a block, it is ignored
// if a signal delimits a block, it ends the block and starts a new ones, one per driver
// if a signal does not delimit a block and has a single driver, it continues the block
// if a signal does not delimit a block and has multiple drivers, it duplicates the
// block
val missingSignalLogAggregator = LogAggregator({ logger.debug(it) })
val result =
blockInfraBuilder(loadedSignalInfra, rawSignalingInfra) {
// Step 2) iterate on zone paths along the route path.
// - maintain a list of currently active blocks
// - At each signal, add it to compatible current blocks.
// - if the signal is delimiting, stop and create the block (deduplicate it too)
for (route in rawSignalingInfra.routes) {
val routeEntryDet = rawSignalingInfra.getRouteEntry(route)
val routeExitDet = rawSignalingInfra.getRouteExit(route)
val entrySignals = detectorEntrySignals[routeEntryDet]
var currentBlocks =
getInitPartialBlocks(
sigModuleManager,
rawSignalingInfra,
loadedSignalInfra,
entrySignals,
routeEntryDet,
missingSignalLogAggregator,
)
// While inside the route, we maintain a list of currently active blocks. Each block
// either expect any signaling system (when starting from a buffer stop or wildcard
// signal), or expects a given signaling system. Blocks can therefore tell whether a
// signal belongs there.
// If a signal is not part of a block, it is ignored. If a signal delimits a block,
// it ends the block and starts a new ones, one per driver. If a signal does not
// delimit a block and has a single driver, it continues the block. If a signal does
// not delimit a block and has multiple drivers, it duplicates the block.

for (zonePath in rawSignalingInfra.getRoutePath(route)) {
val zonePathLength = rawSignalingInfra.getZonePathLength(zonePath)
for (block in currentBlocks) block.addZonePath(zonePath, zonePathLength)
for (zonePath in rawSignalingInfra.getRoutePath(route)) {
val zonePathLength = rawSignalingInfra.getZonePathLength(zonePath)
for (block in currentBlocks) block.addZonePath(zonePath, zonePathLength)

// iterate over signals which are between the block entry and the block exit
val signals = rawSignalingInfra.getSignals(zonePath)
val signalsPositions = rawSignalingInfra.getSignalPositions(zonePath)
for ((physicalSignal, position) in signals.zip(signalsPositions)) {
val distanceToZonePathEnd = zonePathLength - position
assert(distanceToZonePathEnd >= Distance.ZERO)
assert(distanceToZonePathEnd <= zonePathLength.distance)
for (signal in loadedSignalInfra.getLogicalSignals(physicalSignal)) {
currentBlocks =
updatePartialBlocks(
sigModuleManager,
currentBlocks,
loadedSignalInfra,
signal,
distanceToZonePathEnd,
)
// iterate over signals which are between the block entry and the block exit
val signals = rawSignalingInfra.getSignals(zonePath)
val signalsPositions = rawSignalingInfra.getSignalPositions(zonePath)
for ((physicalSignal, position) in signals.zip(signalsPositions)) {
val distanceToZonePathEnd = zonePathLength - position
assert(distanceToZonePathEnd >= Distance.ZERO)
assert(distanceToZonePathEnd <= zonePathLength.distance)
for (signal in loadedSignalInfra.getLogicalSignals(physicalSignal)) {
currentBlocks =
updatePartialBlocks(
sigModuleManager,
currentBlocks,
loadedSignalInfra,
signal,
distanceToZonePathEnd,
)
}
}
}
}

// when a route ends at a buffer stop, unterminated blocks are expected,
// as the buffer stop sort of acts as a closed signal. when a route does not
// end with a buffer stop, blocks are expected to end with the route.
// such blocks are not valid, and can be fixed by adding a delimiter signal
// right before the end of the route.
val routeEndsAtBufferStop = rawSignalingInfra.isBufferStop(routeExitDet.value)
for (curBlock in currentBlocks) {
if (curBlock.zonePaths.size == 0) continue
if (curBlock.signals.size == 0) continue
// when a route ends at a buffer stop, unterminated blocks are expected,
// as the buffer stop sort of acts as a closed signal. when a route does not
// end with a buffer stop, blocks are expected to end with the route.
// such blocks are not valid, and can be fixed by adding a delimiter signal
// right before the end of the route.
val routeEndsAtBufferStop = rawSignalingInfra.isBufferStop(routeExitDet.value)
for (curBlock in currentBlocks) {
if (curBlock.zonePaths.size == 0) continue
if (curBlock.signals.size == 0) continue

val lastZonePath = curBlock.zonePaths[curBlock.zonePaths.size - 1]
assert(routeExitDet == rawSignalingInfra.getZonePathExit(lastZonePath))
if (!routeEndsAtBufferStop)
logger.debug {
"unterminated block at end of route ${rawSignalingInfra.getRouteName(route)}"
}
block(
curBlock.startAtBufferStop,
true,
curBlock.zonePaths,
curBlock.signals,
curBlock.signalPositions
)
val lastZonePath = curBlock.zonePaths[curBlock.zonePaths.size - 1]
assert(routeExitDet == rawSignalingInfra.getZonePathExit(lastZonePath))
if (!routeEndsAtBufferStop)
logger.debug {
"unterminated block at end of route ${rawSignalingInfra.getRouteName(route)}"
}
block(
curBlock.startAtBufferStop,
true,
curBlock.zonePaths,
curBlock.signals,
curBlock.signalPositions
)
}
}
}
}
missingSignalLogAggregator.logAggregatedSummary()
return result
}

data class AssociatedDetector(val detector: DirDetectorId, val distance: Distance)
Expand Down Expand Up @@ -210,14 +212,15 @@ private fun getInitPartialBlocks(
loadedSignalInfra: LoadedSignalInfra,
entrySignals: IdxMap<SignalingSystemId, AssociatedSignal>?,
entryDet: DirDetectorId,
missingSignalLogAggregator: LogAggregator,
): MutableList<PartialBlock> {
val initialBlocks = mutableListOf<PartialBlock>()
val isBufferStop = rawSignalingInfra.isBufferStop(entryDet.value)
if (entrySignals == null) {
if (!isBufferStop)
logger.debug {
missingSignalLogAggregator.registerError(
"no signal at non buffer stop ${rawSignalingInfra.getDetectorName(entryDet.value)}:${entryDet.direction}"
}
)
initialBlocks.add(
PartialBlock(
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import fr.sncf.osrd.signaling.*
import fr.sncf.osrd.sim_infra.api.*
import fr.sncf.osrd.sim_infra.impl.SignalParameters
import fr.sncf.osrd.sim_infra.impl.loadedSignalInfra
import fr.sncf.osrd.utils.LogAggregator
import fr.sncf.osrd.utils.indexing.*
import fr.sncf.osrd.utils.units.Distance
import mu.KotlinLogging
Expand Down Expand Up @@ -83,6 +84,14 @@ class SignalingSimulatorImpl(override val sigModuleManager: SigSystemManager) :
loadedSignalInfra: LoadedSignalInfra
): BlockInfra {
val blockInfra = internalBuildBlocks(sigModuleManager, rawSignalingInfra, loadedSignalInfra)
val blockLogAggregator =
LogAggregator(
{ logger.debug(it) },
)
val signalLogAggregator =
LogAggregator(
{ logger.debug(it) },
)
for (block in blockInfra.blocks) {
val sigSystem = blockInfra.getBlockSignalingSystem(block)
val path = blockInfra.getBlockPath(block)
Expand Down Expand Up @@ -110,19 +119,17 @@ class SignalingSimulatorImpl(override val sigModuleManager: SigSystemManager) :
val reporter =
object : BlockDiagReporter {
override fun reportBlock(errorType: String) {
logger.debug {
val entrySignal = rawSignalingInfra.getLogicalSignalName(signals[0])
val exitSignal =
rawSignalingInfra.getLogicalSignalName(signals[signals.size - 1])
val entrySignal = rawSignalingInfra.getLogicalSignalName(signals[0])
val exitSignal =
rawSignalingInfra.getLogicalSignalName(signals[signals.size - 1])
blockLogAggregator.registerError(
"error in block from $entrySignal to $exitSignal: $errorType"
}
)
}

override fun reportSignal(sigIndex: Int, errorType: String) {
logger.debug {
val signal = rawSignalingInfra.getLogicalSignalName(signals[sigIndex])
"error at signal $signal: $errorType"
}
val signal = rawSignalingInfra.getLogicalSignalName(signals[sigIndex])
signalLogAggregator.registerError("error at signal $signal: $errorType")
}
}
sigModuleManager.checkSignalingSystemBlock(reporter, sigSystem, sigBlock)
Expand Down Expand Up @@ -151,6 +158,8 @@ class SignalingSimulatorImpl(override val sigModuleManager: SigSystemManager) :
)
}
}
blockLogAggregator.logAggregatedSummary()
signalLogAggregator.logAggregatedSummary()
return blockInfra
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package fr.sncf.osrd.utils

/**
* This class can be used to aggregate logs: when a specific error/warning happens thousands of time
* in a row, we only report the first $n and only log the total error number afterward. The class
* must be initialized once before the loop, errors should be logged with `logError`, and then
* `logAggregatedSummary` should be called once at the end.
*/
data class LogAggregator(
/** Function to use to log anything (e.g. `{ logger.warn(it) }` ). */
val logFunction: (str: String) -> Unit,
/** String to be used for collapsed errors, using %d and .format for the remaining number. */
val summaryErrorMessage: String = "... and %d other similar errors",
/** Max number of errors before collapsing the rest. */
val maxReportedErrors: Int = 3,
) {
private var nErrors = 0
private var savedErrors = mutableListOf<String>()

/** Registers an error. Does not log anything before the `reportSummary` call. */
fun registerError(msg: String) {
nErrors++
if (savedErrors.size < maxReportedErrors) savedErrors.add(msg)
}

/** Logs the errors, collapsing the ones after `maxReportedErrors`. */
fun logAggregatedSummary() {
for (err in savedErrors) logFunction(err)
val remainingErrors = nErrors - savedErrors.size
if (remainingErrors > 0) logFunction(summaryErrorMessage.format(remainingErrors))
}
}
14 changes: 11 additions & 3 deletions core/src/main/kotlin/fr/sncf/osrd/api/api_v2/RequirementsParser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import fr.sncf.osrd.api.api_v2.conflicts.WorkSchedulesRequest
import fr.sncf.osrd.conflicts.*
import fr.sncf.osrd.sim_infra.api.RawSignalingInfra
import fr.sncf.osrd.standalone_sim.result.ResultTrain
import fr.sncf.osrd.utils.LogAggregator
import fr.sncf.osrd.utils.units.Duration
import fr.sncf.osrd.utils.units.TimeDelta
import fr.sncf.osrd.utils.units.seconds
Expand Down Expand Up @@ -99,9 +100,12 @@ fun convertWorkScheduleMap(
timeToAdd: TimeDelta = 0.seconds
): Collection<Requirements> {
val res = mutableListOf<Requirements>()
val logAggregator = LogAggregator({ requirementsParserLogger.warn(it) })
for (entry in workSchedules) {
val workScheduleRequirements = mutableListOf<ResultTrain.SpacingRequirement>()
workScheduleRequirements.addAll(convertWorkSchedule(rawInfra, entry.value, timeToAdd))
workScheduleRequirements.addAll(
convertWorkSchedule(rawInfra, entry.value, timeToAdd, logAggregator)
)
res.add(
Requirements(
RequirementId(entry.key, RequirementType.WORK_SCHEDULE),
Expand All @@ -122,9 +126,12 @@ fun convertWorkScheduleCollection(
workSchedules: Collection<WorkSchedule>,
timeToAdd: TimeDelta = 0.seconds,
): Requirements {
val logAggregator = LogAggregator({ requirementsParserLogger.warn(it) })
val workSchedulesRequirements = mutableListOf<ResultTrain.SpacingRequirement>()
for (workSchedule in workSchedules) {
workSchedulesRequirements.addAll(convertWorkSchedule(rawInfra, workSchedule, timeToAdd))
workSchedulesRequirements.addAll(
convertWorkSchedule(rawInfra, workSchedule, timeToAdd, logAggregator)
)
}
return Requirements(
RequirementId(DEFAULT_WORK_SCHEDULE_ID, RequirementType.WORK_SCHEDULE),
Expand All @@ -137,6 +144,7 @@ private fun convertWorkSchedule(
rawInfra: RawSignalingInfra,
workSchedule: WorkSchedule,
timeToAdd: TimeDelta = 0.seconds,
logAggregator: LogAggregator,
): Collection<ResultTrain.SpacingRequirement> {
val res = mutableListOf<ResultTrain.SpacingRequirement>()

Expand Down Expand Up @@ -181,7 +189,7 @@ private fun convertWorkSchedule(
"${tracksNotCoveredByRoutes.size} track sections were not fully covered by routes (ignoring some work schedules): " +
tracksNotCoveredByRoutes.take(3).joinToString(", ") +
(if (tracksNotCoveredByRoutes.size > 3) ", ..." else "")
requirementsParserLogger.warn(msg)
logAggregator.registerError(msg)
}
return res
}

0 comments on commit a450ed4

Please sign in to comment.