1
+ use deadpool_lapin:: { Config , CreatePoolError , Pool , PoolError , Runtime } ;
1
2
use editoast_derive:: EditoastError ;
2
3
use futures_util:: StreamExt ;
3
4
use itertools:: Itertools ;
4
5
use lapin:: {
5
6
options:: { BasicConsumeOptions , BasicPublishOptions } ,
6
7
types:: { ByteArray , FieldTable , ShortString } ,
7
- BasicProperties , Connection , ConnectionProperties ,
8
+ BasicProperties ,
8
9
} ;
9
10
use serde:: Serialize ;
10
11
use serde_json:: to_vec;
11
- use std:: { fmt:: Debug , sync :: Arc } ;
12
+ use std:: fmt:: Debug ;
12
13
use thiserror:: Error ;
13
- use tokio:: {
14
- sync:: RwLock ,
15
- time:: { timeout, Duration } ,
16
- } ;
14
+ use tokio:: time:: { timeout, Duration } ;
17
15
18
16
#[ derive( Debug , Clone ) ]
19
17
pub struct RabbitMQClient {
20
- connection : Arc < RwLock < Option < Connection > > > ,
18
+ pub pool : Pool ,
21
19
exchange : String ,
22
20
timeout : u64 ,
23
21
hostname : String ,
@@ -51,6 +49,12 @@ pub enum Error {
51
49
#[ error( "Connection does not exist" ) ]
52
50
#[ editoast_error( status = "500" ) ]
53
51
ConnectionDoesNotExist ,
52
+ #[ error( "Cannot create the pool" ) ]
53
+ #[ editoast_error( status = "500" ) ]
54
+ CreatePoolLapin ( CreatePoolError ) ,
55
+ #[ error( "Cannot acquire connection from pool" ) ]
56
+ #[ editoast_error( status = "500" ) ]
57
+ DeadpoolLapin ( PoolError ) ,
54
58
}
55
59
56
60
pub struct MQResponse {
@@ -64,61 +68,20 @@ impl RabbitMQClient {
64
68
. map ( |name| name. to_string_lossy ( ) . into_owned ( ) )
65
69
. unwrap_or_else ( |_| "unknown" . to_string ( ) ) ;
66
70
67
- let conn = Arc :: new ( RwLock :: new ( None ) ) ;
68
-
69
- tokio:: spawn ( Self :: connection_loop ( options. uri , conn. clone ( ) ) ) ;
71
+ let mut cfg = Config :: default ( ) ;
72
+ cfg. url = Some ( options. uri ) ;
73
+ let pool = cfg
74
+ . create_pool ( Some ( Runtime :: Tokio1 ) )
75
+ . map_err ( Error :: CreatePoolLapin ) ?;
70
76
71
77
Ok ( RabbitMQClient {
72
- connection : conn ,
78
+ pool ,
73
79
exchange : format ! ( "{}-req-xchg" , options. worker_pool_identifier) ,
74
80
timeout : options. timeout ,
75
81
hostname,
76
82
} )
77
83
}
78
84
79
- async fn connection_ok ( connection : & Arc < RwLock < Option < Connection > > > ) -> bool {
80
- let guard = connection. as_ref ( ) . read ( ) . await ;
81
- let conn = guard. as_ref ( ) ;
82
- let status = match conn {
83
- None => return false ,
84
- Some ( conn) => conn. status ( ) . state ( ) ,
85
- } ;
86
- match status {
87
- lapin:: ConnectionState :: Initial => true ,
88
- lapin:: ConnectionState :: Connecting => true ,
89
- lapin:: ConnectionState :: Connected => true ,
90
- lapin:: ConnectionState :: Closing => true ,
91
- lapin:: ConnectionState :: Closed => false ,
92
- lapin:: ConnectionState :: Error => false ,
93
- }
94
- }
95
-
96
- async fn connection_loop ( uri : String , connection : Arc < RwLock < Option < Connection > > > ) {
97
- loop {
98
- if Self :: connection_ok ( & connection) . await {
99
- tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
100
- continue ;
101
- }
102
-
103
- tracing:: info!( "Reconnecting to RabbitMQ" ) ;
104
-
105
- // Connection should be re-established
106
- let new_connection = Connection :: connect ( & uri, ConnectionProperties :: default ( ) ) . await ;
107
-
108
- match new_connection {
109
- Ok ( new_connection) => {
110
- * connection. write ( ) . await = Some ( new_connection) ;
111
- tracing:: info!( "Reconnected to RabbitMQ" ) ;
112
- }
113
- Err ( e) => {
114
- tracing:: error!( "Error while reconnecting to RabbitMQ: {:?}" , e) ;
115
- }
116
- }
117
-
118
- tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
119
- }
120
- }
121
-
122
85
#[ allow( dead_code) ]
123
86
pub async fn call < T > (
124
87
& self ,
@@ -131,14 +94,8 @@ impl RabbitMQClient {
131
94
where
132
95
T : Serialize ,
133
96
{
134
- // Get current connection
135
- let connection = self . connection . read ( ) . await ;
136
- if connection. is_none ( ) {
137
- return Err ( Error :: ConnectionDoesNotExist ) ;
138
- }
139
- let connection = connection. as_ref ( ) . unwrap ( ) ;
140
-
141
97
// Create a channel
98
+ let connection = self . pool . get ( ) . await . map_err ( Error :: DeadpoolLapin ) ?;
142
99
let channel = connection. create_channel ( ) . await . map_err ( Error :: Lapin ) ?;
143
100
144
101
let serialized_payload_vec = to_vec ( published_payload) . map_err ( Error :: Serialization ) ?;
@@ -172,6 +129,12 @@ impl RabbitMQClient {
172
129
. await
173
130
. map_err ( Error :: Lapin ) ?;
174
131
132
+ // Explicitly close the channel
133
+ channel
134
+ . close ( 200 , "Normal shutdown" )
135
+ . await
136
+ . map_err ( Error :: Lapin ) ?;
137
+
175
138
Ok ( ( ) )
176
139
}
177
140
@@ -186,14 +149,8 @@ impl RabbitMQClient {
186
149
where
187
150
T : Serialize ,
188
151
{
189
- // Get current connection
190
- let connection = self . connection . read ( ) . await ;
191
- if connection. is_none ( ) {
192
- return Err ( Error :: ConnectionDoesNotExist ) ;
193
- }
194
- let connection = connection. as_ref ( ) . unwrap ( ) ;
195
-
196
152
// Create a channel
153
+ let connection = self . pool . get ( ) . await . map_err ( Error :: DeadpoolLapin ) ?;
197
154
let channel = connection. create_channel ( ) . await . map_err ( Error :: Lapin ) ?;
198
155
199
156
let serialized_payload_vec = to_vec ( published_payload) . map_err ( Error :: Serialization ) ?;
@@ -247,7 +204,7 @@ impl RabbitMQClient {
247
204
. await
248
205
. map_err ( |_| Error :: ResponseTimeout ) ?;
249
206
250
- match response_delivery {
207
+ let result = match response_delivery {
251
208
Some ( Ok ( delivery) ) => {
252
209
let status = delivery
253
210
. properties
@@ -265,7 +222,15 @@ impl RabbitMQClient {
265
222
}
266
223
Some ( Err ( e) ) => Err ( e. into ( ) ) ,
267
224
None => panic ! ( "Rabbitmq consumer was cancelled unexpectedly" ) ,
268
- }
225
+ } ;
226
+
227
+ // Explicitly close the channel
228
+ channel
229
+ . close ( 200 , "Normal shutdown" )
230
+ . await
231
+ . map_err ( Error :: Lapin ) ?;
232
+
233
+ result
269
234
}
270
235
}
271
236
0 commit comments