Skip to content

Commit 1c0f664

Browse files
committed
editoast: use pooling for amqp
1 parent 665e2b6 commit 1c0f664

File tree

3 files changed

+46
-68
lines changed

3 files changed

+46
-68
lines changed

editoast/Cargo.lock

+26
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

editoast/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ chrono.workspace = true
9696
clap = { version = "4.5.19", features = ["derive", "env"] }
9797
colored = "2.1.0"
9898
dashmap = "6.1.0"
99+
deadpool-lapin = "0.12.1"
99100
derivative.workspace = true
100101
diesel.workspace = true
101102
diesel-async = { workspace = true }

editoast/src/core/mq_client.rs

+19-68
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
1+
use deadpool_lapin::{Config, CreatePoolError, Pool, PoolError, Runtime};
12
use editoast_derive::EditoastError;
23
use futures_util::StreamExt;
34
use itertools::Itertools;
45
use lapin::{
56
options::{BasicConsumeOptions, BasicPublishOptions},
67
types::{ByteArray, FieldTable, ShortString},
7-
BasicProperties, Connection, ConnectionProperties,
8+
BasicProperties,
89
};
910
use serde::Serialize;
1011
use serde_json::to_vec;
11-
use std::{fmt::Debug, sync::Arc};
12+
use std::fmt::Debug;
1213
use thiserror::Error;
13-
use tokio::{
14-
sync::RwLock,
15-
time::{timeout, Duration},
16-
};
14+
use tokio::time::{timeout, Duration};
1715

1816
#[derive(Debug, Clone)]
1917
pub struct RabbitMQClient {
20-
connection: Arc<RwLock<Option<Connection>>>,
18+
pub pool: Pool,
2119
exchange: String,
2220
timeout: u64,
2321
hostname: String,
@@ -51,6 +49,12 @@ pub enum Error {
5149
#[error("Connection does not exist")]
5250
#[editoast_error(status = "500")]
5351
ConnectionDoesNotExist,
52+
#[error("Cannot create the pool")]
53+
#[editoast_error(status = "500")]
54+
CeatePoolLapin(CreatePoolError),
55+
#[error("Cannot acquire connection from pool")]
56+
#[editoast_error(status = "500")]
57+
DeadpoolLapin(PoolError),
5458
}
5559

5660
pub struct MQResponse {
@@ -64,61 +68,20 @@ impl RabbitMQClient {
6468
.map(|name| name.to_string_lossy().into_owned())
6569
.unwrap_or_else(|_| "unknown".to_string());
6670

67-
let conn = Arc::new(RwLock::new(None));
68-
69-
tokio::spawn(Self::connection_loop(options.uri, conn.clone()));
71+
let mut cfg = Config::default();
72+
cfg.url = Some(options.uri);
73+
let pool = cfg
74+
.create_pool(Some(Runtime::Tokio1))
75+
.map_err(Error::CeatePoolLapin)?;
7076

7177
Ok(RabbitMQClient {
72-
connection: conn,
78+
pool,
7379
exchange: format!("{}-req-xchg", options.worker_pool_identifier),
7480
timeout: options.timeout,
7581
hostname,
7682
})
7783
}
7884

79-
async fn connection_ok(connection: &Arc<RwLock<Option<Connection>>>) -> bool {
80-
let guard = connection.as_ref().read().await;
81-
let conn = guard.as_ref();
82-
let status = match conn {
83-
None => return false,
84-
Some(conn) => conn.status().state(),
85-
};
86-
match status {
87-
lapin::ConnectionState::Initial => true,
88-
lapin::ConnectionState::Connecting => true,
89-
lapin::ConnectionState::Connected => true,
90-
lapin::ConnectionState::Closing => true,
91-
lapin::ConnectionState::Closed => false,
92-
lapin::ConnectionState::Error => false,
93-
}
94-
}
95-
96-
async fn connection_loop(uri: String, connection: Arc<RwLock<Option<Connection>>>) {
97-
loop {
98-
if Self::connection_ok(&connection).await {
99-
tokio::time::sleep(Duration::from_secs(2)).await;
100-
continue;
101-
}
102-
103-
tracing::info!("Reconnecting to RabbitMQ");
104-
105-
// Connection should be re-established
106-
let new_connection = Connection::connect(&uri, ConnectionProperties::default()).await;
107-
108-
match new_connection {
109-
Ok(new_connection) => {
110-
*connection.write().await = Some(new_connection);
111-
tracing::info!("Reconnected to RabbitMQ");
112-
}
113-
Err(e) => {
114-
tracing::error!("Error while reconnecting to RabbitMQ: {:?}", e);
115-
}
116-
}
117-
118-
tokio::time::sleep(Duration::from_secs(2)).await;
119-
}
120-
}
121-
12285
#[allow(dead_code)]
12386
pub async fn call<T>(
12487
&self,
@@ -131,14 +94,8 @@ impl RabbitMQClient {
13194
where
13295
T: Serialize,
13396
{
134-
// Get current connection
135-
let connection = self.connection.read().await;
136-
if connection.is_none() {
137-
return Err(Error::ConnectionDoesNotExist);
138-
}
139-
let connection = connection.as_ref().unwrap();
140-
14197
// Create a channel
98+
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
14299
let channel = connection.create_channel().await.map_err(Error::Lapin)?;
143100

144101
let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
@@ -186,14 +143,8 @@ impl RabbitMQClient {
186143
where
187144
T: Serialize,
188145
{
189-
// Get current connection
190-
let connection = self.connection.read().await;
191-
if connection.is_none() {
192-
return Err(Error::ConnectionDoesNotExist);
193-
}
194-
let connection = connection.as_ref().unwrap();
195-
196146
// Create a channel
147+
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
197148
let channel = connection.create_channel().await.map_err(Error::Lapin)?;
198149

199150
let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;

0 commit comments

Comments
 (0)