From 1010779d072e1c5a9f7ad4653fa4b01c1d0bf5f3 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 26 May 2026 09:58:32 -0400 Subject: [PATCH 01/12] Refactor using batch emitter --- pkg/durableemitter/durable_emitter.go | 639 ++++++++------------- pkg/durableemitter/durable_emitter_test.go | 399 ++++++++----- 2 files changed, 488 insertions(+), 550 deletions(-) diff --git a/pkg/durableemitter/durable_emitter.go b/pkg/durableemitter/durable_emitter.go index 71ca2701fe..e16590690c 100644 --- a/pkg/durableemitter/durable_emitter.go +++ b/pkg/durableemitter/durable_emitter.go @@ -11,17 +11,37 @@ import ( "time" "go.opentelemetry.io/otel/metric" - "google.golang.org/grpc" - grpcEncoding "google.golang.org/grpc/encoding" - "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" - "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" ) +// BatchEmitter is the transport interface DurableEmitter delegates to for +// batched, at-most-once delivery of CloudEvents to Chip Ingress. +// +// *batch.Client from pkg/chipingress/batch satisfies this interface and +// handles seqnum stamping, gRPC size splitting, concurrency limiting, and +// graceful shutdown with a configurable timeout. +// +// The callback passed to QueueMessage is invoked once after the batch +// containing the event is sent. A nil error means the RPC succeeded; a +// non-nil error means the batch was dropped — the event remains in the DB +// and the retransmit loop will retry it. +type BatchEmitter interface { + // QueueMessage enqueues a single CloudEvent for batched delivery. + // Returns an error only if the internal buffer is full or the client + // has been stopped. Callers must treat a non-nil return as a + // drop (the event is still persisted; retransmit will retry). + QueueMessage(event *chipingress.CloudEventPb, callback func(error)) error + // Start begins background processing. Must be called before QueueMessage. + Start(ctx context.Context) + // Stop flushes any queued events, waits for all in-flight network calls + // and callbacks to complete, then closes the underlying transport. + Stop() +} + // DurableEmitterConfig configures the DurableEmitter behaviour. type DurableEmitterConfig struct { // RetransmitInterval controls how often the retransmit loop ticks. @@ -29,33 +49,21 @@ type DurableEmitterConfig struct { // RetransmitAfter is the minimum age of an event before the retransmit // loop considers it. This gives the batch publish path time to succeed. RetransmitAfter time.Duration - // RetransmitBatchSize caps how many pending rows are listed per retransmit tick - // (each row is enqueued for the batch publish workers). + // RetransmitBatchSize caps how many pending rows are listed per retransmit tick. RetransmitBatchSize int // ExpiryInterval controls how often the expiry loop ticks. ExpiryInterval time.Duration // EventTTL is the maximum age of an event before it is expired. EventTTL time.Duration - // PublishTimeout is the per-RPC deadline for each Publish call. + // PublishTimeout is the deadline for DB operations in delivery callbacks + // (MarkDeliveredBatch). The actual gRPC publish timeout is configured on + // the BatchEmitter (batch.Client) directly. PublishTimeout time.Duration // PurgeInterval is how often the purge loop runs to batch-delete rows that // were marked delivered (Postgres). Zero defaults to 250ms. PurgeInterval time.Duration // PurgeBatchSize is the maximum rows removed per PurgeDelivered call. Zero defaults to 500. PurgeBatchSize int - // PublishBatchSize is the target number of events per PublishBatch RPC. Values below 1 are - // clamped to 1 in NewDurableEmitter. - PublishBatchSize int - // PublishBatchWorkers is the number of concurrent goroutines that read from - // the batch channel and call PublishBatch. More workers means higher - // throughput (each worker handles one in-flight batch at a time). Zero defaults to 1. - PublishBatchWorkers int - // PublishBatchFlushInterval is the maximum time to wait for a full batch - // before flushing a partial one. Zero defaults to 50ms. - PublishBatchFlushInterval time.Duration - // PublishBatchChannelSize overrides the publishCh buffer capacity. Zero - // defaults to max(PublishBatchSize*2000, 200_000). - PublishBatchChannelSize int // DisablePruning disables the background purge (PurgeDelivered) and expiry // (DeleteExpired) loops. Events remain in the DB after delivery. Useful for // post-test analysis of created_at / delivered_at timestamps. @@ -79,15 +87,17 @@ type DurableEmitterConfig struct { InsertBatchWorkers int } -// DurableEmitterHooks records Publish vs Delete latency to locate pipeline bottlenecks. +// DurableEmitterHooks records delivery latency to locate pipeline bottlenecks. type DurableEmitterHooks struct { // OnEmitInsert is called after each store.Insert in Emit (the DB write that // blocks the caller). elapsed covers only the INSERT; err is nil on success. OnEmitInsert func(elapsed time.Duration, err error) - // OnBatchPublish is called after each PublishBatch RPC in the batch publish loop. - // batchSize is the number of events in the batch; err is nil on success. + // OnBatchPublish is called from the delivery callback after each event's + // batch is sent. elapsed is measured from QueueMessage call to callback + // invocation; batchSize is always 1 (one callback per event); err is nil + // on success. OnBatchPublish func(elapsed time.Duration, batchSize int, err error) - // OnBatchMarkDelivered is called after MarkDeliveredBatch following a successful batch publish. + // OnBatchMarkDelivered is called after MarkDeliveredBatch following a successful delivery. OnBatchMarkDelivered func(elapsed time.Duration, count int) } @@ -101,7 +111,6 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { PublishTimeout: 5 * time.Second, PurgeInterval: 250 * time.Millisecond, PurgeBatchSize: 500, - PublishBatchSize: 1, // Metrics is opt-in: callers who want instrumentation must set this // and pass a metric.Meter to NewDurableEmitter. Metrics: nil, @@ -110,21 +119,15 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { // DurableEmitter implements Emitter with persistence-backed delivery guarantees. // -// Emit writes to a DurableEventStore, returns nil after insert, and enqueues the -// row for async PublishBatch delivery. Successful publishes are followed by -// MarkDeliveredBatch; the purge loop removes delivered rows from Postgres. When -// Chip is down or the publish channel is full, a retransmit loop lists stale -// pending rows and re-enqueues them to the same batch workers (up to +// Emit writes to a DurableEventStore then hands the event to the BatchEmitter +// for async delivery. The delivery callback from BatchEmitter marks the row +// delivered; the purge loop removes delivered rows from Postgres. When the +// batch emitter buffer is full or the network is down, a retransmit loop lists +// stale pending rows and re-enqueues them through the same BatchEmitter (up to // RetransmitBatchSize per tick). // // A separate expiry loop garbage-collects events older than EventTTL to bound // table growth. -// publishWork is a unit of work for the batch publish channel. -type publishWork struct { - id int64 - payload []byte // serialized CloudEvent proto (always set) - event *chipingress.CloudEventPb // original struct; set from Emit(), nil from retransmit -} // insertRequest is a single Emit() caller waiting for a coalesced batch INSERT. type insertRequest struct { @@ -141,8 +144,20 @@ type DurableEmitter struct { services.Service eng *services.Engine - store DurableEventStore - client chipingress.Client + store DurableEventStore + batchEmitter BatchEmitter + // fallbackClient, when non-nil, is used for single-event per-RPC retry + // whenever the batch emitter reports a delivery failure. Each failed event + // is retried individually via Publish (not PublishBatch) in a goroutine. + // If the single-event retry also fails the event stays in the DB and the + // retransmit loop will eventually deliver it. DurableEmitter owns this + // client and closes it during shutdown. + fallbackClient chipingress.Client + // fallbackWg tracks in-flight single-event fallback goroutines. It is + // waited on after batchEmitter.Stop() so that all fallback attempts that + // were spawned during the final flush can complete before we close the + // fallback client connection. + fallbackWg sync.WaitGroup // isHostProcess determines if the emitter runs retransmit and cleanup loops. // Should be set to false when initialized inside LOOP plugins. isHostProcess bool @@ -150,10 +165,6 @@ type DurableEmitter struct { metrics *durableEmitterMetrics - // rawConn is the underlying gRPC connection when the client exposes it. - // Non-nil enables zero-copy batch publishing (protowire + ForceCodec). - rawConn *grpc.ClientConn - // batchInserter is non-nil when the store supports multi-row INSERTs // and InsertBatchSize > 0. batchInserter BatchInserter @@ -171,65 +182,11 @@ type DurableEmitter struct { pendingCount atomic.Int64 pendingMax atomic.Int64 - // publishCh buffers events for the batch publish loop. - publishCh chan publishWork - - // stopCh signals background loops to exit. It is owned by DurableEmitter - // (not the service engine) because shutdown must drain coalesced inserts - // and close publishCh in a specific order after all workers stop. + // stopCh signals background loops to exit. stopCh services.StopChan wg sync.WaitGroup } -// grpcConnProvider is an optional interface for clients that expose the -// underlying gRPC connection. When satisfied, DurableEmitter uses zero-copy -// batch publishing to avoid protobuf marshal/unmarshal overhead. -type grpcConnProvider interface { - Conn() *grpc.ClientConn -} - -// rawBytesCodec is a gRPC codec that passes []byte through without -// marshal/unmarshal. Name returns "proto" so the wire content-type is -// application/grpc+proto, matching what Chip Ingress expects. -type rawBytesCodec struct{} - -func (rawBytesCodec) Marshal(v any) ([]byte, error) { - b, ok := v.([]byte) - if !ok { - return nil, fmt.Errorf("rawBytesCodec.Marshal: expected []byte, got %T", v) - } - return b, nil -} - -func (rawBytesCodec) Unmarshal(data []byte, v any) error { - bp, ok := v.(*[]byte) - if !ok { - return fmt.Errorf("rawBytesCodec.Unmarshal: expected *[]byte, got %T", v) - } - *bp = data - return nil -} - -func (rawBytesCodec) Name() string { return "proto" } - -var _ grpcEncoding.Codec = rawBytesCodec{} - -// buildBatchBytes constructs the protobuf wire format for a CloudEventBatch -// directly from already-serialized CloudEvent payloads. Each payload is -// wrapped with the field-1 tag and length prefix — no unmarshal/re-marshal. -func buildBatchBytes(payloads [][]byte) []byte { - size := 0 - for _, p := range payloads { - size += protowire.SizeTag(1) + protowire.SizeBytes(len(p)) - } - buf := make([]byte, 0, size) - for _, p := range payloads { - buf = protowire.AppendTag(buf, 1, protowire.BytesType) - buf = protowire.AppendBytes(buf, p) - } - return buf -} - // Compile-time assertion that *DurableEmitter exposes the canonical emit and // close methods expected of an emitter. var _ interface { @@ -239,13 +196,25 @@ var _ interface { // NewDurableEmitter constructs a DurableEmitter as a services.Service. // +// batchEmitter is the transport layer (typically *batch.Client from +// pkg/chipingress/batch) responsible for batched gRPC delivery, seqnum +// stamping, size splitting, and concurrency limiting. +// +// fallbackClient, when non-nil, is used to retry individual events via a +// direct unary Publish RPC whenever the batch emitter reports a delivery +// failure. This gives a fast second-chance path before the DB-backed +// retransmit loop kicks in. Pass nil to disable single-event fallback +// (events are left in the DB and delivered by the retransmit loop). +// DurableEmitter takes ownership of fallbackClient and closes it on Stop. +// // meter is the OpenTelemetry Meter used for instrument registration. It is // required when cfg.Metrics is non-nil and must be supplied by the caller // (e.g. otel.Meter("durableemitter") or a meter from the host's // MeterProvider). Pass nil when metrics are disabled. func NewDurableEmitter( store DurableEventStore, - client chipingress.Client, + batchEmitter BatchEmitter, + fallbackClient chipingress.Client, isHostProcess bool, cfg DurableEmitterConfig, lggr logger.Logger, @@ -254,15 +223,12 @@ func NewDurableEmitter( if store == nil { return nil, errors.New("durable event store is nil") } - if client == nil { - return nil, errors.New("chipingress client is nil") + if batchEmitter == nil { + return nil, errors.New("batch emitter is nil") } if lggr == nil { return nil, errors.New("logger is nil") } - if cfg.PublishBatchSize < 1 { - cfg.PublishBatchSize = 1 - } var m *durableEmitterMetrics if cfg.Metrics != nil { if meter == nil { @@ -276,12 +242,13 @@ func NewDurableEmitter( store = newMetricsInstrumentedStore(store, m) } d := &DurableEmitter{ - store: store, - client: client, - isHostProcess: isHostProcess, - cfg: cfg, - metrics: m, - stopCh: make(chan struct{}), + store: store, + batchEmitter: batchEmitter, + fallbackClient: fallbackClient, + isHostProcess: isHostProcess, + cfg: cfg, + metrics: m, + stopCh: make(chan struct{}), } d.Service, d.eng = services.Config{ Name: "DurableEmitter", @@ -289,10 +256,6 @@ func NewDurableEmitter( Close: d.stop, }.NewServiceEngine(lggr) - if cp, ok := client.(grpcConnProvider); ok { - d.rawConn = cp.Conn() - d.eng.Infow("DurableEmitter: raw-codec batch publishing enabled (zero-copy protowire)") - } if cfg.InsertBatchSize > 0 { if bi, ok := store.(BatchInserter); ok { d.batchInserter = bi @@ -307,33 +270,24 @@ func NewDurableEmitter( "insertBatchFlushInterval", cfg.InsertBatchFlushInterval) } } - bufSize := cfg.PublishBatchChannelSize - if bufSize <= 0 { - bufSize = cfg.PublishBatchSize * 2000 - if bufSize < 200_000 { - bufSize = 200_000 - } - } - d.publishCh = make(chan publishWork, bufSize) return d, nil } -// start launches the retransmit, expiry, purge, and (optionally) batch publish -// background loops. It is invoked by the services.Engine when the embedded -// Service is started; callers should not call this directly. The supplied -// context must not be retained beyond start (per services.Service contract); -// loops use d.stopCh.NewCtx() for their own lifecycle. -func (d *DurableEmitter) start(_ context.Context) error { - batchWorkers := d.cfg.PublishBatchWorkers - if batchWorkers <= 0 { - d.eng.Warnw("configured batchWorkers <=0; defaulting to 1") - batchWorkers = 1 - } +// start launches the retransmit, expiry, purge, and insert-coalescing background +// loops, then starts the batch emitter transport. It is invoked by the +// services.Engine when the embedded Service is started. +func (d *DurableEmitter) start(ctx context.Context) error { + d.batchEmitter.Start(ctx) + insertWorkers := d.cfg.InsertBatchWorkers if insertWorkers <= 0 { - d.eng.Warnw("configured insertWorkers <=0; defaulting to 4") insertWorkers = 4 } + if d.insertCh != nil { + for i := 0; i < insertWorkers; i++ { + d.wg.Go(d.insertBatchLoop) + } + } if d.isHostProcess { d.wg.Go(d.retransmitLoop) @@ -342,23 +296,16 @@ func (d *DurableEmitter) start(_ context.Context) error { d.wg.Go(d.purgeLoop) } } - if d.insertCh != nil { - for i := 0; i < insertWorkers; i++ { - d.wg.Go(d.insertBatchLoop) - } - } - for i := 0; i < batchWorkers; i++ { - d.wg.Go(d.batchPublishLoop) - } if d.metrics != nil && d.cfg.Metrics != nil { d.wg.Go(d.metricsLoop) } return nil } -// Emit persists the event then enqueues it for async PublishBatch delivery. Returns nil once -// the insert is accepted (or the coalesced insert path completes successfully). Returns an -// error when the service is not in the Started state (e.g. before Start or after Close). +// Emit persists the event then hands it to the BatchEmitter for async delivery. +// Returns nil once the insert is accepted (or the coalesced insert path +// completes successfully). Returns an error when the service is not in the +// Started state (e.g. before Start or after Close). func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { return d.eng.IfStarted(func() error { tEmitTotal := time.Now() @@ -468,32 +415,114 @@ func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) d.incPending(1) - // Batch path: enqueue for batch publish loop (PublishBatchSize is always >= 1). - work := publishWork{id: id, payload: payload} - if d.rawConn == nil { - work.event = eventPb - } - select { - case d.publishCh <- work: - return nil - default: - // Channel full — event is safely in the DB; retransmit loop will deliver it. - d.eng.Warnw("DurableEmitter: batch publish channel full, relying on retransmit", - "id", id, "ch_len", len(d.publishCh), "ch_cap", cap(d.publishCh)) + // Hand off to the batch emitter. The callback fires once the batch + // containing this event is sent (success or failure). eventPb is + // captured in the closure so the fallback path can retry without a + // DB round-trip. + t0Publish := time.Now() + if qErr := d.batchEmitter.QueueMessage(eventPb, d.deliveryCallback(id, eventPb, t0Publish)); qErr != nil { + // Buffer full — event is safely in the DB; retransmit loop will deliver it. + d.eng.Warnw("DurableEmitter: batch emitter buffer full, relying on retransmit", "id", id) } return nil }) } -// stop signals background loops to stop and waits for them to finish. It is -// invoked by the services.Engine when the embedded Service is closed; callers -// should not call this directly. -// -// When coalesced inserts are enabled, insertShutdown and insertInFlight drain -// before close(stopCh) so Emit can finish enqueueing to publishCh after a -// successful insert (receive on a closed stopCh in select would race with -// default). Then stopCh is closed, workers exit, and publishCh is closed -// after wg.Wait. +// deliveryCallback returns the function passed to BatchEmitter.QueueMessage. +// On success, it marks the event delivered. On failure, it attempts a +// single-event fallback via fallbackClient (when configured) in a goroutine +// before leaving the event in the DB for the retransmit loop. +func (d *DurableEmitter) deliveryCallback(id int64, eventPb *chipingress.CloudEventPb, t0Publish time.Time) func(error) { + return func(sendErr error) { + publishElapsed := time.Since(t0Publish) + + if h := d.cfg.Hooks; h != nil && h.OnBatchPublish != nil { + h.OnBatchPublish(publishElapsed, 1, sendErr) + } + + cbCtx, cbCancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + defer cbCancel() + + d.metrics.recordPublish(cbCtx, publishElapsed, "batch", sendErr) + + if sendErr != nil { + if d.metrics != nil { + d.metrics.publishBatchEvErr.Add(cbCtx, 1) + } + // Batch path failed. If a fallback client is configured, retry the + // single event directly; otherwise leave in DB for retransmit. + d.tryFallback(id, eventPb) + return + } + + if d.metrics != nil { + d.metrics.publishBatchEvOK.Add(cbCtx, 1) + } + + tMark := time.Now() + marked, markErr := d.store.MarkDeliveredBatch(cbCtx, []int64{id}) + markElapsed := time.Since(tMark) + + if h := d.cfg.Hooks; h != nil && h.OnBatchMarkDelivered != nil { + h.OnBatchMarkDelivered(markElapsed, int(marked)) + } + if markErr != nil { + d.eng.Errorw("failed to mark event delivered", "id", id, "error", markErr) + return + } + d.decPending(marked) + if d.metrics != nil { + d.metrics.deliverComplete.Add(cbCtx, marked) + } + } +} + +// tryFallback spawns a goroutine that retries a single event via the direct +// chipingress.Client.Publish RPC. If fallbackClient is nil this is a no-op +// and the event is left in the DB for the retransmit loop. +func (d *DurableEmitter) tryFallback(id int64, eventPb *chipingress.CloudEventPb) { + if d.fallbackClient == nil { + return + } + d.fallbackWg.Add(1) + go func() { + defer d.fallbackWg.Done() + d.singleEventFallback(id, eventPb) + }() +} + +// singleEventFallback sends a single event directly via the fallback +// chipingress.Client. On success it marks the event delivered and decrements +// the pending counter. On failure it logs and returns — the event remains in +// the DB and the retransmit loop will eventually deliver it. +func (d *DurableEmitter) singleEventFallback(id int64, eventPb *chipingress.CloudEventPb) { + pubCtx, pubCancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + defer pubCancel() + + if _, err := d.fallbackClient.Publish(pubCtx, eventPb); err != nil { + d.eng.Warnw("DurableEmitter: single-event fallback publish failed, relying on retransmit", + "id", id, "error", err) + return + } + + markCtx, markCancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + defer markCancel() + + marked, markErr := d.store.MarkDeliveredBatch(markCtx, []int64{id}) + if markErr != nil { + d.eng.Errorw("DurableEmitter: failed to mark fallback event delivered", "id", id, "error", markErr) + return + } + d.decPending(marked) + if d.metrics != nil { + d.metrics.deliverComplete.Add(markCtx, marked) + } +} + +// stop signals background loops to stop and waits for them to finish, then +// stops the batch emitter (which flushes any queued events and waits for all +// in-flight callbacks). It is invoked by the services.Engine when the embedded +// Service is closed. func (d *DurableEmitter) stop() error { if d.insertCh != nil { d.insertShutdown.Store(true) @@ -504,14 +533,22 @@ func (d *DurableEmitter) stop() error { } close(d.stopCh) d.wg.Wait() - close(d.publishCh) + // Stop the batch emitter: flushes remaining queued events, waits for all + // in-flight PublishBatch RPCs, and waits for all delivery callbacks. + // Delivery callbacks may spawn single-event fallback goroutines tracked by + // fallbackWg, so we wait on those next. + d.batchEmitter.Stop() + d.fallbackWg.Wait() + if d.fallbackClient != nil { + if err := d.fallbackClient.Close(); err != nil { + d.eng.Warnw("DurableEmitter: error closing fallback chip client", "error", err) + } + } return nil } // insertBatchLoop collects insertRequest items from insertCh and flushes them -// as multi-row INSERTs via BatchInserter.InsertBatch. Uses a linger pattern: -// blocks for the first request, then collects more until the batch is full or -// the flush interval elapses. +// as multi-row INSERTs via BatchInserter.InsertBatch. func (d *DurableEmitter) insertBatchLoop() { batchSize := d.cfg.InsertBatchSize linger := d.cfg.InsertBatchFlushInterval @@ -523,15 +560,12 @@ func (d *DurableEmitter) insertBatchLoop() { for { batch = batch[:0] - // Exit only when insertCh is closed and drained; do not exit on stopCh - // or Emit callers blocked on req.result would hang. req, ok := <-d.insertCh if !ok { return } batch = append(batch, req) - // Linger to collect more. timer := time.NewTimer(linger) collecting: for len(batch) < batchSize { @@ -552,8 +586,6 @@ func (d *DurableEmitter) insertBatchLoop() { for i, r := range batch { payloads[i] = r.payload } - // Detached from stopCh so closing stopCh does not cancel inserts while - // draining insertCh during shutdown. ctx, cancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) ids, batchErr := d.batchInserter.InsertBatch(ctx, payloads) cancel() @@ -606,213 +638,6 @@ func (d *DurableEmitter) decPending(n int64) { } } -// batchPublishLoop reads events from publishCh, collects them into batches of -// PublishBatchSize, and sends each batch via PublishBatch RPC. It blocks until -// the batch is full or PublishBatchFlushInterval elapses after the first event -// arrives (linger pattern), guaranteeing full batches at high throughput. -func (d *DurableEmitter) batchPublishLoop() { - batchSize := d.cfg.PublishBatchSize - flushInterval := d.cfg.PublishBatchFlushInterval - if flushInterval <= 0 { - flushInterval = 50 * time.Millisecond - } - - batch := make([]publishWork, 0, batchSize) - - for { - // Stage 1: block until at least one event arrives (or shutdown). - select { - case w, ok := <-d.publishCh: - if !ok { - return - } - batch = append(batch, w) - case <-d.stopCh: - d.drainPublishChOnShutdown(batch) - return - } - - // Stage 2: linger — keep reading until batch is full or deadline. - linger := time.NewTimer(flushInterval) - fill: - for len(batch) < batchSize { - select { - case w, ok := <-d.publishCh: - if !ok { - linger.Stop() - if len(batch) > 0 { - d.flushBatch(batch) - } - return - } - batch = append(batch, w) - case <-linger.C: - break fill - case <-d.stopCh: - linger.Stop() - d.drainPublishChOnShutdown(batch) - return - } - } - linger.Stop() - - d.flushBatch(batch) - batch = batch[:0] - } -} - -// drainPublishChOnShutdown empties publishCh after stopCh has fired but before -// Close closes publishCh; finishes any in-flight batches. -func (d *DurableEmitter) drainPublishChOnShutdown(batch []publishWork) { - bs := d.cfg.PublishBatchSize - if bs < 1 { - bs = 1 - } - for { - select { - case w, ok := <-d.publishCh: - if !ok { - if len(batch) > 0 { - d.flushBatch(batch) - } - return - } - batch = append(batch, w) - if len(batch) >= bs { - d.flushBatch(batch) - batch = batch[:0] - } - default: - if len(batch) > 0 { - d.flushBatch(batch) - } - return - } - } -} - -// drainPublishCh flushes the given partial batch plus any remaining items on -// publishCh in batchSize chunks. Called during shutdown (ctx cancelled or -// channel closed). -func (d *DurableEmitter) drainPublishCh(batch []publishWork) { - for w := range d.publishCh { - batch = append(batch, w) - if len(batch) >= d.cfg.PublishBatchSize { - d.flushBatch(batch) - batch = batch[:0] - } - } - if len(batch) > 0 { - d.flushBatch(batch) - } -} - -// flushBatch publishes a collected batch via PublishBatch and marks all events -// as delivered in a single MarkDeliveredBatch call. -// -// When rawConn is available, batch wire bytes are built directly from the -// already-serialized payloads using protowire — zero unmarshal/re-marshal. -// Otherwise, falls back to the typed PublishBatch RPC. -func (d *DurableEmitter) flushBatch(batch []publishWork) { - ids := make([]int64, len(batch)) - for i, w := range batch { - ids[i] = w.id - } - - ctx, cancel := d.stopCh.NewCtx() - defer cancel() - pubCtx, pubCancel := context.WithTimeout(ctx, d.cfg.PublishTimeout) - defer pubCancel() - - t0 := time.Now() - var err error - if d.rawConn != nil { - err = d.flushBatchRaw(pubCtx, batch) - } else { - err = d.flushBatchTyped(pubCtx, batch) - } - elapsed := time.Since(t0) - - if h := d.cfg.Hooks; h != nil && h.OnBatchPublish != nil { - h.OnBatchPublish(elapsed, len(batch), err) - } - d.metrics.recordPublish(ctx, elapsed, "batch", err) - - if err != nil { - if d.metrics != nil { - d.metrics.publishBatchEvErr.Add(ctx, int64(len(batch))) - } - d.eng.Warnw("DurableEmitter: PublishBatch failed, events will be retransmitted", - "batch_size", len(batch), "error", err, - "elapsed_ms", elapsed.Milliseconds()) - return - } - - if d.metrics != nil { - d.metrics.publishBatchEvOK.Add(pubCtx, int64(len(batch))) - } - - // Async MarkDelivered: the DB UPDATE runs in a background goroutine so - // the batch worker can immediately start collecting the next batch. - // If MarkDelivered fails, events stay pending and the retransmit loop - // delivers them (at-least-once semantics are unchanged). - d.wg.Go(func() { - tMark := time.Now() - marked, markErr := d.store.MarkDeliveredBatch(ctx, ids) - markElapsed := time.Since(tMark) - if h := d.cfg.Hooks; h != nil && h.OnBatchMarkDelivered != nil { - h.OnBatchMarkDelivered(markElapsed, int(marked)) - } - if markErr != nil { - d.eng.Errorw("failed to batch-mark events delivered", "batch_size", len(ids), "error", markErr) - return - } - d.decPending(marked) - if d.metrics != nil { - d.metrics.deliverComplete.Add(ctx, marked) - } - }) -} - -// flushBatchRaw builds the CloudEventBatch wire format directly from -// already-serialized payloads and sends it via raw gRPC Invoke — zero -// protobuf unmarshal/re-marshal overhead. -func (d *DurableEmitter) flushBatchRaw(ctx context.Context, batch []publishWork) error { - payloads := make([][]byte, len(batch)) - for i, w := range batch { - payloads[i] = w.payload - } - batchBytes := buildBatchBytes(payloads) - var respBytes []byte - return d.rawConn.Invoke( - ctx, - pb.ChipIngress_PublishBatch_FullMethodName, - batchBytes, - &respBytes, - grpc.ForceCodec(rawBytesCodec{}), - ) -} - -// flushBatchTyped builds a typed CloudEventBatch and sends it via the -// standard gRPC client. Used as fallback when rawConn is not available. -func (d *DurableEmitter) flushBatchTyped(ctx context.Context, batch []publishWork) error { - events := make([]*chipingress.CloudEventPb, len(batch)) - for i, w := range batch { - if w.event != nil { - events[i] = w.event - } else { - ev := new(chipingress.CloudEventPb) - if err := proto.Unmarshal(w.payload, ev); err != nil { - return fmt.Errorf("unmarshal event %d for typed publish: %w", i, err) - } - events[i] = ev - } - } - batchPb := &chipingress.CloudEventBatch{Events: events} - _, err := d.client.PublishBatch(ctx, batchPb) - return err -} - func (d *DurableEmitter) retransmitLoop() { ticker := time.NewTicker(d.cfg.RetransmitInterval) defer ticker.Stop() @@ -862,11 +687,10 @@ func (d *DurableEmitter) retransmitPending() { d.retransmit(pending) } -// retransmit enqueues pending DB rows to publishCh so the batch workers handle -// publishing. When rawConn is set, payloads are passed through without -// proto.Unmarshal — the batch workers use buildBatchBytes for the wire format. +// retransmit re-enqueues pending DB rows through the batch emitter. Each row +// gets its own delivery callback that marks it delivered on success. func (d *DurableEmitter) retransmit(pending []DurableEvent) { - var enqueued int + var enqueued, skipped int for _, pe := range pending { select { @@ -874,24 +698,53 @@ func (d *DurableEmitter) retransmit(pending []DurableEvent) { return default: } - work := publishWork{id: pe.ID, payload: pe.Payload} - select { - case d.publishCh <- work: + eventPb := new(chipingress.CloudEventPb) + if err := proto.Unmarshal(pe.Payload, eventPb); err != nil { + d.eng.Errorw("DurableEmitter: failed to unmarshal event for retransmit", "id", pe.ID, "error", err) + continue + } + + id := pe.ID + if err := d.batchEmitter.QueueMessage(eventPb, d.retransmitCallback(id, eventPb)); err != nil { + skipped++ + } else { enqueued++ - default: } } - d.eng.Infow("DurableEmitter: retransmit enqueued to batch workers", + d.eng.Infow("DurableEmitter: retransmit queued to batch emitter", "enqueued", enqueued, - "skipped_ch_full", len(pending)-enqueued, + "skipped_buffer_full", skipped, "total_pending", len(pending), - "ch_len", len(d.publishCh), - "ch_cap", cap(d.publishCh), ) } +// retransmitCallback returns the delivery callback used for retransmit rows. +// When the batch emitter reports failure, it falls back to a single-event +// direct Publish (same as the primary delivery path). +func (d *DurableEmitter) retransmitCallback(id int64, eventPb *chipingress.CloudEventPb) func(error) { + return func(sendErr error) { + if sendErr != nil { + // Batch path failed; try single-event fallback before leaving to next tick. + d.tryFallback(id, eventPb) + return + } + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + defer cancel() + + marked, markErr := d.store.MarkDeliveredBatch(ctx, []int64{id}) + if markErr != nil { + d.eng.Errorw("failed to mark retransmit event delivered", "id", id, "error", markErr) + return + } + d.decPending(marked) + if d.metrics != nil { + d.metrics.deliverComplete.Add(ctx, marked) + } + } +} + func (d *DurableEmitter) purgeLoop() { interval := d.cfg.PurgeInterval if interval <= 0 { diff --git a/pkg/durableemitter/durable_emitter_test.go b/pkg/durableemitter/durable_emitter_test.go index 7bff67963a..3846750966 100644 --- a/pkg/durableemitter/durable_emitter_test.go +++ b/pkg/durableemitter/durable_emitter_test.go @@ -23,6 +23,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" @@ -39,76 +40,70 @@ func newTestMeter(t *testing.T) (metric.Meter, *sdkmetric.ManualReader) { return mp.Meter("durableemitter"), reader } -// testChipClient is a minimal chipingress.Client for tests. -type testChipClient struct { - chipingress.NoopClient - - mu sync.Mutex - publishErr error - publishCount atomic.Int64 - batchCount atomic.Int64 - publishedIDs []string +// testBatchEmitter is a minimal BatchEmitter for unit tests. +// QueueMessage invokes the callback asynchronously (like batch.Client), using +// publishErr as the result. callCount tracks how many events were enqueued. +type testBatchEmitter struct { + mu sync.Mutex + publishErr error + callCount atomic.Int64 + + stopOnce sync.Once + stopCh chan struct{} + wg sync.WaitGroup } -func (c *testChipClient) Publish(_ context.Context, ev *chipingress.CloudEventPb, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) { - c.publishCount.Add(1) - c.mu.Lock() - if ev != nil { - c.publishedIDs = append(c.publishedIDs, ev.Id) - } - err := c.publishErr - c.mu.Unlock() - return &chipingress.PublishResponse{}, err +func newTestBatchEmitter() *testBatchEmitter { + return &testBatchEmitter{stopCh: make(chan struct{})} } -// PublishBatch mirrors production semantics: respect publishErr and count as a -// separate RPC (batch path / tests that assert Publish only would miss it). -func (c *testChipClient) PublishBatch(_ context.Context, b *chipingress.CloudEventBatch, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) { - c.batchCount.Add(1) - c.mu.Lock() - if b != nil { - for _, ev := range b.Events { - if ev != nil { - c.publishedIDs = append(c.publishedIDs, ev.Id) - } - } +func (b *testBatchEmitter) QueueMessage(event *chipingress.CloudEventPb, cb func(error)) error { + select { + case <-b.stopCh: + return errors.New("batch emitter stopped") + default: } - err := c.publishErr - c.mu.Unlock() - return &chipingress.PublishResponse{}, err + b.mu.Lock() + err := b.publishErr + b.mu.Unlock() + + b.callCount.Add(1) + if cb != nil { + b.wg.Add(1) + go func() { + defer b.wg.Done() + cb(err) + }() + } + return nil } -// totalChipRPCs is unary Publish + PublishBatch for assertions that do not care which path ran. -func (c *testChipClient) totalChipRPCs() int64 { - return c.publishCount.Load() + c.batchCount.Load() -} +func (b *testBatchEmitter) Start(_ context.Context) {} -func (c *testChipClient) setPublishErr(err error) { - c.mu.Lock() - defer c.mu.Unlock() - c.publishErr = err +func (b *testBatchEmitter) Stop() { + b.stopOnce.Do(func() { + close(b.stopCh) + b.wg.Wait() + }) } -func (c *testChipClient) getPublishedIDs() []string { - c.mu.Lock() - defer c.mu.Unlock() - out := make([]string, len(c.publishedIDs)) - copy(out, c.publishedIDs) - return out +func (b *testBatchEmitter) setPublishErr(err error) { + b.mu.Lock() + defer b.mu.Unlock() + b.publishErr = err } func testEmitAttrs() []any { return []any{"source", "test-source", "type", "test-type"} } -func newTestDurableEmitter(t *testing.T, store DurableEventStore, client chipingress.Client, cfgOverride *DurableEmitterConfig) *DurableEmitter { +func newTestDurableEmitter(t *testing.T, store DurableEventStore, be BatchEmitter, cfgOverride *DurableEmitterConfig) *DurableEmitter { t.Helper() cfg := DefaultDurableEmitterConfig() if cfgOverride != nil { cfg = *cfgOverride } - // Tests that need metrics build the emitter directly with an injected meter. - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) return em } @@ -132,13 +127,13 @@ func TestDurableEmitter_CloseCoalescedInsertShutdown(t *testing.T) { MemDurableEventStore: NewMemDurableEventStore(), stall: stall, } - client := &testChipClient{} + be := newTestBatchEmitter() cfg := DefaultDurableEmitterConfig() cfg.InsertBatchSize = 1 cfg.InsertBatchWorkers = 1 cfg.DisablePruning = true - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) ctx := t.Context() require.NoError(t, em.Start(ctx)) @@ -177,14 +172,14 @@ func TestDurableEmitter_CloseCoalescedInsertShutdown(t *testing.T) { func TestDurableEmitter_HooksBatchPublishPath(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} + be := newTestBatchEmitter() var pubCalls, markCalls atomic.Int32 cfg := DefaultDurableEmitterConfig() cfg.Hooks = &DurableEmitterHooks{ OnBatchPublish: func(time.Duration, int, error) { pubCalls.Add(1) }, OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) }, } - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -197,15 +192,15 @@ func TestDurableEmitter_HooksBatchPublishPath(t *testing.T) { func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("down")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("down")) var pubCalls, markCalls atomic.Int32 cfg := DefaultDurableEmitterConfig() cfg.Hooks = &DurableEmitterHooks{ OnBatchPublish: func(time.Duration, int, error) { pubCalls.Add(1) }, OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) }, } - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -217,8 +212,8 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) { func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("chip unavailable")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("chip unavailable")) cfg := DefaultDurableEmitterConfig() cfg.RetransmitInterval = 40 * time.Millisecond @@ -226,7 +221,7 @@ func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { cfg.ExpiryInterval = 40 * time.Millisecond cfg.EventTTL = 25 * time.Millisecond - em, err := NewDurableEmitter(store, client, false, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, false, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -234,21 +229,21 @@ func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { require.NoError(t, em.Emit(ctx, []byte("plugin-row"), testEmitAttrs()...)) require.Eventually(t, func() bool { - return client.batchCount.Load() >= 1 && store.Len() == 1 - }, 2*time.Second, 5*time.Millisecond, "initial PublishBatch should fail and leave the row") + return be.callCount.Load() >= 1 && store.Len() == 1 + }, 2*time.Second, 5*time.Millisecond, "initial QueueMessage should fail and leave the row") // Several host-only ticks would have cleared or retried by now. time.Sleep(250 * time.Millisecond) assert.Equal(t, 1, store.Len(), "non-host must not run retransmit or expiry loops") - assert.Equal(t, int64(1), client.batchCount.Load(), "non-host must not schedule extra PublishBatch via retransmit") + assert.Equal(t, int64(1), be.callCount.Load(), "non-host must not schedule extra QueueMessage via retransmit") } func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} + be := newTestBatchEmitter() - em, err := NewDurableEmitter(store, client, false, DefaultDurableEmitterConfig(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, false, DefaultDurableEmitterConfig(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -256,23 +251,22 @@ func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) require.NoError(t, em.Emit(ctx, []byte("loop-plugin"), testEmitAttrs()...)) require.Eventually(t, func() bool { - return store.Len() == 0 && client.batchCount.Load() >= 1 - }, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when isHostProcess is false") + return store.Len() == 0 && be.callCount.Load() >= 1 + }, 2*time.Second, 10*time.Millisecond, "batch emitter must deliver even when isHostProcess is false") } func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - em := newTestDurableEmitter(t, store, client, nil) + be := newTestBatchEmitter() + em := newTestDurableEmitter(t, store, be, nil) servicetest.Run(t, em) ctx := t.Context() err := em.Emit(ctx, []byte("hello"), testEmitAttrs()...) require.NoError(t, err) - // Batch publish should fire (PublishBatch with batch size 1) and delete the record. require.Eventually(t, func() bool { - return client.batchCount.Load() == 1 + return be.callCount.Load() == 1 }, 2*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { @@ -282,19 +276,18 @@ func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) { func TestDurableEmitter_EmitReturnSuccessEvenWhenPublishFails(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("connection refused")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("connection refused")) - em := newTestDurableEmitter(t, store, client, nil) + em := newTestDurableEmitter(t, store, be, nil) servicetest.Run(t, em) ctx := t.Context() err := em.Emit(ctx, []byte("hello"), testEmitAttrs()...) require.NoError(t, err, "Emit must succeed once the DB insert succeeds") - // Wait for the async PublishBatch attempt to complete. require.Eventually(t, func() bool { - return client.batchCount.Load() == 1 + return be.callCount.Load() == 1 }, 2*time.Second, 10*time.Millisecond) // Event must remain in the store for retransmit. @@ -303,46 +296,43 @@ func TestDurableEmitter_EmitReturnSuccessEvenWhenPublishFails(t *testing.T) { func TestDurableEmitter_RetransmitLoopDeliversFailedEvents(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("connection refused")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("connection refused")) cfg := DefaultDurableEmitterConfig() cfg.RetransmitInterval = 100 * time.Millisecond cfg.RetransmitAfter = 50 * time.Millisecond - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) servicetest.Run(t, em) ctx := t.Context() err := em.Emit(ctx, []byte("retry-me"), testEmitAttrs()...) require.NoError(t, err) - // Wait until the async immediate path has run with the error and the row - // is still pending (not a success race after we clear the error). require.Eventually(t, func() bool { - return client.batchCount.Load() >= 1 && store.Len() == 1 - }, 2*time.Second, 5*time.Millisecond, "failed PublishBatch should leave the row") + return be.callCount.Load() >= 1 && store.Len() == 1 + }, 2*time.Second, 5*time.Millisecond, "failed delivery should leave the row") - client.setPublishErr(nil) + be.setPublishErr(nil) require.Eventually(t, func() bool { return store.Len() == 0 }, 5*time.Second, 50*time.Millisecond, "retransmit loop should eventually deliver and delete the event") - // At least: one failed PublishBatch + one successful delivery (retransmit uses PublishBatch too). - assert.GreaterOrEqual(t, client.batchCount.Load(), int64(2)) + assert.GreaterOrEqual(t, be.callCount.Load(), int64(2)) } func TestDurableEmitter_RetransmitSerialDistinctCloudEvents(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("immediate fail")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("immediate fail")) cfg := DefaultDurableEmitterConfig() cfg.RetransmitInterval = 100 * time.Millisecond cfg.RetransmitAfter = 50 * time.Millisecond - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) servicetest.Run(t, em) ctx := t.Context() @@ -350,33 +340,26 @@ func TestDurableEmitter_RetransmitSerialDistinctCloudEvents(t *testing.T) { require.NoError(t, em.Emit(ctx, []byte("second"), testEmitAttrs()...)) require.Eventually(t, func() bool { - return client.batchCount.Load() >= 2 && store.Len() == 2 - }, 2*time.Second, 5*time.Millisecond, "both failed PublishBatch calls should leave two rows") + return be.callCount.Load() >= 2 && store.Len() == 2 + }, 2*time.Second, 5*time.Millisecond, "both failed deliveries should leave two rows") - client.setPublishErr(nil) + be.setPublishErr(nil) require.Eventually(t, func() bool { return store.Len() == 0 }, 5*time.Second, 50*time.Millisecond) - - ids := client.getPublishedIDs() - require.GreaterOrEqual(t, len(ids), 4, "two failed attempts then two successful deliveries (IDs recorded)") - require.GreaterOrEqual(t, client.batchCount.Load(), int64(4)) - a, b := ids[len(ids)-2], ids[len(ids)-1] - assert.NotEmpty(t, a) - assert.NotEmpty(t, b) - assert.NotEqualf(t, a, b, "retransmit must publish two distinct CloudEvents, not one pointer reused for every row") + assert.GreaterOrEqual(t, be.callCount.Load(), int64(4)) } func TestDurableEmitter_ExpiryLoopDeletesOldEvents(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("always fail")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("always fail")) cfg := DefaultDurableEmitterConfig() cfg.ExpiryInterval = 100 * time.Millisecond cfg.EventTTL = 50 * time.Millisecond cfg.RetransmitInterval = 10 * time.Minute // effectively disable retransmit - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) servicetest.Run(t, em) ctx := t.Context() @@ -391,7 +374,7 @@ func TestDurableEmitter_ExpiryLoopDeletesOldEvents(t *testing.T) { func TestDurableEmitter_RetransmitDeliversManuallyInsertedRow(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} + be := newTestBatchEmitter() ev, err := chipingress.NewEvent("unknown-domain", "t", []byte("b"), nil) require.NoError(t, err) @@ -407,18 +390,18 @@ func TestDurableEmitter_RetransmitDeliversManuallyInsertedRow(t *testing.T) { cfg.RetransmitInterval = 50 * time.Millisecond cfg.RetransmitAfter = 30 * time.Millisecond - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) servicetest.Run(t, em) require.Eventually(t, func() bool { - return store.Len() == 0 && client.batchCount.Load() >= 1 - }, 3*time.Second, 20*time.Millisecond, "pending row should be delivered via PublishBatch") + return store.Len() == 0 && be.callCount.Load() >= 1 + }, 3*time.Second, 20*time.Millisecond, "pending row should be delivered via batch emitter") } func TestDurableEmitter_EmitRejectsInvalidAttributes(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - em := newTestDurableEmitter(t, store, client, nil) + be := newTestBatchEmitter() + em := newTestDurableEmitter(t, store, be, nil) err := em.Emit(context.Background(), []byte("no-attrs")) require.Error(t, err) @@ -427,8 +410,8 @@ func TestDurableEmitter_EmitRejectsInvalidAttributes(t *testing.T) { func TestDurableEmitter_MultipleEvents(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - em := newTestDurableEmitter(t, store, client, nil) + be := newTestBatchEmitter() + em := newTestDurableEmitter(t, store, be, nil) servicetest.Run(t, em) ctx := t.Context() @@ -439,7 +422,7 @@ func TestDurableEmitter_MultipleEvents(t *testing.T) { } require.Eventually(t, func() bool { - return client.batchCount.Load() == int64(n) + return be.callCount.Load() == int64(n) }, 5*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { @@ -450,24 +433,25 @@ func TestDurableEmitter_MultipleEvents(t *testing.T) { func TestNewDurableEmitter_ValidationErrors(t *testing.T) { log := logger.Test(t) cfg := DefaultDurableEmitterConfig() + be := newTestBatchEmitter() - _, err := NewDurableEmitter(nil, &testChipClient{}, true, cfg, log, nil) + _, err := NewDurableEmitter(nil, be, nil, true, cfg, log, nil) assert.ErrorContains(t, err, "store") - _, err = NewDurableEmitter(NewMemDurableEventStore(), nil, true, cfg, log, nil) - assert.ErrorContains(t, err, "client") + _, err = NewDurableEmitter(NewMemDurableEventStore(), nil, nil, true, cfg, log, nil) + assert.ErrorContains(t, err, "batch emitter") - _, err = NewDurableEmitter(NewMemDurableEventStore(), &testChipClient{}, true, cfg, nil, nil) + _, err = NewDurableEmitter(NewMemDurableEventStore(), be, nil, true, cfg, nil, nil) assert.ErrorContains(t, err, "logger") cfgWithMetrics := cfg cfgWithMetrics.Metrics = &DurableEmitterMetricsConfig{} - _, err = NewDurableEmitter(NewMemDurableEventStore(), &testChipClient{}, true, cfgWithMetrics, log, nil) + _, err = NewDurableEmitter(NewMemDurableEventStore(), be, nil, true, cfgWithMetrics, log, nil) assert.ErrorContains(t, err, "meter") } func TestDurableEmitter_HealthReport(t *testing.T) { - em := newTestDurableEmitter(t, NewMemDurableEventStore(), &testChipClient{}, nil) + em := newTestDurableEmitter(t, NewMemDurableEventStore(), newTestBatchEmitter(), nil) servicetest.Run(t, em) report := em.HealthReport() @@ -480,12 +464,12 @@ func TestDurableEmitter_MetricsRegistersEmitSuccess(t *testing.T) { meter, reader := newTestMeter(t) store := NewMemDurableEventStore() - client := &testChipClient{} + be := newTestBatchEmitter() cfg := DefaultDurableEmitterConfig() cfg.RetransmitInterval = time.Hour cfg.Metrics = &DurableEmitterMetricsConfig{PollInterval: 25 * time.Millisecond} - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), meter) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), meter) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -600,36 +584,148 @@ func newChipClient(t *testing.T, addr string) chipingress.Client { t.Helper() c, err := chipingress.NewClient(addr, chipingress.WithInsecureConnection()) require.NoError(t, err) - t.Cleanup(func() { _ = c.Close() }) return c } +// newIntegrationBatchEmitter creates a batch.Client suitable for integration tests. +// Batch size 1 and a short interval ensure one event per RPC, matching the +// assertion style of the integration tests. +func newIntegrationBatchEmitter(t *testing.T, addr string) *batch.Client { + t.Helper() + chipClient := newChipClient(t, addr) + bc, err := batch.NewBatchClient(chipClient, + batch.WithBatchSize(1), + batch.WithBatchInterval(10*time.Millisecond), + batch.WithMaxPublishTimeout(2*time.Second), + batch.WithShutdownTimeout(5*time.Second), + ) + require.NoError(t, err) + return bc +} + func emitAttrs() []any { return []any{"source", "test-domain", "type", "test-entity"} } func fastCfg() DurableEmitterConfig { return DurableEmitterConfig{ - PublishBatchSize: 1, - PublishBatchFlushInterval: 10 * time.Millisecond, - RetransmitInterval: 100 * time.Millisecond, - RetransmitAfter: 50 * time.Millisecond, - RetransmitBatchSize: 50, - ExpiryInterval: 200 * time.Millisecond, - EventTTL: 500 * time.Millisecond, - PublishTimeout: 2 * time.Second, + RetransmitInterval: 100 * time.Millisecond, + RetransmitAfter: 50 * time.Millisecond, + RetransmitBatchSize: 50, + ExpiryInterval: 200 * time.Millisecond, + EventTTL: 500 * time.Millisecond, + PublishTimeout: 2 * time.Second, } } +// testFallbackClient is a minimal chipingress.Client for fallback testing. +type testFallbackClient struct { + chipingress.NoopClient + + mu sync.Mutex + publishErr error + publishCount atomic.Int64 + closed atomic.Bool +} + +func (c *testFallbackClient) Publish(_ context.Context, _ *chipingress.CloudEventPb, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) { + c.mu.Lock() + err := c.publishErr + c.mu.Unlock() + c.publishCount.Add(1) + return &chipingress.PublishResponse{}, err +} + +func (c *testFallbackClient) Close() error { + c.closed.Store(true) + return nil +} + +func (c *testFallbackClient) setPublishErr(err error) { + c.mu.Lock() + defer c.mu.Unlock() + c.publishErr = err +} + +// TestDurableEmitter_FallbackDeliversOnBatchFailure verifies that when the +// batch emitter fails, the fallback client delivers the event directly and +// marks it delivered (removing it from the DB). +func TestDurableEmitter_FallbackDeliversOnBatchFailure(t *testing.T) { + store := NewMemDurableEventStore() + be := newTestBatchEmitter() + be.setPublishErr(errors.New("batch down")) + fallback := &testFallbackClient{} + + em, err := NewDurableEmitter(store, be, fallback, true, DefaultDurableEmitterConfig(), logger.Test(t), nil) + require.NoError(t, err) + servicetest.Run(t, em) + ctx := t.Context() + + require.NoError(t, em.Emit(ctx, []byte("needs-fallback"), testEmitAttrs()...)) + + // Batch emitter fires callback with error → fallback client should deliver. + require.Eventually(t, func() bool { + return fallback.publishCount.Load() >= 1 + }, 2*time.Second, 10*time.Millisecond, "fallback client should receive one Publish call") + + // Event should be marked delivered and removed from the store. + require.Eventually(t, func() bool { + return store.Len() == 0 + }, 2*time.Second, 10*time.Millisecond, "event should be marked delivered after fallback") +} + +// TestDurableEmitter_FallbackFailureEventRemainsForRetransmit verifies that +// when both the batch emitter and the fallback client fail, the event stays +// in the DB for the retransmit loop to pick up. +func TestDurableEmitter_FallbackFailureEventRemainsForRetransmit(t *testing.T) { + store := NewMemDurableEventStore() + be := newTestBatchEmitter() + be.setPublishErr(errors.New("batch down")) + fallback := &testFallbackClient{} + fallback.setPublishErr(errors.New("fallback down too")) + + cfg := DefaultDurableEmitterConfig() + cfg.RetransmitInterval = 10 * time.Minute // disable retransmit for this test + em, err := NewDurableEmitter(store, be, fallback, true, cfg, logger.Test(t), nil) + require.NoError(t, err) + servicetest.Run(t, em) + ctx := t.Context() + + require.NoError(t, em.Emit(ctx, []byte("both-fail"), testEmitAttrs()...)) + + require.Eventually(t, func() bool { + return fallback.publishCount.Load() >= 1 + }, 2*time.Second, 10*time.Millisecond, "fallback should have been attempted") + + // Event must still be in the store for the retransmit loop. + time.Sleep(50 * time.Millisecond) + assert.Equal(t, 1, store.Len(), "event must remain in DB when fallback also fails") +} + +// TestDurableEmitter_FallbackClientClosedOnStop verifies that the fallback +// client's Close() method is called when DurableEmitter shuts down. +func TestDurableEmitter_FallbackClientClosedOnStop(t *testing.T) { + store := NewMemDurableEventStore() + be := newTestBatchEmitter() + fallback := &testFallbackClient{} + + em, err := NewDurableEmitter(store, be, fallback, true, DefaultDurableEmitterConfig(), logger.Test(t), nil) + require.NoError(t, err) + require.NoError(t, em.Start(t.Context())) + + require.NoError(t, em.Close()) + assert.True(t, fallback.closed.Load(), "fallback client should be closed after Stop") +} + // ---------- Test cases ---------- func TestIntegration_HappyPath(t *testing.T) { srv := &mockChipServer{} _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() - em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, fastCfg(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -647,14 +743,13 @@ func TestIntegration_HappyPath(t *testing.T) { } func TestIntegration_ServerUnavailable_RetransmitRecovers(t *testing.T) { - // Start with server returning UNAVAILABLE on PublishBatch. srv := &mockChipServer{} srv.setBatchErr(status.Error(codes.Unavailable, "chip down")) _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() - em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, fastCfg(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -665,7 +760,6 @@ func TestIntegration_ServerUnavailable_RetransmitRecovers(t *testing.T) { return srv.batchCount.Load() >= 1 && store.Len() == 1 }, 2*time.Second, 10*time.Millisecond, "failed PublishBatch should leave the row pending") - // "Recover" the server. srv.setBatchErr(nil) require.Eventually(t, func() bool { @@ -678,15 +772,14 @@ func TestIntegration_ServerUnavailable_RetransmitRecovers(t *testing.T) { } func TestIntegration_ServerDown_EventsSurvive(t *testing.T) { - // Start server, then stop it to simulate total outage. srv := &mockChipServer{} gs, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() cfg := fastCfg() cfg.PublishTimeout = 500 * time.Millisecond - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -713,13 +806,9 @@ func TestIntegration_ServerDown_EventsSurvive(t *testing.T) { go func() { _ = gs2.Serve(lis) }() t.Cleanup(func() { gs2.GracefulStop() }) - // Create a new client and DurableEmitter re-using the same store - // (simulating node restart with Postgres). - client2, err := chipingress.NewClient(addr, chipingress.WithInsecureConnection()) - require.NoError(t, err) - t.Cleanup(func() { _ = client2.Close() }) - - em2, err := NewDurableEmitter(store, client2, true, cfg, logger.Test(t), nil) + // Create a new batch emitter re-using the same store (simulating node restart with Postgres). + be2 := newIntegrationBatchEmitter(t, addr) + em2, err := NewDurableEmitter(store, be2, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em2) @@ -735,12 +824,12 @@ func TestIntegration_ServerDown_EventsSurvive(t *testing.T) { func TestIntegration_HighThroughput(t *testing.T) { srv := &mockChipServer{} _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() cfg := fastCfg() cfg.RetransmitBatchSize = 200 - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -764,13 +853,13 @@ func TestIntegration_EventExpiry(t *testing.T) { srv := &mockChipServer{} srv.setBatchErr(status.Error(codes.Internal, "permanent failure")) _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() cfg := fastCfg() cfg.EventTTL = 100 * time.Millisecond cfg.ExpiryInterval = 100 * time.Millisecond - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -785,14 +874,13 @@ func TestIntegration_EventExpiry(t *testing.T) { } func TestIntegration_RetransmitEnqueuesBatchWorkers(t *testing.T) { - // PublishBatch fails for each emit; retransmit recovers via PublishBatch. srv := &mockChipServer{} srv.setBatchErr(status.Error(codes.Unavailable, "reject batch")) _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() - em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, fastCfg(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -801,8 +889,6 @@ func TestIntegration_RetransmitEnqueuesBatchWorkers(t *testing.T) { require.NoError(t, em.Emit(ctx, []byte("retry-me"), emitAttrs()...)) } - // All five async PublishBatch attempts must observe the error before we clear - // it, or they succeed immediately and the retransmit loop has nothing to do. require.Eventually(t, func() bool { return srv.batchCount.Load() >= 5 && store.Len() == 5 }, 3*time.Second, 10*time.Millisecond, "all five PublishBatch RPCs should have failed and left five rows") @@ -812,7 +898,7 @@ func TestIntegration_RetransmitEnqueuesBatchWorkers(t *testing.T) { require.Eventually(t, func() bool { return store.Len() == 0 }, 5*time.Second, 50*time.Millisecond, - "retransmit should deliver via PublishBatch") + "retransmit should deliver via batch emitter") assert.GreaterOrEqual(t, srv.batchCount.Load(), int64(10), "five failed PublishBatch plus five successful PublishBatch (retransmit)") @@ -836,11 +922,10 @@ func TestIntegration_GRPCConnection(t *testing.T) { require.NoError(t, err) assert.Equal(t, "pong", pong.Message) - // Now use the chipingress.Client wrapper with DurableEmitter. - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() - em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, fastCfg(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() From 867b2be2146dc37de0dcd211830cd1d8ed21b3b1 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 26 May 2026 12:11:32 -0400 Subject: [PATCH 02/12] Loop support --- ...ble_event_store.go => observable_store.go} | 0 pkg/durableemitter/store.go | 203 ++++++++++++++++++ pkg/loop/config.go | 19 +- pkg/loop/server.go | 2 + 4 files changed, 218 insertions(+), 6 deletions(-) rename pkg/durableemitter/{durable_event_store.go => observable_store.go} (100%) create mode 100644 pkg/durableemitter/store.go diff --git a/pkg/durableemitter/durable_event_store.go b/pkg/durableemitter/observable_store.go similarity index 100% rename from pkg/durableemitter/durable_event_store.go rename to pkg/durableemitter/observable_store.go diff --git a/pkg/durableemitter/store.go b/pkg/durableemitter/store.go new file mode 100644 index 0000000000..8b1b6d1198 --- /dev/null +++ b/pkg/durableemitter/store.go @@ -0,0 +1,203 @@ +package durableemitter + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/lib/pq" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" +) + +const chipDurableEventsTable = "cre.chip_durable_events" + +// PgDurableEventStore is a Postgres-backed implementation of DurableEventStore. +type PgDurableEventStore struct { + ds sqlutil.DataSource +} + +var ( + _ DurableEventStore = (*PgDurableEventStore)(nil) + _ DurableQueueObserver = (*PgDurableEventStore)(nil) + _ BatchInserter = (*PgDurableEventStore)(nil) +) + +func NewPgDurableEventStore(ds sqlutil.DataSource) *PgDurableEventStore { + return &PgDurableEventStore{ds: ds} +} + +func (s *PgDurableEventStore) Insert(ctx context.Context, payload []byte) (int64, error) { + const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id` + var id int64 + if err := s.ds.GetContext(ctx, &id, q, payload); err != nil { + return 0, fmt.Errorf("failed to insert chip durable event: %w", err) + } + return id, nil +} + +func (s *PgDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { + if len(payloads) == 0 { + return nil, nil + } + placeholders := make([]string, len(payloads)) + args := make([]interface{}, len(payloads)) + for i, p := range payloads { + placeholders[i] = fmt.Sprintf("($%d)", i+1) + args[i] = p + } + q := fmt.Sprintf( + "INSERT INTO %s (payload) VALUES %s RETURNING id", + chipDurableEventsTable, + strings.Join(placeholders, ","), + ) + + var ids []int64 + if err := s.ds.SelectContext(ctx, &ids, q, args...); err != nil { + return nil, fmt.Errorf("failed to batch insert chip durable events: %w", err) + } + return ids, nil +} + +func (s *PgDurableEventStore) Delete(ctx context.Context, id int64) error { + const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err) + } + return nil +} + +func (s *PgDurableEventStore) MarkDelivered(ctx context.Context, id int64) error { + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err) + } + return nil +} + +func (s *PgDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { + if len(ids) == 0 { + return 0, nil + } + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = ANY($1) AND delivered_at IS NULL` + res, err := s.ds.ExecContext(ctx, q, pq.Array(ids)) + if err != nil { + return 0, fmt.Errorf("failed to batch mark chip durable events delivered: %w", err) + } + n, _ := res.RowsAffected() + return n, nil +} + +func (s *PgDurableEventStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { + if batchLimit <= 0 { + return 0, nil + } + const q = ` +WITH picked AS ( + SELECT id FROM ` + chipDurableEventsTable + ` + WHERE delivered_at IS NOT NULL + ORDER BY delivered_at ASC + LIMIT $1 +) +DELETE FROM ` + chipDurableEventsTable + ` AS t +USING picked WHERE t.id = picked.id` + res, err := s.ds.ExecContext(ctx, q, batchLimit) + if err != nil { + return 0, fmt.Errorf("failed to purge delivered chip durable events: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, fmt.Errorf("purge delivered rows affected: %w", err) + } + return n, nil +} + +func (s *PgDurableEventStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) { + const q = ` +SELECT id, payload, created_at +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at < $1 +ORDER BY created_at ASC +LIMIT $2` + + type row struct { + ID int64 `db:"id"` + Payload []byte `db:"payload"` + CreatedAt time.Time `db:"created_at"` + } + + var rows []row + if err := s.ds.SelectContext(ctx, &rows, q, createdBefore, limit); err != nil { + return nil, fmt.Errorf("failed to list pending chip durable events: %w", err) + } + + out := make([]DurableEvent, 0, len(rows)) + for _, r := range rows { + out = append(out, DurableEvent{ + ID: r.ID, + Payload: r.Payload, + CreatedAt: r.CreatedAt, + }) + } + return out, nil +} + +func (s *PgDurableEventStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { + const q = ` +WITH deleted AS ( + DELETE FROM ` + chipDurableEventsTable + ` + WHERE created_at <= now() - $1::interval + RETURNING id +) +SELECT count(*) FROM deleted` + + var count int64 + if err := s.ds.GetContext(ctx, &count, q, ttl.String()); err != nil { + return 0, fmt.Errorf("failed to delete expired chip durable events: %w", err) + } + return count, nil +} + +type chipDurableQueueAgg struct { + Cnt int64 `db:"cnt"` + PayloadSum int64 `db:"payload_sum"` + MinCreated *time.Time `db:"min_created"` +} + +// ObserveDurableQueue implements DurableQueueObserver for queue depth / age gauges. +func (s *PgDurableEventStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) { + const qAgg = ` +SELECT + count(*)::bigint AS cnt, + coalesce(sum(octet_length(payload)), 0)::bigint AS payload_sum, + min(created_at) AS min_created +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL` + + var row chipDurableQueueAgg + if err := s.ds.GetContext(ctx, &row, qAgg); err != nil { + return DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err) + } + var st DurableQueueStats + st.Depth = row.Cnt + st.PayloadBytes = row.PayloadSum + if row.MinCreated != nil { + st.OldestPendingAge = time.Since(*row.MinCreated) + } + if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL { + ttlSec := int64(eventTTL.Round(time.Second) / time.Second) + leadSec := int64(nearExpiryLead.Round(time.Second) / time.Second) + const qNear = ` +SELECT count(*)::bigint +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at >= now() - ($1::bigint * interval '1 second') + AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')` + if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil { + return DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err) + } + } + return st, nil +} diff --git a/pkg/loop/config.go b/pkg/loop/config.go index c91442117e..396c7d979d 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -86,9 +86,10 @@ const ( envTelemetryPrometheusBridgePrefixes = "CL_TELEMETRY_PROMETHEUS_BRIDGE_PREFIXES" envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" - envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" - envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" - envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" + envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" + envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" + envChipIngressDurableEmitterEnabled = "CL_CHIP_INGRESS_DURABLE_EMITTER_ENABLED" envCRESettings = cresettings.EnvNameSettings envCRESettingsDefault = cresettings.EnvNameSettingsDefault @@ -99,9 +100,10 @@ const ( type EnvConfig struct { AppID string - ChipIngressEndpoint string - ChipIngressInsecureConnection bool - ChipIngressBatchEmitterEnabled bool + ChipIngressEndpoint string + ChipIngressInsecureConnection bool + ChipIngressBatchEmitterEnabled bool + ChipIngressDurableEmitterEnabled bool CRESettings string CRESettingsDefault string @@ -266,6 +268,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled)) + add(envChipIngressDurableEmitterEnabled, strconv.FormatBool(e.ChipIngressDurableEmitterEnabled)) if e.CRESettings != "" { add(envCRESettings, e.CRESettings) @@ -506,6 +509,10 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envChipIngressBatchEmitterEnabled, err) } + e.ChipIngressDurableEmitterEnabled, err = getBool(envChipIngressDurableEmitterEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envChipIngressDurableEmitterEnabled, err) + } } e.CRESettings = os.Getenv(envCRESettings) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 71193dad66..0735ffe503 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -229,6 +229,8 @@ func (s *Server) start(opts ...ServerOpt) error { if err := s.startBeholderClient(ctx, beholderCfg); err != nil { return err } + + // TODO: Initialize DurableEmitter; Move setup method } if addr := s.EnvConfig.PyroscopeServerAddress; addr != "" { From 23061e9720cc396320c27e26c63a942ed73a130e Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 26 May 2026 12:23:26 -0400 Subject: [PATCH 03/12] Rename isHostProcess to retransmitEnabled --- pkg/durableemitter/durable_emitter.go | 26 +++++++++++----------- pkg/durableemitter/durable_emitter_test.go | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/durableemitter/durable_emitter.go b/pkg/durableemitter/durable_emitter.go index e16590690c..51743bdeed 100644 --- a/pkg/durableemitter/durable_emitter.go +++ b/pkg/durableemitter/durable_emitter.go @@ -158,10 +158,10 @@ type DurableEmitter struct { // were spawned during the final flush can complete before we close the // fallback client connection. fallbackWg sync.WaitGroup - // isHostProcess determines if the emitter runs retransmit and cleanup loops. - // Should be set to false when initialized inside LOOP plugins. - isHostProcess bool - cfg DurableEmitterConfig + // retransmitEnabled controls whether this instance runs the retransmit and + // cleanup loops. Should be set to false when initialized inside LOOP plugins. + retransmitEnabled bool + cfg DurableEmitterConfig metrics *durableEmitterMetrics @@ -215,7 +215,7 @@ func NewDurableEmitter( store DurableEventStore, batchEmitter BatchEmitter, fallbackClient chipingress.Client, - isHostProcess bool, + retransmitEnabled bool, cfg DurableEmitterConfig, lggr logger.Logger, meter metric.Meter, @@ -242,13 +242,13 @@ func NewDurableEmitter( store = newMetricsInstrumentedStore(store, m) } d := &DurableEmitter{ - store: store, - batchEmitter: batchEmitter, - fallbackClient: fallbackClient, - isHostProcess: isHostProcess, - cfg: cfg, - metrics: m, - stopCh: make(chan struct{}), + store: store, + batchEmitter: batchEmitter, + fallbackClient: fallbackClient, + retransmitEnabled: retransmitEnabled, + cfg: cfg, + metrics: m, + stopCh: make(chan struct{}), } d.Service, d.eng = services.Config{ Name: "DurableEmitter", @@ -289,7 +289,7 @@ func (d *DurableEmitter) start(ctx context.Context) error { } } - if d.isHostProcess { + if d.retransmitEnabled { d.wg.Go(d.retransmitLoop) if !d.cfg.DisablePruning { d.wg.Go(d.expiryLoop) diff --git a/pkg/durableemitter/durable_emitter_test.go b/pkg/durableemitter/durable_emitter_test.go index 3846750966..e47aa9aeba 100644 --- a/pkg/durableemitter/durable_emitter_test.go +++ b/pkg/durableemitter/durable_emitter_test.go @@ -252,7 +252,7 @@ func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) require.Eventually(t, func() bool { return store.Len() == 0 && be.callCount.Load() >= 1 - }, 2*time.Second, 10*time.Millisecond, "batch emitter must deliver even when isHostProcess is false") + }, 2*time.Second, 10*time.Millisecond, "batch emitter must deliver even when retransmitEnabled is false") } func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) { From e213bb80d077bc4505ed65c60fc7a170baf9343b Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 26 May 2026 12:48:05 -0400 Subject: [PATCH 04/12] Add setup --- pkg/durableemitter/setup.go | 181 ++++++++++++++++++++++++++++++++++++ pkg/durableemitter/store.go | 1 + pkg/loop/server.go | 32 ++++++- 3 files changed, 212 insertions(+), 2 deletions(-) create mode 100644 pkg/durableemitter/setup.go diff --git a/pkg/durableemitter/setup.go b/pkg/durableemitter/setup.go new file mode 100644 index 0000000000..ebfab68512 --- /dev/null +++ b/pkg/durableemitter/setup.go @@ -0,0 +1,181 @@ +package durableemitter + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + chipingressbatch "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +// globalEmitter holds the process-wide DurableEmitter instance, set by Setup. +var globalEmitter atomic.Pointer[DurableEmitter] + +// SetGlobalEmitter sets the global DurableEmitter. +func SetGlobalEmitter(d *DurableEmitter) { + globalEmitter.Store(d) +} + +// GetGlobalEmitter returns the global DurableEmitter, or nil if Setup has not +// been called or DurableEmitterEnabled was false. +func GetGlobalEmitter() *DurableEmitter { + return globalEmitter.Load() +} + +// GlobalEmit emits an event via the global DurableEmitter. +// Returns a non-nil error when the global emitter has not been initialized. +func GlobalEmit(ctx context.Context, body []byte, attrKVs ...any) error { + d := globalEmitter.Load() + if d == nil { + return errors.New("global DurableEmitter not initialized; call durableemitter.Setup first") + } + return d.Emit(ctx, body, attrKVs...) +} + +// SetupConfig holds all configuration required to create and start a +// DurableEmitter including its chip ingress transport clients. +type SetupConfig struct { + // DurableEmitterEnabled gates the entire setup. Setup returns (nil, nil) + // when false — callers do not need an outer guard. + DurableEmitterEnabled bool + // BatchEmitterEnabled must be true when DurableEmitterEnabled is true. + // Setup returns an error if DurableEmitterEnabled is true but this is false. + BatchEmitterEnabled bool + // Endpoint is the gRPC address for the Chip Ingress service. + Endpoint string + // InsecureConnection disables TLS when true. + InsecureConnection bool + // Auth is the per-RPC credential provider. Nil means no auth. + Auth chipingress.HeaderProvider + // RetransmitEnabled controls whether the retransmit and cleanup loops run. + // Set to true for the host (chainlink node) process. + // Set to false for LOOP plugin processes — the host's retransmit loop picks + // up any rows inserted by plugin-side DurableEmitters from the shared DB. + RetransmitEnabled bool + + // Batch client tuning — zero values use package defaults. + BatchSize int // default: 50 + BatchInterval time.Duration // default: 50ms + MaxConcurrentSends int // default: 4 + MaxPublishTimeout time.Duration // default: 5s + ShutdownTimeout time.Duration // default: 30s + + // EmitterConfig overrides DefaultDurableEmitterConfig when non-nil. + EmitterConfig *DurableEmitterConfig + // Meter is the OpenTelemetry meter for instrumentation. Nil disables metrics. + Meter metric.Meter +} + +// Setup creates a DurableEmitter with dedicated batch and fallback chip ingress +// clients, registers it as the global emitter, and returns it unconfigured. +// +// The caller is responsible for starting and stopping the emitter: +// - In chainlink application: append the returned emitter to srvcs so the +// service runner manages Start/Close. +// - In LOOP server: call emitter.Start(ctx) then emitter.Close() on shutdown. +// +// When cfg.DurableEmitterEnabled is false, Setup is a no-op and returns +// (nil, nil) — callers do not need to guard the call. +func Setup( + store DurableEventStore, + cfg SetupConfig, + lggr logger.Logger, +) (*DurableEmitter, error) { + if !cfg.DurableEmitterEnabled { + return nil, nil + } + if !cfg.BatchEmitterEnabled { + return nil, errors.New("ChipIngressBatchEmitterEnabled must be true for DurableEmitter; " + + "set ChipIngressBatchEmitterEnabled = true in the telemetry config") + } + if cfg.Endpoint == "" { + return nil, errors.New("chip ingress endpoint is required for DurableEmitter") + } + if store == nil { + return nil, errors.New("durable event store is required for DurableEmitter") + } + if lggr == nil { + return nil, errors.New("logger is required for DurableEmitter") + } + + chipOpts := buildChipOpts(cfg) + + // Primary client — owned by batch.Client, closed on batch.Client.Stop(). + batchChipClient, err := chipingress.NewClient(cfg.Endpoint, chipOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create batch chip ingress client: %w", err) + } + + batchClient, err := chipingressbatch.NewBatchClient(batchChipClient, + chipingressbatch.WithBatchSize(defaultInt(cfg.BatchSize, 50)), + chipingressbatch.WithBatchInterval(defaultDuration(cfg.BatchInterval, 50*time.Millisecond)), + chipingressbatch.WithMaxConcurrentSends(defaultInt(cfg.MaxConcurrentSends, 4)), + chipingressbatch.WithMaxPublishTimeout(defaultDuration(cfg.MaxPublishTimeout, 5*time.Second)), + chipingressbatch.WithShutdownTimeout(defaultDuration(cfg.ShutdownTimeout, 30*time.Second)), + ) + if err != nil { + _ = batchChipClient.Close() + return nil, fmt.Errorf("failed to create batch client: %w", err) + } + + // Fallback client — owned by DurableEmitter, closed on DurableEmitter.Stop(). + // Used for single-event direct Publish retry when a batch delivery fails. + fallbackClient, err := chipingress.NewClient(cfg.Endpoint, chipOpts...) + if err != nil { + batchClient.Stop() + return nil, fmt.Errorf("failed to create fallback chip ingress client: %w", err) + } + + emitterCfg := DefaultDurableEmitterConfig() + if cfg.EmitterConfig != nil { + emitterCfg = *cfg.EmitterConfig + } + + emitter, err := NewDurableEmitter(store, batchClient, fallbackClient, cfg.RetransmitEnabled, emitterCfg, lggr, cfg.Meter) + if err != nil { + batchClient.Stop() + _ = fallbackClient.Close() + return nil, fmt.Errorf("failed to create durable emitter: %w", err) + } + + SetGlobalEmitter(emitter) + + lggr.Infow("DurableEmitter created — call Start() or register with service lifecycle", + "endpoint", cfg.Endpoint, + "retransmitEnabled", cfg.RetransmitEnabled, + ) + return emitter, nil +} + +func buildChipOpts(cfg SetupConfig) []chipingress.Opt { + var opts []chipingress.Opt + if cfg.InsecureConnection { + opts = append(opts, chipingress.WithInsecureConnection()) + } else { + opts = append(opts, chipingress.WithTLS()) + } + if cfg.Auth != nil { + opts = append(opts, chipingress.WithTokenAuth(cfg.Auth)) + } + return opts +} + +func defaultInt(v, def int) int { + if v <= 0 { + return def + } + return v +} + +func defaultDuration(v, def time.Duration) time.Duration { + if v <= 0 { + return def + } + return v +} diff --git a/pkg/durableemitter/store.go b/pkg/durableemitter/store.go index 8b1b6d1198..c124d7b58e 100644 --- a/pkg/durableemitter/store.go +++ b/pkg/durableemitter/store.go @@ -14,6 +14,7 @@ import ( const chipDurableEventsTable = "cre.chip_durable_events" // PgDurableEventStore is a Postgres-backed implementation of DurableEventStore. +// Tests live in chainlink/core/services/durableemitter as they require DB migrations. type PgDurableEventStore struct { ds sqlutil.DataSource } diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 0735ffe503..fad85da0fb 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -22,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/config/build" + "github.com/smartcontractkit/chainlink-common/pkg/durableemitter" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/promutil" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -105,6 +106,7 @@ type Server struct { LimitsFactory limits.Factory profiler *pyroscope.Profiler beholderClient *beholder.Client + durableEmitter *durableemitter.DurableEmitter } func newServer(loggerName string) (*Server, error) { @@ -229,8 +231,6 @@ func (s *Server) start(opts ...ServerOpt) error { if err := s.startBeholderClient(ctx, beholderCfg); err != nil { return err } - - // TODO: Initialize DurableEmitter; Move setup method } if addr := s.EnvConfig.PyroscopeServerAddress; addr != "" { @@ -342,6 +342,31 @@ func (s *Server) start(opts ...ServerOpt) error { s.LimitsFactory.Settings = s.cfg.settingsGetter } + if s.DataSource != nil { + var auth beholder.Auth + if len(s.EnvConfig.TelemetryAuthHeaders) > 0 { + auth = beholder.NewStaticAuth(s.EnvConfig.TelemetryAuthHeaders, !s.EnvConfig.ChipIngressInsecureConnection) + } + + durableCfg := durableemitter.SetupConfig{ + DurableEmitterEnabled: s.EnvConfig.ChipIngressDurableEmitterEnabled, + BatchEmitterEnabled: s.EnvConfig.ChipIngressBatchEmitterEnabled, + Endpoint: s.EnvConfig.ChipIngressEndpoint, + InsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, + Auth: auth, + RetransmitEnabled: false, // LOOP plugins do not run the retransmit loop, the host process handles it. + } + store := durableemitter.NewPgDurableEventStore(s.DataSource) + var err error + s.durableEmitter, err = durableemitter.Setup(store, durableCfg, s.Logger) + if err != nil { + return fmt.Errorf("failed to set up durable emitter: %w", err) + } + if err = s.durableEmitter.Start(ctx); err != nil { + return fmt.Errorf("failed to start durable emitter: %w", err) + } + } + return nil } @@ -382,6 +407,9 @@ func (s *Server) Stop() { if s.beholderClient != nil { s.Logger.ErrorIfFn(s.beholderClient.Close, "Failed to close beholder client") } + if s.durableEmitter != nil { + s.Logger.ErrorIfFn(s.durableEmitter.Close, "Failed to close durable emitter") + } if s.dbStatsReporter != nil { s.dbStatsReporter.Stop() } From b78e71cb21af7b4bdc474bf87bc29ff2dd19c476 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 26 May 2026 12:55:22 -0400 Subject: [PATCH 05/12] Fix setup --- pkg/durableemitter/setup.go | 7 ------- pkg/loop/server.go | 3 +-- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/pkg/durableemitter/setup.go b/pkg/durableemitter/setup.go index ebfab68512..c0486b923b 100644 --- a/pkg/durableemitter/setup.go +++ b/pkg/durableemitter/setup.go @@ -44,9 +44,6 @@ type SetupConfig struct { // DurableEmitterEnabled gates the entire setup. Setup returns (nil, nil) // when false — callers do not need an outer guard. DurableEmitterEnabled bool - // BatchEmitterEnabled must be true when DurableEmitterEnabled is true. - // Setup returns an error if DurableEmitterEnabled is true but this is false. - BatchEmitterEnabled bool // Endpoint is the gRPC address for the Chip Ingress service. Endpoint string // InsecureConnection disables TLS when true. @@ -90,10 +87,6 @@ func Setup( if !cfg.DurableEmitterEnabled { return nil, nil } - if !cfg.BatchEmitterEnabled { - return nil, errors.New("ChipIngressBatchEmitterEnabled must be true for DurableEmitter; " + - "set ChipIngressBatchEmitterEnabled = true in the telemetry config") - } if cfg.Endpoint == "" { return nil, errors.New("chip ingress endpoint is required for DurableEmitter") } diff --git a/pkg/loop/server.go b/pkg/loop/server.go index fad85da0fb..72a7559071 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -350,11 +350,10 @@ func (s *Server) start(opts ...ServerOpt) error { durableCfg := durableemitter.SetupConfig{ DurableEmitterEnabled: s.EnvConfig.ChipIngressDurableEmitterEnabled, - BatchEmitterEnabled: s.EnvConfig.ChipIngressBatchEmitterEnabled, Endpoint: s.EnvConfig.ChipIngressEndpoint, InsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, Auth: auth, - RetransmitEnabled: false, // LOOP plugins do not run the retransmit loop, the host process handles it. + RetransmitEnabled: false, // LOOP plugins do not run the retransmit loop; the host process handles it. } store := durableemitter.NewPgDurableEventStore(s.DataSource) var err error From c8055cbf0428a4713861bfe5e3e717a63f6f3cfb Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 26 May 2026 13:01:48 -0400 Subject: [PATCH 06/12] Update comments and callback --- pkg/durableemitter/durable_emitter.go | 31 +++------------------------ pkg/durableemitter/setup.go | 8 ------- 2 files changed, 3 insertions(+), 36 deletions(-) diff --git a/pkg/durableemitter/durable_emitter.go b/pkg/durableemitter/durable_emitter.go index 51743bdeed..d05735ff6c 100644 --- a/pkg/durableemitter/durable_emitter.go +++ b/pkg/durableemitter/durable_emitter.go @@ -421,7 +421,6 @@ func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) // DB round-trip. t0Publish := time.Now() if qErr := d.batchEmitter.QueueMessage(eventPb, d.deliveryCallback(id, eventPb, t0Publish)); qErr != nil { - // Buffer full — event is safely in the DB; retransmit loop will deliver it. d.eng.Warnw("DurableEmitter: batch emitter buffer full, relying on retransmit", "id", id) } return nil @@ -492,8 +491,8 @@ func (d *DurableEmitter) tryFallback(id int64, eventPb *chipingress.CloudEventPb } // singleEventFallback sends a single event directly via the fallback -// chipingress.Client. On success it marks the event delivered and decrements -// the pending counter. On failure it logs and returns — the event remains in +// chipingress.Client. On success, it marks the event delivered and decrements +// the pending counter. On failure, it logs and returns — the event remains in // the DB and the retransmit loop will eventually deliver it. func (d *DurableEmitter) singleEventFallback(id int64, eventPb *chipingress.CloudEventPb) { pubCtx, pubCancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) @@ -706,7 +705,7 @@ func (d *DurableEmitter) retransmit(pending []DurableEvent) { } id := pe.ID - if err := d.batchEmitter.QueueMessage(eventPb, d.retransmitCallback(id, eventPb)); err != nil { + if err := d.batchEmitter.QueueMessage(eventPb, d.deliveryCallback(id, eventPb, time.Now())); err != nil { skipped++ } else { enqueued++ @@ -720,30 +719,6 @@ func (d *DurableEmitter) retransmit(pending []DurableEvent) { ) } -// retransmitCallback returns the delivery callback used for retransmit rows. -// When the batch emitter reports failure, it falls back to a single-event -// direct Publish (same as the primary delivery path). -func (d *DurableEmitter) retransmitCallback(id int64, eventPb *chipingress.CloudEventPb) func(error) { - return func(sendErr error) { - if sendErr != nil { - // Batch path failed; try single-event fallback before leaving to next tick. - d.tryFallback(id, eventPb) - return - } - ctx, cancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) - defer cancel() - - marked, markErr := d.store.MarkDeliveredBatch(ctx, []int64{id}) - if markErr != nil { - d.eng.Errorw("failed to mark retransmit event delivered", "id", id, "error", markErr) - return - } - d.decPending(marked) - if d.metrics != nil { - d.metrics.deliverComplete.Add(ctx, marked) - } - } -} func (d *DurableEmitter) purgeLoop() { interval := d.cfg.PurgeInterval diff --git a/pkg/durableemitter/setup.go b/pkg/durableemitter/setup.go index c0486b923b..a90c18081c 100644 --- a/pkg/durableemitter/setup.go +++ b/pkg/durableemitter/setup.go @@ -71,14 +71,6 @@ type SetupConfig struct { // Setup creates a DurableEmitter with dedicated batch and fallback chip ingress // clients, registers it as the global emitter, and returns it unconfigured. -// -// The caller is responsible for starting and stopping the emitter: -// - In chainlink application: append the returned emitter to srvcs so the -// service runner manages Start/Close. -// - In LOOP server: call emitter.Start(ctx) then emitter.Close() on shutdown. -// -// When cfg.DurableEmitterEnabled is false, Setup is a no-op and returns -// (nil, nil) — callers do not need to guard the call. func Setup( store DurableEventStore, cfg SetupConfig, From 9594dc26c49b79c1654dc2c6e68996ada4b60b78 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 26 May 2026 13:09:39 -0400 Subject: [PATCH 07/12] Update naming --- pkg/durableemitter/durable_emitter.go | 29 +++++--------- pkg/durableemitter/durable_emitter_metrics.go | 2 +- pkg/durableemitter/durable_emitter_test.go | 40 +++++++++---------- pkg/durableemitter/observable_store.go | 2 +- pkg/durableemitter/setup.go | 6 +-- 5 files changed, 36 insertions(+), 43 deletions(-) diff --git a/pkg/durableemitter/durable_emitter.go b/pkg/durableemitter/durable_emitter.go index d05735ff6c..913e2f4823 100644 --- a/pkg/durableemitter/durable_emitter.go +++ b/pkg/durableemitter/durable_emitter.go @@ -19,7 +19,7 @@ import ( ) // BatchEmitter is the transport interface DurableEmitter delegates to for -// batched, at-most-once delivery of CloudEvents to Chip Ingress. +// batched delivery of CloudEvents to Chip Ingress. // // *batch.Client from pkg/chipingress/batch satisfies this interface and // handles seqnum stamping, gRPC size splitting, concurrency limiting, and @@ -42,8 +42,8 @@ type BatchEmitter interface { Stop() } -// DurableEmitterConfig configures the DurableEmitter behaviour. -type DurableEmitterConfig struct { +// Config configures the DurableEmitter behaviour. +type Config struct { // RetransmitInterval controls how often the retransmit loop ticks. RetransmitInterval time.Duration // RetransmitAfter is the minimum age of an event before the retransmit @@ -70,7 +70,7 @@ type DurableEmitterConfig struct { DisablePruning bool // Hooks is optional instrumentation (load tests, profiling). Nil fields are skipped. // Callbacks may run from many goroutines; implementations must be thread-safe. - Hooks *DurableEmitterHooks + Hooks *Hooks // Metrics enables OpenTelemetry instruments (queue, publish, store, optional process stats). // When non-nil, a meter must be supplied to NewDurableEmitter; nil disables instrumentation. Metrics *DurableEmitterMetricsConfig @@ -87,8 +87,8 @@ type DurableEmitterConfig struct { InsertBatchWorkers int } -// DurableEmitterHooks records delivery latency to locate pipeline bottlenecks. -type DurableEmitterHooks struct { +// Hooks records delivery latency to locate pipeline bottlenecks. +type Hooks struct { // OnEmitInsert is called after each store.Insert in Emit (the DB write that // blocks the caller). elapsed covers only the INSERT; err is nil on success. OnEmitInsert func(elapsed time.Duration, err error) @@ -101,8 +101,8 @@ type DurableEmitterHooks struct { OnBatchMarkDelivered func(elapsed time.Duration, count int) } -func DefaultDurableEmitterConfig() DurableEmitterConfig { - return DurableEmitterConfig{ +func DefaultConfig() Config { + return Config{ RetransmitInterval: 5 * time.Second, RetransmitAfter: 10 * time.Second, RetransmitBatchSize: 100, @@ -161,7 +161,7 @@ type DurableEmitter struct { // retransmitEnabled controls whether this instance runs the retransmit and // cleanup loops. Should be set to false when initialized inside LOOP plugins. retransmitEnabled bool - cfg DurableEmitterConfig + cfg Config metrics *durableEmitterMetrics @@ -194,7 +194,7 @@ var _ interface { io.Closer } = (*DurableEmitter)(nil) -// NewDurableEmitter constructs a DurableEmitter as a services.Service. +// NewDurableEmitter constructs a DurableEmitter as a service. // // batchEmitter is the transport layer (typically *batch.Client from // pkg/chipingress/batch) responsible for batched gRPC delivery, seqnum @@ -205,18 +205,12 @@ var _ interface { // failure. This gives a fast second-chance path before the DB-backed // retransmit loop kicks in. Pass nil to disable single-event fallback // (events are left in the DB and delivered by the retransmit loop). -// DurableEmitter takes ownership of fallbackClient and closes it on Stop. -// -// meter is the OpenTelemetry Meter used for instrument registration. It is -// required when cfg.Metrics is non-nil and must be supplied by the caller -// (e.g. otel.Meter("durableemitter") or a meter from the host's -// MeterProvider). Pass nil when metrics are disabled. func NewDurableEmitter( store DurableEventStore, batchEmitter BatchEmitter, fallbackClient chipingress.Client, retransmitEnabled bool, - cfg DurableEmitterConfig, + cfg Config, lggr logger.Logger, meter metric.Meter, ) (*DurableEmitter, error) { @@ -719,7 +713,6 @@ func (d *DurableEmitter) retransmit(pending []DurableEvent) { ) } - func (d *DurableEmitter) purgeLoop() { interval := d.cfg.PurgeInterval if interval <= 0 { diff --git a/pkg/durableemitter/durable_emitter_metrics.go b/pkg/durableemitter/durable_emitter_metrics.go index a3ef68969c..ce7cf92bb9 100644 --- a/pkg/durableemitter/durable_emitter_metrics.go +++ b/pkg/durableemitter/durable_emitter_metrics.go @@ -10,7 +10,7 @@ import ( ) // DurableEmitterMetricsConfig enables OpenTelemetry metrics for DurableEmitter. -// Set on DurableEmitterConfig.Metrics; nil disables instrumentation. +// Set on Config.Metrics; nil disables instrumentation. // // When non-nil, an otel Meter must be supplied to NewDurableEmitter so that // instruments can be registered. DurableEmitter does not look up a global diff --git a/pkg/durableemitter/durable_emitter_test.go b/pkg/durableemitter/durable_emitter_test.go index e47aa9aeba..1482f3b6c5 100644 --- a/pkg/durableemitter/durable_emitter_test.go +++ b/pkg/durableemitter/durable_emitter_test.go @@ -97,9 +97,9 @@ func testEmitAttrs() []any { return []any{"source", "test-source", "type", "test-type"} } -func newTestDurableEmitter(t *testing.T, store DurableEventStore, be BatchEmitter, cfgOverride *DurableEmitterConfig) *DurableEmitter { +func newTestDurableEmitter(t *testing.T, store DurableEventStore, be BatchEmitter, cfgOverride *Config) *DurableEmitter { t.Helper() - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() if cfgOverride != nil { cfg = *cfgOverride } @@ -128,7 +128,7 @@ func TestDurableEmitter_CloseCoalescedInsertShutdown(t *testing.T) { stall: stall, } be := newTestBatchEmitter() - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.InsertBatchSize = 1 cfg.InsertBatchWorkers = 1 cfg.DisablePruning = true @@ -174,8 +174,8 @@ func TestDurableEmitter_HooksBatchPublishPath(t *testing.T) { store := NewMemDurableEventStore() be := newTestBatchEmitter() var pubCalls, markCalls atomic.Int32 - cfg := DefaultDurableEmitterConfig() - cfg.Hooks = &DurableEmitterHooks{ + cfg := DefaultConfig() + cfg.Hooks = &Hooks{ OnBatchPublish: func(time.Duration, int, error) { pubCalls.Add(1) }, OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) }, } @@ -195,8 +195,8 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) { be := newTestBatchEmitter() be.setPublishErr(errors.New("down")) var pubCalls, markCalls atomic.Int32 - cfg := DefaultDurableEmitterConfig() - cfg.Hooks = &DurableEmitterHooks{ + cfg := DefaultConfig() + cfg.Hooks = &Hooks{ OnBatchPublish: func(time.Duration, int, error) { pubCalls.Add(1) }, OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) }, } @@ -215,7 +215,7 @@ func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { be := newTestBatchEmitter() be.setPublishErr(errors.New("chip unavailable")) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = 40 * time.Millisecond cfg.RetransmitAfter = 15 * time.Millisecond cfg.ExpiryInterval = 40 * time.Millisecond @@ -243,7 +243,7 @@ func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) store := NewMemDurableEventStore() be := newTestBatchEmitter() - em, err := NewDurableEmitter(store, be, nil, false, DefaultDurableEmitterConfig(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, false, DefaultConfig(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -299,7 +299,7 @@ func TestDurableEmitter_RetransmitLoopDeliversFailedEvents(t *testing.T) { be := newTestBatchEmitter() be.setPublishErr(errors.New("connection refused")) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = 100 * time.Millisecond cfg.RetransmitAfter = 50 * time.Millisecond @@ -328,7 +328,7 @@ func TestDurableEmitter_RetransmitSerialDistinctCloudEvents(t *testing.T) { be := newTestBatchEmitter() be.setPublishErr(errors.New("immediate fail")) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = 100 * time.Millisecond cfg.RetransmitAfter = 50 * time.Millisecond @@ -354,7 +354,7 @@ func TestDurableEmitter_ExpiryLoopDeletesOldEvents(t *testing.T) { be := newTestBatchEmitter() be.setPublishErr(errors.New("always fail")) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.ExpiryInterval = 100 * time.Millisecond cfg.EventTTL = 50 * time.Millisecond cfg.RetransmitInterval = 10 * time.Minute // effectively disable retransmit @@ -386,7 +386,7 @@ func TestDurableEmitter_RetransmitDeliversManuallyInsertedRow(t *testing.T) { _, err = store.Insert(context.Background(), payload) require.NoError(t, err) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = 50 * time.Millisecond cfg.RetransmitAfter = 30 * time.Millisecond @@ -432,7 +432,7 @@ func TestDurableEmitter_MultipleEvents(t *testing.T) { func TestNewDurableEmitter_ValidationErrors(t *testing.T) { log := logger.Test(t) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() be := newTestBatchEmitter() _, err := NewDurableEmitter(nil, be, nil, true, cfg, log, nil) @@ -465,7 +465,7 @@ func TestDurableEmitter_MetricsRegistersEmitSuccess(t *testing.T) { store := NewMemDurableEventStore() be := newTestBatchEmitter() - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = time.Hour cfg.Metrics = &DurableEmitterMetricsConfig{PollInterval: 25 * time.Millisecond} @@ -607,8 +607,8 @@ func emitAttrs() []any { return []any{"source", "test-domain", "type", "test-entity"} } -func fastCfg() DurableEmitterConfig { - return DurableEmitterConfig{ +func fastCfg() Config { + return Config{ RetransmitInterval: 100 * time.Millisecond, RetransmitAfter: 50 * time.Millisecond, RetransmitBatchSize: 50, @@ -656,7 +656,7 @@ func TestDurableEmitter_FallbackDeliversOnBatchFailure(t *testing.T) { be.setPublishErr(errors.New("batch down")) fallback := &testFallbackClient{} - em, err := NewDurableEmitter(store, be, fallback, true, DefaultDurableEmitterConfig(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, fallback, true, DefaultConfig(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -684,7 +684,7 @@ func TestDurableEmitter_FallbackFailureEventRemainsForRetransmit(t *testing.T) { fallback := &testFallbackClient{} fallback.setPublishErr(errors.New("fallback down too")) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = 10 * time.Minute // disable retransmit for this test em, err := NewDurableEmitter(store, be, fallback, true, cfg, logger.Test(t), nil) require.NoError(t, err) @@ -709,7 +709,7 @@ func TestDurableEmitter_FallbackClientClosedOnStop(t *testing.T) { be := newTestBatchEmitter() fallback := &testFallbackClient{} - em, err := NewDurableEmitter(store, be, fallback, true, DefaultDurableEmitterConfig(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, fallback, true, DefaultConfig(), logger.Test(t), nil) require.NoError(t, err) require.NoError(t, em.Start(t.Context())) diff --git a/pkg/durableemitter/observable_store.go b/pkg/durableemitter/observable_store.go index 6e4c0cabfc..8700427b20 100644 --- a/pkg/durableemitter/observable_store.go +++ b/pkg/durableemitter/observable_store.go @@ -28,7 +28,7 @@ type DurableQueueStats struct { // so DurableEmitter can export queue depth and age gauges when metrics are enabled. type DurableQueueObserver interface { // ObserveDurableQueue returns live queue statistics. eventTTL and nearExpiryLead - // match DurableEmitterConfig (nearExpiryLead should be << eventTTL). + // match Config (nearExpiryLead should be << eventTTL). ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) } diff --git a/pkg/durableemitter/setup.go b/pkg/durableemitter/setup.go index a90c18081c..bc820f436d 100644 --- a/pkg/durableemitter/setup.go +++ b/pkg/durableemitter/setup.go @@ -63,8 +63,8 @@ type SetupConfig struct { MaxPublishTimeout time.Duration // default: 5s ShutdownTimeout time.Duration // default: 30s - // EmitterConfig overrides DefaultDurableEmitterConfig when non-nil. - EmitterConfig *DurableEmitterConfig + // EmitterConfig overrides DefaultConfig when non-nil. + EmitterConfig *Config // Meter is the OpenTelemetry meter for instrumentation. Nil disables metrics. Meter metric.Meter } @@ -117,7 +117,7 @@ func Setup( return nil, fmt.Errorf("failed to create fallback chip ingress client: %w", err) } - emitterCfg := DefaultDurableEmitterConfig() + emitterCfg := DefaultConfig() if cfg.EmitterConfig != nil { emitterCfg = *cfg.EmitterConfig } From fad53978047b27db24fd689526972354648b9b8a Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 26 May 2026 13:40:37 -0400 Subject: [PATCH 08/12] Update setup --- pkg/durableemitter/setup.go | 9 +-------- pkg/loop/config_test.go | 7 ++++--- pkg/loop/server.go | 16 +++++++++------- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/pkg/durableemitter/setup.go b/pkg/durableemitter/setup.go index bc820f436d..4b5236da7d 100644 --- a/pkg/durableemitter/setup.go +++ b/pkg/durableemitter/setup.go @@ -22,8 +22,7 @@ func SetGlobalEmitter(d *DurableEmitter) { globalEmitter.Store(d) } -// GetGlobalEmitter returns the global DurableEmitter, or nil if Setup has not -// been called or DurableEmitterEnabled was false. +// GetGlobalEmitter returns the global DurableEmitter, or nil if Setup has not been called. func GetGlobalEmitter() *DurableEmitter { return globalEmitter.Load() } @@ -41,9 +40,6 @@ func GlobalEmit(ctx context.Context, body []byte, attrKVs ...any) error { // SetupConfig holds all configuration required to create and start a // DurableEmitter including its chip ingress transport clients. type SetupConfig struct { - // DurableEmitterEnabled gates the entire setup. Setup returns (nil, nil) - // when false — callers do not need an outer guard. - DurableEmitterEnabled bool // Endpoint is the gRPC address for the Chip Ingress service. Endpoint string // InsecureConnection disables TLS when true. @@ -76,9 +72,6 @@ func Setup( cfg SetupConfig, lggr logger.Logger, ) (*DurableEmitter, error) { - if !cfg.DurableEmitterEnabled { - return nil, nil - } if cfg.Endpoint == "" { return nil, errors.New("chip ingress endpoint is required for DurableEmitter") } diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index 43ed1d4867..d592543fd4 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -200,9 +200,10 @@ var envCfgFull = EnvConfig{ TelemetryPrometheusBridgeEnabled: true, TelemetryPrometheusBridgePrefixes: []string{"foo", "bar"}, - ChipIngressEndpoint: "chip-ingress.example.com:50051", - ChipIngressInsecureConnection: true, - ChipIngressBatchEmitterEnabled: false, + ChipIngressEndpoint: "chip-ingress.example.com:50051", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: false, + ChipIngressDurableEmitterEnabled: false, CRESettings: `{"global":{}}`, CRESettingsDefault: `{"foo":"bar"}`, diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 72a7559071..601f2eb2bd 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -2,6 +2,7 @@ package loop import ( "context" + "errors" "fmt" "os" "os/signal" @@ -342,18 +343,19 @@ func (s *Server) start(opts ...ServerOpt) error { s.LimitsFactory.Settings = s.cfg.settingsGetter } - if s.DataSource != nil { + if s.EnvConfig.ChipIngressDurableEmitterEnabled && s.EnvConfig.ChipIngressEndpoint != "" { + if s.DataSource == nil { + return errors.New("data source required when durable emitter is enabled") + } var auth beholder.Auth if len(s.EnvConfig.TelemetryAuthHeaders) > 0 { auth = beholder.NewStaticAuth(s.EnvConfig.TelemetryAuthHeaders, !s.EnvConfig.ChipIngressInsecureConnection) } - durableCfg := durableemitter.SetupConfig{ - DurableEmitterEnabled: s.EnvConfig.ChipIngressDurableEmitterEnabled, - Endpoint: s.EnvConfig.ChipIngressEndpoint, - InsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, - Auth: auth, - RetransmitEnabled: false, // LOOP plugins do not run the retransmit loop; the host process handles it. + Endpoint: s.EnvConfig.ChipIngressEndpoint, + InsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, + Auth: auth, + RetransmitEnabled: false, // LOOP plugins do not run the retransmit loop; the host process handles it. } store := durableemitter.NewPgDurableEventStore(s.DataSource) var err error From 6a80325db36172e61e9de75e768ba950e498ea43 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 29 May 2026 11:16:35 -0400 Subject: [PATCH 09/12] Create durableemitter/auth --- go.mod | 2 +- go.sum | 4 +- pkg/durableemitter/auth.go | 195 ++++++++++++++++++ pkg/durableemitter/auth_test.go | 91 ++++++++ pkg/durableemitter/setup.go | 18 +- .../standard/standard_capabilities.go | 2 + pkg/loop/internal/relayer/relayer.go | 2 + pkg/loop/server.go | 15 +- 8 files changed, 314 insertions(+), 15 deletions(-) create mode 100644 pkg/durableemitter/auth.go create mode 100644 pkg/durableemitter/auth_test.go diff --git a/go.mod b/go.mod index f6cbe43af5..1c31c2ec7b 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.89 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146 + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528221217-0181abca3d53 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260520181035-b5bb732eb9d7 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b diff --git a/go.sum b/go.sum index e961bbd66a..6508524367 100644 --- a/go.sum +++ b/go.sum @@ -266,8 +266,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146 h1:PlkA7NGpBm5sc2P//crDFgMIQ0qsQhKcpjWV7Qzwqz8= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528221217-0181abca3d53 h1:0rbr4dRkMvR5FpZtOYpOUyrYliWvOZDbYRGQrKIy1OA= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528221217-0181abca3d53/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260520181035-b5bb732eb9d7 h1:5Qq2EQcR6kTNyk9S2FjzNUhkQ5BiL0RYd1ODhu3P1/8= diff --git a/pkg/durableemitter/auth.go b/pkg/durableemitter/auth.go new file mode 100644 index 0000000000..7025f96060 --- /dev/null +++ b/pkg/durableemitter/auth.go @@ -0,0 +1,195 @@ +package durableemitter + +import ( + "context" + "crypto/ed25519" + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "maps" + "sync" + "sync/atomic" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" +) + +const ( + authHeaderKey = "X-Beholder-Node-Auth-Token" + authHeaderVersion = "2" +) + +// Signer signs auth header payloads using the node's CSA key. +type Signer interface { + Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) +} + +// AuthConfig configures chip ingress auth headers for DurableEmitter clients. +type AuthConfig struct { + AuthHeaders map[string]string + AuthHeadersTTL time.Duration + AuthPublicKeyHex string + // AuthKeySigner may be nil at init time for LOOP plugins; call SetGlobalSigner + // after the CSA keystore is available. + AuthKeySigner Signer +} + +type lazySigner struct { + signer atomic.Pointer[Signer] +} + +func (l *lazySigner) Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) { + s := l.signer.Load() + if s == nil { + return nil, errors.New("keystore not yet available for signing") + } + return (*s).Sign(ctx, keyID, data) +} + +func (l *lazySigner) Set(signer Signer) { + l.signer.Store(&signer) +} + +func (l *lazySigner) IsSet() bool { + return l.signer.Load() != nil +} + +type staticHeaderProvider struct { + headers map[string]string +} + +func (p *staticHeaderProvider) Headers(_ context.Context) (map[string]string, error) { + return p.headers, nil +} + +type rotatingHeaderProvider struct { + csaPubKey ed25519.PublicKey + signer Signer + signerTimeout time.Duration + headers atomic.Value // map[string]string + ttl time.Duration + lastUpdatedNanos atomic.Int64 + lazy *lazySigner + mu sync.Mutex +} + +func (p *rotatingHeaderProvider) SetSigner(signer Signer) { + if p.lazy != nil { + p.lazy.Set(signer) + } +} + +func (p *rotatingHeaderProvider) IsSignerSet() bool { + return p.lazy != nil && p.lazy.IsSet() +} + +func (p *rotatingHeaderProvider) Headers(ctx context.Context) (map[string]string, error) { + returnHeader := make(map[string]string) + lastUpdated := time.Unix(0, p.lastUpdatedNanos.Load()) + + if time.Since(lastUpdated) > p.ttl { + p.mu.Lock() + defer p.mu.Unlock() + + lastUpdated = time.Unix(0, p.lastUpdatedNanos.Load()) + if time.Since(lastUpdated) < p.ttl { + maps.Copy(returnHeader, p.headers.Load().(map[string]string)) + return returnHeader, nil + } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, p.signerTimeout) + defer cancel() + + ts := time.Now() + newHeaders, err := newAuthHeaderV2(ctxWithTimeout, p.csaPubKey, p.signer, ts) + if err != nil { + return nil, fmt.Errorf("durableemitter: failed to create auth header: %w", err) + } + + p.headers.Store(newHeaders) + p.lastUpdatedNanos.Store(ts.UnixNano()) + } + + maps.Copy(returnHeader, p.headers.Load().(map[string]string)) + return returnHeader, nil +} + +var globalRotatingAuth atomic.Pointer[rotatingHeaderProvider] + +// NewAuthHeaderProvider builds a HeaderProvider for DurableEmitter chip ingress clients. +// +// Static mode (AuthHeadersTTL == 0): returns fixed AuthHeaders. +// Rotating mode (AuthHeadersTTL > 0): uses initial AuthHeaders until TTL expires, then signs fresh headers. +func NewAuthHeaderProvider(cfg AuthConfig) (chipingress.HeaderProvider, error) { + if cfg.AuthHeadersTTL > 0 { + if cfg.AuthPublicKeyHex == "" { + return nil, errors.New("auth: public key hex required for rotating auth (TTL > 0)") + } + if cfg.AuthHeadersTTL < 10*time.Minute { + return nil, errors.New("auth: headers TTL must be at least 10 minutes") + } + + pubKey, err := hex.DecodeString(cfg.AuthPublicKeyHex) + if err != nil { + return nil, fmt.Errorf("auth: failed to decode public key hex: %w", err) + } + + lazy := &lazySigner{} + if cfg.AuthKeySigner != nil { + lazy.Set(cfg.AuthKeySigner) + } + + provider := &rotatingHeaderProvider{ + csaPubKey: ed25519.PublicKey(pubKey), + signer: lazy, + signerTimeout: 5 * time.Second, + ttl: cfg.AuthHeadersTTL, + lazy: lazy, + } + + headers := make(map[string]string) + if len(cfg.AuthHeaders) > 0 { + headers = cfg.AuthHeaders + provider.lastUpdatedNanos.Store(time.Now().UnixNano()) + } + provider.headers.Store(headers) + + globalRotatingAuth.Store(provider) + return provider, nil + } + + if len(cfg.AuthHeaders) == 0 { + return nil, nil + } + return &staticHeaderProvider{headers: cfg.AuthHeaders}, nil +} + +// SetGlobalSigner injects the CSA keystore used to refresh rotating chip ingress auth headers. +// No-op when rotating auth is not configured. +func SetGlobalSigner(signer Signer) { + if provider := globalRotatingAuth.Load(); provider != nil { + provider.SetSigner(signer) + } +} + +// IsGlobalSignerSet reports whether rotating DurableEmitter auth has a signer configured. +func IsGlobalSignerSet() bool { + provider := globalRotatingAuth.Load() + return provider != nil && provider.IsSignerSet() +} + +func newAuthHeaderV2(ctx context.Context, pubKey ed25519.PublicKey, signer Signer, ts time.Time) (map[string]string, error) { + tsBytes := make([]byte, 8) + binary.BigEndian.PutUint64(tsBytes, uint64(ts.UnixNano())) + msgBytes := append(pubKey, tsBytes...) + + signature, err := signer.Sign(ctx, fmt.Sprintf("%x", pubKey), msgBytes) + if err != nil { + return nil, fmt.Errorf("durableemitter: failed to sign auth header: %w", err) + } + + return map[string]string{ + authHeaderKey: fmt.Sprintf("%s:%x:%d:%x", authHeaderVersion, pubKey, ts.UnixNano(), signature), + }, nil +} diff --git a/pkg/durableemitter/auth_test.go b/pkg/durableemitter/auth_test.go new file mode 100644 index 0000000000..4a2beb4d02 --- /dev/null +++ b/pkg/durableemitter/auth_test.go @@ -0,0 +1,91 @@ +package durableemitter_test + +import ( + "context" + "crypto/ed25519" + "encoding/hex" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/durableemitter" +) + +type mockSigner struct{} + +func (mockSigner) Sign(_ context.Context, _ string, _ []byte) ([]byte, error) { + return []byte("signature"), nil +} + +func TestNewAuthHeaderProvider_Static(t *testing.T) { + t.Parallel() + + headers := map[string]string{"X-Beholder-Node-Auth-Token": "static"} + provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeaders: headers, + }) + require.NoError(t, err) + + got, err := provider.Headers(t.Context()) + require.NoError(t, err) + require.Equal(t, headers, got) +} + +func TestNewAuthHeaderProvider_RotatingDeferredSigner(t *testing.T) { + t.Parallel() + + pubKey, _, err := ed25519.GenerateKey(nil) + require.NoError(t, err) + + initial := map[string]string{"X-Beholder-Node-Auth-Token": "initial"} + provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeaders: initial, + AuthHeadersTTL: 10 * time.Minute, + AuthPublicKeyHex: hex.EncodeToString(pubKey), + }) + require.NoError(t, err) + require.False(t, durableemitter.IsGlobalSignerSet()) + + got, err := provider.Headers(t.Context()) + require.NoError(t, err) + require.Equal(t, initial, got) + + durableemitter.SetGlobalSigner(mockSigner{}) + require.True(t, durableemitter.IsGlobalSignerSet()) +} + +func TestNewAuthHeaderProvider_RotatingWithSigner(t *testing.T) { + t.Parallel() + + pubKey, _, err := ed25519.GenerateKey(nil) + require.NoError(t, err) + + provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeaders: map[string]string{"X-Beholder-Node-Auth-Token": "initial"}, + AuthHeadersTTL: 10 * time.Minute, + AuthPublicKeyHex: hex.EncodeToString(pubKey), + AuthKeySigner: mockSigner{}, + }) + require.NoError(t, err) + require.True(t, durableemitter.IsGlobalSignerSet()) + + got, err := provider.Headers(t.Context()) + require.NoError(t, err) + require.NotEmpty(t, got["X-Beholder-Node-Auth-Token"]) +} + +func TestNewAuthHeaderProvider_Validation(t *testing.T) { + t.Parallel() + + _, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeadersTTL: 10 * time.Minute, + }) + require.ErrorContains(t, err, "public key hex required") + + _, err = durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeadersTTL: time.Minute, + AuthPublicKeyHex: "abcd", + }) + require.ErrorContains(t, err, "at least 10 minutes") +} diff --git a/pkg/durableemitter/setup.go b/pkg/durableemitter/setup.go index 4b5236da7d..5ff5364a40 100644 --- a/pkg/durableemitter/setup.go +++ b/pkg/durableemitter/setup.go @@ -44,8 +44,9 @@ type SetupConfig struct { Endpoint string // InsecureConnection disables TLS when true. InsecureConnection bool - // Auth is the per-RPC credential provider. Nil means no auth. - Auth chipingress.HeaderProvider + // Auth configures chip ingress credentials. AuthKeySigner may be nil at init + // for LOOP plugins; call SetGlobalSigner after the CSA keystore is available. + Auth AuthConfig // RetransmitEnabled controls whether the retransmit and cleanup loops run. // Set to true for the host (chainlink node) process. // Set to false for LOOP plugin processes — the host's retransmit loop picks @@ -82,7 +83,12 @@ func Setup( return nil, errors.New("logger is required for DurableEmitter") } - chipOpts := buildChipOpts(cfg) + auth, err := NewAuthHeaderProvider(cfg.Auth) + if err != nil { + return nil, fmt.Errorf("failed to build chip ingress auth: %w", err) + } + + chipOpts := buildChipOpts(cfg, auth) // Primary client — owned by batch.Client, closed on batch.Client.Stop(). batchChipClient, err := chipingress.NewClient(cfg.Endpoint, chipOpts...) @@ -131,15 +137,15 @@ func Setup( return emitter, nil } -func buildChipOpts(cfg SetupConfig) []chipingress.Opt { +func buildChipOpts(cfg SetupConfig, auth chipingress.HeaderProvider) []chipingress.Opt { var opts []chipingress.Opt if cfg.InsecureConnection { opts = append(opts, chipingress.WithInsecureConnection()) } else { opts = append(opts, chipingress.WithTLS()) } - if cfg.Auth != nil { - opts = append(opts, chipingress.WithTokenAuth(cfg.Auth)) + if auth != nil { + opts = append(opts, chipingress.WithTokenAuth(auth)) } return opts } diff --git a/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go b/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go index 51f4b54a81..472d672b10 100644 --- a/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go +++ b/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go @@ -8,6 +8,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/durableemitter" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -323,6 +324,7 @@ func (s *standardCapabilitiesServer) Initialise(ctx context.Context, request *ca // Sets the auth header signing mechanism beholder.GetClient().SetSigner(keyStore) + durableemitter.SetGlobalSigner(keyStore) capabilitiesRegistryConn, err := s.Dial(request.CapRegistryId) if err != nil { diff --git a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index 3d1bc6a10d..31591479c4 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -14,6 +14,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/durableemitter" aptospb "github.com/smartcontractkit/chainlink-common/pkg/chains/aptos" evmpb "github.com/smartcontractkit/chainlink-common/pkg/chains/evm" solpb "github.com/smartcontractkit/chainlink-common/pkg/chains/solana" @@ -142,6 +143,7 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel // Sets the auth header signing mechanism beholder.GetClient().SetSigner(csaKeystore) + durableemitter.SetGlobalSigner(csaKeystore) r, err := p.impl.NewRelayer(ctx, request.Config, ks.NewClient(ksConn), csaKeystore, capRegistry) if err != nil { diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 601f2eb2bd..cbf2690abd 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -347,15 +347,18 @@ func (s *Server) start(opts ...ServerOpt) error { if s.DataSource == nil { return errors.New("data source required when durable emitter is enabled") } - var auth beholder.Auth - if len(s.EnvConfig.TelemetryAuthHeaders) > 0 { - auth = beholder.NewStaticAuth(s.EnvConfig.TelemetryAuthHeaders, !s.EnvConfig.ChipIngressInsecureConnection) - } + + // Rotating auth: signer is injected later via durableemitter.SetGlobalSigner when the host + // provides the CSA keystore (see relayer and standard capabilities startup). durableCfg := durableemitter.SetupConfig{ Endpoint: s.EnvConfig.ChipIngressEndpoint, InsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, - Auth: auth, - RetransmitEnabled: false, // LOOP plugins do not run the retransmit loop; the host process handles it. + Auth: durableemitter.AuthConfig{ + AuthHeaders: s.EnvConfig.TelemetryAuthHeaders, + AuthHeadersTTL: s.EnvConfig.TelemetryAuthHeadersTTL, + AuthPublicKeyHex: s.EnvConfig.TelemetryAuthPubKeyHex, + }, + RetransmitEnabled: false, // LOOP plugins do not run the retransmit loop; the host process handles it. } store := durableemitter.NewPgDurableEventStore(s.DataSource) var err error From f6f0bc187867e7cf1fef4f0ddea10745019712c6 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 29 May 2026 11:18:27 -0400 Subject: [PATCH 10/12] Bump chipingress --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1d386c4dac..0f2eb7ec09 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.100 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146 + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528204832-58c7145c53f8 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b diff --git a/go.sum b/go.sum index 196784c7d6..7df06f3da0 100644 --- a/go.sum +++ b/go.sum @@ -258,8 +258,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.100 h1:wpiSpmI/eFjY+wx/nPr5VuNF4hki0prIBMKEaQWn3g4= github.com/smartcontractkit/chain-selectors v1.0.100/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146 h1:PlkA7NGpBm5sc2P//crDFgMIQ0qsQhKcpjWV7Qzwqz8= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528204832-58c7145c53f8 h1:5O2qAtvBLzTHBeSIPcxt9i9BCqqOyeroVxN0xbXwoS0= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528204832-58c7145c53f8/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 h1:iljEJss3WOwcsMkWy72Yn2zvjw7Gyxc+RXL7r8YKM6g= From 48205bc10812eae14f737b5b5c0e55ccdce1279d Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Fri, 29 May 2026 14:11:40 -0400 Subject: [PATCH 11/12] refactor(durableemitter): delegate auth header logic to chipingress Collapse pkg/durableemitter/auth.go onto chipingress.NewHeaderProvider, removing the duplicated static/rotating header provider, newAuthHeaderV2, and auth header constants. Keep a thin lazySigner wrapper plus the package-level SetGlobalSigner/IsGlobalSignerSet so LOOP plugins can still inject the CSA keystore after startup. NewAuthHeaderProvider now wraps the (possibly nil) signer in the lazy holder, delegates provider construction to chipingress, and publishes the wrapper globally only for rotating auth. setup.go and the existing call sites are unchanged. --- pkg/durableemitter/auth.go | 183 +++++++++---------------------------- 1 file changed, 41 insertions(+), 142 deletions(-) diff --git a/pkg/durableemitter/auth.go b/pkg/durableemitter/auth.go index 7025f96060..646324e90c 100644 --- a/pkg/durableemitter/auth.go +++ b/pkg/durableemitter/auth.go @@ -2,28 +2,17 @@ package durableemitter import ( "context" - "crypto/ed25519" - "encoding/binary" - "encoding/hex" "errors" - "fmt" - "maps" - "sync" "sync/atomic" "time" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" ) -const ( - authHeaderKey = "X-Beholder-Node-Auth-Token" - authHeaderVersion = "2" -) - -// Signer signs auth header payloads using the node's CSA key. -type Signer interface { - Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) -} +// Signer signs auth header payloads using the node's CSA key. It is an alias of +// chipingress.Signer so DurableEmitter callers don't need to import chipingress +// directly. +type Signer = chipingress.Signer // AuthConfig configures chip ingress auth headers for DurableEmitter clients. type AuthConfig struct { @@ -35,6 +24,10 @@ type AuthConfig struct { AuthKeySigner Signer } +// lazySigner is a thread-safe wrapper that lets the CSA keystore be injected +// after the header provider is built. LOOP plugins start with rotating auth +// configured but receive the keystore later via SetGlobalSigner. The zero value +// is usable. type lazySigner struct { signer atomic.Pointer[Signer] } @@ -47,149 +40,55 @@ func (l *lazySigner) Sign(ctx context.Context, keyID string, data []byte) ([]byt return (*s).Sign(ctx, keyID, data) } -func (l *lazySigner) Set(signer Signer) { - l.signer.Store(&signer) -} - -func (l *lazySigner) IsSet() bool { - return l.signer.Load() != nil -} +func (l *lazySigner) Set(signer Signer) { l.signer.Store(&signer) } -type staticHeaderProvider struct { - headers map[string]string -} +func (l *lazySigner) IsSet() bool { return l.signer.Load() != nil } -func (p *staticHeaderProvider) Headers(_ context.Context) (map[string]string, error) { - return p.headers, nil -} - -type rotatingHeaderProvider struct { - csaPubKey ed25519.PublicKey - signer Signer - signerTimeout time.Duration - headers atomic.Value // map[string]string - ttl time.Duration - lastUpdatedNanos atomic.Int64 - lazy *lazySigner - mu sync.Mutex -} +// globalSigner holds the lazy wrapper backing the process-wide rotating auth +// provider, set by NewAuthHeaderProvider when rotating auth is configured. +var globalSigner atomic.Pointer[lazySigner] -func (p *rotatingHeaderProvider) SetSigner(signer Signer) { - if p.lazy != nil { - p.lazy.Set(signer) +// NewAuthHeaderProvider builds a chip ingress HeaderProvider for DurableEmitter +// clients, delegating the static/rotating provider logic to +// chipingress.NewHeaderProvider. +// +// For rotating auth (AuthHeadersTTL > 0) the signer is wrapped in a lazy holder +// so the CSA keystore can be injected after startup via SetGlobalSigner. +func NewAuthHeaderProvider(cfg AuthConfig) (chipingress.HeaderProvider, error) { + lazy := &lazySigner{} + if cfg.AuthKeySigner != nil { + lazy.Set(cfg.AuthKeySigner) } -} - -func (p *rotatingHeaderProvider) IsSignerSet() bool { - return p.lazy != nil && p.lazy.IsSet() -} - -func (p *rotatingHeaderProvider) Headers(ctx context.Context) (map[string]string, error) { - returnHeader := make(map[string]string) - lastUpdated := time.Unix(0, p.lastUpdatedNanos.Load()) - - if time.Since(lastUpdated) > p.ttl { - p.mu.Lock() - defer p.mu.Unlock() - - lastUpdated = time.Unix(0, p.lastUpdatedNanos.Load()) - if time.Since(lastUpdated) < p.ttl { - maps.Copy(returnHeader, p.headers.Load().(map[string]string)) - return returnHeader, nil - } - - ctxWithTimeout, cancel := context.WithTimeout(ctx, p.signerTimeout) - defer cancel() - ts := time.Now() - newHeaders, err := newAuthHeaderV2(ctxWithTimeout, p.csaPubKey, p.signer, ts) - if err != nil { - return nil, fmt.Errorf("durableemitter: failed to create auth header: %w", err) - } - - p.headers.Store(newHeaders) - p.lastUpdatedNanos.Store(ts.UnixNano()) + provider, err := chipingress.NewHeaderProvider(chipingress.HeaderProviderConfig{ + AuthHeaders: cfg.AuthHeaders, + AuthHeadersTTL: cfg.AuthHeadersTTL, + AuthPublicKeyHex: cfg.AuthPublicKeyHex, + AuthKeySigner: lazy, + }) + if err != nil { + return nil, err } - maps.Copy(returnHeader, p.headers.Load().(map[string]string)) - return returnHeader, nil -} - -var globalRotatingAuth atomic.Pointer[rotatingHeaderProvider] - -// NewAuthHeaderProvider builds a HeaderProvider for DurableEmitter chip ingress clients. -// -// Static mode (AuthHeadersTTL == 0): returns fixed AuthHeaders. -// Rotating mode (AuthHeadersTTL > 0): uses initial AuthHeaders until TTL expires, then signs fresh headers. -func NewAuthHeaderProvider(cfg AuthConfig) (chipingress.HeaderProvider, error) { + // Only rotating providers consult the signer; publish the lazy wrapper so + // SetGlobalSigner can inject the keystore once it becomes available. if cfg.AuthHeadersTTL > 0 { - if cfg.AuthPublicKeyHex == "" { - return nil, errors.New("auth: public key hex required for rotating auth (TTL > 0)") - } - if cfg.AuthHeadersTTL < 10*time.Minute { - return nil, errors.New("auth: headers TTL must be at least 10 minutes") - } - - pubKey, err := hex.DecodeString(cfg.AuthPublicKeyHex) - if err != nil { - return nil, fmt.Errorf("auth: failed to decode public key hex: %w", err) - } - - lazy := &lazySigner{} - if cfg.AuthKeySigner != nil { - lazy.Set(cfg.AuthKeySigner) - } - - provider := &rotatingHeaderProvider{ - csaPubKey: ed25519.PublicKey(pubKey), - signer: lazy, - signerTimeout: 5 * time.Second, - ttl: cfg.AuthHeadersTTL, - lazy: lazy, - } - - headers := make(map[string]string) - if len(cfg.AuthHeaders) > 0 { - headers = cfg.AuthHeaders - provider.lastUpdatedNanos.Store(time.Now().UnixNano()) - } - provider.headers.Store(headers) - - globalRotatingAuth.Store(provider) - return provider, nil + globalSigner.Store(lazy) } - if len(cfg.AuthHeaders) == 0 { - return nil, nil - } - return &staticHeaderProvider{headers: cfg.AuthHeaders}, nil + return provider, nil } -// SetGlobalSigner injects the CSA keystore used to refresh rotating chip ingress auth headers. -// No-op when rotating auth is not configured. +// SetGlobalSigner injects the CSA keystore used to refresh rotating chip ingress +// auth headers. No-op when rotating auth is not configured. func SetGlobalSigner(signer Signer) { - if provider := globalRotatingAuth.Load(); provider != nil { - provider.SetSigner(signer) + if lazy := globalSigner.Load(); lazy != nil { + lazy.Set(signer) } } // IsGlobalSignerSet reports whether rotating DurableEmitter auth has a signer configured. func IsGlobalSignerSet() bool { - provider := globalRotatingAuth.Load() - return provider != nil && provider.IsSignerSet() -} - -func newAuthHeaderV2(ctx context.Context, pubKey ed25519.PublicKey, signer Signer, ts time.Time) (map[string]string, error) { - tsBytes := make([]byte, 8) - binary.BigEndian.PutUint64(tsBytes, uint64(ts.UnixNano())) - msgBytes := append(pubKey, tsBytes...) - - signature, err := signer.Sign(ctx, fmt.Sprintf("%x", pubKey), msgBytes) - if err != nil { - return nil, fmt.Errorf("durableemitter: failed to sign auth header: %w", err) - } - - return map[string]string{ - authHeaderKey: fmt.Sprintf("%s:%x:%d:%x", authHeaderVersion, pubKey, ts.UnixNano(), signature), - }, nil + lazy := globalSigner.Load() + return lazy != nil && lazy.IsSet() } From e1eb0b54a47cd134b17aac715708dde35b61a2e4 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 29 May 2026 14:31:56 -0400 Subject: [PATCH 12/12] Update auth_test.go --- pkg/durableemitter/auth_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/durableemitter/auth_test.go b/pkg/durableemitter/auth_test.go index 4a2beb4d02..2d93c136db 100644 --- a/pkg/durableemitter/auth_test.go +++ b/pkg/durableemitter/auth_test.go @@ -21,7 +21,7 @@ func (mockSigner) Sign(_ context.Context, _ string, _ []byte) ([]byte, error) { func TestNewAuthHeaderProvider_Static(t *testing.T) { t.Parallel() - headers := map[string]string{"X-Beholder-Node-Auth-Token": "static"} + headers := map[string]string{"X-Node-Auth-Token": "static"} provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ AuthHeaders: headers, }) @@ -38,7 +38,7 @@ func TestNewAuthHeaderProvider_RotatingDeferredSigner(t *testing.T) { pubKey, _, err := ed25519.GenerateKey(nil) require.NoError(t, err) - initial := map[string]string{"X-Beholder-Node-Auth-Token": "initial"} + initial := map[string]string{"X-Node-Auth-Token": "initial"} provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ AuthHeaders: initial, AuthHeadersTTL: 10 * time.Minute, @@ -62,7 +62,7 @@ func TestNewAuthHeaderProvider_RotatingWithSigner(t *testing.T) { require.NoError(t, err) provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ - AuthHeaders: map[string]string{"X-Beholder-Node-Auth-Token": "initial"}, + AuthHeaders: map[string]string{"X-Node-Auth-Token": "initial"}, AuthHeadersTTL: 10 * time.Minute, AuthPublicKeyHex: hex.EncodeToString(pubKey), AuthKeySigner: mockSigner{}, @@ -72,7 +72,7 @@ func TestNewAuthHeaderProvider_RotatingWithSigner(t *testing.T) { got, err := provider.Headers(t.Context()) require.NoError(t, err) - require.NotEmpty(t, got["X-Beholder-Node-Auth-Token"]) + require.NotEmpty(t, got["X-Node-Auth-Token"]) } func TestNewAuthHeaderProvider_Validation(t *testing.T) {