Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog.d/reduce_trace_events.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
The `reduce` transform now accepts a `data_type` option (`log` or
`trace`, default `log`) that selects whether the instance collapses log
events or trace events. The existing merge strategies and conditions
apply unchanged to trace events.

authors: p120ph37
42 changes: 39 additions & 3 deletions src/transforms/reduce/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,43 @@ use crate::{
},
};

/// The event data type a `reduce` transform instance accepts and emits.
///
/// A single `reduce` instance handles exactly one data type. To reduce both
/// logs and traces, instantiate two `reduce` transforms with different
/// `data_type` values.
///
/// Variants are (de)serialized in `snake_case`, so the configuration values
/// are spelled `log` and `trace`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ReduceDataType {
/// Accept and emit `log` events.
///
/// This is the default variant.
#[default]
Log,

/// Accept and emit `trace` events.
Trace,
}

/// Configuration for the `reduce` transform.
#[serde_as]
#[configurable_component(transform(
"reduce",
"Collapse multiple log events into a single event based on a set of conditions and merge strategies.",
"Collapse multiple log or trace events into a single event based on a set of conditions and merge strategies.",
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct ReduceConfig {
/// The event data type this transform instance operates on.
///
/// `reduce` accepts and emits a single data type per instance. Defaults
/// to `log` to preserve historical behavior; set to `trace` to reduce
/// trace events instead. The selected value drives both the topology-
/// level input type filter and the type of the emitted reduced events.
#[serde(default)]
#[configurable(metadata(docs::human_name = "Data Type"))]
pub data_type: ReduceDataType,

/// The maximum period of time to wait after the last event is received, in milliseconds, before
/// a combined event should be considered complete.
#[serde(default = "default_expire_after_ms")]
Expand Down Expand Up @@ -124,7 +151,10 @@ impl TransformConfig for ReduceConfig {
}

fn input(&self) -> Input {
Input::log()
match self.data_type {
ReduceDataType::Log => Input::log(),
ReduceDataType::Trace => Input::trace(),
}
}

fn outputs(
Expand Down Expand Up @@ -229,7 +259,13 @@ impl TransformConfig for ReduceConfig {
output_definitions.insert(output.clone(), schema_definition.clone());
}

vec![TransformOutput::new(DataType::Log, output_definitions)]
vec![TransformOutput::new(
match self.data_type {
ReduceDataType::Log => DataType::Log,
ReduceDataType::Trace => DataType::Trace,
},
output_definitions,
)]
}
}

Expand Down
242 changes: 225 additions & 17 deletions src/transforms/reduce/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use vrl::{

use crate::{
conditions::Condition,
event::{Event, EventMetadata, LogEvent, discriminant::Discriminant},
event::{Event, EventMetadata, LogEvent, TraceEvent, discriminant::Discriminant},
internal_events::{ReduceAddEventError, ReduceStaleEventFlushed},
transforms::{
TaskTransform,
reduce::{
config::ReduceConfig,
config::{ReduceConfig, ReduceDataType},
merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger},
},
},
Expand Down Expand Up @@ -150,6 +150,7 @@ pub struct Reduce {
ends_when: Option<Condition>,
starts_when: Option<Condition>,
max_events: Option<usize>,
data_type: ReduceDataType,
}

fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
Expand Down Expand Up @@ -219,6 +220,7 @@ impl Reduce {
ends_when,
starts_when,
max_events,
data_type: config.data_type,
})
}

Expand All @@ -239,15 +241,16 @@ impl Reduce {
for k in &flush_discriminants {
if let Some(t) = self.reduce_merge_states.remove(k) {
emit!(ReduceStaleEventFlushed);
emitter.emit(Event::from(t.flush()));
emitter.emit(wrap_flushed(t.flush(), self.data_type));
}
}
}

fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
let data_type = self.data_type;
self.reduce_merge_states
.drain()
.for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
.for_each(|(_, s)| emitter.emit(wrap_flushed(s.flush(), data_type)));
}

fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
Expand All @@ -264,6 +267,22 @@ impl Reduce {
}

pub fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
// `input()` restricts the variants we can see here to `Log` or
// `Trace` based on `data_type`. `TraceEvent` is a newtype around
// `LogEvent` (`From<TraceEvent> for LogEvent` is lossless), so we
// unwrap traces to `Event::Log` before evaluating conditions. This
// lets log-only matchers (e.g. `datadog_search`, whose runner only
// matches `EventRef::Log`) fire for trace inputs, and avoids a
// second unwrap after each condition. We re-wrap on flush via
// `wrap_flushed`.
let event = match event {
Event::Log(_) => event,
Event::Trace(trace) => Event::Log(LogEvent::from(trace)),
Comment thread
p120ph37 marked this conversation as resolved.
Outdated
Event::Metric(_) => {
unreachable!("reduce input() rejects metric events")
}
};

let (starts_here, event) = match &self.starts_when {
Some(condition) => condition.check(event),
None => (false, event),
Expand All @@ -290,28 +309,42 @@ impl Reduce {

if starts_here {
if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
emitter.emit(state.flush().into());
emitter.emit(wrap_flushed(state.flush(), self.data_type));
}

self.push_or_new_reduce_state(event, discriminant)
} else if ends_here {
emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
Some(mut state) => {
state.add_event(event, &self.merge_strategies);
state.flush().into()
}
None => {
let mut state = ReduceState::new();
state.add_event(event, &self.merge_strategies);
state.flush().into()
}
});
emitter.emit(wrap_flushed(
match self.reduce_merge_states.remove(&discriminant) {
Some(mut state) => {
state.add_event(event, &self.merge_strategies);
state.flush()
}
None => {
let mut state = ReduceState::new();
state.add_event(event, &self.merge_strategies);
state.flush()
}
},
self.data_type,
));
} else {
self.push_or_new_reduce_state(event, discriminant)
}
}
}

/// Wrap a reduced `LogEvent` back into the `Event` variant declared by the
/// transform's `data_type`. Pairs with the input-variant extraction in
/// `transform_one`: a `Log` input round-trips as `Event::Log`, a `Trace`
/// input round-trips as `Event::Trace(TraceEvent::from(log))`.
fn wrap_flushed(log: LogEvent, data_type: ReduceDataType) -> Event {
match data_type {
ReduceDataType::Log => Event::Log(log),
ReduceDataType::Trace => Event::Trace(TraceEvent::from(log)),
}
}

impl TaskTransform<Event> for Reduce {
fn transform(
self: Box<Self>,
Expand Down Expand Up @@ -367,7 +400,7 @@ mod test {
use super::*;
use crate::{
config::{OutputId, TransformConfig, schema, schema::Definition},
event::{LogEvent, Value},
event::{LogEvent, TraceEvent, Value},
test_util::components::assert_transform_compliance,
transforms::test::create_topology,
};
Expand Down Expand Up @@ -1047,4 +1080,179 @@ merge_strategies.bar = "concat"
})
.await
}

#[tokio::test]
async fn reduce_trace_events() {
    // Runs a `data_type = "trace"` reduce instance end to end: three trace
    // events sharing a `request_id` go in, and a single `Event::Trace`
    // carrying the merged fields must come out once `ends_when` fires.
    let reduce_config: ReduceConfig = toml::from_str(
r#"
data_type = "trace"
group_by = [ "request_id" ]

[ends_when]
type = "vrl"
source = "exists(.test_end)"
"#,
    )
    .unwrap();

    assert_transform_compliance(async move {
        let (sender, receiver) = mpsc::channel(1);
        let (topology, mut out) =
            create_topology(ReceiverStream::new(receiver), reduce_config).await;

        let mut first = LogEvent::from("trace message 1");
        first.insert("counter", 1);
        first.insert("request_id", "1");

        let mut second = LogEvent::from("trace message 2");
        second.insert("counter", 2);
        second.insert("request_id", "1");

        // Only the last event carries `test_end`, which closes the group.
        let mut last = LogEvent::from("trace message 3");
        last.insert("counter", 3);
        last.insert("request_id", "1");
        last.insert("test_end", "yep");

        for log in [first, second, last] {
            let trace = TraceEvent::from(log);
            sender.send(Event::Trace(trace)).await.unwrap();
        }

        // The emitted variant has to follow the configured `data_type`.
        let reduced = match out.recv().await.unwrap() {
            Event::Trace(t) => t,
            other => panic!("expected Event::Trace, got {other:?}"),
        };
        assert_eq!(
            reduced.get("message"),
            Some(&Value::from("trace message 1"))
        );
        assert_eq!(reduced.get("counter"), Some(&Value::from(6)));

        drop(sender);
        topology.stop().await;
        assert_eq!(out.recv().await, None);
    })
    .await;
}

#[tokio::test]
async fn reduce_trace_merge_strategies() {
    // Checks that explicitly configured per-field merge strategies
    // (`concat`, `array`, `max`) produce the expected values when the
    // transform is reducing trace events.
    let reduce_config: ReduceConfig = toml::from_str(
r#"
data_type = "trace"
group_by = [ "request_id" ]

merge_strategies.foo = "concat"
merge_strategies.bar = "array"
merge_strategies.baz = "max"

[ends_when]
type = "vrl"
source = "exists(.test_end)"
"#,
    )
    .unwrap();

    assert_transform_compliance(async move {
        let (sender, receiver) = mpsc::channel(1);
        let (topology, mut out) =
            create_topology(ReceiverStream::new(receiver), reduce_config).await;

        let mut first = LogEvent::from("trace message 1");
        first.insert("foo", "first foo");
        first.insert("bar", "first bar");
        first.insert("baz", 2);
        first.insert("request_id", "1");
        sender
            .send(Event::Trace(TraceEvent::from(first)))
            .await
            .unwrap();

        // `baz` is deliberately non-numeric here; the final assertion shows
        // the `max` result is still 3.
        let mut second = LogEvent::from("trace message 2");
        second.insert("foo", "second foo");
        second.insert("bar", 2);
        second.insert("baz", "not number");
        second.insert("request_id", "1");
        sender
            .send(Event::Trace(TraceEvent::from(second)))
            .await
            .unwrap();

        // `foo` is deliberately non-string here; the final assertion shows
        // it is not part of the concatenated result.
        let mut last = LogEvent::from("trace message 3");
        last.insert("foo", 10);
        last.insert("bar", "third bar");
        last.insert("baz", 3);
        last.insert("request_id", "1");
        last.insert("test_end", "yep");
        sender
            .send(Event::Trace(TraceEvent::from(last)))
            .await
            .unwrap();

        let reduced = match out.recv().await.unwrap() {
            Event::Trace(t) => t,
            other => panic!("expected Event::Trace, got {other:?}"),
        };

        let expected_bar = Value::Array(vec![
            Value::from("first bar"),
            Value::from(2),
            Value::from("third bar"),
        ]);

        assert_eq!(
            reduced.get("message"),
            Some(&Value::from("trace message 1"))
        );
        assert_eq!(
            reduced.get("foo"),
            Some(&Value::from("first foo second foo"))
        );
        assert_eq!(reduced.get("bar"), Some(&expected_bar));
        assert_eq!(reduced.get("baz"), Some(&Value::from(3)));

        drop(sender);
        topology.stop().await;
        assert_eq!(out.recv().await, None);
    })
    .await;
}

#[tokio::test]
async fn reduce_trace_with_datadog_search_condition() {
    // Guards the "unwrap trace to log before evaluating conditions"
    // contract: a `datadog_search` boundary condition only matches log
    // events, so without that unwrap `ends_when` would never fire for
    // trace inputs and nothing would be emitted here.
    let reduce_config: ReduceConfig = toml::from_str(
r#"
data_type = "trace"
group_by = [ "request_id" ]

[ends_when]
type = "datadog_search"
source = "@test_end:yep"
"#,
    )
    .unwrap();

    assert_transform_compliance(async move {
        let (sender, receiver) = mpsc::channel(1);
        let (topology, mut out) =
            create_topology(ReceiverStream::new(receiver), reduce_config).await;

        let mut opener = LogEvent::from("trace message 1");
        opener.insert("counter", 1);
        opener.insert("request_id", "1");

        // Carries the attribute the `datadog_search` query looks for.
        let mut closer = LogEvent::from("trace message 2");
        closer.insert("counter", 2);
        closer.insert("request_id", "1");
        closer.insert("test_end", "yep");

        for log in [opener, closer] {
            let trace = TraceEvent::from(log);
            sender.send(Event::Trace(trace)).await.unwrap();
        }

        let reduced = match out.recv().await.unwrap() {
            Event::Trace(t) => t,
            other => panic!("expected Event::Trace, got {other:?}"),
        };
        assert_eq!(
            reduced.get("message"),
            Some(&Value::from("trace message 1"))
        );
        assert_eq!(reduced.get("counter"), Some(&Value::from(3)));
        assert_eq!(reduced.get("test_end"), Some(&Value::from("yep")));

        drop(sender);
        topology.stop().await;
        assert_eq!(out.recv().await, None);
    })
    .await;
}
}
Loading
Loading