-
Notifications
You must be signed in to change notification settings - Fork 46
/
Copy pathretrieve_batch_impl.rs
125 lines (117 loc) · 5.31 KB
/
retrieve_batch_impl.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use quote::quote;
use quote::ToTokens;
use crate::model::identifier::Identifier;
use super::LibpqChunkedIteration;
pub(crate) struct RetrieveBatchImpl {
pub(super) model: syn::Ident,
pub(super) table_name: syn::Ident,
pub(super) table_mod: syn::Path,
pub(super) chunk_size_limit: usize,
pub(super) row: syn::Ident,
pub(super) identifier: Identifier,
pub(super) columns: Vec<syn::Ident>,
}
impl ToTokens for RetrieveBatchImpl {
fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) {
let Self {
model,
table_name,
table_mod,
chunk_size_limit,
row,
identifier,
columns,
} = self;
let ty = identifier.get_type();
let id_ident = identifier.get_lvalue();
let parameters_per_row = identifier.get_idents().len();
let filters = identifier.get_diesel_eq_and_fold();
let span_name = format!("model:retrieve_batch_unchecked<{}>", model);
let span_name_with_key = format!("model:retrieve_batch_with_key_unchecked<{}>", model);
let retrieve_loop = LibpqChunkedIteration {
parameters_per_row,
chunk_size_limit: *chunk_size_limit,
values_ident: syn::parse_quote! { ids },
chunk_iteration_ident: syn::parse_quote! { chunk },
collector: super::LibpqChunkedIterationCollector::Extend {
collection_init: syn::parse_quote! { C::default() },
},
chunk_iteration_body: quote! {
// Diesel doesn't allow `(col1, col2).eq_any(iterator<(&T, &U)>)` because it imposes restrictions
// on tuple usage. Doing it this way is the suggested workaround (https://github.com/diesel-rs/diesel/issues/3222#issuecomment-1177433434).
// eq_any reallocates its argument anyway so the additional cost with this method are the boxing and the diesel wrappers.
let mut query = dsl::#table_name.into_boxed();
for #id_ident in chunk.into_iter() {
query = query.or_filter(#filters);
}
query
.select((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| s.map_ok(<#model as Model>::from_row).try_collect::<Vec<_>>())?
.await?
},
};
let retrieve_with_key_loop = retrieve_loop.with_iteration_body(quote! {
let mut query = dsl::#table_name.into_boxed();
for #id_ident in chunk.into_iter() {
query = query.or_filter(#filters);
}
query
.select((#(dsl::#columns,)*))
.load_stream::<#row>(conn.write().await.deref_mut())
.await
.map(|s| {
s.map_ok(|row| {
let model = <#model as Model>::from_row(row);
(model.get_id(), model)
})
.try_collect::<Vec<_>>()
})?
.await?
});
tokens.extend(quote! {
#[automatically_derived]
#[async_trait::async_trait]
impl crate::models::RetrieveBatchUnchecked<#ty> for #model {
#[tracing::instrument(name = #span_name, skip_all, err, fields(query_id))]
async fn retrieve_batch_unchecked<
I: std::iter::IntoIterator<Item = #ty> + Send + 'async_trait,
C: Default + std::iter::Extend<#model> + Send + std::fmt::Debug,
>(
conn: &mut editoast_models::DbConnection,
ids: I,
) -> crate::error::Result<C> {
use crate::models::Model;
use #table_mod::dsl;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use futures_util::stream::TryStreamExt;
use std::ops::DerefMut;
let ids = ids.into_iter().collect::<Vec<_>>();
tracing::Span::current().record("query_ids", tracing::field::debug(&ids));
Ok({ #retrieve_loop })
}
#[tracing::instrument(name = #span_name_with_key, skip_all, err, fields(query_id))]
async fn retrieve_batch_with_key_unchecked<
I: std::iter::IntoIterator<Item = #ty> + Send + 'async_trait,
C: Default + std::iter::Extend<(#ty, #model)> + Send + std::fmt::Debug,
>(
conn: &mut editoast_models::DbConnection,
ids: I,
) -> crate::error::Result<C> {
use crate::models::Identifiable;
use crate::models::Model;
use #table_mod::dsl;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use futures_util::stream::TryStreamExt;
use std::ops::DerefMut;
let ids = ids.into_iter().collect::<Vec<_>>();
tracing::Span::current().record("query_ids", tracing::field::debug(&ids));
Ok({ #retrieve_with_key_loop })
}
}
});
}
}