1
+ use deadpool_lapin:: { Config , CreatePoolError , Pool , PoolError , Runtime } ;
1
2
use editoast_derive:: EditoastError ;
2
3
use futures_util:: StreamExt ;
3
4
use itertools:: Itertools ;
4
5
use lapin:: {
5
6
options:: { BasicConsumeOptions , BasicPublishOptions } ,
6
7
types:: { ByteArray , FieldTable , ShortString } ,
7
- BasicProperties , Connection , ConnectionProperties ,
8
+ BasicProperties ,
8
9
} ;
9
10
use serde:: Serialize ;
10
11
use serde_json:: to_vec;
11
- use std:: { fmt:: Debug , sync :: Arc } ;
12
+ use std:: fmt:: Debug ;
12
13
use thiserror:: Error ;
13
- use tokio:: {
14
- sync:: RwLock ,
15
- time:: { timeout, Duration } ,
16
- } ;
14
+ use tokio:: time:: { timeout, Duration } ;
17
15
18
16
#[ derive( Debug , Clone ) ]
19
17
pub struct RabbitMQClient {
20
- connection : Arc < RwLock < Option < Connection > > > ,
18
+ pub pool : Pool ,
21
19
exchange : String ,
22
20
timeout : u64 ,
23
21
hostname : String ,
@@ -51,6 +49,12 @@ pub enum Error {
51
49
#[ error( "Connection does not exist" ) ]
52
50
#[ editoast_error( status = "500" ) ]
53
51
ConnectionDoesNotExist ,
52
+ #[ error( "Cannot create the pool" ) ]
53
+ #[ editoast_error( status = "500" ) ]
54
+ CreatePoolLapin ( CreatePoolError ) ,
55
+ #[ error( "Cannot acquire connection from pool" ) ]
56
+ #[ editoast_error( status = "500" ) ]
57
+ DeadpoolLapin ( PoolError ) ,
54
58
}
55
59
56
60
pub struct MQResponse {
@@ -64,61 +68,22 @@ impl RabbitMQClient {
64
68
. map ( |name| name. to_string_lossy ( ) . into_owned ( ) )
65
69
. unwrap_or_else ( |_| "unknown" . to_string ( ) ) ;
66
70
67
- let conn = Arc :: new ( RwLock :: new ( None ) ) ;
68
-
69
- tokio:: spawn ( Self :: connection_loop ( options. uri , conn. clone ( ) ) ) ;
71
+ let cfg = Config {
72
+ url : Some ( options. uri ) ,
73
+ ..Default :: default ( )
74
+ } ;
75
+ let pool = cfg
76
+ . create_pool ( Some ( Runtime :: Tokio1 ) )
77
+ . map_err ( Error :: CreatePoolLapin ) ?;
70
78
71
79
Ok ( RabbitMQClient {
72
- connection : conn ,
80
+ pool ,
73
81
exchange : format ! ( "{}-req-xchg" , options. worker_pool_identifier) ,
74
82
timeout : options. timeout ,
75
83
hostname,
76
84
} )
77
85
}
78
86
79
- async fn connection_ok ( connection : & Arc < RwLock < Option < Connection > > > ) -> bool {
80
- let guard = connection. as_ref ( ) . read ( ) . await ;
81
- let conn = guard. as_ref ( ) ;
82
- let status = match conn {
83
- None => return false ,
84
- Some ( conn) => conn. status ( ) . state ( ) ,
85
- } ;
86
- match status {
87
- lapin:: ConnectionState :: Initial => true ,
88
- lapin:: ConnectionState :: Connecting => true ,
89
- lapin:: ConnectionState :: Connected => true ,
90
- lapin:: ConnectionState :: Closing => true ,
91
- lapin:: ConnectionState :: Closed => false ,
92
- lapin:: ConnectionState :: Error => false ,
93
- }
94
- }
95
-
96
- async fn connection_loop ( uri : String , connection : Arc < RwLock < Option < Connection > > > ) {
97
- loop {
98
- if Self :: connection_ok ( & connection) . await {
99
- tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
100
- continue ;
101
- }
102
-
103
- tracing:: info!( "Reconnecting to RabbitMQ" ) ;
104
-
105
- // Connection should be re-established
106
- let new_connection = Connection :: connect ( & uri, ConnectionProperties :: default ( ) ) . await ;
107
-
108
- match new_connection {
109
- Ok ( new_connection) => {
110
- * connection. write ( ) . await = Some ( new_connection) ;
111
- tracing:: info!( "Reconnected to RabbitMQ" ) ;
112
- }
113
- Err ( e) => {
114
- tracing:: error!( "Error while reconnecting to RabbitMQ: {:?}" , e) ;
115
- }
116
- }
117
-
118
- tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ;
119
- }
120
- }
121
-
122
87
#[ allow( dead_code) ]
123
88
pub async fn call < T > (
124
89
& self ,
@@ -131,14 +96,8 @@ impl RabbitMQClient {
131
96
where
132
97
T : Serialize ,
133
98
{
134
- // Get current connection
135
- let connection = self . connection . read ( ) . await ;
136
- if connection. is_none ( ) {
137
- return Err ( Error :: ConnectionDoesNotExist ) ;
138
- }
139
- let connection = connection. as_ref ( ) . unwrap ( ) ;
140
-
141
99
// Create a channel
100
+ let connection = self . pool . get ( ) . await . map_err ( Error :: DeadpoolLapin ) ?;
142
101
let channel = connection. create_channel ( ) . await . map_err ( Error :: Lapin ) ?;
143
102
144
103
let serialized_payload_vec = to_vec ( published_payload) . map_err ( Error :: Serialization ) ?;
@@ -172,6 +131,12 @@ impl RabbitMQClient {
172
131
. await
173
132
. map_err ( Error :: Lapin ) ?;
174
133
134
+ // Explicitly close the channel
135
+ channel
136
+ . close ( 200 , "Normal shutdown" )
137
+ . await
138
+ . map_err ( Error :: Lapin ) ?;
139
+
175
140
Ok ( ( ) )
176
141
}
177
142
@@ -186,14 +151,8 @@ impl RabbitMQClient {
186
151
where
187
152
T : Serialize ,
188
153
{
189
- // Get current connection
190
- let connection = self . connection . read ( ) . await ;
191
- if connection. is_none ( ) {
192
- return Err ( Error :: ConnectionDoesNotExist ) ;
193
- }
194
- let connection = connection. as_ref ( ) . unwrap ( ) ;
195
-
196
154
// Create a channel
155
+ let connection = self . pool . get ( ) . await . map_err ( Error :: DeadpoolLapin ) ?;
197
156
let channel = connection. create_channel ( ) . await . map_err ( Error :: Lapin ) ?;
198
157
199
158
let serialized_payload_vec = to_vec ( published_payload) . map_err ( Error :: Serialization ) ?;
@@ -244,10 +203,20 @@ impl RabbitMQClient {
244
203
Duration :: from_secs ( override_timeout. unwrap_or ( self . timeout ) ) ,
245
204
consumer. next ( ) ,
246
205
)
247
- . await
248
- . map_err ( |_| Error :: ResponseTimeout ) ?;
206
+ . await ;
249
207
250
- match response_delivery {
208
+ if response_delivery. is_err ( ) {
209
+ channel
210
+ . close ( 200 , "Normal shutdown" )
211
+ . await
212
+ . map_err ( Error :: Lapin ) ?;
213
+
214
+ return Err ( Error :: ResponseTimeout ) ;
215
+ }
216
+
217
+ let response_delivery = response_delivery. unwrap ( ) ;
218
+
219
+ let result = match response_delivery {
251
220
Some ( Ok ( delivery) ) => {
252
221
let status = delivery
253
222
. properties
@@ -265,7 +234,15 @@ impl RabbitMQClient {
265
234
}
266
235
Some ( Err ( e) ) => Err ( e. into ( ) ) ,
267
236
None => panic ! ( "Rabbitmq consumer was cancelled unexpectedly" ) ,
268
- }
237
+ } ;
238
+
239
+ // Explicitly close the channel
240
+ channel
241
+ . close ( 200 , "Normal shutdown" )
242
+ . await
243
+ . map_err ( Error :: Lapin ) ?;
244
+
245
+ result
269
246
}
270
247
}
271
248
0 commit comments