Skip to content

Commit ff12eb1

Browse files
committed
editoast: use pooling for amqp
Signed-off-by: ElysaSrc <[email protected]>
1 parent 665e2b6 commit ff12eb1

File tree

6 files changed

+108
-72
lines changed

6 files changed

+108
-72
lines changed

editoast/Cargo.lock

+26
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

editoast/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ chrono.workspace = true
9696
clap = { version = "4.5.19", features = ["derive", "env"] }
9797
colored = "2.1.0"
9898
dashmap = "6.1.0"
99+
deadpool-lapin = "0.12.1"
99100
derivative.workspace = true
100101
diesel.workspace = true
101102
diesel-async = { workspace = true }

editoast/openapi.yaml

+40
Original file line numberDiff line numberDiff line change
@@ -4067,7 +4067,9 @@ components:
40674067
- $ref: '#/components/schemas/EditoastEditionErrorSplitTrackSectionBadOffset'
40684068
- $ref: '#/components/schemas/EditoastEditoastUrlErrorInvalidUrl'
40694069
- $ref: '#/components/schemas/EditoastElectricalProfilesErrorNotFound'
4070+
- $ref: '#/components/schemas/EditoastErrorCeatePoolLapin'
40704071
- $ref: '#/components/schemas/EditoastErrorConnectionDoesNotExist'
4072+
- $ref: '#/components/schemas/EditoastErrorDeadpoolLapin'
40714073
- $ref: '#/components/schemas/EditoastErrorLapin'
40724074
- $ref: '#/components/schemas/EditoastErrorResponseTimeout'
40734075
- $ref: '#/components/schemas/EditoastErrorSerialization'
@@ -4129,6 +4131,25 @@ components:
41294131
description: Generated error type for Editoast
41304132
discriminator:
41314133
propertyName: type
4134+
EditoastErrorCeatePoolLapin:
4135+
type: object
4136+
required:
4137+
- type
4138+
- status
4139+
- message
4140+
properties:
4141+
context:
4142+
type: object
4143+
message:
4144+
type: string
4145+
status:
4146+
type: integer
4147+
enum:
4148+
- 500
4149+
type:
4150+
type: string
4151+
enum:
4152+
- editoast:coreclient:CeatePoolLapin
41324153
EditoastErrorConnectionDoesNotExist:
41334154
type: object
41344155
required:
@@ -4148,6 +4169,25 @@ components:
41484169
type: string
41494170
enum:
41504171
- editoast:coreclient:ConnectionDoesNotExist
4172+
EditoastErrorDeadpoolLapin:
4173+
type: object
4174+
required:
4175+
- type
4176+
- status
4177+
- message
4178+
properties:
4179+
context:
4180+
type: object
4181+
message:
4182+
type: string
4183+
status:
4184+
type: integer
4185+
enum:
4186+
- 500
4187+
type:
4188+
type: string
4189+
enum:
4190+
- editoast:coreclient:DeadpoolLapin
41514191
EditoastErrorLapin:
41524192
type: object
41534193
required:

editoast/src/core/mq_client.rs

+35-70
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
1+
use deadpool_lapin::{Config, CreatePoolError, Pool, PoolError, Runtime};
12
use editoast_derive::EditoastError;
23
use futures_util::StreamExt;
34
use itertools::Itertools;
45
use lapin::{
56
options::{BasicConsumeOptions, BasicPublishOptions},
67
types::{ByteArray, FieldTable, ShortString},
7-
BasicProperties, Connection, ConnectionProperties,
8+
BasicProperties,
89
};
910
use serde::Serialize;
1011
use serde_json::to_vec;
11-
use std::{fmt::Debug, sync::Arc};
12+
use std::fmt::Debug;
1213
use thiserror::Error;
13-
use tokio::{
14-
sync::RwLock,
15-
time::{timeout, Duration},
16-
};
14+
use tokio::time::{timeout, Duration};
1715

1816
#[derive(Debug, Clone)]
1917
pub struct RabbitMQClient {
20-
connection: Arc<RwLock<Option<Connection>>>,
18+
pub pool: Pool,
2119
exchange: String,
2220
timeout: u64,
2321
hostname: String,
@@ -51,6 +49,12 @@ pub enum Error {
5149
#[error("Connection does not exist")]
5250
#[editoast_error(status = "500")]
5351
ConnectionDoesNotExist,
52+
#[error("Cannot create the pool")]
53+
#[editoast_error(status = "500")]
54+
CeatePoolLapin(CreatePoolError),
55+
#[error("Cannot acquire connection from pool")]
56+
#[editoast_error(status = "500")]
57+
DeadpoolLapin(PoolError),
5458
}
5559

5660
pub struct MQResponse {
@@ -64,61 +68,20 @@ impl RabbitMQClient {
6468
.map(|name| name.to_string_lossy().into_owned())
6569
.unwrap_or_else(|_| "unknown".to_string());
6670

67-
let conn = Arc::new(RwLock::new(None));
68-
69-
tokio::spawn(Self::connection_loop(options.uri, conn.clone()));
71+
let mut cfg = Config::default();
72+
cfg.url = Some(options.uri);
73+
let pool = cfg
74+
.create_pool(Some(Runtime::Tokio1))
75+
.map_err(Error::CeatePoolLapin)?;
7076

7177
Ok(RabbitMQClient {
72-
connection: conn,
78+
pool,
7379
exchange: format!("{}-req-xchg", options.worker_pool_identifier),
7480
timeout: options.timeout,
7581
hostname,
7682
})
7783
}
7884

79-
async fn connection_ok(connection: &Arc<RwLock<Option<Connection>>>) -> bool {
80-
let guard = connection.as_ref().read().await;
81-
let conn = guard.as_ref();
82-
let status = match conn {
83-
None => return false,
84-
Some(conn) => conn.status().state(),
85-
};
86-
match status {
87-
lapin::ConnectionState::Initial => true,
88-
lapin::ConnectionState::Connecting => true,
89-
lapin::ConnectionState::Connected => true,
90-
lapin::ConnectionState::Closing => true,
91-
lapin::ConnectionState::Closed => false,
92-
lapin::ConnectionState::Error => false,
93-
}
94-
}
95-
96-
async fn connection_loop(uri: String, connection: Arc<RwLock<Option<Connection>>>) {
97-
loop {
98-
if Self::connection_ok(&connection).await {
99-
tokio::time::sleep(Duration::from_secs(2)).await;
100-
continue;
101-
}
102-
103-
tracing::info!("Reconnecting to RabbitMQ");
104-
105-
// Connection should be re-established
106-
let new_connection = Connection::connect(&uri, ConnectionProperties::default()).await;
107-
108-
match new_connection {
109-
Ok(new_connection) => {
110-
*connection.write().await = Some(new_connection);
111-
tracing::info!("Reconnected to RabbitMQ");
112-
}
113-
Err(e) => {
114-
tracing::error!("Error while reconnecting to RabbitMQ: {:?}", e);
115-
}
116-
}
117-
118-
tokio::time::sleep(Duration::from_secs(2)).await;
119-
}
120-
}
121-
12285
#[allow(dead_code)]
12386
pub async fn call<T>(
12487
&self,
@@ -131,14 +94,8 @@ impl RabbitMQClient {
13194
where
13295
T: Serialize,
13396
{
134-
// Get current connection
135-
let connection = self.connection.read().await;
136-
if connection.is_none() {
137-
return Err(Error::ConnectionDoesNotExist);
138-
}
139-
let connection = connection.as_ref().unwrap();
140-
14197
// Create a channel
98+
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
14299
let channel = connection.create_channel().await.map_err(Error::Lapin)?;
143100

144101
let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
@@ -172,6 +129,12 @@ impl RabbitMQClient {
172129
.await
173130
.map_err(Error::Lapin)?;
174131

132+
// Explicitly close the channel
133+
channel
134+
.close(200, "Normal shutdown")
135+
.await
136+
.map_err(Error::Lapin)?;
137+
175138
Ok(())
176139
}
177140

@@ -186,14 +149,8 @@ impl RabbitMQClient {
186149
where
187150
T: Serialize,
188151
{
189-
// Get current connection
190-
let connection = self.connection.read().await;
191-
if connection.is_none() {
192-
return Err(Error::ConnectionDoesNotExist);
193-
}
194-
let connection = connection.as_ref().unwrap();
195-
196152
// Create a channel
153+
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
197154
let channel = connection.create_channel().await.map_err(Error::Lapin)?;
198155

199156
let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
@@ -247,7 +204,7 @@ impl RabbitMQClient {
247204
.await
248205
.map_err(|_| Error::ResponseTimeout)?;
249206

250-
match response_delivery {
207+
let result = match response_delivery {
251208
Some(Ok(delivery)) => {
252209
let status = delivery
253210
.properties
@@ -265,7 +222,15 @@ impl RabbitMQClient {
265222
}
266223
Some(Err(e)) => Err(e.into()),
267224
None => panic!("Rabbitmq consumer was cancelled unexpectedly"),
268-
}
225+
};
226+
227+
// Explicitly close the channel
228+
channel
229+
.close(200, "Normal shutdown")
230+
.await
231+
.map_err(Error::Lapin)?;
232+
233+
result
269234
}
270235
}
271236

front/public/locales/en/errors.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
"Serialization": "Core: cannot serialize request",
4848
"StatusParsing": "Core: cannot parse status",
4949
"UnparsableErrorOutput": "Core returned an error in an unknown format",
50-
"ConnectionDoesNotExist": "Core: message queue: connection not established"
50+
"ConnectionDoesNotExist": "Core: message queue: connection not established",
51+
"CeatePoolLapin": "Core: message queue: cannot create pool",
52+
"DeadpoolLapin": "Core: message queue: pool error"
5153
},
5254
"DatabaseAccessError": "Database access fatal error",
5355
"document": {

front/public/locales/fr/errors.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@
4747
"Serialization": "Core: impossible de sérialiser la requête",
4848
"StatusParsing": "Core: impossible d'obtenir le status",
4949
"UnparsableErrorOutput": "Core: a renvoyé une erreur dans un format inconnu",
50-
"ConnectionDoesNotExist": "Core: file d'attente de messages: connexion non établie"
50+
"ConnectionDoesNotExist": "Core: file d'attente de messages: connexion non établie",
51+
"CeatePoolLapin": "Core: file d'attente de messages: erreur de création de pool",
52+
"DeadpoolLapin": "Core: file d'attente de messages: erreur de pool"
5153
},
5254
"document": {
5355
"NotFound": "Document '{{document_key}}' non trouvé"

0 commit comments

Comments
 (0)