@@ -10,7 +10,7 @@ use std::collections::{HashMap, HashSet};
10
10
use std:: mem:: swap;
11
11
use std:: ops:: { Add , AddAssign , DerefMut , Sub } ;
12
12
use std:: sync:: atomic:: { AtomicBool , AtomicI64 , AtomicU64 , AtomicUsize , Ordering } ;
13
- use std:: sync:: { Arc , RwLock } ;
13
+ use std:: sync:: { Arc , OnceLock , RwLock } ;
14
14
15
15
use aggregate:: { is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT } ;
16
16
pub ( crate ) use aggregate:: { AggregateBuilder , ComputeAggregation , Measure } ;
52
52
/// Trackers store the values associated with different attribute sets.
53
53
trackers : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
54
54
55
- /// Used by collect exclusively. The data type must match the one used in
56
- /// `trackers` to allow mem::swap.
57
- trackers_for_collect : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
55
+ /// Used ONLY by Delta collect. The data type must match the one used in
56
+ /// `trackers` to allow mem::swap. Wrapping the type in `OnceLock` to
57
+ /// avoid this allocation for Cumulative aggregation.
58
+ trackers_for_collect : OnceLock < RwLock < HashMap < Vec < KeyValue > , Arc < A > > > > ,
58
59
59
60
/// Number of different attribute set stored in the `trackers` map.
60
61
count : AtomicUsize ,
@@ -73,16 +74,20 @@ where
73
74
fn new ( config : A :: InitConfig ) -> Self {
74
75
ValueMap {
75
76
trackers : RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) ,
76
- // TODO: For cumulative, this is not required, so avoid this
77
- // pre-allocation.
78
- trackers_for_collect : RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) ,
77
+ trackers_for_collect : OnceLock :: new ( ) ,
79
78
has_no_attribute_value : AtomicBool :: new ( false ) ,
80
79
no_attribute_tracker : A :: create ( & config) ,
81
80
count : AtomicUsize :: new ( 0 ) ,
82
81
config,
83
82
}
84
83
}
85
84
85
+ #[ inline]
86
+ fn trackers_for_collect ( & self ) -> & RwLock < HashMap < Vec < KeyValue > , Arc < A > > > {
87
+ self . trackers_for_collect
88
+ . get_or_init ( || RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) )
89
+ }
90
+
86
91
fn measure ( & self , value : A :: PreComputedValue , attributes : & [ KeyValue ] ) {
87
92
if attributes. is_empty ( ) {
88
93
self . no_attribute_tracker . update ( value) ;
@@ -178,7 +183,7 @@ where
178
183
) ) ;
179
184
}
180
185
181
- if let Ok ( mut trackers_collect) = self . trackers_for_collect . write ( ) {
186
+ if let Ok ( mut trackers_collect) = self . trackers_for_collect ( ) . write ( ) {
182
187
if let Ok ( mut trackers_current) = self . trackers . write ( ) {
183
188
swap ( trackers_collect. deref_mut ( ) , trackers_current. deref_mut ( ) ) ;
184
189
self . count . store ( 0 , Ordering :: SeqCst ) ;
0 commit comments