Skip to content

Commit cb4e0bc

Browse files
committed
core: stop worker when unable to process requests after infra load
The worker is able to process requests if the infra load: * ... went fine (happy path: just process the request) * ... failed with a perennial error (the expected behavior is to reject requests on the same version of the same infra). If the infra load failed with a temporary error (even after possible retries), it is better to just stop and let the orchestrator decide whether to retry. No retry is implemented so far. Signed-off-by: Pierre-Etienne Bougué <[email protected]>
1 parent 579f6e6 commit cb4e0bc

File tree

6 files changed

+57
-22
lines changed

6 files changed

+57
-22
lines changed

core/osrd-reporting/src/main/java/fr/sncf/osrd/reporting/exceptions/ErrorType.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ public enum ErrorType {
2626
StrictWarningError("strict_warning", "Warning was reported with strict mode enabled", ErrorCause.USER),
2727
AssertionError("assert_error", "assert check failed", ErrorCause.INTERNAL),
2828
UnknownError("unknown_error", "An unknown exception was thrown", ErrorCause.INTERNAL),
29-
InfraSoftLoadingError("infra_loading:soft_error", "soft error while loading new infra", ErrorCause.USER),
29+
InfraSoftLoadingError("infra_loading:soft_error", "soft error while loading new infra", ErrorCause.USER, false),
3030
InfraHardLoadingError("infra_loading:hard_error", "hard error while loading new infra", ErrorCause.USER),
3131
InfraHardError("infra:hard_error", "hard error while parsing infra", ErrorCause.USER),
3232
InfraLoadingCacheException("infra_loading:cache_exception", "cached exception", ErrorCause.INTERNAL),
33-
InfraLoadingInvalidStatusException("infra_loading:invalid_status", "Status doesn’t exist", ErrorCause.INTERNAL),
33+
InfraLoadingInvalidStatusException(
34+
"infra_loading:invalid_status", "infra not loaded correctly", ErrorCause.INTERNAL),
3435
InfraInvalidStatusWhileWaitingStable(
3536
"infra_loading:invalid_status_waiting_stable", "invalid status after waitUntilStable", ErrorCause.INTERNAL),
3637
InfraInvalidVersionException("infra:invalid_version", "Invalid infra version", ErrorCause.USER),

core/src/main/java/fr/sncf/osrd/api/ExceptionHandler.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fr.sncf.osrd.api;
22

33
import fr.sncf.osrd.reporting.exceptions.ErrorCause;
4+
import fr.sncf.osrd.reporting.exceptions.ErrorType;
45
import fr.sncf.osrd.reporting.exceptions.OSRDError;
56
import org.takes.Response;
67
import org.takes.rs.RsJson;
@@ -9,11 +10,16 @@
910

1011
public class ExceptionHandler {
1112

12-
/** Handles an exception, returns an HTTP response with all relevant information */
13-
public static Response handle(Throwable ex) {
13+
/** Handles an exception, returns an HTTP response with all relevant information or
14+
* re-throws it if the exception is an OSRDError of type InfraSoftLoadingError */
15+
public static Response handle(Throwable ex) throws OSRDError {
1416
ex.printStackTrace();
15-
if (ex instanceof OSRDError) return toResponse((OSRDError) ex);
16-
else if (ex instanceof AssertionError) return toResponse(OSRDError.newAssertionWrapper((AssertionError) ex));
17+
if (ex instanceof OSRDError osrdError) {
18+
if (osrdError.osrdErrorType == ErrorType.InfraSoftLoadingError) {
19+
throw osrdError;
20+
}
21+
return toResponse(osrdError);
22+
} else if (ex instanceof AssertionError) return toResponse(OSRDError.newAssertionWrapper((AssertionError) ex));
1723
else {
1824
return toResponse(OSRDError.newUnknownError(ex));
1925
}

core/src/main/java/fr/sncf/osrd/api/InfraManager.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,22 @@ private FullInfra downloadInfra(InfraCacheEntry cacheEntry, String infraId, Diag
111111
RJSInfra rjsInfra;
112112
String version;
113113
try (var response = httpClient.newCall(request).execute()) {
114-
if (!response.isSuccessful()) throw new UnexpectedHttpResponse(response);
114+
if (!response.isSuccessful()) {
115+
if (response.code() != 404) {
116+
throw new UnexpectedHttpResponse(response);
117+
} else {
118+
logger.info("Infra not found (deleted) on supplier middleware");
119+
throw OSRDError.newInfraLoadingError(
120+
ErrorType.InfraHardLoadingError, "Infra not found (deleted) on supplier middleware");
121+
}
122+
}
115123

116124
// Parse the response
117125
logger.info("parsing the JSON of {}", request.url());
118126
cacheEntry.transitionTo(InfraStatus.PARSING_JSON);
119127
version = response.header("x-infra-version");
120128
assert version != null : "missing x-infra-version header in railjson response";
129+
cacheEntry.version = version;
121130
rjsInfra = RJSInfra.adapter.fromJson(response.body().source());
122131
}
123132

@@ -137,11 +146,11 @@ private FullInfra downloadInfra(InfraCacheEntry cacheEntry, String infraId, Diag
137146
// Cache the infra
138147
logger.info("successfully cached {}", request.url());
139148
cacheEntry.infra = new FullInfra(rawInfra, loadedSignalInfra, blockInfra, signalingSimulator);
140-
cacheEntry.version = version;
141149
cacheEntry.transitionTo(InfraStatus.CACHED);
142150
return cacheEntry.infra;
143151
} catch (IOException | UnexpectedHttpResponse | VirtualMachineError e) {
144152
cacheEntry.transitionTo(InfraStatus.TRANSIENT_ERROR, e);
153+
// TODO: retry with an exponential backoff and jitter (use a concurrent Thread.sleep)
145154
throw OSRDError.newInfraLoadingError(ErrorType.InfraSoftLoadingError, cacheEntry.lastStatus.name(), e);
146155
} catch (Throwable e) {
147156
cacheEntry.transitionTo(InfraStatus.ERROR, e);

core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt

+26-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import fr.sncf.osrd.api.api_v2.standalone_sim.SimulationEndpoint
1212
import fr.sncf.osrd.api.api_v2.stdcm.STDCMEndpointV2
1313
import fr.sncf.osrd.api.pathfinding.PathfindingBlocksEndpoint
1414
import fr.sncf.osrd.api.stdcm.STDCMEndpoint
15+
import fr.sncf.osrd.reporting.exceptions.ErrorType
16+
import fr.sncf.osrd.reporting.exceptions.OSRDError
1517
import fr.sncf.osrd.reporting.warnings.DiagnosticRecorderImpl
1618
import io.opentelemetry.api.GlobalOpenTelemetry
1719
import io.opentelemetry.context.Context
@@ -134,7 +136,26 @@ class WorkerCommand : CliCommand {
134136
connection.createChannel().use { channel -> reportActivity(channel, "started") }
135137

136138
if (!ALL_INFRA) {
137-
infraManager.load(infraId, null, diagnosticRecorder)
139+
try {
140+
infraManager.load(infraId, null, diagnosticRecorder)
141+
} catch (e: OSRDError) {
142+
if (e.osrdErrorType == ErrorType.InfraHardLoadingError) {
143+
logger.warn("Failed to load infra $infraId with a perennial error: $e")
144+
// go on and future requests will be consumed and rejected
145+
} else if (e.osrdErrorType == ErrorType.InfraSoftLoadingError) {
146+
logger.error("Failed to load infra $infraId with a temporary error: $e")
147+
// Stop worker and let another worker spawn eventually
148+
throw e
149+
} else {
150+
logger.error(
151+
"Failed to load infra $infraId with an unexpected OSRD Error: $e"
152+
)
153+
throw e
154+
}
155+
} catch (t: Throwable) {
156+
logger.error("Failed to load infra $infraId with an unexpected exception: $t")
157+
throw t
158+
}
138159
}
139160

140161
connection.createChannel().use { channel -> reportActivity(channel, "ready") }
@@ -233,6 +254,10 @@ class WorkerCommand : CliCommand {
233254
"ERROR, exception received"
234255
.toByteArray() // TODO: have a valid payload for uncaught exceptions
235256
status = "core_error".encodeToByteArray()
257+
// Stop worker and let another worker spawn eventually
258+
if (t is OSRDError && t.osrdErrorType == ErrorType.InfraSoftLoadingError) {
259+
throw t
260+
}
236261
} finally {
237262
span.end()
238263
}

core/src/main/kotlin/fr/sncf/osrd/api/api_v2/pathfinding/PathfindingBlocksEndpointV2.kt

+4-7
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,12 @@ class PathfindingBlocksEndpointV2(private val infraManager: InfraManager) : Take
6161
} catch (error: NoPathFoundException) {
6262
pathfindingLogger.info("No path found")
6363
RsJson(RsWithBody(pathfindingResponseAdapter.toJson(error.response)))
64-
} catch (error: OSRDError) {
65-
if (!error.osrdErrorType.isCacheable) {
66-
ExceptionHandler.handle(error)
67-
} else {
68-
pathfindingLogger.info("Pathfinding failed: ${error.message}")
69-
val response = PathfindingFailed(error)
64+
} catch (ex: Throwable) {
65+
if (ex is OSRDError && ex.osrdErrorType.isCacheable) {
66+
pathfindingLogger.info("Pathfinding failed: ${ex.message}")
67+
val response = PathfindingFailed(ex)
7068
RsJson(RsWithBody(pathfindingResponseAdapter.toJson(response)))
7169
}
72-
} catch (ex: Throwable) {
7370
ExceptionHandler.handle(ex)
7471
}
7572
}

core/src/main/kotlin/fr/sncf/osrd/api/api_v2/standalone_sim/SimulationEndpoint.kt

+3-6
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,11 @@ class SimulationEndpoint(
7272
request.path.pathItemPositions,
7373
)
7474
return RsJson(RsWithBody(simulationResponseAdapter.toJson(res)))
75-
} catch (error: OSRDError) {
76-
if (!error.osrdErrorType.isCacheable) {
77-
return ExceptionHandler.handle(error)
78-
} else {
79-
val response = SimulationFailed(error)
75+
} catch (ex: Throwable) {
76+
if (ex is OSRDError && ex.osrdErrorType.isCacheable) {
77+
val response = SimulationFailed(ex)
8078
return RsJson(RsWithBody(simulationResponseAdapter.toJson(response)))
8179
}
82-
} catch (ex: Throwable) {
8380
return ExceptionHandler.handle(ex)
8481
}
8582
}

0 commit comments

Comments
 (0)