Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

editoast: use pooling for amqp #9317

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions editoast/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions editoast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ chrono.workspace = true
clap = { version = "4.5.19", features = ["derive", "env"] }
colored = "2.1.0"
dashmap = "6.1.0"
deadpool-lapin = "0.12.1"
derivative.workspace = true
diesel.workspace = true
diesel-async = { workspace = true }
Expand Down
40 changes: 40 additions & 0 deletions editoast/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4068,6 +4068,8 @@ components:
- $ref: '#/components/schemas/EditoastEditoastUrlErrorInvalidUrl'
- $ref: '#/components/schemas/EditoastElectricalProfilesErrorNotFound'
- $ref: '#/components/schemas/EditoastErrorConnectionDoesNotExist'
- $ref: '#/components/schemas/EditoastErrorCreatePoolLapin'
- $ref: '#/components/schemas/EditoastErrorDeadpoolLapin'
- $ref: '#/components/schemas/EditoastErrorLapin'
- $ref: '#/components/schemas/EditoastErrorResponseTimeout'
- $ref: '#/components/schemas/EditoastErrorSerialization'
Expand Down Expand Up @@ -4148,6 +4150,44 @@ components:
type: string
enum:
- editoast:coreclient:ConnectionDoesNotExist
EditoastErrorCreatePoolLapin:
type: object
required:
- type
- status
- message
properties:
context:
type: object
message:
type: string
status:
type: integer
enum:
- 500
type:
type: string
enum:
- editoast:coreclient:CreatePoolLapin
EditoastErrorDeadpoolLapin:
type: object
required:
- type
- status
- message
properties:
context:
type: object
message:
type: string
status:
type: integer
enum:
- 500
type:
type: string
enum:
- editoast:coreclient:DeadpoolLapin
EditoastErrorLapin:
type: object
required:
Expand Down
121 changes: 49 additions & 72 deletions editoast/src/core/mq_client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
use deadpool_lapin::{Config, CreatePoolError, Pool, PoolError, Runtime};
use editoast_derive::EditoastError;
use futures_util::StreamExt;
use itertools::Itertools;
use lapin::{
options::{BasicConsumeOptions, BasicPublishOptions},
types::{ByteArray, FieldTable, ShortString},
BasicProperties, Connection, ConnectionProperties,
BasicProperties,
};
use serde::Serialize;
use serde_json::to_vec;
use std::{fmt::Debug, sync::Arc};
use std::fmt::Debug;
use thiserror::Error;
use tokio::{
sync::RwLock,
time::{timeout, Duration},
};
use tokio::time::{timeout, Duration};

#[derive(Debug, Clone)]
pub struct RabbitMQClient {
connection: Arc<RwLock<Option<Connection>>>,
pub pool: Pool,
exchange: String,
timeout: u64,
hostname: String,
Expand Down Expand Up @@ -51,6 +49,12 @@ pub enum Error {
#[error("Connection does not exist")]
#[editoast_error(status = "500")]
ConnectionDoesNotExist,
#[error("Cannot create the pool")]
#[editoast_error(status = "500")]
CreatePoolLapin(CreatePoolError),
#[error("Cannot acquire connection from pool")]
#[editoast_error(status = "500")]
DeadpoolLapin(PoolError),
}

pub struct MQResponse {
Expand All @@ -64,61 +68,22 @@ impl RabbitMQClient {
.map(|name| name.to_string_lossy().into_owned())
.unwrap_or_else(|_| "unknown".to_string());

let conn = Arc::new(RwLock::new(None));

tokio::spawn(Self::connection_loop(options.uri, conn.clone()));
let cfg = Config {
url: Some(options.uri),
..Default::default()
};
let pool = cfg
.create_pool(Some(Runtime::Tokio1))
.map_err(Error::CreatePoolLapin)?;

Ok(RabbitMQClient {
connection: conn,
pool,
exchange: format!("{}-req-xchg", options.worker_pool_identifier),
timeout: options.timeout,
hostname,
})
}

async fn connection_ok(connection: &Arc<RwLock<Option<Connection>>>) -> bool {
let guard = connection.as_ref().read().await;
let conn = guard.as_ref();
let status = match conn {
None => return false,
Some(conn) => conn.status().state(),
};
match status {
lapin::ConnectionState::Initial => true,
lapin::ConnectionState::Connecting => true,
lapin::ConnectionState::Connected => true,
lapin::ConnectionState::Closing => true,
lapin::ConnectionState::Closed => false,
lapin::ConnectionState::Error => false,
}
}

async fn connection_loop(uri: String, connection: Arc<RwLock<Option<Connection>>>) {
loop {
if Self::connection_ok(&connection).await {
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}

tracing::info!("Reconnecting to RabbitMQ");

// Connection should be re-established
let new_connection = Connection::connect(&uri, ConnectionProperties::default()).await;

match new_connection {
Ok(new_connection) => {
*connection.write().await = Some(new_connection);
tracing::info!("Reconnected to RabbitMQ");
}
Err(e) => {
tracing::error!("Error while reconnecting to RabbitMQ: {:?}", e);
}
}

tokio::time::sleep(Duration::from_secs(2)).await;
}
}

#[allow(dead_code)]
pub async fn call<T>(
&self,
Expand All @@ -131,14 +96,8 @@ impl RabbitMQClient {
where
T: Serialize,
{
// Get current connection
let connection = self.connection.read().await;
if connection.is_none() {
return Err(Error::ConnectionDoesNotExist);
}
let connection = connection.as_ref().unwrap();

// Create a channel
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
let channel = connection.create_channel().await.map_err(Error::Lapin)?;

let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
Expand Down Expand Up @@ -172,6 +131,12 @@ impl RabbitMQClient {
.await
.map_err(Error::Lapin)?;

// Explicitly close the channel
channel
.close(200, "Normal shutdown")
.await
.map_err(Error::Lapin)?;

Ok(())
}

Expand All @@ -186,14 +151,8 @@ impl RabbitMQClient {
where
T: Serialize,
{
// Get current connection
let connection = self.connection.read().await;
if connection.is_none() {
return Err(Error::ConnectionDoesNotExist);
}
let connection = connection.as_ref().unwrap();

// Create a channel
let connection = self.pool.get().await.map_err(Error::DeadpoolLapin)?;
let channel = connection.create_channel().await.map_err(Error::Lapin)?;

let serialized_payload_vec = to_vec(published_payload).map_err(Error::Serialization)?;
Expand Down Expand Up @@ -244,10 +203,20 @@ impl RabbitMQClient {
Duration::from_secs(override_timeout.unwrap_or(self.timeout)),
consumer.next(),
)
.await
.map_err(|_| Error::ResponseTimeout)?;
.await;

match response_delivery {
if response_delivery.is_err() {
channel
.close(200, "Normal shutdown")
.await
.map_err(Error::Lapin)?;

return Err(Error::ResponseTimeout);
}

let response_delivery = response_delivery.unwrap();

let result = match response_delivery {
Some(Ok(delivery)) => {
let status = delivery
.properties
Expand All @@ -265,7 +234,15 @@ impl RabbitMQClient {
}
Some(Err(e)) => Err(e.into()),
None => panic!("Rabbitmq consumer was cancelled unexpectedly"),
}
};

// Explicitly close the channel
channel
.close(200, "Normal shutdown")
.await
.map_err(Error::Lapin)?;

result
}
}

Expand Down
4 changes: 3 additions & 1 deletion front/public/locales/en/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
"Serialization": "Core: cannot serialize request",
"StatusParsing": "Core: cannot parse status",
"UnparsableErrorOutput": "Core returned an error in an unknown format",
"ConnectionDoesNotExist": "Core: message queue: connection not established"
"ConnectionDoesNotExist": "Core: message queue: connection not established",
"CreatePoolLapin": "Core: message queue: cannot create pool",
"DeadpoolLapin": "Core: message queue: pool error"
},
"DatabaseAccessError": "Database access fatal error",
"document": {
Expand Down
4 changes: 3 additions & 1 deletion front/public/locales/fr/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
"Serialization": "Core: impossible de sérialiser la requête",
"StatusParsing": "Core: impossible d'obtenir le status",
"UnparsableErrorOutput": "Core: a renvoyé une erreur dans un format inconnu",
"ConnectionDoesNotExist": "Core: file d'attente de messages: connexion non établie"
"ConnectionDoesNotExist": "Core: file d'attente de messages: connexion non établie",
"CreatePoolLapin": "Core: file d'attente de messages: erreur de création de pool",
"DeadpoolLapin": "Core: file d'attente de messages: erreur de pool"
},
"document": {
"NotFound": "Document '{{document_key}}' non trouvé"
Expand Down
Loading