From 3ed4e78099f2b8c6bc97e21687445a86a233684f Mon Sep 17 00:00:00 2001
From: Leo Valais
Date: Wed, 4 Dec 2024 10:55:37 +0100
Subject: [PATCH 1/3] editoast: derive: set up insta to snapshot-test macros

Allows macro expansions to be tested easily, tracks their changes,
improves debuggability, and eases review.

HashMaps have been eliminated from `Model`'s internals so that the
expansion is deterministic; otherwise, snapshots would be flaky.

Signed-off-by: Leo Valais
---
 .github/workflows/build.yml | 2 +
 editoast/.gitignore | 1 +
 editoast/Cargo.lock | 52 +-
 editoast/Cargo.toml | 7 +
 editoast/README.md | 17 +
 editoast/editoast_derive/Cargo.toml | 2 +
 editoast/editoast_derive/src/error.rs | 23 +
 editoast/editoast_derive/src/lib.rs | 32 +
 editoast/editoast_derive/src/model.rs | 40 +-
 editoast/editoast_derive/src/model/config.rs | 8 +-
 .../editoast_derive/src/model/identifier.rs | 24 +
 editoast/editoast_derive/src/search.rs | 103 +-
 ...st_derive__error__tests__construction.snap | 71 ++
 ...st_derive__model__tests__construction.snap | 1029 +++++++++++++++++
 ...e__search__tests__search_construction.snap | 53 +
 ...ve__search__tests__store_construction.snap | 97 ++
 16 files changed, 1470 insertions(+), 91 deletions(-)
 create mode 100644 editoast/editoast_derive/src/snapshots/editoast_derive__error__tests__construction.snap
 create mode 100644 editoast/editoast_derive/src/snapshots/editoast_derive__model__tests__construction.snap
 create mode 100644 editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__search_construction.snap
 create mode 100644 editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__store_construction.snap

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index d6fcd8e8643..4f85e622645 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -595,8 +595,10 @@ jobs:
             postgis/postgis:latest \
             psql postgresql://postgres:password@localhost:5432 -f /init_db.sql
 
+          # snapshot testing library `insta` requires CI=true
           docker run --name=editoast-test --net=host -v $PWD/output:/output \
             -e DATABASE_URL="postgres://osrd:password@localhost:5432/osrd" \
+            -e CI="true" \
             ${{ fromJSON(needs.build.outputs.stable_tags).editoast-test }} \
             /bin/sh -c "diesel migration run --locked-schema && RUST_BACKTRACE=1 cargo test --workspace --verbose -- --test-threads=4 && grcov . --binary-path ./target/debug/ -s .
-t lcov --branch --ignore-not-existing --ignore "/*" -o /output/lcov.info" diff --git a/editoast/.gitignore b/editoast/.gitignore index eb5a316cbd1..23b12b03d9b 100644 --- a/editoast/.gitignore +++ b/editoast/.gitignore @@ -1 +1,2 @@ target +*.snap.new diff --git a/editoast/Cargo.lock b/editoast/Cargo.lock index 376cc9a00c4..7d58fdc76dc 100644 --- a/editoast/Cargo.lock +++ b/editoast/Cargo.lock @@ -927,6 +927,18 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1408,8 +1420,10 @@ name = "editoast_derive" version = "0.1.0" dependencies = [ "darling", + "insta", "paste", "pretty_assertions", + "prettyplease", "proc-macro2", "quote", "serde_json", @@ -1488,6 +1502,12 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "encoding_rs" version = "0.8.34" @@ -2147,7 +2167,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -2447,6 +2467,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "insta" +version = "1.41.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9ffc4d4892617c50a928c52b2961cb5174b6fc6ebf252b2fac9d21955c48b8" +dependencies = [ + "console", + "lazy_static", + "linked-hash-map", + "similar", +] + [[package]] name = "instant" version = "0.1.13" @@ -2630,7 +2662,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e310b3a6b5907f99202fcdb4960ff45b93735d7c7d96b760fcff8db2dc0e103d" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -2639,6 +2671,12 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -3473,9 +3511,9 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.20" +version = "0.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" +checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", "syn 2.0.90", @@ -4292,6 +4330,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "similar" +version = "2.6.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1de1d4f81173b03af4c0cbed3c898f6bff5b870e4a7f5d6f4057d62a7a4b686e" + [[package]] name = "siphasher" version = "0.3.11" diff --git a/editoast/Cargo.toml b/editoast/Cargo.toml index 4fc8b4750da..ee3e94260ce 100644 --- a/editoast/Cargo.toml +++ b/editoast/Cargo.toml @@ -213,3 +213,10 @@ debug = "limited" [profile.dev-for-debug] inherits = "dev" debug = "full" + +# Makes snapshot diffs faster to compute. +# insta is used in editast_derive to track changes made to proc-macros. +# Cf. https://insta.rs/docs/quickstart/#optional-faster-runs +[profile.dev.package] +insta.opt-level = 3 +similar.opt-level = 3 diff --git a/editoast/README.md b/editoast/README.md index 48c473f58e8..20e7784c3e8 100644 --- a/editoast/README.md +++ b/editoast/README.md @@ -96,3 +96,20 @@ OpenApi when a change has been made to an endpoint, run the following command: ```sh cargo run openapi > openapi.yaml ``` + +## Working with `editoast_derive` + +We define some custom procedural macros in the `editoast_derive` crate. These rely on snapshot testing library [`insta`](https://insta.rs/). It basically works like this: + +1. Change the output of a macro +2. Run the tests using `cargo test` +3. Since the output has changed, the test will fail, showing a diff of the old vs. new snapshot content. The new snapshot will be saved to disk with the extension `*.snap.new`. +4. If the new snapshot is correct, rename it to `*.snap` and commit it. + +> [!TIP] +> You can use [`cargo-insta`](https://insta.rs/docs/cli/) to review pending snapshots and accept them conveniently. +> ```sh +> $ cargo insta review +> ``` + +For more information, visit the [`insta` documentation](https://insta.rs/docs/). diff --git a/editoast/editoast_derive/Cargo.toml b/editoast/editoast_derive/Cargo.toml index cbda38d9690..0ed1123eaa8 100644 --- a/editoast/editoast_derive/Cargo.toml +++ b/editoast/editoast_derive/Cargo.toml @@ -18,7 +18,9 @@ syn = "2.0" proc-macro = true [dev-dependencies] +insta = "1.41" pretty_assertions = "1.4.1" +prettyplease = "0.2" [lints] workspace = true diff --git a/editoast/editoast_derive/src/error.rs b/editoast/editoast_derive/src/error.rs index 6d1fff0b49e..0e2b29ead97 100644 --- a/editoast/editoast_derive/src/error.rs +++ b/editoast/editoast_derive/src/error.rs @@ -235,3 +235,26 @@ fn extract_type(ty: &syn::Type) -> Option { _ => None, } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_construction() { + crate::assert_macro_expansion!( + expand_editoast_error, + syn::parse_quote! { + #[derive(EditoastError)] + #[editoast_error(base_id = "infra", default_status = 500)] + pub enum InfraApiError { + #[editoast_error(status = 404, no_context)] + NotFound { infra_id: i64 }, + #[editoast_error(status = 400)] + BadRequest { message: String }, + InternalError, + } + } + ); + } +} diff --git a/editoast/editoast_derive/src/lib.rs b/editoast/editoast_derive/src/lib.rs index b03d3ab6da4..bcb42c39053 100644 --- a/editoast/editoast_derive/src/lib.rs +++ b/editoast/editoast_derive/src/lib.rs @@ -237,3 +237,35 @@ pub fn model(input: proc_macro::TokenStream) -> proc_macro::TokenStream { .unwrap_or_else(darling::Error::write_errors) .into() } + +#[cfg(test)] +mod test_utils { + pub(crate) fn pretty_tokens(tokens: &proc_macro2::TokenStream) -> String { + let file = syn::parse_file(tokens.to_string().as_str()).unwrap(); + prettyplease::unparse(&file) + } + + macro_rules! 
+
+For more information, visit the [`insta` documentation](https://insta.rs/docs/).
diff --git a/editoast/editoast_derive/Cargo.toml b/editoast/editoast_derive/Cargo.toml
index cbda38d9690..0ed1123eaa8 100644
--- a/editoast/editoast_derive/Cargo.toml
+++ b/editoast/editoast_derive/Cargo.toml
@@ -18,7 +18,9 @@ syn = "2.0"
 proc-macro = true
 
 [dev-dependencies]
+insta = "1.41"
 pretty_assertions = "1.4.1"
+prettyplease = "0.2"
 
 [lints]
 workspace = true
diff --git a/editoast/editoast_derive/src/error.rs b/editoast/editoast_derive/src/error.rs
index 6d1fff0b49e..0e2b29ead97 100644
--- a/editoast/editoast_derive/src/error.rs
+++ b/editoast/editoast_derive/src/error.rs
@@ -235,3 +235,26 @@ fn extract_type(ty: &syn::Type) -> Option<syn::Type> {
         _ => None,
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_construction() {
+        crate::assert_macro_expansion!(
+            expand_editoast_error,
+            syn::parse_quote! {
+                #[derive(EditoastError)]
+                #[editoast_error(base_id = "infra", default_status = 500)]
+                pub enum InfraApiError {
+                    #[editoast_error(status = 404, no_context)]
+                    NotFound { infra_id: i64 },
+                    #[editoast_error(status = 400)]
+                    BadRequest { message: String },
+                    InternalError,
+                }
+            }
+        );
+    }
+}
diff --git a/editoast/editoast_derive/src/lib.rs b/editoast/editoast_derive/src/lib.rs
index b03d3ab6da4..bcb42c39053 100644
--- a/editoast/editoast_derive/src/lib.rs
+++ b/editoast/editoast_derive/src/lib.rs
@@ -237,3 +237,35 @@ pub fn model(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
         .unwrap_or_else(darling::Error::write_errors)
         .into()
 }
+
+#[cfg(test)]
+mod test_utils {
+    pub(crate) fn pretty_tokens(tokens: &proc_macro2::TokenStream) -> String {
+        let file = syn::parse_file(tokens.to_string().as_str()).unwrap();
+        prettyplease::unparse(&file)
+    }
+
+    macro_rules! assert_macro_expansion {
+        ($expansion:path, $derive_input:expr) => {
+            let input: syn::DeriveInput = $derive_input;
+            let source = crate::test_utils::pretty_tokens(
+                &<syn::DeriveInput as quote::ToTokens>::to_token_stream(&input),
+            );
+            let expansion = $expansion(&input).expect("macro should expand faultlessly");
+            let expected = crate::test_utils::pretty_tokens(&expansion);
+
+            // HACK: sadly insta doesn't let us print multiline strings in the snapshot description
+            // or info sections. So we have to incorporate the source input into the snapshot content
+            // in order to keep it pretty printed and next to its expansion.
+            insta::with_settings!({
+                omit_expression => true,
+            }, {
+                let sep = "-".repeat(77);
+                insta::assert_snapshot!(
+                    format!("// Source\n// {sep}\n\n{source}\n// Macro expansion\n// {sep}\n\n{expected}")
+                );
+            });
+        };
+    }
+
+    pub(crate) use assert_macro_expansion;
+}
+
+#[cfg(test)]
+use test_utils::assert_macro_expansion;
diff --git a/editoast/editoast_derive/src/model.rs b/editoast/editoast_derive/src/model.rs
index e21f2facf9a..3c579427d99 100644
--- a/editoast/editoast_derive/src/model.rs
+++ b/editoast/editoast_derive/src/model.rs
@@ -87,21 +87,27 @@ pub fn model(input: &DeriveInput) -> Result<TokenStream> {
 }
 
 #[cfg(test)]
-#[test]
-fn test_construction() {
-    let input = syn::parse_quote! {
-        #[derive(Clone, Model)]
-        #[model(table = editoast_models::tables::osrd_infra_document)]
-        #[model(row(type_name = "DocumentRow", derive(Debug)))]
-        #[model(changeset(type_name = "DocumentChangeset", public, derive(Debug)))] // fields are public
-        #[model(gen(ops = crud, batch_ops = crud, list))]
-        struct Document {
-            #[model(column = "id", preferred, primary)]
-            id_: i64,
-            #[model(identifier, json)]
-            content_type: String,
-            data: Vec<u8>,
-        }
-    };
-    let _ = model(&input).expect("should generate");
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_construction() {
+        crate::assert_macro_expansion!(
+            model,
+            syn::parse_quote! {
+                #[derive(Clone, Model)]
+                #[model(table = editoast_models::tables::osrd_infra_document)]
+                #[model(row(type_name = "DocumentRow", derive(Debug)))]
+                #[model(changeset(type_name = "DocumentChangeset", public, derive(Debug)))] // fields are public
+                #[model(gen(ops = crud, batch_ops = crud, list))]
+                struct Document {
+                    #[model(column = "id", preferred, primary)]
+                    id_: i64,
+                    #[model(identifier, json)]
+                    content_type: String,
+                    data: Vec<u8>,
+                }
+            }
+        );
+    }
+}
diff --git a/editoast/editoast_derive/src/model/config.rs b/editoast/editoast_derive/src/model/config.rs
index 1cfa0f52719..a62f3ed311c 100644
--- a/editoast/editoast_derive/src/model/config.rs
+++ b/editoast/editoast_derive/src/model/config.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::HashSet,
+    collections::BTreeSet,
     ops::{Deref, DerefMut},
 };
@@ -19,9 +19,9 @@ pub(crate) struct ModelConfig {
     pub(crate) fields: Fields,
     pub(crate) row: GeneratedTypeArgs,
     pub(crate) changeset: GeneratedTypeArgs,
-    pub(crate) identifiers: HashSet<Identifier>,  // identifiers ⊆ fields
-    pub(crate) preferred_identifier: Identifier,  // preferred_identifier ∈ identifiers
-    pub(crate) primary_identifier: Identifier,    // primary_identifier ∈ identifiers
+    pub(crate) identifiers: BTreeSet<Identifier>, // identifiers ⊆ fields
+    pub(crate) preferred_identifier: Identifier,  // preferred_identifier ∈ identifiers
+    pub(crate) primary_identifier: Identifier,    // primary_identifier ∈ identifiers
     pub(crate) impl_plan: ImplPlan,
 }
diff --git a/editoast/editoast_derive/src/model/identifier.rs b/editoast/editoast_derive/src/model/identifier.rs
index ea145f84ffe..49334c2a69b 100644
--- a/editoast/editoast_derive/src/model/identifier.rs
+++ b/editoast/editoast_derive/src/model/identifier.rs
@@ -85,6 +85,30 @@ impl darling::FromMeta for RawIdentifier {
     }
 }
 
+impl PartialOrd for RawIdentifier {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for RawIdentifier {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_idents().cmp(&other.get_idents())
+    }
+}
+
+impl PartialOrd for Identifier {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.raw.cmp(&other.raw))
+    }
+}
+
+impl Ord for Identifier {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.raw.cmp(&other.raw)
+    }
+}
+
 fn extract_ident_of_path(path: &syn::Path) -> darling::Result<syn::Ident> {
     let mut segments = path.segments.iter();
     let first = segments.next().unwrap();
diff --git a/editoast/editoast_derive/src/search.rs b/editoast/editoast_derive/src/search.rs
index bc3d8a89b58..8f11d727874 100644
--- a/editoast/editoast_derive/src/search.rs
+++ b/editoast/editoast_derive/src/search.rs
@@ -363,77 +363,48 @@ pub fn expand_store(input: &DeriveInput) -> Result<TokenStream> {
 
 #[cfg(test)]
 mod tests {
-    use darling::FromDeriveInput;
-    use pretty_assertions::assert_eq;
+    use super::*;
 
-    use crate::search::SearchParams;
-
-    use super::SearchField;
-
-    fn track() -> syn::DeriveInput {
-        syn::parse_quote! {
+    #[test]
+    fn test_search_construction() {
+        crate::assert_macro_expansion!(
+            expand_search,
+            syn::parse_quote! {
+                #[search(
+                    table = "search_track",
+                    column(name = "infra_id", data_type = "INT"),
+                    column(name = "line_code", data_type = "INT"),
+                    column(name = "line_name", data_type = "TEXT")
+                )]
+                pub struct Track {
+                    #[search(sql = "search_track.infra_id")]
+                    infra_id: i64,
+                    #[search(sql = "search_track.unprocessed_line_name")]
+                    line_name: std::string::String,
+                    #[search(sql = "search_track.line_code", rename = "code")]
+                    line_code: i64,
+                }
+            }
-        }
-    }
-
-    impl SearchField {
-        fn to_tuple(&self) -> (String, String, String) {
-            (
-                self.ident_string().unwrap(),
-                self.type_string(),
-                self.sql.to_owned(),
-            )
-        }
+        );
     }
 
     #[test]
-    fn test_construction() {
-        let params = SearchParams::from_derive_input(&track()).unwrap();
-        assert_eq!(&params.table, "search_track");
-        assert!(&params.joins.is_empty());
-        assert_eq!(
-            params.columns.iter().map(|c| &c.name).collect::<Vec<_>>(),
-            vec!["infra_id", "line_code", "line_name"]
-        );
-        assert_eq!(
-            params
-                .columns
-                .iter()
-                .map(|c| &c.data_type)
-                .collect::<Vec<_>>(),
-            vec!["INT", "INT", "TEXT"]
-        );
-        let tuples = params
-            .data
-            .take_struct()
-            .unwrap()
-            .fields
-            .into_iter()
-            .map(|p| p.to_tuple())
-            .collect::<Vec<_>>();
-        assert_eq!(
-            tuples
-                .iter()
-                .map(|t| (t.0.as_ref(), t.1.as_ref(), t.2.as_ref()))
-                .collect::<Vec<_>>(),
-            vec![
-                ("infra_id", "i64", "search_track.infra_id"),
-                ("line_name", "String", "search_track.unprocessed_line_name"),
-                ("code", "i64", "search_track.line_code")
-            ]
+    fn test_store_construction() {
+        crate::assert_macro_expansion!(
+            expand_store,
+            syn::parse_quote! {
+                #[derive(SearchConfigStore)]
+                #[search_config_store(
+                    object(name = "track", config = SearchResultItemTrack),
+                    object(name = "operationalpoint", config = SearchResultItemOperationalPoint),
+                    object(name = "signal", config = SearchResultItemSignal),
+                    object(name = "project", config = SearchResultItemProject),
+                    object(name = "study", config = SearchResultItemStudy),
+                    object(name = "scenario", config = SearchResultItemScenario),
+                    object(name = "trainschedule", config = SearchResultItemTrainSchedule),
+                )]
+                pub struct SearchStore;
+            }
         );
     }
 }
diff --git a/editoast/editoast_derive/src/snapshots/editoast_derive__error__tests__construction.snap b/editoast/editoast_derive/src/snapshots/editoast_derive__error__tests__construction.snap
new file mode 100644
index 00000000000..0331449cfbc
--- /dev/null
+++ b/editoast/editoast_derive/src/snapshots/editoast_derive__error__tests__construction.snap
@@ -0,0 +1,71 @@
+---
+source: editoast_derive/src/error.rs
+---
+// Source
+// -----------------------------------------------------------------------------
+
+#[derive(EditoastError)]
+#[editoast_error(base_id = "infra", default_status = 500)]
+pub enum InfraApiError {
+    #[editoast_error(status = 404, no_context)]
+    NotFound { infra_id: i64 },
+    #[editoast_error(status = 400)]
+    BadRequest { message: String },
+    InternalError,
+}
+
+// Macro expansion
+// -----------------------------------------------------------------------------
+
+inventory::submit! {
+    crate ::error::ErrorDefinition::new("editoast:infra:NotFound", "NotFound",
+    "InfraApiError", 404u16, "{\"infra_id\":\"i64\"}")
+}
+inventory::submit! {
+    crate ::error::ErrorDefinition::new("editoast:infra:BadRequest", "BadRequest",
+    "InfraApiError", 400u16, "{\"message\":\"String\"}")
+}
+inventory::submit! {
+    crate ::error::ErrorDefinition::new("editoast:infra:InternalError", "InternalError",
+    "InfraApiError", 500u16, "{}")
+}
+impl crate::error::EditoastError for InfraApiError {
+    fn get_status(&self) -> axum::http::StatusCode {
+        match self {
+            #[allow(unused)]
+            Self::NotFound { infra_id } => {
+                axum::http::StatusCode::try_from(404)
+                    .expect("EditoastError: invalid status expression")
+            }
+            #[allow(unused)]
+            Self::BadRequest { message } => {
+                axum::http::StatusCode::try_from(400)
+                    .expect("EditoastError: invalid status expression")
+            }
+            #[allow(unused)]
+            Self::InternalError => axum::http::StatusCode::from_u16(500u16).unwrap(),
+        }
+    }
+    fn get_type(&self) -> &'static str {
+        match self {
+            Self::NotFound { .. } => "editoast:infra:NotFound",
+            Self::BadRequest { .. } => "editoast:infra:BadRequest",
+            Self::InternalError { .. } => "editoast:infra:InternalError",
+        }
+    }
+    fn context(&self) -> std::collections::HashMap<String, serde_json::Value> {
+        match self {
+            Self::NotFound { .. } => Default::default(),
+            Self::BadRequest { message } => {
+                [
+                    (
+                        stringify!(message).to_string(),
+                        serde_json::to_value(message).unwrap(),
+                    ),
+                ]
+                    .into()
+            }
+            Self::InternalError { .. } => Default::default(),
+        }
+    }
+}
diff --git a/editoast/editoast_derive/src/snapshots/editoast_derive__model__tests__construction.snap b/editoast/editoast_derive/src/snapshots/editoast_derive__model__tests__construction.snap
new file mode 100644
index 00000000000..abd2b44276d
--- /dev/null
+++ b/editoast/editoast_derive/src/snapshots/editoast_derive__model__tests__construction.snap
@@ -0,0 +1,1029 @@
+---
+source: editoast_derive/src/model.rs
+---
+// Source
+// -----------------------------------------------------------------------------
+
+#[derive(Clone, Model)]
+#[model(table = editoast_models::tables::osrd_infra_document)]
+#[model(row(type_name = "DocumentRow", derive(Debug)))]
+#[model(changeset(type_name = "DocumentChangeset", public, derive(Debug)))]
+#[model(gen(ops = crud, batch_ops = crud, list))]
+struct Document {
+    #[model(column = "id", preferred, primary)]
+    id_: i64,
+    #[model(identifier, json)]
+    content_type: String,
+    data: Vec<u8>,
+}
+
+// Macro expansion
+// -----------------------------------------------------------------------------
+
+#[automatically_derived]
+impl crate::models::Model for Document {
+    type Row = DocumentRow;
+    type Changeset = DocumentChangeset;
+    type Table = editoast_models::tables::osrd_infra_document::table;
+}
+paste::paste!
{ + impl Document { pub const [< id_ : snake : upper >] : crate ::models::ModelField < + Document, i64, id > = crate ::models::ModelField::new(); pub const [< content_type : + snake : upper >] : crate ::models::ModelField < Document, String, + editoast_models::tables::osrd_infra_document::content_type > = crate + ::models::ModelField::new(); pub const [< data : snake : upper >] : crate + ::models::ModelField < Document, Vec < u8 >, + editoast_models::tables::osrd_infra_document::data > = crate + ::models::ModelField::new(); } +} +impl crate::models::prelude::ModelField { + pub fn eq(&self, value: i64) -> crate::models::prelude::FilterSetting { + use diesel::ExpressionMethods; + crate::models::prelude::FilterSetting::new(id.eq(value)) + } + pub fn eq_any( + &self, + values: Vec, + ) -> crate::models::prelude::FilterSetting { + use diesel::ExpressionMethods; + crate::models::prelude::FilterSetting::new(id.eq_any(values)) + } + pub fn asc(&self) -> crate::models::prelude::SortSetting { + use diesel::ExpressionMethods; + crate::models::prelude::SortSetting(Box::new(id.asc())) + } + pub fn desc(&self) -> crate::models::prelude::SortSetting { + use diesel::ExpressionMethods; + crate::models::prelude::SortSetting(Box::new(id.desc())) + } +} +impl crate::models::prelude::ModelField< + Document, + String, + editoast_models::tables::osrd_infra_document::content_type, +> { + pub fn eq(&self, value: String) -> crate::models::prelude::FilterSetting { + use diesel::ExpressionMethods; + let value: diesel_json::Json = diesel_json::Json(value); + crate::models::prelude::FilterSetting::new( + editoast_models::tables::osrd_infra_document::content_type.eq(value), + ) + } + pub fn eq_any( + &self, + values: Vec, + ) -> crate::models::prelude::FilterSetting { + use diesel::ExpressionMethods; + let values: Vec> = values + .into_iter() + .map(|value| diesel_json::Json(value)) + .collect(); + crate::models::prelude::FilterSetting::new( + editoast_models::tables::osrd_infra_document::content_type.eq_any(values), + ) + } + pub fn asc(&self) -> crate::models::prelude::SortSetting { + use diesel::ExpressionMethods; + crate::models::prelude::SortSetting( + Box::new(editoast_models::tables::osrd_infra_document::content_type.asc()), + ) + } + pub fn desc(&self) -> crate::models::prelude::SortSetting { + use diesel::ExpressionMethods; + crate::models::prelude::SortSetting( + Box::new(editoast_models::tables::osrd_infra_document::content_type.desc()), + ) + } +} +impl crate::models::prelude::ModelField< + Document, + Vec, + editoast_models::tables::osrd_infra_document::data, +> { + pub fn eq(&self, value: Vec) -> crate::models::prelude::FilterSetting { + use diesel::ExpressionMethods; + crate::models::prelude::FilterSetting::new( + editoast_models::tables::osrd_infra_document::data.eq(value), + ) + } + pub fn eq_any( + &self, + values: Vec>, + ) -> crate::models::prelude::FilterSetting { + use diesel::ExpressionMethods; + crate::models::prelude::FilterSetting::new( + editoast_models::tables::osrd_infra_document::data.eq_any(values), + ) + } + pub fn asc(&self) -> crate::models::prelude::SortSetting { + use diesel::ExpressionMethods; + crate::models::prelude::SortSetting( + Box::new(editoast_models::tables::osrd_infra_document::data.asc()), + ) + } + pub fn desc(&self) -> crate::models::prelude::SortSetting { + use diesel::ExpressionMethods; + crate::models::prelude::SortSetting( + Box::new(editoast_models::tables::osrd_infra_document::data.desc()), + ) + } +} +#[derive(Queryable, Debug)] +#[diesel(table_name = 
editoast_models::tables::osrd_infra_document)] +struct DocumentRow { + #[diesel(column_name = "id")] + id_: i64, + #[diesel(column_name = "content_type")] + content_type: diesel_json::Json, + #[diesel(column_name = "data")] + data: Vec, +} +#[derive(Debug, Default, Queryable, AsChangeset, Insertable, Debug)] +#[diesel(table_name = editoast_models::tables::osrd_infra_document)] +struct DocumentChangeset { + #[diesel(deserialize_as = diesel_json::Json, column_name = "content_type")] + pub content_type: Option>, + #[diesel(deserialize_as = Vec, column_name = "data")] + pub data: Option>, +} +#[automatically_derived] +impl crate::models::Identifiable<(String)> for Document { + fn get_id(&self) -> (String) { + (self.content_type.clone()) + } +} +#[automatically_derived] +impl crate::models::Identifiable<(i64)> for Document { + fn get_id(&self) -> (i64) { + (self.id_.clone()) + } +} +#[automatically_derived] +impl crate::models::PreferredId<(i64)> for Document {} +#[automatically_derived] +impl From for Document { + fn from(row: DocumentRow) -> Self { + Self { + id_: row.id_, + content_type: row.content_type.0, + data: row.data, + } + } +} +#[automatically_derived] +impl From for DocumentChangeset { + fn from(model: Document) -> Self { + Self { + content_type: Some(diesel_json::Json(model.content_type)), + data: Some(model.data), + } + } +} +#[automatically_derived] +impl DocumentChangeset { + #[allow(unused)] + #[must_use = "builder methods are intended to be chained"] + pub fn content_type(mut self, content_type: String) -> Self { + self.content_type = Some(diesel_json::Json(content_type)); + self + } + #[allow(unused)] + #[must_use = "builder methods are intended to be chained"] + pub fn data(mut self, data: Vec) -> Self { + self.data = Some(data); + self + } + #[allow(unused)] + #[must_use = "builder methods are intended to be chained"] + pub fn flat_content_type(mut self, content_type: Option) -> Self { + self.content_type = content_type + .map(|content_type| diesel_json::Json(content_type)); + self + } + #[allow(unused)] + #[must_use = "builder methods are intended to be chained"] + pub fn flat_data(mut self, data: Option>) -> Self { + self.data = data.map(|data| data); + self + } +} +#[automatically_derived] +impl<'a> crate::models::Patch<'a, Document> { + #[allow(unused)] + #[must_use = "builder methods are intended to be chained"] + pub fn content_type(mut self, content_type: String) -> Self { + self.changeset.content_type = Some(diesel_json::Json(content_type)); + self + } + #[allow(unused)] + #[must_use = "builder methods are intended to be chained"] + pub fn data(mut self, data: Vec) -> Self { + self.changeset.data = Some(data); + self + } + #[allow(unused)] + #[must_use = "builder methods are intended to be chained"] + pub fn flat_content_type(mut self, content_type: Option) -> Self { + self.changeset.content_type = content_type + .map(|content_type| diesel_json::Json(content_type)); + self + } + #[allow(unused)] + #[must_use = "builder methods are intended to be chained"] + pub fn flat_data(mut self, data: Option>) -> Self { + self.changeset.data = data.map(|data| data); + self + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::Exists<(String)> for Document { + #[tracing::instrument( + name = "model:exists", + skip_all, + ret, + err, + fields(query_id) + )] + async fn exists( + conn: &mut editoast_models::DbConnection, + content_type: (String), + ) -> crate::error::Result { + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use 
std::ops::DerefMut; + use editoast_models::tables::osrd_infra_document::dsl; + tracing::Span::current() + .record("query_id", tracing::field::debug(&content_type)); + diesel::select( + diesel::dsl::exists( + dsl::osrd_infra_document.filter(dsl::content_type.eq(content_type)), + ), + ) + .get_result(conn.write().await.deref_mut()) + .await + .map_err(Into::into) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::Exists<(i64)> for Document { + #[tracing::instrument( + name = "model:exists", + skip_all, + ret, + err, + fields(query_id) + )] + async fn exists( + conn: &mut editoast_models::DbConnection, + id_: (i64), + ) -> crate::error::Result { + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use std::ops::DerefMut; + use editoast_models::tables::osrd_infra_document::dsl; + tracing::Span::current().record("query_id", tracing::field::debug(&id_)); + diesel::select( + diesel::dsl::exists(dsl::osrd_infra_document.filter(dsl::id.eq(id_))), + ) + .get_result(conn.write().await.deref_mut()) + .await + .map_err(Into::into) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::Retrieve<(String)> for Document { + #[tracing::instrument( + name = "model:retrieve", + skip_all, + err, + fields(query_id) + )] + async fn retrieve( + conn: &mut editoast_models::DbConnection, + content_type: (String), + ) -> crate::error::Result> { + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use editoast_models::tables::osrd_infra_document::dsl; + use std::ops::DerefMut; + tracing::Span::current() + .record("query_id", tracing::field::debug(&content_type)); + dsl::osrd_infra_document + .filter(dsl::content_type.eq(content_type)) + .select((dsl::id, dsl::content_type, dsl::data)) + .first::(conn.write().await.deref_mut()) + .await + .map(Into::into) + .optional() + .map_err(Into::into) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::Retrieve<(i64)> for Document { + #[tracing::instrument( + name = "model:retrieve", + skip_all, + err, + fields(query_id) + )] + async fn retrieve( + conn: &mut editoast_models::DbConnection, + id_: (i64), + ) -> crate::error::Result> { + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use editoast_models::tables::osrd_infra_document::dsl; + use std::ops::DerefMut; + tracing::Span::current().record("query_id", tracing::field::debug(&id_)); + dsl::osrd_infra_document + .filter(dsl::id.eq(id_)) + .select((dsl::id, dsl::content_type, dsl::data)) + .first::(conn.write().await.deref_mut()) + .await + .map(Into::into) + .optional() + .map_err(Into::into) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::Update<(String), Document> for DocumentChangeset { + #[tracing::instrument( + name = "model:update", + skip_all, + err, + fields(query_id) + )] + async fn update( + self, + conn: &mut editoast_models::DbConnection, + content_type: (String), + ) -> crate::error::Result> { + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use std::ops::DerefMut; + use editoast_models::tables::osrd_infra_document::dsl; + tracing::Span::current() + .record("query_id", tracing::field::debug(&content_type)); + diesel::update( + dsl::osrd_infra_document.filter(dsl::content_type.eq(content_type)), + ) + .set(&self) + .returning((dsl::id, dsl::content_type, dsl::data)) + .get_result::(conn.write().await.deref_mut()) + .await + .map(Into::into) + .optional() + .map_err(Into::into) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl 
crate::models::Update<(i64), Document> for DocumentChangeset { + #[tracing::instrument( + name = "model:update", + skip_all, + err, + fields(query_id) + )] + async fn update( + self, + conn: &mut editoast_models::DbConnection, + id_: (i64), + ) -> crate::error::Result> { + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use std::ops::DerefMut; + use editoast_models::tables::osrd_infra_document::dsl; + tracing::Span::current().record("query_id", tracing::field::debug(&id_)); + diesel::update(dsl::osrd_infra_document.filter(dsl::id.eq(id_))) + .set(&self) + .returning((dsl::id, dsl::content_type, dsl::data)) + .get_result::(conn.write().await.deref_mut()) + .await + .map(Into::into) + .optional() + .map_err(Into::into) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::DeleteStatic<(String)> for Document { + #[tracing::instrument( + name = "model:delete_static", + skip_all, + ret, + err, + fields(query_id) + )] + async fn delete_static( + conn: &mut editoast_models::DbConnection, + content_type: (String), + ) -> crate::error::Result { + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use std::ops::DerefMut; + use editoast_models::tables::osrd_infra_document::dsl; + tracing::Span::current() + .record("query_id", tracing::field::debug(&content_type)); + diesel::delete( + dsl::osrd_infra_document.filter(dsl::content_type.eq(content_type)), + ) + .execute(conn.write().await.deref_mut()) + .await + .map(|n| n == 1) + .map_err(Into::into) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::DeleteStatic<(i64)> for Document { + #[tracing::instrument( + name = "model:delete_static", + skip_all, + ret, + err, + fields(query_id) + )] + async fn delete_static( + conn: &mut editoast_models::DbConnection, + id_: (i64), + ) -> crate::error::Result { + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use std::ops::DerefMut; + use editoast_models::tables::osrd_infra_document::dsl; + tracing::Span::current().record("query_id", tracing::field::debug(&id_)); + diesel::delete(dsl::osrd_infra_document.filter(dsl::id.eq(id_))) + .execute(conn.write().await.deref_mut()) + .await + .map(|n| n == 1) + .map_err(Into::into) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::Create for DocumentChangeset { + #[tracing::instrument(name = "model:create", skip_all, err)] + async fn create( + self, + conn: &mut editoast_models::DbConnection, + ) -> crate::error::Result { + use diesel_async::RunQueryDsl; + use editoast_models::tables::osrd_infra_document::dsl; + use std::ops::DerefMut; + diesel::insert_into(editoast_models::tables::osrd_infra_document::table) + .values(&self) + .returning((dsl::id, dsl::content_type, dsl::data)) + .get_result::(conn.write().await.deref_mut()) + .await + .map(Into::into) + .map_err(Into::into) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::Delete for Document { + #[tracing::instrument( + name = "model:delete", + skip_all, + ret, + err, + fields(query_id = ?self.id_) + )] + async fn delete( + &self, + conn: &mut editoast_models::DbConnection, + ) -> crate::error::Result { + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use editoast_models::tables::osrd_infra_document::dsl; + use std::ops::DerefMut; + let id = self.id_; + diesel::delete(editoast_models::tables::osrd_infra_document::table.find(id)) + .execute(conn.write().await.deref_mut()) + .await + .map(|n| n == 1) + .map_err(Into::into) + } +} +#[automatically_derived] 
+#[async_trait::async_trait] +impl crate::models::prelude::List for Document { + #[tracing::instrument( + name = "model:list", + skip_all, + err, + fields( + nb_filters = settings.filters.len(), + nb_sorts = settings.sorts.len(), + paginate_counting = settings.paginate_counting, + limit, + offset, + ) + )] + async fn list( + conn: &'async_trait mut editoast_models::DbConnection, + settings: crate::models::prelude::SelectionSettings, + ) -> crate::error::Result> { + use diesel::QueryDsl; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + use editoast_models::tables::osrd_infra_document::dsl; + use std::ops::DerefMut; + let mut query = editoast_models::tables::osrd_infra_document::table.into_boxed(); + for filter_fun in settings.filters { + let crate::models::prelude::FilterSetting(filter) = (*filter_fun)(); + query = query.filter(filter); + } + for sort_fun in settings.sorts { + let crate::models::prelude::SortSetting(sort) = (*sort_fun)(); + query = query.order_by(sort); + } + if let Some(limit) = settings.limit { + tracing::Span::current().record("limit", limit); + query = query.limit(limit); + } + if let Some(offset) = settings.offset { + tracing::Span::current().record("offset", offset); + query = query.offset(offset); + } + let results: Vec = query + .select((dsl::id, dsl::content_type, dsl::data)) + .load_stream::(conn.write().await.deref_mut()) + .await? + .map_ok(::from_row) + .try_collect() + .await?; + Ok(results) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::prelude::Count for Document { + #[tracing::instrument( + name = "model:count", + skip_all, + ret, + err, + fields( + nb_filters = settings.filters.len(), + paginate_counting = settings.paginate_counting, + limit, + offset, + ) + )] + async fn count( + conn: &'async_trait mut editoast_models::DbConnection, + settings: crate::models::prelude::SelectionSettings, + ) -> crate::error::Result { + use diesel::QueryDsl; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + use std::ops::DerefMut; + let mut query = editoast_models::tables::osrd_infra_document::table + .select(diesel::dsl::count_star()) + .into_boxed(); + for filter_fun in settings.filters { + let crate::models::prelude::FilterSetting(filter) = (*filter_fun)(); + query = query.filter(filter); + } + if settings.paginate_counting { + if let Some(limit) = settings.limit { + tracing::Span::current().record("limit", limit); + query = query.limit(limit); + } + if let Some(offset) = settings.offset { + tracing::Span::current().record("offset", offset); + query = query.offset(offset); + } + } + Ok(query.get_result::(conn.write().await.deref_mut()).await? as u64) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::CreateBatch for Document { + #[tracing::instrument(name = "model:create_batch", skip_all, err)] + async fn create_batch< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend + Send + std::fmt::Debug, + >(conn: &mut editoast_models::DbConnection, values: I) -> crate::error::Result { + use crate::models::Model; + use editoast_models::tables::osrd_infra_document::dsl; + use std::ops::DerefMut; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + let values = values.into_iter().collect::>(); + Ok( + crate::chunked_for_libpq! 
{ + 2usize, 2048usize, values, C::default(), chunk => { + diesel::insert_into(dsl::osrd_infra_document).values(chunk) + .returning((dsl::id, dsl::content_type, dsl::data,)).load_stream:: < + DocumentRow > (conn.write(). await .deref_mut()). await .map(| s | s + .map_ok(< Document as Model > ::from_row).try_collect:: < Vec < _ >> ()) + ? . await ? } + }, + ) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::CreateBatchWithKey for Document { + #[tracing::instrument(name = "model:create_batch_with_key", skip_all, err)] + async fn create_batch_with_key< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend<((String), Self)> + Send + std::fmt::Debug, + >(conn: &mut editoast_models::DbConnection, values: I) -> crate::error::Result { + use crate::models::Identifiable; + use crate::models::Model; + use std::ops::DerefMut; + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + let values = values.into_iter().collect::>(); + Ok( + crate::chunked_for_libpq! { + 2usize, 2048usize, values, C::default(), chunk => { + diesel::insert_into(dsl::osrd_infra_document).values(chunk) + .returning((dsl::id, dsl::content_type, dsl::data,)).load_stream:: < + DocumentRow > (conn.write(). await .deref_mut()). await .map(| s | { s + .map_ok(| row | { let model = < Document as Model > ::from_row(row); + (model.get_id(), model) }).try_collect:: < Vec < _ >> () }) ? . await ? } + }, + ) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::CreateBatchWithKey for Document { + #[tracing::instrument(name = "model:create_batch_with_key", skip_all, err)] + async fn create_batch_with_key< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend<((i64), Self)> + Send + std::fmt::Debug, + >(conn: &mut editoast_models::DbConnection, values: I) -> crate::error::Result { + use crate::models::Identifiable; + use crate::models::Model; + use std::ops::DerefMut; + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + let values = values.into_iter().collect::>(); + Ok( + crate::chunked_for_libpq! { + 2usize, 2048usize, values, C::default(), chunk => { + diesel::insert_into(dsl::osrd_infra_document).values(chunk) + .returning((dsl::id, dsl::content_type, dsl::data,)).load_stream:: < + DocumentRow > (conn.write(). await .deref_mut()). await .map(| s | { s + .map_ok(| row | { let model = < Document as Model > ::from_row(row); + (model.get_id(), model) }).try_collect:: < Vec < _ >> () }) ? . await ? 
} + }, + ) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::RetrieveBatchUnchecked<(String)> for Document { + #[tracing::instrument( + name = "model:retrieve_batch_unchecked", + skip_all, + err, + fields(query_id) + )] + async fn retrieve_batch_unchecked< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend + Send + std::fmt::Debug, + >(conn: &mut editoast_models::DbConnection, ids: I) -> crate::error::Result { + use crate::models::Model; + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + use std::ops::DerefMut; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + Ok( + crate::chunked_for_libpq! { + 1usize, 2048usize, ids, C::default(), chunk => { let mut query = + dsl::osrd_infra_document.into_boxed(); for content_type in chunk + .into_iter() { query = query.or_filter(dsl::content_type + .eq(content_type)); } query.select((dsl::id, dsl::content_type, + dsl::data,)).load_stream:: < DocumentRow > (conn.write(). await + .deref_mut()). await .map(| s | s.map_ok(< Document as Model > + ::from_row).try_collect:: < Vec < _ >> ()) ? . await ? } + }, + ) + } + #[tracing::instrument( + name = "model:retrieve_batch_with_key_unchecked", + skip_all, + err, + fields(query_id) + )] + async fn retrieve_batch_with_key_unchecked< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend<((String), Document)> + Send + std::fmt::Debug, + >(conn: &mut editoast_models::DbConnection, ids: I) -> crate::error::Result { + use crate::models::Identifiable; + use crate::models::Model; + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + use std::ops::DerefMut; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + Ok( + crate::chunked_for_libpq! { + 1usize, 2048usize, ids, C::default(), chunk => { let mut query = + dsl::osrd_infra_document.into_boxed(); for content_type in chunk + .into_iter() { query = query.or_filter(dsl::content_type + .eq(content_type)); } query.select((dsl::id, dsl::content_type, + dsl::data,)).load_stream:: < DocumentRow > (conn.write(). await + .deref_mut()). await .map(| s | { s.map_ok(| row | { let model = < + Document as Model > ::from_row(row); (model.get_id(), model) }) + .try_collect:: < Vec < _ >> () }) ? . await ? } + }, + ) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::RetrieveBatchUnchecked<(i64)> for Document { + #[tracing::instrument( + name = "model:retrieve_batch_unchecked", + skip_all, + err, + fields(query_id) + )] + async fn retrieve_batch_unchecked< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend + Send + std::fmt::Debug, + >(conn: &mut editoast_models::DbConnection, ids: I) -> crate::error::Result { + use crate::models::Model; + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + use std::ops::DerefMut; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + Ok( + crate::chunked_for_libpq! 
{ + 1usize, 2048usize, ids, C::default(), chunk => { let mut query = + dsl::osrd_infra_document.into_boxed(); for id_ in chunk.into_iter() { + query = query.or_filter(dsl::id.eq(id_)); } query.select((dsl::id, + dsl::content_type, dsl::data,)).load_stream:: < DocumentRow > (conn + .write(). await .deref_mut()). await .map(| s | s.map_ok(< Document as + Model > ::from_row).try_collect:: < Vec < _ >> ()) ? . await ? } + }, + ) + } + #[tracing::instrument( + name = "model:retrieve_batch_with_key_unchecked", + skip_all, + err, + fields(query_id) + )] + async fn retrieve_batch_with_key_unchecked< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend<((i64), Document)> + Send + std::fmt::Debug, + >(conn: &mut editoast_models::DbConnection, ids: I) -> crate::error::Result { + use crate::models::Identifiable; + use crate::models::Model; + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + use std::ops::DerefMut; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + Ok( + crate::chunked_for_libpq! { + 1usize, 2048usize, ids, C::default(), chunk => { let mut query = + dsl::osrd_infra_document.into_boxed(); for id_ in chunk.into_iter() { + query = query.or_filter(dsl::id.eq(id_)); } query.select((dsl::id, + dsl::content_type, dsl::data,)).load_stream:: < DocumentRow > (conn + .write(). await .deref_mut()). await .map(| s | { s.map_ok(| row | { let + model = < Document as Model > ::from_row(row); (model.get_id(), model) }) + .try_collect:: < Vec < _ >> () }) ? . await ? } + }, + ) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::UpdateBatchUnchecked for DocumentChangeset { + #[tracing::instrument( + name = "model:update_batch_unchecked", + skip_all, + err, + fields(query_ids) + )] + async fn update_batch_unchecked< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend + Send + std::fmt::Debug, + >( + self, + conn: &mut editoast_models::DbConnection, + ids: I, + ) -> crate::error::Result { + use crate::models::Model; + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + use std::ops::DerefMut; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + Ok( + crate::chunked_for_libpq! { + 1usize, 2048usize, ids, C::default(), chunk => { let mut query = + dsl::osrd_infra_document.select(dsl::id).into_boxed(); for content_type + in chunk.into_iter() { query = query.or_filter(dsl::content_type + .eq(content_type)); } diesel::update(dsl::osrd_infra_document) + .filter(dsl::id.eq_any(query)).set(& self).returning((dsl::id, + dsl::content_type, dsl::data,)).load_stream:: < DocumentRow > (conn + .write(). await .deref_mut()). await .map(| s | s.map_ok(< Document as + Model > ::from_row).try_collect:: < Vec < _ >> ()) ? . await ? 
} + }, + ) + } + #[tracing::instrument( + name = "model:update_batch_unchecked", + skip_all, + err, + fields(query_ids) + )] + async fn update_batch_with_key_unchecked< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend<((String), Document)> + Send, + >( + self, + conn: &mut editoast_models::DbConnection, + ids: I, + ) -> crate::error::Result { + use crate::models::Identifiable; + use crate::models::Model; + use editoast_models::tables::osrd_infra_document::dsl; + use std::ops::DerefMut; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + Ok( + crate::chunked_for_libpq! { + 1usize, 2048usize, ids, C::default(), chunk => { let mut query = + dsl::osrd_infra_document.select(dsl::id).into_boxed(); for content_type + in chunk.into_iter() { query = query.or_filter(dsl::content_type + .eq(content_type)); } diesel::update(dsl::osrd_infra_document) + .filter(dsl::id.eq_any(query)).set(& self).returning((dsl::id, + dsl::content_type, dsl::data,)).load_stream:: < DocumentRow > (conn + .write(). await .deref_mut()). await .map(| s | { s.map_ok(| row | { let + model = < Document as Model > ::from_row(row); (model.get_id(), model) }) + .try_collect:: < Vec < _ >> () }) ? . await ? } + }, + ) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::UpdateBatchUnchecked for DocumentChangeset { + #[tracing::instrument( + name = "model:update_batch_unchecked", + skip_all, + err, + fields(query_ids) + )] + async fn update_batch_unchecked< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend + Send + std::fmt::Debug, + >( + self, + conn: &mut editoast_models::DbConnection, + ids: I, + ) -> crate::error::Result { + use crate::models::Model; + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + use std::ops::DerefMut; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + Ok( + crate::chunked_for_libpq! { + 1usize, 2048usize, ids, C::default(), chunk => { let mut query = + dsl::osrd_infra_document.select(dsl::id).into_boxed(); for id_ in chunk + .into_iter() { query = query.or_filter(dsl::id.eq(id_)); } + diesel::update(dsl::osrd_infra_document).filter(dsl::id.eq_any(query)) + .set(& self).returning((dsl::id, dsl::content_type, dsl::data,)) + .load_stream:: < DocumentRow > (conn.write(). await .deref_mut()). await + .map(| s | s.map_ok(< Document as Model > ::from_row).try_collect:: < Vec + < _ >> ()) ? . await ? } + }, + ) + } + #[tracing::instrument( + name = "model:update_batch_unchecked", + skip_all, + err, + fields(query_ids) + )] + async fn update_batch_with_key_unchecked< + I: std::iter::IntoIterator + Send + 'async_trait, + C: Default + std::iter::Extend<((i64), Document)> + Send, + >( + self, + conn: &mut editoast_models::DbConnection, + ids: I, + ) -> crate::error::Result { + use crate::models::Identifiable; + use crate::models::Model; + use editoast_models::tables::osrd_infra_document::dsl; + use std::ops::DerefMut; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use futures_util::stream::TryStreamExt; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + Ok( + crate::chunked_for_libpq! 
{ + 1usize, 2048usize, ids, C::default(), chunk => { let mut query = + dsl::osrd_infra_document.select(dsl::id).into_boxed(); for id_ in chunk + .into_iter() { query = query.or_filter(dsl::id.eq(id_)); } + diesel::update(dsl::osrd_infra_document).filter(dsl::id.eq_any(query)) + .set(& self).returning((dsl::id, dsl::content_type, dsl::data,)) + .load_stream:: < DocumentRow > (conn.write(). await .deref_mut()). await + .map(| s | { s.map_ok(| row | { let model = < Document as Model > + ::from_row(row); (model.get_id(), model) }).try_collect:: < Vec < _ >> () + }) ? . await ? } + }, + ) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::DeleteBatch<(String)> for Document { + #[tracing::instrument( + name = "model:delete_batch", + skip_all, + ret, + err, + fields(query_ids) + )] + async fn delete_batch< + I: std::iter::IntoIterator + Send + 'async_trait, + >(conn: &mut editoast_models::DbConnection, ids: I) -> crate::error::Result { + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use std::ops::DerefMut; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + let counts = crate::chunked_for_libpq! { + 1usize, 2048usize, ids, chunk => { let mut query = + diesel::delete(dsl::osrd_infra_document).into_boxed(); for content_type in + chunk.into_iter() { query = query.or_filter(dsl::content_type + .eq(content_type)); } query.execute(conn.write(). await .deref_mut()). await + ? } + }; + Ok(counts.into_iter().sum()) + } +} +#[automatically_derived] +#[async_trait::async_trait] +impl crate::models::DeleteBatch<(i64)> for Document { + #[tracing::instrument( + name = "model:delete_batch", + skip_all, + ret, + err, + fields(query_ids) + )] + async fn delete_batch< + I: std::iter::IntoIterator + Send + 'async_trait, + >(conn: &mut editoast_models::DbConnection, ids: I) -> crate::error::Result { + use editoast_models::tables::osrd_infra_document::dsl; + use diesel::prelude::*; + use diesel_async::RunQueryDsl; + use std::ops::DerefMut; + let ids = ids.into_iter().collect::>(); + tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); + let counts = crate::chunked_for_libpq! { + 1usize, 2048usize, ids, chunk => { let mut query = + diesel::delete(dsl::osrd_infra_document).into_boxed(); for id_ in chunk + .into_iter() { query = query.or_filter(dsl::id.eq(id_)); } query.execute(conn + .write(). await .deref_mut()). await ? 
} + }; + Ok(counts.into_iter().sum()) + } +} diff --git a/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__search_construction.snap b/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__search_construction.snap new file mode 100644 index 00000000000..f357356e51a --- /dev/null +++ b/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__search_construction.snap @@ -0,0 +1,53 @@ +--- +source: editoast_derive/src/search.rs +--- +// Source +// ----------------------------------------------------------------------------- + +#[search( + table = "search_track", + column(name = "infra_id", data_type = "INT"), + column(name = "line_code", data_type = "INT"), + column(name = "line_name", data_type = "TEXT") +)] +pub struct Track { + #[search(sql = "search_track.infra_id")] + infra_id: i64, + #[search(sql = "search_track.unprocessed_line_name")] + line_name: std::string::String, + #[search(sql = "search_track.line_code", rename = "code")] + line_code: i64, +} + +// Macro expansion +// ----------------------------------------------------------------------------- + +impl editoast_search::SearchObject for Track { + fn search_config() -> editoast_search::SearchConfig { + editoast_search::SearchConfig { + table: "search_track".to_owned(), + joins: None, + criterias: vec![ + editoast_search::Criteria { name : "infra_id".to_owned(), data_type : + editoast_search::TypeSpec::Type(editoast_search::AstType::Integer), + migration : None, }, editoast_search::Criteria { name : "line_code" + .to_owned(), data_type : + editoast_search::TypeSpec::Type(editoast_search::AstType::Integer), + migration : None, }, editoast_search::Criteria { name : "line_name" + .to_owned(), data_type : + editoast_search::TypeSpec::Type(editoast_search::AstType::String), + migration : None, } + ], + properties: vec![ + editoast_search::Property { name : "infra_id".to_owned(), sql : + "search_track.infra_id".to_owned(), data_type : None, }, + editoast_search::Property { name : "line_name".to_owned(), sql : + "search_track.unprocessed_line_name".to_owned(), data_type : + Some(editoast_search::TypeSpec::Type(editoast_search::AstType::String)), + }, editoast_search::Property { name : "code".to_owned(), sql : + "search_track.line_code".to_owned(), data_type : None, } + ], + migration: None, + } + } +} diff --git a/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__store_construction.snap b/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__store_construction.snap new file mode 100644 index 00000000000..6f6656b2cea --- /dev/null +++ b/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__store_construction.snap @@ -0,0 +1,97 @@ +--- +source: editoast_derive/src/search.rs +--- +// Source +// ----------------------------------------------------------------------------- + +#[derive(SearchConfigStore)] +#[search_config_store( + object(name = "track", config = SearchResultItemTrack), + object(name = "operationalpoint", config = SearchResultItemOperationalPoint), + object(name = "signal", config = SearchResultItemSignal), + object(name = "project", config = SearchResultItemProject), + object(name = "study", config = SearchResultItemStudy), + object(name = "scenario", config = SearchResultItemScenario), + object(name = "trainschedule", config = SearchResultItemTrainSchedule), +)] +pub struct SearchStore; + +// Macro expansion +// ----------------------------------------------------------------------------- + +impl 
editoast_search::SearchConfigStore for SearchStore { + fn find<S: AsRef<str>>(object_name: S) -> Option<editoast_search::SearchConfig> { + match object_name.as_ref() { + "track" => { + Some( + <SearchResultItemTrack as editoast_search::SearchObject>::search_config(), + ) + } + "operationalpoint" => { + Some( + <SearchResultItemOperationalPoint as editoast_search::SearchObject>::search_config(), + ) + } + "signal" => { + Some( + <SearchResultItemSignal as editoast_search::SearchObject>::search_config(), + ) + } + "project" => { + Some( + <SearchResultItemProject as editoast_search::SearchObject>::search_config(), + ) + } + "study" => { + Some( + <SearchResultItemStudy as editoast_search::SearchObject>::search_config(), + ) + } + "scenario" => { + Some( + <SearchResultItemScenario as editoast_search::SearchObject>::search_config(), + ) + } + "trainschedule" => { + Some( + <SearchResultItemTrainSchedule as editoast_search::SearchObject>::search_config(), + ) + } + _ => None, + } + } + fn all() -> Vec<(&'static str, editoast_search::SearchConfig)> { + vec![ + ("track", < SearchResultItemTrack as editoast_search::SearchObject > + ::search_config()), ("operationalpoint", < SearchResultItemOperationalPoint + as editoast_search::SearchObject > ::search_config()), ("signal", < + SearchResultItemSignal as editoast_search::SearchObject > ::search_config()), + ("project", < SearchResultItemProject as editoast_search::SearchObject > + ::search_config()), ("study", < SearchResultItemStudy as + editoast_search::SearchObject > ::search_config()), ("scenario", < + SearchResultItemScenario as editoast_search::SearchObject > + ::search_config()), ("trainschedule", < SearchResultItemTrainSchedule as + editoast_search::SearchObject > ::search_config()) + ] + } +} +#[derive(Serialize, ToSchema)] +#[serde(untagged)] +#[allow(unused, clippy::enum_variant_names)] +/// A search result item that depends on the query's `object` +pub(super) enum SearchResultItem { + SearchResultItemTrack(SearchResultItemTrack), + SearchResultItemOperationalPoint(SearchResultItemOperationalPoint), + SearchResultItemSignal(SearchResultItemSignal), + SearchResultItemProject(SearchResultItemProject), + SearchResultItemStudy(SearchResultItemStudy), + SearchResultItemScenario(SearchResultItemScenario), + SearchResultItemTrainSchedule(SearchResultItemTrainSchedule), +} +impl SearchResultItem { + editoast_common::schemas! { + SearchResultItemTrack, SearchResultItemOperationalPoint, SearchResultItemSignal, + SearchResultItemProject, SearchResultItemStudy, SearchResultItemScenario, + SearchResultItemTrainSchedule, SearchResultItem, + } +} From 38fa425871a61805495933f69094d442661fb899 Mon Sep 17 00:00:00 2001 From: Leo Valais Date: Wed, 4 Dec 2024 11:20:31 +0100 Subject: [PATCH 2/3] editoast: derive: replace vec! by Vec::from in macro expansions Macros confuse `prettyplease`, and we want snapshots to be as clean as possible.
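To illustrate (a minimal sketch, not part of this series; it only assumes the public `syn` and `prettyplease` APIs): `prettyplease` can format `Vec::from([...])` like any ordinary expression, but it has to emit the interior of a macro invocation such as `vec![...]` as an opaque token stream, which is what produced the hard-to-read snapshots above.

    // Hypothetical demo of the formatting difference.
    fn main() {
        let src = "fn demo() { let a = vec![1, 2, 3]; let b = Vec::from([1, 2, 3]); }";
        let file = syn::parse_file(src).expect("valid Rust source");
        // `b`'s initializer is indented and wrapped like any expression;
        // the tokens inside `vec![...]` are printed back essentially verbatim.
        println!("{}", prettyplease::unparse(&file));
    }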
Signed-off-by: Leo Valais --- editoast/editoast_derive/src/search.rs | 8 +-- ...e__search__tests__search_construction.snap | 62 +++++++++++++------ ...ve__search__tests__store_construction.snap | 42 +++++++++---- 3 files changed, 76 insertions(+), 36 deletions(-) diff --git a/editoast/editoast_derive/src/search.rs b/editoast/editoast_derive/src/search.rs index 8f11d727874..b00b076ac9f 100644 --- a/editoast/editoast_derive/src/search.rs +++ b/editoast/editoast_derive/src/search.rs @@ -287,8 +287,8 @@ pub fn expand_search(input: &DeriveInput) -> Result { editoast_search::SearchConfig { table: #table.to_owned(), joins: #joins, - criterias: vec![#criterias], - properties: vec![#properties], + criterias: Vec::from([#criterias]), + properties: Vec::from([#properties]), migration: #migration, } } @@ -335,12 +335,12 @@ pub fn expand_store(input: &DeriveInput) -> Result { } fn all() -> Vec<(&'static str, editoast_search::SearchConfig)> { - vec![#( + Vec::from([#( ( #object_name, < #ident as editoast_search::SearchObject > :: search_config() ) - ),*] + ),*]) } } diff --git a/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__search_construction.snap b/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__search_construction.snap index f357356e51a..effec0c1c44 100644 --- a/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__search_construction.snap +++ b/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__search_construction.snap @@ -27,26 +27,48 @@ impl editoast_search::SearchObject for Track { editoast_search::SearchConfig { table: "search_track".to_owned(), joins: None, - criterias: vec![ - editoast_search::Criteria { name : "infra_id".to_owned(), data_type : - editoast_search::TypeSpec::Type(editoast_search::AstType::Integer), - migration : None, }, editoast_search::Criteria { name : "line_code" - .to_owned(), data_type : - editoast_search::TypeSpec::Type(editoast_search::AstType::Integer), - migration : None, }, editoast_search::Criteria { name : "line_name" - .to_owned(), data_type : - editoast_search::TypeSpec::Type(editoast_search::AstType::String), - migration : None, } - ], - properties: vec![ - editoast_search::Property { name : "infra_id".to_owned(), sql : - "search_track.infra_id".to_owned(), data_type : None, }, - editoast_search::Property { name : "line_name".to_owned(), sql : - "search_track.unprocessed_line_name".to_owned(), data_type : - Some(editoast_search::TypeSpec::Type(editoast_search::AstType::String)), - }, editoast_search::Property { name : "code".to_owned(), sql : - "search_track.line_code".to_owned(), data_type : None, } - ], + criterias: Vec::from([ + editoast_search::Criteria { + name: "infra_id".to_owned(), + data_type: editoast_search::TypeSpec::Type( + editoast_search::AstType::Integer, + ), + migration: None, + }, + editoast_search::Criteria { + name: "line_code".to_owned(), + data_type: editoast_search::TypeSpec::Type( + editoast_search::AstType::Integer, + ), + migration: None, + }, + editoast_search::Criteria { + name: "line_name".to_owned(), + data_type: editoast_search::TypeSpec::Type( + editoast_search::AstType::String, + ), + migration: None, + }, + ]), + properties: Vec::from([ + editoast_search::Property { + name: "infra_id".to_owned(), + sql: "search_track.infra_id".to_owned(), + data_type: None, + }, + editoast_search::Property { + name: "line_name".to_owned(), + sql: "search_track.unprocessed_line_name".to_owned(), + data_type: Some( + 
editoast_search::TypeSpec::Type(editoast_search::AstType::String), + ), + }, + editoast_search::Property { + name: "code".to_owned(), + sql: "search_track.line_code".to_owned(), + data_type: None, + }, + ]), migration: None, } } diff --git a/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__store_construction.snap b/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__store_construction.snap index 6f6656b2cea..314b01ab445 100644 --- a/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__store_construction.snap +++ b/editoast/editoast_derive/src/snapshots/editoast_derive__search__tests__store_construction.snap @@ -61,18 +61,36 @@ impl editoast_search::SearchConfigStore for SearchStore { } } fn all() -> Vec<(&'static str, editoast_search::SearchConfig)> { - vec![ - ("track", < SearchResultItemTrack as editoast_search::SearchObject > - ::search_config()), ("operationalpoint", < SearchResultItemOperationalPoint - as editoast_search::SearchObject > ::search_config()), ("signal", < - SearchResultItemSignal as editoast_search::SearchObject > ::search_config()), - ("project", < SearchResultItemProject as editoast_search::SearchObject > - ::search_config()), ("study", < SearchResultItemStudy as - editoast_search::SearchObject > ::search_config()), ("scenario", < - SearchResultItemScenario as editoast_search::SearchObject > - ::search_config()), ("trainschedule", < SearchResultItemTrainSchedule as - editoast_search::SearchObject > ::search_config()) - ] + Vec::from([ + ( + "track", + <SearchResultItemTrack as editoast_search::SearchObject>::search_config(), + ), + ( + "operationalpoint", + <SearchResultItemOperationalPoint as editoast_search::SearchObject>::search_config(), + ), + ( + "signal", + <SearchResultItemSignal as editoast_search::SearchObject>::search_config(), + ), + ( + "project", + <SearchResultItemProject as editoast_search::SearchObject>::search_config(), + ), + ( + "study", + <SearchResultItemStudy as editoast_search::SearchObject>::search_config(), + ), + ( + "scenario", + <SearchResultItemScenario as editoast_search::SearchObject>::search_config(), + ), + ( + "trainschedule", + <SearchResultItemTrainSchedule as editoast_search::SearchObject>::search_config(), + ), + ]) } } #[derive(Serialize, ToSchema)] From 8023bd317a842b55f8c411e8bc2ef8766962b98b Mon Sep 17 00:00:00 2001 From: Leo Valais Date: Wed, 4 Dec 2024 14:25:37 +0100 Subject: [PATCH 3/3] editoast: derive: replace chunked_for_libpq! by quote expansion in Model Not only does this reduce the complexity of batch operation expansion, but it also allows `prettyplease` to format the code chunks in snapshots. Signed-off-by: Leo Valais --- editoast/editoast_derive/src/model/codegen.rs | 83 +++ .../src/model/codegen/create_batch_impl.rs | 37 +- .../codegen/create_batch_with_key_impl.rs | 49 +- .../src/model/codegen/delete_batch_impl.rs | 33 +- .../src/model/codegen/retrieve_batch_impl.rs | 95 ++-- .../src/model/codegen/update_batch_impl.rs | 106 ++-- ...st_derive__model__tests__construction.snap | 487 +++++++++++++----- editoast/src/models/prelude/mod.rs | 68 --- 8 files changed, 608 insertions(+), 350 deletions(-) diff --git a/editoast/editoast_derive/src/model/codegen.rs b/editoast/editoast_derive/src/model/codegen.rs index a55b2d2104c..d8b6d8950ff 100644 --- a/editoast/editoast_derive/src/model/codegen.rs +++ b/editoast/editoast_derive/src/model/codegen.rs @@ -457,3 +457,86 @@ trait TokensIf: Sized { } impl<T: ToTokens> TokensIf for T {} + +/// Generates an expression that splits a query into chunks to accommodate libpq's maximum number of bound parameters +/// +/// This is a hack around a libpq limitation (cf. ). +/// The rows to process are split into chunks for which at most `2^16 - 1` parameters are sent to libpq. +/// Therefore we need to know how many parameters are sent per row. +/// The result collection can be parametrized.
+/// +/// # On concurrency +/// +/// There seems to be a problem with concurrent queries using deadpool, panicking with +/// 'Cannot access shared transaction state'. So the generated code does not run each chunk's query concurrently. +/// While AsyncPgConnection supports pipelining, each query will be sent one after the other. +/// (But hey, it's still better than just making one query per row :p) +#[derive(Clone)] +struct LibpqChunkedIteration { + /// The number of bound values per row + parameters_per_row: usize, + /// The maximum number of rows per chunk (actual chunk size may be smaller, but never bigger) + chunk_size_limit: usize, + /// The identifier of the values to iterate over (must implement `IntoIterator`) + values_ident: syn::Ident, + /// How to collect the results + collector: LibpqChunkedIterationCollector, + /// The identifier of the chunk iteration variable + chunk_iteration_ident: syn::Ident, + /// The body of the chunk iteration + chunk_iteration_body: proc_macro2::TokenStream, +} + +/// Describes how to collect the results of a chunked iteration +#[derive(Clone)] +enum LibpqChunkedIterationCollector { + /// All results are pushed into a Vec (item type has to be inferable) + VecPush, + /// Extends an existing collection. Its initialization expression must be provided. + /// + /// The initialized collection must implement `Extend`. + Extend { collection_init: syn::Expr }, +} + +impl LibpqChunkedIteration { + fn with_iteration_body(&self, body: proc_macro2::TokenStream) -> Self { + Self { + chunk_iteration_body: body, + ..self.clone() + } + } +} + +impl ToTokens for LibpqChunkedIteration { + fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { + let Self { + parameters_per_row, + chunk_size_limit, + values_ident, + chunk_iteration_ident, + chunk_iteration_body, + collector, + } = self; + let (init, extend) = match collector { + LibpqChunkedIterationCollector::VecPush => { + (syn::parse_quote! { Vec::new() }, quote::quote! { push }) + } + LibpqChunkedIterationCollector::Extend { collection_init } => { + (collection_init.clone(), quote::quote! { extend }) + } + }; + tokens.extend(quote::quote! { + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + // We need to divide further because of AsyncPgConnection, maybe it is related to connection pipelining + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / #parameters_per_row; + let mut result = #init; + let chunks = #values_ident.chunks(CHUNK_SIZE.min(#chunk_size_limit)); + for #chunk_iteration_ident in chunks { + let chunk_result = { #chunk_iteration_body }; + result.#extend(chunk_result); + } + result + }); + } +} diff --git a/editoast/editoast_derive/src/model/codegen/create_batch_impl.rs b/editoast/editoast_derive/src/model/codegen/create_batch_impl.rs index c4b068ddb64..d1684fb2a56 100644 --- a/editoast/editoast_derive/src/model/codegen/create_batch_impl.rs +++ b/editoast/editoast_derive/src/model/codegen/create_batch_impl.rs @@ -1,6 +1,8 @@ use quote::quote; use quote::ToTokens; +use super::LibpqChunkedIteration; + pub(crate) struct CreateBatchImpl { pub(super) model: syn::Ident, pub(super) table_name: syn::Ident, @@ -26,6 +28,25 @@ impl ToTokens for CreateBatchImpl { } = self; let span_name = format!("model:create_batch<{}>", model); + let create_loop = LibpqChunkedIteration { + parameters_per_row: *field_count, + chunk_size_limit: *chunk_size_limit, + values_ident: syn::parse_quote!
{ values }, + collector: super::LibpqChunkedIterationCollector::Extend { + collection_init: syn::parse_quote! { C::default() }, + }, + chunk_iteration_ident: syn::parse_quote! { chunk }, + chunk_iteration_body: quote! { + diesel::insert_into(dsl::#table_name) + .values(chunk) + .returning((#(dsl::#columns,)*)) + .load_stream::<#row>(conn.write().await.deref_mut()) + .await + .map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())? + .await? + }, + }; + tokens.extend(quote! { #[automatically_derived] #[async_trait::async_trait] @@ -45,21 +66,7 @@ impl ToTokens for CreateBatchImpl { use diesel_async::RunQueryDsl; use futures_util::stream::TryStreamExt; let values = values.into_iter().collect::<Vec<_>>(); - Ok(crate::chunked_for_libpq! { - #field_count, - #chunk_size_limit, - values, - C::default(), - chunk => { - diesel::insert_into(dsl::#table_name) - .values(chunk) - .returning((#(dsl::#columns,)*)) - .load_stream::<#row>(conn.write().await.deref_mut()) - .await - .map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())? - .await? - } - }) + Ok({ #create_loop }) } } }); diff --git a/editoast/editoast_derive/src/model/codegen/create_batch_with_key_impl.rs b/editoast/editoast_derive/src/model/codegen/create_batch_with_key_impl.rs index ff9db01d7e0..7d5f8141512 100644 --- a/editoast/editoast_derive/src/model/codegen/create_batch_with_key_impl.rs +++ b/editoast/editoast_derive/src/model/codegen/create_batch_with_key_impl.rs @@ -3,6 +3,8 @@ use quote::ToTokens; use crate::model::identifier::Identifier; +use super::LibpqChunkedIteration; + pub(crate) struct CreateBatchWithKeyImpl { pub(super) model: syn::Ident, pub(super) table_name: syn::Ident, @@ -31,6 +33,31 @@ impl ToTokens for CreateBatchWithKeyImpl { let ty = identifier.get_type(); let span_name = format!("model:create_batch_with_key<{}>", model); + let create_loop = LibpqChunkedIteration { + parameters_per_row: *field_count, + chunk_size_limit: *chunk_size_limit, + values_ident: syn::parse_quote! { values }, + chunk_iteration_ident: syn::parse_quote! { chunk }, + collector: super::LibpqChunkedIterationCollector::Extend { + collection_init: syn::parse_quote! { C::default() }, + }, + chunk_iteration_body: quote! { + diesel::insert_into(dsl::#table_name) + .values(chunk) + .returning((#(dsl::#columns,)*)) + .load_stream::<#row>(conn.write().await.deref_mut()) + .await + .map(|s| { + s.map_ok(|row| { + let model = <#model as Model>::from_row(row); + (model.get_id(), model) + }) + .try_collect::<Vec<_>>() + })? + .await? + }, + }; + tokens.extend(quote! { #[automatically_derived] #[async_trait::async_trait] @@ -51,27 +78,7 @@ impl ToTokens for CreateBatchWithKeyImpl { use diesel_async::RunQueryDsl; use futures_util::stream::TryStreamExt; let values = values.into_iter().collect::<Vec<_>>(); - Ok(crate::chunked_for_libpq! { - #field_count, - #chunk_size_limit, - values, - C::default(), - chunk => { - diesel::insert_into(dsl::#table_name) - .values(chunk) - .returning((#(dsl::#columns,)*)) - .load_stream::<#row>(conn.write().await.deref_mut()) - .await - .map(|s| { - s.map_ok(|row| { - let model = <#model as Model>::from_row(row); - (model.get_id(), model) - }) - .try_collect::<Vec<_>>() - })? - .await?
- } - }) + Ok({ #create_loop }) } } }); diff --git a/editoast/editoast_derive/src/model/codegen/delete_batch_impl.rs b/editoast/editoast_derive/src/model/codegen/delete_batch_impl.rs index d89e8ac255c..83a8355c3c3 100644 --- a/editoast/editoast_derive/src/model/codegen/delete_batch_impl.rs +++ b/editoast/editoast_derive/src/model/codegen/delete_batch_impl.rs @@ -3,6 +3,9 @@ use quote::ToTokens; use crate::model::identifier::Identifier; +use super::LibpqChunkedIteration; +use super::LibpqChunkedIterationCollector; + pub(crate) struct DeleteBatchImpl { pub(super) model: syn::Ident, pub(super) table_name: syn::Ident, @@ -22,10 +25,25 @@ impl ToTokens for DeleteBatchImpl { } = self; let ty = identifier.get_type(); let id_ident = identifier.get_lvalue(); - let params_per_row = identifier.get_idents().len(); + let parameters_per_row = identifier.get_idents().len(); let filters = identifier.get_diesel_eq_and_fold(); let span_name = format!("model:delete_batch<{}>", model); + let delete_loop = LibpqChunkedIteration { + parameters_per_row, + chunk_size_limit: *chunk_size_limit, + values_ident: syn::parse_quote! { ids }, + collector: LibpqChunkedIterationCollector::VecPush, + chunk_iteration_ident: syn::parse_quote! { chunk }, + chunk_iteration_body: quote! { + let mut query = diesel::delete(dsl::#table_name).into_boxed(); + for #id_ident in chunk.into_iter() { + query = query.or_filter(#filters); + } + query.execute(conn.write().await.deref_mut()).await? + }, + }; + tokens.extend(quote! { #[automatically_derived] #[async_trait::async_trait] @@ -41,18 +59,7 @@ impl ToTokens for DeleteBatchImpl { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - let counts = crate::chunked_for_libpq! { - #params_per_row, - #chunk_size_limit, - ids, - chunk => { - let mut query = diesel::delete(dsl::#table_name).into_boxed(); - for #id_ident in chunk.into_iter() { - query = query.or_filter(#filters); - } - query.execute(conn.write().await.deref_mut()).await? - } - }; + let counts = { #delete_loop }; Ok(counts.into_iter().sum()) } } diff --git a/editoast/editoast_derive/src/model/codegen/retrieve_batch_impl.rs b/editoast/editoast_derive/src/model/codegen/retrieve_batch_impl.rs index c2563379e49..1b562189a97 100644 --- a/editoast/editoast_derive/src/model/codegen/retrieve_batch_impl.rs +++ b/editoast/editoast_derive/src/model/codegen/retrieve_batch_impl.rs @@ -3,6 +3,8 @@ use quote::ToTokens; use crate::model::identifier::Identifier; +use super::LibpqChunkedIteration; + pub(crate) struct RetrieveBatchImpl { pub(super) model: syn::Ident, pub(super) table_name: syn::Ident, @@ -26,11 +28,55 @@ impl ToTokens for RetrieveBatchImpl { } = self; let ty = identifier.get_type(); let id_ident = identifier.get_lvalue(); - let params_per_row = identifier.get_idents().len(); + let parameters_per_row = identifier.get_idents().len(); let filters = identifier.get_diesel_eq_and_fold(); let span_name = format!("model:retrieve_batch_unchecked<{}>", model); let span_name_with_key = format!("model:retrieve_batch_with_key_unchecked<{}>", model); + let retrieve_loop = LibpqChunkedIteration { + parameters_per_row, + chunk_size_limit: *chunk_size_limit, + values_ident: syn::parse_quote! { ids }, + chunk_iteration_ident: syn::parse_quote! { chunk }, + collector: super::LibpqChunkedIterationCollector::Extend { + collection_init: syn::parse_quote! { C::default() }, + }, + chunk_iteration_body: quote!
{ + // Diesel doesn't allow `(col1, col2).eq_any(iterator<(&T, &U)>)` because it imposes restrictions + // on tuple usage. Doing it this way is the suggested workaround (https://github.com/diesel-rs/diesel/issues/3222#issuecomment-1177433434). + // eq_any reallocates its argument anyway so the additional costs with this method are the boxing and the diesel wrappers. + let mut query = dsl::#table_name.into_boxed(); + for #id_ident in chunk.into_iter() { + query = query.or_filter(#filters); + } + query + .select((#(dsl::#columns,)*)) + .load_stream::<#row>(conn.write().await.deref_mut()) + .await + .map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())? + .await? + }, + }; + + let retrieve_with_key_loop = retrieve_loop.with_iteration_body(quote! { + let mut query = dsl::#table_name.into_boxed(); + for #id_ident in chunk.into_iter() { + query = query.or_filter(#filters); + } + query + .select((#(dsl::#columns,)*)) + .load_stream::<#row>(conn.write().await.deref_mut()) + .await + .map(|s| { + s.map_ok(|row| { + let model = <#model as Model>::from_row(row); + (model.get_id(), model) + }) + .try_collect::<Vec<_>>() + })? + .await? + }); + tokens.extend(quote! { #[automatically_derived] #[async_trait::async_trait] @@ -51,27 +97,7 @@ impl ToTokens for RetrieveBatchImpl { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok(crate::chunked_for_libpq! { - #params_per_row, - #chunk_size_limit, - ids, - C::default(), - chunk => { - // Diesel doesn't allow `(col1, col2).eq_any(iterator<(&T, &U)>)` because it imposes restrictions - // on tuple usage. Doing it this way is the suggested workaround (https://github.com/diesel-rs/diesel/issues/3222#issuecomment-1177433434). - // eq_any reallocates its argument anyway so the additional cost with this method are the boxing and the diesel wrappers. - let mut query = dsl::#table_name.into_boxed(); - for #id_ident in chunk.into_iter() { - query = query.or_filter(#filters); - } - query - .select((#(dsl::#columns,)*)) - .load_stream::<#row>(conn.write().await.deref_mut()) - .await - .map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())? - .await? - } - }) + Ok({ #retrieve_loop }) } #[tracing::instrument(name = #span_name_with_key, skip_all, err, fields(query_id))] @@ -91,30 +117,7 @@ impl ToTokens for RetrieveBatchImpl { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok(crate::chunked_for_libpq! { - #params_per_row, - #chunk_size_limit, - ids, - C::default(), - chunk => { - let mut query = dsl::#table_name.into_boxed(); - for #id_ident in chunk.into_iter() { - query = query.or_filter(#filters); - } - query - .select((#(dsl::#columns,)*)) - .load_stream::<#row>(conn.write().await.deref_mut()) - .await - .map(|s| { - s.map_ok(|row| { - let model = <#model as Model>::from_row(row); - (model.get_id(), model) - }) - .try_collect::<Vec<_>>() - })? - .await?
- } - }) + Ok({ #retrieve_with_key_loop }) } } }); diff --git a/editoast/editoast_derive/src/model/codegen/update_batch_impl.rs b/editoast/editoast_derive/src/model/codegen/update_batch_impl.rs index 3cee8c74af9..1f0aec6b56b 100644 --- a/editoast/editoast_derive/src/model/codegen/update_batch_impl.rs +++ b/editoast/editoast_derive/src/model/codegen/update_batch_impl.rs @@ -3,6 +3,9 @@ use quote::ToTokens; use crate::model::identifier::Identifier; +use super::LibpqChunkedIteration; +use super::LibpqChunkedIterationCollector; + pub(crate) struct UpdateBatchImpl { pub(super) model: syn::Ident, pub(super) table_name: syn::Ident, @@ -30,11 +33,59 @@ impl ToTokens for UpdateBatchImpl { } = self; let ty = identifier.get_type(); let id_ident = identifier.get_lvalue(); - let params_per_row = identifier.get_idents().len(); + let parameters_per_row = identifier.get_idents().len(); let filters = identifier.get_diesel_eq_and_fold(); let span_name = format!("model:update_batch_unchecked<{}>", model); let span_name_with_key = format!("model:update_batch_unchecked<{}>", model); + let update_loop = LibpqChunkedIteration { + // FIXME: that count is correct for each row, but the maximum buffer size + // should be libpq's max MINUS the size of the changeset + parameters_per_row, + chunk_size_limit: *chunk_size_limit, + values_ident: syn::parse_quote! { ids }, + chunk_iteration_ident: syn::parse_quote! { chunk }, + collector: LibpqChunkedIterationCollector::Extend { + collection_init: syn::parse_quote! { C::default() }, + }, + chunk_iteration_body: quote! { + // We have to do it this way because we can't .or_filter() on a boxed update statement + let mut query = dsl::#table_name.select(dsl::#primary_key_column).into_boxed(); + for #id_ident in chunk.into_iter() { + query = query.or_filter(#filters); + } + diesel::update(dsl::#table_name) + .filter(dsl::#primary_key_column.eq_any(query)) + .set(&self) + .returning((#(dsl::#columns,)*)) + .load_stream::<#row>(conn.write().await.deref_mut()) + .await + .map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())? + .await? + }, + }; + + let update_with_key_loop = update_loop.with_iteration_body(quote! { + let mut query = dsl::#table_name.select(dsl::#primary_key_column).into_boxed(); + for #id_ident in chunk.into_iter() { + query = query.or_filter(#filters); + } + diesel::update(dsl::#table_name) + .filter(dsl::#primary_key_column.eq_any(query)) + .set(&self) + .returning((#(dsl::#columns,)*)) + .load_stream::<#row>(conn.write().await.deref_mut()) + .await + .map(|s| { + s.map_ok(|row| { + let model = <#model as Model>::from_row(row); + (model.get_id(), model) + }) + .try_collect::<Vec<_>>() + })? + .await? + }); + tokens.extend(quote! { #[automatically_derived] #[async_trait::async_trait] @@ -56,29 +107,7 @@ impl ToTokens for UpdateBatchImpl { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok(crate::chunked_for_libpq!
{ - // FIXME: that count is correct for each row, but the maximum buffer size - // should be libpq's max MINUS the size of the changeset - #params_per_row, - #chunk_size_limit, - ids, - C::default(), - chunk => { - // We have to do it this way because we can't .or_filter() on a boxed update statement - let mut query = dsl::#table_name.select(dsl::#primary_key_column).into_boxed(); - for #id_ident in chunk.into_iter() { - query = query.or_filter(#filters); - } - diesel::update(dsl::#table_name) - .filter(dsl::#primary_key_column.eq_any(query)) - .set(&self) - .returning((#(dsl::#columns,)*)) - .load_stream::<#row>(conn.write().await.deref_mut()) - .await - .map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())? - .await? - } - }) + Ok({ #update_loop }) } #[tracing::instrument(name = #span_name_with_key, skip_all, err, fields(query_ids))] @@ -99,34 +128,7 @@ impl ToTokens for UpdateBatchImpl { use futures_util::stream::TryStreamExt; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok(crate::chunked_for_libpq! { - // FIXME: that count is correct for each row, but the maximum buffer size - // should be libpq's max MINUS the size of the changeset - #params_per_row, - #chunk_size_limit, - ids, - C::default(), - chunk => { - let mut query = dsl::#table_name.select(dsl::#primary_key_column).into_boxed(); - for #id_ident in chunk.into_iter() { - query = query.or_filter(#filters); - } - diesel::update(dsl::#table_name) - .filter(dsl::#primary_key_column.eq_any(query)) - .set(&self) - .returning((#(dsl::#columns,)*)) - .load_stream::<#row>(conn.write().await.deref_mut()) - .await - .map(|s| { - s.map_ok(|row| { - let model = <#model as Model>::from_row(row); - (model.get_id(), model) - }) - .try_collect::<Vec<_>>() - })? - .await? - } - }) + Ok({ #update_with_key_loop }) } } }); diff --git a/editoast/editoast_derive/src/snapshots/editoast_derive__model__tests__construction.snap b/editoast/editoast_derive/src/snapshots/editoast_derive__model__tests__construction.snap index abd2b44276d..fac0e57f846 100644 --- a/editoast/editoast_derive/src/snapshots/editoast_derive__model__tests__construction.snap +++ b/editoast/editoast_derive/src/snapshots/editoast_derive__model__tests__construction.snap @@ -620,16 +620,30 @@ impl crate::models::CreateBatch for Document { use diesel_async::RunQueryDsl; use futures_util::stream::TryStreamExt; let values = values.into_iter().collect::<Vec<_>>(); - Ok( - crate::chunked_for_libpq! { - 2usize, 2048usize, values, C::default(), chunk => { - diesel::insert_into(dsl::osrd_infra_document).values(chunk) - .returning((dsl::id, dsl::content_type, dsl::data,)).load_stream:: < - DocumentRow > (conn.write(). await .deref_mut()). await .map(| s | s - .map_ok(< Document as Model > ::from_row).try_collect:: < Vec < _ >> ()) - ? . await ? } - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 2usize; + let mut result = C::default(); + let chunks = values.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + diesel::insert_into(dsl::osrd_infra_document) + .values(chunk) + .returning((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s + .map_ok(<Document as Model>::from_row) + .try_collect::<Vec<_>>() + })? + .await?
+ }; + result.extend(chunk_result); + } + result + }) } } #[automatically_derived] #[async_trait::async_trait] @@ -648,16 +662,32 @@ impl crate::models::CreateBatchWithKey for Document use diesel_async::RunQueryDsl; use futures_util::stream::TryStreamExt; let values = values.into_iter().collect::<Vec<_>>(); - Ok( - crate::chunked_for_libpq! { - 2usize, 2048usize, values, C::default(), chunk => { - diesel::insert_into(dsl::osrd_infra_document).values(chunk) - .returning((dsl::id, dsl::content_type, dsl::data,)).load_stream:: < - DocumentRow > (conn.write(). await .deref_mut()). await .map(| s | { s - .map_ok(| row | { let model = < Document as Model > ::from_row(row); - (model.get_id(), model) }).try_collect:: < Vec < _ >> () }) ? . await ? } - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 2usize; + let mut result = C::default(); + let chunks = values.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + diesel::insert_into(dsl::osrd_infra_document) + .values(chunk) + .returning((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s.map_ok(|row| { + let model = <Document as Model>::from_row(row); + (model.get_id(), model) + }) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } } #[automatically_derived] #[async_trait::async_trait] @@ -676,16 +706,32 @@ impl crate::models::CreateBatchWithKey for Document { use diesel_async::RunQueryDsl; use futures_util::stream::TryStreamExt; let values = values.into_iter().collect::<Vec<_>>(); - Ok( - crate::chunked_for_libpq! { - 2usize, 2048usize, values, C::default(), chunk => { - diesel::insert_into(dsl::osrd_infra_document).values(chunk) - .returning((dsl::id, dsl::content_type, dsl::data,)).load_stream:: < - DocumentRow > (conn.write(). await .deref_mut()). await .map(| s | { s - .map_ok(| row | { let model = < Document as Model > ::from_row(row); - (model.get_id(), model) }).try_collect:: < Vec < _ >> () }) ? . await ? } - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 2usize; + let mut result = C::default(); + let chunks = values.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + diesel::insert_into(dsl::osrd_infra_document) + .values(chunk) + .returning((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s.map_ok(|row| { + let model = <Document as Model>::from_row(row); + (model.get_id(), model) + }) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } } #[automatically_derived] #[async_trait::async_trait] @@ -709,17 +755,33 @@ impl crate::models::RetrieveBatchUnchecked<(String)> for Document { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok( - crate::chunked_for_libpq! { - 1usize, 2048usize, ids, C::default(), chunk => { let mut query = - dsl::osrd_infra_document.into_boxed(); for content_type in chunk - .into_iter() { query = query.or_filter(dsl::content_type - .eq(content_type)); } query.select((dsl::id, dsl::content_type, - dsl::data,)).load_stream:: < DocumentRow > (conn.write(). await - .deref_mut()). await .map(| s | s.map_ok(< Document as Model > - ::from_row).try_collect:: < Vec < _ >> ()) ? . await ?
} - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = C::default(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = dsl::osrd_infra_document.into_boxed(); + for content_type in chunk.into_iter() { + query = query.or_filter(dsl::content_type.eq(content_type)); + } + query + .select((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s + .map_ok(<Document as Model>::from_row) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } #[tracing::instrument( name = "model:retrieve_batch_with_key_unchecked", skip_all, err, fields(query_id) )] @@ -740,18 +802,35 @@ impl crate::models::RetrieveBatchUnchecked<(String)> for Document { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok( - crate::chunked_for_libpq! { - 1usize, 2048usize, ids, C::default(), chunk => { let mut query = - dsl::osrd_infra_document.into_boxed(); for content_type in chunk - .into_iter() { query = query.or_filter(dsl::content_type - .eq(content_type)); } query.select((dsl::id, dsl::content_type, - dsl::data,)).load_stream:: < DocumentRow > (conn.write(). await - .deref_mut()). await .map(| s | { s.map_ok(| row | { let model = < - Document as Model > ::from_row(row); (model.get_id(), model) }) - .try_collect:: < Vec < _ >> () }) ? . await ? } - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = C::default(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = dsl::osrd_infra_document.into_boxed(); + for content_type in chunk.into_iter() { + query = query.or_filter(dsl::content_type.eq(content_type)); + } + query + .select((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s.map_ok(|row| { + let model = <Document as Model>::from_row(row); + (model.get_id(), model) + }) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } } #[automatically_derived] @@ -775,16 +854,33 @@ impl crate::models::RetrieveBatchUnchecked<(i64)> for Document { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok( - crate::chunked_for_libpq! { - 1usize, 2048usize, ids, C::default(), chunk => { let mut query = - dsl::osrd_infra_document.into_boxed(); for id_ in chunk.into_iter() { - query = query.or_filter(dsl::id.eq(id_)); } query.select((dsl::id, - dsl::content_type, dsl::data,)).load_stream:: < DocumentRow > (conn - .write(). await .deref_mut()). await .map(| s | s.map_ok(< Document as - Model > ::from_row).try_collect:: < Vec < _ >> ()) ? . await ?
} - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = C::default(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = dsl::osrd_infra_document.into_boxed(); + for id_ in chunk.into_iter() { + query = query.or_filter(dsl::id.eq(id_)); + } + query + .select((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s + .map_ok(<Document as Model>::from_row) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } #[tracing::instrument( name = "model:retrieve_batch_with_key_unchecked", skip_all, err, fields(query_id) )] @@ -805,17 +901,35 @@ impl crate::models::RetrieveBatchUnchecked<(i64)> for Document { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok( - crate::chunked_for_libpq! { - 1usize, 2048usize, ids, C::default(), chunk => { let mut query = - dsl::osrd_infra_document.into_boxed(); for id_ in chunk.into_iter() { - query = query.or_filter(dsl::id.eq(id_)); } query.select((dsl::id, - dsl::content_type, dsl::data,)).load_stream:: < DocumentRow > (conn - .write(). await .deref_mut()). await .map(| s | { s.map_ok(| row | { let - model = < Document as Model > ::from_row(row); (model.get_id(), model) }) - .try_collect:: < Vec < _ >> () }) ? . await ? } - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = C::default(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = dsl::osrd_infra_document.into_boxed(); + for id_ in chunk.into_iter() { + query = query.or_filter(dsl::id.eq(id_)); + } + query + .select((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s.map_ok(|row| { + let model = <Document as Model>::from_row(row); + (model.get_id(), model) + }) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } } #[automatically_derived] @@ -843,18 +957,37 @@ impl crate::models::UpdateBatchUnchecked for DocumentChanges use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok( - crate::chunked_for_libpq! { - 1usize, 2048usize, ids, C::default(), chunk => { let mut query = - dsl::osrd_infra_document.select(dsl::id).into_boxed(); for content_type - in chunk.into_iter() { query = query.or_filter(dsl::content_type - .eq(content_type)); } diesel::update(dsl::osrd_infra_document) - .filter(dsl::id.eq_any(query)).set(& self).returning((dsl::id, - dsl::content_type, dsl::data,)).load_stream:: < DocumentRow > (conn - .write(). await .deref_mut()). await .map(| s | s.map_ok(< Document as - Model > ::from_row).try_collect:: < Vec < _ >> ()) ? . await ?
} - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = C::default(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = dsl::osrd_infra_document + .select(dsl::id) + .into_boxed(); + for content_type in chunk.into_iter() { + query = query.or_filter(dsl::content_type.eq(content_type)); + } + diesel::update(dsl::osrd_infra_document) + .filter(dsl::id.eq_any(query)) + .set(&self) + .returning((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s + .map_ok(<Document as Model>::from_row) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } #[tracing::instrument( name = "model:update_batch_unchecked", skip_all, err, fields(query_ids) )] @@ -879,19 +1012,39 @@ impl crate::models::UpdateBatchUnchecked for DocumentChanges use futures_util::stream::TryStreamExt; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok( - crate::chunked_for_libpq! { - 1usize, 2048usize, ids, C::default(), chunk => { let mut query = - dsl::osrd_infra_document.select(dsl::id).into_boxed(); for content_type - in chunk.into_iter() { query = query.or_filter(dsl::content_type - .eq(content_type)); } diesel::update(dsl::osrd_infra_document) - .filter(dsl::id.eq_any(query)).set(& self).returning((dsl::id, - dsl::content_type, dsl::data,)).load_stream:: < DocumentRow > (conn - .write(). await .deref_mut()). await .map(| s | { s.map_ok(| row | { let - model = < Document as Model > ::from_row(row); (model.get_id(), model) }) - .try_collect:: < Vec < _ >> () }) ? . await ? } - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = C::default(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = dsl::osrd_infra_document + .select(dsl::id) + .into_boxed(); + for content_type in chunk.into_iter() { + query = query.or_filter(dsl::content_type.eq(content_type)); + } + diesel::update(dsl::osrd_infra_document) + .filter(dsl::id.eq_any(query)) + .set(&self) + .returning((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s.map_ok(|row| { + let model = <Document as Model>::from_row(row); + (model.get_id(), model) + }) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } } #[automatically_derived] @@ -919,18 +1072,37 @@ impl crate::models::UpdateBatchUnchecked for DocumentChangeset use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok( - crate::chunked_for_libpq! { - 1usize, 2048usize, ids, C::default(), chunk => { let mut query = - dsl::osrd_infra_document.select(dsl::id).into_boxed(); for id_ in chunk - .into_iter() { query = query.or_filter(dsl::id.eq(id_)); } - diesel::update(dsl::osrd_infra_document).filter(dsl::id.eq_any(query)) - .set(& self).returning((dsl::id, dsl::content_type, dsl::data,)) - .load_stream:: < DocumentRow > (conn.write(). await .deref_mut()). await - .map(| s | s.map_ok(< Document as Model > ::from_row).try_collect:: < Vec - < _ >> ()) ? . await ?
} - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = C::default(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = dsl::osrd_infra_document + .select(dsl::id) + .into_boxed(); + for id_ in chunk.into_iter() { + query = query.or_filter(dsl::id.eq(id_)); + } + diesel::update(dsl::osrd_infra_document) + .filter(dsl::id.eq_any(query)) + .set(&self) + .returning((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s + .map_ok(<Document as Model>::from_row) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } #[tracing::instrument( name = "model:update_batch_unchecked", skip_all, err, fields(query_ids) )] @@ -955,19 +1127,39 @@ impl crate::models::UpdateBatchUnchecked for DocumentChangeset use futures_util::stream::TryStreamExt; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - Ok( - crate::chunked_for_libpq! { - 1usize, 2048usize, ids, C::default(), chunk => { let mut query = - dsl::osrd_infra_document.select(dsl::id).into_boxed(); for id_ in chunk - .into_iter() { query = query.or_filter(dsl::id.eq(id_)); } - diesel::update(dsl::osrd_infra_document).filter(dsl::id.eq_any(query)) - .set(& self).returning((dsl::id, dsl::content_type, dsl::data,)) - .load_stream:: < DocumentRow > (conn.write(). await .deref_mut()). await - .map(| s | { s.map_ok(| row | { let model = < Document as Model > - ::from_row(row); (model.get_id(), model) }).try_collect:: < Vec < _ >> () - }) ? . await ? } - }, - ) + Ok({ + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = C::default(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = dsl::osrd_infra_document + .select(dsl::id) + .into_boxed(); + for id_ in chunk.into_iter() { + query = query.or_filter(dsl::id.eq(id_)); + } + diesel::update(dsl::osrd_infra_document) + .filter(dsl::id.eq_any(query)) + .set(&self) + .returning((dsl::id, dsl::content_type, dsl::data)) + .load_stream::<DocumentRow>(conn.write().await.deref_mut()) + .await + .map(|s| { + s.map_ok(|row| { + let model = <Document as Model>::from_row(row); + (model.get_id(), model) + }) + .try_collect::<Vec<_>>() + })? + .await? + }; + result.extend(chunk_result); + } + result + }) } } #[automatically_derived] @@ -989,12 +1181,24 @@ impl crate::models::DeleteBatch<(String)> for Document { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - let counts = crate::chunked_for_libpq! { - 1usize, 2048usize, ids, chunk => { let mut query = - diesel::delete(dsl::osrd_infra_document).into_boxed(); for content_type in - chunk.into_iter() { query = query.or_filter(dsl::content_type - .eq(content_type)); } query.execute(conn.write(). await .deref_mut()). await - ?
} + let counts = { + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = Vec::new(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = diesel::delete(dsl::osrd_infra_document) + .into_boxed(); + for content_type in chunk.into_iter() { + query = query.or_filter(dsl::content_type.eq(content_type)); + } + query.execute(conn.write().await.deref_mut()).await? + }; + result.push(chunk_result); + } + result }; Ok(counts.into_iter().sum()) } @@ -1018,11 +1222,24 @@ impl crate::models::DeleteBatch<(i64)> for Document { use std::ops::DerefMut; let ids = ids.into_iter().collect::<Vec<_>>(); tracing::Span::current().record("query_ids", tracing::field::debug(&ids)); - let counts = crate::chunked_for_libpq! { - 1usize, 2048usize, ids, chunk => { let mut query = - diesel::delete(dsl::osrd_infra_document).into_boxed(); for id_ in chunk - .into_iter() { query = query.or_filter(dsl::id.eq(id_)); } query.execute(conn - .write(). await .deref_mut()). await ? } + let counts = { + const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; + const ASYNC_SUBDIVISION: usize = 2_usize; + const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 1usize; + let mut result = Vec::new(); + let chunks = ids.chunks(CHUNK_SIZE.min(2048usize)); + for chunk in chunks { + let chunk_result = { + let mut query = diesel::delete(dsl::osrd_infra_document) + .into_boxed(); + for id_ in chunk.into_iter() { + query = query.or_filter(dsl::id.eq(id_)); + } + query.execute(conn.write().await.deref_mut()).await? + }; + result.push(chunk_result); + } + result }; Ok(counts.into_iter().sum()) } diff --git a/editoast/src/models/prelude/mod.rs b/editoast/src/models/prelude/mod.rs index 8046be9105a..15ce6c89664 100644 --- a/editoast/src/models/prelude/mod.rs +++ b/editoast/src/models/prelude/mod.rs @@ -100,71 +100,3 @@ impl + Clone> Identifiable for T { self.clone().id() } } - -/// Splits a query into chunks to accommodate libpq's maximum number of parameters -/// -/// This is a hack around a libpq limitation (cf. ). -/// The rows to process are split into chunks for which at most `2^16 - 1` parameters are sent to libpq. -/// Hence the macro needs to know how many parameters are sent per row. -/// The result of the chunked query is then concatenated into `result`, which must -/// implement `std::iter::Extend`. -/// The chunked query is defined using a closure-like syntax. The argument of the "closure" -/// is a variable of type `&[ParameterType]`, and it must "return" a `Result<Vec<T>, E>`. -/// The values can be any type that implements `IntoIterator`. -/// -/// # Example -/// -/// ``` -/// chunked_for_libpq! { -/// 3, // 3 parameters are binded per row -/// values, // an iterator of parameters -/// Vec::new(), // the collection to extend with the result -/// chunk => { // chunk is a variable of type `&[ParameterType]` -/// diesel::insert_into(dsl::document) -/// .values(chunk) -/// .load_stream::<<Document as Model>::Row>(conn) -/// .await -/// .map(|s| s.map_ok(<Document as Model>::from_row).try_collect::<Vec<_>>())? -/// .await? -/// // returns a Result<Vec<Document>, impl EditoastError> -/// } // (this is not a real closure) -/// } -/// ``` -/// -/// # On concurrency -/// -/// There seem to be a problem with concurrent queries using deadpool, panicking with -/// 'Cannot access shared transaction state'. So this macro do not run each chunk's query concurrently.
-/// While AsyncPgConnection supports pipelining, each query will be sent one after the other. -/// (But hey, it's still better than just making one query per row :p) -#[macro_export] -macro_rules! chunked_for_libpq { - // Collects every chunk result into a vec - ($parameters_per_row:expr, $limit:literal, $values:expr, $chunk:ident => $query:tt) => {{ - const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; - // We need to divide further because of AsyncPgConnection, maybe it is related to connection pipelining - const ASYNC_SUBDIVISION: usize = 2_usize; - const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / $parameters_per_row; - let mut result = Vec::new(); - let chunks = $values.chunks(CHUNK_SIZE.min($limit)); - for $chunk in chunks { - let chunk_result = $query; - result.push(chunk_result); - } - result - }}; - // Extends the result structure with every chunked query result - ($parameters_per_row:expr, $limit:literal, $values:expr, $result:expr, $chunk:ident => $query:tt) => {{ - const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; - // We need to divide further because of AsyncPgConnection, maybe it is related to connection pipelining - const ASYNC_SUBDIVISION: usize = 2_usize; - const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / $parameters_per_row; - let mut result = $result; - let chunks = $values.chunks(CHUNK_SIZE.min($limit)); - for $chunk in chunks { - let chunk_result = $query; - result.extend(chunk_result); - } - result - }}; -}
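For reference, the chunking strategy that PATCH 3 moves from the removed `chunked_for_libpq!` macro into the derive's quote expansion boils down to the following standalone sketch (illustrative only; the database work is abstracted behind a hypothetical `process_chunk` closure, and the constants mirror the generated code above):

    // Stay below libpq's limit of 2^16 - 1 bound parameters per query,
    // halved as a precaution for AsyncPgConnection, and never exceed the
    // caller-provided chunk size limit.
    fn chunked_for_libpq<T, R>(
        parameters_per_row: usize,
        chunk_size_limit: usize,
        values: &[T],
        mut process_chunk: impl FnMut(&[T]) -> Vec<R>, // stands in for the per-chunk query
    ) -> Vec<R> {
        const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1;
        const ASYNC_SUBDIVISION: usize = 2_usize;
        let chunk_size = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / parameters_per_row;
        let mut result = Vec::new();
        for chunk in values.chunks(chunk_size.min(chunk_size_limit)) {
            // `extend` corresponds to the `Extend` collector; the `VecPush`
            // collector would `push` each chunk's result instead.
            result.extend(process_chunk(chunk));
        }
        result
    }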