Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/spawned_task_component_tags.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Internal telemetry (metrics and logs) emitted from work that Vector runs on spawned `tokio` tasks now correctly inherits the owning component's tags (`component_id`, `component_kind`, `component_type`). Previously, several components spawned background tasks without propagating the tracing span, so some internal events emitted from those tasks were missing their component tags. Affected emissions include the `datadog_logs` sink's `component_discarded_events_total` (events too large to encode), the `gcp_pubsub` source's `component_errors_total`/`component_discarded_events_total` from its per-stream tasks, and the `splunk_hec` sinks' acknowledgement-handling `component_errors_total`.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed

authors: gwenaskell
2 changes: 1 addition & 1 deletion lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where
let mut stats = TimingStats::default();

// Spawn the checkpoint writer task
let checkpoint_task_handle = tokio::spawn(checkpoint_writer(
let checkpoint_task_handle = vector_common::spawn_in_current_span(checkpoint_writer(
checkpointer,
self.glob_minimum_cooldown,
shutdown_checkpointer,
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/variants/disk_v2/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ where
#[must_use]
pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
let (finalizer, mut stream) = OrderedFinalizer::new(None);
tokio::spawn(async move {
vector_common::spawn_in_current_span(async move {
while let Some((_status, amount)) = stream.next().await {
self.increment_pending_acks(amount);
self.notify_writer_waiters();
Expand Down
16 changes: 16 additions & 0 deletions lib/vector-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,19 @@ pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Vector's basic result type, defined in terms of [`Error`] and generic over
/// `T`.
pub type Result<T> = std::result::Result<T, Error>;

/// Spawn a future on the current tokio runtime, propagating the current tracing span into the
/// spawned task. This ensures that any logs or internal metrics emitted by the task retain the
/// component tags (`component_id`, `component_kind`, `component_type`) of the caller.
///
/// Prefer this over `tokio::spawn(future.in_current_span())` to keep call sites concise.
#[track_caller]
pub fn spawn_in_current_span<T>(
task: impl std::future::Future<Output = T> + Send + 'static,
) -> tokio::task::JoinHandle<T>
where
T: Send + 'static,
{
use tracing::Instrument as _;
tokio::spawn(task.in_current_span())
Comment thread
pront marked this conversation as resolved.
}
4 changes: 4 additions & 0 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ impl<'a> SendGroup<'a> {

fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
if let Some(send) = self.sends.remove(id) {
// Deliberately not instrumented with the current span: this drains a send to a sink
// that has just been detached from the topology, so it is unrelated to the upstream
// component that owns this fanout. Attaching the current span would mis-tag this
// task's logs with the upstream component's identity rather than the detached sink's.
tokio::spawn(async move {
if let Err(e) = send.await {
warn!(
Expand Down
3 changes: 2 additions & 1 deletion lib/vector-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ pub use vector_common::{
Error, NamedInternalEvent, Result, TimeZone, assert_event_data_eq, atomic, btreemap,
byte_size_of, byte_size_of::ByteSizeOf, conversion, counter, encode_logfmt, finalization,
finalizer, gauge, histogram, id, impl_event_data_eq, internal_event, json_size,
registered_event, request_metadata, sensitive_string, shutdown, stats, trigger,
registered_event, request_metadata, sensitive_string, shutdown, spawn_in_current_span, stats,
trigger,
};
pub use vector_config as configurable;
pub use vector_config::impl_generate_config_from_default;
Expand Down
5 changes: 5 additions & 0 deletions lib/vector-stream/src/concurrent_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ where
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(item)) => {
let fut = (this.f)(item);
// `ConcurrentMap` does not instrument the spawned future itself: the
// mapping closure runs on a detached task, so the current span at poll
// time is not necessarily meaningful for the work being performed. It is
// the caller's responsibility to propagate any span (e.g. the owning
// component's span for internal metric/log tagging) into `fut`.
let handle = tokio::spawn(fut);
this.in_flight.push_back(handle);
}
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-tap/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl TapController {
fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx {
let (shutdown_tx, shutdown_rx) = oneshot::channel();

tokio::spawn(async move {
vector_common::spawn_in_current_span(async move {
_ = shutdown_rx.await;
if control_tx
.send(fanout::ControlMessage::Remove(sink_id.clone()))
Expand Down Expand Up @@ -366,7 +366,7 @@ async fn tap_handler(
);
let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone());

tokio::spawn(async move {
vector_common::spawn_in_current_span(async move {
while let Some(events) = tap_buffer_rx.next().await {
tap_transformer.try_send(events);
}
Expand Down
2 changes: 1 addition & 1 deletion src/api/grpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ impl observability::Service for ObservabilityService {

let watch_rx = self.watch_rx.clone();

tokio::spawn(async move {
crate::spawn_in_current_span(async move {
let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
let mut tap_rx = ReceiverStream::new(tap_rx);
let mut interval = time::interval(time::Duration::from_millis(interval_ms));
Expand Down
2 changes: 1 addition & 1 deletion src/api/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl GrpcServer {
let router_serving = Arc::clone(&serving);

// Spawn the server with the already-bound listener
tokio::spawn(async move {
crate::spawn_in_current_span(async move {
// Build reflection service for tools like grpcurl
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(
Expand Down
2 changes: 1 addition & 1 deletion src/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl GcpAuthenticator {

pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> {
let (sender, receiver) = watch::channel(());
tokio::spawn(self.clone().token_regenerator(sender));
crate::spawn_in_current_span(self.clone().token_regenerator(sender));
receiver
}

Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ pub fn get_hostname() -> std::io::Result<String> {
})
}

pub(crate) use vector_lib::spawn_in_current_span;

/// Spawn a task with the given name. The name is only used if
/// built with [`tokio_unstable`][tokio_unstable].
///
Expand Down
2 changes: 1 addition & 1 deletion src/secrets/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn query_backend(
.ok_or("unable to acquire stdout")?;

let query = serde_json::to_vec(&query)?;
tokio::spawn(async move { stdin.write_all(&query).await });
crate::spawn_in_current_span(async move { stdin.write_all(&query).await });

let timeout = time::sleep(time::Duration::from_secs(timeout));
tokio::pin!(timeout);
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/blackhole/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl StreamSink<EventArray> for BlackholeSink {

if self.config.print_interval_secs.as_secs() > 0 {
let interval_dur = self.config.print_interval_secs;
tokio::spawn(async move {
crate::spawn_in_current_span(async move {
let mut print_interval = interval(interval_dur);
loop {
select! {
Expand Down
18 changes: 13 additions & 5 deletions src/sinks/datadog/logs/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};

use itertools::Itertools;
use snafu::Snafu;
use tracing::Instrument;
use vector_lib::{
event::{ObjectMap, Value},
internal_event::{ComponentEventsDropped, UNINTENTIONAL},
Expand Down Expand Up @@ -393,12 +394,19 @@ where
.concurrent_map(default_request_builder_concurrency_limit(), move |input| {
let builder = Arc::clone(&builder);

Box::pin(async move {
let (api_key, events) = input;
let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));
// `concurrent_map` spawns this future on a detached task. The closure itself runs
// within `run_inner`'s span, so `in_current_span` captures the sink span here and
// re-enters it on the spawned task to preserve the sink's automatic component tags.
Box::pin(
async move {
let (api_key, events) = input;
let api_key =
api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));

builder.build_request(events, api_key)
})
builder.build_request(events, api_key)
}
.in_current_span(),
)
})
.filter_map(|request| async move {
match request {
Expand Down
18 changes: 13 additions & 5 deletions src/sinks/datadog/metrics/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures_util::{
stream::{self, BoxStream},
};
use tower::Service;
use tracing::Instrument;
use vector_lib::{
event::{Event, Metric, MetricValue},
partition::Partitioner,
Expand Down Expand Up @@ -136,11 +137,18 @@ where
.concurrent_map(
default_request_builder_concurrency_limit(),
|((api_key, endpoint), metrics)| {
Box::pin(async move {
let collapsed_metrics =
sort_and_collapse_counters_by_series_and_timestamp(metrics);
((api_key, endpoint), collapsed_metrics)
})
// `concurrent_map` spawns this future on a detached task. The closure itself
// runs within `run_inner`'s span, so `in_current_span` captures the sink span
// here and re-enters it on the spawned task to preserve the sink's automatic
// component tags on any internal metrics/logs emitted during aggregation.
Box::pin(
async move {
let collapsed_metrics =
sort_and_collapse_counters_by_series_and_timestamp(metrics);
((api_key, endpoint), collapsed_metrics)
}
.in_current_span(),
)
},
)
// We build our requests "incrementally", which means that for a single batch of metrics, we might generate
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/datadog/traces/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl DatadogTracesConfig {
// Send the APM stats payloads independently of the sink framework.
// This is necessary to comply with what the APM stats backend of Datadog expects with
// respect to receiving stats payloads.
tokio::spawn(flush_apm_stats_thread(
crate::spawn_in_current_span(flush_apm_stats_thread(
tripwire,
client,
compression,
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/mqtt/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl MqttSink {
let (client, mut connection) = self.connector.connect();

// This is necessary to keep the mqtt event loop moving forward.
tokio::spawn(async move {
crate::spawn_in_current_span(async move {
loop {
// If an error is returned here there is currently no way to tie this back
// to the event that was posted which means we can't accurately provide
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/prometheus/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ impl PrometheusExporter {
let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?;
let listener = tls.bind(&address).await?;

tokio::spawn(async move {
crate::spawn_in_current_span(async move {
info!(message = "Building HTTP server.", address = %address);

Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/redis/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl RedisConnection {
Ok(Self::Sentinel {
connection_send: conn_tx,
connection_recv: conn_rx,
repair_task: Arc::new(tokio::spawn(async move {
repair_task: Arc::new(crate::spawn_in_current_span(async move {
Self::repair_connection_manager_task(
sentinel,
service_name,
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/splunk_hec/common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ where
let max_pending_acks = indexer_acknowledgements.max_pending_acks.get();
let tx = if let Some(ack_client) = ack_client {
let (tx, rx) = mpsc::channel(128);
tokio::spawn(run_acknowledgements(
crate::spawn_in_current_span(run_acknowledgements(
rx,
ack_client,
Arc::clone(&http_request_builder),
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/util/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ where
this.lingers.remove(partition);

let batch = batch.finish();
let future = tokio::spawn(this.service.call(batch));
let future = crate::spawn_in_current_span(this.service.call(batch));

if let Some(map) = this.in_flight.as_mut() {
map.insert(partition.clone(), future.map(|_| ()).fuse().boxed());
Expand Down
49 changes: 21 additions & 28 deletions src/sinks/websocket_server/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use tokio_tungstenite::tungstenite::{
handshake::server::{ErrorResponse, Request, Response},
};
use tokio_util::codec::Encoder as _;
use tracing::Instrument;
use url::Url;
use uuid::Uuid;
use vector_lib::{
Expand Down Expand Up @@ -129,20 +128,17 @@ impl WebSocketListenerSink {
let open_gauge = OpenGauge::new();

while let Ok(stream) = listener.accept().await {
tokio::spawn(
Self::handle_connection(
auth.clone(),
message_buffering.clone(),
subprotocol.clone(),
Arc::clone(&peers),
Arc::clone(&client_checkpoints),
Arc::clone(&buffer),
stream,
extra_tags_config.clone(),
open_gauge.clone(),
)
.in_current_span(),
);
crate::spawn_in_current_span(Self::handle_connection(
auth.clone(),
message_buffering.clone(),
subprotocol.clone(),
Arc::clone(&peers),
Arc::clone(&client_checkpoints),
Arc::clone(&buffer),
stream,
extra_tags_config.clone(),
open_gauge.clone(),
));
}
}

Expand Down Expand Up @@ -360,19 +356,16 @@ impl StreamSink<Event> for WebSocketListenerSink {
)));
let client_checkpoints = Arc::new(Mutex::new(HashMap::default()));

tokio::spawn(
Self::handle_connections(
self.auth,
self.message_buffering.clone(),
self.subprotocol.clone(),
Arc::clone(&peers),
self.extra_tags_config,
Arc::clone(&client_checkpoints),
Arc::clone(&message_buffer),
listener,
)
.in_current_span(),
);
crate::spawn_in_current_span(Self::handle_connections(
self.auth,
self.message_buffering.clone(),
self.subprotocol.clone(),
Arc::clone(&peers),
self.extra_tags_config,
Arc::clone(&client_checkpoints),
Arc::clone(&message_buffer),
listener,
));

while input.as_mut().peek().await.is_some() {
let mut event = input.next().await.unwrap();
Expand Down
3 changes: 1 addition & 2 deletions src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::{
codecs::decoding::FramingError,
config::{LegacyKey, LogNamespace, log_schema},
Expand Down Expand Up @@ -358,7 +357,7 @@ impl Ingestor {
acknowledgements,
);
let fut = process.run();
let handle = tokio::spawn(fut.in_current_span());
let handle = crate::spawn_in_current_span(fut);
handles.push(handle);
}

Expand Down
Loading
Loading