Skip to content

Commit 508ac32

Browse files
committed
core, osrdyne, docker: make rmq max message size configurable through an environment variable
Signed-off-by: Ethan Perruzza <[email protected]>
1 parent 47eefea commit 508ac32

File tree

8 files changed

+32
-3
lines changed

8 files changed

+32
-3
lines changed

core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class WorkerCommand : CliCommand {
4545
val WORKER_ID_USE_HOSTNAME: Boolean
4646
val WORKER_KEY: String?
4747
val WORKER_AMQP_URI: String
48+
val WORKER_MAX_MSG_SIZE: Int
4849
val WORKER_POOL: String
4950
val WORKER_REQUESTS_QUEUE: String
5051
val WORKER_ACTIVITY_EXCHANGE: String
@@ -56,6 +57,7 @@ class WorkerCommand : CliCommand {
5657
WORKER_KEY = if (ALL_INFRA) "all" else System.getenv("WORKER_KEY")
5758
WORKER_AMQP_URI =
5859
System.getenv("WORKER_AMQP_URI") ?: "amqp://osrd:[email protected]:5672/%2f"
60+
WORKER_MAX_MSG_SIZE = getIntEnvvar("WORKER_MAX_MSG_SIZE") ?: 1024 * 1024 * 128 * 5
5961
WORKER_POOL = System.getenv("WORKER_POOL") ?: "core"
6062
WORKER_REQUESTS_QUEUE =
6163
System.getenv("WORKER_REQUESTS_QUEUE") ?: "$WORKER_POOL-req-$WORKER_KEY"
@@ -76,6 +78,10 @@ class WorkerCommand : CliCommand {
7678
return System.getenv(name)?.lowercase() !in arrayOf(null, "", "0", "false")
7779
}
7880

81+
private fun getIntEnvvar(name: String): Int? {
82+
return System.getenv(name)?.toIntOrNull()
83+
}
84+
7985
override fun run(): Int {
8086
if (WORKER_ID == null || WORKER_KEY == null) {
8187
throw IllegalStateException(
@@ -125,7 +131,7 @@ class WorkerCommand : CliCommand {
125131

126132
val factory = ConnectionFactory()
127133
factory.setUri(WORKER_AMQP_URI)
128-
factory.setMaxInboundMessageBodySize(1024 * 1024 * 128 * 5)
134+
factory.setMaxInboundMessageBodySize(WORKER_MAX_MSG_SIZE)
129135
val connection = factory.newConnection()
130136
connection.createChannel().use { channel -> reportActivity(channel, "started") }
131137

docker/docker-compose.host.yml

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ services:
5454
OSRDYNE__WORKER_DRIVER__DEFAULT_ENV: >
5555
[ "CORE_EDITOAST_URL=http://127.0.0.1:8090", "JAVA_TOOL_OPTIONS=-javaagent:/app/opentelemetry-javaagent.jar", "CORE_MONITOR_TYPE=opentelemetry", "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc", "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://127.0.0.1:4317", "OTEL_METRICS_EXPORTER=none", "OTEL_LOGS_EXPORTER=none" ]
5656
OSRDYNE__AMQP_URI: "amqp://osrd:[email protected]:5672/%2f"
57+
OSRDYNE__MAX_MSG_SIZE: 671088640 # 1024 * 1024 * 128 * 5
5758
OSRDYNE__MANAGEMENT_URI: "http://osrd:[email protected]:15672/"
5859
OSRDYNE__OPENTELEMETRY__ENDPOINT: "http://localhost:4317"
5960

osrdyne/configuration_templates/process-compose-driver.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ worker_driver:
99
worker_id: /tmp/worker_id # WORKER_ID
1010
worker_key: /tmp/worker_key # WORKER_KEY
1111
amqp_uri: /tmp/worker_amqp_uri # WORKER_AMQP_URI
12+
max_msg_size: /tmp/worker_max_msg_size # WORKER_MAX_MSG_SIZE

osrdyne/configuration_templates/process-compose.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ processes:
1919
export WORKER_ID=$(cat /tmp/worker_id);
2020
export WORKER_KEY=$(cat /tmp/worker_key);
2121
export WORKER_AMQP_URI="$(cat /tmp/worker_amqp_uri);"
22+
export WORKER_MAX_MSG_SIZE="$(cat /tmp/worker_max_msg_size);"
2223
java $$JAVA_OPTS -ea -jar build/libs/osrd-all.jar worker
2324
)
2425
working_dir: "$OSRD_PATH/core"

osrdyne/src/config.rs

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub enum WorkerDriverConfig {
2525
#[derive(Debug, Deserialize, Serialize)]
2626
pub struct OsrdyneConfig {
2727
pub amqp_uri: String,
28+
pub max_msg_size: i64,
2829
pub management_uri: String,
2930
pub pool_id: String,
3031
pub worker_driver: WorkerDriverConfig,
@@ -56,6 +57,7 @@ impl Default for OsrdyneConfig {
5657
fn default() -> Self {
5758
Self {
5859
amqp_uri: "amqp://osrd:password@osrd-rabbitmq:5672/%2f".into(),
60+
max_msg_size: 1024 * 1024 * 128 * 5,
5961
management_uri: "http://osrd:password@osrd-rabbitmq:15672".into(),
6062
pool_id: "core".to_string(),
6163
worker_driver: WorkerDriverConfig::Noop,

osrdyne/src/drivers/docker.rs

+4
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub struct DockerDriver {
4141
client: Docker,
4242
options: DockerDriverOptions,
4343
amqp_uri: String,
44+
max_message_size: i64,
4445
worker_pool: String,
4546
version_identifier: String,
4647
}
@@ -60,6 +61,7 @@ impl DockerDriver {
6061
pub fn new(
6162
options: DockerDriverOptions,
6263
amqp_uri: String,
64+
max_message_size: i64,
6365
worker_pool: String,
6466
) -> DockerDriver {
6567
let version_identifier = std::env::var("OSRD_GIT_DESCRIBE")
@@ -71,6 +73,7 @@ impl DockerDriver {
7173
client: Docker::connect_with_socket_defaults().expect("Failed to connect to Docker"),
7274
options,
7375
amqp_uri,
76+
max_message_size,
7477
worker_pool,
7578
version_identifier: hashed,
7679
}
@@ -120,6 +123,7 @@ impl WorkerDriver for DockerDriver {
120123
env.push(format!("WORKER_ID={}", new_id));
121124
env.push(format!("WORKER_KEY={}", worker_key));
122125
env.push(format!("WORKER_AMQP_URI={}", self.amqp_uri));
126+
env.push(format!("WORKER_MAX_MSG_SIZE={}", self.max_message_size));
123127
env
124128
};
125129

osrdyne/src/drivers/kubernetes.rs

+8
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ pub struct KubernetesDriver {
139139
pool_id: String,
140140
options: KubernetesDriverOptions,
141141
amqp_uri: String,
142+
max_message_size: i64,
142143
version_identifier: String,
143144
}
144145

@@ -157,6 +158,7 @@ impl KubernetesDriver {
157158
pub async fn new(
158159
options: KubernetesDriverOptions,
159160
amqp_uri: String,
161+
max_message_size: i64,
160162
pool_id: String,
161163
) -> KubernetesDriver {
162164
let version_identifier = std::env::var("OSRD_GIT_DESCRIBE")
@@ -170,6 +172,7 @@ impl KubernetesDriver {
170172
.expect("Failed to connect to Kubernetes"),
171173
options,
172174
amqp_uri,
175+
max_message_size,
173176
pool_id,
174177
version_identifier: hashed,
175178
}
@@ -408,6 +411,11 @@ impl WorkerDriver for KubernetesDriver {
408411
value: Some(self.amqp_uri.clone()),
409412
..Default::default()
410413
});
414+
env.push(EnvVar {
415+
name: "WORKER_MAX_MSG_SIZE".to_string(),
416+
value: Some(self.max_message_size.to_string()),
417+
..Default::default()
418+
});
411419
env.push(EnvVar {
412420
name: "WORKEY_ID_USE_HOSTNAME".to_string(),
413421
value: Some("1".to_string()),

osrdyne/src/main.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,21 @@ async fn main() -> Result<(), anyhow::Error> {
131131
Box::new(DockerDriver::new(
132132
opts,
133133
config.amqp_uri.clone(),
134+
config.max_msg_size,
134135
config.pool_id.clone(),
135136
))
136137
}
137138

138139
WorkerDriverConfig::KubernetesDriver(opts) => {
139140
info!("Using Kubernetes driver");
140141
Box::new(
141-
KubernetesDriver::new(opts, config.amqp_uri.clone(), config.pool_id.clone())
142-
.await,
142+
KubernetesDriver::new(
143+
opts,
144+
config.amqp_uri.clone(),
145+
config.max_msg_size,
146+
config.pool_id.clone(),
147+
)
148+
.await,
143149
)
144150
}
145151

0 commit comments

Comments
 (0)