Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 15 additions & 23 deletions cmd/blitz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"time"

"github.com/observiq/blitz/generator/count"
"github.com/observiq/blitz/generator/hostmetrics"
gennop "github.com/observiq/blitz/generator/nop"
tracesgen "github.com/observiq/blitz/generator/traces"
"github.com/observiq/blitz/generator/winevt"
Expand Down Expand Up @@ -400,38 +399,31 @@ shutdown:
func createGenerator(logger *zap.Logger, genCfg config.Generator, out output.Output) (any, error) {
// Standalone-CLI-only generator types that dispatch.ForEmbed does not
// construct (winevt is deprecated for embed; nop yields no records;
// hostmetrics + traces are not yet migrated — those land in
// PIPE-1023 / PIPE-1024). All other generators delegate to
// dispatch.ForEmbed so the construction logic lives in exactly one
// place.
// traces is not yet migrated — lands in PIPE-1024). All other
// generators delegate to dispatch.ForEmbed so the construction logic
// lives in exactly one place.
switch genCfg.Type {
case config.GeneratorTypeNop:
return gennop.New(logger)
case config.GeneratorTypeWinevt:
return winevt.New(logger, genCfg.Winevt.Workers, genCfg.Winevt.Rate)
case config.GeneratorTypeHostMetrics:
mw, ok := out.(output.MetricWriter)
if !ok {
return nil, fmt.Errorf("hostmetrics requires an output that supports MetricWriter; configured output does not")
}
return hostmetrics.New(hostmetrics.Config{
Logger: logger,
Workers: genCfg.HostMetrics.Workers,
Rate: genCfg.HostMetrics.Rate,
OS: genCfg.HostMetrics.OS,
Hostname: genCfg.HostMetrics.Hostname,
ScraperNames: genCfg.HostMetrics.Scrapers,
Consumer: output.WriterAsMetricConsumer(mw),
Seed: genCfg.HostMetrics.Seed,
})
case config.GeneratorTypeTraces:
return tracesgen.New(logger, genCfg.Traces.Workers, genCfg.Traces.Rate)
}

// All remaining (embed-eligible) types go through the canonical
// dispatch.ForEmbed path, with the standalone output wrapped as a
// LogConsumer.
mod, err := dispatch.ForEmbed(logger, genCfg, output.WriterAsLogConsumer(out), nil)
// dispatch.ForEmbed path. Outputs that implement MetricWriter get
// wrapped as a MetricConsumer so metric-yielding generators
// (hostmetrics) work standalone; ForEmbed rejects with a clear
// message when an output doesn't support a signal the configured
// generator needs.
consumers := dispatch.EmbedConsumers{
LogConsumer: output.WriterAsLogConsumer(out),
}
if mw, ok := out.(output.MetricWriter); ok {
consumers.MetricConsumer = output.WriterAsMetricConsumer(mw)
}
mod, err := dispatch.ForEmbed(logger, genCfg, consumers, nil)
if err != nil {
return nil, err
}
Expand Down
23 changes: 18 additions & 5 deletions config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,20 @@ type EmbedOpts struct {
Logger *zap.Logger

// LogConsumer is the destination for every record any constructed
// log generator produces. Required.
// log generator produces. Required when a log-yielding generator is
// configured.
LogConsumer embed.LogConsumer

// MetricConsumer is the destination for every metric point any
// constructed metric generator (e.g. hostmetrics) produces. Required
// when a metric-yielding generator is configured.
MetricConsumer embed.MetricConsumer

// TraceConsumer is the destination for every span batch any
// constructed trace generator (e.g. traces) produces. Required when
// a trace-yielding generator is configured.
TraceConsumer embed.TraceConsumer

// FileGenLibrary is the optional filesystem the filegen generator
// consults for `package:`-prefixed source references and bare-name
// library fallbacks. Pass embeddedlibrary.FS() (with the
Expand Down Expand Up @@ -129,9 +140,6 @@ type EmbedOpts struct {
// embed-eligible; partial results are NOT returned, so callers don't
// silently lose generators they thought would be wired up.
func LoadModules(yamlBytes []byte, opts EmbedOpts) ([]embed.ProducerModule, error) {
if opts.LogConsumer == nil {
return nil, fmt.Errorf("EmbedOpts.LogConsumer is required")
}
logger := opts.Logger
if logger == nil {
logger = zap.NewNop()
Expand All @@ -141,9 +149,14 @@ func LoadModules(yamlBytes []byte, opts EmbedOpts) ([]embed.ProducerModule, erro
return nil, err
}
gens := cfg.EffectiveGenerators()
consumers := dispatch.EmbedConsumers{
LogConsumer: opts.LogConsumer,
MetricConsumer: opts.MetricConsumer,
TraceConsumer: opts.TraceConsumer,
}
modules := make([]embed.ProducerModule, 0, len(gens))
for i, gen := range gens {
mod, err := dispatch.ForEmbed(logger, gen, opts.LogConsumer, opts.FileGenLibrary)
mod, err := dispatch.ForEmbed(logger, gen, consumers, opts.FileGenLibrary)
if err != nil {
return nil, fmt.Errorf("generator[%d] type=%q: %w", i, gen.Type, err)
}
Expand Down
25 changes: 19 additions & 6 deletions config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,22 @@ metrics:
}

func TestLoadModules_NilConsumer(t *testing.T) {
_, err := config.LoadModules([]byte(`generator: {type: apache-common}`), config.EmbedOpts{})
yaml := []byte(`
generator:
type: apache-common
apache-common:
workers: 1
rate: 1s
output:
type: nop
logging:
type: stdout
metrics:
port: 19000
`)
_, err := config.LoadModules(yaml, config.EmbedOpts{})
require.Error(t, err)
assert.Contains(t, err.Error(), "LogConsumer is required")
assert.Contains(t, err.Error(), "LogConsumer")
}

func TestLoadModules_SingleProducer(t *testing.T) {
Expand Down Expand Up @@ -148,9 +161,9 @@ metrics:
assert.ElementsMatch(t, []string{"apache", "nginx", "postgres"}, names)
}

func TestLoadModules_RejectsNonProducer(t *testing.T) {
// hostmetrics is a metric Producer, not a log Producer; LoadModules
// must reject it explicitly rather than silently dropping it.
func TestLoadModules_HostMetricsRequiresMetricConsumer(t *testing.T) {
// hostmetrics is a metric Producer; LoadModules must require
// EmbedOpts.MetricConsumer rather than accepting a log-only opts.
yaml := []byte(`
generator:
type: hostmetrics
Expand All @@ -167,7 +180,7 @@ metrics:
`)
_, err := config.LoadModules(yaml, config.EmbedOpts{LogConsumer: &mockConsumer{}})
require.Error(t, err)
assert.Contains(t, err.Error(), "hostmetrics")
assert.Contains(t, err.Error(), "MetricConsumer")
}

func TestLoadModules_RejectsWinevt(t *testing.T) {
Expand Down
156 changes: 128 additions & 28 deletions internal/dispatch/embed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/observiq/blitz/generator/filegen"
fixgen "github.com/observiq/blitz/generator/fix"
"github.com/observiq/blitz/generator/fix/catalog"
"github.com/observiq/blitz/generator/hostmetrics"
jsongen "github.com/observiq/blitz/generator/json"
"github.com/observiq/blitz/generator/kubernetes"
"github.com/observiq/blitz/generator/nginx"
Expand All @@ -28,45 +29,110 @@ import (
"go.uber.org/zap"
)

// EmbedConsumers bundles the per-signal consumers an embedding host can
// supply. LogConsumer is required for any log-yielding generator;
// MetricConsumer is required for metric-yielding generators
// (hostmetrics); TraceConsumer is required for trace-yielding
// generators (traces). ForEmbed only consults the field that matches
// the requested generator type — pass nil for the others when their
// signal isn't needed.
type EmbedConsumers struct {
LogConsumer embed.LogConsumer
MetricConsumer embed.MetricConsumer
TraceConsumer embed.TraceConsumer
}

func (c EmbedConsumers) requireLog(typ config.GeneratorType) error {
if c.LogConsumer == nil {
return fmt.Errorf("generator type %q requires EmbedConsumers.LogConsumer", typ)
}
return nil
}

func (c EmbedConsumers) requireMetric(typ config.GeneratorType) error {
if c.MetricConsumer == nil {
return fmt.Errorf("generator type %q requires EmbedConsumers.MetricConsumer", typ)
}
return nil
}

func (c EmbedConsumers) requireTrace(typ config.GeneratorType) error {
if c.TraceConsumer == nil {
return fmt.Errorf("generator type %q requires EmbedConsumers.TraceConsumer", typ)
}
return nil
}

// ForEmbed constructs an embed.ProducerModule for the given generator
// config wired to the supplied log consumer. fileGenLibrary is optional
// and only consulted by the filegen generator: pass embeddedlibrary.FS()
// (with the `embed_library` build tag set) to use the snapshot shipped
// in the blitz module, or nil to fall back to reading ./data_library/
// from the process cwd.
// config wired to the relevant consumer in `consumers`. fileGenLibrary
// is optional and only consulted by the filegen generator: pass
// embeddedlibrary.FS() (with the `embed_library` build tag set) to use
// the snapshot shipped in the blitz module, or nil to fall back to
// reading ./data_library/ from the process cwd.
//
// Returns an error for non-Producer generator types (nop, winevt,
// hostmetrics, traces) — these either don't yield logs at all or
// are not yet migrated to the embed.LogConsumer contract.
func ForEmbed(logger *zap.Logger, genCfg config.Generator, consumer embed.LogConsumer, fileGenLibrary fs.FS) (embed.ProducerModule, error) {
// Returns an error when the configured generator type requires a
// consumer that is nil in `consumers` (e.g. hostmetrics without a
// MetricConsumer), and for generator types that are not embed-eligible
// at all (nop, winevt — see PIPE-1032).
func ForEmbed(logger *zap.Logger, genCfg config.Generator, consumers EmbedConsumers, fileGenLibrary fs.FS) (embed.ProducerModule, error) {
if logger == nil {
return nil, fmt.Errorf("logger cannot be nil")
}
if consumer == nil {
return nil, fmt.Errorf("consumer cannot be nil")
}
switch genCfg.Type {
case config.GeneratorTypeJSON:
return jsongen.New(logger, genCfg.JSON.Workers, genCfg.JSON.Rate, genCfg.JSON.Type, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return jsongen.New(logger, genCfg.JSON.Workers, genCfg.JSON.Rate, genCfg.JSON.Type, consumers.LogConsumer)
case config.GeneratorTypePaloAlto:
return paloalto.New(logger, genCfg.PaloAlto.Workers, genCfg.PaloAlto.Rate, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return paloalto.New(logger, genCfg.PaloAlto.Workers, genCfg.PaloAlto.Rate, consumers.LogConsumer)
case config.GeneratorTypeApache:
return apachegen.New(logger, genCfg.Apache.Workers, genCfg.Apache.Rate, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return apachegen.New(logger, genCfg.Apache.Workers, genCfg.Apache.Rate, consumers.LogConsumer)
case config.GeneratorTypeApacheCombined:
return apachecombinedgen.New(logger, genCfg.ApacheCombined.Workers, genCfg.ApacheCombined.Rate, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return apachecombinedgen.New(logger, genCfg.ApacheCombined.Workers, genCfg.ApacheCombined.Rate, consumers.LogConsumer)
case config.GeneratorTypeApacheError:
return apacheerrorgen.New(logger, genCfg.ApacheError.Workers, genCfg.ApacheError.Rate, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return apacheerrorgen.New(logger, genCfg.ApacheError.Workers, genCfg.ApacheError.Rate, consumers.LogConsumer)
case config.GeneratorTypeNginx:
return nginx.New(logger, genCfg.Nginx.Workers, genCfg.Nginx.Rate, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return nginx.New(logger, genCfg.Nginx.Workers, genCfg.Nginx.Rate, consumers.LogConsumer)
case config.GeneratorTypePostgres:
return postgres.New(logger, genCfg.Postgres.Workers, genCfg.Postgres.Rate, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return postgres.New(logger, genCfg.Postgres.Workers, genCfg.Postgres.Rate, consumers.LogConsumer)
case config.GeneratorTypeKubernetes:
return kubernetes.New(logger, genCfg.Kubernetes.Workers, genCfg.Kubernetes.Rate, genCfg.Kubernetes.Format, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return kubernetes.New(logger, genCfg.Kubernetes.Workers, genCfg.Kubernetes.Rate, genCfg.Kubernetes.Format, consumers.LogConsumer)
case config.GeneratorTypeFile:
return filegen.New(logger, genCfg.Filegen.Workers, genCfg.Filegen.Rate, genCfg.Filegen.Source, genCfg.Filegen.CacheEnabled, genCfg.Filegen.CacheTTL, consumer, fileGenLibrary)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return filegen.New(logger, genCfg.Filegen.Workers, genCfg.Filegen.Rate, genCfg.Filegen.Source, genCfg.Filegen.CacheEnabled, genCfg.Filegen.CacheTTL, consumers.LogConsumer, fileGenLibrary)
case config.GeneratorTypeOkta:
return okta.New(logger, genCfg.Okta.Workers, genCfg.Okta.Rate, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return okta.New(logger, genCfg.Okta.Workers, genCfg.Okta.Rate, consumers.LogConsumer)
case config.GeneratorTypeWel:
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
role := welcatalog.MachineRole(genCfg.Wel.Role)
if role == "" {
role = welcatalog.RoleMember
Expand All @@ -79,23 +145,57 @@ func ForEmbed(logger *zap.Logger, genCfg config.Generator, consumer embed.LogCon
Domain: genCfg.Wel.Domain,
Role: role,
Channels: genCfg.Wel.Channels,
Consumer: consumer,
Consumer: consumers.LogConsumer,
})
case config.GeneratorTypeFIX:
return newFIX(logger, genCfg.FIX, consumer)
if err := consumers.requireLog(genCfg.Type); err != nil {
return nil, err
}
return newFIX(logger, genCfg.FIX, consumers.LogConsumer)
case config.GeneratorTypeHostMetrics:
if err := consumers.requireMetric(genCfg.Type); err != nil {
return nil, err
}
return hostmetrics.New(hostmetrics.Config{
Logger: logger,
Workers: genCfg.HostMetrics.Workers,
Rate: genCfg.HostMetrics.Rate,
OS: genCfg.HostMetrics.OS,
Hostname: genCfg.HostMetrics.Hostname,
ScraperNames: genCfg.HostMetrics.Scrapers,
Consumer: consumers.MetricConsumer,
Seed: yamlSeedDefault(genCfg.HostMetrics.Seed),
})
case config.GeneratorTypeNop:
return nil, fmt.Errorf("generator type %q does not produce log records; not embed-eligible", genCfg.Type)
return nil, fmt.Errorf("generator type %q does not produce records; not embed-eligible", genCfg.Type)
case config.GeneratorTypeWinevt:
return nil, fmt.Errorf("generator type %q is DEPRECATED and is not available via embed; the legacy single-template Windows Event XML generator has been superseded by the multi-channel `wel` generator (see docs/generator/wel.md). The standalone blitz CLI still accepts `winevt` with a deprecation warning", genCfg.Type)
case config.GeneratorTypeHostMetrics:
return nil, fmt.Errorf("generator type %q produces metrics, not logs; the embed contract supports a separate MetricConsumer path that is not yet wired for this generator", genCfg.Type)
case config.GeneratorTypeTraces:
return nil, fmt.Errorf("generator type %q produces traces, not logs; the embed contract supports a separate TraceConsumer path that is not yet wired for this generator", genCfg.Type)
return nil, fmt.Errorf("generator type %q produces traces; the embed contract supports a separate TraceConsumer path that is not yet wired for this generator", genCfg.Type)
default:
return nil, fmt.Errorf("unknown generator type %q", genCfg.Type)
}
}

// yamlSeedDefault translates a YAML-loaded Seed value into the
// generator-Config Seed value, applying the "stochastic by default"
// architectural intent for YAML users. YAML zero-value (omitted `seed:`
// key) and an explicit `seed: 0` both map to -1 (randomize per worker).
// Any other value passes through unchanged.
//
// Programmatic Go callers bypass this translation and observe whatever
// literal value they pass — useful for tests that want to pin seed=0.
//
// PIPE-1036 will route per-machine identity determinism through the
// top-level `environment.seed_config` instead; this knob will then
// govern only the generator's record-content RNG, not host identity.
func yamlSeedDefault(yamlSeed int64) int64 {
if yamlSeed == 0 {
return -1
}
return yamlSeed
}

// newFIX translates the YAML-shaped FIXGeneratorConfig into the
// catalog-typed fix.Config and constructs a FIX generator. Version and
// EnabledCategories strings are validated; an empty version defaults to
Expand Down
Loading
Loading