20 changes: 20 additions & 0 deletions changelog.d/tag_cardinality_limit_ttl.enhancement.md
@@ -0,0 +1,20 @@
The `tag_cardinality_limit` transform gained an optional sliding-window TTL
for tracked tag values, controlled by two new settings on the global block,
each `per_metric_limits` entry, and the `Inner`/`OverrideInner` config schemas:

- **`ttl_secs`**: expire tracked tag values after this many seconds since they
were last seen. Useful when the downstream system (e.g. Datadog custom
metrics) bills on a rolling unique-series window — without TTL, the
cardinality cache saturates `value_limit` and starts rejecting fresh values
long after the old ones have aged out of the billing window. When unset
(default), behavior is unchanged from previous releases.
- **`ttl_generations`**: tune how the TTL window is sliced for the
`probabilistic` backend. Defaults to `4` (eviction granularity =
`ttl_secs / 4`). Memory cost is `ttl_generations * cache_size_per_key` per
(metric, tag-key) pair; lower `cache_size_per_key` to keep total memory flat.
In `exact` mode this knob only controls the sweep cadence.

A new internal counter `tag_cardinality_ttl_expirations_total` reports how
many distinct values are evicted by TTL.

authors: kaarolch
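
The arithmetic behind `ttl_generations` can be sketched as follows. This is an illustrative calculation only: the function names are hypothetical and not part of the transform, and the `5 * 1024` figure is the default `cache_size_per_key` shown later in this diff.

```rust
/// Approximate eviction granularity of the `probabilistic` backend, in seconds:
/// the TTL window divided by the number of rolling shards.
/// (Hypothetical helper; a generation count of 0 is treated as 1.)
fn eviction_granularity_secs(ttl_secs: u64, ttl_generations: u8) -> u64 {
    ttl_secs / u64::from(ttl_generations.max(1))
}

/// Memory per (metric, tag-key) pair once TTL is enabled:
/// one bloom shard of `cache_size_per_key` bytes per generation.
fn memory_per_pair_bytes(cache_size_per_key: usize, ttl_generations: u8) -> usize {
    cache_size_per_key * usize::from(ttl_generations.max(1))
}

/// The `cache_size_per_key` that keeps total memory equal to the
/// pre-TTL footprint of `original_cache_size` bytes.
fn flat_cache_size_per_key(original_cache_size: usize, ttl_generations: u8) -> usize {
    original_cache_size / usize::from(ttl_generations.max(1))
}
```

With `ttl_secs: 3600` and the default of four generations, values age out in steps of roughly 900 seconds, and a 5 KiB `cache_size_per_key` grows to 20 KiB per pair unless it is lowered to 1280 bytes.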
31 changes: 31 additions & 0 deletions changelog.d/tag_cardinality_limit_ttl_fixes.fix.md
@@ -0,0 +1,31 @@
Several fixes to the `tag_cardinality_limit` transform's sliding-window TTL:

- The probabilistic backend's `value_limit` cap is now enforced against an
upper-bound estimate of the union cardinality across all generational
shards. Previously the cap was checked against the maximum count of any
single shard, which under-counts when distinct values are spread across
shards (high-churn / low-repeat traffic) and silently admitted values past
the configured `value_limit`.
- `ttl_generations` is now silently capped to `ttl_secs` when `ttl_secs` is
smaller, so the configured TTL window is honored exactly. Previously the
per-slice duration was floored to 1 second, which stretched the effective
retention window to `ttl_generations` seconds when `ttl_secs <
ttl_generations` (for example `ttl_secs: 1, ttl_generations: 4` kept values
for about 4 seconds).
- Rolling-bloom refresh-on-sighting now sets the bits in the newest shard
unconditionally instead of skipping the write when the bloom already reports
the value as present. The skip path could leave a recently-observed value
riding on another value's false-positive bits, so its lifetime depended on
when those unrelated bits aged out rather than on its own activity.
- Pathological `ttl_secs` configs near `u64::MAX` no longer panic the
transform on construction or rotation. `Instant + Duration` overflows are
now caught and saturated so the transform stays alive instead of crashing
on misconfiguration.
- When TTL eviction empties a `(metric, tag-key)` bucket, the slot is now
reclaimed under `max_tracked_keys`. Previously the bucket lingered with an
empty inner set and permanently consumed a slot, so high-churn workloads
could hit the cap forever even after older pairs had fully aged out.
Reclamation runs lazily on the cap-hit path, so steady-state allocations
are unaffected.

authors: kaarolch
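
The first two fixes can be illustrated with a small sketch. It is not the transform's code: the function names are hypothetical, exact `HashSet`s stand in for bloom shards, and summing per-shard counts is only one possible upper bound on the union (the changelog does not say which estimate is used).

```rust
use std::collections::HashSet;

/// Old check (per the changelog): the largest single-shard count.
/// This is a lower bound on the number of distinct values across shards.
fn max_shard_count(shards: &[HashSet<&str>]) -> usize {
    shards.iter().map(|s| s.len()).max().unwrap_or(0)
}

/// Assumed upper bound on the union: the sum of per-shard counts.
/// It over-counts values present in several shards, never under-counts.
fn summed_upper_bound(shards: &[HashSet<&str>]) -> usize {
    shards.iter().map(|s| s.len()).sum()
}

/// Old slicing: per-slice duration floored to 1 second, so the effective
/// retention could stretch to `ttl_generations` seconds.
fn old_retention_secs(ttl_secs: u64, ttl_generations: u8) -> u64 {
    let gens = u64::from(ttl_generations.max(1));
    let slice = (ttl_secs / gens).max(1);
    slice * gens
}

/// New slicing: cap the generation count at `ttl_secs` first.
/// Returns (effective generations, per-slice seconds).
fn new_slicing(ttl_secs: u64, ttl_generations: u8) -> (u64, u64) {
    let gens = u64::from(ttl_generations.max(1)).min(ttl_secs.max(1));
    let slice = (ttl_secs / gens).max(1);
    (gens, slice)
}
```

For three shards holding `{a, b}`, `{c, d}`, and `{e}`, the maximum is 2 while the true union is 5, so a `value_limit` of 3 would have been exceeded without the old check noticing. For `ttl_secs: 1, ttl_generations: 4`, the old slicing retains values for 4 seconds; the capped version uses one generation of 1 second.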
2 changes: 2 additions & 0 deletions lib/vector-common/src/internal_event/metric_name.rs
@@ -76,6 +76,7 @@ pub enum CounterName {
StaleEventsFlushedTotal,
StartedTotal,
StoppedTotal,
TagCardinalityTtlExpirationsTotal,
TagCardinalityUntrackedEventsTotal,
TagValueLimitExceededTotal,
ValueLimitReachedTotal,
@@ -335,6 +336,7 @@ impl CounterName {
Self::StaleEventsFlushedTotal => "stale_events_flushed_total",
Self::StartedTotal => "started_total",
Self::StoppedTotal => "stopped_total",
Self::TagCardinalityTtlExpirationsTotal => "tag_cardinality_ttl_expirations_total",
Self::TagCardinalityUntrackedEventsTotal => "tag_cardinality_untracked_events_total",
Self::TagValueLimitExceededTotal => "tag_value_limit_exceeded_total",
Self::ValueLimitReachedTotal => "value_limit_reached_total",
24 changes: 24 additions & 0 deletions src/internal_events/tag_cardinality_limit.rs
@@ -103,3 +103,27 @@ impl InternalEvent for TagCardinalityTrackedKeys {
gauge!(GaugeName::TagCardinalityTrackedKeys).set(self.count as f64);
}
}

/// Emitted when a TTL sweep removes tag values from a tracking bucket.
///
/// The `count` is the number of *distinct* values evicted in that pass.
/// For the probabilistic backend this is the count drained from the oldest
/// rolling-bloom shard; for the exact backend it is the number of entries
/// whose last sighting was older than `ttl_secs`.
#[derive(NamedInternalEvent)]
pub struct TagCardinalityTtlExpired {
pub count: u64,
}

impl InternalEvent for TagCardinalityTtlExpired {
fn emit(self) {
if self.count == 0 {
return;
}
debug!(
message = "Expired tag values from cardinality cache.",
count = self.count,
);
counter!(CounterName::TagCardinalityTtlExpirationsTotal).increment(self.count);
}
}
55 changes: 55 additions & 0 deletions src/transforms/tag_cardinality_limit/config.rs
@@ -98,6 +98,35 @@ pub struct Inner {
#[configurable(derived)]
#[serde(default)]
pub internal_metrics: InternalMetricsConfig,

/// Expire tracked tag values after this many seconds since they were last seen.
///
/// When unset (default) or set to `0`, values persist for the lifetime of the
/// process — the historical behavior. When set to a positive value, the
/// transform behaves like a sliding window: any tag value not observed within
/// the TTL is dropped, freeing room under `value_limit` for fresh values.
/// Useful for bounding cost on backends (e.g. Datadog custom metrics) that
/// bill on a rolling unique-series window.
///
/// In `exact` mode every value carries a precise last-seen timestamp; in
/// `probabilistic` mode the underlying bloom filter is split into
/// `ttl_generations` rolling shards, so eviction is approximate to within
/// `ttl_secs / ttl_generations`.
#[serde(default)]
#[configurable(metadata(docs::human_name = "TTL (seconds)"))]
pub ttl_secs: Option<u64>,

/// Number of time-slices the TTL window is split into for the
/// `probabilistic` backend.
///
/// Higher values smooth eviction (closer to a true sliding window) at the
/// cost of `ttl_generations * cache_size_per_key` memory per (metric,
/// tag-key) pair. `1` produces a tumbling window: all tracked values are
/// dropped at once every `ttl_secs`. Ignored when `ttl_secs` is unset, or
/// when mode is `exact` (which uses precise per-value timestamps).
#[serde(default = "default_ttl_generations")]
#[configurable(metadata(docs::human_name = "TTL Generations"))]
pub ttl_generations: u8,
}

/// Controls the approach taken for tracking tag cardinality at the global level.
@@ -169,6 +198,23 @@ pub struct OverrideInner {
#[configurable(derived)]
#[serde(default)]
pub internal_metrics: InternalMetricsConfig,

/// Per-metric TTL for tracked tag values. See [`Inner::ttl_secs`] for the
/// full description.
///
/// Per-metric TTL is a **full override** of the global TTL — it does not
/// inherit. Leaving this unset means "no TTL for this metric", *not*
/// "fall back to the global `ttl_secs`". This mirrors how a per-metric
/// `value_limit` fully shadows the global one. If you want a metric to
/// share the global TTL, copy the value explicitly.
#[serde(default)]
#[configurable(metadata(docs::human_name = "TTL (seconds)"))]
pub ttl_secs: Option<u64>,

/// Per-metric override for `ttl_generations`. See [`Inner::ttl_generations`].
#[serde(default = "default_ttl_generations")]
#[configurable(metadata(docs::human_name = "TTL Generations"))]
pub ttl_generations: u8,
}

/// Controls the approach taken for tracking tag cardinality at the per-metric level.
@@ -308,6 +354,13 @@ pub(crate) const fn default_cache_size() -> usize {
5 * 1024 // 5KB
}

/// Default number of rolling-bloom shards. Four gives a reasonable middle ground:
/// eviction granularity of `ttl/4`, and a 4x memory multiplier on
/// `cache_size_per_key` for users who opt into TTL.
pub(crate) const fn default_ttl_generations() -> u8 {
4
}

// =============================================================================
// Transform plumbing
// =============================================================================
@@ -320,6 +373,8 @@ impl GenerateConfig for Config {
value_limit: default_value_limit(),
limit_exceeded_action: default_limit_exceeded_action(),
internal_metrics: InternalMetricsConfig::default(),
ttl_secs: None,
ttl_generations: default_ttl_generations(),
},
tracking_scope: TrackingScope::default(),
max_tracked_keys: None,
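
The TTL resolution rules described in the doc comments above (unset or `0` disables TTL; a per-metric entry fully overrides the global value rather than inheriting it) can be sketched as follows. The struct and function names are hypothetical stand-ins for `Inner` and `OverrideInner`, and treating a per-metric `0` as "disabled" is an assumption carried over from the global setting.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the global block (`Inner`).
struct GlobalConfig {
    ttl_secs: Option<u64>,
}

/// Hypothetical stand-in for a `per_metric_limits` entry (`OverrideInner`).
struct PerMetricConfig {
    ttl_secs: Option<u64>,
}

/// Unset and `0` both mean "no TTL": values persist for the process lifetime.
fn normalize_ttl(ttl_secs: Option<u64>) -> Option<Duration> {
    ttl_secs.filter(|&secs| secs > 0).map(Duration::from_secs)
}

/// A matching per-metric entry wins outright, even when its `ttl_secs`
/// is unset; the global TTL applies only when no entry matches.
fn resolve_ttl(global: &GlobalConfig, per_metric: Option<&PerMetricConfig>) -> Option<Duration> {
    match per_metric {
        Some(entry) => normalize_ttl(entry.ttl_secs),
        None => normalize_ttl(global.ttl_secs),
    }
}
```

With a global `ttl_secs` of 300, a metric that has a per-metric entry without `ttl_secs` resolves to no TTL at all; to share the global window, the entry has to repeat `ttl_secs: 300` explicitly.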
111 changes: 91 additions & 20 deletions src/transforms/tag_cardinality_limit/mod.rs
@@ -82,6 +82,42 @@ impl TagCardinalityLimit {
});
}

/// Drop (metric, tag-key) buckets whose `AcceptedTagValueSet` has been
/// fully expired by TTL, decrementing `tracked_keys_count` so freed slots
/// can be reused under `max_tracked_keys`.
///
/// Without this, TTL sweeps in `TtlExactStorage` / `RollingBloomStorage`
/// only shrink the inner storage; the empty bucket itself lingers and
/// permanently consumes a slot. High-churn workloads under
/// `max_tracked_keys` would then hit the cap forever — every new
/// `(metric, tag-key)` pair would fall through as untracked even after
/// the cache for older pairs has fully aged out.
///
/// Called lazily on cap-hit paths so the steady-state overhead is zero;
/// the scan only fires when a new allocation would otherwise be rejected.
/// `len()` is intentionally called on every set because it also drives
/// the lazy sweep/rotation that may be what empties the bucket.
fn reclaim_empty_buckets(&mut self) {
let mut reclaimed = 0usize;
self.accepted_tags.retain(|_, inner| {
inner.retain(|_, set| {
if set.len() == 0 {
reclaimed += 1;
false

Review comment (P2): Keep active zero-limit buckets out of TTL reclaim

When `value_limit: 0` is used with `max_tracked_keys`, the transform creates an intentionally empty bucket for a tracked key so that future values for that key keep being enforced. This reclaim path, however, removes any zero-length bucket, including non-TTL ones. Once the cap is hit, each new key can evict the previous zero-limit bucket and then be enforced (dropped) itself, instead of following the documented cap behavior of passing additional keys through unchecked once `max_tracked_keys` is exhausted.
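
One possible way to address the concern above is to restrict reclamation to buckets that actually have TTL enabled. The sketch below is an assumption, not the fix adopted in the PR; `Bucket` is a hypothetical stand-in for `AcceptedTagValueSet`, and the `ttl_enabled` field is invented for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `AcceptedTagValueSet`.
struct Bucket {
    /// Whether this bucket was created with a positive `ttl_secs`.
    ttl_enabled: bool,
    /// Number of values currently tracked.
    len: usize,
}

/// Remove only buckets that are empty *and* TTL-backed, so that
/// intentionally empty non-TTL buckets (e.g. `value_limit: 0`) keep
/// their slot. Returns the number of reclaimed slots.
fn reclaim_empty_ttl_buckets(buckets: &mut HashMap<String, Bucket>) -> usize {
    let before = buckets.len();
    buckets.retain(|_, bucket| !(bucket.ttl_enabled && bucket.len == 0));
    before - buckets.len()
}
```

A bucket that combines `value_limit: 0` with TTL would still be reclaimed by this guard, so a complete fix would likely also need to consult the bucket's configured limit.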


} else {
true
}
});
!inner.is_empty()
});
if reclaimed > 0 {
self.tracked_keys_count = self.tracked_keys_count.saturating_sub(reclaimed);
emit!(TagCardinalityTrackedKeys {
count: self.tracked_keys_count,
});
}
}

/// Resolve the configuration that applies to a specific (metric, tag) pair.
///
/// Per-tag entries support two modes:
@@ -120,6 +156,11 @@ impl TagCardinalityLimit {
let limit_exceeded_action = per_metric.config.limit_exceeded_action;
let metric_value_limit = per_metric.config.value_limit;
let internal_metrics = per_metric.config.internal_metrics;
// Per-metric TTL is a *full override* of the global TTL for this metric:
// unset (`None`) means "no TTL for this metric" rather than "inherit from
// global", mirroring how per-metric `value_limit` shadows the global value.
let ttl_secs = per_metric.config.ttl_secs;
let ttl_generations = per_metric.config.ttl_generations;

// Per-tag entry: LimitOverride uses an explicit value_limit; Excluded opts
// the tag out. All other settings are always inherited from per-metric.
@@ -134,6 +175,8 @@ impl TagCardinalityLimit {
limit_exceeded_action,
mode: metric_mode,
internal_metrics,
ttl_secs,
ttl_generations,
});
}
}
@@ -143,6 +186,8 @@ impl TagCardinalityLimit {
limit_exceeded_action,
mode: metric_mode,
internal_metrics,
ttl_secs,
ttl_generations,
})
}

@@ -205,18 +250,24 @@ impl TagCardinalityLimit {

if !pair_exists {
if !self.can_allocate_new_key() {
return AcceptResult::Untracked;
// Reclaim before giving up: a TTL backend may have emptied
// buckets we're still accounting against `max_tracked_keys`.
self.reclaim_empty_buckets();
if !self.can_allocate_new_key() {
return AcceptResult::Untracked;
}
}
self.record_new_key_allocation();
}

let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default();
let tag_value_set = metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(&config.mode));
let tag_value_set = metric_accepted_tags.entry_ref(key).or_insert_with(|| {
AcceptedTagValueSet::new(&config.mode, config.ttl_secs, config.ttl_generations)
});

if tag_value_set.contains(value) {
// Tag value has already been accepted, nothing more to do.
// Already accepted; `contains` also refreshes the TTL lease on
// TTL backends. See `AcceptedTagValueSet::contains`.
return AcceptResult::Tracked;
}

@@ -238,8 +289,12 @@ impl TagCardinalityLimit {

/// Checks if recording a key and value corresponding to a tag on an incoming Metric would
/// exceed the cardinality limit.
///
/// Note: takes `&mut self` because TTL-enabled backends (`TtlSet`,
/// `RollingBloom`) perform lazy sweep/rotation inside `contains`/`len`.
/// The non-TTL backends are still effectively read-only here.
fn tag_limit_exceeded(
&self,
&mut self,
metric_key: Option<&MetricId>,
key: &str,
value: &TagValueSet,
@@ -248,21 +303,30 @@ impl TagCardinalityLimit {
TagSettings::Excluded => return false,
TagSettings::Tracked(inner) => inner,
};
let can_allocate = self.can_allocate_new_key();
match self
.accepted_tags
.get(&metric_key.cloned())
.and_then(|metric_accepted_tags| metric_accepted_tags.get(key))
.get_mut(&metric_key.cloned())
.and_then(|metric_accepted_tags| metric_accepted_tags.get_mut(key))
{
// Already accepted — never exceeds.
Some(value_set) if value_set.contains(value) => false,
// Adding this value would push us at or past the configured cap. Treat a
// missing bucket as an empty set so `value_limit: 0` correctly rejects
// the first occurrence too — but only when the (metric, tag) pair would
// actually be tracked. If `max_tracked_keys` is exhausted, `record_tag_value`
// will pass the tag through unchecked and emit `TagCardinalityLimitUntracked`,
// so we must not pre-empt that path by reporting the limit as exceeded here.
Some(value_set) => value_set.len() >= resolved.value_limit,
None => resolved.value_limit == 0 && self.can_allocate_new_key(),
// Pattern guards bind variables immutably, so the mutable call
// can't live in a guard; hence the if/else inside the arm.
Some(value_set) => {
// Must be the non-refreshing variant: `DropEvent` may still
// reject this event on a later tag, and we must not extend
// any TTL lease for an event that gets dropped. Refresh
// happens on the accept path via `record_tag_value::insert`.
if value_set.contains_no_refresh(value) {
false
} else {
value_set.len() >= resolved.value_limit
}
}
// Missing bucket: treat as empty so `value_limit: 0` rejects the
// first occurrence too — but only when the pair can actually be
// allocated. Otherwise `record_tag_value` will forward untracked
// (and emit `TagCardinalityLimitUntracked`).
None => resolved.value_limit == 0 && can_allocate,
}
}

@@ -292,15 +356,22 @@ impl TagCardinalityLimit {

if !pair_exists {
if !self.can_allocate_new_key() {
return true;
// See `try_accept_tag` — try reclaiming empty TTL buckets
// before treating the new pair as untracked.
self.reclaim_empty_buckets();
if !self.can_allocate_new_key() {
return true;
}
}
self.record_new_key_allocation();
}

let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default();
metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(&config.mode))
.or_insert_with(|| {
AcceptedTagValueSet::new(&config.mode, config.ttl_secs, config.ttl_generations)
})
.insert(value.clone());
false
}
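
The distinction these hunks draw between a refreshing `contains` (accept path) and `contains_no_refresh` (limit-check path), together with the lazy sweep inside `len`, can be sketched for an exact-mode set as follows. `TtlSet` here is a simplified, hypothetical model rather than the PR's storage type, and it takes `now` as a parameter so the behavior is deterministic.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Simplified, hypothetical exact-mode TTL set: one last-seen timestamp per value.
struct TtlSet {
    ttl: Duration,
    last_seen: HashMap<String, Instant>,
}

impl TtlSet {
    fn new(ttl: Duration) -> Self {
        Self { ttl, last_seen: HashMap::new() }
    }

    /// Lazy sweep: drop values whose last sighting is at least `ttl` old.
    /// Returns the number of evicted values.
    fn sweep(&mut self, now: Instant) -> usize {
        let before = self.last_seen.len();
        let ttl = self.ttl;
        self.last_seen
            .retain(|_, seen| now.saturating_duration_since(*seen) < ttl);
        before - self.last_seen.len()
    }

    fn insert(&mut self, value: &str, now: Instant) {
        self.last_seen.insert(value.to_owned(), now);
    }

    /// Accept-path lookup: a hit also renews the value's TTL lease.
    fn contains(&mut self, value: &str, now: Instant) -> bool {
        self.sweep(now);
        match self.last_seen.get_mut(value) {
            Some(seen) => {
                *seen = now;
                true
            }
            None => false,
        }
    }

    /// Check-path lookup: sweeps, but never extends a lease, so an event
    /// that is later dropped leaves no trace in the cache.
    fn contains_no_refresh(&mut self, value: &str, now: Instant) -> bool {
        self.sweep(now);
        self.last_seen.contains_key(value)
    }

    /// Needs `&mut self` because it sweeps before counting.
    fn len(&mut self, now: Instant) -> usize {
        self.sweep(now);
        self.last_seen.len()
    }
}
```

With a 10-second TTL and a value inserted at `t0`, a `contains_no_refresh` hit at `t0 + 6s` leaves the value to expire by `t0 + 12s`, whereas a `contains` hit at `t0 + 6s` keeps it alive until `t0 + 16s`.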