Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/spawned_task_component_tags.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Internal telemetry (metrics and logs) emitted from work that Vector runs on spawned `tokio` tasks now correctly inherits the owning component's tags (`component_id`, `component_kind`, `component_type`). Previously, several components spawned background tasks without propagating the tracing span, so some internal events emitted from those tasks were missing their component tags. Affected emissions include the `datadog_logs` sink's `component_discarded_events_total` (events too large to encode), the `gcp_pubsub` source's `component_errors_total`/`component_discarded_events_total` from its per-stream tasks, and the `splunk_hec` sinks' acknowledgement-handling `component_errors_total`.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed

authors: gwenaskell
17 changes: 10 additions & 7 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::{
time::sleep,
};

use tracing::{debug, error, info, trace};
use tracing::{Instrument, debug, error, info, trace};

use crate::{
file_watcher::{FileWatcher, RawLineResult},
Expand Down Expand Up @@ -148,12 +148,15 @@ where
let mut stats = TimingStats::default();

// Spawn the checkpoint writer task
let checkpoint_task_handle = tokio::spawn(checkpoint_writer(
checkpointer,
self.glob_minimum_cooldown,
shutdown_checkpointer,
self.emitter.clone(),
));
let checkpoint_task_handle = tokio::spawn(
Comment thread
gwenaskell marked this conversation as resolved.
Outdated
checkpoint_writer(
checkpointer,
self.glob_minimum_cooldown,
shutdown_checkpointer,
self.emitter.clone(),
)
.in_current_span(),
);

// Alright friends, how does this work?
//
Expand Down
14 changes: 9 additions & 5 deletions lib/vector-buffers/src/variants/disk_v2/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use futures::StreamExt;
use rkyv::{Archive, Serialize, with::Atomic};
use snafu::{ResultExt, Snafu};
use tokio::{fs, io::AsyncWriteExt, sync::Notify};
use tracing::Instrument;
use vector_common::finalizer::OrderedFinalizer;

use super::{
Expand Down Expand Up @@ -700,12 +701,15 @@ where
#[must_use]
pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
let (finalizer, mut stream) = OrderedFinalizer::new(None);
tokio::spawn(async move {
while let Some((_status, amount)) = stream.next().await {
self.increment_pending_acks(amount);
self.notify_writer_waiters();
tokio::spawn(
async move {
while let Some((_status, amount)) = stream.next().await {
self.increment_pending_acks(amount);
self.notify_writer_waiters();
}
}
});
.in_current_span(),
);
finalizer
}
}
Expand Down
4 changes: 4 additions & 0 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ impl<'a> SendGroup<'a> {

fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
if let Some(send) = self.sends.remove(id) {
// Deliberately not instrumented with the current span: this drains a send to a sink
// that has just been detached from the topology, so it is unrelated to the upstream
// component that owns this fanout. Attaching the current span would mis-tag this
// task's logs with the upstream component's identity rather than the detached sink's.
tokio::spawn(async move {
if let Err(e) = send.await {
warn!(
Expand Down
5 changes: 5 additions & 0 deletions lib/vector-stream/src/concurrent_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ where
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(item)) => {
let fut = (this.f)(item);
// `ConcurrentMap` does not instrument the spawned future itself: the mapping
// closure is invoked here, but the future it returns runs on a detached task,
// so the current span at poll time is not necessarily meaningful for the work
// being performed. It is the caller's responsibility to propagate any span
// (e.g. the owning component's span for internal metric/log tagging) into `fut`.
let handle = tokio::spawn(fut);
this.in_flight.push_back(handle);
}
Expand Down
25 changes: 14 additions & 11 deletions lib/vector-tap/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,20 @@ impl TapController {
fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx {
let (shutdown_tx, shutdown_rx) = oneshot::channel();

tokio::spawn(async move {
_ = shutdown_rx.await;
if control_tx
.send(fanout::ControlMessage::Remove(sink_id.clone()))
.is_err()
{
debug!(message = "Couldn't disconnect sink.", ?sink_id);
} else {
debug!(message = "Disconnected sink.", ?sink_id);
tokio::spawn(
async move {
_ = shutdown_rx.await;
if control_tx
.send(fanout::ControlMessage::Remove(sink_id.clone()))
.is_err()
{
debug!(message = "Couldn't disconnect sink.", ?sink_id);
} else {
debug!(message = "Disconnected sink.", ?sink_id);
}
}
});
.in_current_span(),
);

shutdown_tx
}
Expand Down Expand Up @@ -370,7 +373,7 @@ async fn tap_handler(
while let Some(events) = tap_buffer_rx.next().await {
tap_transformer.try_send(events);
}
});
}.in_current_span());

// Attempt to connect the sink.
//
Expand Down
36 changes: 20 additions & 16 deletions src/api/grpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tokio_stream::{
wrappers::{IntervalStream, ReceiverStream},
};
use tonic::{Request, Response, Status};
use tracing::Instrument;
use vector_lib::tap::{
controller::{TapController, TapPatterns, TapPayload},
topology::WatchRx,
Expand Down Expand Up @@ -676,27 +677,30 @@ impl observability::Service for ObservabilityService {

let watch_rx = self.watch_rx.clone();

tokio::spawn(async move {
let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
let mut tap_rx = ReceiverStream::new(tap_rx);
let mut interval = time::interval(time::Duration::from_millis(interval_ms));
let mut reservoir = Reservoir::new(limit);

loop {
select! {
Some(tap_payload) = tokio_stream::StreamExt::next(&mut tap_rx) => {
if reservoir.handle_payload(tap_payload, &event_tx).await.is_err() {
break;
tokio::spawn(
async move {
let _tap_controller = TapController::new(watch_rx, tap_tx, patterns);
let mut tap_rx = ReceiverStream::new(tap_rx);
let mut interval = time::interval(time::Duration::from_millis(interval_ms));
let mut reservoir = Reservoir::new(limit);

loop {
select! {
Some(tap_payload) = tokio_stream::StreamExt::next(&mut tap_rx) => {
if reservoir.handle_payload(tap_payload, &event_tx).await.is_err() {
break;
}
}
}
_ = interval.tick() => {
if event_tx.is_closed() || reservoir.flush(&event_tx).await.is_err() {
break;
_ = interval.tick() => {
if event_tx.is_closed() || reservoir.flush(&event_tx).await.is_err() {
break;
}
}
}
}
}
});
.in_current_span(),
);

let stream = FuturesStreamExt::flat_map(ReceiverStream::new(event_rx), |events| {
stream::iter(events.into_iter().map(Ok))
Expand Down
80 changes: 42 additions & 38 deletions src/api/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use axum::{
use tokio::sync::oneshot;
use tonic::transport::Server as TonicServer;
use tonic_health::server::{HealthReporter, health_reporter};
use tracing::Instrument;
use vector_lib::tap::topology::WatchRx;

use super::grpc::ObservabilityService;
Expand Down Expand Up @@ -80,45 +81,48 @@ impl GrpcServer {
let router_serving = Arc::clone(&serving);

// Spawn the server with the already-bound listener
tokio::spawn(async move {
// Build reflection service for tools like grpcurl
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(
crate::proto::observability::FILE_DESCRIPTOR_SET,
)
.register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
.build()
.expect("Failed to build reflection service");

// Build the tonic router (gRPC services) and merge with the HTTP router
// so both protocols share the same port. `accept_http1(true)` lets plain
// HTTP/1.1 requests reach the merged axum routes.
let router = TonicServer::builder()
.accept_http1(true)
.add_service(health_service)
.add_service(ObservabilityServer::new(service))
.add_service(reflection_service)
.into_router()
.merge(http_router(router_serving));

let result = hyper::Server::from_tcp(std_listener)
.expect("Failed to build HTTP server from TCP listener")
.serve(router.into_make_service())
.with_graceful_shutdown(async {
rx.await.ok();
info!("GRPC API server shutting down.");
})
.await;

if let Err(e) = result {
error!(
message = "GRPC server encountered an error.",
error = %e,
error_source = ?e.source(),
bind_addr = %actual_addr,
);
tokio::spawn(
Comment thread
gwenaskell marked this conversation as resolved.
Outdated
async move {
// Build reflection service for tools like grpcurl
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(
crate::proto::observability::FILE_DESCRIPTOR_SET,
)
.register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
.build()
.expect("Failed to build reflection service");

// Build the tonic router (gRPC services) and merge with the HTTP router
// so both protocols share the same port. `accept_http1(true)` lets plain
// HTTP/1.1 requests reach the merged axum routes.
let router = TonicServer::builder()
.accept_http1(true)
.add_service(health_service)
.add_service(ObservabilityServer::new(service))
.add_service(reflection_service)
.into_router()
.merge(http_router(router_serving));

let result = hyper::Server::from_tcp(std_listener)
.expect("Failed to build HTTP server from TCP listener")
.serve(router.into_make_service())
.with_graceful_shutdown(async {
rx.await.ok();
info!("GRPC API server shutting down.");
})
.await;

if let Err(e) = result {
error!(
message = "GRPC server encountered an error.",
error = %e,
error_source = ?e.source(),
bind_addr = %actual_addr,
);
}
}
});
.in_current_span(),
);

info!("GRPC API server started on {}.", actual_addr);

Expand Down
3 changes: 2 additions & 1 deletion src/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use hyper::header::AUTHORIZATION;
use smpl_jwt::Jwt;
use snafu::{ResultExt, Snafu};
use tokio::sync::watch;
use tracing::Instrument;
use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};

use crate::{
Expand Down Expand Up @@ -194,7 +195,7 @@ impl GcpAuthenticator {

pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> {
let (sender, receiver) = watch::channel(());
tokio::spawn(self.clone().token_regenerator(sender));
tokio::spawn(self.clone().token_regenerator(sender).in_current_span());
receiver
}

Expand Down
3 changes: 2 additions & 1 deletion src/secrets/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, process::Command, time};
use tokio_util::codec;
use tracing::Instrument;
use vector_lib::configurable::{component::GenerateConfig, configurable_component};
use vrl::value::Value;

Expand Down Expand Up @@ -179,7 +180,7 @@ async fn query_backend(
.ok_or("unable to acquire stdout")?;

let query = serde_json::to_vec(&query)?;
tokio::spawn(async move { stdin.write_all(&query).await });
tokio::spawn(async move { stdin.write_all(&query).await }.in_current_span());

let timeout = time::sleep(time::Duration::from_secs(timeout));
tokio::pin!(timeout);
Expand Down
47 changes: 26 additions & 21 deletions src/sinks/blackhole/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use vector_lib::{
},
};

use tracing::Instrument;

use crate::{
event::{EventArray, EventContainer, EventStatus, Finalizable},
sinks::{blackhole::config::BlackholeConfig, util::StreamSink},
Expand Down Expand Up @@ -57,29 +59,32 @@ impl StreamSink<EventArray> for BlackholeSink {

if self.config.print_interval_secs.as_secs() > 0 {
let interval_dur = self.config.print_interval_secs;
tokio::spawn(async move {
let mut print_interval = interval(interval_dur);
loop {
select! {
_ = print_interval.tick() => {
info!(
events = total_events.load(Ordering::Relaxed),
raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed),
internal_log_rate_limit = false,
"Collected events."
);
},
_ = tripwire.changed() => break,
tokio::spawn(
async move {
let mut print_interval = interval(interval_dur);
loop {
select! {
_ = print_interval.tick() => {
info!(
events = total_events.load(Ordering::Relaxed),
raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed),
internal_log_rate_limit = false,
"Collected events."
);
},
_ = tripwire.changed() => break,
}
}
}

info!(
events = total_events.load(Ordering::Relaxed),
raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed),
internal_log_rate_limit = false,
"Collected events."
);
});
info!(
events = total_events.load(Ordering::Relaxed),
raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed),
internal_log_rate_limit = false,
"Collected events."
);
}
.in_current_span(),
);
}

while let Some(mut events) = input.next().await {
Expand Down
Loading
Loading