Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 12 additions & 23 deletions cmd/blitz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/observiq/blitz/generator/count"
gennop "github.com/observiq/blitz/generator/nop"
tracesgen "github.com/observiq/blitz/generator/traces"
"github.com/observiq/blitz/generator/winevt"
"github.com/observiq/blitz/internal/build"
"github.com/observiq/blitz/internal/config"
Expand Down Expand Up @@ -398,42 +397,32 @@ shutdown:

func createGenerator(logger *zap.Logger, genCfg config.Generator, out output.Output) (any, error) {
// Standalone-CLI-only generator types that dispatch.ForEmbed does not
// construct (winevt is deprecated for embed; nop yields no records;
// traces is not yet migrated — lands in PIPE-1024). All other
// generators delegate to dispatch.ForEmbed so the construction logic
// lives in exactly one place.
// construct (winevt is deprecated for embed; nop yields no records).
// All other generators delegate to dispatch.ForEmbed so the
// construction logic lives in exactly one place.
switch genCfg.Type {
case config.GeneratorTypeNop:
return gennop.New(logger)
case config.GeneratorTypeWinevt:
return winevt.New(logger, genCfg.Winevt.Workers, genCfg.Winevt.Rate)
case config.GeneratorTypeTraces:
tw, ok := out.(output.TraceWriter)
if !ok {
return nil, fmt.Errorf("traces requires an output that supports TraceWriter; configured output does not")
}
return tracesgen.New(tracesgen.Config{
Logger: logger,
Workers: genCfg.Traces.Workers,
Rate: genCfg.Traces.Rate,
Hostname: genCfg.Traces.Hostname,
Consumer: output.WriterAsTraceConsumer(tw),
Seed: genCfg.Traces.Seed,
})
}

// All remaining (embed-eligible) types go through the canonical
// dispatch.ForEmbed path. Outputs that implement MetricWriter get
// wrapped as a MetricConsumer so metric-yielding generators
// (hostmetrics) work standalone; ForEmbed rejects with a clear
// message when an output doesn't support a signal the configured
// generator needs.
// dispatch.ForEmbed path. Outputs that implement MetricWriter or
// TraceWriter get wrapped as the corresponding consumer so
// metric-yielding (hostmetrics) and trace-yielding (traces)
// generators work standalone; ForEmbed rejects with a clear message
// when an output doesn't support a signal the configured generator
// needs.
consumers := dispatch.EmbedConsumers{
LogConsumer: output.WriterAsLogConsumer(out),
}
if mw, ok := out.(output.MetricWriter); ok {
consumers.MetricConsumer = output.WriterAsMetricConsumer(mw)
}
if tw, ok := out.(output.TraceWriter); ok {
consumers.TraceConsumer = output.WriterAsTraceConsumer(tw)
}
mod, err := dispatch.ForEmbed(logger, genCfg, consumers, nil)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ type EmbedOpts struct {
// when a metric-yielding generator is configured.
MetricConsumer embed.MetricConsumer

// TraceConsumer is the destination for every span batch any
// constructed trace generator (e.g. traces) produces. Required when
// a trace-yielding generator is configured.
// TraceConsumer is the destination for every span any constructed
// trace generator (e.g. traces) produces. Required when a
// trace-yielding generator is configured.
TraceConsumer embed.TraceConsumer

// FileGenLibrary is the optional filesystem the filegen generator
Expand Down
21 changes: 21 additions & 0 deletions config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,27 @@ metrics:
assert.Contains(t, err.Error(), "MetricConsumer")
}

func TestLoadModules_TracesRequiresTraceConsumer(t *testing.T) {
// traces is a trace Producer; LoadModules must require
// EmbedOpts.TraceConsumer rather than accepting a log-only opts.
yaml := []byte(`
generator:
type: traces
traces:
workers: 1
rate: 1s
output:
type: nop
logging:
type: stdout
metrics:
port: 19000
`)
_, err := config.LoadModules(yaml, config.EmbedOpts{LogConsumer: &mockConsumer{}})
require.Error(t, err)
assert.Contains(t, err.Error(), "TraceConsumer")
}

func TestLoadModules_RejectsWinevt(t *testing.T) {
yaml := []byte(`
generator:
Expand Down
19 changes: 15 additions & 4 deletions internal/dispatch/embed.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/observiq/blitz/generator/okta"
"github.com/observiq/blitz/generator/paloalto"
"github.com/observiq/blitz/generator/postgres"
tracesgen "github.com/observiq/blitz/generator/traces"
"github.com/observiq/blitz/generator/wel"
welcatalog "github.com/observiq/blitz/generator/wel/catalog"
"github.com/observiq/blitz/internal/config"
Expand Down Expand Up @@ -72,8 +73,8 @@ func (c EmbedConsumers) requireTrace(typ config.GeneratorType) error {
//
// Returns an error when the configured generator type requires a
// consumer that is nil in `consumers` (e.g. hostmetrics without a
// MetricConsumer), and for generator types that are not embed-eligible
// at all (nop, winevt — see PIPE-1032).
// MetricConsumer, traces without a TraceConsumer), and for generator
// types that are not embed-eligible at all (nop, winevt — see PIPE-1032).
func ForEmbed(logger *zap.Logger, genCfg config.Generator, consumers EmbedConsumers, fileGenLibrary fs.FS) (embed.ProducerModule, error) {
if logger == nil {
return nil, fmt.Errorf("logger cannot be nil")
Expand Down Expand Up @@ -166,12 +167,22 @@ func ForEmbed(logger *zap.Logger, genCfg config.Generator, consumers EmbedConsum
Consumer: consumers.MetricConsumer,
Seed: yamlSeedDefault(genCfg.HostMetrics.Seed),
})
case config.GeneratorTypeTraces:
if err := consumers.requireTrace(genCfg.Type); err != nil {
return nil, err
}
return tracesgen.New(tracesgen.Config{
Logger: logger,
Workers: genCfg.Traces.Workers,
Rate: genCfg.Traces.Rate,
Hostname: genCfg.Traces.Hostname,
Consumer: consumers.TraceConsumer,
Seed: yamlSeedDefault(genCfg.Traces.Seed),
})
case config.GeneratorTypeNop:
return nil, fmt.Errorf("generator type %q does not produce records; not embed-eligible", genCfg.Type)
case config.GeneratorTypeWinevt:
return nil, fmt.Errorf("generator type %q is DEPRECATED and is not available via embed; the legacy single-template Windows Event XML generator has been superseded by the multi-channel `wel` generator (see docs/generator/wel.md). The standalone blitz CLI still accepts `winevt` with a deprecation warning", genCfg.Type)
case config.GeneratorTypeTraces:
return nil, fmt.Errorf("generator type %q produces traces; the embed contract supports a separate TraceConsumer path that is not yet wired for this generator", genCfg.Type)
default:
return nil, fmt.Errorf("unknown generator type %q", genCfg.Type)
}
Expand Down
32 changes: 32 additions & 0 deletions internal/dispatch/embed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (noopMetricConsumer) ConsumeMetrics(_ context.Context, _ []embed.MetricPoin
return nil
}

type noopTraceConsumer struct{}

func (noopTraceConsumer) ConsumeTraces(_ context.Context, _ []embed.Span) error { return nil }

func logsOnly() EmbedConsumers {
return EmbedConsumers{LogConsumer: noopConsumer{}}
}
Expand Down Expand Up @@ -148,3 +152,31 @@ func TestForEmbedHostMetricsRejectsMissingMetricConsumer(t *testing.T) {
require.Error(t, err)
assert.Contains(t, err.Error(), "MetricConsumer")
}

// PIPE-1024 additions — traces dispatch.
func TestForEmbedTracesReturnsProducerModule(t *testing.T) {
cfg := config.Generator{
Type: config.GeneratorTypeTraces,
Traces: config.TracesGeneratorConfig{
Workers: 1,
Rate: 50 * time.Millisecond,
},
}
mod, err := ForEmbed(zap.NewNop(), cfg, EmbedConsumers{TraceConsumer: noopTraceConsumer{}}, nil)
require.NoError(t, err)
require.NotNil(t, mod)
assert.Equal(t, "traces", mod.Name())
}

func TestForEmbedTracesRejectsMissingTraceConsumer(t *testing.T) {
cfg := config.Generator{
Type: config.GeneratorTypeTraces,
Traces: config.TracesGeneratorConfig{
Workers: 1,
Rate: time.Second,
},
}
_, err := ForEmbed(zap.NewNop(), cfg, EmbedConsumers{}, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "TraceConsumer")
}
Loading