Skip to content

Commit a84a33c

Browse files
ElysaSrcKhoyo
authored and committed
editoast: reconnect on rabbitmq disconnection
Currently, when the Lapin driver loses its connection to the RabbitMQ server (for example, when Kubernetes reschedules the RabbitMQ server pod onto another node), editoast is left in an inconsistent state. We add a loop that tries to reconnect to the RabbitMQ server whenever the connection is lost. Co-authored-by: Younes Khoudli <[email protected]>
1 parent f25fc61 commit a84a33c

File tree

4 files changed

+97
-19
lines changed

4 files changed

+97
-19
lines changed

editoast/openapi.yaml

+20
Original file line numberDiff line numberDiff line change
@@ -4412,6 +4412,7 @@ components:
44124412
- $ref: '#/components/schemas/EditoastEditionErrorSplitTrackSectionBadOffset'
44134413
- $ref: '#/components/schemas/EditoastEditoastUrlErrorInvalidUrl'
44144414
- $ref: '#/components/schemas/EditoastElectricalProfilesErrorNotFound'
4415+
- $ref: '#/components/schemas/EditoastErrorConnectionDoesNotExist'
44154416
- $ref: '#/components/schemas/EditoastErrorLapin'
44164417
- $ref: '#/components/schemas/EditoastErrorResponseTimeout'
44174418
- $ref: '#/components/schemas/EditoastErrorSerialization'
@@ -4495,6 +4496,25 @@ components:
44954496
description: Generated error type for Editoast
44964497
discriminator:
44974498
propertyName: type
4499+
EditoastErrorConnectionDoesNotExist:
4500+
type: object
4501+
required:
4502+
- type
4503+
- status
4504+
- message
4505+
properties:
4506+
context:
4507+
type: object
4508+
message:
4509+
type: string
4510+
status:
4511+
type: integer
4512+
enum:
4513+
- 500
4514+
type:
4515+
type: string
4516+
enum:
4517+
- editoast:coreclient:ConnectionDoesNotExist
44984518
EditoastErrorLapin:
44994519
type: object
45004520
required:

editoast/src/core/mq_client.rs

+72-16
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@ use serde::Serialize;
1010
use serde_json::to_vec;
1111
use std::{fmt::Debug, sync::Arc};
1212
use thiserror::Error;
13-
use tokio::time::{timeout, Duration};
13+
use tokio::{
14+
sync::RwLock,
15+
time::{timeout, Duration},
16+
};
1417

1518
#[derive(Debug, Clone)]
1619
pub struct RabbitMQClient {
17-
connection: Arc<Connection>,
20+
connection: Arc<RwLock<Option<Connection>>>,
1821
exchange: String,
1922
timeout: u64,
2023
hostname: String,
@@ -45,6 +48,9 @@ pub enum Error {
4548
#[error("Response timeout")]
4649
#[editoast_error(status = "500")]
4750
ResponseTimeout,
51+
#[error("Connection does not exist")]
52+
#[editoast_error(status = "500")]
53+
ConnectionDoesNotExist,
4854
}
4955

5056
pub struct MQResponse {
@@ -54,21 +60,65 @@ pub struct MQResponse {
5460

5561
impl RabbitMQClient {
5662
pub async fn new(options: Options) -> Result<Self, Error> {
57-
let connection = Connection::connect(&options.uri, ConnectionProperties::default())
58-
.await
59-
.map_err(Error::Lapin)?;
6063
let hostname = hostname::get()
6164
.map(|name| name.to_string_lossy().into_owned())
6265
.unwrap_or_else(|_| "unknown".to_string());
6366

67+
let conn = Arc::new(RwLock::new(None));
68+
69+
tokio::spawn(Self::connection_loop(options.uri, conn.clone()));
70+
6471
Ok(RabbitMQClient {
65-
connection: Arc::new(connection),
72+
connection: conn,
6673
exchange: format!("{}-req-xchg", options.worker_pool_identifier),
6774
timeout: options.timeout,
6875
hostname,
6976
})
7077
}
7178

79+
async fn connection_ok(connection: &Arc<RwLock<Option<Connection>>>) -> bool {
80+
let guard = connection.as_ref().read().await;
81+
let conn = guard.as_ref();
82+
let status = match conn {
83+
None => return false,
84+
Some(conn) => conn.status().state(),
85+
};
86+
match status {
87+
lapin::ConnectionState::Initial => true,
88+
lapin::ConnectionState::Connecting => true,
89+
lapin::ConnectionState::Connected => true,
90+
lapin::ConnectionState::Closing => true,
91+
lapin::ConnectionState::Closed => false,
92+
lapin::ConnectionState::Error => false,
93+
}
94+
}
95+
96+
async fn connection_loop(uri: String, connection: Arc<RwLock<Option<Connection>>>) {
97+
loop {
98+
if Self::connection_ok(&connection).await {
99+
tokio::time::sleep(Duration::from_secs(2)).await;
100+
continue;
101+
}
102+
103+
tracing::info!("Reconnecting to RabbitMQ");
104+
105+
// Connection should be re-established
106+
let new_connection = Connection::connect(&uri, ConnectionProperties::default()).await;
107+
108+
match new_connection {
109+
Ok(new_connection) => {
110+
*connection.write().await = Some(new_connection);
111+
tracing::info!("Reconnected to RabbitMQ");
112+
}
113+
Err(e) => {
114+
tracing::error!("Error while reconnecting to RabbitMQ: {:?}", e);
115+
}
116+
}
117+
118+
tokio::time::sleep(Duration::from_secs(2)).await;
119+
}
120+
}
121+
72122
#[allow(dead_code)]
73123
pub async fn call<T>(
74124
&self,
@@ -81,12 +131,15 @@ impl RabbitMQClient {
81131
where
82132
T: Serialize,
83133
{
134+
// Get current connection
135+
let connection = self.connection.read().await;
136+
if connection.is_none() {
137+
return Err(Error::ConnectionDoesNotExist);
138+
}
139+
let connection = connection.as_ref().unwrap();
140+
84141
// Create a channel
85-
let channel = self
86-
.connection
87-
.create_channel()
88-
.await
89-
.map_err(Error::Lapin)?;
142+
let channel = connection.create_channel().await.map_err(Error::Lapin)?;
90143

91144
let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
92145
let serialized_payload = serialized_payload_vec.as_slice();
@@ -133,12 +186,15 @@ impl RabbitMQClient {
133186
where
134187
T: Serialize,
135188
{
189+
// Get current connection
190+
let connection = self.connection.read().await;
191+
if connection.is_none() {
192+
return Err(Error::ConnectionDoesNotExist);
193+
}
194+
let connection = connection.as_ref().unwrap();
195+
136196
// Create a channel
137-
let channel = self
138-
.connection
139-
.create_channel()
140-
.await
141-
.map_err(Error::Lapin)?;
197+
let channel = connection.create_channel().await.map_err(Error::Lapin)?;
142198

143199
let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
144200
let serialized_payload = serialized_payload_vec.as_slice();

front/public/locales/en/errors.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
"ResponseTimeout": "Core: request timeout",
4343
"Serialization": "Core: cannot serialize request",
4444
"StatusParsing": "Core: cannot parse status",
45-
"UnparsableErrorOutput": "Core returned an error in an unknown format"
45+
"UnparsableErrorOutput": "Core returned an error in an unknown format",
46+
"ConnectionDoesNotExist": "Core: message queue: connection not established"
4647
},
4748
"DatabaseAccessError": "Database access fatal error",
4849
"document": {

front/public/locales/fr/errors.json

+3-2
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@
3838
"ConnectionResetByPeer": "Core: réinitialisation de la connexion. Nouvelle tentative.",
3939
"CoreResponseFormatError": "Core: impossible d'analyser la réponse '{{msg}}'",
4040
"GenericCoreError": "Core: erreur {{raw_error}}",
41-
"Lapin": "Core: file d'attente de message: erreur de protocole",
41+
"Lapin": "Core: file d'attente de messages: erreur de protocole",
4242
"ResponseTimeout": "Core: temps d'attente écoulé",
4343
"Serialization": "Core: impossible de sérialiser la requête",
4444
"StatusParsing": "Core: impossible d'obtenir le status",
45-
"UnparsableErrorOutput": "Core: a renvoyé une erreur dans un format inconnu"
45+
"UnparsableErrorOutput": "Core: a renvoyé une erreur dans un format inconnu",
46+
"ConnectionDoesNotExist": "Core: file d'attente de messages: connexion non établie"
4647
},
4748
"document": {
4849
"NotFound": "Document '{{document_key}}' non trouvé"

0 commit comments

Comments
 (0)