editoast: optimise pathfinding and simulation by deduplicating identical requests
Signed-off-by: Youness CHRIFI ALAOUI <[email protected]>
younesschrifi authored and flomonster committed Jan 23, 2025
1 parent d23f3c7 commit 95eb29e
Showing 3 changed files with 96 additions and 53 deletions.
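
The common thread across all three files: each batch endpoint used to compute and cache one result per request, even when a batch repeated the same input many times. Both endpoints now key their work on a content hash of each input, compute every distinct input once, and fan the result back out to each position that submitted it. Below is a minimal sketch of the pattern, with illustrative names rather than editoast's actual types:

    use std::collections::HashMap;

    // Stand-in for a pathfinding or simulation call.
    fn expensive_computation(input: &str) -> usize {
        input.len()
    }

    /// Compute each distinct input once, then copy the result to every
    /// index that submitted it.
    fn compute_batch(inputs: &[String]) -> Vec<usize> {
        // Group request indexes by a key identifying identical inputs
        // (the commit hashes infra id, infra version and the input itself).
        let mut key_to_indexes: HashMap<&String, Vec<usize>> = HashMap::new();
        for (i, input) in inputs.iter().enumerate() {
            key_to_indexes.entry(input).or_default().push(i);
        }

        let mut results = vec![0; inputs.len()]; // pre-filled with defaults
        for (key, indexes) in &key_to_indexes {
            let result = expensive_computation(key); // once per unique input
            for &i in indexes {
                results[i] = result;
            }
        }
        results
    }

    fn main() {
        let batch: Vec<String> = vec!["a".into(), "bb".into(), "a".into()];
        assert_eq!(compute_batch(&batch), vec![1, 2, 1]);
    }

Pre-filling the results vector with a default value is what motivates the Default derives in the first two files.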
editoast/src/core/pathfinding.rs (3 changes: 2 additions & 1 deletion)
@@ -137,7 +137,7 @@ pub enum PathfindingInputError {
}

// Enum for not-found results and incompatible constraints
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, ToSchema)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, ToSchema, Default)]
#[serde(tag = "error_type", rename_all = "snake_case")]
pub enum PathfindingNotFound {
NotFoundInBlocks {
@@ -148,6 +148,7 @@ pub enum PathfindingNotFound {
track_section_ranges: Vec<TrackRange>,
length: u64,
},
#[default]
NotFoundInTracks,
IncompatibleConstraints {
relaxed_constraints_path: Box<PathfindingResultSuccess>,
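One detail in the hunk above: std's #[derive(Default)] for enums (stable since Rust 1.62) requires tagging exactly one variant with #[default], and that variant must be a unit variant, which is why the attribute lands on NotFoundInTracks. A self-contained illustration:

    #[derive(Debug, Default, PartialEq)]
    enum Status {
        Found,
        #[default]
        NotFound,
    }

    fn main() {
        // Status::default() yields the tagged variant.
        assert_eq!(Status::default(), Status::NotFound);
    }
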
editoast/src/views/path/pathfinding.rs (85 changes: 55 additions & 30 deletions)
@@ -8,6 +8,7 @@ use axum::extract::Json;
use axum::extract::Path;
use axum::extract::State;
use axum::Extension;
use derivative::Derivative;
use editoast_authz::BuiltinRole;
use editoast_common::units;
use editoast_schemas::rolling_stock::LoadingGaugeType;
@@ -144,12 +145,16 @@ impl From<PathfindingCoreResult> for PathfindingResult {
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, ToSchema)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, ToSchema, Derivative)]
#[derivative(Default)]
#[serde(tag = "failed_status", rename_all = "snake_case")]
pub enum PathfindingFailure {
PathfindingInputError(PathfindingInputError),
#[derivative(Default)]
PathfindingNotFound(PathfindingNotFound),
InternalError { core_error: InternalError },
InternalError {
core_error: InternalError,
},
}

/// Compute a pathfinding
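
PathfindingFailure's default, by contrast, must be the tuple variant PathfindingNotFound(PathfindingNotFound), and std's derive only accepts unit variants as #[default]. The derivative crate lifts that restriction by filling the variant's field from its own Default impl, which is what the new use derivative::Derivative; import is for. Roughly, as a sketch rather than the exact editoast definitions:

    use derivative::Derivative;

    #[derive(Debug, Default)]
    enum NotFound {
        #[default]
        NotFoundInTracks,
    }

    #[derive(Debug, Derivative)]
    #[derivative(Default)]
    enum Failure {
        // The selected default is a tuple variant; its field is built
        // with NotFound::default().
        #[derivative(Default)]
        NotFound(NotFound),
        Internal(String),
    }

    fn main() {
        // Prints: NotFound(NotFoundInTracks)
        println!("{:?}", Failure::default());
    }
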
@@ -215,18 +220,37 @@ async fn pathfinding_blocks_batch(
infra: &Infra,
pathfinding_inputs: &[PathfindingInput],
) -> Result<Vec<PathfindingResult>> {
let mut hash_to_path_indexes: HashMap<String, Vec<usize>> = HashMap::default();
let mut path_request_map: HashMap<String, PathfindingInput> = HashMap::default();
let mut pathfinding_results =
vec![PathfindingResult::Failure(PathfindingFailure::default()); pathfinding_inputs.len()];
for (index, path_input) in pathfinding_inputs.iter().enumerate() {
let pathfinding_hash = path_input_hash(infra.id, &infra.version, path_input);
hash_to_path_indexes
.entry(pathfinding_hash.clone())
.or_default()
.push(index);
path_request_map
.entry(pathfinding_hash.clone())
.or_insert(path_input.clone());
}

info!(
nb_pathfindings = pathfinding_inputs.len(),
nb_unique_pathfindings = hash_to_path_indexes.len()
);

// Compute hashes of all path_inputs
let hashes: Vec<_> = pathfinding_inputs
.iter()
.map(|input| path_input_hash(infra.id, &infra.version, input))
.collect();
let hashes = hash_to_path_indexes.keys().collect::<Vec<_>>();

// Try to retrieve the result from Valkey
let mut pathfinding_results: Vec<Option<PathfindingResult>> =
let pathfinding_cached_results: Vec<Option<PathfindingResult>> =
valkey_conn.json_get_bulk(&hashes).await?;
let pathfinding_cached_results: HashMap<_, _> =
hashes.into_iter().zip(pathfinding_cached_results).collect();

// Report number of hit cache
let nb_hit = pathfinding_results.iter().flatten().count();
let nb_hit = pathfinding_cached_results.values().flatten().count();
info!(
nb_hit,
nb_miss = pathfinding_inputs.len() - nb_hit,
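
Two things keep the cache lookup cheap here: the Valkey round-trip is still a single json_get_bulk call, now over the deduplicated hashes only, and zipping those hashes with the returned Options produces a per-hash view of the cache, so nb_hit counts hits per unique input rather than per request. A small sketch of that pairing, with stand-in values in place of real hashes and results:

    use std::collections::HashMap;

    fn main() {
        // Stand-ins for the unique hashes and for json_get_bulk's reply.
        let hashes = vec!["h1".to_string(), "h2".to_string()];
        let fetched: Vec<Option<u64>> = vec![Some(42), None];

        // Pair each unique hash with its optional cached value.
        let cached: HashMap<&String, Option<u64>> =
            hashes.iter().zip(fetched).collect();

        // Some(_) entries are cache hits; flatten() skips the Nones.
        let nb_hit = cached.values().flatten().count();
        assert_eq!(nb_hit, 1);
    }
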
@@ -235,11 +259,10 @@ async fn pathfinding_blocks_batch(

// Handle miss cache:
debug!("Extracting locations from path items");
let path_items: Vec<_> = pathfinding_results
let path_items: Vec<_> = pathfinding_cached_results
.iter()
.zip(pathfinding_inputs)
.filter(|(res, _)| res.is_none())
.flat_map(|(_, input)| &input.path_items)
.filter(|(_, res)| res.is_none())
.flat_map(|(hash, _)| &path_request_map[*hash].path_items)
.collect();
let path_item_cache = PathItemCache::load(conn, infra.id, &path_items).await?;

@@ -249,24 +272,25 @@
);
let mut to_cache = vec![];
let mut pathfinding_requests = vec![];
let mut pathfinding_requests_index = vec![];
for (index, (pathfinding_result, pathfinding_input)) in pathfinding_results
.iter_mut()
.zip(pathfinding_inputs)
.enumerate()
{
if pathfinding_result.is_some() {
let mut to_compute_hashes = vec![];
for (hash, pathfinding_result) in pathfinding_cached_results.into_iter() {
if let Some(result) = pathfinding_result {
hash_to_path_indexes[hash]
.iter()
.for_each(|index| pathfinding_results[*index] = result.clone());
continue;
}

let pathfinding_input = &path_request_map[hash];
match build_pathfinding_request(pathfinding_input, infra, &path_item_cache) {
Ok(pathfinding_request) => {
pathfinding_requests.push(pathfinding_request);
pathfinding_requests_index.push(index);
to_compute_hashes.push(hash);
}
Err(result) => {
*pathfinding_result = Some(result.clone());
to_cache.push((&hashes[index], result));
hash_to_path_indexes[hash]
.iter()
.for_each(|index| pathfinding_results[*index] = result.clone());
to_cache.push((hash, result));
}
}
}
@@ -284,25 +308,26 @@
.into_iter()
.collect();

for (index, path_result) in computed_paths.into_iter().enumerate() {
let path_index = pathfinding_requests_index[index];
let path = match path_result {
for (path_result, hash) in computed_paths.into_iter().zip(to_compute_hashes) {
let result = match path_result {
Ok(path) => {
to_cache.push((&hashes[path_index], path.clone().into()));
to_cache.push((hash, path.clone().into()));
path.into()
}
// TODO: only make HTTP status code errors non-fatal
Err(core_error) => {
PathfindingResult::Failure(PathfindingFailure::InternalError { core_error })
}
};
pathfinding_results[path_index] = Some(path);
hash_to_path_indexes[hash]
.iter()
.for_each(|index| pathfinding_results[*index] = result.clone());
}

debug!(nb_results = to_cache.len(), "Caching pathfinding response");
debug!(nb_cached = to_cache.len(), "Caching pathfinding response");
valkey_conn.json_set_bulk(&to_cache).await?;

Ok(pathfinding_results.into_iter().flatten().collect())
Ok(pathfinding_results)
}

fn build_pathfinding_request(
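Taken together, the reworked pathfinding_blocks_batch makes four passes: group request indexes by hash, bulk-fetch cached results, send core requests only for the remaining unique hashes, then fan every outcome back out to its duplicate slots. One deliberate asymmetry in the last pass: successful paths and input errors are pushed to to_cache, but InternalError outcomes are not, so a transient core failure is retried on the next batch instead of being pinned in Valkey. A sketch of the fan-out with that asymmetry, using hypothetical types in place of editoast's:

    use std::collections::HashMap;

    #[derive(Clone, Debug)]
    enum Outcome {
        Success(u64),
        InternalError(String),
    }

    /// Fan a computed result out to every duplicate slot, caching only
    /// successes so transient failures get recomputed next time.
    fn apply_result(
        hash: &str,
        computed: Result<u64, String>,
        hash_to_indexes: &HashMap<String, Vec<usize>>,
        results: &mut [Outcome],
        to_cache: &mut Vec<(String, u64)>,
    ) {
        let outcome = match computed {
            Ok(value) => {
                to_cache.push((hash.to_string(), value)); // cached
                Outcome::Success(value)
            }
            Err(core_error) => Outcome::InternalError(core_error), // not cached
        };
        for &i in &hash_to_indexes[hash] {
            results[i] = outcome.clone();
        }
    }

    fn main() {
        let mut indexes = HashMap::new();
        indexes.insert("h".to_string(), vec![0, 2]);
        let mut results = vec![Outcome::InternalError(String::new()); 3];
        let mut to_cache = Vec::new();
        apply_result("h", Ok(7), &indexes, &mut results, &mut to_cache);
        assert_eq!(to_cache.len(), 1); // cached once, fanned out to slots 0 and 2
    }
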
editoast/src/views/train_schedule.rs (61 changes: 39 additions & 22 deletions)
@@ -457,7 +457,8 @@ pub async fn consist_train_simulation_batch(
.collect();

let mut simulation_results = vec![SimulationResponse::default(); train_schedules.len()];
let mut to_sim = Vec::with_capacity(train_schedules.len());
let mut to_sim: HashMap<String, Vec<usize>> = HashMap::default();
let mut sim_request_map: HashMap<String, SimulationRequest> = HashMap::default();
for (index, (pathfinding, train_schedule)) in
pathfinding_results.iter().zip(train_schedules).enumerate()
{
@@ -500,49 +501,65 @@
// Compute unique hash of the simulation input
let simulation_hash =
train_simulation_input_hash(infra.id, &infra.version, &simulation_request);
to_sim.push((index, simulation_hash, simulation_request));
to_sim
.entry(simulation_hash.clone())
.or_default()
.push(index);
sim_request_map
.entry(simulation_hash)
.or_insert(simulation_request);
}

let cached_results: Vec<Option<SimulationResponse>> = valkey_conn
.json_get_bulk(&to_sim.iter().map(|(_, hash, _)| hash).collect::<Vec<_>>())
.await?;
info!(
nb_train_schedules = train_schedules.len(),
nb_unique_sim = to_sim.len()
);
let cached_simulation_hash = to_sim.keys().collect::<Vec<_>>();
let cached_results: Vec<Option<SimulationResponse>> =
valkey_conn.json_get_bulk(&cached_simulation_hash).await?;

let nb_hit = cached_results.iter().flatten().count();
let nb_miss = to_sim.len() - nb_hit;
info!(nb_hit, nb_miss, "Hit cache");

// Compute simulation from core
let mut futures = Vec::with_capacity(nb_miss);
let mut futures_index_hash = Vec::with_capacity(nb_miss);
for ((train_index, train_hash, sim_request), sim_cached) in to_sim.iter().zip(cached_results) {
let mut futures_hash = Vec::with_capacity(nb_miss);
for (train_hash, sim_cached) in cached_simulation_hash.iter().zip(cached_results) {
if let Some(sim_cached) = sim_cached {
simulation_results[*train_index] = sim_cached;
let train_indexes = &to_sim[*train_hash];
for train_index in train_indexes {
simulation_results[*train_index] = sim_cached.clone();
}
continue;
}
let sim_request = &sim_request_map[*train_hash];
futures.push(Box::pin(sim_request.fetch(core.as_ref())));
futures_index_hash.push((*train_index, train_hash));
futures_hash.push(train_hash);
}

let simulated: Vec<_> = futures::future::join_all(futures)
.await
.into_iter()
.collect();

let mut is_cacheable = vec![false; train_schedules.len()];
for (&(train_index, _), sim_res) in futures_index_hash.iter().zip(simulated) {
(simulation_results[train_index], is_cacheable[train_index]) = match sim_res {
Ok(sim) => (sim, true),
// TODO: only make HTTP status code errors non-fatal
Err(core_error) => (SimulationResponse::SimulationFailed { core_error }, false),
let mut to_cache = vec![];
for (train_hash, sim_res) in futures_hash.iter().zip(simulated) {
let train_indexes = &to_sim[**train_hash];
match sim_res {
Ok(sim_res) => {
to_cache.push((train_hash, sim_res.clone()));
train_indexes
.iter()
.for_each(|index| simulation_results[*index] = sim_res.clone())
}
Err(core_error) => train_indexes.iter().for_each(|index| {
simulation_results[*index] = SimulationResponse::SimulationFailed {
core_error: core_error.clone(),
}
}),
}
}

let to_cache: Vec<_> = futures_index_hash
.into_iter()
.filter(|&(train_index, _)| is_cacheable[train_index])
.map(|(train_index, train_hash)| (train_hash, &simulation_results[train_index]))
.collect();

// Cache the simulation response
valkey_conn.json_set_bulk(&to_cache).await?;

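consist_train_simulation_batch now mirrors the pathfinding flow: hash each simulation input, group train indexes under that hash in to_sim, bulk-fetch from Valkey, simulate only the unique misses, then fan each outcome out to every train index, again caching only successful simulations. The whole scheme rests on the cache key identifying identical inputs across batches and across processes, so it must be a stable digest of the infra id, the infra version and the full request. A purely illustrative sketch of such a key (editoast's train_simulation_input_hash helper may well be built differently):

    // Illustrative only: a cross-process cache key needs a hash whose
    // output never varies, e.g. FNV-1a with fixed parameters.
    fn input_hash(infra_id: i64, infra_version: &str, request_json: &str) -> String {
        const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
        const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
        let mut h = FNV_OFFSET;
        for byte in infra_id
            .to_le_bytes()
            .iter()
            .chain(infra_version.as_bytes())
            .chain(request_json.as_bytes())
        {
            h ^= u64::from(*byte);
            h = h.wrapping_mul(FNV_PRIME);
        }
        format!("simulation_{h:016x}")
    }

    fn main() {
        let a = input_hash(1, "v1", r#"{"train":"A"}"#);
        let b = input_hash(1, "v1", r#"{"train":"A"}"#);
        assert_eq!(a, b); // identical inputs collapse to one cache entry
    }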
