From ecd465c2d1c5035a91cb1473767c6b6d7ec95307 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-Etienne=20Bougu=C3=A9?= Date: Thu, 30 Jan 2025 11:10:56 +0100 Subject: [PATCH 1/3] core: fix unrecoverable freeze on rabbit shutdown or before consuming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hard fix (kind of) to kill the process (and let the orchestrator restart it) if: * rabbit shuts down (triggering the consumer's ShutdownCallback) * or an exception thrown before basicConsume() starts propagates up to main() (even if other threads are still running) Also bump amqp-client while at it, as it doesn't hurt. Signed-off-by: Pierre-Etienne Bougué --- core/gradle/libs.versions.toml | 2 +- core/src/main/java/fr/sncf/osrd/App.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/gradle/libs.versions.toml b/core/gradle/libs.versions.toml index 52f87470e61..2a09ff8511e 100644 --- a/core/gradle/libs.versions.toml +++ b/core/gradle/libs.versions.toml @@ -67,7 +67,7 @@ opentelemetry-instrumentation-annotations = { module = 'io.opentelemetry.instrum kaml = { module = 'com.charleskorn.kaml:kaml', version = '0.66.0' } # Apache 2.0 -amqp-client = { module = 'com.rabbitmq:amqp-client', version = '5.23.0' } +amqp-client = { module = 'com.rabbitmq:amqp-client', version = '5.24.0' } [plugins] # kotlin diff --git a/core/src/main/java/fr/sncf/osrd/App.java b/core/src/main/java/fr/sncf/osrd/App.java index a30b6d57116..61f2a6dfdad 100644 --- a/core/src/main/java/fr/sncf/osrd/App.java +++ b/core/src/main/java/fr/sncf/osrd/App.java @@ -41,7 +41,12 @@ public static void main(String[] args) { } // run the user command - var statusCode = commands.get(commandName).run(); + var statusCode = 1; + try { + statusCode = commands.get(commandName).run(); + } catch (Exception e) { + e.printStackTrace(System.err); + } System.exit(statusCode); } } From d0121d201074ec91a986ea509334e80d55df90ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-Etienne=20Bougu=C3=A9?= Date: 
Wed, 5 Feb 2025 18:47:54 +0100 Subject: [PATCH 2/3] core: force process exit on rabbit's cancel notifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Manual testing shows that shutdown is already covered by the System.exit in App.java::main(). Signed-off-by: Pierre-Etienne Bougué --- core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt b/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt index 52222378fab..9e72160d223 100644 --- a/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt +++ b/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt @@ -3,7 +3,10 @@ package fr.sncf.osrd.cli import com.beust.jcommander.Parameter import com.beust.jcommander.Parameters import com.rabbitmq.client.* -import fr.sncf.osrd.api.* +import fr.sncf.osrd.api.ElectricalProfileSetManager +import fr.sncf.osrd.api.InfraLoadEndpoint +import fr.sncf.osrd.api.InfraManager +import fr.sncf.osrd.api.VersionEndpoint import fr.sncf.osrd.api.api_v2.conflicts.ConflictDetectionEndpointV2 import fr.sncf.osrd.api.api_v2.path_properties.PathPropEndpoint import fr.sncf.osrd.api.api_v2.pathfinding.PathfindingBlocksEndpointV2 @@ -20,6 +23,7 @@ import java.io.InputStream import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import kotlin.system.exitProcess import okhttp3.OkHttpClient import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -303,7 +307,10 @@ class WorkerCommand : CliCommand { executor.execute { callback(message) } } }, - { _ -> logger.error("consumer cancelled") }, + { _ -> + logger.error("consumer cancelled") + exitProcess(0) + }, { consumerTag, e -> logger.info("consume shutdown: {}, {}", consumerTag, e.toString()) } From e6c5f8c98730e46339c3aedc290af6ea62a8ab07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-Etienne=20Bougu=C3=A9?= Date: 
Thu, 6 Feb 2025 16:06:14 +0100 Subject: [PATCH 3/3] core: fix unrecoverable freeze on exception during rabbit's DeliverCallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hard fix (kind of) to kill the process (and let the orchestrator restart it) if an exception propagates all the way up to the DeliverCallback, for example when editoast cannot be reached for an infra reload. This releases unacked messages and moves them back to ready (instead of keeping them unacked until the worker exits). Signed-off-by: Pierre-Etienne Bougué --- .../main/java/fr/sncf/osrd/cli/WorkerCommand.kt | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt b/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt index 9e72160d223..1513baa62c9 100644 --- a/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt +++ b/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt @@ -293,6 +293,17 @@ class WorkerCommand : CliCommand { executionTimeMS / 1_000.0 ) } + + val terminatorCallback = + fun(message: Delivery) { + try { + callback(message) + } catch (t: Throwable) { + t.printStackTrace(System.err) + exitProcess(1) + } + } + channel.basicConsume( WORKER_REQUESTS_QUEUE, false, @@ -302,9 +313,9 @@ class WorkerCommand : CliCommand { // We directly process the message with no dispatch if there's too many // locally queued tasks. Prevents the worker from consuming all the rabbitmq // at once, which would mess with the stats and automatic scaling. - callback(message) + terminatorCallback(message) } else { - executor.execute { callback(message) } + executor.execute { terminatorCallback(message) } } }, { _ ->