Skip to content
Open
9 changes: 9 additions & 0 deletions changelog.d/tag_cardinality_limit_ttl.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
The `tag_cardinality_limit` transform gained an optional sliding-window TTL
for tracked tag values. Two new settings — `ttl_secs` and `ttl_generations` —
are available on the global block and on each `per_metric_limits` entry. When
`ttl_secs` is set to a positive value, values not observed within that many
seconds are evicted, freeing room under `value_limit`; when it is unset or `0`
(the default), values are never expired. `ttl_generations` only affects
`probabilistic` mode. A new internal counter,
`tag_cardinality_ttl_expirations_total`, reports the number of values evicted
by TTL. See the transform documentation for configuration details and
mode-specific behavior.

authors: kaarolch
2 changes: 2 additions & 0 deletions lib/vector-common/src/internal_event/metric_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub enum CounterName {
StaleEventsFlushedTotal,
StartedTotal,
StoppedTotal,
TagCardinalityTtlExpirationsTotal,
TagCardinalityUntrackedEventsTotal,
TagValueLimitExceededTotal,
ValueLimitReachedTotal,
Expand Down Expand Up @@ -335,6 +336,7 @@ impl CounterName {
Self::StaleEventsFlushedTotal => "stale_events_flushed_total",
Self::StartedTotal => "started_total",
Self::StoppedTotal => "stopped_total",
Self::TagCardinalityTtlExpirationsTotal => "tag_cardinality_ttl_expirations_total",
Self::TagCardinalityUntrackedEventsTotal => "tag_cardinality_untracked_events_total",
Self::TagValueLimitExceededTotal => "tag_value_limit_exceeded_total",
Self::ValueLimitReachedTotal => "value_limit_reached_total",
Expand Down
24 changes: 24 additions & 0 deletions src/internal_events/tag_cardinality_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,27 @@ impl InternalEvent for TagCardinalityTrackedKeys {
gauge!(GaugeName::TagCardinalityTrackedKeys).set(self.count as f64);
}
}

/// Emitted when a TTL sweep removes tag values from a tracking bucket.
///
/// The `count` is the number of *distinct* values evicted in that pass.
/// For the probabilistic backend this is the count drained from the oldest
/// rolling-bloom shard; for the exact backend it is the number of entries
/// whose last sighting was older than `ttl_secs`.
#[derive(NamedInternalEvent)]
pub struct TagCardinalityTtlExpired {
/// Number of tag values evicted by the sweep. Emitting the event with a
/// count of zero does nothing: no log line and no counter update.
pub count: u64,
}

impl InternalEvent for TagCardinalityTtlExpired {
    /// Logs the eviction at debug level and adds `count` to the
    /// `TagCardinalityTtlExpirationsTotal` counter.
    ///
    /// A sweep that evicted nothing stays silent: neither the log line nor
    /// the counter update happens when `count` is zero.
    fn emit(self) {
        let expired = self.count;
        if expired > 0 {
            debug!(
                message = "Expired tag values from cardinality cache.",
                count = expired,
            );
            counter!(CounterName::TagCardinalityTtlExpirationsTotal).increment(expired);
        }
    }
}
55 changes: 55 additions & 0 deletions src/transforms/tag_cardinality_limit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,35 @@ pub struct Inner {
#[configurable(derived)]
#[serde(default)]
pub internal_metrics: InternalMetricsConfig,

/// Expire tracked tag values after this many seconds since they were last seen.
///
/// When unset (default) or set to `0`, values persist for the lifetime of the
/// process — the historical behavior. When set to a positive value, the
/// transform behaves like a sliding window: any tag value not observed within
/// the TTL is dropped, freeing room under `value_limit` for fresh values.
/// Useful for bounding cost on backends (e.g. Datadog custom metrics) that
/// bill on a rolling unique-series window.
///
/// In `exact` mode every value carries a precise last-seen timestamp; in
/// `probabilistic` mode the underlying bloom filter is split into
/// `ttl_generations` rolling shards, so eviction is approximate to within
/// `ttl_secs / ttl_generations`.
#[serde(default)]
#[configurable(metadata(docs::human_name = "TTL (seconds)"))]
pub ttl_secs: Option<u64>,

/// Number of time-slices the TTL window is split into for the
/// `probabilistic` backend.
///
/// Higher values smooth eviction (closer to a true sliding window) at the
/// cost of `ttl_generations * cache_size_per_key` memory per (metric,
/// tag-key) pair. `1` produces a tumbling window: all tracked values are
/// dropped at once every `ttl_secs`. Ignored when `ttl_secs` is unset, or
/// when mode is `exact` (which uses precise per-value timestamps).
#[serde(default = "default_ttl_generations")]
#[configurable(metadata(docs::human_name = "TTL Generations"))]
pub ttl_generations: u8,
}

/// Controls the approach taken for tracking tag cardinality at the global level.
Expand Down Expand Up @@ -169,6 +198,23 @@ pub struct OverrideInner {
#[configurable(derived)]
#[serde(default)]
pub internal_metrics: InternalMetricsConfig,

/// Per-metric TTL for tracked tag values. See [`Inner::ttl_secs`] for the
/// full description.
///
/// Per-metric TTL is a **full override** of the global TTL — it does not
/// inherit. Leaving this unset means "no TTL for this metric", *not*
/// "fall back to the global `ttl_secs`". This mirrors how a per-metric
/// `value_limit` fully shadows the global one. If you want a metric to
/// share the global TTL, copy the value explicitly.
#[serde(default)]
#[configurable(metadata(docs::human_name = "TTL (seconds)"))]
pub ttl_secs: Option<u64>,

/// Per-metric override for `ttl_generations`. See [`Inner::ttl_generations`].
#[serde(default = "default_ttl_generations")]
#[configurable(metadata(docs::human_name = "TTL Generations"))]
pub ttl_generations: u8,
}

/// Controls the approach taken for tracking tag cardinality at the per-metric level.
Expand Down Expand Up @@ -308,6 +354,13 @@ pub(crate) const fn default_cache_size() -> usize {
5 * 1024 // 5KB
}

/// Serde default for `ttl_generations`: the number of rolling-bloom shards
/// used when the setting is not given explicitly.
///
/// Four is a middle ground — eviction granularity of `ttl/4`, at the price of
/// a 4x memory multiplier on `cache_size_per_key` for users who opt into TTL.
pub(crate) const fn default_ttl_generations() -> u8 {
    const DEFAULT_SHARDS: u8 = 4;
    DEFAULT_SHARDS
}

// =============================================================================
// Transform plumbing
// =============================================================================
Expand All @@ -320,6 +373,8 @@ impl GenerateConfig for Config {
value_limit: default_value_limit(),
limit_exceeded_action: default_limit_exceeded_action(),
internal_metrics: InternalMetricsConfig::default(),
ttl_secs: None,
ttl_generations: default_ttl_generations(),
},
tracking_scope: TrackingScope::default(),
max_tracked_keys: None,
Expand Down
102 changes: 81 additions & 21 deletions src/transforms/tag_cardinality_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,32 @@ impl TagCardinalityLimit {
});
}

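/// Drop empty `AcceptedTagValueSet` buckets left behind by TTL eviction,
/// decrementing `tracked_keys_count` so freed slots can be reused under
/// `max_tracked_keys`. Called lazily on cap-hit paths so steady-state
/// overhead is zero. `len()` is called on every set because, for TTL
/// backends, it also drives the lazy sweep that may empty the bucket.
///
/// NOTE(review): the removal predicate is a bare `len() == 0`, so a bucket
/// that is empty for any other reason (for example one that never accepted a
/// value) is reclaimed as well, not only buckets emptied by TTL — confirm
/// this is intended for non-TTL configurations.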
fn reclaim_empty_buckets(&mut self) {
let mut reclaimed = 0usize;
self.accepted_tags.retain(|_, inner| {
inner.retain(|_, set| {
if set.len() == 0 {
reclaimed += 1;
false
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep active zero-limit buckets out of TTL reclaim

When `value_limit: 0` is combined with `max_tracked_keys`, the transform deliberately creates an empty bucket for each tracked key so that future values for that key keep being enforced. This reclaim path, however, removes every zero-length bucket, including ones that have no TTL. Once the cap is hit, each new key can therefore evict the previous zero-limit bucket and be enforced/dropped itself, instead of following the documented cap behavior of passing additional keys through unchecked after `max_tracked_keys` is exhausted.

Useful? React with 👍 / 👎.

} else {
true
}
});
!inner.is_empty()
});
if reclaimed > 0 {
self.tracked_keys_count = self.tracked_keys_count.saturating_sub(reclaimed);
emit!(TagCardinalityTrackedKeys {
count: self.tracked_keys_count,
});
}
}

/// Resolve the configuration that applies to a specific (metric, tag) pair.
///
/// Per-tag entries support two modes:
Expand Down Expand Up @@ -120,6 +146,11 @@ impl TagCardinalityLimit {
let limit_exceeded_action = per_metric.config.limit_exceeded_action;
let metric_value_limit = per_metric.config.value_limit;
let internal_metrics = per_metric.config.internal_metrics;
// Per-metric TTL is a *full override* of the global TTL for this metric:
// unset (`None`) means "no TTL for this metric" rather than "inherit from
// global", mirroring how per-metric `value_limit` shadows the global value.
let ttl_secs = per_metric.config.ttl_secs;
let ttl_generations = per_metric.config.ttl_generations;

// Per-tag entry: LimitOverride uses an explicit value_limit; Excluded opts
// the tag out. All other settings are always inherited from per-metric.
Expand All @@ -134,6 +165,8 @@ impl TagCardinalityLimit {
limit_exceeded_action,
mode: metric_mode,
internal_metrics,
ttl_secs,
ttl_generations,
});
}
}
Expand All @@ -143,6 +176,8 @@ impl TagCardinalityLimit {
limit_exceeded_action,
mode: metric_mode,
internal_metrics,
ttl_secs,
ttl_generations,
})
}

Expand Down Expand Up @@ -205,18 +240,21 @@ impl TagCardinalityLimit {

if !pair_exists {
if !self.can_allocate_new_key() {
return AcceptResult::Untracked;
// TTL may have emptied buckets that still count against `max_tracked_keys`.
self.reclaim_empty_buckets();
if !self.can_allocate_new_key() {
return AcceptResult::Untracked;
}
}
self.record_new_key_allocation();
}

let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default();
let tag_value_set = metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(&config.mode));
let tag_value_set = metric_accepted_tags.entry_ref(key).or_insert_with(|| {
AcceptedTagValueSet::new(&config.mode, config.ttl_secs, config.ttl_generations)
});

if tag_value_set.contains(value) {
// Tag value has already been accepted, nothing more to do.
return AcceptResult::Tracked;
}

Expand All @@ -238,8 +276,12 @@ impl TagCardinalityLimit {

/// Checks if recording a key and value corresponding to a tag on an incoming Metric would
/// exceed the cardinality limit.
///
/// Note: takes `&mut self` because TTL-enabled backends (`TtlSet`,
/// `RollingBloom`) perform lazy sweep/rotation inside `contains`/`len`.
/// The non-TTL backends are still effectively read-only here.
fn tag_limit_exceeded(
&self,
&mut self,
metric_key: Option<&MetricId>,
key: &str,
value: &TagValueSet,
Expand All @@ -248,22 +290,34 @@ impl TagCardinalityLimit {
TagSettings::Excluded => return false,
TagSettings::Tracked(inner) => inner,
};
match self

if let Some(value_set) = self
.accepted_tags
.get(&metric_key.cloned())
.and_then(|metric_accepted_tags| metric_accepted_tags.get(key))
.get_mut(&metric_key.cloned())
.and_then(|metric_accepted_tags| metric_accepted_tags.get_mut(key))
{
// Already accepted — never exceeds.
Some(value_set) if value_set.contains(value) => false,
// Adding this value would push us at or past the configured cap. Treat a
// missing bucket as an empty set so `value_limit: 0` correctly rejects
// the first occurrence too — but only when the (metric, tag) pair would
// actually be tracked. If `max_tracked_keys` is exhausted, `record_tag_value`
// will pass the tag through unchecked and emit `TagCardinalityLimitUntracked`,
// so we must not pre-empt that path by reporting the limit as exceeded here.
Some(value_set) => value_set.len() >= resolved.value_limit,
None => resolved.value_limit == 0 && self.can_allocate_new_key(),
// Non-refreshing: `DropEvent` may still reject this event on
// a later tag; refresh happens on the accept path via
// `record_tag_value`.
return if value_set.contains_no_refresh(value) {
false
} else {
value_set.len() >= resolved.value_limit
};
}

// Missing bucket: only `value_limit == 0` can flag the first
// sighting as exceeded; every other limit fits an empty set.
if resolved.value_limit != 0 {
return false;
}
// Mirror `record_tag_value`'s capacity view: reclaim empty TTL
// buckets first so we don't pass-through-untracked here and let
// the record path silently admit a value that should be rejected.
if !self.can_allocate_new_key() {
self.reclaim_empty_buckets();
}
self.can_allocate_new_key()
}

/// Record an accepted tag value (mutation-only, no limit check). Used by the `DropEvent`
Expand Down Expand Up @@ -292,15 +346,21 @@ impl TagCardinalityLimit {

if !pair_exists {
if !self.can_allocate_new_key() {
return true;
// See `try_accept_tag` for rationale.
self.reclaim_empty_buckets();
if !self.can_allocate_new_key() {
return true;
}
}
self.record_new_key_allocation();
}

let metric_accepted_tags = self.accepted_tags.entry(metric_key_owned).or_default();
metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(&config.mode))
.or_insert_with(|| {
AcceptedTagValueSet::new(&config.mode, config.ttl_secs, config.ttl_generations)
})
.insert(value.clone());
false
}
Expand Down
Loading
Loading