diff --git a/go.mod b/go.mod index 1d386c4dac..0f2eb7ec09 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.100 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146 + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528204832-58c7145c53f8 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b diff --git a/go.sum b/go.sum index 196784c7d6..7df06f3da0 100644 --- a/go.sum +++ b/go.sum @@ -258,8 +258,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.100 h1:wpiSpmI/eFjY+wx/nPr5VuNF4hki0prIBMKEaQWn3g4= github.com/smartcontractkit/chain-selectors v1.0.100/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146 h1:PlkA7NGpBm5sc2P//crDFgMIQ0qsQhKcpjWV7Qzwqz8= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528204832-58c7145c53f8 h1:5O2qAtvBLzTHBeSIPcxt9i9BCqqOyeroVxN0xbXwoS0= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528204832-58c7145c53f8/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= 
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 h1:iljEJss3WOwcsMkWy72Yn2zvjw7Gyxc+RXL7r8YKM6g= diff --git a/pkg/durableemitter/auth.go b/pkg/durableemitter/auth.go new file mode 100644 index 0000000000..646324e90c --- /dev/null +++ b/pkg/durableemitter/auth.go @@ -0,0 +1,94 @@ +package durableemitter + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" +) + +// Signer signs auth header payloads using the node's CSA key. It is an alias of +// chipingress.Signer so DurableEmitter callers don't need to import chipingress +// directly. +type Signer = chipingress.Signer + +// AuthConfig configures chip ingress auth headers for DurableEmitter clients. +type AuthConfig struct { + AuthHeaders map[string]string + AuthHeadersTTL time.Duration + AuthPublicKeyHex string + // AuthKeySigner may be nil at init time for LOOP plugins; call SetGlobalSigner + // after the CSA keystore is available. + AuthKeySigner Signer +} + +// lazySigner is a thread-safe wrapper that lets the CSA keystore be injected +// after the header provider is built. LOOP plugins start with rotating auth +// configured but receive the keystore later via SetGlobalSigner. The zero value +// is usable. 
+type lazySigner struct { + signer atomic.Pointer[Signer] +} + +func (l *lazySigner) Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) { + s := l.signer.Load() + if s == nil { + return nil, errors.New("keystore not yet available for signing") + } + return (*s).Sign(ctx, keyID, data) +} + +func (l *lazySigner) Set(signer Signer) { l.signer.Store(&signer) } + +func (l *lazySigner) IsSet() bool { return l.signer.Load() != nil } + +// globalSigner holds the lazy wrapper backing the process-wide rotating auth +// provider, set by NewAuthHeaderProvider when rotating auth is configured. +var globalSigner atomic.Pointer[lazySigner] + +// NewAuthHeaderProvider builds a chip ingress HeaderProvider for DurableEmitter +// clients, delegating the static/rotating provider logic to +// chipingress.NewHeaderProvider. +// +// For rotating auth (AuthHeadersTTL > 0) the signer is wrapped in a lazy holder +// so the CSA keystore can be injected after startup via SetGlobalSigner. +func NewAuthHeaderProvider(cfg AuthConfig) (chipingress.HeaderProvider, error) { + lazy := &lazySigner{} + if cfg.AuthKeySigner != nil { + lazy.Set(cfg.AuthKeySigner) + } + + provider, err := chipingress.NewHeaderProvider(chipingress.HeaderProviderConfig{ + AuthHeaders: cfg.AuthHeaders, + AuthHeadersTTL: cfg.AuthHeadersTTL, + AuthPublicKeyHex: cfg.AuthPublicKeyHex, + AuthKeySigner: lazy, + }) + if err != nil { + return nil, err + } + + // Only rotating providers consult the signer; publish the lazy wrapper so + // SetGlobalSigner can inject the keystore once it becomes available. + if cfg.AuthHeadersTTL > 0 { + globalSigner.Store(lazy) + } + + return provider, nil +} + +// SetGlobalSigner injects the CSA keystore used to refresh rotating chip ingress +// auth headers. No-op when rotating auth is not configured. 
+func SetGlobalSigner(signer Signer) { + if lazy := globalSigner.Load(); lazy != nil { + lazy.Set(signer) + } +} + +// IsGlobalSignerSet reports whether rotating DurableEmitter auth has a signer configured. +func IsGlobalSignerSet() bool { + lazy := globalSigner.Load() + return lazy != nil && lazy.IsSet() +} diff --git a/pkg/durableemitter/auth_test.go b/pkg/durableemitter/auth_test.go new file mode 100644 index 0000000000..2d93c136db --- /dev/null +++ b/pkg/durableemitter/auth_test.go @@ -0,0 +1,91 @@ +package durableemitter_test + +import ( + "context" + "crypto/ed25519" + "encoding/hex" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/durableemitter" +) + +type mockSigner struct{} + +func (mockSigner) Sign(_ context.Context, _ string, _ []byte) ([]byte, error) { + return []byte("signature"), nil +} + +func TestNewAuthHeaderProvider_Static(t *testing.T) { + t.Parallel() + + headers := map[string]string{"X-Node-Auth-Token": "static"} + provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeaders: headers, + }) + require.NoError(t, err) + + got, err := provider.Headers(t.Context()) + require.NoError(t, err) + require.Equal(t, headers, got) +} + +func TestNewAuthHeaderProvider_RotatingDeferredSigner(t *testing.T) { + t.Parallel() + + pubKey, _, err := ed25519.GenerateKey(nil) + require.NoError(t, err) + + initial := map[string]string{"X-Node-Auth-Token": "initial"} + provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeaders: initial, + AuthHeadersTTL: 10 * time.Minute, + AuthPublicKeyHex: hex.EncodeToString(pubKey), + }) + require.NoError(t, err) + require.False(t, durableemitter.IsGlobalSignerSet()) + + got, err := provider.Headers(t.Context()) + require.NoError(t, err) + require.Equal(t, initial, got) + + durableemitter.SetGlobalSigner(mockSigner{}) + require.True(t, durableemitter.IsGlobalSignerSet()) +} + +func 
TestNewAuthHeaderProvider_RotatingWithSigner(t *testing.T) { + t.Parallel() + + pubKey, _, err := ed25519.GenerateKey(nil) + require.NoError(t, err) + + provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeaders: map[string]string{"X-Node-Auth-Token": "initial"}, + AuthHeadersTTL: 10 * time.Minute, + AuthPublicKeyHex: hex.EncodeToString(pubKey), + AuthKeySigner: mockSigner{}, + }) + require.NoError(t, err) + require.True(t, durableemitter.IsGlobalSignerSet()) + + got, err := provider.Headers(t.Context()) + require.NoError(t, err) + require.NotEmpty(t, got["X-Node-Auth-Token"]) +} + +func TestNewAuthHeaderProvider_Validation(t *testing.T) { + t.Parallel() + + _, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeadersTTL: 10 * time.Minute, + }) + require.ErrorContains(t, err, "public key hex required") + + _, err = durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{ + AuthHeadersTTL: time.Minute, + AuthPublicKeyHex: "abcd", + }) + require.ErrorContains(t, err, "at least 10 minutes") +} diff --git a/pkg/durableemitter/durable_emitter.go b/pkg/durableemitter/durable_emitter.go index 71ca2701fe..913e2f4823 100644 --- a/pkg/durableemitter/durable_emitter.go +++ b/pkg/durableemitter/durable_emitter.go @@ -11,58 +11,66 @@ import ( "time" "go.opentelemetry.io/otel/metric" - "google.golang.org/grpc" - grpcEncoding "google.golang.org/grpc/encoding" - "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" - "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" ) -// DurableEmitterConfig configures the DurableEmitter behaviour. 
-type DurableEmitterConfig struct { +// BatchEmitter is the transport interface DurableEmitter delegates to for +// batched delivery of CloudEvents to Chip Ingress. +// +// *batch.Client from pkg/chipingress/batch satisfies this interface and +// handles seqnum stamping, gRPC size splitting, concurrency limiting, and +// graceful shutdown with a configurable timeout. +// +// The callback passed to QueueMessage is invoked once after the batch +// containing the event is sent. A nil error means the RPC succeeded; a +// non-nil error means the batch was dropped — the event remains in the DB +// and the retransmit loop will retry it. +type BatchEmitter interface { + // QueueMessage enqueues a single CloudEvent for batched delivery. + // Returns an error only if the internal buffer is full or the client + // has been stopped. Callers must treat a non-nil return as a + // drop (the event is still persisted; retransmit will retry). + QueueMessage(event *chipingress.CloudEventPb, callback func(error)) error + // Start begins background processing. Must be called before QueueMessage. + Start(ctx context.Context) + // Stop flushes any queued events, waits for all in-flight network calls + // and callbacks to complete, then closes the underlying transport. + Stop() +} + +// Config configures the DurableEmitter behaviour. +type Config struct { // RetransmitInterval controls how often the retransmit loop ticks. RetransmitInterval time.Duration // RetransmitAfter is the minimum age of an event before the retransmit // loop considers it. This gives the batch publish path time to succeed. RetransmitAfter time.Duration - // RetransmitBatchSize caps how many pending rows are listed per retransmit tick - // (each row is enqueued for the batch publish workers). + // RetransmitBatchSize caps how many pending rows are listed per retransmit tick. RetransmitBatchSize int // ExpiryInterval controls how often the expiry loop ticks. 
ExpiryInterval time.Duration // EventTTL is the maximum age of an event before it is expired. EventTTL time.Duration - // PublishTimeout is the per-RPC deadline for each Publish call. + // PublishTimeout bounds store calls (InsertBatch, MarkDeliveredBatch) and the + // single-event fallback Publish RPC. The batched gRPC publish timeout is set on + // the BatchEmitter (batch.Client) directly. PublishTimeout time.Duration // PurgeInterval is how often the purge loop runs to batch-delete rows that // were marked delivered (Postgres). Zero defaults to 250ms. PurgeInterval time.Duration // PurgeBatchSize is the maximum rows removed per PurgeDelivered call. Zero defaults to 500. PurgeBatchSize int - // PublishBatchSize is the target number of events per PublishBatch RPC. Values below 1 are - // clamped to 1 in NewDurableEmitter. - PublishBatchSize int - // PublishBatchWorkers is the number of concurrent goroutines that read from - // the batch channel and call PublishBatch. More workers means higher - // throughput (each worker handles one in-flight batch at a time). Zero defaults to 1. - PublishBatchWorkers int - // PublishBatchFlushInterval is the maximum time to wait for a full batch - // before flushing a partial one. Zero defaults to 50ms. - PublishBatchFlushInterval time.Duration - // PublishBatchChannelSize overrides the publishCh buffer capacity. Zero - // defaults to max(PublishBatchSize*2000, 200_000). - PublishBatchChannelSize int // DisablePruning disables the background purge (PurgeDelivered) and expiry // (DeleteExpired) loops. Events remain in the DB after delivery. Useful for // post-test analysis of created_at / delivered_at timestamps. DisablePruning bool // Hooks is optional instrumentation (load tests, profiling). Nil fields are skipped. // Callbacks may run from many goroutines; implementations must be thread-safe. - Hooks *DurableEmitterHooks + Hooks *Hooks // Metrics enables OpenTelemetry instruments (queue, publish, store, optional process stats). 
// When non-nil, a meter must be supplied to NewDurableEmitter; nil disables instrumentation. Metrics *DurableEmitterMetricsConfig @@ -79,20 +87,22 @@ type DurableEmitterConfig struct { InsertBatchWorkers int } -// DurableEmitterHooks records Publish vs Delete latency to locate pipeline bottlenecks. -type DurableEmitterHooks struct { +// Hooks records delivery latency to locate pipeline bottlenecks. +type Hooks struct { // OnEmitInsert is called after each store.Insert in Emit (the DB write that // blocks the caller). elapsed covers only the INSERT; err is nil on success. OnEmitInsert func(elapsed time.Duration, err error) - // OnBatchPublish is called after each PublishBatch RPC in the batch publish loop. - // batchSize is the number of events in the batch; err is nil on success. + // OnBatchPublish is called from the delivery callback after each event's + // batch is sent. elapsed is measured from QueueMessage call to callback + // invocation; batchSize is always 1 (one callback per event); err is nil + // on success. OnBatchPublish func(elapsed time.Duration, batchSize int, err error) - // OnBatchMarkDelivered is called after MarkDeliveredBatch following a successful batch publish. + // OnBatchMarkDelivered is called after MarkDeliveredBatch following a successful delivery. OnBatchMarkDelivered func(elapsed time.Duration, count int) } -func DefaultDurableEmitterConfig() DurableEmitterConfig { - return DurableEmitterConfig{ +func DefaultConfig() Config { + return Config{ RetransmitInterval: 5 * time.Second, RetransmitAfter: 10 * time.Second, RetransmitBatchSize: 100, @@ -101,7 +111,6 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { PublishTimeout: 5 * time.Second, PurgeInterval: 250 * time.Millisecond, PurgeBatchSize: 500, - PublishBatchSize: 1, // Metrics is opt-in: callers who want instrumentation must set this // and pass a metric.Meter to NewDurableEmitter. 
Metrics: nil, @@ -110,21 +119,15 @@ func DefaultDurableEmitterConfig() DurableEmitterConfig { // DurableEmitter implements Emitter with persistence-backed delivery guarantees. // -// Emit writes to a DurableEventStore, returns nil after insert, and enqueues the -// row for async PublishBatch delivery. Successful publishes are followed by -// MarkDeliveredBatch; the purge loop removes delivered rows from Postgres. When -// Chip is down or the publish channel is full, a retransmit loop lists stale -// pending rows and re-enqueues them to the same batch workers (up to +// Emit writes to a DurableEventStore then hands the event to the BatchEmitter +// for async delivery. The delivery callback from BatchEmitter marks the row +// delivered; the purge loop removes delivered rows from Postgres. When the +// batch emitter buffer is full or the network is down, a retransmit loop lists +// stale pending rows and re-enqueues them through the same BatchEmitter (up to // RetransmitBatchSize per tick). // // A separate expiry loop garbage-collects events older than EventTTL to bound // table growth. -// publishWork is a unit of work for the batch publish channel. -type publishWork struct { - id int64 - payload []byte // serialized CloudEvent proto (always set) - event *chipingress.CloudEventPb // original struct; set from Emit(), nil from retransmit -} // insertRequest is a single Emit() caller waiting for a coalesced batch INSERT. type insertRequest struct { @@ -141,19 +144,27 @@ type DurableEmitter struct { services.Service eng *services.Engine - store DurableEventStore - client chipingress.Client - // isHostProcess determines if the emitter runs retransmit and cleanup loops. - // Should be set to false when initialized inside LOOP plugins. - isHostProcess bool - cfg DurableEmitterConfig + store DurableEventStore + batchEmitter BatchEmitter + // fallbackClient, when non-nil, is used for single-event per-RPC retry + // whenever the batch emitter reports a delivery failure. 
Each failed event + // is retried individually via Publish (not PublishBatch) in a goroutine. + // If the single-event retry also fails the event stays in the DB and the + // retransmit loop will eventually deliver it. DurableEmitter owns this + // client and closes it during shutdown. + fallbackClient chipingress.Client + // fallbackWg tracks in-flight single-event fallback goroutines. It is + // waited on after batchEmitter.Stop() so that all fallback attempts that + // were spawned during the final flush can complete before we close the + // fallback client connection. + fallbackWg sync.WaitGroup + // retransmitEnabled controls whether this instance runs the retransmit and + // cleanup loops. Should be set to false when initialized inside LOOP plugins. + retransmitEnabled bool + cfg Config metrics *durableEmitterMetrics - // rawConn is the underlying gRPC connection when the client exposes it. - // Non-nil enables zero-copy batch publishing (protowire + ForceCodec). - rawConn *grpc.ClientConn - // batchInserter is non-nil when the store supports multi-row INSERTs // and InsertBatchSize > 0. batchInserter BatchInserter @@ -171,65 +182,11 @@ type DurableEmitter struct { pendingCount atomic.Int64 pendingMax atomic.Int64 - // publishCh buffers events for the batch publish loop. - publishCh chan publishWork - - // stopCh signals background loops to exit. It is owned by DurableEmitter - // (not the service engine) because shutdown must drain coalesced inserts - // and close publishCh in a specific order after all workers stop. + // stopCh signals background loops to exit. stopCh services.StopChan wg sync.WaitGroup } -// grpcConnProvider is an optional interface for clients that expose the -// underlying gRPC connection. When satisfied, DurableEmitter uses zero-copy -// batch publishing to avoid protobuf marshal/unmarshal overhead. 
-type grpcConnProvider interface { - Conn() *grpc.ClientConn -} - -// rawBytesCodec is a gRPC codec that passes []byte through without -// marshal/unmarshal. Name returns "proto" so the wire content-type is -// application/grpc+proto, matching what Chip Ingress expects. -type rawBytesCodec struct{} - -func (rawBytesCodec) Marshal(v any) ([]byte, error) { - b, ok := v.([]byte) - if !ok { - return nil, fmt.Errorf("rawBytesCodec.Marshal: expected []byte, got %T", v) - } - return b, nil -} - -func (rawBytesCodec) Unmarshal(data []byte, v any) error { - bp, ok := v.(*[]byte) - if !ok { - return fmt.Errorf("rawBytesCodec.Unmarshal: expected *[]byte, got %T", v) - } - *bp = data - return nil -} - -func (rawBytesCodec) Name() string { return "proto" } - -var _ grpcEncoding.Codec = rawBytesCodec{} - -// buildBatchBytes constructs the protobuf wire format for a CloudEventBatch -// directly from already-serialized CloudEvent payloads. Each payload is -// wrapped with the field-1 tag and length prefix — no unmarshal/re-marshal. -func buildBatchBytes(payloads [][]byte) []byte { - size := 0 - for _, p := range payloads { - size += protowire.SizeTag(1) + protowire.SizeBytes(len(p)) - } - buf := make([]byte, 0, size) - for _, p := range payloads { - buf = protowire.AppendTag(buf, 1, protowire.BytesType) - buf = protowire.AppendBytes(buf, p) - } - return buf -} - // Compile-time assertion that *DurableEmitter exposes the canonical emit and // close methods expected of an emitter. var _ interface { @@ -237,32 +194,35 @@ var _ interface { io.Closer } = (*DurableEmitter)(nil) -// NewDurableEmitter constructs a DurableEmitter as a services.Service. +// NewDurableEmitter constructs a DurableEmitter as a service. // -// meter is the OpenTelemetry Meter used for instrument registration. It is -// required when cfg.Metrics is non-nil and must be supplied by the caller -// (e.g. otel.Meter("durableemitter") or a meter from the host's -// MeterProvider). Pass nil when metrics are disabled. 
+// batchEmitter is the transport layer (typically *batch.Client from +// pkg/chipingress/batch) responsible for batched gRPC delivery, seqnum +// stamping, size splitting, and concurrency limiting. +// +// fallbackClient, when non-nil, is used to retry individual events via a +// direct unary Publish RPC whenever the batch emitter reports a delivery +// failure. This gives a fast second-chance path before the DB-backed +// retransmit loop kicks in. Pass nil to disable single-event fallback +// (events are left in the DB and delivered by the retransmit loop). func NewDurableEmitter( store DurableEventStore, - client chipingress.Client, - isHostProcess bool, - cfg DurableEmitterConfig, + batchEmitter BatchEmitter, + fallbackClient chipingress.Client, + retransmitEnabled bool, + cfg Config, lggr logger.Logger, meter metric.Meter, ) (*DurableEmitter, error) { if store == nil { return nil, errors.New("durable event store is nil") } - if client == nil { - return nil, errors.New("chipingress client is nil") + if batchEmitter == nil { + return nil, errors.New("batch emitter is nil") } if lggr == nil { return nil, errors.New("logger is nil") } - if cfg.PublishBatchSize < 1 { - cfg.PublishBatchSize = 1 - } var m *durableEmitterMetrics if cfg.Metrics != nil { if meter == nil { @@ -276,12 +236,13 @@ func NewDurableEmitter( store = newMetricsInstrumentedStore(store, m) } d := &DurableEmitter{ - store: store, - client: client, - isHostProcess: isHostProcess, - cfg: cfg, - metrics: m, - stopCh: make(chan struct{}), + store: store, + batchEmitter: batchEmitter, + fallbackClient: fallbackClient, + retransmitEnabled: retransmitEnabled, + cfg: cfg, + metrics: m, + stopCh: make(chan struct{}), } d.Service, d.eng = services.Config{ Name: "DurableEmitter", @@ -289,10 +250,6 @@ func NewDurableEmitter( Close: d.stop, }.NewServiceEngine(lggr) - if cp, ok := client.(grpcConnProvider); ok { - d.rawConn = cp.Conn() - d.eng.Infow("DurableEmitter: raw-codec batch publishing enabled (zero-copy 
protowire)") - } if cfg.InsertBatchSize > 0 { if bi, ok := store.(BatchInserter); ok { d.batchInserter = bi @@ -307,58 +264,42 @@ func NewDurableEmitter( "insertBatchFlushInterval", cfg.InsertBatchFlushInterval) } } - bufSize := cfg.PublishBatchChannelSize - if bufSize <= 0 { - bufSize = cfg.PublishBatchSize * 2000 - if bufSize < 200_000 { - bufSize = 200_000 - } - } - d.publishCh = make(chan publishWork, bufSize) return d, nil } -// start launches the retransmit, expiry, purge, and (optionally) batch publish -// background loops. It is invoked by the services.Engine when the embedded -// Service is started; callers should not call this directly. The supplied -// context must not be retained beyond start (per services.Service contract); -// loops use d.stopCh.NewCtx() for their own lifecycle. -func (d *DurableEmitter) start(_ context.Context) error { - batchWorkers := d.cfg.PublishBatchWorkers - if batchWorkers <= 0 { - d.eng.Warnw("configured batchWorkers <=0; defaulting to 1") - batchWorkers = 1 - } +// start starts the batch emitter transport first, then launches the configured +// insert-coalescing, retransmit, expiry, purge, and metrics loops. It is invoked by the +// services.Engine when the embedded Service is started. 
+func (d *DurableEmitter) start(ctx context.Context) error { + d.batchEmitter.Start(ctx) + insertWorkers := d.cfg.InsertBatchWorkers if insertWorkers <= 0 { - d.eng.Warnw("configured insertWorkers <=0; defaulting to 4") insertWorkers = 4 } + if d.insertCh != nil { + for i := 0; i < insertWorkers; i++ { + d.wg.Go(d.insertBatchLoop) + } + } - if d.isHostProcess { + if d.retransmitEnabled { d.wg.Go(d.retransmitLoop) if !d.cfg.DisablePruning { d.wg.Go(d.expiryLoop) d.wg.Go(d.purgeLoop) } } - if d.insertCh != nil { - for i := 0; i < insertWorkers; i++ { - d.wg.Go(d.insertBatchLoop) - } - } - for i := 0; i < batchWorkers; i++ { - d.wg.Go(d.batchPublishLoop) - } if d.metrics != nil && d.cfg.Metrics != nil { d.wg.Go(d.metricsLoop) } return nil } -// Emit persists the event then enqueues it for async PublishBatch delivery. Returns nil once -// the insert is accepted (or the coalesced insert path completes successfully). Returns an -// error when the service is not in the Started state (e.g. before Start or after Close). +// Emit persists the event then hands it to the BatchEmitter for async delivery. +// Returns nil once the insert is accepted (or the coalesced insert path +// completes successfully). Returns an error when the service is not in the +// Started state (e.g. before Start or after Close). func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { return d.eng.IfStarted(func() error { tEmitTotal := time.Now() @@ -468,32 +409,113 @@ func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) d.incPending(1) - // Batch path: enqueue for batch publish loop (PublishBatchSize is always >= 1). - work := publishWork{id: id, payload: payload} - if d.rawConn == nil { - work.event = eventPb - } - select { - case d.publishCh <- work: - return nil - default: - // Channel full — event is safely in the DB; retransmit loop will deliver it. 
- d.eng.Warnw("DurableEmitter: batch publish channel full, relying on retransmit", - "id", id, "ch_len", len(d.publishCh), "ch_cap", cap(d.publishCh)) + // Hand off to the batch emitter. The callback fires once the batch + // containing this event is sent (success or failure). eventPb is + // captured in the closure so the fallback path can retry without a + // DB round-trip. + t0Publish := time.Now() + if qErr := d.batchEmitter.QueueMessage(eventPb, d.deliveryCallback(id, eventPb, t0Publish)); qErr != nil { + d.eng.Warnw("DurableEmitter: batch emitter buffer full, relying on retransmit", "id", id) } return nil }) } -// stop signals background loops to stop and waits for them to finish. It is -// invoked by the services.Engine when the embedded Service is closed; callers -// should not call this directly. -// -// When coalesced inserts are enabled, insertShutdown and insertInFlight drain -// before close(stopCh) so Emit can finish enqueueing to publishCh after a -// successful insert (receive on a closed stopCh in select would race with -// default). Then stopCh is closed, workers exit, and publishCh is closed -// after wg.Wait. +// deliveryCallback returns the function passed to BatchEmitter.QueueMessage. +// On success, it marks the event delivered. On failure, it attempts a +// single-event fallback via fallbackClient (when configured) in a goroutine +// before leaving the event in the DB for the retransmit loop. 
+func (d *DurableEmitter) deliveryCallback(id int64, eventPb *chipingress.CloudEventPb, t0Publish time.Time) func(error) { + return func(sendErr error) { + publishElapsed := time.Since(t0Publish) + + if h := d.cfg.Hooks; h != nil && h.OnBatchPublish != nil { + h.OnBatchPublish(publishElapsed, 1, sendErr) + } + + cbCtx, cbCancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + defer cbCancel() + + d.metrics.recordPublish(cbCtx, publishElapsed, "batch", sendErr) + + if sendErr != nil { + if d.metrics != nil { + d.metrics.publishBatchEvErr.Add(cbCtx, 1) + } + // Batch path failed. If a fallback client is configured, retry the + // single event directly; otherwise leave in DB for retransmit. + d.tryFallback(id, eventPb) + return + } + + if d.metrics != nil { + d.metrics.publishBatchEvOK.Add(cbCtx, 1) + } + + tMark := time.Now() + marked, markErr := d.store.MarkDeliveredBatch(cbCtx, []int64{id}) + markElapsed := time.Since(tMark) + + if h := d.cfg.Hooks; h != nil && h.OnBatchMarkDelivered != nil { + h.OnBatchMarkDelivered(markElapsed, int(marked)) + } + if markErr != nil { + d.eng.Errorw("failed to mark event delivered", "id", id, "error", markErr) + return + } + d.decPending(marked) + if d.metrics != nil { + d.metrics.deliverComplete.Add(cbCtx, marked) + } + } +} + +// tryFallback spawns a goroutine that retries a single event via the direct +// chipingress.Client.Publish RPC. If fallbackClient is nil this is a no-op +// and the event is left in the DB for the retransmit loop. +func (d *DurableEmitter) tryFallback(id int64, eventPb *chipingress.CloudEventPb) { + if d.fallbackClient == nil { + return + } + d.fallbackWg.Add(1) + go func() { + defer d.fallbackWg.Done() + d.singleEventFallback(id, eventPb) + }() +} + +// singleEventFallback sends a single event directly via the fallback +// chipingress.Client. On success, it marks the event delivered and decrements +// the pending counter. 
On failure, it logs and returns — the event remains in +// the DB and the retransmit loop will eventually deliver it. +func (d *DurableEmitter) singleEventFallback(id int64, eventPb *chipingress.CloudEventPb) { + pubCtx, pubCancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + defer pubCancel() + + if _, err := d.fallbackClient.Publish(pubCtx, eventPb); err != nil { + d.eng.Warnw("DurableEmitter: single-event fallback publish failed, relying on retransmit", + "id", id, "error", err) + return + } + + markCtx, markCancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) + defer markCancel() + + marked, markErr := d.store.MarkDeliveredBatch(markCtx, []int64{id}) + if markErr != nil { + d.eng.Errorw("DurableEmitter: failed to mark fallback event delivered", "id", id, "error", markErr) + return + } + d.decPending(marked) + if d.metrics != nil { + d.metrics.deliverComplete.Add(markCtx, marked) + } +} + +// stop signals background loops to stop and waits for them to finish, then +// stops the batch emitter (which flushes any queued events and waits for all +// in-flight callbacks). It is invoked by the services.Engine when the embedded +// Service is closed. func (d *DurableEmitter) stop() error { if d.insertCh != nil { d.insertShutdown.Store(true) @@ -504,14 +526,22 @@ func (d *DurableEmitter) stop() error { } close(d.stopCh) d.wg.Wait() - close(d.publishCh) + // Stop the batch emitter: flushes remaining queued events, waits for all + // in-flight PublishBatch RPCs, and waits for all delivery callbacks. + // Delivery callbacks may spawn single-event fallback goroutines tracked by + // fallbackWg, so we wait on those next. 
+ d.batchEmitter.Stop() + d.fallbackWg.Wait() + if d.fallbackClient != nil { + if err := d.fallbackClient.Close(); err != nil { + d.eng.Warnw("DurableEmitter: error closing fallback chip client", "error", err) + } + } return nil } // insertBatchLoop collects insertRequest items from insertCh and flushes them -// as multi-row INSERTs via BatchInserter.InsertBatch. Uses a linger pattern: -// blocks for the first request, then collects more until the batch is full or -// the flush interval elapses. +// as multi-row INSERTs via BatchInserter.InsertBatch. func (d *DurableEmitter) insertBatchLoop() { batchSize := d.cfg.InsertBatchSize linger := d.cfg.InsertBatchFlushInterval @@ -523,15 +553,12 @@ func (d *DurableEmitter) insertBatchLoop() { for { batch = batch[:0] - // Exit only when insertCh is closed and drained; do not exit on stopCh - // or Emit callers blocked on req.result would hang. req, ok := <-d.insertCh if !ok { return } batch = append(batch, req) - // Linger to collect more. timer := time.NewTimer(linger) collecting: for len(batch) < batchSize { @@ -552,8 +579,6 @@ func (d *DurableEmitter) insertBatchLoop() { for i, r := range batch { payloads[i] = r.payload } - // Detached from stopCh so closing stopCh does not cancel inserts while - // draining insertCh during shutdown. ctx, cancel := context.WithTimeout(context.Background(), d.cfg.PublishTimeout) ids, batchErr := d.batchInserter.InsertBatch(ctx, payloads) cancel() @@ -606,213 +631,6 @@ func (d *DurableEmitter) decPending(n int64) { } } -// batchPublishLoop reads events from publishCh, collects them into batches of -// PublishBatchSize, and sends each batch via PublishBatch RPC. It blocks until -// the batch is full or PublishBatchFlushInterval elapses after the first event -// arrives (linger pattern), guaranteeing full batches at high throughput. 
-func (d *DurableEmitter) batchPublishLoop() { - batchSize := d.cfg.PublishBatchSize - flushInterval := d.cfg.PublishBatchFlushInterval - if flushInterval <= 0 { - flushInterval = 50 * time.Millisecond - } - - batch := make([]publishWork, 0, batchSize) - - for { - // Stage 1: block until at least one event arrives (or shutdown). - select { - case w, ok := <-d.publishCh: - if !ok { - return - } - batch = append(batch, w) - case <-d.stopCh: - d.drainPublishChOnShutdown(batch) - return - } - - // Stage 2: linger — keep reading until batch is full or deadline. - linger := time.NewTimer(flushInterval) - fill: - for len(batch) < batchSize { - select { - case w, ok := <-d.publishCh: - if !ok { - linger.Stop() - if len(batch) > 0 { - d.flushBatch(batch) - } - return - } - batch = append(batch, w) - case <-linger.C: - break fill - case <-d.stopCh: - linger.Stop() - d.drainPublishChOnShutdown(batch) - return - } - } - linger.Stop() - - d.flushBatch(batch) - batch = batch[:0] - } -} - -// drainPublishChOnShutdown empties publishCh after stopCh has fired but before -// Close closes publishCh; finishes any in-flight batches. -func (d *DurableEmitter) drainPublishChOnShutdown(batch []publishWork) { - bs := d.cfg.PublishBatchSize - if bs < 1 { - bs = 1 - } - for { - select { - case w, ok := <-d.publishCh: - if !ok { - if len(batch) > 0 { - d.flushBatch(batch) - } - return - } - batch = append(batch, w) - if len(batch) >= bs { - d.flushBatch(batch) - batch = batch[:0] - } - default: - if len(batch) > 0 { - d.flushBatch(batch) - } - return - } - } -} - -// drainPublishCh flushes the given partial batch plus any remaining items on -// publishCh in batchSize chunks. Called during shutdown (ctx cancelled or -// channel closed). 
-func (d *DurableEmitter) drainPublishCh(batch []publishWork) { - for w := range d.publishCh { - batch = append(batch, w) - if len(batch) >= d.cfg.PublishBatchSize { - d.flushBatch(batch) - batch = batch[:0] - } - } - if len(batch) > 0 { - d.flushBatch(batch) - } -} - -// flushBatch publishes a collected batch via PublishBatch and marks all events -// as delivered in a single MarkDeliveredBatch call. -// -// When rawConn is available, batch wire bytes are built directly from the -// already-serialized payloads using protowire — zero unmarshal/re-marshal. -// Otherwise, falls back to the typed PublishBatch RPC. -func (d *DurableEmitter) flushBatch(batch []publishWork) { - ids := make([]int64, len(batch)) - for i, w := range batch { - ids[i] = w.id - } - - ctx, cancel := d.stopCh.NewCtx() - defer cancel() - pubCtx, pubCancel := context.WithTimeout(ctx, d.cfg.PublishTimeout) - defer pubCancel() - - t0 := time.Now() - var err error - if d.rawConn != nil { - err = d.flushBatchRaw(pubCtx, batch) - } else { - err = d.flushBatchTyped(pubCtx, batch) - } - elapsed := time.Since(t0) - - if h := d.cfg.Hooks; h != nil && h.OnBatchPublish != nil { - h.OnBatchPublish(elapsed, len(batch), err) - } - d.metrics.recordPublish(ctx, elapsed, "batch", err) - - if err != nil { - if d.metrics != nil { - d.metrics.publishBatchEvErr.Add(ctx, int64(len(batch))) - } - d.eng.Warnw("DurableEmitter: PublishBatch failed, events will be retransmitted", - "batch_size", len(batch), "error", err, - "elapsed_ms", elapsed.Milliseconds()) - return - } - - if d.metrics != nil { - d.metrics.publishBatchEvOK.Add(pubCtx, int64(len(batch))) - } - - // Async MarkDelivered: the DB UPDATE runs in a background goroutine so - // the batch worker can immediately start collecting the next batch. - // If MarkDelivered fails, events stay pending and the retransmit loop - // delivers them (at-least-once semantics are unchanged). 
- d.wg.Go(func() { - tMark := time.Now() - marked, markErr := d.store.MarkDeliveredBatch(ctx, ids) - markElapsed := time.Since(tMark) - if h := d.cfg.Hooks; h != nil && h.OnBatchMarkDelivered != nil { - h.OnBatchMarkDelivered(markElapsed, int(marked)) - } - if markErr != nil { - d.eng.Errorw("failed to batch-mark events delivered", "batch_size", len(ids), "error", markErr) - return - } - d.decPending(marked) - if d.metrics != nil { - d.metrics.deliverComplete.Add(ctx, marked) - } - }) -} - -// flushBatchRaw builds the CloudEventBatch wire format directly from -// already-serialized payloads and sends it via raw gRPC Invoke — zero -// protobuf unmarshal/re-marshal overhead. -func (d *DurableEmitter) flushBatchRaw(ctx context.Context, batch []publishWork) error { - payloads := make([][]byte, len(batch)) - for i, w := range batch { - payloads[i] = w.payload - } - batchBytes := buildBatchBytes(payloads) - var respBytes []byte - return d.rawConn.Invoke( - ctx, - pb.ChipIngress_PublishBatch_FullMethodName, - batchBytes, - &respBytes, - grpc.ForceCodec(rawBytesCodec{}), - ) -} - -// flushBatchTyped builds a typed CloudEventBatch and sends it via the -// standard gRPC client. Used as fallback when rawConn is not available. 
-func (d *DurableEmitter) flushBatchTyped(ctx context.Context, batch []publishWork) error { - events := make([]*chipingress.CloudEventPb, len(batch)) - for i, w := range batch { - if w.event != nil { - events[i] = w.event - } else { - ev := new(chipingress.CloudEventPb) - if err := proto.Unmarshal(w.payload, ev); err != nil { - return fmt.Errorf("unmarshal event %d for typed publish: %w", i, err) - } - events[i] = ev - } - } - batchPb := &chipingress.CloudEventBatch{Events: events} - _, err := d.client.PublishBatch(ctx, batchPb) - return err -} - func (d *DurableEmitter) retransmitLoop() { ticker := time.NewTicker(d.cfg.RetransmitInterval) defer ticker.Stop() @@ -862,11 +680,10 @@ func (d *DurableEmitter) retransmitPending() { d.retransmit(pending) } -// retransmit enqueues pending DB rows to publishCh so the batch workers handle -// publishing. When rawConn is set, payloads are passed through without -// proto.Unmarshal — the batch workers use buildBatchBytes for the wire format. +// retransmit re-enqueues pending DB rows through the batch emitter. Each row +// gets its own delivery callback that marks it delivered on success. 
func (d *DurableEmitter) retransmit(pending []DurableEvent) { - var enqueued int + var enqueued, skipped int for _, pe := range pending { select { @@ -874,21 +691,25 @@ func (d *DurableEmitter) retransmit(pending []DurableEvent) { return default: } - work := publishWork{id: pe.ID, payload: pe.Payload} - select { - case d.publishCh <- work: + eventPb := new(chipingress.CloudEventPb) + if err := proto.Unmarshal(pe.Payload, eventPb); err != nil { + d.eng.Errorw("DurableEmitter: failed to unmarshal event for retransmit", "id", pe.ID, "error", err) + continue + } + + id := pe.ID + if err := d.batchEmitter.QueueMessage(eventPb, d.deliveryCallback(id, eventPb, time.Now())); err != nil { + skipped++ + } else { enqueued++ - default: } } - d.eng.Infow("DurableEmitter: retransmit enqueued to batch workers", + d.eng.Infow("DurableEmitter: retransmit queued to batch emitter", "enqueued", enqueued, - "skipped_ch_full", len(pending)-enqueued, + "skipped_buffer_full", skipped, "total_pending", len(pending), - "ch_len", len(d.publishCh), - "ch_cap", cap(d.publishCh), ) } diff --git a/pkg/durableemitter/durable_emitter_metrics.go b/pkg/durableemitter/durable_emitter_metrics.go index a3ef68969c..ce7cf92bb9 100644 --- a/pkg/durableemitter/durable_emitter_metrics.go +++ b/pkg/durableemitter/durable_emitter_metrics.go @@ -10,7 +10,7 @@ import ( ) // DurableEmitterMetricsConfig enables OpenTelemetry metrics for DurableEmitter. -// Set on DurableEmitterConfig.Metrics; nil disables instrumentation. +// Set on Config.Metrics; nil disables instrumentation. // // When non-nil, an otel Meter must be supplied to NewDurableEmitter so that // instruments can be registered. 
DurableEmitter does not look up a global diff --git a/pkg/durableemitter/durable_emitter_test.go b/pkg/durableemitter/durable_emitter_test.go index 7bff67963a..1482f3b6c5 100644 --- a/pkg/durableemitter/durable_emitter_test.go +++ b/pkg/durableemitter/durable_emitter_test.go @@ -23,6 +23,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" @@ -39,76 +40,70 @@ func newTestMeter(t *testing.T) (metric.Meter, *sdkmetric.ManualReader) { return mp.Meter("durableemitter"), reader } -// testChipClient is a minimal chipingress.Client for tests. -type testChipClient struct { - chipingress.NoopClient - - mu sync.Mutex - publishErr error - publishCount atomic.Int64 - batchCount atomic.Int64 - publishedIDs []string +// testBatchEmitter is a minimal BatchEmitter for unit tests. +// QueueMessage invokes the callback asynchronously (like batch.Client), using +// publishErr as the result. callCount tracks how many events were enqueued. 
+type testBatchEmitter struct { + mu sync.Mutex + publishErr error + callCount atomic.Int64 + + stopOnce sync.Once + stopCh chan struct{} + wg sync.WaitGroup } -func (c *testChipClient) Publish(_ context.Context, ev *chipingress.CloudEventPb, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) { - c.publishCount.Add(1) - c.mu.Lock() - if ev != nil { - c.publishedIDs = append(c.publishedIDs, ev.Id) - } - err := c.publishErr - c.mu.Unlock() - return &chipingress.PublishResponse{}, err +func newTestBatchEmitter() *testBatchEmitter { + return &testBatchEmitter{stopCh: make(chan struct{})} } -// PublishBatch mirrors production semantics: respect publishErr and count as a -// separate RPC (batch path / tests that assert Publish only would miss it). -func (c *testChipClient) PublishBatch(_ context.Context, b *chipingress.CloudEventBatch, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) { - c.batchCount.Add(1) - c.mu.Lock() - if b != nil { - for _, ev := range b.Events { - if ev != nil { - c.publishedIDs = append(c.publishedIDs, ev.Id) - } - } +func (b *testBatchEmitter) QueueMessage(event *chipingress.CloudEventPb, cb func(error)) error { + select { + case <-b.stopCh: + return errors.New("batch emitter stopped") + default: } - err := c.publishErr - c.mu.Unlock() - return &chipingress.PublishResponse{}, err + b.mu.Lock() + err := b.publishErr + b.mu.Unlock() + + b.callCount.Add(1) + if cb != nil { + b.wg.Add(1) + go func() { + defer b.wg.Done() + cb(err) + }() + } + return nil } -// totalChipRPCs is unary Publish + PublishBatch for assertions that do not care which path ran. 
-func (c *testChipClient) totalChipRPCs() int64 { - return c.publishCount.Load() + c.batchCount.Load() -} +func (b *testBatchEmitter) Start(_ context.Context) {} -func (c *testChipClient) setPublishErr(err error) { - c.mu.Lock() - defer c.mu.Unlock() - c.publishErr = err +func (b *testBatchEmitter) Stop() { + b.stopOnce.Do(func() { + close(b.stopCh) + b.wg.Wait() + }) } -func (c *testChipClient) getPublishedIDs() []string { - c.mu.Lock() - defer c.mu.Unlock() - out := make([]string, len(c.publishedIDs)) - copy(out, c.publishedIDs) - return out +func (b *testBatchEmitter) setPublishErr(err error) { + b.mu.Lock() + defer b.mu.Unlock() + b.publishErr = err } func testEmitAttrs() []any { return []any{"source", "test-source", "type", "test-type"} } -func newTestDurableEmitter(t *testing.T, store DurableEventStore, client chipingress.Client, cfgOverride *DurableEmitterConfig) *DurableEmitter { +func newTestDurableEmitter(t *testing.T, store DurableEventStore, be BatchEmitter, cfgOverride *Config) *DurableEmitter { t.Helper() - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() if cfgOverride != nil { cfg = *cfgOverride } - // Tests that need metrics build the emitter directly with an injected meter. 
- em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) return em } @@ -132,13 +127,13 @@ func TestDurableEmitter_CloseCoalescedInsertShutdown(t *testing.T) { MemDurableEventStore: NewMemDurableEventStore(), stall: stall, } - client := &testChipClient{} - cfg := DefaultDurableEmitterConfig() + be := newTestBatchEmitter() + cfg := DefaultConfig() cfg.InsertBatchSize = 1 cfg.InsertBatchWorkers = 1 cfg.DisablePruning = true - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) ctx := t.Context() require.NoError(t, em.Start(ctx)) @@ -177,14 +172,14 @@ func TestDurableEmitter_CloseCoalescedInsertShutdown(t *testing.T) { func TestDurableEmitter_HooksBatchPublishPath(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} + be := newTestBatchEmitter() var pubCalls, markCalls atomic.Int32 - cfg := DefaultDurableEmitterConfig() - cfg.Hooks = &DurableEmitterHooks{ + cfg := DefaultConfig() + cfg.Hooks = &Hooks{ OnBatchPublish: func(time.Duration, int, error) { pubCalls.Add(1) }, OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) }, } - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -197,15 +192,15 @@ func TestDurableEmitter_HooksBatchPublishPath(t *testing.T) { func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("down")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("down")) var pubCalls, markCalls atomic.Int32 - cfg := DefaultDurableEmitterConfig() - cfg.Hooks = &DurableEmitterHooks{ + cfg := DefaultConfig() + cfg.Hooks = &Hooks{ OnBatchPublish: 
func(time.Duration, int, error) { pubCalls.Add(1) }, OnBatchMarkDelivered: func(time.Duration, int) { markCalls.Add(1) }, } - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -217,16 +212,16 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) { func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("chip unavailable")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("chip unavailable")) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = 40 * time.Millisecond cfg.RetransmitAfter = 15 * time.Millisecond cfg.ExpiryInterval = 40 * time.Millisecond cfg.EventTTL = 25 * time.Millisecond - em, err := NewDurableEmitter(store, client, false, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, false, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -234,21 +229,21 @@ func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) { require.NoError(t, em.Emit(ctx, []byte("plugin-row"), testEmitAttrs()...)) require.Eventually(t, func() bool { - return client.batchCount.Load() >= 1 && store.Len() == 1 - }, 2*time.Second, 5*time.Millisecond, "initial PublishBatch should fail and leave the row") + return be.callCount.Load() >= 1 && store.Len() == 1 + }, 2*time.Second, 5*time.Millisecond, "initial QueueMessage should fail and leave the row") // Several host-only ticks would have cleared or retried by now. 
time.Sleep(250 * time.Millisecond) assert.Equal(t, 1, store.Len(), "non-host must not run retransmit or expiry loops") - assert.Equal(t, int64(1), client.batchCount.Load(), "non-host must not schedule extra PublishBatch via retransmit") + assert.Equal(t, int64(1), be.callCount.Load(), "non-host must not schedule extra QueueMessage via retransmit") } func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} + be := newTestBatchEmitter() - em, err := NewDurableEmitter(store, client, false, DefaultDurableEmitterConfig(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, false, DefaultConfig(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -256,23 +251,22 @@ func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) require.NoError(t, em.Emit(ctx, []byte("loop-plugin"), testEmitAttrs()...)) require.Eventually(t, func() bool { - return store.Len() == 0 && client.batchCount.Load() >= 1 - }, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when isHostProcess is false") + return store.Len() == 0 && be.callCount.Load() >= 1 + }, 2*time.Second, 10*time.Millisecond, "batch emitter must deliver even when retransmitEnabled is false") } func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - em := newTestDurableEmitter(t, store, client, nil) + be := newTestBatchEmitter() + em := newTestDurableEmitter(t, store, be, nil) servicetest.Run(t, em) ctx := t.Context() err := em.Emit(ctx, []byte("hello"), testEmitAttrs()...) require.NoError(t, err) - // Batch publish should fire (PublishBatch with batch size 1) and delete the record. 
require.Eventually(t, func() bool { - return client.batchCount.Load() == 1 + return be.callCount.Load() == 1 }, 2*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { @@ -282,19 +276,18 @@ func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) { func TestDurableEmitter_EmitReturnSuccessEvenWhenPublishFails(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("connection refused")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("connection refused")) - em := newTestDurableEmitter(t, store, client, nil) + em := newTestDurableEmitter(t, store, be, nil) servicetest.Run(t, em) ctx := t.Context() err := em.Emit(ctx, []byte("hello"), testEmitAttrs()...) require.NoError(t, err, "Emit must succeed once the DB insert succeeds") - // Wait for the async PublishBatch attempt to complete. require.Eventually(t, func() bool { - return client.batchCount.Load() == 1 + return be.callCount.Load() == 1 }, 2*time.Second, 10*time.Millisecond) // Event must remain in the store for retransmit. @@ -303,46 +296,43 @@ func TestDurableEmitter_EmitReturnSuccessEvenWhenPublishFails(t *testing.T) { func TestDurableEmitter_RetransmitLoopDeliversFailedEvents(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("connection refused")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("connection refused")) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = 100 * time.Millisecond cfg.RetransmitAfter = 50 * time.Millisecond - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) servicetest.Run(t, em) ctx := t.Context() err := em.Emit(ctx, []byte("retry-me"), testEmitAttrs()...) require.NoError(t, err) - // Wait until the async immediate path has run with the error and the row - // is still pending (not a success race after we clear the error). 
require.Eventually(t, func() bool { - return client.batchCount.Load() >= 1 && store.Len() == 1 - }, 2*time.Second, 5*time.Millisecond, "failed PublishBatch should leave the row") + return be.callCount.Load() >= 1 && store.Len() == 1 + }, 2*time.Second, 5*time.Millisecond, "failed delivery should leave the row") - client.setPublishErr(nil) + be.setPublishErr(nil) require.Eventually(t, func() bool { return store.Len() == 0 }, 5*time.Second, 50*time.Millisecond, "retransmit loop should eventually deliver and delete the event") - // At least: one failed PublishBatch + one successful delivery (retransmit uses PublishBatch too). - assert.GreaterOrEqual(t, client.batchCount.Load(), int64(2)) + assert.GreaterOrEqual(t, be.callCount.Load(), int64(2)) } func TestDurableEmitter_RetransmitSerialDistinctCloudEvents(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("immediate fail")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("immediate fail")) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = 100 * time.Millisecond cfg.RetransmitAfter = 50 * time.Millisecond - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) servicetest.Run(t, em) ctx := t.Context() @@ -350,33 +340,26 @@ func TestDurableEmitter_RetransmitSerialDistinctCloudEvents(t *testing.T) { require.NoError(t, em.Emit(ctx, []byte("second"), testEmitAttrs()...)) require.Eventually(t, func() bool { - return client.batchCount.Load() >= 2 && store.Len() == 2 - }, 2*time.Second, 5*time.Millisecond, "both failed PublishBatch calls should leave two rows") + return be.callCount.Load() >= 2 && store.Len() == 2 + }, 2*time.Second, 5*time.Millisecond, "both failed deliveries should leave two rows") - client.setPublishErr(nil) + be.setPublishErr(nil) require.Eventually(t, func() bool { return store.Len() == 0 }, 5*time.Second, 50*time.Millisecond) - - ids := 
client.getPublishedIDs() - require.GreaterOrEqual(t, len(ids), 4, "two failed attempts then two successful deliveries (IDs recorded)") - require.GreaterOrEqual(t, client.batchCount.Load(), int64(4)) - a, b := ids[len(ids)-2], ids[len(ids)-1] - assert.NotEmpty(t, a) - assert.NotEmpty(t, b) - assert.NotEqualf(t, a, b, "retransmit must publish two distinct CloudEvents, not one pointer reused for every row") + assert.GreaterOrEqual(t, be.callCount.Load(), int64(4)) } func TestDurableEmitter_ExpiryLoopDeletesOldEvents(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - client.setPublishErr(errors.New("always fail")) + be := newTestBatchEmitter() + be.setPublishErr(errors.New("always fail")) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.ExpiryInterval = 100 * time.Millisecond cfg.EventTTL = 50 * time.Millisecond cfg.RetransmitInterval = 10 * time.Minute // effectively disable retransmit - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) servicetest.Run(t, em) ctx := t.Context() @@ -391,7 +374,7 @@ func TestDurableEmitter_ExpiryLoopDeletesOldEvents(t *testing.T) { func TestDurableEmitter_RetransmitDeliversManuallyInsertedRow(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} + be := newTestBatchEmitter() ev, err := chipingress.NewEvent("unknown-domain", "t", []byte("b"), nil) require.NoError(t, err) @@ -403,22 +386,22 @@ func TestDurableEmitter_RetransmitDeliversManuallyInsertedRow(t *testing.T) { _, err = store.Insert(context.Background(), payload) require.NoError(t, err) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() cfg.RetransmitInterval = 50 * time.Millisecond cfg.RetransmitAfter = 30 * time.Millisecond - em := newTestDurableEmitter(t, store, client, &cfg) + em := newTestDurableEmitter(t, store, be, &cfg) servicetest.Run(t, em) require.Eventually(t, func() bool { - return store.Len() == 0 && client.batchCount.Load() 
>= 1 - }, 3*time.Second, 20*time.Millisecond, "pending row should be delivered via PublishBatch") + return store.Len() == 0 && be.callCount.Load() >= 1 + }, 3*time.Second, 20*time.Millisecond, "pending row should be delivered via batch emitter") } func TestDurableEmitter_EmitRejectsInvalidAttributes(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - em := newTestDurableEmitter(t, store, client, nil) + be := newTestBatchEmitter() + em := newTestDurableEmitter(t, store, be, nil) err := em.Emit(context.Background(), []byte("no-attrs")) require.Error(t, err) @@ -427,8 +410,8 @@ func TestDurableEmitter_EmitRejectsInvalidAttributes(t *testing.T) { func TestDurableEmitter_MultipleEvents(t *testing.T) { store := NewMemDurableEventStore() - client := &testChipClient{} - em := newTestDurableEmitter(t, store, client, nil) + be := newTestBatchEmitter() + em := newTestDurableEmitter(t, store, be, nil) servicetest.Run(t, em) ctx := t.Context() @@ -439,7 +422,7 @@ func TestDurableEmitter_MultipleEvents(t *testing.T) { } require.Eventually(t, func() bool { - return client.batchCount.Load() == int64(n) + return be.callCount.Load() == int64(n) }, 5*time.Second, 10*time.Millisecond) require.Eventually(t, func() bool { @@ -449,25 +432,26 @@ func TestDurableEmitter_MultipleEvents(t *testing.T) { func TestNewDurableEmitter_ValidationErrors(t *testing.T) { log := logger.Test(t) - cfg := DefaultDurableEmitterConfig() + cfg := DefaultConfig() + be := newTestBatchEmitter() - _, err := NewDurableEmitter(nil, &testChipClient{}, true, cfg, log, nil) + _, err := NewDurableEmitter(nil, be, nil, true, cfg, log, nil) assert.ErrorContains(t, err, "store") - _, err = NewDurableEmitter(NewMemDurableEventStore(), nil, true, cfg, log, nil) - assert.ErrorContains(t, err, "client") + _, err = NewDurableEmitter(NewMemDurableEventStore(), nil, nil, true, cfg, log, nil) + assert.ErrorContains(t, err, "batch emitter") - _, err = NewDurableEmitter(NewMemDurableEventStore(), 
&testChipClient{}, true, cfg, nil, nil) + _, err = NewDurableEmitter(NewMemDurableEventStore(), be, nil, true, cfg, nil, nil) assert.ErrorContains(t, err, "logger") cfgWithMetrics := cfg cfgWithMetrics.Metrics = &DurableEmitterMetricsConfig{} - _, err = NewDurableEmitter(NewMemDurableEventStore(), &testChipClient{}, true, cfgWithMetrics, log, nil) + _, err = NewDurableEmitter(NewMemDurableEventStore(), be, nil, true, cfgWithMetrics, log, nil) assert.ErrorContains(t, err, "meter") } func TestDurableEmitter_HealthReport(t *testing.T) { - em := newTestDurableEmitter(t, NewMemDurableEventStore(), &testChipClient{}, nil) + em := newTestDurableEmitter(t, NewMemDurableEventStore(), newTestBatchEmitter(), nil) servicetest.Run(t, em) report := em.HealthReport() @@ -480,12 +464,12 @@ func TestDurableEmitter_MetricsRegistersEmitSuccess(t *testing.T) { meter, reader := newTestMeter(t) store := NewMemDurableEventStore() - client := &testChipClient{} - cfg := DefaultDurableEmitterConfig() + be := newTestBatchEmitter() + cfg := DefaultConfig() cfg.RetransmitInterval = time.Hour cfg.Metrics = &DurableEmitterMetricsConfig{PollInterval: 25 * time.Millisecond} - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), meter) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), meter) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -600,36 +584,148 @@ func newChipClient(t *testing.T, addr string) chipingress.Client { t.Helper() c, err := chipingress.NewClient(addr, chipingress.WithInsecureConnection()) require.NoError(t, err) - t.Cleanup(func() { _ = c.Close() }) return c } +// newIntegrationBatchEmitter creates a batch.Client suitable for integration tests. +// Batch size 1 and a short interval ensure one event per RPC, matching the +// assertion style of the integration tests. 
+func newIntegrationBatchEmitter(t *testing.T, addr string) *batch.Client { + t.Helper() + chipClient := newChipClient(t, addr) + bc, err := batch.NewBatchClient(chipClient, + batch.WithBatchSize(1), + batch.WithBatchInterval(10*time.Millisecond), + batch.WithMaxPublishTimeout(2*time.Second), + batch.WithShutdownTimeout(5*time.Second), + ) + require.NoError(t, err) + return bc +} + func emitAttrs() []any { return []any{"source", "test-domain", "type", "test-entity"} } -func fastCfg() DurableEmitterConfig { - return DurableEmitterConfig{ - PublishBatchSize: 1, - PublishBatchFlushInterval: 10 * time.Millisecond, - RetransmitInterval: 100 * time.Millisecond, - RetransmitAfter: 50 * time.Millisecond, - RetransmitBatchSize: 50, - ExpiryInterval: 200 * time.Millisecond, - EventTTL: 500 * time.Millisecond, - PublishTimeout: 2 * time.Second, +func fastCfg() Config { + return Config{ + RetransmitInterval: 100 * time.Millisecond, + RetransmitAfter: 50 * time.Millisecond, + RetransmitBatchSize: 50, + ExpiryInterval: 200 * time.Millisecond, + EventTTL: 500 * time.Millisecond, + PublishTimeout: 2 * time.Second, } } +// testFallbackClient is a minimal chipingress.Client for fallback testing. 
+type testFallbackClient struct { + chipingress.NoopClient + + mu sync.Mutex + publishErr error + publishCount atomic.Int64 + closed atomic.Bool +} + +func (c *testFallbackClient) Publish(_ context.Context, _ *chipingress.CloudEventPb, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) { + c.mu.Lock() + err := c.publishErr + c.mu.Unlock() + c.publishCount.Add(1) + return &chipingress.PublishResponse{}, err +} + +func (c *testFallbackClient) Close() error { + c.closed.Store(true) + return nil +} + +func (c *testFallbackClient) setPublishErr(err error) { + c.mu.Lock() + defer c.mu.Unlock() + c.publishErr = err +} + +// TestDurableEmitter_FallbackDeliversOnBatchFailure verifies that when the +// batch emitter fails, the fallback client delivers the event directly and +// marks it delivered (removing it from the DB). +func TestDurableEmitter_FallbackDeliversOnBatchFailure(t *testing.T) { + store := NewMemDurableEventStore() + be := newTestBatchEmitter() + be.setPublishErr(errors.New("batch down")) + fallback := &testFallbackClient{} + + em, err := NewDurableEmitter(store, be, fallback, true, DefaultConfig(), logger.Test(t), nil) + require.NoError(t, err) + servicetest.Run(t, em) + ctx := t.Context() + + require.NoError(t, em.Emit(ctx, []byte("needs-fallback"), testEmitAttrs()...)) + + // Batch emitter fires callback with error → fallback client should deliver. + require.Eventually(t, func() bool { + return fallback.publishCount.Load() >= 1 + }, 2*time.Second, 10*time.Millisecond, "fallback client should receive one Publish call") + + // Event should be marked delivered and removed from the store. + require.Eventually(t, func() bool { + return store.Len() == 0 + }, 2*time.Second, 10*time.Millisecond, "event should be marked delivered after fallback") +} + +// TestDurableEmitter_FallbackFailureEventRemainsForRetransmit verifies that +// when both the batch emitter and the fallback client fail, the event stays +// in the DB for the retransmit loop to pick up. 
+func TestDurableEmitter_FallbackFailureEventRemainsForRetransmit(t *testing.T) { + store := NewMemDurableEventStore() + be := newTestBatchEmitter() + be.setPublishErr(errors.New("batch down")) + fallback := &testFallbackClient{} + fallback.setPublishErr(errors.New("fallback down too")) + + cfg := DefaultConfig() + cfg.RetransmitInterval = 10 * time.Minute // disable retransmit for this test + em, err := NewDurableEmitter(store, be, fallback, true, cfg, logger.Test(t), nil) + require.NoError(t, err) + servicetest.Run(t, em) + ctx := t.Context() + + require.NoError(t, em.Emit(ctx, []byte("both-fail"), testEmitAttrs()...)) + + require.Eventually(t, func() bool { + return fallback.publishCount.Load() >= 1 + }, 2*time.Second, 10*time.Millisecond, "fallback should have been attempted") + + // Event must still be in the store for the retransmit loop. + time.Sleep(50 * time.Millisecond) + assert.Equal(t, 1, store.Len(), "event must remain in DB when fallback also fails") +} + +// TestDurableEmitter_FallbackClientClosedOnStop verifies that the fallback +// client's Close() method is called when DurableEmitter shuts down. 
+func TestDurableEmitter_FallbackClientClosedOnStop(t *testing.T) { + store := NewMemDurableEventStore() + be := newTestBatchEmitter() + fallback := &testFallbackClient{} + + em, err := NewDurableEmitter(store, be, fallback, true, DefaultConfig(), logger.Test(t), nil) + require.NoError(t, err) + require.NoError(t, em.Start(t.Context())) + + require.NoError(t, em.Close()) + assert.True(t, fallback.closed.Load(), "fallback client should be closed after Stop") +} + // ---------- Test cases ---------- func TestIntegration_HappyPath(t *testing.T) { srv := &mockChipServer{} _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() - em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, fastCfg(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -647,14 +743,13 @@ func TestIntegration_HappyPath(t *testing.T) { } func TestIntegration_ServerUnavailable_RetransmitRecovers(t *testing.T) { - // Start with server returning UNAVAILABLE on PublishBatch. srv := &mockChipServer{} srv.setBatchErr(status.Error(codes.Unavailable, "chip down")) _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() - em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, fastCfg(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -665,7 +760,6 @@ func TestIntegration_ServerUnavailable_RetransmitRecovers(t *testing.T) { return srv.batchCount.Load() >= 1 && store.Len() == 1 }, 2*time.Second, 10*time.Millisecond, "failed PublishBatch should leave the row pending") - // "Recover" the server. 
srv.setBatchErr(nil) require.Eventually(t, func() bool { @@ -678,15 +772,14 @@ func TestIntegration_ServerUnavailable_RetransmitRecovers(t *testing.T) { } func TestIntegration_ServerDown_EventsSurvive(t *testing.T) { - // Start server, then stop it to simulate total outage. srv := &mockChipServer{} gs, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() cfg := fastCfg() cfg.PublishTimeout = 500 * time.Millisecond - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -713,13 +806,9 @@ func TestIntegration_ServerDown_EventsSurvive(t *testing.T) { go func() { _ = gs2.Serve(lis) }() t.Cleanup(func() { gs2.GracefulStop() }) - // Create a new client and DurableEmitter re-using the same store - // (simulating node restart with Postgres). - client2, err := chipingress.NewClient(addr, chipingress.WithInsecureConnection()) - require.NoError(t, err) - t.Cleanup(func() { _ = client2.Close() }) - - em2, err := NewDurableEmitter(store, client2, true, cfg, logger.Test(t), nil) + // Create a new batch emitter re-using the same store (simulating node restart with Postgres). 
+ be2 := newIntegrationBatchEmitter(t, addr) + em2, err := NewDurableEmitter(store, be2, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em2) @@ -735,12 +824,12 @@ func TestIntegration_ServerDown_EventsSurvive(t *testing.T) { func TestIntegration_HighThroughput(t *testing.T) { srv := &mockChipServer{} _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() cfg := fastCfg() cfg.RetransmitBatchSize = 200 - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -764,13 +853,13 @@ func TestIntegration_EventExpiry(t *testing.T) { srv := &mockChipServer{} srv.setBatchErr(status.Error(codes.Internal, "permanent failure")) _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() cfg := fastCfg() cfg.EventTTL = 100 * time.Millisecond cfg.ExpiryInterval = 100 * time.Millisecond - em, err := NewDurableEmitter(store, client, true, cfg, logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, cfg, logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -785,14 +874,13 @@ func TestIntegration_EventExpiry(t *testing.T) { } func TestIntegration_RetransmitEnqueuesBatchWorkers(t *testing.T) { - // PublishBatch fails for each emit; retransmit recovers via PublishBatch. 
srv := &mockChipServer{} srv.setBatchErr(status.Error(codes.Unavailable, "reject batch")) _, addr := startMockServer(t, srv) - client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() - em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, fastCfg(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() @@ -801,8 +889,6 @@ func TestIntegration_RetransmitEnqueuesBatchWorkers(t *testing.T) { require.NoError(t, em.Emit(ctx, []byte("retry-me"), emitAttrs()...)) } - // All five async PublishBatch attempts must observe the error before we clear - // it, or they succeed immediately and the retransmit loop has nothing to do. require.Eventually(t, func() bool { return srv.batchCount.Load() >= 5 && store.Len() == 5 }, 3*time.Second, 10*time.Millisecond, "all five PublishBatch RPCs should have failed and left five rows") @@ -812,7 +898,7 @@ func TestIntegration_RetransmitEnqueuesBatchWorkers(t *testing.T) { require.Eventually(t, func() bool { return store.Len() == 0 }, 5*time.Second, 50*time.Millisecond, - "retransmit should deliver via PublishBatch") + "retransmit should deliver via batch emitter") assert.GreaterOrEqual(t, srv.batchCount.Load(), int64(10), "five failed PublishBatch plus five successful PublishBatch (retransmit)") @@ -836,11 +922,10 @@ func TestIntegration_GRPCConnection(t *testing.T) { require.NoError(t, err) assert.Equal(t, "pong", pong.Message) - // Now use the chipingress.Client wrapper with DurableEmitter. 
- client := newChipClient(t, addr) + be := newIntegrationBatchEmitter(t, addr) store := NewMemDurableEventStore() - em, err := NewDurableEmitter(store, client, true, fastCfg(), logger.Test(t), nil) + em, err := NewDurableEmitter(store, be, nil, true, fastCfg(), logger.Test(t), nil) require.NoError(t, err) servicetest.Run(t, em) ctx := t.Context() diff --git a/pkg/durableemitter/durable_event_store.go b/pkg/durableemitter/observable_store.go similarity index 98% rename from pkg/durableemitter/durable_event_store.go rename to pkg/durableemitter/observable_store.go index 6e4c0cabfc..8700427b20 100644 --- a/pkg/durableemitter/durable_event_store.go +++ b/pkg/durableemitter/observable_store.go @@ -28,7 +28,7 @@ type DurableQueueStats struct { // so DurableEmitter can export queue depth and age gauges when metrics are enabled. type DurableQueueObserver interface { // ObserveDurableQueue returns live queue statistics. eventTTL and nearExpiryLead - // match DurableEmitterConfig (nearExpiryLead should be << eventTTL). + // match Config (nearExpiryLead should be << eventTTL). ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) } diff --git a/pkg/durableemitter/setup.go b/pkg/durableemitter/setup.go new file mode 100644 index 0000000000..5ff5364a40 --- /dev/null +++ b/pkg/durableemitter/setup.go @@ -0,0 +1,165 @@ +package durableemitter + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + chipingressbatch "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +// globalEmitter holds the process-wide DurableEmitter instance, set by Setup. +var globalEmitter atomic.Pointer[DurableEmitter] + +// SetGlobalEmitter sets the global DurableEmitter. 
+func SetGlobalEmitter(d *DurableEmitter) { + globalEmitter.Store(d) +} + +// GetGlobalEmitter returns the global DurableEmitter, or nil if Setup has not been called. +func GetGlobalEmitter() *DurableEmitter { + return globalEmitter.Load() +} + +// GlobalEmit emits an event via the global DurableEmitter. +// Returns a non-nil error when the global emitter has not been initialized. +func GlobalEmit(ctx context.Context, body []byte, attrKVs ...any) error { + d := globalEmitter.Load() + if d == nil { + return errors.New("global DurableEmitter not initialized; call durableemitter.Setup first") + } + return d.Emit(ctx, body, attrKVs...) +} + +// SetupConfig holds all configuration required to create and start a +// DurableEmitter including its chip ingress transport clients. +type SetupConfig struct { + // Endpoint is the gRPC address for the Chip Ingress service. + Endpoint string + // InsecureConnection disables TLS when true. + InsecureConnection bool + // Auth configures chip ingress credentials. AuthKeySigner may be nil at init + // for LOOP plugins; call SetGlobalSigner after the CSA keystore is available. + Auth AuthConfig + // RetransmitEnabled controls whether the retransmit and cleanup loops run. + // Set to true for the host (chainlink node) process. + // Set to false for LOOP plugin processes — the host's retransmit loop picks + // up any rows inserted by plugin-side DurableEmitters from the shared DB. + RetransmitEnabled bool + + // Batch client tuning — zero values use package defaults. + BatchSize int // default: 50 + BatchInterval time.Duration // default: 50ms + MaxConcurrentSends int // default: 4 + MaxPublishTimeout time.Duration // default: 5s + ShutdownTimeout time.Duration // default: 30s + + // EmitterConfig overrides DefaultConfig when non-nil. + EmitterConfig *Config + // Meter is the OpenTelemetry meter for instrumentation. Nil disables metrics. 
+ Meter metric.Meter +} + +// Setup creates a DurableEmitter with dedicated batch and fallback chip ingress +// clients, registers it as the global emitter, and returns it unconfigured. +func Setup( + store DurableEventStore, + cfg SetupConfig, + lggr logger.Logger, +) (*DurableEmitter, error) { + if cfg.Endpoint == "" { + return nil, errors.New("chip ingress endpoint is required for DurableEmitter") + } + if store == nil { + return nil, errors.New("durable event store is required for DurableEmitter") + } + if lggr == nil { + return nil, errors.New("logger is required for DurableEmitter") + } + + auth, err := NewAuthHeaderProvider(cfg.Auth) + if err != nil { + return nil, fmt.Errorf("failed to build chip ingress auth: %w", err) + } + + chipOpts := buildChipOpts(cfg, auth) + + // Primary client — owned by batch.Client, closed on batch.Client.Stop(). + batchChipClient, err := chipingress.NewClient(cfg.Endpoint, chipOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create batch chip ingress client: %w", err) + } + + batchClient, err := chipingressbatch.NewBatchClient(batchChipClient, + chipingressbatch.WithBatchSize(defaultInt(cfg.BatchSize, 50)), + chipingressbatch.WithBatchInterval(defaultDuration(cfg.BatchInterval, 50*time.Millisecond)), + chipingressbatch.WithMaxConcurrentSends(defaultInt(cfg.MaxConcurrentSends, 4)), + chipingressbatch.WithMaxPublishTimeout(defaultDuration(cfg.MaxPublishTimeout, 5*time.Second)), + chipingressbatch.WithShutdownTimeout(defaultDuration(cfg.ShutdownTimeout, 30*time.Second)), + ) + if err != nil { + _ = batchChipClient.Close() + return nil, fmt.Errorf("failed to create batch client: %w", err) + } + + // Fallback client — owned by DurableEmitter, closed on DurableEmitter.Stop(). + // Used for single-event direct Publish retry when a batch delivery fails. + fallbackClient, err := chipingress.NewClient(cfg.Endpoint, chipOpts...) 
+ if err != nil { + batchClient.Stop() + return nil, fmt.Errorf("failed to create fallback chip ingress client: %w", err) + } + + emitterCfg := DefaultConfig() + if cfg.EmitterConfig != nil { + emitterCfg = *cfg.EmitterConfig + } + + emitter, err := NewDurableEmitter(store, batchClient, fallbackClient, cfg.RetransmitEnabled, emitterCfg, lggr, cfg.Meter) + if err != nil { + batchClient.Stop() + _ = fallbackClient.Close() + return nil, fmt.Errorf("failed to create durable emitter: %w", err) + } + + SetGlobalEmitter(emitter) + + lggr.Infow("DurableEmitter created — call Start() or register with service lifecycle", + "endpoint", cfg.Endpoint, + "retransmitEnabled", cfg.RetransmitEnabled, + ) + return emitter, nil +} + +func buildChipOpts(cfg SetupConfig, auth chipingress.HeaderProvider) []chipingress.Opt { + var opts []chipingress.Opt + if cfg.InsecureConnection { + opts = append(opts, chipingress.WithInsecureConnection()) + } else { + opts = append(opts, chipingress.WithTLS()) + } + if auth != nil { + opts = append(opts, chipingress.WithTokenAuth(auth)) + } + return opts +} + +func defaultInt(v, def int) int { + if v <= 0 { + return def + } + return v +} + +func defaultDuration(v, def time.Duration) time.Duration { + if v <= 0 { + return def + } + return v +} diff --git a/pkg/durableemitter/store.go b/pkg/durableemitter/store.go new file mode 100644 index 0000000000..c124d7b58e --- /dev/null +++ b/pkg/durableemitter/store.go @@ -0,0 +1,204 @@ +package durableemitter + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/lib/pq" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" +) + +const chipDurableEventsTable = "cre.chip_durable_events" + +// PgDurableEventStore is a Postgres-backed implementation of DurableEventStore. +// Tests live in chainlink/core/services/durableemitter as they require DB migrations. 
+type PgDurableEventStore struct { + ds sqlutil.DataSource +} + +var ( + _ DurableEventStore = (*PgDurableEventStore)(nil) + _ DurableQueueObserver = (*PgDurableEventStore)(nil) + _ BatchInserter = (*PgDurableEventStore)(nil) +) + +func NewPgDurableEventStore(ds sqlutil.DataSource) *PgDurableEventStore { + return &PgDurableEventStore{ds: ds} +} + +func (s *PgDurableEventStore) Insert(ctx context.Context, payload []byte) (int64, error) { + const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id` + var id int64 + if err := s.ds.GetContext(ctx, &id, q, payload); err != nil { + return 0, fmt.Errorf("failed to insert chip durable event: %w", err) + } + return id, nil +} + +func (s *PgDurableEventStore) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) { + if len(payloads) == 0 { + return nil, nil + } + placeholders := make([]string, len(payloads)) + args := make([]interface{}, len(payloads)) + for i, p := range payloads { + placeholders[i] = fmt.Sprintf("($%d)", i+1) + args[i] = p + } + q := fmt.Sprintf( + "INSERT INTO %s (payload) VALUES %s RETURNING id", + chipDurableEventsTable, + strings.Join(placeholders, ","), + ) + + var ids []int64 + if err := s.ds.SelectContext(ctx, &ids, q, args...); err != nil { + return nil, fmt.Errorf("failed to batch insert chip durable events: %w", err) + } + return ids, nil +} + +func (s *PgDurableEventStore) Delete(ctx context.Context, id int64) error { + const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err) + } + return nil +} + +func (s *PgDurableEventStore) MarkDelivered(ctx context.Context, id int64) error { + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL` + if _, err := s.ds.ExecContext(ctx, q, id); err != nil { + return fmt.Errorf("failed to mark chip durable event 
delivered id=%d: %w", id, err) + } + return nil +} + +func (s *PgDurableEventStore) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) { + if len(ids) == 0 { + return 0, nil + } + const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = ANY($1) AND delivered_at IS NULL` + res, err := s.ds.ExecContext(ctx, q, pq.Array(ids)) + if err != nil { + return 0, fmt.Errorf("failed to batch mark chip durable events delivered: %w", err) + } + n, _ := res.RowsAffected() + return n, nil +} + +func (s *PgDurableEventStore) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) { + if batchLimit <= 0 { + return 0, nil + } + const q = ` +WITH picked AS ( + SELECT id FROM ` + chipDurableEventsTable + ` + WHERE delivered_at IS NOT NULL + ORDER BY delivered_at ASC + LIMIT $1 +) +DELETE FROM ` + chipDurableEventsTable + ` AS t +USING picked WHERE t.id = picked.id` + res, err := s.ds.ExecContext(ctx, q, batchLimit) + if err != nil { + return 0, fmt.Errorf("failed to purge delivered chip durable events: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, fmt.Errorf("purge delivered rows affected: %w", err) + } + return n, nil +} + +func (s *PgDurableEventStore) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) { + const q = ` +SELECT id, payload, created_at +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at < $1 +ORDER BY created_at ASC +LIMIT $2` + + type row struct { + ID int64 `db:"id"` + Payload []byte `db:"payload"` + CreatedAt time.Time `db:"created_at"` + } + + var rows []row + if err := s.ds.SelectContext(ctx, &rows, q, createdBefore, limit); err != nil { + return nil, fmt.Errorf("failed to list pending chip durable events: %w", err) + } + + out := make([]DurableEvent, 0, len(rows)) + for _, r := range rows { + out = append(out, DurableEvent{ + ID: r.ID, + Payload: r.Payload, + CreatedAt: r.CreatedAt, + }) + } + return out, nil 
+} + +func (s *PgDurableEventStore) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) { + const q = ` +WITH deleted AS ( + DELETE FROM ` + chipDurableEventsTable + ` + WHERE created_at <= now() - $1::interval + RETURNING id +) +SELECT count(*) FROM deleted` + + var count int64 + if err := s.ds.GetContext(ctx, &count, q, ttl.String()); err != nil { + return 0, fmt.Errorf("failed to delete expired chip durable events: %w", err) + } + return count, nil +} + +type chipDurableQueueAgg struct { + Cnt int64 `db:"cnt"` + PayloadSum int64 `db:"payload_sum"` + MinCreated *time.Time `db:"min_created"` +} + +// ObserveDurableQueue implements DurableQueueObserver for queue depth / age gauges. +func (s *PgDurableEventStore) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (DurableQueueStats, error) { + const qAgg = ` +SELECT + count(*)::bigint AS cnt, + coalesce(sum(octet_length(payload)), 0)::bigint AS payload_sum, + min(created_at) AS min_created +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL` + + var row chipDurableQueueAgg + if err := s.ds.GetContext(ctx, &row, qAgg); err != nil { + return DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err) + } + var st DurableQueueStats + st.Depth = row.Cnt + st.PayloadBytes = row.PayloadSum + if row.MinCreated != nil { + st.OldestPendingAge = time.Since(*row.MinCreated) + } + if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL { + ttlSec := int64(eventTTL.Round(time.Second) / time.Second) + leadSec := int64(nearExpiryLead.Round(time.Second) / time.Second) + const qNear = ` +SELECT count(*)::bigint +FROM ` + chipDurableEventsTable + ` +WHERE delivered_at IS NULL + AND created_at >= now() - ($1::bigint * interval '1 second') + AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')` + if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil { + return DurableQueueStats{}, fmt.Errorf("durable queue 
near-ttl: %w", err) + } + } + return st, nil +} diff --git a/pkg/loop/config.go b/pkg/loop/config.go index c91442117e..396c7d979d 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -86,9 +86,10 @@ const ( envTelemetryPrometheusBridgePrefixes = "CL_TELEMETRY_PROMETHEUS_BRIDGE_PREFIXES" envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" - envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" - envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" - envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" + envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" + envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" + envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED" + envChipIngressDurableEmitterEnabled = "CL_CHIP_INGRESS_DURABLE_EMITTER_ENABLED" envCRESettings = cresettings.EnvNameSettings envCRESettingsDefault = cresettings.EnvNameSettingsDefault @@ -99,9 +100,10 @@ const ( type EnvConfig struct { AppID string - ChipIngressEndpoint string - ChipIngressInsecureConnection bool - ChipIngressBatchEmitterEnabled bool + ChipIngressEndpoint string + ChipIngressInsecureConnection bool + ChipIngressBatchEmitterEnabled bool + ChipIngressDurableEmitterEnabled bool CRESettings string CRESettingsDefault string @@ -266,6 +268,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled)) + add(envChipIngressDurableEmitterEnabled, strconv.FormatBool(e.ChipIngressDurableEmitterEnabled)) if e.CRESettings != "" { add(envCRESettings, e.CRESettings) @@ -506,6 +509,10 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envChipIngressBatchEmitterEnabled, err) } + e.ChipIngressDurableEmitterEnabled, err = 
getBool(envChipIngressDurableEmitterEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envChipIngressDurableEmitterEnabled, err) + } } e.CRESettings = os.Getenv(envCRESettings) diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index 43ed1d4867..d592543fd4 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -200,9 +200,10 @@ var envCfgFull = EnvConfig{ TelemetryPrometheusBridgeEnabled: true, TelemetryPrometheusBridgePrefixes: []string{"foo", "bar"}, - ChipIngressEndpoint: "chip-ingress.example.com:50051", - ChipIngressInsecureConnection: true, - ChipIngressBatchEmitterEnabled: false, + ChipIngressEndpoint: "chip-ingress.example.com:50051", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: false, + ChipIngressDurableEmitterEnabled: false, CRESettings: `{"global":{}}`, CRESettingsDefault: `{"foo":"bar"}`, diff --git a/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go b/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go index 51f4b54a81..472d672b10 100644 --- a/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go +++ b/pkg/loop/internal/core/services/capability/standard/standard_capabilities.go @@ -8,6 +8,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/durableemitter" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -323,6 +324,7 @@ func (s *standardCapabilitiesServer) Initialise(ctx context.Context, request *ca // Sets the auth header signing mechanism beholder.GetClient().SetSigner(keyStore) + durableemitter.SetGlobalSigner(keyStore) capabilitiesRegistryConn, err := s.Dial(request.CapRegistryId) if err != nil { diff --git 
a/pkg/loop/internal/relayer/relayer.go b/pkg/loop/internal/relayer/relayer.go index 3d1bc6a10d..31591479c4 100644 --- a/pkg/loop/internal/relayer/relayer.go +++ b/pkg/loop/internal/relayer/relayer.go @@ -14,6 +14,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/durableemitter" aptospb "github.com/smartcontractkit/chainlink-common/pkg/chains/aptos" evmpb "github.com/smartcontractkit/chainlink-common/pkg/chains/evm" solpb "github.com/smartcontractkit/chainlink-common/pkg/chains/solana" @@ -142,6 +143,7 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel // Sets the auth header signing mechanism beholder.GetClient().SetSigner(csaKeystore) + durableemitter.SetGlobalSigner(csaKeystore) r, err := p.impl.NewRelayer(ctx, request.Config, ks.NewClient(ksConn), csaKeystore, capRegistry) if err != nil { diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 71193dad66..cbf2690abd 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -2,6 +2,7 @@ package loop import ( "context" + "errors" "fmt" "os" "os/signal" @@ -22,6 +23,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/config/build" + "github.com/smartcontractkit/chainlink-common/pkg/durableemitter" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/promutil" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -105,6 +107,7 @@ type Server struct { LimitsFactory limits.Factory profiler *pyroscope.Profiler beholderClient *beholder.Client + durableEmitter *durableemitter.DurableEmitter } func newServer(loggerName string) (*Server, error) { @@ -340,6 +343,34 @@ func (s *Server) start(opts ...ServerOpt) error { s.LimitsFactory.Settings = s.cfg.settingsGetter } + if s.EnvConfig.ChipIngressDurableEmitterEnabled && 
s.EnvConfig.ChipIngressEndpoint != "" { + if s.DataSource == nil { + return errors.New("data source required when durable emitter is enabled") + } + + // Rotating auth: signer is injected later via durableemitter.SetGlobalSigner when the host + // provides the CSA keystore (see relayer and standard capabilities startup). + durableCfg := durableemitter.SetupConfig{ + Endpoint: s.EnvConfig.ChipIngressEndpoint, + InsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, + Auth: durableemitter.AuthConfig{ + AuthHeaders: s.EnvConfig.TelemetryAuthHeaders, + AuthHeadersTTL: s.EnvConfig.TelemetryAuthHeadersTTL, + AuthPublicKeyHex: s.EnvConfig.TelemetryAuthPubKeyHex, + }, + RetransmitEnabled: false, // LOOP plugins do not run the retransmit loop; the host process handles it. + } + store := durableemitter.NewPgDurableEventStore(s.DataSource) + var err error + s.durableEmitter, err = durableemitter.Setup(store, durableCfg, s.Logger) + if err != nil { + return fmt.Errorf("failed to set up durable emitter: %w", err) + } + if err = s.durableEmitter.Start(ctx); err != nil { + return fmt.Errorf("failed to start durable emitter: %w", err) + } + } + return nil } @@ -380,6 +411,9 @@ func (s *Server) Stop() { if s.beholderClient != nil { s.Logger.ErrorIfFn(s.beholderClient.Close, "Failed to close beholder client") } + if s.durableEmitter != nil { + s.Logger.ErrorIfFn(s.durableEmitter.Close, "Failed to close durable emitter") + } if s.dbStatsReporter != nil { s.dbStatsReporter.Stop() }