@@ -9,13 +9,15 @@ use bollard::{
9
9
Docker ,
10
10
} ;
11
11
use serde:: { Deserialize , Serialize } ;
12
+ use sha2:: { Digest , Sha256 } ;
12
13
use uuid:: Uuid ;
13
14
14
15
use crate :: Key ;
15
16
16
17
use super :: {
17
18
worker_driver:: { DriverError , WorkerDriver , WorkerMetadata } ,
18
- LABEL_MANAGED_BY , LABEL_WORKER_ID , LABEL_WORKER_KEY , MANAGED_BY_VALUE ,
19
+ LABEL_MANAGED_BY , LABEL_VERSION_IDENTIFIER , LABEL_WORKER_ID , LABEL_WORKER_KEY ,
20
+ MANAGED_BY_VALUE ,
19
21
} ;
20
22
21
23
#[ derive( Serialize , Deserialize , Clone , Debug ) ]
@@ -39,6 +41,7 @@ pub struct DockerDriver {
39
41
options : DockerDriverOptions ,
40
42
amqp_uri : String ,
41
43
worker_pool : String ,
44
+ version_identifier : String ,
42
45
}
43
46
44
47
impl DockerDriver {
@@ -47,11 +50,17 @@ impl DockerDriver {
47
50
amqp_uri : String ,
48
51
worker_pool : String ,
49
52
) -> DockerDriver {
53
+ let version_identifier = std:: env:: var ( "OSRD_GIT_DESCRIBE" )
54
+ . unwrap_or_else ( |_| format ! ( "run-{}" , Uuid :: new_v4( ) ) ) ;
55
+
56
+ let hashed = format ! ( "{:x}" , Sha256 :: digest( version_identifier) ) [ ..16 ] . to_string ( ) ;
57
+
50
58
DockerDriver {
51
59
client : Docker :: connect_with_socket_defaults ( ) . expect ( "Failed to connect to Docker" ) ,
52
60
options,
53
61
amqp_uri,
54
62
worker_pool,
63
+ version_identifier : hashed,
55
64
}
56
65
}
57
66
}
@@ -63,14 +72,35 @@ impl WorkerDriver for DockerDriver {
63
72
worker_key : Key ,
64
73
) -> Pin < Box < dyn Future < Output = Result < Uuid , DriverError > > + Send + ' _ > > {
65
74
Box :: pin ( async move {
75
+ let mut current_worker_id = None ;
66
76
let current_workers = self . list_worker_groups ( ) . await ?;
77
+
67
78
for worker in current_workers {
68
- if worker. worker_key == worker_key {
79
+ let worker_version = worker
80
+ . metadata
81
+ . get ( LABEL_VERSION_IDENTIFIER )
82
+ . expect ( "version_identifier not found" )
83
+ . to_owned ( ) ;
84
+
85
+ if worker. worker_key == worker_key && worker_version == self . version_identifier {
69
86
return Ok ( worker. worker_id ) ;
87
+ } else if worker. worker_key == worker_key {
88
+ current_worker_id = Some ( worker. worker_id ) ;
89
+
90
+ self . client
91
+ . remove_container (
92
+ & worker. external_id ,
93
+ Some ( RemoveContainerOptions {
94
+ force : true ,
95
+ ..Default :: default ( )
96
+ } ) ,
97
+ )
98
+ . await
99
+ . map_err ( DriverError :: DockerError ) ?;
70
100
}
71
101
}
72
102
73
- let new_id = Uuid :: new_v4 ( ) ;
103
+ let new_id = current_worker_id . unwrap_or_else ( Uuid :: new_v4) ;
74
104
75
105
let final_env = {
76
106
let mut env: Vec < String > = self . options . default_env . clone ( ) ;
@@ -84,6 +114,10 @@ impl WorkerDriver for DockerDriver {
84
114
( LABEL_MANAGED_BY . to_owned ( ) , MANAGED_BY_VALUE . to_owned ( ) ) ,
85
115
( LABEL_WORKER_ID . to_owned ( ) , new_id. to_string ( ) ) ,
86
116
( LABEL_WORKER_KEY . to_owned ( ) , worker_key. to_string ( ) ) ,
117
+ (
118
+ LABEL_VERSION_IDENTIFIER . to_owned ( ) ,
119
+ self . version_identifier . clone ( ) ,
120
+ ) ,
87
121
] ) ;
88
122
89
123
let container_name = format ! (
@@ -183,6 +217,15 @@ impl WorkerDriver for DockerDriver {
183
217
. filter_map ( |container| {
184
218
container. labels . as_ref ( ) . and_then ( |labels| {
185
219
if labels. get ( LABEL_MANAGED_BY ) == Some ( & MANAGED_BY_VALUE . to_string ( ) ) {
220
+ let mut metadata = HashMap :: new ( ) ;
221
+ metadata. insert (
222
+ LABEL_VERSION_IDENTIFIER . to_owned ( ) ,
223
+ labels
224
+ . get ( LABEL_VERSION_IDENTIFIER )
225
+ . expect ( "version_identifier label missing" )
226
+ . clone ( ) ,
227
+ ) ;
228
+
186
229
Some ( WorkerMetadata {
187
230
external_id : container. id . clone ( ) . expect ( "container id missing" ) ,
188
231
worker_id : Uuid :: parse_str (
@@ -196,6 +239,7 @@ impl WorkerDriver for DockerDriver {
196
239
. get ( LABEL_WORKER_KEY )
197
240
. expect ( "worker_key label missing" ) ,
198
241
) ,
242
+ metadata,
199
243
} )
200
244
} else {
201
245
None
@@ -207,4 +251,10 @@ impl WorkerDriver for DockerDriver {
207
251
Ok ( workers)
208
252
} )
209
253
}
254
+
255
+ fn cleanup_stalled (
256
+ & mut self ,
257
+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , DriverError > > + Send + ' _ > > {
258
+ Box :: pin ( async move { Ok ( ( ) ) } )
259
+ }
210
260
}
0 commit comments