Skip to content

Commit 122c919

Browse files
authored
Merge branch 'main' into patch-1
2 parents ba076cf + 8f27e26 commit 122c919

File tree

1 file changed

+66
-27
lines changed

1 file changed

+66
-27
lines changed

opentelemetry-sdk/src/metrics/meter_provider.rs

+66-27
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ use super::{meter::SdkMeter, pipeline::Pipelines, reader::MetricReader, view::Vi
2727
/// [Meter]: opentelemetry::metrics::Meter
2828
#[derive(Clone, Debug)]
2929
pub struct SdkMeterProvider {
30+
inner: Arc<SdkMeterProviderInner>,
31+
}
32+
33+
#[derive(Clone, Debug)]
34+
struct SdkMeterProviderInner {
3035
pipes: Arc<Pipelines>,
3136
meters: Arc<Mutex<HashMap<Scope, Arc<SdkMeter>>>>,
3237
is_shutdown: Arc<AtomicBool>,
@@ -84,7 +89,7 @@ impl SdkMeterProvider {
8489
/// }
8590
/// ```
8691
pub fn force_flush(&self) -> Result<()> {
87-
self.pipes.force_flush()
92+
self.inner.force_flush()
8893
}
8994

9095
/// Shuts down the meter provider flushing all pending telemetry and releasing
@@ -100,6 +105,16 @@ impl SdkMeterProvider {
100105
/// There is no guaranteed that all telemetry be flushed or all resources have
101106
/// been released on error.
102107
pub fn shutdown(&self) -> Result<()> {
108+
self.inner.shutdown()
109+
}
110+
}
111+
112+
impl SdkMeterProviderInner {
113+
fn force_flush(&self) -> Result<()> {
114+
self.pipes.force_flush()
115+
}
116+
117+
fn shutdown(&self) -> Result<()> {
103118
if self
104119
.is_shutdown
105120
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
@@ -114,7 +129,7 @@ impl SdkMeterProvider {
114129
}
115130
}
116131

117-
impl Drop for SdkMeterProvider {
132+
impl Drop for SdkMeterProviderInner {
118133
fn drop(&mut self) {
119134
if let Err(err) = self.shutdown() {
120135
global::handle_error(err);
@@ -129,17 +144,17 @@ impl MeterProvider for SdkMeterProvider {
129144
schema_url: Option<impl Into<Cow<'static, str>>>,
130145
attributes: Option<Vec<KeyValue>>,
131146
) -> Meter {
132-
if self.is_shutdown.load(Ordering::Relaxed) {
147+
if self.inner.is_shutdown.load(Ordering::Relaxed) {
133148
return Meter::new(Arc::new(NoopMeterCore::new()));
134149
}
135150

136151
let scope = Scope::new(name, version, schema_url, attributes);
137152

138-
if let Ok(mut meters) = self.meters.lock() {
153+
if let Ok(mut meters) = self.inner.meters.lock() {
139154
let meter = meters
140155
.entry(scope)
141156
.or_insert_with_key(|scope| {
142-
Arc::new(SdkMeter::new(scope.clone(), self.pipes.clone()))
157+
Arc::new(SdkMeter::new(scope.clone(), self.inner.pipes.clone()))
143158
})
144159
.clone();
145160
Meter::new(meter)
@@ -193,15 +208,18 @@ impl MeterProviderBuilder {
193208
}
194209

195210
/// Construct a new [MeterProvider] with this configuration.
211+
196212
pub fn build(self) -> SdkMeterProvider {
197213
SdkMeterProvider {
198-
pipes: Arc::new(Pipelines::new(
199-
self.resource.unwrap_or_default(),
200-
self.readers,
201-
self.views,
202-
)),
203-
meters: Default::default(),
204-
is_shutdown: Arc::new(AtomicBool::new(false)),
214+
inner: Arc::new(SdkMeterProviderInner {
215+
pipes: Arc::new(Pipelines::new(
216+
self.resource.unwrap_or_default(),
217+
self.readers,
218+
self.views,
219+
)),
220+
meters: Default::default(),
221+
is_shutdown: Arc::new(AtomicBool::new(false)),
222+
}),
205223
}
206224
}
207225
}
@@ -232,7 +250,7 @@ mod tests {
232250
resource_key: &'static str,
233251
expect: Option<&'static str>| {
234252
assert_eq!(
235-
provider.pipes.0[0]
253+
provider.inner.pipes.0[0]
236254
.resource
237255
.get(Key::from_static_str(resource_key))
238256
.map(|v| v.to_string()),
@@ -241,17 +259,19 @@ mod tests {
241259
};
242260
let assert_telemetry_resource = |provider: &super::SdkMeterProvider| {
243261
assert_eq!(
244-
provider.pipes.0[0]
262+
provider.inner.pipes.0[0]
245263
.resource
246264
.get(TELEMETRY_SDK_LANGUAGE.into()),
247265
Some(Value::from("rust"))
248266
);
249267
assert_eq!(
250-
provider.pipes.0[0].resource.get(TELEMETRY_SDK_NAME.into()),
268+
provider.inner.pipes.0[0]
269+
.resource
270+
.get(TELEMETRY_SDK_NAME.into()),
251271
Some(Value::from("opentelemetry"))
252272
);
253273
assert_eq!(
254-
provider.pipes.0[0]
274+
provider.inner.pipes.0[0]
255275
.resource
256276
.get(TELEMETRY_SDK_VERSION.into()),
257277
Some(Value::from(env!("CARGO_PKG_VERSION")))
@@ -282,7 +302,7 @@ mod tests {
282302
)]))
283303
.build();
284304
assert_resource(&custom_meter_provider, SERVICE_NAME, Some("test_service"));
285-
assert_eq!(custom_meter_provider.pipes.0[0].resource.len(), 1);
305+
assert_eq!(custom_meter_provider.inner.pipes.0[0].resource.len(), 1);
286306

287307
temp_env::with_var(
288308
"OTEL_RESOURCE_ATTRIBUTES",
@@ -301,7 +321,7 @@ mod tests {
301321
assert_resource(&env_resource_provider, "key1", Some("value1"));
302322
assert_resource(&env_resource_provider, "k3", Some("value2"));
303323
assert_telemetry_resource(&env_resource_provider);
304-
assert_eq!(env_resource_provider.pipes.0[0].resource.len(), 6);
324+
assert_eq!(env_resource_provider.inner.pipes.0[0].resource.len(), 6);
305325
},
306326
);
307327

@@ -340,7 +360,7 @@ mod tests {
340360
);
341361
assert_telemetry_resource(&user_provided_resource_config_provider);
342362
assert_eq!(
343-
user_provided_resource_config_provider.pipes.0[0]
363+
user_provided_resource_config_provider.inner.pipes.0[0]
344364
.resource
345365
.len(),
346366
7
@@ -355,7 +375,7 @@ mod tests {
355375
.with_resource(Resource::empty())
356376
.build();
357377

358-
assert_eq!(no_service_name.pipes.0[0].resource.len(), 0)
378+
assert_eq!(no_service_name.inner.pipes.0[0].resource.len(), 0)
359379
}
360380

361381
#[test]
@@ -364,22 +384,41 @@ mod tests {
364384
let provider = super::SdkMeterProvider::builder()
365385
.with_reader(reader.clone())
366386
.build();
367-
global::set_meter_provider(provider.clone());
368-
assert!(!provider
369-
.is_shutdown
370-
.load(std::sync::atomic::Ordering::Relaxed));
387+
global::set_meter_provider(provider);
371388
assert!(!reader.is_shutdown());
372389
// create a meter and an instrument
373390
let meter = global::meter("test");
374391
let counter = meter.u64_counter("test_counter").init();
375392
// no need to drop a meter for meter_provider shutdown
376393
global::shutdown_meter_provider();
377-
assert!(provider
378-
.is_shutdown
379-
.load(std::sync::atomic::Ordering::Relaxed));
380394
assert!(reader.is_shutdown());
381395
// TODO Fix: the instrument is still available, and can be used.
382396
// While the reader is shutdown, and no collect is happening
383397
counter.add(1, &[]);
384398
}
399+
#[test]
400+
fn test_shutdown_invoked_on_last_drop() {
401+
let reader = TestMetricReader::new();
402+
let provider = super::SdkMeterProvider::builder()
403+
.with_reader(reader.clone())
404+
.build();
405+
let clone1 = provider.clone();
406+
let clone2 = provider.clone();
407+
408+
// Initially, shutdown should not be called
409+
assert!(!reader.is_shutdown());
410+
411+
// Drop the first clone
412+
drop(clone1);
413+
assert!(!reader.is_shutdown());
414+
415+
// Drop the second clone
416+
drop(clone2);
417+
assert!(!reader.is_shutdown());
418+
419+
// Drop the last original provider
420+
drop(provider);
421+
// Now the shutdown should be invoked
422+
assert!(reader.is_shutdown());
423+
}
385424
}

0 commit comments

Comments
 (0)