diff --git a/docs/embed.md b/docs/embed.md index 89b11e7..b1894ad 100644 --- a/docs/embed.md +++ b/docs/embed.md @@ -53,6 +53,16 @@ type TraceConsumer interface { ConsumeTraces(ctx, []Span) error } Consumers receive batches. Producer modules push size-1 batches today; the framework allows larger batches if a module wants to coalesce. +### Generators by signal type + +| Signal | Generators wired today | Consumer field on `embed.Host` | +|---------|-------------------------------------------------------------------------------------------|--------------------------------| +| Logs | `apache`, `apache_combined`, `apache_error`, `filegen`, `fix`, `json`, `kubernetes`, `nginx`, `okta`, `paloalto`, `postgres`, `wel` | `Host.Logs` | +| Metrics | `hostmetrics` | `Host.Metrics` | +| Traces | `traces` *(not yet wired through embed — see PIPE-1024)* | `Host.Traces` | + +Embedded hosts that load configuration via `config.LoadModules` must populate the relevant `LogConsumer` / `MetricConsumer` / `TraceConsumer` field on `EmbedOpts` for whichever signal types their generators yield; missing consumers surface as a clear construction-time error rather than a runtime no-op. + ## Constructing an embedded runner ```go @@ -69,16 +79,24 @@ func main() { // 1. Host owns the consumers and ambient resources. host := embed.Host{ Logs: myLogConsumer, + Metrics: myMetricConsumer, // optional — required when a metric generator is wired Logger: logger, - // Metrics / Traces / Resource also available. + // Traces / Resource also available. } // 2. Construct modules, passing the appropriate consumer from host. apacheGen, _ := apache.New(logger, /*workers*/ 1, /*rate*/ time.Second, host.Logs) + hmGen, _ := hostmetrics.New(hostmetrics.Config{ + Logger: logger, + Workers: 1, + Rate: 10 * time.Second, + OS: "linux", + Consumer: host.Metrics, + }) // 3. Build the runner. runner, err := embed.New(embed.Config{ - Modules: []embed.ProducerModule{apacheGen}, + Modules: []embed.ProducerModule{apacheGen, hmGen}, }) if err != nil { /* ... */ } diff --git a/embed/integration_test.go b/embed/integration_test.go index 97a7da4..18c31f1 100644 --- a/embed/integration_test.go +++ b/embed/integration_test.go @@ -8,6 +8,7 @@ import ( "github.com/observiq/blitz/embed" "github.com/observiq/blitz/generator/apache" + "github.com/observiq/blitz/generator/hostmetrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -82,6 +83,76 @@ func TestEmbed_NewRejectsEmptyModules(t *testing.T) { assert.Contains(t, err.Error(), "Modules cannot be empty") } +// memoryMetricConsumer captures every metric-point batch pushed through +// ConsumeMetrics. +type memoryMetricConsumer struct { + mu sync.Mutex + points []embed.MetricPoint +} + +func (c *memoryMetricConsumer) ConsumeMetrics(_ context.Context, batch []embed.MetricPoint) error { + c.mu.Lock() + defer c.mu.Unlock() + c.points = append(c.points, batch...) + return nil +} + +func (c *memoryMetricConsumer) snapshot() []embed.MetricPoint { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]embed.MetricPoint, len(c.points)) + copy(out, c.points) + return out +} + +// TestEmbed_HostMetricsPointsFlowToMemoryConsumer is the parallel +// metrics-path smoke test for PIPE-1023: a hostmetrics generator +// constructed against an embed.MetricConsumer, wrapped in embed.New, +// started via the Runner, produces points the host observes in-process. +func TestEmbed_HostMetricsPointsFlowToMemoryConsumer(t *testing.T) { + logger := zaptest.NewLogger(t) + consumer := &memoryMetricConsumer{} + + gen, err := hostmetrics.New(hostmetrics.Config{ + Logger: logger, + Workers: 1, + Rate: 50 * time.Millisecond, + OS: "linux", + Hostname: "test-host", + ScraperNames: []string{"cpu", "memory"}, + Consumer: consumer, + }) + require.NoError(t, err) + + runner, err := embed.New(embed.Config{ + Modules: []embed.ProducerModule{gen}, + }) + require.NoError(t, err) + + host := embed.Host{ + Metrics: consumer, + Logger: logger, + } + require.NoError(t, runner.Start(context.Background(), host)) + + require.Eventually(t, + func() bool { return len(consumer.snapshot()) >= 3 }, + 2*time.Second, 20*time.Millisecond, + "expected at least 3 metric points to flow through the embed seam", + ) + + stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + require.NoError(t, runner.Stop(stopCtx)) + + points := consumer.snapshot() + assert.GreaterOrEqual(t, len(points), 3) + for _, p := range points { + assert.NotEmpty(t, p.Name, "expected non-empty Name on metric point") + assert.NotZero(t, p.Metadata.Timestamp, "expected non-zero Timestamp") + } +} + func TestEmbed_RunnerRejectsDoubleStart(t *testing.T) { logger := zaptest.NewLogger(t) consumer := &memoryLogConsumer{}