From d5020802bd5f5ea7e7c01e8bf987c2ed0e42f2d1 Mon Sep 17 00:00:00 2001 From: Sergey Matov Date: Wed, 10 Jun 2026 16:38:12 +0400 Subject: [PATCH 1/7] chore(tracing): Add atomic tracing config handler Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Sergey Matov --- tracectl/src/control.rs | 190 ++++++++++++++++++++++++++++++---------- 1 file changed, 143 insertions(+), 47 deletions(-) diff --git a/tracectl/src/control.rs b/tracectl/src/control.rs index b6879f5d1..d94410ddd 100644 --- a/tracectl/src/control.rs +++ b/tracectl/src/control.rs @@ -377,6 +377,71 @@ where } } +/// `AtomicThrottle` wraps an *optional* [`TracingRateLimitLayer`] in an +/// `ArcSwap` so the rate-limit policy can be swapped at runtime — the same +/// pattern [`AtomicEnvFilter`] uses for the level filter. One clone is moved +/// into the subscriber as a per-layer `Filter`; another is kept on +/// [`TracingControl`] to push reloads. Both share the same +/// `Arc>>`. +/// +/// `None` means "no throttling": every event passes. [`reload`](Self::reload) +/// builds a brand-new layer (with fresh token buckets) from the new config and +/// swaps it in atomically, or stores `None` to disable throttling entirely. +/// A `Some(config)` that fails to build falls back to `None` (unthrottled), +/// mirroring the init-time behavior of [`TracingControl::build_rate_limit_layer`]. +#[derive(Clone)] +struct AtomicThrottle { + inner: Arc>>, +} + +impl AtomicThrottle { + fn new(config: Option) -> Self { + Self { + inner: Arc::new(ArcSwap::from_pointee( + TracingControl::build_rate_limit_layer(config), + )), + } + } + + /// Swap the active throttle. `Some(config)` installs a freshly-built + /// token-bucket layer; `None` removes throttling. Infallible: a config + /// that fails to build degrades to unthrottled. 
+ fn reload(&self, config: Option) { + self.inner.store(std::sync::Arc::new( + TracingControl::build_rate_limit_layer(config), + )); // nosemgrep: rust-no-direct-std-sync-import + callsite::rebuild_interest_cache(); + } +} + +// Used as a per-layer `Filter` (via `level_filter.and(throttle)`), exactly +// like the baked-in `TracingRateLimitLayer` it replaces. The throttle verdict +// is made per event in `event_enabled`; `callsite_enabled` deliberately +// reports `sometimes` so a later `reload` that installs a throttle is never +// short-circuited by a cached always/never interest. +impl Filter for AtomicThrottle +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool { + match &**self.inner.load() { + Some(layer) => Filter::::enabled(layer, meta, cx), + None => true, + } + } + + fn callsite_enabled(&self, _meta: &'static Metadata<'static>) -> Interest { + Interest::sometimes() + } + + fn event_enabled(&self, event: &Event<'_>, cx: &Context<'_, S>) -> bool { + match &**self.inner.load() { + Some(layer) => Filter::::event_enabled(layer, event, cx), + None => true, + } + } +} + // --------------------------------------------------------------------------- // TracingControl // --------------------------------------------------------------------------- @@ -386,6 +451,7 @@ type BoxedTracingLayer = Box + Send + Sync>; pub struct TracingControl { db: Arc>, reload_filter: AtomicEnvFilter, + reload_throttle: AtomicThrottle, } impl TracingControl { fn fmt_layer() -> impl Layer + Send + Sync @@ -433,21 +499,17 @@ impl TracingControl { .boxed() } - fn throttled_fmt_layer( - rate_limit_config: Option, - ) -> BoxedTracingLayer + // The throttle is always installed as a swappable filter (even when it + // currently holds `None`) so the policy can be turned on, off, or + // reconfigured at runtime via `AtomicThrottle::reload` without rebuilding + // the subscriber. 
+ fn throttled_fmt_layer(throttle: AtomicThrottle) -> BoxedTracingLayer where S: Subscriber + for<'span> LookupSpan<'span> + Send + Sync + 'static, { - if let Some(rate_limit) = Self::build_rate_limit_layer(rate_limit_config) { - Self::fmt_layer() - .with_filter(Self::throttled_level_filter().and(rate_limit)) - .boxed() - } else { - Self::fmt_layer() - .with_filter(Self::throttled_level_filter()) - .boxed() - } + Self::fmt_layer() + .with_filter(Self::throttled_level_filter().and(throttle)) + .boxed() } fn build_rate_limit_layer( @@ -483,11 +545,11 @@ impl TracingControl { } } - fn init_subscriber(filter: AtomicEnvFilter, rate_limit_config: Option) { + fn init_subscriber(filter: AtomicEnvFilter, throttle: AtomicThrottle) { if let Err(e) = tracing_subscriber::registry() .with(filter) .with(Self::unthrottled_fmt_layer()) - .with(Self::throttled_fmt_layer(rate_limit_config)) + .with(Self::throttled_fmt_layer(throttle)) .with(tracing_error::ErrorLayer::default()) .try_init() { @@ -499,8 +561,10 @@ impl TracingControl { let db = TargetCfgDb::new(); let filter = AtomicEnvFilter::new(db.env_filter()); let reload_filter = filter.clone(); + let throttle = AtomicThrottle::new(rate_limit_config); + let reload_throttle = throttle.clone(); - Self::init_subscriber(filter, rate_limit_config); + Self::init_subscriber(filter, throttle); if let Err(e) = color_eyre::install() { eprintln!("Failed to initialize color_eyre:\n{e}"); @@ -508,6 +572,7 @@ impl TracingControl { Self { db: Arc::new(Mutex::new(db)), reload_filter, + reload_throttle, } } fn lock(&self) -> MutexGuard<'_, TargetCfgDb> { @@ -587,6 +652,18 @@ impl TracingControl { Ok(db.default) } + /// Reload (or disable) the tracing rate limiter at runtime. + /// + /// `Some(config)` swaps in a freshly-built token-bucket layer (with fresh + /// buckets — a reload grants a new burst allowance); `None` removes + /// throttling so every event passes. Infallible: like init, a config that + /// fails to build degrades to unthrottled. 
This is the hook a config/CRD + /// handler calls when the rate-limit parameters change, analogous to + /// [`reconfigure`](Self::reconfigure) for log levels. + pub fn reload_rate_limit(&self, config: Option) { + self.reload_throttle.reload(config); + } + /// Parse a string made of comma-separated tag=level; level = [off,error,warn,info,debug,trace] fn parse_tracing_config(input: &str) -> Result, TraceCtlError> { let mut result = OrderMap::new(); @@ -1059,70 +1136,89 @@ mod tests { assert_eq!(check_level!(Y4), LevelFilter::DEBUG); } - // Verify the rate limiter actually drops events past the burst. + // Exercise the runtime reload path: no throttle -> install one -> remove it, + // counting events at each phase to prove the swap takes effect live. + // + // The throttle is held in an `AtomicThrottle` installed as a per-layer + // filter, the same handle `TracingControl` keeps on `reload_throttle`. We + // build a *local* subscriber (rather than the global one) only so the fmt + // output lands in a capturable `MockWriter` buffer instead of stdout — the + // mechanism under test is identical. #[test] #[serial] #[allow(clippy::disallowed_types)] - fn test_rate_limit_drops_burst_overflow() { - use crate::control::{TracingControl, TracingRateLimitConfig}; + fn test_rate_limit_reload_phases() { + use crate::control::{AtomicThrottle, TracingControl, TracingRateLimitConfig}; use std::sync::Mutex; // nosemgrep: rust-no-direct-std-sync-import use tracing_subscriber::{EnvFilter, filter::FilterExt, layer::Layer, prelude::*}; use tracing_test::internal::MockWriter; - const INFO_MARKER: &str = "HH_RATE_TEST_INFO"; - const DEBUG_MARKER: &str = "HH_RATE_TEST_DEBUG"; + // Distinct markers so one accumulating buffer can be counted per phase. 
+ const P1_OFF: &str = "HH_RL_PHASE1_OFF"; + const P2_ON: &str = "HH_RL_PHASE2_ON"; + const P3_OFF: &str = "HH_RL_PHASE3_OFF"; const BURST: u32 = 5; const EMITTED: usize = 100; // We leak a `Mutex>` to get a `'static` writer buffer for the subscriber layer. let buf: &'static Mutex> = Box::leak(Box::new(Mutex::new(Vec::new()))); - let rate_limit = TracingControl::build_rate_limit_layer(Some(TracingRateLimitConfig { - burst: BURST, - replenish_per_second: 1, - })) - .expect("build rate-limit layer"); - - // Same for dataplane: `DEBUG` events are unthrottled, `INFO` events are - let fmt_unthrottled = tracing_subscriber::fmt::layer() - .with_ansi(false) - .with_writer(MockWriter::new(buf)) - .with_filter(TracingControl::unthrottled_level_filter()); + // Start with throttling disabled; keep a handle to flip it mid-test. + let throttle = AtomicThrottle::new(None); let fmt_throttled = tracing_subscriber::fmt::layer() .with_ansi(false) .with_writer(MockWriter::new(buf)) - .with_filter(TracingControl::throttled_level_filter().and(rate_limit)); + .with_filter(TracingControl::throttled_level_filter().and(throttle.clone())); let subscriber = tracing_subscriber::registry() - .with(EnvFilter::new("debug")) - .with(fmt_unthrottled) + .with(EnvFilter::new("info")) .with(fmt_throttled); tracing::subscriber::with_default(subscriber, || { + // Phase 1 — no throttle: every INFO event must pass. + for _ in 0..EMITTED { + tracing::info!("{P1_OFF}"); + } + + // Phase 2 — install a small-burst throttle and reload the layer. + throttle.reload(Some(TracingRateLimitConfig { + burst: BURST, + replenish_per_second: 1, + })); for _ in 0..EMITTED { - tracing::debug!("{DEBUG_MARKER}"); - tracing::info!("{INFO_MARKER}"); + tracing::info!("{P2_ON}"); + } + + // Phase 3 — remove the throttle again: every INFO event must pass. 
+ throttle.reload(None); + for _ in 0..EMITTED { + tracing::info!("{P3_OFF}"); } }); let captured = String::from_utf8(buf.lock().unwrap().clone()).unwrap(); - let info_kept = captured.matches(INFO_MARKER).count(); - let debug_kept = captured.matches(DEBUG_MARKER).count(); + let count = |marker: &str| captured.matches(marker).count(); + let (p1, p2, p3) = (count(P1_OFF), count(P2_ON), count(P3_OFF)); - // DEBUG is unthrottled — every event must make it through, and the - // throttle layer never sees it so it can't burn tokens budgeted for INFO. + // Phase 1: throttle disabled, nothing dropped. assert_eq!( - debug_kept, EMITTED, - "DEBUG events must pass the unthrottled layer; got {debug_kept}/{EMITTED}" + p1, EMITTED, + "phase 1 (no throttle) dropped events: {p1}/{EMITTED}" ); + + // Phase 2: throttle active — at least the burst survives, but the bulk + // is dropped. If this saw EMITTED, the reload didn't take effect. + assert!(p2 >= 1, "phase 2 throttle swallowed the entire burst"); assert!( - info_kept >= 1, - "rate limiter swallowed the entire INFO burst" + p2 < EMITTED / 2, + "phase 2 let through {p2}/{EMITTED} events; throttle not applied after reload\n{captured}" ); - assert!( - info_kept < EMITTED / 2, - "rate limiter let through {info_kept}/{EMITTED} INFO events; expected ≪ {EMITTED}\n{captured}" + + // Phase 3: throttle removed — back to passing everything. 
+ assert_eq!( + p3, EMITTED, + "phase 3 (throttle removed) dropped events: {p3}/{EMITTED}" ); } } From 0eb54e77d12d6dd60c02f8ece6b73fc6262bca69 Mon Sep 17 00:00:00 2001 From: Sergey Matov Date: Thu, 11 Jun 2026 18:57:00 +0400 Subject: [PATCH 2/7] chore(tracing): Get TracingRateLimiter config from CRDs Signed-off-by: Sergey Matov --- config/src/converters/k8s/config/tracecfg.rs | 30 ++++++- config/src/internal/device/tracecfg.rs | 30 +++++++ dataplane/src/runtime.rs | 9 +- k8s-intf/src/bolero/logs.rs | 11 ++- mgmt/src/processor/proc.rs | 6 +- tracectl/src/control.rs | 93 ++++++++++++++++---- 6 files changed, 153 insertions(+), 26 deletions(-) diff --git a/config/src/converters/k8s/config/tracecfg.rs b/config/src/converters/k8s/config/tracecfg.rs index 6f000ec00..44dcca18d 100644 --- a/config/src/converters/k8s/config/tracecfg.rs +++ b/config/src/converters/k8s/config/tracecfg.rs @@ -8,7 +8,10 @@ use tracing::metadata::LevelFilter; use k8s_intf::gateway_agent_crd::GatewayAgentGatewayLogs; -use crate::{converters::k8s::FromK8sConversionError, internal::device::tracecfg::TracingConfig}; +use crate::{ + converters::k8s::FromK8sConversionError, + internal::device::tracecfg::{TracingConfig, TracingRateLimit}, +}; fn levelstring_to_levelfilter(value: Option<&str>) -> Result { match value { @@ -36,6 +39,21 @@ impl TryFrom<&GatewayAgentGatewayLogs> for TracingConfig { config.add_tag(tag, level); } } + if let Some(rl) = logs.rate_limit.as_ref() { + let burst = rl.burst.filter(|&b| b > 0).ok_or_else(|| { + FromK8sConversionError::InvalidData("rate-limit burst must be > 0".to_string()) + })?; + let replenish_per_second = + rl.replenish_per_second.filter(|&r| r > 0).ok_or_else(|| { + FromK8sConversionError::InvalidData( + "rate-limit replenishPerSecond must be > 0".to_string(), + ) + })?; + config.set_rate_limit(TracingRateLimit { + burst, + replenish_per_second, + }); + } Ok(config) } } @@ -76,6 +94,16 @@ mod test { assert_eq!(&level, tc_level); } } + match 
logs.rate_limit.as_ref() { + Some(rl) => { + assert_eq!(Some(tc.rate_limit.burst), rl.burst); + assert_eq!( + Some(tc.rate_limit.replenish_per_second), + rl.replenish_per_second + ); + } + None => assert_eq!(tc.rate_limit, TracingRateLimit::default()), + } }); } } diff --git a/config/src/internal/device/tracecfg.rs b/config/src/internal/device/tracecfg.rs index f38806bd3..7bf815402 100644 --- a/config/src/internal/device/tracecfg.rs +++ b/config/src/internal/device/tracecfg.rs @@ -19,16 +19,42 @@ use tracing::metadata::LevelFilter; #[cfg(not(unix))] const DEFAULT_DEFAULT_LOGLEVEL: LevelFilter = LevelFilter::INFO; +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct TracingRateLimit { + pub burst: u32, + pub replenish_per_second: u32, +} +impl Default for TracingRateLimit { + fn default() -> Self { + #[cfg(unix)] + let (burst, replenish_per_second) = { + let d = tracectl::TracingRateLimitConfig::default(); + (d.burst, d.replenish_per_second) + }; + #[cfg(not(unix))] + // Default params + let (burst, replenish_per_second) = (50, 5); + Self { + burst, + replenish_per_second, + } + } +} + #[derive(Clone, Debug)] pub struct TracingConfig { pub default: LevelFilter, pub tags: OrderMap, + /// Log rate limiter — always present; defaults to + /// [`TracingRateLimit::default`] when a config omits it. + pub rate_limit: TracingRateLimit, } impl Default for TracingConfig { fn default() -> Self { Self { default: DEFAULT_DEFAULT_LOGLEVEL, tags: OrderMap::new(), + rate_limit: TracingRateLimit::default(), } } } @@ -38,11 +64,15 @@ impl TracingConfig { Self { default, tags: OrderMap::new(), + rate_limit: TracingRateLimit::default(), } } pub fn add_tag(&mut self, tag: &str, level: LevelFilter) { let _ = self.tags.insert(tag.to_string(), level); } + pub fn set_rate_limit(&mut self, rate_limit: TracingRateLimit) { + self.rate_limit = rate_limit; + } /// Validate the tracing configuration. 
/// /// # Errors diff --git a/dataplane/src/runtime.rs b/dataplane/src/runtime.rs index 94b481416..dc1b2f86e 100644 --- a/dataplane/src/runtime.rs +++ b/dataplane/src/runtime.rs @@ -48,12 +48,13 @@ fn init_name(args: &CmdArgs) -> Result { } } fn init_logging(args: &CmdArgs, gwname: &str) { - TracingControl::init_with_rate_limit(args.tracing_rate_limit().map(|rate_limit| { - TracingRateLimitConfig { + TracingControl::init_with_rate_limit(Some(args.tracing_rate_limit().map_or_else( + TracingRateLimitConfig::default, + |rate_limit| TracingRateLimitConfig { burst: rate_limit.burst, replenish_per_second: rate_limit.replenish_per_second, - } - })); + }, + ))); let tctl = get_trace_ctl(); info!( diff --git a/k8s-intf/src/bolero/logs.rs b/k8s-intf/src/bolero/logs.rs index d757d6a34..03f34ca84 100644 --- a/k8s-intf/src/bolero/logs.rs +++ b/k8s-intf/src/bolero/logs.rs @@ -7,7 +7,7 @@ use std::ops::Bound; use bolero::{Driver, TypeGenerator}; use crate::bolero::LegalValue; -use crate::gateway_agent_crd::GatewayAgentGatewayLogs; +use crate::gateway_agent_crd::{GatewayAgentGatewayLogs, GatewayAgentGatewayLogsRateLimit}; const LEVELS: &[&str] = &["off", "error", "warning", "info", "debug", "trace"]; @@ -49,10 +49,19 @@ impl TypeGenerator for LegalValue { } tags = Some(tag_levels); } + let rate_limit = if d.produce::()? { + Some(GatewayAgentGatewayLogsRateLimit { + burst: Some(d.produce::()?.max(1)), + replenish_per_second: Some(d.produce::()?.max(1)), + }) + } else { + None + }; Some(LegalValue(GatewayAgentGatewayLogs { default: d .produce::>()? 
.map(|log_level| log_level.0), + rate_limit, tags, })) } diff --git a/mgmt/src/processor/proc.rs b/mgmt/src/processor/proc.rs index a2e898bdf..de89975d5 100644 --- a/mgmt/src/processor/proc.rs +++ b/mgmt/src/processor/proc.rs @@ -37,7 +37,7 @@ use crate::processor::mgmt_client::{ use crate::vpc_manager::{RequiredInformationBase, VpcManager}; use rekon::{Observe, Reconcile}; -use tracectl::get_trace_ctl; +use tracectl::{TracingRateLimitConfig, get_trace_ctl}; use tracing::{debug, error, info, warn}; use net::interface::display::MultiIndexInterfaceMapView; @@ -552,6 +552,10 @@ fn apply_tracing_config(tracing: &Option) -> ConfigResult { .iter() .map(|(tag, level)| (tag.as_str(), *level)), )?; + get_trace_ctl().reload_rate_limit(Some(TracingRateLimitConfig { + burst: tracing.rate_limit.burst, + replenish_per_second: tracing.rate_limit.replenish_per_second, + })); debug!("Successfully reconfigured tracing"); Ok(()) } diff --git a/tracectl/src/control.rs b/tracectl/src/control.rs index d94410ddd..af0f9bf1e 100644 --- a/tracectl/src/control.rs +++ b/tracectl/src/control.rs @@ -26,12 +26,21 @@ use crate::targets::{TRACING_TAG_ALL, TRACING_TARGETS}; use crate::trace_target; trace_target!("tracectl", LevelFilter::INFO, &[]); -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct TracingRateLimitConfig { pub burst: u32, pub replenish_per_second: u32, } +impl Default for TracingRateLimitConfig { + fn default() -> Self { + Self { + burst: 50, + replenish_per_second: 5, + } + } +} + #[derive(Clone, Debug, Error, PartialEq)] pub enum TraceCtlError { #[error("Unknown tag {0}")] @@ -309,7 +318,10 @@ impl AtomicEnvFilter { } fn reload(&self, new: EnvFilter) { - self.inner.store(std::sync::Arc::new(new)); // nosemgrep: rust-no-direct-std-sync-import + // The concurrency facade `Arc` is std-backed under tracectl's + // (non-model-checker) backend, which is exactly what `ArcSwap::store` + // accepts — so we go through the facade rather than importing 
directly. + self.inner.store(Arc::new(new)); callsite::rebuild_interest_cache(); } } @@ -385,13 +397,20 @@ where /// `Arc>>`. /// /// `None` means "no throttling": every event passes. [`reload`](Self::reload) -/// builds a brand-new layer (with fresh token buckets) from the new config and -/// swaps it in atomically, or stores `None` to disable throttling entirely. -/// A `Some(config)` that fails to build falls back to `None` (unthrottled), -/// mirroring the init-time behavior of [`TracingControl::build_rate_limit_layer`]. +/// swaps in a brand-new layer (with fresh token buckets) *when the config +/// changes*, or stores `None` to disable throttling entirely; an unchanged +/// config is a no-op, so live bucket state survives repeated applies of the +/// same config. A `Some(config)` that fails to build falls back to `None` +/// (unthrottled), mirroring the init-time behavior of +/// [`TracingControl::build_rate_limit_layer`]. #[derive(Clone)] struct AtomicThrottle { inner: Arc>>, + /// The config the current layer was built from. `reload` compares against + /// it so an unchanged config is a no-op: rebuilding would reset the token + /// buckets and hand out a fresh burst on every management re-apply of the + /// same config, which would weaken throttling. + current: Arc>>, } impl AtomicThrottle { @@ -400,16 +419,24 @@ impl AtomicThrottle { inner: Arc::new(ArcSwap::from_pointee( TracingControl::build_rate_limit_layer(config), )), + current: Arc::new(Mutex::new(config)), } } - /// Swap the active throttle. `Some(config)` installs a freshly-built - /// token-bucket layer; `None` removes throttling. Infallible: a config - /// that fails to build degrades to unthrottled. + /// Swap the active throttle when the config changes. `Some(config)` + /// installs a freshly-built token-bucket layer; `None` removes throttling. + /// Idempotent: an unchanged config is a no-op, preserving live bucket + /// state. Infallible: a config that fails to build degrades to unthrottled. 
fn reload(&self, config: Option) { - self.inner.store(std::sync::Arc::new( - TracingControl::build_rate_limit_layer(config), - )); // nosemgrep: rust-no-direct-std-sync-import + let mut current = self.current.lock(); + if *current == config { + return; + } + // See `AtomicEnvFilter::reload`: the facade `Arc` is what + // `ArcSwap::store` accepts here, so no direct import is needed. + self.inner + .store(Arc::new(TracingControl::build_rate_limit_layer(config))); + *current = config; callsite::rebuild_interest_cache(); } } @@ -654,8 +681,10 @@ impl TracingControl { /// Reload (or disable) the tracing rate limiter at runtime. /// - /// `Some(config)` swaps in a freshly-built token-bucket layer (with fresh - /// buckets — a reload grants a new burst allowance); `None` removes + /// `Some(config)` swaps in a freshly-built token-bucket layer when the + /// config differs from the active one (a changed config resets buckets to a + /// fresh burst); an unchanged config is a no-op, so repeatedly re-applying + /// the same config does not keep resetting buckets. `None` removes /// throttling so every event passes. Infallible: like init, a config that /// fails to build degrades to unthrottled. This is the hook a config/CRD /// handler calls when the rate-limit parameters change, analogous to @@ -1182,13 +1211,39 @@ mod tests { } // Phase 2 — install a small-burst throttle and reload the layer. - throttle.reload(Some(TracingRateLimitConfig { + let cfg = TracingRateLimitConfig { burst: BURST, replenish_per_second: 1, - })); - for _ in 0..EMITTED { - tracing::info!("{P2_ON}"); - } + }; + throttle.reload(Some(cfg)); + // Emit through a single callsite so the two bursts below share one + // token bucket (the throttle keys buckets by callsite + fields); a + // fresh callsite would get its own bucket and hide the bug. 
+ let emit_p2 = || { + for _ in 0..EMITTED { + tracing::info!("{P2_ON}"); + } + }; + let p2_passed = || { + String::from_utf8(buf.lock().unwrap().clone()) + .unwrap() + .matches(P2_ON) + .count() + }; + emit_p2(); + let p2_first = p2_passed(); + + // Phase 2b — reload with the *identical* config. This must be a + // no-op (idempotent): the bucket is not refilled, so a second burst + // through the same callsite stays throttled rather than receiving a + // fresh BURST allowance. + throttle.reload(Some(cfg)); + emit_p2(); + let p2_second = p2_passed() - p2_first; + assert!( + p2_second < BURST as usize, + "idempotent reload granted a fresh burst: 2nd burst passed {p2_second}, expected < BURST ({BURST})" + ); // Phase 3 — remove the throttle again: every INFO event must pass. throttle.reload(None); From a5421d03c72ee82a7ee326a40127602f2933d1cb Mon Sep 17 00:00:00 2001 From: Sergey Matov Date: Thu, 18 Jun 2026 12:23:37 +0000 Subject: [PATCH 3/7] chore: Bump Fabric to v0.123.1 Signed-off-by: Sergey Matov --- npins/sources.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/npins/sources.json b/npins/sources.json index f1f9d5e8d..67aa1db48 100644 --- a/npins/sources.json +++ b/npins/sources.json @@ -82,10 +82,10 @@ "version_upper_bound": null, "release_prefix": null, "submodules": false, - "version": "v0.117.0", - "revision": "c44d5756e066adccec25f57b89efc8583ea8f5ad", - "url": "https://api.github.com/repos/githedgehog/fabric/tarball/refs/tags/v0.117.0", - "hash": "sha256-ZQ/zLiVH1khkCRrX7riPoCfadD6vJgGjHixmKxhYnQw=", + "version": "v0.123.1", + "revision": "78be009135a1bebd58d5cf20c1778e859aec0736", + "url": "https://api.github.com/repos/githedgehog/fabric/tarball/refs/tags/v0.123.1", + "hash": "sha256-IZ9BCmrRM4lmIvK4p5/8QNBcKyzBKQbhHngFNQtqgR8=", "frozen": true }, "frr": { From 6904461943a67f63ad204ae26cdf322c9ce73360 Mon Sep 17 00:00:00 2001 From: Sergey Matov Date: Thu, 18 Jun 2026 16:25:40 +0400 Subject: [PATCH 4/7] chore(tracing): Use
concurrency::slot for atomic swap Signed-off-by: Sergey Matov --- Cargo.lock | 1 - tracectl/Cargo.toml | 1 - tracectl/src/control.rs | 33 ++++++++++++++------------------- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index abbd55106..e0c2f5f83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1724,7 +1724,6 @@ dependencies = [ name = "dataplane-tracectl" version = "0.21.0" dependencies = [ - "arc-swap", "color-eyre", "dataplane-common", "dataplane-concurrency", diff --git a/tracectl/Cargo.toml b/tracectl/Cargo.toml index 7fcbaad14..0417e9f89 100644 --- a/tracectl/Cargo.toml +++ b/tracectl/Cargo.toml @@ -6,7 +6,6 @@ publish.workspace = true version.workspace = true [dependencies] -arc-swap = { workspace = true } color-eyre = { workspace = true , features = [ "capture-spantrace", "color-spantrace", "tracing-error", "track-caller" ] } common = { workspace = true } concurrency = { workspace = true } diff --git a/tracectl/src/control.rs b/tracectl/src/control.rs index af0f9bf1e..4fd64b39d 100644 --- a/tracectl/src/control.rs +++ b/tracectl/src/control.rs @@ -3,7 +3,7 @@ //! Tracing runtime control. -use arc_swap::ArcSwap; +use concurrency::slot::Slot; use concurrency::sync::{Arc, Mutex, MutexGuard, OnceLock}; use ordermap::OrderMap; use std::str::FromStr; @@ -300,27 +300,24 @@ impl TargetCfgDb { } } -/// `AtomicEnvFilter` wraps an `EnvFilter` in an `ArcSwap` to allow atomic +/// `AtomicEnvFilter` wraps an `EnvFilter` in a [`Slot`] to allow atomic /// swapping of the filter. The type is `Clone`-able: the original instance /// is moved into the subscriber as a `Layer`, while a clone is kept on /// `TracingControl` to push reloads from the outside. Both clones share the -/// same `Arc>` so an update from one is visible to the other. +/// same `Arc>` so an update from one is visible to the other. 
#[derive(Clone)] struct AtomicEnvFilter { - inner: Arc>, + inner: Arc>, } impl AtomicEnvFilter { fn new(initial: EnvFilter) -> Self { Self { - inner: Arc::new(ArcSwap::from_pointee(initial)), + inner: Arc::new(Slot::from_pointee(initial)), } } fn reload(&self, new: EnvFilter) { - // The concurrency facade `Arc` is std-backed under tracectl's - // (non-model-checker) backend, which is exactly what `ArcSwap::store` - // accepts — so we go through the facade rather than importing directly. self.inner.store(Arc::new(new)); callsite::rebuild_interest_cache(); } @@ -378,7 +375,7 @@ where // per-layer filter, so reporting "no match" for anything but our own // type id is correct. We deliberately do *not* expose the inner // `EnvFilter`'s address — its identity can change at any time via - // `ArcSwap::store`. + // `Slot::store`. #[doc(hidden)] unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> { if id == TypeId::of::() { @@ -389,12 +386,12 @@ where } } -/// `AtomicThrottle` wraps an *optional* [`TracingRateLimitLayer`] in an -/// `ArcSwap` so the rate-limit policy can be swapped at runtime — the same +/// `AtomicThrottle` wraps an *optional* [`TracingRateLimitLayer`] in a +/// [`Slot`] so the rate-limit policy can be swapped at runtime — the same /// pattern [`AtomicEnvFilter`] uses for the level filter. One clone is moved /// into the subscriber as a per-layer `Filter`; another is kept on /// [`TracingControl`] to push reloads. Both share the same -/// `Arc>>`. +/// `Arc>>`. /// /// `None` means "no throttling": every event passes. [`reload`](Self::reload) /// swaps in a brand-new layer (with fresh token buckets) *when the config @@ -405,7 +402,7 @@ where /// [`TracingControl::build_rate_limit_layer`]. #[derive(Clone)] struct AtomicThrottle { - inner: Arc>>, + inner: Arc>>, /// The config the current layer was built from. 
`reload` compares against /// it so an unchanged config is a no-op: rebuilding would reset the token /// buckets and hand out a fresh burst on every management re-apply of the @@ -416,9 +413,9 @@ struct AtomicThrottle { impl AtomicThrottle { fn new(config: Option) -> Self { Self { - inner: Arc::new(ArcSwap::from_pointee( - TracingControl::build_rate_limit_layer(config), - )), + inner: Arc::new(Slot::from_pointee(TracingControl::build_rate_limit_layer( + config, + ))), current: Arc::new(Mutex::new(config)), } } @@ -432,8 +429,6 @@ impl AtomicThrottle { if *current == config { return; } - // See `AtomicEnvFilter::reload`: the facade `Arc` is what - // `ArcSwap::store` accepts here, so no direct import is needed. self.inner .store(Arc::new(TracingControl::build_rate_limit_layer(config))); *current = config; @@ -605,7 +600,7 @@ impl TracingControl { fn lock(&self) -> MutexGuard<'_, TargetCfgDb> { self.db.lock() } - /// Reload the active `EnvFilter`. Infallible: the `ArcSwap`-based handle + /// Reload the active `EnvFilter`. Infallible: the `Slot`-based handle /// has no lock to poison and no subscriber-gone case. fn reload(&self, filter: EnvFilter) { self.reload_filter.reload(filter); From 87e17d71e7ef79f765896bcb68c5ec5007ef48c7 Mon Sep 17 00:00:00 2001 From: Sergey Matov Date: Thu, 18 Jun 2026 16:53:18 +0400 Subject: [PATCH 5/7] test(tracing): Fix burst test Signed-off-by: Sergey Matov --- tracectl/src/control.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tracectl/src/control.rs b/tracectl/src/control.rs index 4fd64b39d..e6646b1cf 100644 --- a/tracectl/src/control.rs +++ b/tracectl/src/control.rs @@ -1259,7 +1259,10 @@ mod tests { // Phase 2: throttle active — at least the burst survives, but the bulk // is dropped. If this saw EMITTED, the reload didn't take effect. 
- assert!(p2 >= 1, "phase 2 throttle swallowed the entire burst"); + assert!( + p2 >= BURST as usize, + "phase 2 passed only {p2}/{BURST} burst events" + ); assert!( p2 < EMITTED / 2, "phase 2 let through {p2}/{EMITTED} events; throttle not applied after reload\n{captured}" From d45017ee88c038bba695908939c93e477bb57b31 Mon Sep 17 00:00:00 2001 From: Sergey Matov Date: Thu, 18 Jun 2026 17:03:29 +0400 Subject: [PATCH 6/7] docs(cli): Adjust CLI helper info to fit default throttler config Signed-off-by: Sergey Matov --- args/src/lib.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/args/src/lib.rs b/args/src/lib.rs index 1d9c3da79..f11ae2207 100644 --- a/args/src/lib.rs +++ b/args/src/lib.rs @@ -507,7 +507,8 @@ pub struct TracingConfigSection { pub show: TracingShowSection, /// Tracing configuration string (e.g., "default=info,nat=debug") pub config: Option, // TODO: stronger typing on this config? - /// Optional rate limit for lower-severity tracing output + /// Rate limit for lower-severity tracing output; throttling is always on, + /// so an unset CLI flag falls back to the default (50:5) pub rate_limit: Option, } @@ -1317,9 +1318,9 @@ E.g. default=error,all=info,nat=debug will set the default target to error, and long, value_name = "BURST:REPLENISH_PER_SECOND", value_parser=TracingRateLimit::from_str, - help = "Optional rate limit for lower-severity tracing output. Syntax: BURST:REPLENISH_PER_SECOND. + help = "Rate limit for lower-severity tracing output. Syntax: BURST:REPLENISH_PER_SECOND. Example: --tracing-rate-limit 50:5 allows bursts up to 50 repeated messages and replenishes 5 messages per second. -If omitted, tracing output is not rate-limited." +Throttling is always on: if omitted, the default rate limit of 50:5 is applied. 
(DEBUG is exempt from throttling.)" )] tracing_rate_limit: Option, From 711ce70e1a4f752666bc328cee86e10fc95eafa3 Mon Sep 17 00:00:00 2001 From: Sergey Matov Date: Thu, 18 Jun 2026 17:14:30 +0400 Subject: [PATCH 7/7] chore(tracing): Use load_full for Slot backend Signed-off-by: Sergey Matov --- tracectl/src/control.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tracectl/src/control.rs b/tracectl/src/control.rs index e6646b1cf..b499090d7 100644 --- a/tracectl/src/control.rs +++ b/tracectl/src/control.rs @@ -334,39 +334,39 @@ where S: Subscriber + for<'a> LookupSpan<'a>, { fn register_callsite(&self, meta: &'static Metadata<'static>) -> Interest { - Layer::::register_callsite(&**self.inner.load(), meta) + Layer::::register_callsite(&*self.inner.load_full(), meta) } fn enabled(&self, meta: &Metadata<'_>, ctx: Context<'_, S>) -> bool { - Layer::::enabled(&**self.inner.load(), meta, ctx) + Layer::::enabled(&*self.inner.load_full(), meta, ctx) } fn max_level_hint(&self) -> Option { - Layer::::max_level_hint(&**self.inner.load()) + Layer::::max_level_hint(&*self.inner.load_full()) } fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { - Layer::::on_new_span(&**self.inner.load(), attrs, id, ctx); + Layer::::on_new_span(&*self.inner.load_full(), attrs, id, ctx); } fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) { - Layer::::on_record(&**self.inner.load(), id, values, ctx); + Layer::::on_record(&*self.inner.load_full(), id, values, ctx); } fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) { - Layer::::on_enter(&**self.inner.load(), id, ctx); + Layer::::on_enter(&*self.inner.load_full(), id, ctx); } fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) { - Layer::::on_exit(&**self.inner.load(), id, ctx); + Layer::::on_exit(&*self.inner.load_full(), id, ctx); } fn on_close(&self, id: span::Id, ctx: Context<'_, S>) { - 
Layer::::on_close(&**self.inner.load(), id, ctx); + Layer::::on_close(&*self.inner.load_full(), id, ctx); } fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { - Layer::::on_event(&**self.inner.load(), event, ctx); + Layer::::on_event(&*self.inner.load_full(), event, ctx); } // `downcast_raw` is how the layered subscriber probes a layer for @@ -446,7 +446,7 @@ where S: Subscriber + for<'a> LookupSpan<'a>, { fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool { - match &**self.inner.load() { + match &*self.inner.load_full() { Some(layer) => Filter::::enabled(layer, meta, cx), None => true, } @@ -457,7 +457,7 @@ where } fn event_enabled(&self, event: &Event<'_>, cx: &Context<'_, S>) -> bool { - match &**self.inner.load() { + match &*self.inner.load_full() { Some(layer) => Filter::::event_enabled(layer, event, cx), None => true, }