Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 143 additions & 47 deletions tracectl/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,71 @@ where
}
}

/// Runtime-swappable rate-limit policy for the throttled output layer.
///
/// The handle keeps an *optional* [`TracingRateLimitLayer`] behind an
/// `ArcSwap`, following the same approach [`AtomicEnvFilter`] takes for the
/// level filter. Cloning is cheap and every clone points at the same
/// `Arc<ArcSwap<Option<TracingRateLimitLayer>>>`: one clone is handed to the
/// subscriber as a per-layer `Filter`, while [`TracingControl`] retains
/// another so it can push new policies later.
///
/// A stored `None` means throttling is off and every event passes.
/// [`reload`](Self::reload) builds a brand-new layer (with fresh token
/// buckets) from the supplied config and swaps it in atomically, or stores
/// `None` to switch throttling off. If a `Some(config)` cannot be turned into
/// a layer, the slot ends up `None` (unthrottled), matching what
/// [`TracingControl::build_rate_limit_layer`] does at init time.
#[derive(Clone)]
struct AtomicThrottle {
    /// Shared slot holding the active policy; `None` disables throttling.
    inner: Arc<ArcSwap<Option<TracingRateLimitLayer>>>,
}

impl AtomicThrottle {
    /// Creates a handle whose initial policy is built from `config`.
    ///
    /// The layer comes from [`TracingControl::build_rate_limit_layer`]; when
    /// that yields `None` the handle starts out unthrottled.
    fn new(config: Option<TracingRateLimitConfig>) -> Self {
        let initial = TracingControl::build_rate_limit_layer(config);
        Self {
            inner: Arc::new(ArcSwap::from_pointee(initial)),
        }
    }

    /// Replaces the active throttle.
    ///
    /// `Some(config)` installs a newly built token-bucket layer and `None`
    /// removes throttling. This never fails: a config that cannot be built
    /// leaves the handle unthrottled.
    fn reload(&self, config: Option<TracingRateLimitConfig>) {
        let next = TracingControl::build_rate_limit_layer(config);
        // `ArcSwap::store` wants a std `Arc`; presumably that is why the path
        // is spelled out here instead of using the `Arc` already in scope.
        self.inner.store(std::sync::Arc::new(next)); // nosemgrep: rust-no-direct-std-sync-import
        // NOTE(review): `callsite_enabled` on this type always answers
        // `sometimes`, so this rebuild looks redundant for the throttle
        // itself; kept as-is. Confirm before removing.
        callsite::rebuild_interest_cache();
    }
}

// Plugged in as a per-layer `Filter` through `level_filter.and(throttle)`,
// taking the place of the `TracingRateLimitLayer` that used to be baked in
// directly. The pass/drop decision is taken for each event in
// `event_enabled`. `callsite_enabled` intentionally answers `sometimes`, so a
// cached always/never interest can never bypass a throttle installed by a
// later `reload`.
impl<S> Filter<S> for AtomicThrottle
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Forwards to the installed layer's `enabled`; passes when none is installed.
    fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool {
        // Keep the guard alive for as long as the borrowed layer is in use.
        let active = self.inner.load();
        if let Some(layer) = &**active {
            Filter::<S>::enabled(layer, meta, cx)
        } else {
            true
        }
    }

    /// Always `sometimes`: the verdict depends on the currently loaded policy.
    fn callsite_enabled(&self, _meta: &'static Metadata<'static>) -> Interest {
        Interest::sometimes()
    }

    /// Forwards to the installed layer's `event_enabled`; passes when none is installed.
    fn event_enabled(&self, event: &Event<'_>, cx: &Context<'_, S>) -> bool {
        let active = self.inner.load();
        if let Some(layer) = &**active {
            Filter::<S>::event_enabled(layer, event, cx)
        } else {
            true
        }
    }
}

// ---------------------------------------------------------------------------
// TracingControl
// ---------------------------------------------------------------------------
Expand All @@ -386,6 +451,7 @@ type BoxedTracingLayer<S> = Box<dyn Layer<S> + Send + Sync>;
pub struct TracingControl {
db: Arc<Mutex<TargetCfgDb>>,
reload_filter: AtomicEnvFilter,
reload_throttle: AtomicThrottle,
}
impl TracingControl {
fn fmt_layer<S>() -> impl Layer<S> + Send + Sync
Expand Down Expand Up @@ -433,21 +499,17 @@ impl TracingControl {
.boxed()
}

fn throttled_fmt_layer<S>(
rate_limit_config: Option<TracingRateLimitConfig>,
) -> BoxedTracingLayer<S>
// The throttle is always installed as a swappable filter (even when it
// currently holds `None`) so the policy can be turned on, off, or
// reconfigured at runtime via `AtomicThrottle::reload` without rebuilding
// the subscriber.
fn throttled_fmt_layer<S>(throttle: AtomicThrottle) -> BoxedTracingLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span> + Send + Sync + 'static,
{
if let Some(rate_limit) = Self::build_rate_limit_layer(rate_limit_config) {
Self::fmt_layer()
.with_filter(Self::throttled_level_filter().and(rate_limit))
.boxed()
} else {
Self::fmt_layer()
.with_filter(Self::throttled_level_filter())
.boxed()
}
Self::fmt_layer()
.with_filter(Self::throttled_level_filter().and(throttle))
.boxed()
}

fn build_rate_limit_layer(
Expand Down Expand Up @@ -483,11 +545,11 @@ impl TracingControl {
}
}

fn init_subscriber(filter: AtomicEnvFilter, rate_limit_config: Option<TracingRateLimitConfig>) {
fn init_subscriber(filter: AtomicEnvFilter, throttle: AtomicThrottle) {
if let Err(e) = tracing_subscriber::registry()
.with(filter)
.with(Self::unthrottled_fmt_layer())
.with(Self::throttled_fmt_layer(rate_limit_config))
.with(Self::throttled_fmt_layer(throttle))
.with(tracing_error::ErrorLayer::default())
.try_init()
{
Expand All @@ -499,15 +561,18 @@ impl TracingControl {
let db = TargetCfgDb::new();
let filter = AtomicEnvFilter::new(db.env_filter());
let reload_filter = filter.clone();
let throttle = AtomicThrottle::new(rate_limit_config);
let reload_throttle = throttle.clone();

Self::init_subscriber(filter, rate_limit_config);
Self::init_subscriber(filter, throttle);

if let Err(e) = color_eyre::install() {
eprintln!("Failed to initialize color_eyre:\n{e}");
}
Self {
db: Arc::new(Mutex::new(db)),
reload_filter,
reload_throttle,
}
}
fn lock(&self) -> MutexGuard<'_, TargetCfgDb> {
Expand Down Expand Up @@ -587,6 +652,18 @@ impl TracingControl {
Ok(db.default)
}

/// Replace or switch off the tracing rate limiter while the process runs.
///
/// With `Some(config)` a newly built token-bucket layer is swapped in (its
/// buckets start fresh, so a reload hands out a new burst allowance). With
/// `None` throttling is removed and every event passes. The call cannot
/// fail: as at init, a config that does not build leaves tracing
/// unthrottled. Intended to be called by a config/CRD handler whenever the
/// rate-limit parameters change, in the same way
/// [`reconfigure`](Self::reconfigure) is used for log levels.
pub fn reload_rate_limit(&self, config: Option<TracingRateLimitConfig>) {
    let throttle = &self.reload_throttle;
    throttle.reload(config);
}

/// Parse a string made of comma-separated tag=level; level = [off,error,warn,info,debug,trace]
fn parse_tracing_config(input: &str) -> Result<OrderMap<String, LevelFilter>, TraceCtlError> {
let mut result = OrderMap::new();
Expand Down Expand Up @@ -1059,70 +1136,89 @@ mod tests {
assert_eq!(check_level!(Y4), LevelFilter::DEBUG);
}

// Verify the rate limiter actually drops events past the burst.
// Exercise the runtime reload path: no throttle -> install one -> remove it,
// counting events at each phase to prove the swap takes effect live.
//
// The throttle is held in an `AtomicThrottle` installed as a per-layer
// filter, the same handle `TracingControl` keeps on `reload_throttle`. We
// build a *local* subscriber (rather than the global one) only so the fmt
// output lands in a capturable `MockWriter` buffer instead of stdout — the
// mechanism under test is identical.
#[test]
#[serial]
#[allow(clippy::disallowed_types)]
fn test_rate_limit_drops_burst_overflow() {
use crate::control::{TracingControl, TracingRateLimitConfig};
fn test_rate_limit_reload_phases() {
use crate::control::{AtomicThrottle, TracingControl, TracingRateLimitConfig};
use std::sync::Mutex; // nosemgrep: rust-no-direct-std-sync-import
use tracing_subscriber::{EnvFilter, filter::FilterExt, layer::Layer, prelude::*};
use tracing_test::internal::MockWriter;

const INFO_MARKER: &str = "HH_RATE_TEST_INFO";
const DEBUG_MARKER: &str = "HH_RATE_TEST_DEBUG";
// Distinct markers so one accumulating buffer can be counted per phase.
const P1_OFF: &str = "HH_RL_PHASE1_OFF";
const P2_ON: &str = "HH_RL_PHASE2_ON";
const P3_OFF: &str = "HH_RL_PHASE3_OFF";
const BURST: u32 = 5;
const EMITTED: usize = 100;

// We leak a `Mutex<Vec<u8>>` to get a `'static` writer buffer for the subscriber layer.
let buf: &'static Mutex<Vec<u8>> = Box::leak(Box::new(Mutex::new(Vec::new())));

let rate_limit = TracingControl::build_rate_limit_layer(Some(TracingRateLimitConfig {
burst: BURST,
replenish_per_second: 1,
}))
.expect("build rate-limit layer");

// Same for dataplane: `DEBUG` events are unthrottled, `INFO` events are
let fmt_unthrottled = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(MockWriter::new(buf))
.with_filter(TracingControl::unthrottled_level_filter());
// Start with throttling disabled; keep a handle to flip it mid-test.
let throttle = AtomicThrottle::new(None);

let fmt_throttled = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(MockWriter::new(buf))
.with_filter(TracingControl::throttled_level_filter().and(rate_limit));
.with_filter(TracingControl::throttled_level_filter().and(throttle.clone()));

let subscriber = tracing_subscriber::registry()
.with(EnvFilter::new("debug"))
.with(fmt_unthrottled)
.with(EnvFilter::new("info"))
.with(fmt_throttled);

tracing::subscriber::with_default(subscriber, || {
// Phase 1 — no throttle: every INFO event must pass.
for _ in 0..EMITTED {
tracing::info!("{P1_OFF}");
}

// Phase 2 — install a small-burst throttle and reload the layer.
throttle.reload(Some(TracingRateLimitConfig {
burst: BURST,
replenish_per_second: 1,
}));
for _ in 0..EMITTED {
tracing::debug!("{DEBUG_MARKER}");
tracing::info!("{INFO_MARKER}");
tracing::info!("{P2_ON}");
}

// Phase 3 — remove the throttle again: every INFO event must pass.
throttle.reload(None);
for _ in 0..EMITTED {
tracing::info!("{P3_OFF}");
}
});

let captured = String::from_utf8(buf.lock().unwrap().clone()).unwrap();
let info_kept = captured.matches(INFO_MARKER).count();
let debug_kept = captured.matches(DEBUG_MARKER).count();
let count = |marker: &str| captured.matches(marker).count();
let (p1, p2, p3) = (count(P1_OFF), count(P2_ON), count(P3_OFF));

// DEBUG is unthrottled — every event must make it through, and the
// throttle layer never sees it so it can't burn tokens budgeted for INFO.
// Phase 1: throttle disabled, nothing dropped.
assert_eq!(
debug_kept, EMITTED,
"DEBUG events must pass the unthrottled layer; got {debug_kept}/{EMITTED}"
p1, EMITTED,
"phase 1 (no throttle) dropped events: {p1}/{EMITTED}"
);

// Phase 2: throttle active — at least the burst survives, but the bulk
// is dropped. If this saw EMITTED, the reload didn't take effect.
assert!(p2 >= 1, "phase 2 throttle swallowed the entire burst");
assert!(
info_kept >= 1,
"rate limiter swallowed the entire INFO burst"
p2 < EMITTED / 2,
"phase 2 let through {p2}/{EMITTED} events; throttle not applied after reload\n{captured}"
);
assert!(
info_kept < EMITTED / 2,
"rate limiter let through {info_kept}/{EMITTED} INFO events; expected ≪ {EMITTED}\n{captured}"

// Phase 3: throttle removed — back to passing everything.
assert_eq!(
p3, EMITTED,
"phase 3 (throttle removed) dropped events: {p3}/{EMITTED}"
);
}
}