@@ -3,59 +3,68 @@ package main
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
- "strconv"
7
6
"fmt"
8
7
"github.com/KittenConnect/rh-api/model"
9
8
"github.com/KittenConnect/rh-api/util"
10
9
"github.com/joho/godotenv"
11
10
amqp "github.com/rabbitmq/amqp091-go"
12
11
"os"
12
+ "strconv"
13
13
"time"
14
14
)
15
15
16
- func failOnError (err error , msg string ) {
16
+ func failWithError (err error , formatString string , args ... any ) {
17
17
if err != nil {
18
- util .Err (fmt .Errorf ("%s: %w" , msg , err ).Error ())
18
+ util .Err (fmt .Errorf (fmt . Sprintf ( "%s: %w" ,formatString ), append ( args , err ) ... ).Error ())
19
19
}
20
20
}
21
21
22
22
var RETRY_DELAY = 5
23
23
24
-
25
24
func main () {
26
25
err := godotenv .Load ()
27
- failOnError (err , fmt . Sprintf ( "Error loading .env file : %s" , err ) )
26
+ failWithError (err , "Error loading .env file" )
28
27
29
28
conn , err := amqp .Dial (os .Getenv ("RABBITMQ_URL" ))
30
- failOnError (err , fmt . Sprintf ( "Failed to connect to broker : %s" , err ) )
29
+ failWithError (err , "Failed to connect to broker" )
31
30
32
31
defer conn .Close ()
33
32
34
33
ch , err := conn .Channel ()
35
- failOnError (err , fmt . Sprintf ( "Failed to open a channel : %s" , err ) )
34
+ failWithError (err , "Failed to open a channel" )
36
35
37
36
incomingQueue := os .Getenv ("RABBITMQ_INCOMING_QUEUE" )
38
- outcomingQueue := os .Getenv ("RABBITMQ_OUTGOING_QUEUE" )
37
+ outgoingQueue := os .Getenv ("RABBITMQ_OUTGOING_QUEUE" )
39
38
40
39
if value , ok := os .LookupEnv ("RABBITMQ_RETRY_DELAY" ); ok {
41
40
if i , err := strconv .Atoi (value ); err == nil {
42
41
RETRY_DELAY = i
43
42
}
44
- }
43
+ }
45
44
46
- q , err := ch .QueueDeclare (
45
+ inQ , err := ch .QueueDeclare (
47
46
incomingQueue ,
48
47
true ,
49
48
false ,
50
49
false ,
51
50
false ,
52
51
nil ,
53
52
)
54
- failOnError (err , fmt . Sprintf ( "Failed to declare a queue : %s" , err ) )
53
+ failWithError (err , "Failed to declare queue %s" , incomingQueue )
55
54
56
- exchangeArgs := map [string ]interface {}{
55
+ outQ , err := ch .QueueDeclare (
56
+ outgoingQueue ,
57
+ true ,
58
+ false ,
59
+ false ,
60
+ false ,
61
+ nil ,
62
+ )
63
+ failWithError (err , "Failed to declare queue %s" , outgoingQueue )
64
+
65
+ exchangeArgs := map [string ]interface {}{
57
66
"x-delayed-type" : "direct" ,
58
- }
67
+ }
59
68
60
69
err = ch .ExchangeDeclare (
61
70
incomingQueue ,
@@ -66,32 +75,32 @@ func main() {
66
75
false ,
67
76
exchangeArgs ,
68
77
)
69
- failOnError (err , fmt . Sprintf ( "Failed to declare an exchange : %s" , err ) )
78
+ failWithError (err , "Failed to declare exchange %s" , incomingQueue )
70
79
71
- err = ch .QueueBind (
72
- incomingQueue , // queue name
73
- incomingQueue , // routing key
74
- incomingQueue , // exchange
75
- false ,
76
- nil )
77
- failOnError (err , fmt . Sprintf ( "Failed to bind queue: %s" , err ) )
80
+ err = ch .QueueBind (
81
+ incomingQueue , // queue name
82
+ incomingQueue , // routing key
83
+ incomingQueue , // exchange
84
+ false ,
85
+ nil )
86
+ failWithError (err , "Failed to bind queue %s to exchange %s " , incomingQueue , incomingQueue )
78
87
79
88
// Consommation des messages
80
89
msgs , err := ch .Consume (
81
- q .Name , // nom de la queue
90
+ inQ .Name , // nom de la queue
82
91
"consumer" , // consumer
83
92
true , // autoAck
84
93
false , // exclusive
85
94
false , // noLocal
86
95
false , // noWait
87
96
nil , // arguments
88
97
)
89
- failOnError (err , "Failed to register a consumer" )
98
+ failWithError (err , "Failed to register %s consumer" , inQ . Name )
90
99
util .Info ("Connected to message broker" )
91
100
92
101
netbox := model .NewNetbox ()
93
102
err = netbox .Connect ()
94
- failOnError (err , "Failed to connect to netbox" )
103
+ failWithError (err , "Failed to connect to netbox" )
95
104
96
105
if netbox .IsConnected () == false {
97
106
util .Err ("Unable to connect to netbox" )
@@ -136,7 +145,7 @@ func main() {
136
145
chErr := ch .PublishWithContext (
137
146
ctx ,
138
147
incomingQueue ,
139
- q .Name ,
148
+ inQ .Name ,
140
149
false ,
141
150
false ,
142
151
amqp.Publishing {
@@ -167,7 +176,7 @@ func main() {
167
176
chErr := ch .PublishWithContext (
168
177
ctx ,
169
178
"" ,
170
- outcomingQueue ,
179
+ outQ . Name ,
171
180
false ,
172
181
false ,
173
182
amqp.Publishing {
0 commit comments