From 95eb29e1ca5a9d899966ca73e59b8597d53a087e Mon Sep 17 00:00:00 2001 From: Youness CHRIFI ALAOUI Date: Thu, 23 Jan 2025 11:14:17 +0100 Subject: [PATCH] editoast: optimise unicity pathfinding and simulation Signed-off-by: Youness CHRIFI ALAOUI --- editoast/src/core/pathfinding.rs | 3 +- editoast/src/views/path/pathfinding.rs | 85 +++++++++++++++++--------- editoast/src/views/train_schedule.rs | 61 +++++++++++------- 3 files changed, 96 insertions(+), 53 deletions(-) diff --git a/editoast/src/core/pathfinding.rs b/editoast/src/core/pathfinding.rs index 8570eb93bd4..c69bc7e9a80 100644 --- a/editoast/src/core/pathfinding.rs +++ b/editoast/src/core/pathfinding.rs @@ -137,7 +137,7 @@ pub enum PathfindingInputError { } // Enum for not-found results and incompatible constraints -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, ToSchema)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, ToSchema, Default)] #[serde(tag = "error_type", rename_all = "snake_case")] pub enum PathfindingNotFound { NotFoundInBlocks { @@ -148,6 +148,7 @@ pub enum PathfindingNotFound { track_section_ranges: Vec, length: u64, }, + #[default] NotFoundInTracks, IncompatibleConstraints { relaxed_constraints_path: Box, diff --git a/editoast/src/views/path/pathfinding.rs b/editoast/src/views/path/pathfinding.rs index a050140da3e..b4573606f9e 100644 --- a/editoast/src/views/path/pathfinding.rs +++ b/editoast/src/views/path/pathfinding.rs @@ -8,6 +8,7 @@ use axum::extract::Json; use axum::extract::Path; use axum::extract::State; use axum::Extension; +use derivative::Derivative; use editoast_authz::BuiltinRole; use editoast_common::units; use editoast_schemas::rolling_stock::LoadingGaugeType; @@ -144,12 +145,16 @@ impl From for PathfindingResult { } } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, ToSchema)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, ToSchema, Derivative)] +#[derivative(Default)] #[serde(tag = "failed_status", rename_all = "snake_case")] pub enum PathfindingFailure { PathfindingInputError(PathfindingInputError), + #[derivative(Default)] PathfindingNotFound(PathfindingNotFound), - InternalError { core_error: InternalError }, + InternalError { + core_error: InternalError, + }, } /// Compute a pathfinding @@ -215,18 +220,37 @@ async fn pathfinding_blocks_batch( infra: &Infra, pathfinding_inputs: &[PathfindingInput], ) -> Result> { + let mut hash_to_path_indexes: HashMap> = HashMap::default(); + let mut path_request_map: HashMap = HashMap::default(); + let mut pathfinding_results = + vec![PathfindingResult::Failure(PathfindingFailure::default()); pathfinding_inputs.len()]; + for (index, path_input) in pathfinding_inputs.iter().enumerate() { + let pathfinding_hash = path_input_hash(infra.id, &infra.version, path_input); + hash_to_path_indexes + .entry(pathfinding_hash.clone()) + .or_default() + .push(index); + path_request_map + .entry(pathfinding_hash.clone()) + .or_insert(path_input.clone()); + } + + info!( + nb_pathfindings = pathfinding_inputs.len(), + nb_unique_pathfindings = hash_to_path_indexes.len() + ); + // Compute hashes of all path_inputs - let hashes: Vec<_> = pathfinding_inputs - .iter() - .map(|input| path_input_hash(infra.id, &infra.version, input)) - .collect(); + let hashes = hash_to_path_indexes.keys().collect::>(); // Try to retrieve the result from Valkey - let mut pathfinding_results: Vec> = + let pathfinding_cached_results: Vec> = valkey_conn.json_get_bulk(&hashes).await?; + let pathfinding_cached_results: HashMap<_, _> = + hashes.into_iter().zip(pathfinding_cached_results).collect(); // Report number of hit cache - let nb_hit = pathfinding_results.iter().flatten().count(); + let nb_hit = pathfinding_cached_results.values().flatten().count(); info!( nb_hit, nb_miss = pathfinding_inputs.len() - nb_hit, @@ -235,11 +259,10 @@ async fn pathfinding_blocks_batch( // Handle miss cache: debug!("Extracting locations from path items"); - let path_items: Vec<_> = pathfinding_results + let path_items: Vec<_> = pathfinding_cached_results .iter() - .zip(pathfinding_inputs) - .filter(|(res, _)| res.is_none()) - .flat_map(|(_, input)| &input.path_items) + .filter(|(_, res)| res.is_none()) + .flat_map(|(hash, _)| &path_request_map[*hash].path_items) .collect(); let path_item_cache = PathItemCache::load(conn, infra.id, &path_items).await?; @@ -249,24 +272,25 @@ async fn pathfinding_blocks_batch( ); let mut to_cache = vec![]; let mut pathfinding_requests = vec![]; - let mut pathfinding_requests_index = vec![]; - for (index, (pathfinding_result, pathfinding_input)) in pathfinding_results - .iter_mut() - .zip(pathfinding_inputs) - .enumerate() - { - if pathfinding_result.is_some() { + let mut to_compute_hashes = vec![]; + for (hash, pathfinding_result) in pathfinding_cached_results.into_iter() { + if let Some(result) = pathfinding_result { + hash_to_path_indexes[hash] + .iter() + .for_each(|index| pathfinding_results[*index] = result.clone()); continue; } - + let pathfinding_input = &path_request_map[hash]; match build_pathfinding_request(pathfinding_input, infra, &path_item_cache) { Ok(pathfinding_request) => { pathfinding_requests.push(pathfinding_request); - pathfinding_requests_index.push(index); + to_compute_hashes.push(hash); } Err(result) => { - *pathfinding_result = Some(result.clone()); - to_cache.push((&hashes[index], result)); + hash_to_path_indexes[hash] + .iter() + .for_each(|index| pathfinding_results[*index] = result.clone()); + to_cache.push((hash, result)); } } } @@ -284,11 +308,10 @@ async fn pathfinding_blocks_batch( .into_iter() .collect(); - for (index, path_result) in computed_paths.into_iter().enumerate() { - let path_index = pathfinding_requests_index[index]; - let path = match path_result { + for (path_result, hash) in computed_paths.into_iter().zip(to_compute_hashes) { + let result = match path_result { Ok(path) => { - to_cache.push((&hashes[path_index], path.clone().into())); + to_cache.push((hash, path.clone().into())); path.into() } // TODO: only make HTTP status code errors non-fatal @@ -296,13 +319,15 @@ async fn pathfinding_blocks_batch( PathfindingResult::Failure(PathfindingFailure::InternalError { core_error }) } }; - pathfinding_results[path_index] = Some(path); + hash_to_path_indexes[hash] + .iter() + .for_each(|index| pathfinding_results[*index] = result.clone()); } - debug!(nb_results = to_cache.len(), "Caching pathfinding response"); + debug!(nb_cached = to_cache.len(), "Caching pathfinding response"); valkey_conn.json_set_bulk(&to_cache).await?; - Ok(pathfinding_results.into_iter().flatten().collect()) + Ok(pathfinding_results) } fn build_pathfinding_request( diff --git a/editoast/src/views/train_schedule.rs b/editoast/src/views/train_schedule.rs index a012152c90b..f6b3f6f0228 100644 --- a/editoast/src/views/train_schedule.rs +++ b/editoast/src/views/train_schedule.rs @@ -457,7 +457,8 @@ pub async fn consist_train_simulation_batch( .collect(); let mut simulation_results = vec![SimulationResponse::default(); train_schedules.len()]; - let mut to_sim = Vec::with_capacity(train_schedules.len()); + let mut to_sim: HashMap> = HashMap::default(); + let mut sim_request_map: HashMap = HashMap::default(); for (index, (pathfinding, train_schedule)) in pathfinding_results.iter().zip(train_schedules).enumerate() { @@ -500,12 +501,21 @@ pub async fn consist_train_simulation_batch( // Compute unique hash of the simulation input let simulation_hash = train_simulation_input_hash(infra.id, &infra.version, &simulation_request); - to_sim.push((index, simulation_hash, simulation_request)); + to_sim + .entry(simulation_hash.clone()) + .or_default() + .push(index); + sim_request_map + .entry(simulation_hash) + .or_insert(simulation_request); } - - let cached_results: Vec> = valkey_conn - .json_get_bulk(&to_sim.iter().map(|(_, hash, _)| hash).collect::>()) - .await?; + info!( + nb_train_schedules = train_schedules.len(), + nb_unique_sim = to_sim.len() + ); + let cached_simulation_hash = to_sim.keys().collect::>(); + let cached_results: Vec> = + valkey_conn.json_get_bulk(&cached_simulation_hash).await?; let nb_hit = cached_results.iter().flatten().count(); let nb_miss = to_sim.len() - nb_hit; @@ -513,14 +523,18 @@ pub async fn consist_train_simulation_batch( // Compute simulation from core let mut futures = Vec::with_capacity(nb_miss); - let mut futures_index_hash = Vec::with_capacity(nb_miss); - for ((train_index, train_hash, sim_request), sim_cached) in to_sim.iter().zip(cached_results) { + let mut futures_hash = Vec::with_capacity(nb_miss); + for (train_hash, sim_cached) in cached_simulation_hash.iter().zip(cached_results) { if let Some(sim_cached) = sim_cached { - simulation_results[*train_index] = sim_cached; + let train_indexes = &to_sim[*train_hash]; + for train_index in train_indexes { + simulation_results[*train_index] = sim_cached.clone(); + } continue; } + let sim_request = &sim_request_map[*train_hash]; futures.push(Box::pin(sim_request.fetch(core.as_ref()))); - futures_index_hash.push((*train_index, train_hash)); + futures_hash.push(train_hash); } let simulated: Vec<_> = futures::future::join_all(futures) @@ -528,21 +542,24 @@ pub async fn consist_train_simulation_batch( .into_iter() .collect(); - let mut is_cacheable = vec![false; train_schedules.len()]; - for (&(train_index, _), sim_res) in futures_index_hash.iter().zip(simulated) { - (simulation_results[train_index], is_cacheable[train_index]) = match sim_res { - Ok(sim) => (sim, true), - // TODO: only make HTTP status code errors non-fatal - Err(core_error) => (SimulationResponse::SimulationFailed { core_error }, false), + let mut to_cache = vec![]; + for (train_hash, sim_res) in futures_hash.iter().zip(simulated) { + let train_indexes = &to_sim[**train_hash]; + match sim_res { + Ok(sim_res) => { + to_cache.push((train_hash, sim_res.clone())); + train_indexes + .iter() + .for_each(|index| simulation_results[*index] = sim_res.clone()) + } + Err(core_error) => train_indexes.iter().for_each(|index| { + simulation_results[*index] = SimulationResponse::SimulationFailed { + core_error: core_error.clone(), + } + }), } } - let to_cache: Vec<_> = futures_index_hash - .into_iter() - .filter(|&(train_index, _)| is_cacheable[train_index]) - .map(|(train_index, train_hash)| (train_hash, &simulation_results[train_index])) - .collect(); - // Cache the simulation response valkey_conn.json_set_bulk(&to_cache).await?;