editoast: refactor infra tests part 7 #7932

Merged
merged 8 commits on Jul 10, 2024
8 changes: 8 additions & 0 deletions editoast/editoast_models/src/db_connection_pool.rs
@@ -134,6 +134,14 @@ impl DbConnectionPoolV2 {
/// # }
/// ```
///
/// ### Deadlocks
///
/// We encountered deadlock errors in our tests,
/// especially those using the `empty_infra` and `small_infra` fixtures.
/// Annotating the affected tests with `#[serial_test::serial]` solved the issue.
/// Increasing the PostgreSQL deadlock timeout did not help,
/// nor did generating random `infra_id`s with `rand`.
///
/// ## Guidelines
///
/// To prevent these issues, prefer the following patterns:
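The pattern this new doc section prescribes looks like the sketch below. This is a minimal illustration, not code from the PR: the helpers (`DbConnectionPoolV2::for_tests`, `get_ok`, `create_empty_infra`) are the ones used elsewhere in this diff, and the test name is hypothetical.

```rust
use std::ops::DerefMut;

use editoast_models::DbConnectionPoolV2;
use rstest::rstest;

use crate::modelsv2::fixtures::create_empty_infra;

#[rstest]
// Run serially: concurrent tests touching infra fixtures have deadlocked Postgres.
#[serial_test::serial]
async fn some_infra_test() {
    // One pool per test; connections are checked out per statement.
    let db_pool = DbConnectionPoolV2::for_tests();
    let infra = create_empty_infra(db_pool.get_ok().deref_mut()).await;
    assert!(infra.id > 0);
}
```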
6 changes: 3 additions & 3 deletions editoast/src/fixtures.rs
@@ -4,6 +4,7 @@ pub mod tests {
use std::io::Cursor;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use uuid::Uuid;

use editoast_models::create_connection_pool;
use editoast_models::DbConnection;
@@ -261,8 +262,7 @@ pub mod tests {
} = scenario_fixture_set().await;

let pathfinding = pathfinding(db_pool()).await;
let mut rs_name = "fast_rolling_stock_".to_string();
rs_name.push_str(name);
let rs_name = format!("fast_rolling_stock_{}_{name}", Uuid::new_v4());
let rolling_stock = named_fast_rolling_stock(&rs_name, db_pool()).await;
let ts_model = make_train_schedule(
db_pool(),
@@ -474,7 +474,7 @@ pub mod tests {
Infra::changeset()
.name("small_infra".to_owned())
.last_railjson_version()
.persist(railjson, db_pool)
.persist(railjson, db_pool.get().await.unwrap().deref_mut())
.await
.unwrap()
}
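The UUID suffix added above is worth calling out: named fixtures that collide on a shared name can make concurrently running tests fail or deadlock, and a per-test random suffix sidesteps that. A minimal sketch of the naming helper, assuming (as the change suggests) that rolling stock names must be unique in the database:

```rust
use uuid::Uuid;

/// Builds a rolling stock name that is unique per call, so two tests
/// requesting a "braking" fixture at the same time never clash.
fn unique_rs_name(base: &str) -> String {
    format!("fast_rolling_stock_{}_{base}", Uuid::new_v4())
}
```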
62 changes: 33 additions & 29 deletions editoast/src/generated_data/mod.rs
@@ -30,6 +30,7 @@ use operational_point::OperationalPointLayer;
use psl_sign::PSLSignLayer;
use signal::SignalLayer;
use speed_section::SpeedSectionLayer;
use std::ops::DerefMut;
use std::sync::Arc;
use switch::SwitchLayer;
use tracing::debug;
@@ -39,7 +40,7 @@ use crate::error::Result;
use crate::infra_cache::operation::CacheOperation;
use crate::infra_cache::InfraCache;
use editoast_models::DbConnection;
use editoast_models::DbConnectionPool;
use editoast_models::DbConnectionPoolV2;

editoast_common::schemas! {
error::schemas(),
@@ -68,12 +69,11 @@ pub trait GeneratedData {
}

async fn refresh_pool(
pool: Arc<DbConnectionPool>,
pool: Arc<DbConnectionPoolV2>,
infra: i64,
infra_cache: &InfraCache,
) -> Result<()> {
let mut conn = pool.get().await?;
Self::refresh(&mut conn, infra, infra_cache).await
Self::refresh(pool.get().await?.deref_mut(), infra, infra_cache).await
}

/// Search for and update all objects that need to be refreshed, given a list of operations.
@@ -86,37 +86,39 @@
}

/// Refresh all the generated data of a given infra
#[tracing::instrument(level = "debug", skip_all, fields(infra_id))]
pub async fn refresh_all(
db_pool: Arc<DbConnectionPool>,
infra: i64,
db_pool: Arc<DbConnectionPoolV2>,
infra_id: i64,
infra_cache: &InfraCache,
) -> Result<()> {
// The other layers depend on the track section layer.
// We must wait for its completion before running the other requests in parallel.
TrackSectionLayer::refresh_pool(db_pool.clone(), infra, infra_cache).await?;
debug!("⚙️ Infra {infra}: track section layer is generated");
let mut conn = db_pool.get().await?;
TrackSectionLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache).await?;
debug!("⚙️ Infra {infra_id}: track section layer is generated");
// The analyze step significantly improves performance when importing and generating together.
// It doesn’t seem to make a difference when the generation step is run separately.
// It isn’t clear why, but without analyze the Postgres server seems to run at 100% without halting.
sql_query("analyze").execute(&mut conn).await?;
debug!("⚙️ Infra {infra}: database analyzed");
sql_query("analyze")
.execute(db_pool.get().await?.deref_mut())
.await?;
debug!("⚙️ Infra {infra_id}: database analyzed");
futures::try_join!(
SpeedSectionLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
SignalLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
SwitchLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
BufferStopLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
ElectrificationLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
DetectorLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
OperationalPointLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
PSLSignLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
NeutralSectionLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
NeutralSignLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
SpeedSectionLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
SignalLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
SwitchLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
BufferStopLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
ElectrificationLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
DetectorLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
OperationalPointLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
PSLSignLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
NeutralSectionLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
NeutralSignLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
)?;
debug!("⚙️ Infra {infra}: object layers is generated");
debug!("⚙️ Infra {infra_id}: object layers is generated");
// The error layer depends on the other layers and must be executed at the end.
ErrorLayer::refresh_pool(db_pool.clone(), infra, infra_cache).await?;
debug!("⚙️ Infra {infra}: errors layer is generated");
ErrorLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache).await?;
debug!("⚙️ Infra {infra_id}: errors layer is generated");
Ok(())
}

@@ -164,18 +166,20 @@ pub mod tests {
use rstest::rstest;
use std::ops::DerefMut;

use crate::fixtures::tests::db_pool;
use crate::generated_data::clear_all;
use crate::generated_data::refresh_all;
use crate::generated_data::update_all;
use crate::modelsv2::fixtures::create_empty_infra;
use editoast_models::DbConnectionPoolV2;

#[rstest] // Slow test
#[rstest]
// Slow test
// PostgreSQL deadlocks can happen in this test; see the `Deadlocks` section of [DbConnectionPoolV2::get] for more information
#[serial_test::serial]
async fn refresh_all_test() {
let db_pool_v2 = DbConnectionPoolV2::for_tests();
let infra = create_empty_infra(db_pool_v2.get_ok().deref_mut()).await;
assert!(refresh_all(db_pool(), infra.id, &Default::default())
let db_pool = DbConnectionPoolV2::for_tests();
let infra = create_empty_infra(db_pool.get_ok().deref_mut()).await;
assert!(refresh_all(db_pool.into(), infra.id, &Default::default())
.await
.is_ok());
}
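The recurring change in this file, repeated in `main.rs` below, is the connection-handling pattern: instead of checking one connection out of the pool at the top of a function and pinning it across every await point, each statement borrows a connection only for as long as it runs. A minimal sketch of the idea, with `run_query` standing in for any of the refresh steps (the pool and connection types are the ones from `editoast_models` used throughout this diff):

```rust
use std::ops::DerefMut;
use std::sync::Arc;

use editoast_models::{DbConnection, DbConnectionPoolV2};

// Placeholder for any database operation; a real caller would run a diesel query here.
async fn run_query(_conn: &mut DbConnection) -> crate::error::Result<()> {
    Ok(())
}

async fn with_short_lived_connections(pool: Arc<DbConnectionPoolV2>) -> crate::error::Result<()> {
    // Old style: `let mut conn = pool.get().await?;` held for the whole body.
    // New style: check out, use, and release a connection per call.
    run_query(pool.get().await?.deref_mut()).await?;
    run_query(pool.get().await?.deref_mut()).await?;
    Ok(())
}
```

Releasing the connection between calls matters in `refresh_all`, which runs ten layer refreshes through `futures::try_join!`: holding an extra connection for the whole function would keep one pool slot occupied while the parallel refreshes compete for the rest.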
55 changes: 23 additions & 32 deletions editoast/src/main.rs
@@ -64,6 +64,7 @@ pub use redis_utils::{RedisClient, RedisConnection};
use std::error::Error;
use std::fs::File;
use std::io::{BufReader, IsTerminal};
use std::ops::DerefMut;
use std::process::exit;
use std::sync::Arc;
use std::{env, fs};
@@ -220,12 +221,12 @@ async fn run() -> Result<(), Box<dyn Error + Send + Sync>> {
SearchCommands::Refresh(args) => refresh_search_tables(args, db_pool.pool_v1()).await,
},
Commands::Infra(subcommand) => match subcommand {
InfraCommands::Clone(args) => clone_infra(args, db_pool.pool_v1()).await,
InfraCommands::Clone(args) => clone_infra(args, db_pool.into()).await,
InfraCommands::Clear(args) => clear_infra(args, db_pool.pool_v1(), redis_config).await,
InfraCommands::Generate(args) => {
generate_infra(args, db_pool.pool_v1(), redis_config).await
generate_infra(args, db_pool.into(), redis_config).await
}
InfraCommands::ImportRailjson(args) => import_railjson(args, db_pool.pool_v1()).await,
InfraCommands::ImportRailjson(args) => import_railjson(args, db_pool.into()).await,
},
Commands::Timetables(subcommand) => match subcommand {
TimetablesCommands::Import(args) => trains_import(args, db_pool.pool_v1()).await,
@@ -484,27 +485,26 @@ async fn batch_retrieve_infras(
/// This command refreshes all infras given as input (if no infra is given, it refreshes all of them)
async fn generate_infra(
args: GenerateArgs,
db_pool: Arc<DbConnectionPool>,
db_pool: Arc<DbConnectionPoolV2>,
redis_config: RedisConfig,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut conn = db_pool.get().await?;
let mut infras = vec![];
if args.infra_ids.is_empty() {
// Retrieve all available infra
for infra in Infra::all(&mut conn).await {
for infra in Infra::all(db_pool.get().await?.deref_mut()).await {
infras.push(infra);
}
} else {
// Retrieve given infras
infras = batch_retrieve_infras(&mut conn, &args.infra_ids).await?;
infras = batch_retrieve_infras(db_pool.get().await?.deref_mut(), &args.infra_ids).await?;
}
for mut infra in infras {
println!(
"🍞 Infra {}[{}] is generating:",
infra.name.clone().bold(),
infra.id
);
let infra_cache = InfraCache::load(&mut conn, &infra).await?;
let infra_cache = InfraCache::load(db_pool.get().await?.deref_mut(), &infra).await?;
if infra
.refresh(db_pool.clone(), args.force, &infra_cache)
.await?
@@ -578,10 +578,9 @@ async fn import_rolling_stock(

async fn clone_infra(
infra_args: InfraCloneArgs,
db_pool: Arc<DbConnectionPool>,
db_pool: Arc<DbConnectionPoolV2>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let conn = &mut db_pool.get().await?;
let infra = Infra::retrieve(conn, infra_args.id as i64)
let infra = Infra::retrieve(db_pool.get().await?.deref_mut(), infra_args.id as i64)
.await?
.ok_or_else(|| {
// Once EditoastError is removed from the models crate,
@@ -594,7 +593,9 @@
let new_name = infra_args
.new_name
.unwrap_or_else(|| format!("{} (clone)", infra.name));
let cloned_infra = infra.clone(conn, new_name).await?;
let cloned_infra = infra
.clone(db_pool.get().await?.deref_mut(), new_name)
.await?;
println!(
"✅ Infra {} (ID: {}) was successfully cloned",
cloned_infra.name.bold(),
@@ -605,7 +606,7 @@

async fn import_railjson(
args: ImportRailjsonArgs,
db_pool: Arc<DbConnectionPool>,
db_pool: Arc<DbConnectionPoolV2>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let railjson_file = match File::open(args.railjson_path.clone()) {
Ok(file) => file,
@@ -629,18 +630,19 @@
let railjson: RailJson = serde_json::from_reader(BufReader::new(railjson_file))?;

println!("🍞 Importing infra {infra_name}");
let mut infra = infra.persist(railjson, db_pool.clone()).await?;
let mut infra = infra
.persist(railjson, db_pool.get().await?.deref_mut())
.await?;

let mut conn = db_pool.get().await?;
infra
.bump_version(&mut conn)
.bump_version(db_pool.get().await?.deref_mut())
.await
.map_err(|_| InfraApiError::NotFound { infra_id: infra.id })?;

println!("✅ Infra {infra_name}[{}] saved!", infra.id);
// Generate only if the `generate` flag was set
if args.generate {
let infra_cache = InfraCache::load(&mut conn, &infra).await?;
let infra_cache = InfraCache::load(db_pool.get().await?.deref_mut(), &infra).await?;
infra.refresh(db_pool, true, &infra_cache).await?;
println!(
"✅ Infra {infra_name}[{}] generated data refreshed!",
@@ -882,9 +884,6 @@ mod tests {
get_trainschedule_json_array, TestFixture,
};
use crate::modelsv2::RollingStockModel;
use diesel::sql_query;
use diesel::sql_types::Text;
use diesel_async::RunQueryDsl;
use modelsv2::DeleteStatic;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
@@ -1081,7 +1080,7 @@
}

#[rstest]
async fn import_railjson_ko_file_not_found(db_pool: Arc<DbConnectionPool>) {
async fn import_railjson_ko_file_not_found() {
// GIVEN
let railjson_path = "non/existing/railjson/file/location";
let args: ImportRailjsonArgs = ImportRailjsonArgs {
@@ -1091,7 +1090,7 @@
};

// WHEN
let result = import_railjson(args.clone(), db_pool).await;
let result = import_railjson(args.clone(), DbConnectionPoolV2::for_tests().into()).await;

// THEN
assert!(result.is_err());
@@ -1106,7 +1105,7 @@
}

#[rstest]
async fn import_railjson_ok(db_pool: Arc<DbConnectionPool>) {
async fn import_railjson_ok() {
// GIVEN
let railjson = Default::default();
let file = generate_temp_file::<RailJson>(&railjson);
@@ -1124,18 +1123,10 @@
};

// WHEN
let result = import_railjson(args, db_pool.clone()).await;
let result = import_railjson(args, DbConnectionPoolV2::for_tests().into()).await;

// THEN
assert!(result.is_ok());

// CLEANUP
let mut conn = db_pool.get().await.unwrap();
sql_query("DELETE FROM infra WHERE name = $1")
.bind::<Text, _>(infra_name)
.execute(&mut conn)
.await
.unwrap();
}

fn generate_temp_file<T: Serialize>(object: &T) -> NamedTempFile {
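A note on the removed cleanup block: switching the tests to `DbConnectionPoolV2::for_tests()` apparently makes the manual `DELETE FROM infra` unnecessary, which suggests the test pool keeps all writes inside a transaction that is rolled back when the pool is dropped. Under that assumption, new tests reduce to the shape below (a sketch; `build_import_args` is a hypothetical helper for constructing `ImportRailjsonArgs`):

```rust
use editoast_models::DbConnectionPoolV2;
use rstest::rstest;

#[rstest]
async fn some_import_test() {
    let args = build_import_args(); // hypothetical: fill in ImportRailjsonArgs as above
    // `.into()` lifts the pool into the `Arc<DbConnectionPoolV2>` the command expects.
    let result = import_railjson(args, DbConnectionPoolV2::for_tests().into()).await;
    assert!(result.is_ok());
    // No cleanup step: the test transaction is assumed to roll back automatically.
}
```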
16 changes: 16 additions & 0 deletions editoast/src/modelsv2/fixtures.rs
@@ -2,6 +2,7 @@ use std::io::Cursor

use chrono::Utc;
use editoast_schemas::infra::InfraObject;
use editoast_schemas::infra::RailJson;
use editoast_schemas::primitives::OSRDObject;
use editoast_schemas::train_schedule::TrainScheduleBase;

@@ -21,6 +22,8 @@
use crate::views::rolling_stocks::rolling_stock_form::RollingStockForm;
use crate::views::v2::train_schedule::TrainScheduleForm;
use editoast_models::DbConnection;
use editoast_models::DbConnectionPool;
use editoast_models::DbConnectionPoolV2;

pub fn project_changeset(name: &str) -> Changeset<Project> {
Project::changeset()
@@ -282,3 +285,16 @@
assert!(result.is_ok(), "Failed to create a {object_type}");
railjson_object
}

pub async fn create_small_infra(conn: &mut DbConnection) -> Infra {
let railjson: RailJson = serde_json::from_str(include_str!(
"../../../tests/data/infras/small_infra/infra.json"
))
.unwrap();
Infra::changeset()
.name("small_infra".to_owned())
.last_railjson_version()
.persist(railjson, conn)
.await
.unwrap()
}
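Usage of the new fixture mirrors the `refresh_all_test` shown earlier in this diff. A hypothetical example, with the same caveats as above (`small_infra` is one of the fixtures the deadlock note singles out, hence the serial attribute):

```rust
use std::ops::DerefMut;

use editoast_models::DbConnectionPoolV2;
use rstest::rstest;

use crate::generated_data::refresh_all;
use crate::modelsv2::fixtures::create_small_infra;

#[rstest]
#[serial_test::serial]
async fn small_infra_refreshes() {
    let db_pool = DbConnectionPoolV2::for_tests();
    let small_infra = create_small_infra(db_pool.get_ok().deref_mut()).await;
    // Default InfraCache, as in refresh_all_test above.
    assert!(refresh_all(db_pool.into(), small_infra.id, &Default::default())
        .await
        .is_ok());
}
```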