diff --git a/editoast/editoast_osrdyne_client/src/lib.rs b/editoast/editoast_osrdyne_client/src/lib.rs index 8af792d861e..4d97057f215 100644 --- a/editoast/editoast_osrdyne_client/src/lib.rs +++ b/editoast/editoast_osrdyne_client/src/lib.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use itertools::Itertools; use serde::Deserialize; +use url::Url; #[cfg(any(test, feature = "mock_client"))] mod mock_client; @@ -22,15 +23,13 @@ struct HTTPClient { } impl OsrdyneClient { - pub fn new(osrdyne_url: &str) -> Result { - let client = HTTPClient { - client: reqwest::Client::new(), - base_url: url::Url::parse(osrdyne_url)?, - }; - let client = OsrdyneClient { - inner: OsrdyneClientInternal::HTTPClient(client), - }; - Ok(client) + pub fn new(osrdyne_url: Url) -> Self { + OsrdyneClient { + inner: OsrdyneClientInternal::HTTPClient(HTTPClient { + client: reqwest::Client::new(), + base_url: osrdyne_url, + }), + } } #[cfg(any(test, feature = "mock_client"))] diff --git a/editoast/openapi.yaml b/editoast/openapi.yaml index ce8b1e5cc63..bd66266a264 100644 --- a/editoast/openapi.yaml +++ b/editoast/openapi.yaml @@ -4287,7 +4287,6 @@ components: - $ref: '#/components/schemas/EditoastTrainScheduleErrorBatchTrainScheduleNotFound' - $ref: '#/components/schemas/EditoastTrainScheduleErrorInfraNotFound' - $ref: '#/components/schemas/EditoastTrainScheduleErrorNotFound' - - $ref: '#/components/schemas/EditoastValkeyConfigErrorUrl' - $ref: '#/components/schemas/EditoastWorkScheduleErrorNameAlreadyUsed' description: Generated error type for Editoast discriminator: @@ -5683,30 +5682,6 @@ components: type: string enum: - editoast:train_schedule:NotFound - EditoastValkeyConfigErrorUrl: - type: object - required: - - type - - status - - message - properties: - context: - type: object - required: - - url - properties: - url: - type: string - message: - type: string - status: - type: integer - enum: - - 500 - type: - type: string - enum: - - editoast:valkey:Url EditoastWorkScheduleErrorNameAlreadyUsed: type: object required: diff --git a/editoast/src/client/healthcheck.rs b/editoast/src/client/healthcheck.rs new file mode 100644 index 00000000000..5669b9c235c --- /dev/null +++ b/editoast/src/client/healthcheck.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use editoast_models::DbConnectionPoolV2; + +use crate::{ + core::{mq_client, CoreClient}, + views::check_health, + ValkeyClient, +}; + +use super::{runserver::CoreArgs, ValkeyConfig}; + +pub async fn healthcheck_cmd( + db_pool: Arc, + valkey_config: ValkeyConfig, + core_config: CoreArgs, +) -> anyhow::Result<()> { + let valkey = ValkeyClient::new(valkey_config.into()).unwrap(); + let core_client = CoreClient::new_mq(mq_client::Options { + uri: core_config.mq_url, + worker_pool_identifier: String::from("core"), + timeout: core_config.core_timeout, + single_worker: core_config.core_single_worker, + num_channels: core_config.core_client_channels_size, + }) + .await?; + check_health(db_pool, valkey.into(), core_client.into()) + .await + .map_err(|e| anyhow!("❌ healthcheck failed: {e}"))?; + println!("✅ Healthcheck passed"); + Ok(()) +} diff --git a/editoast/src/client/infra_commands.rs b/editoast/src/client/infra_commands.rs index 68844374a67..e679245a551 100644 --- a/editoast/src/client/infra_commands.rs +++ b/editoast/src/client/infra_commands.rs @@ -210,11 +210,11 @@ async fn build_valkey_pool_and_invalidate_all_cache( valkey_config: ValkeyConfig, infra_id: i64, ) -> Result<(), Box> { - let valkey = ValkeyClient::new(valkey_config).unwrap(); + let valkey = ValkeyClient::new(valkey_config.into()).unwrap(); let mut conn = valkey.get_connection().await.unwrap(); Ok(map::invalidate_all( &mut conn, - &MapLayers::parse().layers.keys().cloned().collect(), + &MapLayers::default().layers.keys().cloned().collect(), infra_id, ) .await diff --git a/editoast/src/client/mod.rs b/editoast/src/client/mod.rs index 813bc9de9c0..7cb710a56d7 100644 --- a/editoast/src/client/mod.rs +++ b/editoast/src/client/mod.rs @@ -1,8 +1,10 @@ pub mod electrical_profiles_commands; +pub mod healthcheck; pub mod import_rolling_stock; pub mod infra_commands; mod postgres_config; pub mod roles; +pub mod runserver; pub mod search_commands; pub mod stdcm_search_env_commands; mod telemetry_config; @@ -22,6 +24,8 @@ use import_rolling_stock::ImportRollingStockArgs; use infra_commands::InfraCommands; pub use postgres_config::PostgresConfig; use roles::RolesCommand; +use runserver::CoreArgs; +use runserver::RunserverArgs; use search_commands::SearchCommands; use stdcm_search_env_commands::StdcmSearchEnvCommands; pub use telemetry_config::TelemetryConfig; @@ -32,6 +36,7 @@ use url::Url; pub use valkey_config::ValkeyConfig; use crate::error::Result; +use crate::views::OpenApiRoot; #[derive(Parser, Debug)] #[command(author, version)] @@ -89,60 +94,6 @@ pub enum Commands { Healthcheck(CoreArgs), } -#[derive(Args, Debug, Derivative, Clone)] -#[derivative(Default)] -pub struct MapLayersConfig { - #[derivative(Default(value = "18"))] - #[arg(long, env, default_value_t = 18)] - pub max_zoom: u64, - /// Number maximum of tiles before we consider invalidating full Valkey cache is required - #[derivative(Default(value = "250_000"))] - #[arg(long, env, default_value_t = 250_000)] - pub max_tiles: u64, -} - -#[derive(Args, Debug)] -#[command(about, long_about = "Launch the server")] -pub struct CoreArgs { - #[clap(long, env = "OSRD_MQ_URL", default_value_t = String::from("amqp://osrd:password@127.0.0.1:5672/%2f"))] - pub mq_url: String, - #[clap(long, env = "EDITOAST_CORE_TIMEOUT", default_value_t = 180)] - pub core_timeout: u64, - #[clap(long, env = "EDITOAST_CORE_SINGLE_WORKER", default_value_t = false)] - pub core_single_worker: bool, - #[clap(long, env = "CORE_CLIENT_CHANNELS_SIZE", default_value_t = 8)] - pub core_client_channels_size: usize, -} - -#[derive(Args, Debug)] -#[command(about, long_about = "Launch the server")] -pub struct RunserverArgs { - #[command(flatten)] - pub map_layers_config: MapLayersConfig, - #[arg(long, env = "EDITOAST_PORT", default_value_t = 8090)] - pub port: u16, - #[arg(long, env = "EDITOAST_ADDRESS", default_value_t = String::from("0.0.0.0"))] - pub address: String, - #[command(flatten)] - pub core: CoreArgs, - #[clap(long, env = "ROOT_PATH", default_value_t = String::new())] - pub root_path: String, - #[clap(long)] - pub workers: Option, - /// If this option is set, any role and permission check will be bypassed. Even if no user is - /// provided by the request headers of if the provided user doesn't have the required privileges. - // TODO: once the whole role system will be deployed, the default value of this option should - // be set to false. It's currently set to true in order to pass integration tests, which otherwise - // only recieve 401 responses. - #[clap(long, env = "EDITOAST_DISABLE_AUTHORIZATION", default_value_t = true)] - pub disable_authorization: bool, - #[clap(long, env = "OSRDYNE_API_URL", default_value_t = String::from("http://127.0.0.1:4242/"))] - pub osrdyne_api_url: String, - /// The timeout to use when performing the healthcheck, in milliseconds - #[clap(long, env = "EDITOAST_HEALTH_CHECK_TIMEOUT_MS", default_value_t = 500)] - pub health_check_timeout_ms: u64, -} - #[derive(Args, Debug)] #[command(about, long_about = "Extracts a railjson from OpenStreetMap data")] pub struct OsmToRailjsonArgs { @@ -152,6 +103,12 @@ pub struct OsmToRailjsonArgs { pub railjson_out: PathBuf, } +/// Prints the OpenApi to stdout +pub fn print_openapi() { + let openapi = OpenApiRoot::build_openapi(); + print!("{}", serde_yaml::to_string(&openapi).unwrap()); +} + /// Retrieve the ROOT_URL env var. If not found returns default local url. pub fn get_root_url() -> Result { let url = env::var("ROOT_URL").unwrap_or(String::from("http://localhost:8090")); diff --git a/editoast/src/client/postgres_config.rs b/editoast/src/client/postgres_config.rs index 794ad3202d4..0f62e3f5042 100644 --- a/editoast/src/client/postgres_config.rs +++ b/editoast/src/client/postgres_config.rs @@ -2,7 +2,7 @@ use clap::Args; use derivative::Derivative; use url::Url; -use crate::error::Result; +use crate::views; #[derive(Args, Debug, Derivative, Clone)] #[derivative(Default)] @@ -21,8 +21,16 @@ pub struct PostgresConfig { pub pool_size: usize, } -impl PostgresConfig { - pub fn url(&self) -> Result { - Ok(self.database_url.clone()) +impl From for views::PostgresConfig { + fn from( + PostgresConfig { + database_url, + pool_size, + }: PostgresConfig, + ) -> Self { + views::PostgresConfig { + database_url, + pool_size, + } } } diff --git a/editoast/src/client/runserver.rs b/editoast/src/client/runserver.rs new file mode 100644 index 00000000000..86dd8eaa8cb --- /dev/null +++ b/editoast/src/client/runserver.rs @@ -0,0 +1,94 @@ +use chrono::Duration; +use clap::Args; +use url::Url; + +use crate::views; + +use super::{PostgresConfig, ValkeyConfig}; + +#[derive(Args, Debug, Clone)] +struct MapLayersConfig { + #[arg(long, env, default_value_t = 18)] + max_zoom: u64, +} + +#[derive(Args, Debug)] +#[command(about, long_about = "Launch the server")] +pub struct CoreArgs { + #[clap(long, env = "OSRD_MQ_URL", default_value_t = Url::parse("amqp://osrd:password@127.0.0.1:5672/%2f").unwrap())] + pub(super) mq_url: Url, + #[clap(long, env = "EDITOAST_CORE_TIMEOUT", default_value_t = 180)] + pub(super) core_timeout: u64, + #[clap(long, env = "EDITOAST_CORE_SINGLE_WORKER", default_value_t = false)] + pub(super) core_single_worker: bool, + #[clap(long, env = "CORE_CLIENT_CHANNELS_SIZE", default_value_t = 8)] + pub(super) core_client_channels_size: usize, +} + +#[derive(Args, Debug)] +#[command(about, long_about = "Launch the server")] +pub struct RunserverArgs { + #[command(flatten)] + map_layers_config: MapLayersConfig, + #[arg(long, env = "EDITOAST_PORT", default_value_t = 8090)] + port: u16, + #[arg(long, env = "EDITOAST_ADDRESS", default_value_t = String::from("0.0.0.0"))] + address: String, + #[command(flatten)] + core: CoreArgs, + /// If this option is set, any role and permission check will be bypassed. Even if no user is + /// provided by the request headers of if the provided user doesn't have the required privileges. + // TODO: once the whole role system will be deployed, the default value of this option should + // be set to false. It's currently set to true in order to pass integration tests, which otherwise + // only recieve 401 responses. + #[clap(long, env = "EDITOAST_DISABLE_AUTHORIZATION", default_value_t = true)] + disable_authorization: bool, + #[clap(long, env = "OSRDYNE_API_URL", default_value_t = Url::parse("http://127.0.0.1:4242/").unwrap())] + osrdyne_api_url: Url, + /// The timeout to use when performing the healthcheck, in milliseconds + #[clap(long, env = "EDITOAST_HEALTH_CHECK_TIMEOUT_MS", default_value_t = 500)] + health_check_timeout_ms: u64, +} + +/// Create and run the server +pub async fn runserver( + RunserverArgs { + map_layers_config, + port, + address, + core: + CoreArgs { + mq_url, + core_timeout, + core_single_worker, + core_client_channels_size, + }, + disable_authorization, + osrdyne_api_url, + health_check_timeout_ms, + }: RunserverArgs, + postgres: PostgresConfig, + valkey: ValkeyConfig, +) -> anyhow::Result<()> { + let config = views::ServerConfig { + port, + address, + health_check_timeout: Duration::milliseconds(health_check_timeout_ms as i64), + map_layers_max_zoom: map_layers_config.max_zoom as u8, + disable_authorization, + postgres_config: postgres.into(), + osrdyne_config: views::OsrdyneConfig { + mq_url, + osrdyne_api_url, + core: views::CoreConfig { + timeout: Duration::seconds(core_timeout as i64), + single_worker: core_single_worker, + num_channels: core_client_channels_size, + }, + }, + valkey_config: valkey.into(), + }; + + let server = views::Server::new(config).await?; + server.start().await.map_err(Into::into) +} diff --git a/editoast/src/client/valkey_config.rs b/editoast/src/client/valkey_config.rs index 56be1e828b7..760cb3d85bf 100644 --- a/editoast/src/client/valkey_config.rs +++ b/editoast/src/client/valkey_config.rs @@ -1,10 +1,8 @@ use clap::Args; use derivative::Derivative; -use editoast_derive::EditoastError; -use thiserror::Error; use url::Url; -use crate::error::Result; +use crate::valkey_utils; #[derive(Args, Debug, Derivative, Clone)] #[derivative(Default)] @@ -16,25 +14,24 @@ pub struct ValkeyConfig { #[derivative(Default(value = "false"))] #[clap(long, env, default_value_t = false)] pub is_cluster_client: bool, - #[derivative(Default(value = r#""redis://localhost:6379".into()"#))] - #[arg(long, env, default_value = "redis://localhost:6379")] + #[derivative(Default(value = r#"Url::parse("redis://localhost:6379").unwrap()"#))] + #[arg(long, env, default_value_t = Url::parse("redis://localhost:6379").unwrap())] /// Valkey url like `redis://[:PASSWORD@]HOST[:PORT][/DATABASE]` - valkey_url: String, + pub valkey_url: Url, } -impl ValkeyConfig { - pub fn url(&self) -> Result { - let url = Url::parse(&self.valkey_url).map_err(|_| ValkeyConfigError::Url { - url: self.valkey_url.clone(), - })?; - - Ok(url) +impl From for valkey_utils::ValkeyConfig { + fn from( + ValkeyConfig { + no_cache, + is_cluster_client, + valkey_url, + }: ValkeyConfig, + ) -> Self { + valkey_utils::ValkeyConfig { + no_cache, + is_cluster_client, + valkey_url, + } } } - -#[derive(Debug, Error, EditoastError)] -#[editoast_error(base_id = "valkey", default_status = 500)] -pub enum ValkeyConfigError { - #[error("Invalid url '{url}'")] - Url { url: String }, -} diff --git a/editoast/src/core/mq_client.rs b/editoast/src/core/mq_client.rs index 5f6b972bb29..e6ef33c5064 100644 --- a/editoast/src/core/mq_client.rs +++ b/editoast/src/core/mq_client.rs @@ -17,6 +17,7 @@ use tokio::{ task, time::{timeout, Duration}, }; +use url::Url; use uuid::Uuid; #[derive(Debug, Clone)] @@ -151,7 +152,7 @@ impl ChannelWorker { pub struct Options { /// format `amqp://username:password@host:port/vhost` /// for instance: `amqp://osrd:password@localhost:5672/%2f` for the default vhost - pub uri: String, + pub uri: Url, /// Exchange name pub worker_pool_identifier: String, /// Default timeout for the response @@ -265,7 +266,7 @@ impl RabbitMQClient { } async fn connection_loop( - uri: String, + uri: Url, hostname: String, connection: Arc>>, ) { @@ -279,7 +280,7 @@ impl RabbitMQClient { // Connection should be re-established let new_connection = Connection::connect( - &uri, + uri.as_str(), ConnectionProperties::default().with_connection_name(hostname.clone().into()), ) .await; diff --git a/editoast/src/main.rs b/editoast/src/main.rs index 2bc35a792b2..6afefebbfd6 100644 --- a/editoast/src/main.rs +++ b/editoast/src/main.rs @@ -11,57 +11,37 @@ mod models; mod valkey_utils; mod views; -use crate::core::CoreClient; -use crate::views::OpenApiRoot; -use axum::extract::DefaultBodyLimit; -use axum::extract::FromRef; -use axum::{Router, ServiceExt}; -use axum_tracing_opentelemetry::middleware::OtelAxumLayer; use clap::Parser; use client::electrical_profiles_commands::*; +use client::healthcheck::healthcheck_cmd; use client::import_rolling_stock::*; use client::infra_commands::*; +use client::print_openapi; use client::roles; use client::roles::RolesCommand; +use client::runserver::runserver; use client::search_commands::*; use client::stdcm_search_env_commands::handle_stdcm_search_env_command; use client::timetables_commands::*; -use client::CoreArgs; -use client::{Client, Color, Commands, RunserverArgs, ValkeyConfig}; -use client::{MapLayersConfig, PostgresConfig}; -use dashmap::DashMap; -use editoast_models::DbConnectionPool; +use client::{Client, Color, Commands}; use editoast_models::DbConnectionPoolV2; -use editoast_osrdyne_client::OsrdyneClient; -use generated_data::speed_limit_tags_config::SpeedLimitTagIds; -use infra_cache::InfraCache; use models::RollingStockModel; use opentelemetry::trace::TracerProvider as _; use opentelemetry_sdk::propagation::TraceContextPropagator; -use tower::Layer as _; -use tower_http::cors::{Any, CorsLayer}; -use tower_http::limit::RequestBodyLimitLayer; -use tower_http::normalize_path::NormalizePathLayer; -use tower_http::trace::TraceLayer; -use views::check_health; +pub use views::AppState; -use core::mq_client; -use map::MapLayers; use models::prelude::*; use opentelemetry::KeyValue; use opentelemetry_otlp::WithExportConfig as _; use opentelemetry_sdk::Resource; -use std::env; use std::error::Error; use std::io::IsTerminal; use std::process::exit; use std::sync::Arc; -use std::time::Duration; use thiserror::Error; -use tracing::{error, info, warn}; +use tracing::error; use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt as _, Layer as _}; pub use valkey_utils::{ValkeyClient, ValkeyConnection}; -use views::authentication_middleware; /// The mode editoast is running in /// @@ -156,7 +136,9 @@ async fn run() -> Result<(), Box> { init_tracing(EditoastMode::from_client(&client), &client.telemetry_config); let pg_config = client.postgres_config; - let db_pool = DbConnectionPoolV2::try_initialize(pg_config.url()?, pg_config.pool_size).await?; + let db_pool = + DbConnectionPoolV2::try_initialize(pg_config.database_url.clone(), pg_config.pool_size) + .await?; let valkey_config = client.valkey_config; @@ -167,7 +149,9 @@ async fn run() -> Result<(), Box> { } match client.command { - Commands::Runserver(args) => runserver(args, pg_config, valkey_config).await, + Commands::Runserver(args) => runserver(args, pg_config, valkey_config) + .await + .map_err(Into::into), Commands::ImportRollingStock(args) => import_rolling_stock(args, db_pool.into()).await, Commands::ImportTowedRollingStock(args) => { import_towed_rolling_stock(args, db_pool.into()).await @@ -176,7 +160,7 @@ async fn run() -> Result<(), Box> { osm_to_railjson::osm_to_railjson(args.osm_pbf_in, args.railjson_out) } Commands::Openapi => { - generate_openapi(); + print_openapi(); Ok(()) } Commands::ElectricalProfiles(subcommand) => match subcommand { @@ -233,175 +217,13 @@ async fn run() -> Result<(), Box> { } }, Commands::Healthcheck(core_config) => { - healthcheck_cmd(db_pool.into(), valkey_config, core_config).await - } - } -} - -async fn healthcheck_cmd( - db_pool: Arc, - valkey_config: ValkeyConfig, - core_config: CoreArgs, -) -> Result<(), Box> { - let valkey = ValkeyClient::new(valkey_config).unwrap(); - let core_client = CoreClient::new_mq(mq_client::Options { - uri: core_config.mq_url, - worker_pool_identifier: String::from("core"), - timeout: core_config.core_timeout, - single_worker: core_config.core_single_worker, - num_channels: core_config.core_client_channels_size, - }) - .await?; - check_health(db_pool, valkey.into(), core_client.into()) - .await - .map_err(|e| CliError::new(1, format!("❌ healthcheck failed: {0}", e)))?; - println!("✅ Healthcheck passed"); - Ok(()) -} - -/// The state of the whole Editoast service, available to all handlers -/// -/// If only the database is needed, use `State`. -#[derive(Clone)] -pub struct AppState { - pub db_pool_v1: Arc, - pub db_pool_v2: Arc, - pub valkey: Arc, - pub infra_caches: Arc>, - pub map_layers: Arc, - pub map_layers_config: Arc, - pub speed_limit_tag_ids: Arc, - pub disable_authorization: bool, - pub core_client: Arc, - pub osrdyne_client: Arc, - pub health_check_timeout: Duration, -} - -impl AppState { - async fn init( - args: &RunserverArgs, - postgres_config: PostgresConfig, - valkey_config: ValkeyConfig, - ) -> Result> { - info!("Building application state..."); - - // Config database - let valkey = ValkeyClient::new(valkey_config)?.into(); - - // Create both database pools - let db_pool_v2 = - DbConnectionPoolV2::try_initialize(postgres_config.url()?, postgres_config.pool_size) - .await?; - let db_pool_v1 = db_pool_v2.pool_v1(); - let db_pool_v2 = Arc::new(db_pool_v2); - - // Setup infra cache map - let infra_caches = DashMap::::default().into(); - - // Static list of configured speed-limit tag ids - let speed_limit_tag_ids = Arc::new(SpeedLimitTagIds::load()); - - if args.disable_authorization { - warn!("authorization disabled — all role and permission checks are bypassed"); + healthcheck_cmd(db_pool.into(), valkey_config, core_config) + .await + .map_err(Into::into) } - - // Build Core client - let core_client = CoreClient::new_mq(mq_client::Options { - uri: args.core.mq_url.clone(), - worker_pool_identifier: "core".into(), - timeout: args.core.core_timeout, - single_worker: args.core.core_single_worker, - num_channels: args.core.core_client_channels_size, - }) - .await? - .into(); - - let osrdyne_client = Arc::new(OsrdyneClient::new(args.osrdyne_api_url.as_str())?); - - let health_check_timeout = Duration::from_millis(args.health_check_timeout_ms); - - Ok(Self { - valkey, - db_pool_v1, - db_pool_v2, - infra_caches, - core_client, - osrdyne_client, - map_layers: Arc::new(MapLayers::parse()), - map_layers_config: Arc::new(args.map_layers_config.clone()), - speed_limit_tag_ids, - disable_authorization: args.disable_authorization, - health_check_timeout, - }) - } -} - -impl FromRef for DbConnectionPoolV2 { - fn from_ref(input: &AppState) -> Self { - (*input.db_pool_v2).clone() } } -/// Create and run the server -async fn runserver( - args: RunserverArgs, - postgres_config: PostgresConfig, - valkey_config: ValkeyConfig, -) -> Result<(), Box> { - info!("Building server..."); - // Custom Bytes and String extractor configuration - let request_payload_limit = RequestBodyLimitLayer::new(250 * 1024 * 1024); // 250MiB - - // Build CORS layer - let cors = { - let allowed_origin = env::var("OSRD_ALLOWED_ORIGIN").ok(); - match allowed_origin { - Some(origin) => CorsLayer::new() - .allow_methods(Any) - .allow_headers(Any) - .allow_origin( - origin - .parse::() - .expect("invalid allowed origin"), - ), - None => CorsLayer::new() - .allow_methods(Any) - .allow_headers(Any) - .allow_origin(Any), - } - }; - - let app_state = AppState::init(&args, postgres_config, valkey_config).await?; - - // Configure the axum router - let router: Router<()> = axum::Router::::new() - .merge(views::router()) - .route_layer(axum::middleware::from_fn_with_state( - app_state.clone(), - authentication_middleware, - )) - .layer(OtelAxumLayer::default()) - .layer(DefaultBodyLimit::disable()) - .layer(request_payload_limit) - .layer(cors) - .layer(TraceLayer::new_for_http()) - .with_state(app_state); - let normalizing_router = NormalizePathLayer::trim_trailing_slash().layer(router); - - // Run server - info!("Running server..."); - let service = ServiceExt::::into_make_service(normalizing_router); - let listener = tokio::net::TcpListener::bind((args.address.clone(), args.port)).await?; - axum::serve(listener, service).await.expect("unreachable"); - Ok(()) -} - -/// Prints the OpenApi to stdout -fn generate_openapi() { - let openapi = OpenApiRoot::build_openapi(); - print!("{}", serde_yaml::to_string(&openapi).unwrap()); -} - #[derive(Debug, Error, PartialEq)] pub struct CliError { exit_code: i32, diff --git a/editoast/src/map/layers.rs b/editoast/src/map/layers.rs index e7be0b0d320..3bf20da5291 100644 --- a/editoast/src/map/layers.rs +++ b/editoast/src/map/layers.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use serde::Deserialize; -use serde::Serialize; // select C.stuff from A inner join B C on C.id = C.id; // \___________________________/ @@ -10,7 +9,7 @@ use serde::Serialize; type JoinExpr = String; /// Layer view description -#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] +#[derive(Debug, Deserialize)] pub struct View { pub on_field: String, pub data_expr: String, @@ -23,7 +22,7 @@ pub struct View { } /// Layer description -#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] +#[derive(Debug, Deserialize)] pub struct Layer { pub table_name: String, pub views: HashMap, @@ -33,14 +32,20 @@ pub struct Layer { pub attribution: Option, } -#[derive(Serialize, Deserialize, PartialEq, Debug)] +#[derive(Debug, Deserialize)] pub struct MapLayers { pub layers: HashMap, } impl MapLayers { - /// Parses file containing layers' description into MapLayers struct - pub fn parse() -> MapLayers { - serde_yaml::from_str(include_str!("../../map_layers.yml")).unwrap() + pub fn new(layers: HashMap) -> Self { + Self { layers } + } +} + +impl Default for MapLayers { + fn default() -> Self { + serde_yaml::from_str(include_str!("../../map_layers.yml")) + .expect("static data should be valid") } } diff --git a/editoast/src/models/layers/geo_json_and_data.rs b/editoast/src/models/layers/geo_json_and_data.rs index f794b5b5ae2..1dbbda87299 100644 --- a/editoast/src/models/layers/geo_json_and_data.rs +++ b/editoast/src/models/layers/geo_json_and_data.rs @@ -220,7 +220,7 @@ mod tests { #[test] fn test_query_creation() { - let map_layers = MapLayers::parse(); + let map_layers = MapLayers::default(); let expected_queries = [ " WITH bbox AS ( diff --git a/editoast/src/valkey_utils.rs b/editoast/src/valkey_utils.rs index 03c88b7424b..634a69ff3ed 100644 --- a/editoast/src/valkey_utils.rs +++ b/editoast/src/valkey_utils.rs @@ -17,8 +17,8 @@ use redis::ToRedisArgs; use serde::de::DeserializeOwned; use serde::Serialize; use tracing::trace; +use url::Url; -use crate::client::ValkeyConfig; use crate::error::Result; pub enum ValkeyConnection { @@ -155,7 +155,7 @@ impl ValkeyConnection { Err(_) => { return Err(RedisError::from(( ErrorKind::IoError, - "An error occured serializing to json", + "An error occurred serializing to json", )) .into()) } @@ -182,7 +182,7 @@ impl ValkeyConnection { .map_err(|_| { RedisError::from(( ErrorKind::IoError, - "An error occured serializing to json", + "An error occurred serializing to json", )) .into() }) @@ -202,19 +202,26 @@ pub enum ValkeyClient { NoCache, } +#[derive(Clone)] +pub struct ValkeyConfig { + /// Disables caching. This should not be used in production. + pub no_cache: bool, + pub is_cluster_client: bool, + pub valkey_url: Url, +} + impl ValkeyClient { pub fn new(valkey_config: ValkeyConfig) -> Result { if valkey_config.no_cache { return Ok(ValkeyClient::NoCache); } - let valkey_config_url = valkey_config.url()?; if valkey_config.is_cluster_client { return Ok(ValkeyClient::Cluster( - redis::cluster::ClusterClient::new(vec![valkey_config_url.as_str()]).unwrap(), + redis::cluster::ClusterClient::new(vec![valkey_config.valkey_url]).unwrap(), )); } Ok(ValkeyClient::Tokio( - redis::Client::open(valkey_config_url.as_str()).unwrap(), + redis::Client::open(valkey_config.valkey_url).unwrap(), )) } diff --git a/editoast/src/views/layers.rs b/editoast/src/views/layers.rs index 035af32553e..78a031822a7 100644 --- a/editoast/src/views/layers.rs +++ b/editoast/src/views/layers.rs @@ -113,9 +113,7 @@ struct ViewMetadata { )] async fn layer_view( State(AppState { - map_layers, - map_layers_config, - .. + map_layers, config, .. }): State, Extension(auth): AuthenticationExt, Path((layer_slug, view_slug)): Path<(String, String)>, @@ -155,7 +153,7 @@ async fn layer_view( tiles: vec![tiles_url_pattern], attribution: layer.attribution.clone().unwrap_or_default(), minzoom: 5, - maxzoom: map_layers_config.max_zoom, + maxzoom: config.map_layers_max_zoom as u64, })) } @@ -282,7 +280,7 @@ mod tests { #[rstest] async fn layer_view_ko() { - let map_layers = MapLayers::parse(); + let map_layers = MapLayers::default(); let error: InternalError = LayersError::new_view_not_found("does_not_exist", &map_layers.layers["track_sections"]) .into(); diff --git a/editoast/src/views/mod.rs b/editoast/src/views/mod.rs index 79fa63b9828..7d28083d466 100644 --- a/editoast/src/views/mod.rs +++ b/editoast/src/views/mod.rs @@ -25,14 +25,24 @@ pub mod work_schedules; mod test_app; use std::collections::HashSet; +use std::env; use std::sync::Arc; +use axum::extract::DefaultBodyLimit; +use axum::extract::FromRef; use axum::extract::Request; use axum::middleware::Next; use axum::response::Response; +use axum::Router; +use axum::ServiceExt; +use axum_tracing_opentelemetry::middleware::OtelAxumLayer; +use chrono::Duration; +use dashmap::DashMap; use editoast_authz::authorizer::Authorizer; use editoast_authz::authorizer::UserInfo; use editoast_authz::BuiltinRole; +use editoast_models::DbConnectionPool; +use editoast_osrdyne_client::OsrdyneClient; use futures::TryFutureExt; pub use openapi::OpenApiRoot; @@ -45,9 +55,20 @@ use serde::Deserialize; use serde::Serialize; use thiserror::Error; use tokio::time::timeout; +use tower::Layer as _; +use tower_http::cors::Any; +use tower_http::cors::CorsLayer; +use tower_http::limit::RequestBodyLimitLayer; +use tower_http::normalize_path::NormalizePath; +use tower_http::normalize_path::NormalizePathLayer; +use tower_http::trace::TraceLayer; +use tracing::info; +use tracing::warn; +use url::Url; use utoipa::ToSchema; use crate::client::get_app_version; +use crate::core::mq_client; use crate::core::version::CoreVersionRequest; use crate::core::AsCoreRequest; use crate::core::CoreClient; @@ -56,14 +77,17 @@ use crate::core::{self}; use crate::error::Result; use crate::error::{self}; use crate::generated_data; +use crate::generated_data::speed_limit_tags_config::SpeedLimitTagIds; use crate::infra_cache::operation; +use crate::infra_cache::InfraCache; +use crate::map::MapLayers; use crate::models; use crate::models::auth::PgAuthDriver; -use crate::AppState; +use crate::valkey_utils::ValkeyConfig; use crate::ValkeyClient; crate::routes! { - pub fn router(); + fn router(); fn openapi_paths(); "/health" => health, @@ -199,7 +223,7 @@ async fn authenticate( Ok(Authentication::Authenticated(authorizer)) } -pub async fn authentication_middleware( +async fn authentication_middleware( State(AppState { db_pool_v2: db_pool, disable_authorization, @@ -262,7 +286,9 @@ async fn health( }): State, ) -> Result<&'static str> { timeout( - health_check_timeout, + health_check_timeout + .to_std() + .expect("timeout should be valid at this point"), check_health(db_pool, valkey, core_client), ) .await @@ -315,6 +341,196 @@ async fn core_version(app_state: State) -> Json { Json(response) } +#[derive(Clone)] +pub struct CoreConfig { + pub timeout: Duration, + pub single_worker: bool, + pub num_channels: usize, +} + +pub struct OsrdyneConfig { + pub mq_url: Url, + pub osrdyne_api_url: Url, + pub core: CoreConfig, +} + +#[derive(Clone)] +pub struct PostgresConfig { + pub database_url: Url, + pub pool_size: usize, +} + +pub struct ServerConfig { + pub port: u16, + pub address: String, + pub health_check_timeout: Duration, + pub map_layers_max_zoom: u8, + pub disable_authorization: bool, + + pub postgres_config: PostgresConfig, + pub osrdyne_config: OsrdyneConfig, + pub valkey_config: ValkeyConfig, +} + +pub struct Server { + app_state: AppState, + router: NormalizePath, +} + +/// The state of the whole Editoast service, available to all handlers +/// +/// If only the database is needed, use `State`. +#[derive(Clone)] +pub struct AppState { + pub config: Arc, + + pub db_pool_v1: Arc, + pub db_pool_v2: Arc, + pub valkey: Arc, + pub infra_caches: Arc>, + pub map_layers: Arc, + pub speed_limit_tag_ids: Arc, + pub disable_authorization: bool, + pub core_client: Arc, + pub osrdyne_client: Arc, + pub health_check_timeout: Duration, +} + +impl FromRef for DbConnectionPoolV2 { + fn from_ref(input: &AppState) -> Self { + (*input.db_pool_v2).clone() + } +} + +impl AppState { + async fn init(config: ServerConfig) -> Result { + info!("Building application state..."); + + // Config database + let valkey = ValkeyClient::new(config.valkey_config.clone())?.into(); + + // Create both database pools + let db_pool_v2 = { + let PostgresConfig { + database_url, + pool_size, + } = config.postgres_config.clone(); + DbConnectionPoolV2::try_initialize(database_url, pool_size).await? + }; + let db_pool_v1 = db_pool_v2.pool_v1(); + let db_pool_v2 = Arc::new(db_pool_v2); + + // Setup infra cache map + let infra_caches = DashMap::::default().into(); + + // Static list of configured speed-limit tag ids + let speed_limit_tag_ids = Arc::new(SpeedLimitTagIds::load()); + + // Build Core client + let core_client = { + let CoreConfig { + timeout, + single_worker, + num_channels, + } = config.osrdyne_config.core.clone(); + let options = mq_client::Options { + uri: config.osrdyne_config.mq_url.clone(), + worker_pool_identifier: "core".to_owned(), + timeout: timeout.num_seconds() as u64, + single_worker, + num_channels, + }; + CoreClient::new_mq(options).await?.into() + }; + + let osrdyne_client = Arc::new(OsrdyneClient::new( + config.osrdyne_config.osrdyne_api_url.clone(), + )); + + Ok(Self { + valkey, + db_pool_v1, + db_pool_v2, + infra_caches, + core_client, + osrdyne_client, + map_layers: Arc::new(MapLayers::default()), + speed_limit_tag_ids, + disable_authorization: config.disable_authorization, + health_check_timeout: config.health_check_timeout, + config: Arc::new(config), + }) + } +} + +impl Server { + pub async fn new(config: ServerConfig) -> Result { + info!("Building server..."); + let app_state = AppState::init(config).await?; + + // Custom Bytes and String extractor configuration + let request_payload_limit = RequestBodyLimitLayer::new(250 * 1024 * 1024); // 250MiB + + // Build CORS layer + let cors = { + let allowed_origin = env::var("OSRD_ALLOWED_ORIGIN").ok(); + match allowed_origin { + Some(origin) => CorsLayer::new() + .allow_methods(Any) + .allow_headers(Any) + .allow_origin( + origin + .parse::() + .expect("invalid allowed origin"), + ), + None => CorsLayer::new() + .allow_methods(Any) + .allow_headers(Any) + .allow_origin(Any), + } + }; + + // Configure the axum router + let router: Router<()> = axum::Router::::new() + .merge(router()) + .route_layer(axum::middleware::from_fn_with_state( + app_state.clone(), + authentication_middleware, + )) + .layer(OtelAxumLayer::default()) + .layer(DefaultBodyLimit::disable()) + .layer(request_payload_limit) + .layer(cors) + .layer(TraceLayer::new_for_http()) + .with_state(app_state.clone()); + let normalizing_router = NormalizePathLayer::trim_trailing_slash().layer(router); + + Ok(Self { + app_state, + router: normalizing_router, + }) + } + + pub async fn start(self) -> std::io::Result<()> { + let Self { app_state, router } = self; + let ServerConfig { + address, + port, + disable_authorization, + .. + } = app_state.config.as_ref(); + + if *disable_authorization { + warn!("authorization disabled — all role and permission checks are bypassed"); + } + + info!("Running server..."); + let service = ServiceExt::::into_make_service(router); + let listener = tokio::net::TcpListener::bind((address.as_str(), *port)).await?; + axum::serve(listener, service).await + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/editoast/src/views/test_app.rs b/editoast/src/views/test_app.rs index 80bbc72ffa4..fb1f0372810 100644 --- a/editoast/src/views/test_app.rs +++ b/editoast/src/views/test_app.rs @@ -2,7 +2,7 @@ //! test actix server, database connection pool, and different mocking //! components. -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use axum::Router; use axum_tracing_opentelemetry::middleware::OtelAxumLayer; @@ -12,19 +12,20 @@ use editoast_models::DbConnectionPoolV2; use editoast_osrdyne_client::OsrdyneClient; use serde::de::DeserializeOwned; use tower_http::trace::TraceLayer; +use url::Url; use crate::{ - client::{MapLayersConfig, PostgresConfig, ValkeyConfig}, core::{mocking::MockingClient, CoreClient}, generated_data::speed_limit_tags_config::SpeedLimitTagIds, infra_cache::InfraCache, map::MapLayers, + valkey_utils::ValkeyConfig, AppState, ValkeyClient, }; use axum_test::TestRequest; use axum_test::TestServer; -use super::authentication_middleware; +use super::{authentication_middleware, CoreConfig, OsrdyneConfig, PostgresConfig, ServerConfig}; /// A builder interface for [TestApp] /// @@ -81,6 +82,34 @@ impl TestAppBuilder { } pub fn build(self) -> TestApp { + // Generate test server config + let config = ServerConfig { + port: 0, + address: String::default(), + health_check_timeout: chrono::Duration::milliseconds(500), + disable_authorization: true, + map_layers_max_zoom: 18, + postgres_config: PostgresConfig { + database_url: Url::parse("postgres://osrd:password@localhost:5432/osrd").unwrap(), + pool_size: 32, + }, + osrdyne_config: OsrdyneConfig { + mq_url: Url::parse("amqp://osrd:password@127.0.0.1:5672/%2f").unwrap(), + osrdyne_api_url: Url::parse("http://127.0.0.1:4242/").unwrap(), + core: CoreConfig { + timeout: chrono::Duration::seconds(180), + single_worker: false, + num_channels: 8, + }, + }, + valkey_config: ValkeyConfig { + no_cache: false, + is_cluster_client: false, + valkey_url: Url::parse("redis://localhost:6379").unwrap(), + }, + }; + + // Setup tracing let sub = tracing_subscriber::fmt() .pretty() .with_env_filter( @@ -93,15 +122,17 @@ impl TestAppBuilder { let tracing_guard = tracing::subscriber::set_default(sub); // Config valkey - let valkey = ValkeyClient::new(ValkeyConfig::default()) + let valkey = ValkeyClient::new(config.valkey_config.clone()) .expect("Could not build Valkey client") .into(); // Create both database pools let (db_pool_v2, db_pool_v1) = if self.db_pool_v1 { - let config = PostgresConfig::default(); - let pg_config_url = config.url().expect("cannot get postgres config url"); - let pool = create_connection_pool(pg_config_url, config.pool_size) + let PostgresConfig { + database_url, + pool_size, + } = config.postgres_config.clone(); + let pool = create_connection_pool(database_url, pool_size) .expect("could not create connection pool for tests"); let v1 = Arc::new(pool); let v2 = futures::executor::block_on(DbConnectionPoolV2::from_pool(v1.clone())); @@ -138,11 +169,11 @@ impl TestAppBuilder { osrdyne_client, valkey, infra_caches, - map_layers: MapLayers::parse().into(), - map_layers_config: MapLayersConfig::default().into(), + map_layers: Arc::new(MapLayers::default()), speed_limit_tag_ids, disable_authorization: true, - health_check_timeout: Duration::from_millis(500), + health_check_timeout: config.health_check_timeout, + config: Arc::new(config), }; // Configure the axum router