editoast: derive: replace chunked_for_libpq! with quote expansion in Model
Not only does this reduce the complexity of batch operation expansion,
it also allows `prettyplease` to format the generated code in snapshots.

Signed-off-by: Leo Valais <[email protected]>
leovalais committed Dec 16, 2024
1 parent 8ac92c3 commit 6ec445a
Showing 8 changed files with 608 additions and 350 deletions.
83 changes: 83 additions & 0 deletions editoast/editoast_derive/src/model/codegen.rs
@@ -457,3 +457,86 @@ trait TokensIf: Sized {
}

impl<T: ToTokens> TokensIf for T {}

/// Generates an expression that splits a query into chunks to accommodate libpq's maximum number of bound parameters
///
/// This is a hack around a libpq limitation (cf. <https://github.com/diesel-rs/diesel/issues/2414>).
/// The rows to process are split into chunks for which at most `2^16 - 1` parameters are sent to libpq.
/// Therefore we need to know how many parameters are sent per row.
/// The result collection can be parametrized.
///
/// # On concurrency
///
/// There seems to be a problem with concurrent queries using deadpool, panicking with
/// 'Cannot access shared transaction state'. So the generated code does not run each chunk's query concurrently.
/// While AsyncPgConnection supports pipelining, each query is sent one after the other.
/// (But hey, it's still better than just making one query per row :p)
#[derive(Clone)]
struct LibpqChunkedIteration {
/// The number of bound values per row
parameters_per_row: usize,
/// The maximum number of rows per chunk (actual chunk size may be smaller, but never bigger)
chunk_size_limit: usize,
/// The identifier of the values to iterate over (must implement `IntoIterator<Item = ParameterType>`)
values_ident: syn::Ident,
/// How to collect the results
collector: LibpqChunkedIterationCollector,
/// The identifier of the chunk iteration variable
chunk_iteration_ident: syn::Ident,
/// The body of the chunk iteration
chunk_iteration_body: proc_macro2::TokenStream,
}

/// Describes how to collect the results of a chunked iteration
#[derive(Clone)]
enum LibpqChunkedIterationCollector {
/// All results are pushed into a Vec (item type has to be inferable)
VecPush,
/// Extends an existing collection. Its initialization expression must be provided.
///
/// The initialized collection must implement `Extend<Model>`.
Extend { collection_init: syn::Expr },
}

impl LibpqChunkedIteration {
fn with_iteration_body(&self, body: proc_macro2::TokenStream) -> Self {
Self {
chunk_iteration_body: body,
..self.clone()
}
}
}

impl ToTokens for LibpqChunkedIteration {
fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) {
let Self {
parameters_per_row,
chunk_size_limit,
values_ident,
chunk_iteration_ident,
chunk_iteration_body,
collector,
} = self;
let (init, extend) = match collector {
LibpqChunkedIterationCollector::VecPush => {
(syn::parse_quote! { Vec::new() }, quote::quote! { push })
}
LibpqChunkedIterationCollector::Extend { collection_init } => {
(collection_init.clone(), quote::quote! { extend })
}
};
tokens.extend(quote::quote! {
const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1;
// We need to divide further because of AsyncPgConnection, maybe it is related to connection pipelining
const ASYNC_SUBDIVISION: usize = 2_usize;
const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / #parameters_per_row;
let mut result = #init;
let chunks = #values_ident.chunks(CHUNK_SIZE.min(#chunk_size_limit));
for #chunk_iteration_ident in chunks {
let chunk_result = { #chunk_iteration_body };
result.#extend(chunk_result);
}
result
});
}
}
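For reference, rendered as plain Rust the expansion above takes roughly this shape. This is a hand-substituted sketch assuming `parameters_per_row = 3`, `chunk_size_limit = 1024`, `values_ident = ids`, and the `VecPush` collector; `run_chunk_query` is a hypothetical placeholder for the caller-supplied `chunk_iteration_body`, not a real function:

    const LIBPQ_MAX_PARAMETERS: usize = 2_usize.pow(16) - 1; // 65_535
    // We need to divide further because of AsyncPgConnection, maybe it is related to connection pipelining
    const ASYNC_SUBDIVISION: usize = 2_usize;
    const CHUNK_SIZE: usize = LIBPQ_MAX_PARAMETERS / ASYNC_SUBDIVISION / 3; // 10_922
    let mut result = Vec::new();
    // Each chunk holds at most min(10_922, 1_024) rows, i.e. at most
    // 3 * 1_024 bound parameters per query, well under the libpq cap.
    let chunks = ids.chunks(CHUNK_SIZE.min(1024));
    for chunk in chunks {
        let chunk_result = { run_chunk_query(chunk) };
        result.push(chunk_result);
    }
    result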
37 changes: 22 additions & 15 deletions editoast/editoast_derive/src/model/codegen/create_batch_impl.rs
@@ -1,6 +1,8 @@
use quote::quote;
use quote::ToTokens;

use super::LibpqChunkedIteration;

pub(crate) struct CreateBatchImpl {
pub(super) model: syn::Ident,
pub(super) table_name: syn::Ident,
@@ -26,6 +28,25 @@ impl ToTokens for CreateBatchImpl {
} = self;
let span_name = format!("model:create_batch<{}>", model);

let create_loop = LibpqChunkedIteration {
parameters_per_row: *field_count,
chunk_size_limit: *chunk_size_limit,
values_ident: syn::parse_quote! { values },
collector: super::LibpqChunkedIterationCollector::Extend {
collection_init: syn::parse_quote! { C::default() },
},
chunk_iteration_ident: syn::parse_quote! { chunk },
chunk_iteration_body: quote! {
diesel::insert_into(dsl::#table_name)
.values(chunk)
.returning((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())?
.await?
},
};

tokens.extend(quote! {
#[automatically_derived]
#[async_trait::async_trait]
@@ -45,21 +66,7 @@ impl ToTokens for CreateBatchImpl {
use diesel_async::RunQueryDsl;
use futures_util::stream::TryStreamExt;
let values = values.into_iter().collect::<Vec<_>>();
Ok(crate::chunked_for_libpq! {
#field_count,
#chunk_size_limit,
values,
C::default(),
chunk => {
diesel::insert_into(dsl::#table_name)
.values(chunk)
.returning((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())?
.await?
}
})
Ok({ #create_loop })
}
}
});
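To make the chunking arithmetic concrete, here is a small self-contained check using hypothetical numbers (a model with 10 insertable fields and a `chunk_size_limit` of 4096; neither figure comes from this diff):

    fn main() {
        let parameters_per_row = 10_usize; // hypothetical field count
        let chunk_size_limit = 4096_usize; // hypothetical per-model limit
        let libpq_max_parameters = 2_usize.pow(16) - 1; // 65_535
        let async_subdivision = 2_usize;
        // Mirrors CHUNK_SIZE.min(#chunk_size_limit) from the expansion above.
        let chunk_size = (libpq_max_parameters / async_subdivision / parameters_per_row)
            .min(chunk_size_limit);
        assert_eq!(chunk_size, 3_276);
        // Inserting 10_000 rows therefore issues ceil(10_000 / 3_276) = 4 queries.
        assert_eq!(10_000_usize.div_ceil(chunk_size), 4);
    }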
editoast/editoast_derive/src/model/codegen/create_batch_with_key_impl.rs
@@ -3,6 +3,8 @@ use quote::ToTokens;

use crate::model::identifier::Identifier;

use super::LibpqChunkedIteration;

pub(crate) struct CreateBatchWithKeyImpl {
pub(super) model: syn::Ident,
pub(super) table_name: syn::Ident,
@@ -31,6 +33,31 @@ impl ToTokens for CreateBatchWithKeyImpl {
let ty = identifier.get_type();
let span_name = format!("model:create_batch_with_key<{}>", model);

let create_loop = LibpqChunkedIteration {
parameters_per_row: *field_count,
chunk_size_limit: *chunk_size_limit,
values_ident: syn::parse_quote! { values },
chunk_iteration_ident: syn::parse_quote! { chunk },
collector: super::LibpqChunkedIterationCollector::Extend {
collection_init: syn::parse_quote! { C::default() },
},
chunk_iteration_body: quote! {
diesel::insert_into(dsl::#table_name)
.values(chunk)
.returning((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| {
s.map_ok(|row| {
let model = <#model as Model>::from_row(row);
(model.get_id(), model)
})
.try_collect::<Vec<_>>()
})?
.await?
},
};

tokens.extend(quote! {
#[automatically_derived]
#[async_trait::async_trait]
@@ -51,27 +78,7 @@ impl ToTokens for CreateBatchWithKeyImpl {
use diesel_async::RunQueryDsl;
use futures_util::stream::TryStreamExt;
let values = values.into_iter().collect::<Vec<_>>();
Ok(crate::chunked_for_libpq! {
#field_count,
#chunk_size_limit,
values,
C::default(),
chunk => {
diesel::insert_into(dsl::#table_name)
.values(chunk)
.returning((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| {
s.map_ok(|row| {
let model = <#model as Model>::from_row(row);
(model.get_id(), model)
})
.try_collect::<Vec<_>>()
})?
.await?
}
})
Ok({ #create_loop })
}
}
});
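The only difference from `CreateBatchImpl` is that each row is folded into a `(model.get_id(), model)` pair, and the `Extend` collector then lets the caller choose the output collection. A minimal standalone sketch of that collector contract (illustrative names, not the crate's API):

    use std::collections::HashMap;

    // Accumulates per-chunk results into any collection the caller picks,
    // mirroring `Extend { collection_init: C::default() }` above.
    fn collect_chunks<C, T>(chunks: Vec<Vec<T>>) -> C
    where
        C: Default + Extend<T>,
    {
        let mut result = C::default();
        for chunk_result in chunks {
            result.extend(chunk_result);
        }
        result
    }

    fn main() {
        // The `*_with_key` variants yield `(key, model)` pairs, so a map works:
        let pairs = vec![vec![(1_u64, "a"), (2, "b")], vec![(3, "c")]];
        let by_id: HashMap<u64, &str> = collect_chunks(pairs);
        assert_eq!(by_id[&3], "c");
    }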
33 changes: 20 additions & 13 deletions editoast/editoast_derive/src/model/codegen/delete_batch_impl.rs
@@ -3,6 +3,9 @@ use quote::ToTokens;

use crate::model::identifier::Identifier;

use super::LibpqChunkedIteration;
use super::LibpqChunkedIterationCollector;

pub(crate) struct DeleteBatchImpl {
pub(super) model: syn::Ident,
pub(super) table_name: syn::Ident,
@@ -22,10 +25,25 @@ impl ToTokens for DeleteBatchImpl {
} = self;
let ty = identifier.get_type();
let id_ident = identifier.get_lvalue();
let params_per_row = identifier.get_idents().len();
let parameters_per_row = identifier.get_idents().len();
let filters = identifier.get_diesel_eq_and_fold();
let span_name = format!("model:delete_batch<{}>", model);

let delete_loop = LibpqChunkedIteration {
parameters_per_row,
chunk_size_limit: *chunk_size_limit,
values_ident: syn::parse_quote! { ids },
collector: LibpqChunkedIterationCollector::VecPush,
chunk_iteration_ident: syn::parse_quote! { chunk },
chunk_iteration_body: quote! {
let mut query = diesel::delete(dsl::#table_name).into_boxed();
for #id_ident in chunk.into_iter() {
query = query.or_filter(#filters);
}
query.execute(conn.write().await.deref_mut()).await?
},
};

tokens.extend(quote! {
#[automatically_derived]
#[async_trait::async_trait]
@@ -41,18 +59,7 @@ impl ToTokens for DeleteBatchImpl {
use std::ops::DerefMut;
let ids = ids.into_iter().collect::<Vec<_>>();
tracing::Span::current().record("query_ids", tracing::field::debug(&ids));
let counts = crate::chunked_for_libpq! {
#params_per_row,
#chunk_size_limit,
ids,
chunk => {
let mut query = diesel::delete(dsl::#table_name).into_boxed();
for #id_ident in chunk.into_iter() {
query = query.or_filter(#filters);
}
query.execute(conn.write().await.deref_mut()).await?
}
};
let counts = { #delete_loop };
Ok(counts.into_iter().sum())
}
}
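Deletion is the one batch operation that uses the `VecPush` collector: each chunk's `execute` returns an affected-row count, the loop pushes the counts into a `Vec`, and the surrounding impl sums them. A reduced sketch of that shape, with plain slices standing in for the diesel query (no real diesel calls):

    fn main() {
        // Stand-ins for the per-chunk `query.execute(...)` results.
        let chunks: Vec<&[u64]> = vec![&[1, 2, 3], &[4, 5]];
        let mut counts = Vec::new(); // the VecPush collector
        for chunk in &chunks {
            let deleted = chunk.len(); // hypothetical rows-deleted count
            counts.push(deleted);
        }
        // Mirrors `Ok(counts.into_iter().sum())` in the generated impl.
        let total: usize = counts.into_iter().sum();
        assert_eq!(total, 5);
    }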
95 changes: 49 additions & 46 deletions editoast/editoast_derive/src/model/codegen/retrieve_batch_impl.rs
@@ -3,6 +3,8 @@ use quote::ToTokens;

use crate::model::identifier::Identifier;

use super::LibpqChunkedIteration;

pub(crate) struct RetrieveBatchImpl {
pub(super) model: syn::Ident,
pub(super) table_name: syn::Ident,
@@ -26,11 +28,55 @@ impl ToTokens for RetrieveBatchImpl {
} = self;
let ty = identifier.get_type();
let id_ident = identifier.get_lvalue();
let params_per_row = identifier.get_idents().len();
let parameters_per_row = identifier.get_idents().len();
let filters = identifier.get_diesel_eq_and_fold();
let span_name = format!("model:retrieve_batch_unchecked<{}>", model);
let span_name_with_key = format!("model:retrieve_batch_with_key_unchecked<{}>", model);

let retrieve_loop = LibpqChunkedIteration {
parameters_per_row,
chunk_size_limit: *chunk_size_limit,
values_ident: syn::parse_quote! { ids },
chunk_iteration_ident: syn::parse_quote! { chunk },
collector: super::LibpqChunkedIterationCollector::Extend {
collection_init: syn::parse_quote! { C::default() },
},
chunk_iteration_body: quote! {
// Diesel doesn't allow `(col1, col2).eq_any(iterator<(&T, &U)>)` because it imposes restrictions
// on tuple usage. Doing it this way is the suggested workaround (https://github.com/diesel-rs/diesel/issues/3222#issuecomment-1177433434).
// eq_any reallocates its argument anyway, so the additional costs with this method are the boxing and the diesel wrappers.
let mut query = dsl::#table_name.into_boxed();
for #id_ident in chunk.into_iter() {
query = query.or_filter(#filters);
}
query
.select((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())?
.await?
},
};

let retrieve_with_key_loop = retrieve_loop.with_iteration_body(quote! {
let mut query = dsl::#table_name.into_boxed();
for #id_ident in chunk.into_iter() {
query = query.or_filter(#filters);
}
query
.select((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| {
s.map_ok(|row| {
let model = <#model as Model>::from_row(row);
(model.get_id(), model)
})
.try_collect::<Vec<_>>()
})?
.await?
});

tokens.extend(quote! {
#[automatically_derived]
#[async_trait::async_trait]
@@ -51,27 +97,7 @@ impl ToTokens for RetrieveBatchImpl {
use std::ops::DerefMut;
let ids = ids.into_iter().collect::<Vec<_>>();
tracing::Span::current().record("query_ids", tracing::field::debug(&ids));
Ok(crate::chunked_for_libpq! {
#params_per_row,
#chunk_size_limit,
ids,
C::default(),
chunk => {
// Diesel doesn't allow `(col1, col2).eq_any(iterator<(&T, &U)>)` because it imposes restrictions
// on tuple usage. Doing it this way is the suggested workaround (https://github.com/diesel-rs/diesel/issues/3222#issuecomment-1177433434).
// eq_any reallocates its argument anyway so the additional cost with this method are the boxing and the diesel wrappers.
let mut query = dsl::#table_name.into_boxed();
for #id_ident in chunk.into_iter() {
query = query.or_filter(#filters);
}
query
.select((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())?
.await?
}
})
Ok({ #retrieve_loop })
}

#[tracing::instrument(name = #span_name_with_key, skip_all, err, fields(query_id))]
@@ -91,30 +117,7 @@ impl ToTokens for RetrieveBatchImpl {
use std::ops::DerefMut;
let ids = ids.into_iter().collect::<Vec<_>>();
tracing::Span::current().record("query_ids", tracing::field::debug(&ids));
Ok(crate::chunked_for_libpq! {
#params_per_row,
#chunk_size_limit,
ids,
C::default(),
chunk => {
let mut query = dsl::#table_name.into_boxed();
for #id_ident in chunk.into_iter() {
query = query.or_filter(#filters);
}
query
.select((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| {
s.map_ok(|row| {
let model = <#model as Model>::from_row(row);
(model.get_id(), model)
})
.try_collect::<Vec<_>>()
})?
.await?
}
})
Ok({ #retrieve_with_key_loop })
}
}
});
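Note how `retrieve_with_key_loop` is derived from `retrieve_loop` via `with_iteration_body`, so the two generated loops share every chunking parameter and differ only in their body. That helper is the standard clone-with-struct-update pattern; a reduced sketch with an illustrative type (not the real one):

    #[derive(Clone)]
    struct ChunkedIteration {
        chunk_size_limit: usize,
        body: String, // stands in for proc_macro2::TokenStream
    }

    impl ChunkedIteration {
        // Same shape as LibpqChunkedIteration::with_iteration_body:
        // clone every field, then swap in the new body.
        fn with_body(&self, body: String) -> Self {
            Self { body, ..self.clone() }
        }
    }

    fn main() {
        let plain = ChunkedIteration { chunk_size_limit: 128, body: "load rows".into() };
        let keyed = plain.with_body("load (id, row) pairs".into());
        assert_eq!(keyed.chunk_size_limit, plain.chunk_size_limit);
        assert_ne!(keyed.body, plain.body);
    }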