diff --git a/libdd-data-pipeline/Cargo.toml b/libdd-data-pipeline/Cargo.toml index ef4da9e955..68e73cb4bb 100644 --- a/libdd-data-pipeline/Cargo.toml +++ b/libdd-data-pipeline/Cargo.toml @@ -85,6 +85,16 @@ duplicate = "2.0.1" [features] default = ["https", "telemetry"] telemetry = ["libdd-telemetry"] +# Note (async-libdatadog-host-runtime): the sync entry points on `TraceExporter` +# (`send`, `send_trace_chunks`, `shutdown`) and `TraceExporterBuilder::build` are +# thin `SharedRuntime::block_on` wrappers over their `_async` counterparts. They +# are always compiled today, but they panic / return `io::Error::Unsupported` if +# invoked from inside a host tokio runtime (i.e. on a `SharedRuntime` built via +# `from_handle`). A `sync-api` cargo feature was prototyped to make that a +# compile-time error for async-only consumers like dd-trace-rs; it was reverted +# as not worth the churn for the current set of in-tree callers. Re-add it the +# day a consumer wants a build of libdd-data-pipeline that statically forbids +# the sync facade. https = [ "libdd-common/https", "libdd-capabilities-impl/https", diff --git a/libdd-data-pipeline/src/trace_buffer/mod.rs b/libdd-data-pipeline/src/trace_buffer/mod.rs index 09698eef18..105b59af9c 100644 --- a/libdd-data-pipeline/src/trace_buffer/mod.rs +++ b/libdd-data-pipeline/src/trace_buffer/mod.rs @@ -343,6 +343,19 @@ impl TraceBuffer { self.tx.trigger_flush() } + /// Trigger a flush and synchronously wait for it to be processed by the worker. + /// + /// Useful at shutdown to make sure the last batch has been handed off to the export + /// operation (and therefore any side effects like spawning the stats worker have + /// happened) before tearing down the runtime. Returns immediately if the buffer is + /// empty. + pub fn flush_and_wait(&self, timeout: Duration) -> Result<(), TraceBufferError> { + let Some(flush_gen) = self.tx.trigger_flush_and_capture_gen()? else { + return Ok(()); + }; + self.tx.wait_flush_done(flush_gen, Some(timeout)) + } + pub fn queue_metrics(&self) -> QueueMetricsFetcher { QueueMetricsFetcher { waiter: self.tx.waiter.clone(), @@ -360,6 +373,19 @@ impl fmt::Debug for TraceBuffer { } } +impl Drop for TraceBuffer { + /// Best-effort flush so any buffered chunks are handed to the worker before the producer + /// end disappears. The worker itself is owned by the [`SharedRuntime`] and continues to + /// run independently — its own [`Worker::shutdown`] hook (invoked by `SharedRuntime`) + /// will drain whatever remains after this notify. + /// + /// Errors are intentionally ignored: a `TraceBuffer` dropped after the runtime has + /// already been shut down has nothing useful to do here. + fn drop(&mut self) { + let _ = self.tx.trigger_flush(); + } +} + pub struct QueueMetricsFetcher { waiter: Arc>, } @@ -488,6 +514,20 @@ impl Sender { Ok(()) } + /// Trigger a flush and capture the batch generation that the worker must overtake + /// before the flush can be considered done. Returns `Ok(None)` if the batch is + /// currently empty (nothing to flush, no need to wait). + fn trigger_flush_and_capture_gen(&self) -> Result, TraceBufferError> { + let mut state = self.get_running_state()?; + if state.batch.byte_count == 0 { + return Ok(None); + } + state.flush_needed = true; + let gen = state.batch.batch_gen; + self.waiter.notify_receiver(state); + Ok(Some(gen)) + } + fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), TraceBufferError> { if timeout.is_zero() { return Err(TraceBufferError::TimedOut(Duration::ZERO)); @@ -578,6 +618,16 @@ impl Receiver { self.waiter.notify_sender(state); Ok(()) } + + /// Synchronously drain the current batch without waiting for a flush trigger. + /// + /// Used during shutdown to recover any chunks that the sender accumulated but never had + /// the chance to flush (e.g. the worker loop was cancelled before the next timeout tick). + fn drain(&self) -> Result>, MutexPoisonedError> { + let mut state = self.lock_state()?; + state.flush_needed = false; + Ok(state.batch.export()) + } } #[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Default)] @@ -765,6 +815,19 @@ impl Worker for TraceExporterWorker { } async fn shutdown(&mut self) { + // Drain any chunks the sender has buffered but not yet flushed. Without this the + // final partial batch is silently dropped on shutdown — including the common case + // where a tokio app calls `tracer.shutdown()` immediately after producing spans. + match self.rx.drain() { + Ok(trace_chunks) if !trace_chunks.is_empty() => { + self.export_trace_chunks(trace_chunks).await; + let _ = self.rx.ack_export(); + } + Ok(_) => {} + Err(MutexPoisonedError) => { + tracing::error!("TraceExporterWorker mailbox poisoned during shutdown drain"); + } + } let _ = self.rx.shutdown_done(); } @@ -888,7 +951,7 @@ mod tests { ); // pause - rt.before_fork(); + rt.before_fork().expect("error pausing"); for i in 1..=3 { sender.send_chunk(vec![(); i]).unwrap(); @@ -968,6 +1031,69 @@ mod tests { rt.shutdown(None).unwrap(); } + #[test] + #[cfg_attr(miri, ignore)] + fn test_shutdown_drains_pending_batch() { + // Set thresholds high enough that send_chunk alone never triggers a flush, + // and the timer long enough that it won't fire during the test. The only way + // the assert_export closure should be invoked is via the shutdown drain path. + let exported = Arc::new(std::sync::Mutex::new(Vec::::new())); + let exported_clone = exported.clone(); + let (rt, _, sender) = make_buffer( + Box::new(move |chunks| { + let mut lengths = chunks.into_iter().map(|c| c.len()).collect::>(); + lengths.sort(); + exported_clone.lock().unwrap().extend(lengths); + }), + TraceBufferConfig::default() + .max_buffered_bytes(100) + .flush_threshold_bytes(100) + .max_flush_interval(Duration::from_secs(u32::MAX as u64)), + ); + + sender.send_chunk(vec![()]).unwrap(); + sender.send_chunk(vec![(), ()]).unwrap(); + + // Shutdown must export the two buffered chunks even though no flush ever fired. + rt.shutdown(Some(Duration::from_secs(10))).unwrap(); + sender.wait_shutdown_done(Duration::from_secs(10)).unwrap(); + + let exported = exported.lock().unwrap().clone(); + assert_eq!(exported, vec![1, 2]); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_flush_and_wait() { + // Same setup as test_force_flush, but verify flush_and_wait blocks until the + // worker has actually processed the batch. + let (rt, sem, sender) = make_buffer( + Box::new(|chunks| assert_eq!(chunks.len(), 2)), + TraceBufferConfig::default() + .max_buffered_bytes(100) + .flush_threshold_bytes(100) + .max_flush_interval(Duration::from_secs(u32::MAX as u64)), + ); + + sender.send_chunk(vec![()]).unwrap(); + sender.send_chunk(vec![(), ()]).unwrap(); + assert_eq!(sem.available_permits(), 0); + + sender + .flush_and_wait(Duration::from_secs(10)) + .expect("flush_and_wait failed"); + // After flush_and_wait returns, the worker must have actually exported. + assert_eq!(sem.available_permits(), 1); + + // Calling flush_and_wait on an empty buffer is a no-op and must not block. + sender + .flush_and_wait(Duration::from_secs(10)) + .expect("flush_and_wait on empty buffer should not error"); + + rt.shutdown(None).unwrap(); + sender.wait_shutdown_done(Duration::from_secs(10)).unwrap(); + } + #[test] #[cfg_attr(miri, ignore)] fn test_force_flush() { @@ -1006,7 +1132,7 @@ mod tests { sender.send_chunk(vec![()]).unwrap(); assert_eq!(sem.available_permits(), 0); - rt.before_fork(); + rt.before_fork().unwrap(); rt.after_fork_child().unwrap(); sender.send_chunk(vec![(), ()]).unwrap(); @@ -1015,4 +1141,69 @@ mod tests { assert_eq!(sender.queue_metrics().get_metrics().spans_queued, 2); rt.shutdown(None).unwrap(); } + + /// Regression coverage for the [`Drop`] impl on [`TraceBuffer`]: dropping the producer + /// after spans have been buffered (but without an explicit `force_flush`) should + /// trigger one final flush so the worker exports the pending chunks instead of losing + /// them. + #[test] + #[cfg_attr(miri, ignore)] + fn test_drop_triggers_flush() { + let exported = Arc::new(std::sync::Mutex::new(Vec::::new())); + let exported_clone = exported.clone(); + let (rt, sem, sender) = make_buffer( + Box::new(move |chunks| { + let mut lengths = chunks.into_iter().map(|c| c.len()).collect::>(); + lengths.sort(); + exported_clone.lock().unwrap().extend(lengths); + }), + TraceBufferConfig::default() + .max_buffered_bytes(100) + .flush_threshold_bytes(100) + .max_flush_interval(Duration::from_secs(u32::MAX as u64)), + ); + + sender.send_chunk(vec![()]).unwrap(); + sender.send_chunk(vec![(), ()]).unwrap(); + assert_eq!(sem.available_permits(), 0); + + // Drop the sender — no `force_flush`/`flush_and_wait` call — and verify the + // background worker still receives the pending chunks because Drop notifies it. + drop(sender); + let _ = rt.block_on(sem.acquire_many(1)).unwrap().unwrap(); + + let exported = exported.lock().unwrap().clone(); + assert_eq!(exported, vec![1, 2]); + rt.shutdown(None).unwrap(); + } + + /// Dropping a [`TraceBuffer`] (and shutting down the surrounding [`SharedRuntime`]) + /// from inside a host tokio runtime must not panic. Uses the borrowed-mode + /// `SharedRuntime::from_handle` so the trigger + Condvar wait path is exercised + /// instead of sync `block_on` (which would panic "Cannot start a runtime from + /// within a runtime" on a host worker thread). + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_drop_inside_tokio_runtime() { + let rt = Arc::new(SharedRuntime::from_handle(tokio::runtime::Handle::current())); + let (sender, worker) = TraceBuffer::new( + TraceBufferConfig::default(), + Box::new( + |_r: Result| {}, + ), + Box::new(AssertExporter( + Box::new(|_chunks| {}), + Arc::new(tokio::sync::Semaphore::new(0)), + )), + ); + let _ = rt.spawn_worker(worker, true).unwrap(); + + // Send a chunk so the Drop has something to flush — exercises the full + // notify path under a tokio scheduler. + sender.send_chunk(vec![()]).unwrap(); + drop(sender); + + // Borrowed mode: use trigger + Condvar wait, never block_on. + rt.trigger_shutdown_signal().unwrap(); + rt.wait_shutdown_done(Duration::from_secs(5)).unwrap(); + } } diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index c9430eadf2..9379a0bbb8 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -286,8 +286,49 @@ impl TraceExporterBuilder { self } + /// Build the [`TraceExporter`] synchronously. + /// + /// Sync facade over [`Self::build_async`]. It materializes the [`SharedRuntime`] + /// up-front (creating a fresh owned one if none was supplied) and drives the async + /// builder to completion via `block_on`. This is the only place in the builder that + /// blocks; all internal I/O — including telemetry start-up — happens inside + /// `build_async`. + /// + /// Calling this from within an existing tokio context will panic. Async callers + /// should use [`Self::build_async`] directly. #[allow(missing_docs)] pub fn build( + mut self, + ) -> Result, TraceExporterError> { + // Materialize the SharedRuntime here so we have something to block_on with and + // so build_async reuses the same instance rather than creating a second one. + let shared_runtime = match &self.shared_runtime { + Some(rt) => rt.clone(), + None => { + let rt = Arc::new(SharedRuntime::new().map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( + e.to_string(), + )) + })?); + self.shared_runtime = Some(rt.clone()); + rt + } + }; + shared_runtime + .block_on(self.build_async::()) + .map_err(TraceExporterError::Io)? + } + + /// Build the [`TraceExporter`] asynchronously. + /// + /// This is the async-internal entry point: every operation that used to live behind + /// `SharedRuntime::block_on` (currently just telemetry start-up) is awaited directly. + /// It is safe to drive from any async context, but note that on native targets + /// `C::new_client()` may capture `tokio::runtime::Handle::current()` — the caller + /// must be inside a tokio runtime that should own the resulting HTTP client. + pub async fn build_async< + C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, + >( self, ) -> Result, TraceExporterError> { if !Self::is_inputs_outputs_formats_compatible(self.input_format, self.output_format) { @@ -322,14 +363,21 @@ impl TraceExporterBuilder { // internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context // so that handle is available. On wasm this is a no-op — the JS event loop is // always the implicit executor. - #[cfg(not(target_arch = "wasm32"))] - let _guard = shared_runtime - .runtime_handle() - .map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) - })? - .enter(); - let capabilities = C::new_client(); + // + // The `EnterGuard` returned by `Handle::enter()` is `!Send`, so we scope it to a + // block that drops it before any later `.await` to keep this future `Send`. + let capabilities = { + #[cfg(not(target_arch = "wasm32"))] + let _guard = shared_runtime + .runtime_handle() + .map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( + e.to_string(), + )) + })? + .enter(); + C::new_client() + }; // --- Platform-specific worker setup --- // The blocks below spawn background workers via `SharedRuntime`. On @@ -400,11 +448,7 @@ impl TraceExporterBuilder { e.to_string(), )) })?; - shared_runtime.block_on(client_tel.start()).map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( - e.to_string(), - )) - })?; + client_tel.start().await; (Some(client_tel), Some(handle)) } Some(Err(e)) => return Err(e), diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 7e3dd1c951..c10b475af1 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -214,26 +214,39 @@ impl Tra /// Only the workers spawned for this exporter are stopped. Workers from other components /// sharing the same [`SharedRuntime`] are unaffected. /// + /// Sync facade over [`Self::shutdown_async`]. Will panic if called from inside an + /// existing tokio context; async callers should use [`Self::shutdown_async`] directly. + /// /// # Errors - /// Returns [`SharedRuntimeError::ShutdownTimedOut`] if a timeout was given and elapsed before - /// all workers finished. + /// Returns [`TraceExporterError::Shutdown(ShutdownError::TimedOut)`] if a timeout was + /// given and elapsed before all workers finished. #[cfg(not(target_arch = "wasm32"))] pub fn shutdown(self, timeout: Option) -> Result<(), TraceExporterError> { let runtime = self.shared_runtime.clone(); + runtime + .block_on(self.shutdown_async(timeout)) + .map_err(TraceExporterError::Io)? + } + + /// Async version of [`Self::shutdown`]. + /// + /// Stops every worker owned by this exporter; safe to drive from any async context. + /// Workers belonging to other components on the same [`SharedRuntime`] are unaffected. + /// + /// # Errors + /// Returns [`TraceExporterError::Shutdown(ShutdownError::TimedOut)`] if a timeout was + /// given and elapsed before all workers finished. + #[cfg(not(target_arch = "wasm32"))] + pub async fn shutdown_async(self, timeout: Option) -> Result<(), TraceExporterError> { if let Some(timeout) = timeout { - match runtime - .block_on(async { tokio::time::timeout(timeout, self.shutdown_workers()).await }) - .map_err(TraceExporterError::Io)? - { + match tokio::time::timeout(timeout, self.shutdown_workers()).await { Ok(()) => Ok(()), Err(_) => Err(TraceExporterError::Shutdown(ShutdownError::TimedOut( timeout, ))), } } else { - runtime - .block_on(self.shutdown_workers()) - .map_err(TraceExporterError::Io)?; + self.shutdown_workers().await; Ok(()) } } @@ -281,24 +294,22 @@ impl Tra /// # Returns /// * Ok(AgentResponse): The response from the agent /// * Err(TraceExporterError): An error detailing what went wrong in the process + /// + /// This is the sync facade: it drives [`Self::send_async`] to completion on the + /// [`SharedRuntime`]. The single `block_on` here is intentional — every internal + /// libdatadog path is async, and this wrapper exists for FFI / non-tokio callers. + /// Calling it from within an existing tokio context will panic. #[cfg(not(target_arch = "wasm32"))] pub fn send(&self, data: &[u8]) -> Result { - self.check_agent_info(); - - let res = self.send_deser(data, self.input_format.into())?; - if matches!(&res, AgentResponse::Changed { body } if body.is_empty()) { - return Err(TraceExporterError::Agent( - error::AgentErrorKind::EmptyResponse, - )); - } - - Ok(res) + self.shared_runtime + .block_on(self.send_async(data)) + .map_err(TraceExporterError::Io)? } /// **WARNING**: This method is experimental and should not be used for production. /// Async version of [`Self::send`] for platforms that cannot use `block_on` (e.g. wasm) pub async fn send_async(&self, data: &[u8]) -> Result { - self.check_agent_info(); + self.check_agent_info().await; let format: DeserInputFormat = self.input_format.into(); @@ -344,48 +355,54 @@ impl Tra } #[cfg(not(target_arch = "wasm32"))] - fn check_agent_info(&self) { - if let Some(agent_info) = agent_info::get_agent_info() { - if self.has_agent_info_state_changed(&agent_info) { - match &**self.client_side_stats.status.load() { - StatsComputationStatus::Disabled => {} - StatsComputationStatus::DisabledByAgent { .. } => { - let ctx = stats::StatsContext { - metadata: &self.metadata, - endpoint_url: &self.endpoint.url, - shared_runtime: &self.shared_runtime, - }; - stats::handle_stats_disabled_by_agent( - &ctx, - &agent_info, - self.capabilities.clone(), - &self.client_side_stats, - ); - } - StatsComputationStatus::Enabled { - stats_concentrator, .. - } => { - let ctx = stats::StatsContext { - metadata: &self.metadata, - endpoint_url: &self.endpoint.url, - shared_runtime: &self.shared_runtime, - }; - stats::handle_stats_enabled( - &ctx, - &agent_info, - stats_concentrator, - &self.client_side_stats, - ); - } - } - self.previous_info_state - .store(Some(agent_info.state_hash.clone().into())) + /// Reconcile in-process stats computation state with the latest agent info. + /// + /// Async because the `Enabled` arm may need to await a stats-worker shutdown + /// (see [`stats::handle_stats_enabled`]). Internally, no `block_on` is used — + /// this future is safe to await from any tokio context. + async fn check_agent_info(&self) { + let Some(agent_info) = agent_info::get_agent_info() else { + return; + }; + if !self.has_agent_info_state_changed(&agent_info) { + return; + } + + // Take an owned snapshot via `load_full()`. `ArcSwap::load()` returns a + // `!Send` guard that cannot be held across `.await`. + let status = self.client_side_stats.status.load_full(); + match &*status { + StatsComputationStatus::Disabled => {} + StatsComputationStatus::DisabledByAgent { .. } => { + let ctx = stats::StatsContext { + metadata: &self.metadata, + endpoint_url: &self.endpoint.url, + shared_runtime: &self.shared_runtime, + }; + stats::handle_stats_disabled_by_agent( + &ctx, + &agent_info, + self.capabilities.clone(), + &self.client_side_stats, + ); + } + StatsComputationStatus::Enabled { + stats_concentrator, .. + } => { + stats::handle_stats_enabled( + &agent_info, + stats_concentrator, + &self.client_side_stats, + ) + .await; } } + self.previous_info_state + .store(Some(agent_info.state_hash.clone().into())) } #[cfg(target_arch = "wasm32")] - fn check_agent_info(&self) { + async fn check_agent_info(&self) { // No background workers on wasm — agent info is never fetched, stats are // never computed. This is intentionally a no-op. } @@ -443,14 +460,16 @@ impl Tra /// # Returns /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) /// * Err(TraceExporterError): An error detailing what went wrong in the process + /// + /// Sync facade over [`Self::send_trace_chunks_async`]. #[cfg(not(target_arch = "wasm32"))] pub fn send_trace_chunks( &self, trace_chunks: Vec>>, ) -> Result { - self.check_agent_info(); self.shared_runtime - .block_on(async { self.send_trace_chunks_inner(trace_chunks).await })? + .block_on(self.send_trace_chunks_async(trace_chunks)) + .map_err(TraceExporterError::Io)? } /// Send a list of trace chunks to the agent, asynchronously (or OTLP when configured). @@ -465,7 +484,7 @@ impl Tra &self, trace_chunks: Vec>>, ) -> Result { - self.check_agent_info(); + self.check_agent_info().await; self.send_trace_chunks_inner(trace_chunks).await } @@ -500,38 +519,6 @@ impl Tra Ok(AgentResponse::Unchanged) } - /// Deserializes, processes and sends trace chunks to the agent - #[cfg(not(target_arch = "wasm32"))] - fn send_deser( - &self, - data: &[u8], - format: DeserInputFormat, - ) -> Result { - let (traces, _) = match format { - DeserInputFormat::V04 => msgpack_decoder::v04::from_slice(data), - DeserInputFormat::V05 => msgpack_decoder::v05::from_slice(data), - } - .map_err(|e| { - error!("Error deserializing trace from request body: {e}"); - self.emit_metric( - HealthMetric::Count(health_metrics::DESERIALIZE_TRACES_ERRORS, 1), - None, - ); - TraceExporterError::Deserialization(e) - })?; - debug!( - trace_count = traces.len(), - "Trace deserialization completed successfully" - ); - self.emit_metric( - HealthMetric::Count(health_metrics::DESERIALIZE_TRACES, traces.len() as i64), - None, - ); - - self.shared_runtime - .block_on(async { self.send_trace_chunks_inner(traces).await })? - } - /// Send traces payload to agent with retry and telemetry reporting async fn send_traces_with_telemetry( &self, diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 7f1aaea36a..801ca91482 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -168,25 +168,27 @@ fn create_and_start_stats_worker< #[cfg(not(target_arch = "wasm32"))] /// Stops the stats exporter and disable stats computation /// -/// Used when client-side stats is disabled by the agent -pub(crate) fn stop_stats_computation( - ctx: &StatsContext, - client_side_stats: &ArcSwap, -) { +/// Used when client-side stats is disabled by the agent. +/// +/// This is the async-internal API. The future awaits the stats worker shutdown directly +/// instead of routing it through `SharedRuntime::block_on`, so it is safe to call from +/// any async context — including code running on the host tokio runtime. +pub(crate) async fn stop_stats_computation(client_side_stats: &ArcSwap) { + // Take an owned snapshot of the current status. `ArcSwap::load()` returns a Guard + // that is `!Send` and cannot be held across an `.await`, so we use `load_full()`. + let snapshot = client_side_stats.load_full(); if let StatsComputationStatus::Enabled { stats_concentrator, worker_handle, .. - } = &**client_side_stats.load() + } = &*snapshot { let bucket_size = stats_concentrator.lock_or_panic().get_bucket_size(); client_side_stats.store(Arc::new(StatsComputationStatus::DisabledByAgent { bucket_size, })); - match ctx.shared_runtime.block_on(worker_handle.clone().stop()) { - Ok(Err(e)) => error!("Failed to stop stats worker: {e}"), - Err(e) => error!("Failed to stop stats worker: {e}"), - _ => {} + if let Err(e) = worker_handle.clone().stop().await { + error!("Failed to stop stats worker: {e}"); } } } @@ -254,21 +256,25 @@ fn update_obfuscation_config( } #[cfg(not(target_arch = "wasm32"))] -/// Handle stats computation when it's already enabled -pub(crate) fn handle_stats_enabled( - ctx: &StatsContext, +/// Handle stats computation when it's already enabled. +/// +/// Async because the `disabled-by-agent` branch needs to await the stats worker +/// shutdown via [`stop_stats_computation`]. +pub(crate) async fn handle_stats_enabled( agent_info: &Arc, stats_concentrator: &Arc>, client_side_stats: &StatsComputationConfig, ) { if agent_info.info.client_drop_p0s.is_some_and(|v| v) { + // The mutex guard is scoped to this block so it is *not* held across the + // `.await` in the `else` branch, which keeps the resulting future `Send`. let mut concentrator = stats_concentrator.lock_or_panic(); concentrator.set_span_kinds(get_span_kinds_for_stats(agent_info)); concentrator.set_peer_tags(agent_info.info.peer_tags.clone().unwrap_or_default()); #[cfg(feature = "stats-obfuscation")] update_obfuscation_config(agent_info, client_side_stats); } else { - stop_stats_computation(ctx, &client_side_stats.status); + stop_stats_computation(&client_side_stats.status).await; debug!("Client-side stats computation has been disabled by the agent") } } diff --git a/libdd-shared-runtime-ffi/src/shared_runtime.rs b/libdd-shared-runtime-ffi/src/shared_runtime.rs index 7a9a0ac084..9cc51be02e 100644 --- a/libdd-shared-runtime-ffi/src/shared_runtime.rs +++ b/libdd-shared-runtime-ffi/src/shared_runtime.rs @@ -23,6 +23,11 @@ pub enum SharedRuntimeErrorCode { RuntimeCreation, /// Shutdown timed out. ShutdownTimedOut, + /// The operation is not supported in borrowed mode (e.g. fork hooks or sync + /// shutdown on a `SharedRuntime` created via `from_handle`). FFI callers should + /// never see this since they construct owned runtimes via + /// `ddog_shared_runtime_new`. + NotSupportedInBorrowedMode, /// An unexpected panic occurred inside the FFI call. #[cfg(feature = "catch_panic")] Panic, @@ -54,6 +59,10 @@ impl From for SharedRuntimeFFIError { SharedRuntimeError::WorkerError(_) => SharedRuntimeErrorCode::WorkerError, SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation, SharedRuntimeError::ShutdownTimedOut(_) => SharedRuntimeErrorCode::ShutdownTimedOut, + SharedRuntimeError::ForkUnsupportedInBorrowedMode + | SharedRuntimeError::SyncShutdownNotSupportedInBorrowedMode => { + SharedRuntimeErrorCode::NotSupportedInBorrowedMode + } }; SharedRuntimeFFIError::new(code, &err.to_string()) } @@ -142,8 +151,10 @@ pub unsafe extern "C" fn ddog_shared_runtime_before_fork( match handle { Some(runtime) => { // SAFETY: handle was produced by Arc::into_raw and the Arc is still alive. - runtime.before_fork(); - None + match runtime.before_fork() { + Ok(()) => None, + Err(err) => Some(Box::new(SharedRuntimeFFIError::from(err))), + } } None => Some(Box::new(SharedRuntimeFFIError::new( SharedRuntimeErrorCode::InvalidArgument, diff --git a/libdd-shared-runtime/src/shared_runtime/mod.rs b/libdd-shared-runtime/src/shared_runtime/mod.rs index 058bf730a5..ac062af241 100644 --- a/libdd-shared-runtime/src/shared_runtime/mod.rs +++ b/libdd-shared-runtime/src/shared_runtime/mod.rs @@ -15,7 +15,8 @@ use futures::stream::{FuturesUnordered, StreamExt}; use libdd_common::MutexExt; use pausable_worker::{PausableWorker, PausableWorkerError}; use std::sync::atomic::AtomicU64; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::Duration; use std::{fmt, io}; use tracing::{debug, error}; @@ -27,7 +28,7 @@ mod native { use super::*; use pausable_worker::tokio_spawn_fn; use std::sync::atomic::Ordering; - use tokio::runtime::{Builder, Runtime}; + use tokio::runtime::{Builder, Handle, Runtime}; fn build_runtime() -> Result { Builder::new_multi_thread() @@ -36,33 +37,102 @@ mod native { .build() } + /// The tokio runtime that backs a [`SharedRuntime`]. + /// + /// Two flavours coexist: + /// - [`Self::Owned`]: a tokio runtime created and owned by the `SharedRuntime`. Used by FFI + /// callers and any code that constructs its own runtime up-front. Supports fork hooks, sync + /// `block_on`, sync `shutdown`. + /// - [`Self::Borrowed`]: a handle to a tokio runtime owned by the caller (typically a Rust + /// application's host runtime). Used when the caller is itself an async tokio program and we + /// must integrate with their executor instead of spinning up our own. Borrowed mode does + /// **not** support fork hooks, sync `block_on`, or sync `shutdown` — callers should use the + /// async/condvar shutdown paths instead. + #[derive(Debug, Clone)] + pub(super) enum RuntimeBacking { + Owned(Arc), + Borrowed(Handle), + } + + impl RuntimeBacking { + pub(super) fn handle(&self) -> Handle { + match self { + Self::Owned(rt) => rt.handle().clone(), + Self::Borrowed(h) => h.clone(), + } + } + + pub(super) fn is_borrowed(&self) -> bool { + matches!(self, Self::Borrowed(_)) + } + } + impl SharedRuntime { pub(in super::super) fn new_native() -> Result { Ok(Self { - runtime: Arc::new(Mutex::new(Some(Arc::new(build_runtime()?)))), + runtime: Arc::new(Mutex::new(Some(RuntimeBacking::Owned(Arc::new( + build_runtime()?, + ))))), workers: Arc::new(Mutex::new(Vec::new())), next_worker_id: AtomicU64::new(1), + shutdown_tracker: Arc::new(ShutdownTracker::default()), }) } + /// Create a `SharedRuntime` that borrows an externally-owned tokio runtime via + /// the given [`Handle`]. + /// + /// The borrowed runtime is **not** owned by the `SharedRuntime`; dropping the + /// `SharedRuntime` does not stop or drain it. Use this when the caller is itself + /// an async tokio program (e.g. a Rust web service using `dd-trace-rs`) and you + /// want libdatadog's workers to share the host runtime rather than spinning up + /// a second tokio runtime in the same process. + /// + /// # Trade-offs vs. owned mode + /// Borrowed mode does **not** support: + /// - Fork safety: [`before_fork`](Self::before_fork), + /// [`after_fork_parent`](Self::after_fork_parent), and + /// [`after_fork_child`](Self::after_fork_child) all return + /// [`SharedRuntimeError::ForkUnsupportedInBorrowedMode`]. Rust applications rarely fork + /// after init; if you need fork-safety use owned mode. + /// - Synchronous [`block_on`](Self::block_on): the caller is already inside a tokio runtime + /// and would deadlock; this method returns + /// [`SharedRuntimeError::BlockOnNotSupportedInBorrowedMode`]. + /// - Synchronous [`shutdown`](Self::shutdown): same reason — use + /// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) + + /// [`wait_shutdown_done`](Self::wait_shutdown_done) (sync) or + /// [`shutdown_async`](Self::shutdown_async) instead. + pub fn from_handle(handle: Handle) -> Self { + Self { + runtime: Arc::new(Mutex::new(Some(RuntimeBacking::Borrowed(handle)))), + workers: Arc::new(Mutex::new(Vec::new())), + next_worker_id: AtomicU64::new(1), + shutdown_tracker: Arc::new(ShutdownTracker::default()), + } + } + /// Returns a clone of the tokio runtime handle managed by this SharedRuntime. /// + /// Works for both owned and borrowed mode. + /// /// # Errors - /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime has been shut down. - pub fn runtime_handle(&self) -> Result { + /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime has been shut + /// down (owned mode only; borrowed mode hands out the externally-owned handle as + /// long as this `SharedRuntime` itself hasn't been dropped). + pub fn runtime_handle(&self) -> Result { Ok(self .runtime .lock_or_panic() .as_ref() .ok_or(SharedRuntimeError::RuntimeUnavailable)? - .handle() - .clone()) + .handle()) } /// Spawn a PausableWorker on this runtime. /// - /// The worker will be tracked by this SharedRuntime and will be paused/resumed - /// during fork operations (native only). + /// The worker will be tracked by this SharedRuntime and (in owned mode) will be + /// paused/resumed during fork operations. In borrowed mode `restart_on_fork` is + /// ignored because fork hooks are unsupported. /// If `restart_on_fork` is true, the worker will be reset and restarted when calling /// `after_fork_child` else the worker is dropped *without* calling `Worker::shutdown`. /// @@ -89,8 +159,9 @@ mod native { let runtime_guard = self.runtime.lock_or_panic(); let mut workers_guard = self.workers.lock_or_panic(); - if let Some(rt) = runtime_guard.as_ref() { - if let Err(e) = pausable_worker.start(tokio_spawn_fn(rt.handle())) { + if let Some(backing) = runtime_guard.as_ref() { + let handle = backing.handle(); + if let Err(e) = pausable_worker.start(tokio_spawn_fn(&handle)) { return Err(e.into()); } } @@ -117,31 +188,54 @@ mod native { /// /// Worker errors are logged but do not cause the function to fail. /// If the worker fails to pause it is dropped without calling shutdown. - pub fn before_fork(&self) { + /// + /// # Errors + /// Returns [`SharedRuntimeError::ForkUnsupportedInBorrowedMode`] in borrowed mode. + /// Rust applications rarely fork after init; if you need fork-safety use owned + /// mode (i.e. construct the `SharedRuntime` via [`SharedRuntime::new`]). + pub fn before_fork(&self) -> Result<(), SharedRuntimeError> { debug!("before_fork: pausing all workers"); - if let Some(runtime) = self.runtime.lock_or_panic().take() { - let mut workers_lock = self.workers.lock_or_panic(); - runtime.block_on(async { - let futures: FuturesUnordered<_> = workers_lock - .iter_mut() - .map(|worker_entry| async { - if let Err(e) = worker_entry.worker.pause().await { - error!("Worker failed to pause before fork: {:?}", e); - } - }) - .collect(); - - futures.collect::<()>().await; - }); - } + let runtime = { + let mut runtime_lock = self.runtime.lock_or_panic(); + match runtime_lock.as_ref() { + Some(RuntimeBacking::Borrowed(_)) => { + return Err(SharedRuntimeError::ForkUnsupportedInBorrowedMode); + } + Some(RuntimeBacking::Owned(_)) => match runtime_lock.take() { + Some(RuntimeBacking::Owned(rt)) => rt, + _ => return Ok(()), + }, + None => return Ok(()), + } + }; + let mut workers_lock = self.workers.lock_or_panic(); + runtime.block_on(async { + let futures: FuturesUnordered<_> = workers_lock + .iter_mut() + .map(|worker_entry| async { + if let Err(e) = worker_entry.worker.pause().await { + error!("Worker failed to pause before fork: {:?}", e); + } + }) + .collect(); + + futures.collect::<()>().await; + }); + Ok(()) } fn restart_runtime(&self) -> Result<(), SharedRuntimeError> { let mut runtime_lock = self.runtime.lock_or_panic(); - if runtime_lock.is_none() { - *runtime_lock = Some(Arc::new(build_runtime()?)); + match runtime_lock.as_ref() { + Some(RuntimeBacking::Borrowed(_)) => { + Err(SharedRuntimeError::ForkUnsupportedInBorrowedMode) + } + Some(RuntimeBacking::Owned(_)) => Ok(()), + None => { + *runtime_lock = Some(RuntimeBacking::Owned(Arc::new(build_runtime()?))); + Ok(()) + } } - Ok(()) } /// Hook to be called in the parent process after forking. @@ -150,18 +244,13 @@ mod native { /// The runtime may need to be recreated if it was shut down in before_fork. /// /// # Errors - /// Returns an error if workers cannot be restarted or the runtime cannot be recreated. + /// Returns [`SharedRuntimeError::ForkUnsupportedInBorrowedMode`] in borrowed mode, + /// or another error if workers cannot be restarted or the runtime cannot be recreated. pub fn after_fork_parent(&self) -> Result<(), SharedRuntimeError> { debug!("after_fork_parent: restarting runtime and workers"); self.restart_runtime()?; - let runtime_lock = self.runtime.lock_or_panic(); - let handle = runtime_lock - .as_ref() - .ok_or(SharedRuntimeError::RuntimeUnavailable)? - .handle() - .clone(); - drop(runtime_lock); + let handle = self.runtime_handle()?; let mut workers_lock = self.workers.lock_or_panic(); @@ -179,18 +268,13 @@ mod native { /// Workers are reset and restarted to resume operations in the child. /// /// # Errors - /// Returns an error if the runtime cannot be reinitialized or workers cannot be started. + /// Returns [`SharedRuntimeError::ForkUnsupportedInBorrowedMode`] in borrowed mode, + /// or another error if the runtime cannot be reinitialized or workers cannot be started. pub fn after_fork_child(&self) -> Result<(), SharedRuntimeError> { debug!("after_fork_child: reinitializing runtime and workers"); self.restart_runtime()?; - let runtime_lock = self.runtime.lock_or_panic(); - let handle = runtime_lock - .as_ref() - .ok_or(SharedRuntimeError::RuntimeUnavailable)? - .handle() - .clone(); - drop(runtime_lock); + let handle = self.runtime_handle()?; let mut workers_lock = self.workers.lock_or_panic(); @@ -209,51 +293,279 @@ mod native { /// If the runtime is not available (e.g. after calling before_fork), a temporary /// single-threaded runtime is used. /// - /// Not available on wasm32 -- use async paths instead. + /// Not available on wasm32 — use async paths instead. /// /// # Errors - /// Returns an error if it fails to create a fallback runtime. + /// Returns an [`io::Error`] with kind [`io::ErrorKind::Unsupported`] in borrowed + /// mode: the caller is already inside their own tokio runtime and blocking on it + /// would deadlock. Use an async API instead. Also returns an [`io::Error`] if + /// the fallback single-threaded runtime cannot be created (owned mode, + /// post-fork window). pub fn block_on(&self, f: F) -> Result { let runtime = match self.runtime.lock_or_panic().as_ref() { + Some(RuntimeBacking::Borrowed(_)) => { + return Err(io::Error::new( + io::ErrorKind::Unsupported, + "SharedRuntime::block_on is not supported in borrowed mode; the \ + caller is already inside a tokio runtime — use an async API instead", + )); + } + Some(RuntimeBacking::Owned(rt)) => rt.clone(), None => Arc::new(Builder::new_current_thread().enable_all().build()?), - Some(runtime) => runtime.clone(), }; Ok(runtime.block_on(f)) } /// Shutdown the runtime and all workers synchronously with optional timeout. /// - /// Not available on wasm32 -- use [`shutdown_async`](Self::shutdown_async) instead. + /// Not available on wasm32 — use [`shutdown_async`](Self::shutdown_async) instead. /// /// Worker errors are logged but do not cause the function to fail. /// /// # Errors - /// Returns an error only if shutdown times out. - pub fn shutdown( - &self, - timeout: Option, - ) -> Result<(), SharedRuntimeError> { + /// Returns [`SharedRuntimeError::SyncShutdownNotSupportedInBorrowedMode`] in + /// borrowed mode — sync callers there should use + /// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) + + /// [`wait_shutdown_done`](Self::wait_shutdown_done) instead. + /// Returns [`SharedRuntimeError::ShutdownTimedOut`] if shutdown times out. + pub fn shutdown(&self, timeout: Option) -> Result<(), SharedRuntimeError> { debug!(?timeout, "Shutting down SharedRuntime"); - match self.runtime.lock_or_panic().take() { - Some(runtime) => { - if let Some(timeout) = timeout { - match runtime.block_on(async { - tokio::time::timeout(timeout, self.shutdown_async()).await - }) { - Ok(()) => Ok(()), - Err(_) => Err(SharedRuntimeError::ShutdownTimedOut(timeout)), - } - } else { - runtime.block_on(self.shutdown_async()); - Ok(()) + let runtime = { + let mut runtime_lock = self.runtime.lock_or_panic(); + match runtime_lock.as_ref() { + Some(RuntimeBacking::Borrowed(_)) => { + return Err(SharedRuntimeError::SyncShutdownNotSupportedInBorrowedMode); } + Some(RuntimeBacking::Owned(_)) => match runtime_lock.take() { + Some(RuntimeBacking::Owned(rt)) => rt, + _ => return Ok(()), + }, + None => return Ok(()), + } + }; + if let Some(timeout) = timeout { + match runtime + .block_on(async { tokio::time::timeout(timeout, self.shutdown_async()).await }) + { + Ok(()) => Ok(()), + Err(_) => Err(SharedRuntimeError::ShutdownTimedOut(timeout)), } - None => Ok(()), + } else { + runtime.block_on(self.shutdown_async()); + Ok(()) + } + } + + /// Whether this `SharedRuntime` is in borrowed mode (constructed via + /// [`SharedRuntime::from_handle`]). + /// Returns whether this runtime is borrowed (constructed from a host + /// [`tokio::runtime::Handle`]) or owned. + /// + /// Poison-tolerant: if the internal mutex was poisoned by a previous panic the + /// inner state is still readable and we honor it rather than propagating the + /// poison; the alternative would be a panicky bool accessor in code paths that + /// can't surface an error. + pub fn is_borrowed(&self) -> bool { + match self.runtime.lock() { + Ok(guard) => guard.as_ref().is_some_and(|b| b.is_borrowed()), + Err(poison) => poison + .into_inner() + .as_ref() + .is_some_and(|b| b.is_borrowed()), + } + } + + /// Initiate shutdown of every currently-registered worker **without blocking**. + /// + /// Pairs with [`wait_shutdown_done`](Self::wait_shutdown_done) to give sync code + /// (e.g. `Drop` impls of higher-level exporters) a way to coordinate worker + /// shutdown without calling `block_on` on the underlying tokio runtime — which is + /// essential in borrowed mode where the caller is already a tokio worker thread. + /// + /// Concretely this: + /// 1. Snapshots and removes every registered worker. + /// 2. Records the snapshot's size as the "expected" completion count in the shutdown + /// tracker (idempotent: subsequent calls add to the expected count). + /// 3. Spawns one tokio task per worker on the underlying runtime that pauses then shuts + /// down the worker and bumps the tracker on completion. + /// + /// Workers spawned *after* this call returns are not tracked; shutdown is + /// considered terminal for the dd-trace-rs use case. + /// + /// # Errors + /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime has already + /// been taken (e.g. after a previous sync `shutdown` in owned mode), or + /// [`SharedRuntimeError::LockFailed`] if an internal mutex was poisoned by a + /// previous panic. + pub fn trigger_shutdown_signal(&self) -> Result<(), SharedRuntimeError> { + let handle = self.runtime_handle()?; + + let workers = { + let mut workers_lock = self.workers.lock().map_err(|e| { + SharedRuntimeError::LockFailed(format!("workers mutex poisoned: {e}")) + })?; + std::mem::take(&mut *workers_lock) + }; + let count = workers.len(); + + { + let mut state = self.shutdown_tracker.state.lock().map_err(|e| { + SharedRuntimeError::LockFailed(format!( + "shutdown tracker state mutex poisoned: {e}" + )) + })?; + state.triggered = true; + state.expected = state.expected.saturating_add(count); + } + + // If no workers were registered, wake any pre-existing waiter so they don't + // block forever expecting a notify. + if count == 0 { + self.shutdown_tracker.cv.notify_all(); + return Ok(()); + } + + for mut entry in workers { + let tracker = self.shutdown_tracker.clone(); + handle.spawn(async move { + if let Err(e) = entry.worker.pause().await { + error!("Worker failed to pause on shutdown trigger: {:?}", e); + } else { + entry.worker.shutdown().await; + } + // Recover from a poisoned mutex rather than panic: panicking here + // would skip the counter bump and leave `wait_shutdown_done` + // waiting forever for a sibling task that may have already poisoned + // the lock. The inner counter is still readable through the + // returned guard. + let mut state = match tracker.state.lock() { + Ok(guard) => guard, + Err(poison) => { + error!( + "Shutdown tracker state mutex poisoned; counters may be inaccurate" + ); + poison.into_inner() + } + }; + state.completed = state.completed.saturating_add(1); + tracker.cv.notify_all(); + }); + } + Ok(()) + } + + /// Block the calling thread until every worker triggered by + /// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) has reported + /// completion, or `timeout` elapses. + /// + /// Safe from any sync context — including from inside a tokio worker thread of + /// the borrowed/host runtime — because it relies on a `Condvar` rather than + /// `block_on`. Idempotent; returns immediately if shutdown has already completed. + /// + /// # Errors + /// Returns [`SharedRuntimeError::ShutdownTimedOut`] if `timeout` elapses before + /// all triggered workers complete. + pub fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), SharedRuntimeError> { + // Tolerate poison on the initial lock by extracting the inner guard. The + // counter state is plain data — there's no invariant a poisoned previous + // holder could have broken — so it's safe to proceed and the result is + // strictly better than panicking and leaking the caller's shutdown signal. + let state = self + .shutdown_tracker + .state + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + let (_state, res) = self + .shutdown_tracker + .cv + .wait_timeout_while(state, timeout, |s| s.completed < s.expected) + .unwrap_or_else(|err| err.into_inner()); + if res.timed_out() { + Err(SharedRuntimeError::ShutdownTimedOut(timeout)) + } else { + Ok(()) + } + } + } + + impl Drop for SharedRuntime { + fn drop(&mut self) { + // In borrowed mode we don't own the runtime so we cannot block on its + // shutdown — and blocking on a host tokio worker would deadlock. Best effort: + // signal cancellation to every still-registered worker (fire-and-forget) and + // let the host runtime reap the tasks on its own teardown. + // + // In owned mode, leaving cleanup to `drop(Arc)` is fine: tokio will + // drop the runtime and abort in-flight tasks. Callers that need graceful + // shutdown should call `shutdown(...)` explicitly. + // + // Critical: a Drop impl must not panic. Every lock acquisition below is + // poison-tolerant — if a sibling thread previously poisoned the mutex we + // recover the inner value and degrade gracefully rather than risking a + // double-panic and abort. + let borrowed = match self.runtime.lock() { + Ok(guard) => guard.as_ref().is_some_and(|b| b.is_borrowed()), + Err(poison) => poison + .into_inner() + .as_ref() + .is_some_and(|b| b.is_borrowed()), + }; + if !borrowed { + return; + } + let workers = { + let mut guard = match self.workers.lock() { + Ok(g) => g, + Err(poison) => poison.into_inner(), + }; + std::mem::take(&mut *guard) + }; + if workers.is_empty() { + return; + } + let Ok(handle) = self.runtime_handle() else { + return; + }; + for mut entry in workers { + handle.spawn(async move { + if let Err(e) = entry.worker.pause().await { + debug!("Worker failed to pause during borrowed-mode Drop: {:?}", e); + } + }); } } } } +#[cfg(not(target_arch = "wasm32"))] +use native::RuntimeBacking; + +/// Tracks how many workers have completed shutdown after [`trigger_shutdown_signal`]( +/// SharedRuntime::trigger_shutdown_signal). +/// +/// Mirrors the `TraceBuffer::wait_shutdown_done` Condvar pattern so sync callers can wait +/// without `block_on`. +#[cfg(not(target_arch = "wasm32"))] +#[derive(Debug, Default)] +struct ShutdownTracker { + state: Mutex, + cv: Condvar, +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Debug, Default)] +struct ShutdownState { + /// Total workers we're awaiting completion from. Bumped by `trigger_shutdown_signal` + /// each time it runs (idempotent: shutdown is terminal so this only ever grows). + expected: usize, + /// Bumped by each per-worker shutdown task once `Worker::shutdown` returns. + completed: usize, + /// Set true by the first `trigger_shutdown_signal`. Currently informational; could be + /// used in debug builds to assert that no further workers are spawned post-trigger. + #[allow(dead_code)] + triggered: bool, +} + type BoxedWorker = Box; #[derive(Debug)] @@ -344,7 +656,18 @@ pub enum SharedRuntimeError { /// Failed to create the tokio runtime. RuntimeCreation(io::Error), /// Shutdown timed out. - ShutdownTimedOut(std::time::Duration), + ShutdownTimedOut(Duration), + /// Fork hooks (`before_fork`/`after_fork_parent`/`after_fork_child`) were called on a + /// `SharedRuntime` constructed via [`SharedRuntime::from_handle`]. Fork-safety is an + /// owned-runtime-only feature; callers that need it must use [`SharedRuntime::new`]. + #[cfg(not(target_arch = "wasm32"))] + ForkUnsupportedInBorrowedMode, + /// Sync `SharedRuntime::shutdown` was called on a borrowed-mode runtime. Use + /// [`SharedRuntime::trigger_shutdown_signal`] + + /// [`SharedRuntime::wait_shutdown_done`] (sync) or + /// [`SharedRuntime::shutdown_async`] (async) instead. + #[cfg(not(target_arch = "wasm32"))] + SyncShutdownNotSupportedInBorrowedMode, } impl fmt::Display for SharedRuntimeError { @@ -361,6 +684,19 @@ impl fmt::Display for SharedRuntimeError { Self::ShutdownTimedOut(duration) => { write!(f, "Shutdown timed out after {:?}", duration) } + #[cfg(not(target_arch = "wasm32"))] + Self::ForkUnsupportedInBorrowedMode => write!( + f, + "Fork hooks are not supported on a SharedRuntime created via from_handle; \ + use SharedRuntime::new to opt back into owned mode + fork safety" + ), + #[cfg(not(target_arch = "wasm32"))] + Self::SyncShutdownNotSupportedInBorrowedMode => write!( + f, + "Sync SharedRuntime::shutdown is not supported in borrowed mode; use \ + trigger_shutdown_signal + wait_shutdown_done (sync) or shutdown_async \ + (async) instead" + ), } } } @@ -381,22 +717,34 @@ impl From for SharedRuntimeError { /// A shared runtime that manages PausableWorkers and provides fork safety hooks. /// -/// The SharedRuntime owns a tokio runtime (on native) and tracks PausableWorkers -/// spawned on it. It provides methods to safely pause workers before forking and -/// restart them after fork in both parent and child processes. +/// The SharedRuntime owns or borrows a tokio runtime (on native) and tracks +/// PausableWorkers spawned on it. It provides methods to safely pause workers before +/// forking and restart them after fork in both parent and child processes (owned mode +/// only). /// /// On wasm32, no tokio runtime is created. Workers are spawned via `spawn_local` /// on the JS event loop. /// +/// # Construction +/// - [`SharedRuntime::new`]: owned mode. The runtime is created here and owned by the +/// `SharedRuntime`. Supports fork safety, sync `block_on`, sync `shutdown`. Used by FFI callers +/// and test code. +/// - [`SharedRuntime::from_handle`]: borrowed mode. The tokio runtime is owned by the caller; the +/// `SharedRuntime` just shares its [`Handle`](tokio::runtime::Handle). Does **not** support fork +/// safety, sync `block_on`, or sync `shutdown`. Used by Rust apps where libdatadog should +/// integrate with the caller's host runtime (e.g. dd-trace-rs from a tokio-based web service). +/// /// # Mutex lock order /// When locking both [Self::runtime] and [Self::workers], the mutex must be locked in the order of /// the fields in the struct. When possible avoid holding both locks simultaneously. #[derive(Debug)] pub struct SharedRuntime { #[cfg(not(target_arch = "wasm32"))] - runtime: Arc>>>, + runtime: Arc>>, workers: Arc>>, next_worker_id: AtomicU64, + #[cfg(not(target_arch = "wasm32"))] + shutdown_tracker: Arc, } impl SharedRuntime { @@ -598,7 +946,7 @@ mod tests { .expect("worker did not advance state before fork"); } - shared_runtime.before_fork(); + shared_runtime.before_fork().unwrap(); // Drain pre-fork buffered messages now that the worker is paused while receiver.try_recv().is_ok() {} @@ -629,7 +977,7 @@ mod tests { .expect("worker did not advance state before fork"); } - shared_runtime.before_fork(); + shared_runtime.before_fork().unwrap(); // Drain pre-fork buffered messages now that the worker is paused while receiver.try_recv().is_ok() {} @@ -681,7 +1029,7 @@ mod tests { .recv_timeout(Duration::from_secs(1)) .expect("worker did not run"); - shared_runtime.before_fork(); + shared_runtime.before_fork().unwrap(); // Drain buffered messages now that the worker is paused while receiver.try_recv().is_ok() {} @@ -696,4 +1044,86 @@ mod tests { "worker should not run or shut down after fork in child when restart_on_fork is false" ); } + + /// Smoke test for borrowed mode: workers spawned on a `SharedRuntime` created from a + /// host runtime's `Handle` run on that host runtime and can be shut down via the + /// Condvar-based [`SharedRuntime::wait_shutdown_done`] without `block_on`. + #[test] + fn test_from_handle_borrowed_shutdown_wait() { + // Host tokio runtime — borrowed by the SharedRuntime. + let host = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap(); + let shared_runtime = SharedRuntime::from_handle(host.handle().clone()); + assert!(shared_runtime.is_borrowed()); + + let (worker, receiver) = make_test_worker(); + let _ = shared_runtime.spawn_worker(worker, true).unwrap(); + + // Wait for the worker to advance at least once on the host runtime. + receiver + .recv_timeout(Duration::from_secs(1)) + .expect("worker did not run on host runtime"); + + shared_runtime + .trigger_shutdown_signal() + .expect("trigger_shutdown_signal failed"); + shared_runtime + .wait_shutdown_done(Duration::from_secs(5)) + .expect("shutdown did not complete in time"); + + // Drain remaining messages — the last one must be the shutdown sentinel (-1). + let mut last = receiver + .recv_timeout(Duration::from_secs(1)) + .expect("shutdown sentinel was not produced"); + while let Ok(v) = receiver.try_recv() { + last = v; + } + assert_eq!(last, -1); + } + + /// Fork hooks and sync `block_on`/`shutdown` are unsupported in borrowed mode. + #[test] + fn test_borrowed_mode_unsupported_apis() { + let host = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let shared_runtime = SharedRuntime::from_handle(host.handle().clone()); + + assert!(matches!( + shared_runtime.before_fork(), + Err(SharedRuntimeError::ForkUnsupportedInBorrowedMode) + )); + assert!(matches!( + shared_runtime.after_fork_parent(), + Err(SharedRuntimeError::ForkUnsupportedInBorrowedMode) + )); + assert!(matches!( + shared_runtime.after_fork_child(), + Err(SharedRuntimeError::ForkUnsupportedInBorrowedMode) + )); + assert!(matches!( + shared_runtime.shutdown(None), + Err(SharedRuntimeError::SyncShutdownNotSupportedInBorrowedMode) + )); + + let err = shared_runtime + .block_on(async {}) + .expect_err("block_on should fail in borrowed mode"); + assert_eq!(err.kind(), std::io::ErrorKind::Unsupported); + } + + /// `wait_shutdown_done` returns `Ok` immediately when no workers were ever registered + /// (expected == completed == 0). + #[test] + fn test_wait_shutdown_done_no_workers() { + let shared_runtime = SharedRuntime::new().unwrap(); + shared_runtime.trigger_shutdown_signal().unwrap(); + shared_runtime + .wait_shutdown_done(Duration::from_secs(1)) + .expect("wait_shutdown_done should return immediately with no workers"); + } }