diff --git a/core/gradle/libs.versions.toml b/core/gradle/libs.versions.toml index 52f87470e61..2a09ff8511e 100644 --- a/core/gradle/libs.versions.toml +++ b/core/gradle/libs.versions.toml @@ -67,7 +67,7 @@ opentelemetry-instrumentation-annotations = { module = 'io.opentelemetry.instrum kaml = { module = 'com.charleskorn.kaml:kaml', version = '0.66.0' } # Apache 2.0 -amqp-client = { module = 'com.rabbitmq:amqp-client', version = '5.23.0' } +amqp-client = { module = 'com.rabbitmq:amqp-client', version = '5.24.0' } [plugins] # kotlin diff --git a/core/src/main/java/fr/sncf/osrd/App.java b/core/src/main/java/fr/sncf/osrd/App.java index a30b6d57116..61f2a6dfdad 100644 --- a/core/src/main/java/fr/sncf/osrd/App.java +++ b/core/src/main/java/fr/sncf/osrd/App.java @@ -41,7 +41,12 @@ public static void main(String[] args) { } // run the user command - var statusCode = commands.get(commandName).run(); + var statusCode = 1; + try { + statusCode = commands.get(commandName).run(); + } catch (Exception e) { + e.printStackTrace(System.err); + } System.exit(statusCode); } } diff --git a/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt b/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt index 52222378fab..1513baa62c9 100644 --- a/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt +++ b/core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt @@ -3,7 +3,10 @@ package fr.sncf.osrd.cli import com.beust.jcommander.Parameter import com.beust.jcommander.Parameters import com.rabbitmq.client.* -import fr.sncf.osrd.api.* +import fr.sncf.osrd.api.ElectricalProfileSetManager +import fr.sncf.osrd.api.InfraLoadEndpoint +import fr.sncf.osrd.api.InfraManager +import fr.sncf.osrd.api.VersionEndpoint import fr.sncf.osrd.api.api_v2.conflicts.ConflictDetectionEndpointV2 import fr.sncf.osrd.api.api_v2.path_properties.PathPropEndpoint import fr.sncf.osrd.api.api_v2.pathfinding.PathfindingBlocksEndpointV2 @@ -20,6 +23,7 @@ import java.io.InputStream import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import kotlin.system.exitProcess import okhttp3.OkHttpClient import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -289,6 +293,17 @@ class WorkerCommand : CliCommand { executionTimeMS / 1_000.0 ) } + + val terminatorCallback = + fun(message: Delivery) { + try { + callback(message) + } catch (t: Throwable) { + t.printStackTrace(System.err) + exitProcess(1) + } + } + channel.basicConsume( WORKER_REQUESTS_QUEUE, false, @@ -298,12 +313,15 @@ class WorkerCommand : CliCommand { // We directly process the message with no dispatch if there's too many // locally queued tasks. Prevents the worker from consuming all the rabbitmq // at once, which would mess with the stats and automatic scaling. - callback(message) + terminatorCallback(message) } else { - executor.execute { callback(message) } + executor.execute { terminatorCallback(message) } } }, - { _ -> logger.error("consumer cancelled") }, + { _ -> + logger.error("consumer cancelled") + exitProcess(0) + }, { consumerTag, e -> logger.info("consume shutdown: {}, {}", consumerTag, e.toString()) }