Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 162 additions & 45 deletions pkg/chipingress/batch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,65 @@ type seqnumKey struct {
eventType string
}

// PublishError is an error returned per-event when partial delivery is enabled
// and an individual event fails validation/production.
type PublishError struct {
Code chipingress.PublishErrorCode
Reason string
}

func (e *PublishError) Error() string {
return fmt.Sprintf("%s: %s", e.Code.String(), e.Reason)
}

// Error codes returned by the server in PublishError.Code.
// Re-exported from the proto package for convenience.
const (
ErrCodeUnknown = chipingress.PublishErrorCode(0) // PUBLISH_ERROR_CODE_UNKNOWN
ErrCodeValidationFailed = chipingress.PublishErrorCode(1) // PUBLISH_ERROR_CODE_VALIDATION_FAILED
ErrCodeSchemaMissing = chipingress.PublishErrorCode(2) // PUBLISH_ERROR_CODE_SCHEMA_MISSING
ErrCodeEncodeError = chipingress.PublishErrorCode(3) // PUBLISH_ERROR_CODE_ENCODE_ERROR
ErrCodeDomainMisconfiguration = chipingress.PublishErrorCode(4) // PUBLISH_ERROR_CODE_DOMAIN_MISCONFIGURATION
Comment on lines +46 to +50
ErrCodeResultsMismatch = chipingress.PublishErrorCode(-1) // client-side synthetic code
)

// Client is a batching client that accumulates messages and sends them in batches.
type Client struct {
Comment on lines 54 to 55
client chipingress.Client
batchSize int
maxGRPCRequestSize int // configured max, used for metrics/error reporting
effectiveMaxRequestSize int // maxGRPCRequestSize minus grpcFramingOverhead, used for splitting
cloneEvent bool
maxConcurrentSends chan struct{}
batchInterval time.Duration
maxPublishTimeout time.Duration
messageBuffer chan *messageWithCallback
stopCh stopCh
log *zap.SugaredLogger
callbackWg sync.WaitGroup
shutdownTimeout time.Duration
shutdownOnce sync.Once
batcherDone chan struct{}
started bool
counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop()
maxConcurrentSends chan struct{}
batchInterval time.Duration
maxPublishTimeout time.Duration
messageBuffer chan *messageWithCallback
stopCh stopCh
log *zap.SugaredLogger
callbackWg sync.WaitGroup
shutdownTimeout time.Duration
Comment on lines +66 to +68
shutdownOnce sync.Once
batcherDone chan struct{}
started bool
counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop()

metrics batchClientMetrics

transactionEnabled bool
}

type batchClientMetrics struct {
sendRequestsTotal otelmetric.Int64Counter
requestSizeMessages otelmetric.Int64Histogram
requestSizeBytes otelmetric.Int64Histogram
requestLatencyMS otelmetric.Float64Histogram
configInfo otelmetric.Int64Gauge
batchSplitsTotal otelmetric.Int64Counter
batchSizeAttr otelmetric.MeasurementOption
maxGRPCReqSizeAttr otelmetric.MeasurementOption
successStatusAttr otelmetric.MeasurementOption
failureStatusAttr otelmetric.MeasurementOption
sendRequestsTotal otelmetric.Int64Counter
requestSizeMessages otelmetric.Int64Histogram
requestSizeBytes otelmetric.Int64Histogram
requestLatencyMS otelmetric.Float64Histogram
configInfo otelmetric.Int64Gauge
batchSplitsTotal otelmetric.Int64Counter
resultsMismatchTotal otelmetric.Int64Counter
batchSizeAttr otelmetric.MeasurementOption
maxGRPCReqSizeAttr otelmetric.MeasurementOption
successStatusAttr otelmetric.MeasurementOption
failureStatusAttr otelmetric.MeasurementOption
}

// Opt is a functional option for configuring the batch Client.
Expand All @@ -77,14 +102,15 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
maxGRPCRequestSize: 10 * 1024 * 1024,
effectiveMaxRequestSize: 10*1024*1024 - grpcFramingOverhead,
cloneEvent: true,
maxConcurrentSends: make(chan struct{}, 1),
messageBuffer: make(chan *messageWithCallback, 200),
batchInterval: 100 * time.Millisecond,
maxPublishTimeout: 5 * time.Second,
stopCh: make(chan struct{}),
callbackWg: sync.WaitGroup{},
shutdownTimeout: 5 * time.Second,
batcherDone: make(chan struct{}),
transactionEnabled: false,
maxConcurrentSends: make(chan struct{}, 1),
messageBuffer: make(chan *messageWithCallback, 200),
batchInterval: 100 * time.Millisecond,
maxPublishTimeout: 5 * time.Second,
stopCh: make(chan struct{}),
callbackWg: sync.WaitGroup{},
shutdownTimeout: 5 * time.Second,
Comment on lines +108 to +112
batcherDone: make(chan struct{}),
}

for _, opt := range opts {
Expand Down Expand Up @@ -263,12 +289,12 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback)
go func() {
defer func() { <-b.maxConcurrentSends }()

splitBatches := splitMessagesByRequestSize(messages, b.effectiveMaxRequestSize)
splitBatches := splitMessagesByRequestSize(messages, b.effectiveMaxRequestSize, b.transactionEnabled)
if len(splitBatches) > 1 {
b.metrics.batchSplitsTotal.Add(ctx, 1)
}
for _, batchMessages := range splitBatches {
batchReq, batchBytes := newBatchRequest(batchMessages)
batchReq, batchBytes := newBatchRequest(batchMessages, b.transactionEnabled)
if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize {
err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize)
b.metrics.recordSend(ctx, len(batchMessages), batchBytes, 0, false)
Expand All @@ -280,14 +306,18 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback)
// this is specifically to prevent long running network calls
ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout)
startedAt := time.Now()
_, err := b.client.PublishBatch(ctxTimeout, batchReq)
resp, err := b.client.PublishBatch(ctxTimeout, batchReq)
cancel()

b.metrics.recordSend(ctx, len(batchMessages), batchBytes, time.Since(startedAt), err == nil)
if err != nil {
b.log.Errorw("failed to publish batch", "error", err)
b.completeBatchCallbacks(batchMessages, err)
} else if !b.transactionEnabled && resp != nil && len(resp.Results) > 0 {
b.completeBatchCallbacksFromResults(batchMessages, resp.Results)
} else {
Comment on lines 314 to +318
b.completeBatchCallbacks(batchMessages, nil)
}
b.completeBatchCallbacks(batchMessages, err)
}
}()
}
Expand All @@ -305,6 +335,62 @@ func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err err
})
}

// completeBatchCallbacksFromResults dispatches per-event callbacks using the server's
// PublishResult slice. Results are matched to messages by index (server contract guarantees
// positional correspondence). If a result has a non-nil Error, the callback receives a
// PublishError; otherwise it receives nil.
//
// Defensive behaviour:
// - If len(results) < len(messages): remaining callbacks get a synthetic RESULTS_MISMATCH error.
// - If len(results) > len(messages): extras are ignored.
// - If results[i].EventId != messages[i].event.Id: a warning is logged.
func (b *Client) completeBatchCallbacksFromResults(messages []*messageWithCallback, results []*chipingress.PublishResult) {
if len(results) != len(messages) {
b.log.Warnw("publish results length mismatch",
"results", len(results),
"messages", len(messages),
)
b.metrics.resultsMismatchTotal.Add(context.Background(), 1)
}

b.callbackWg.Go(func() {
for i, msg := range messages {
if msg.callback == nil {
continue
}
if i >= len(results) {
msg.callback(&PublishError{
Code: ErrCodeResultsMismatch,
Reason: fmt.Sprintf("server returned %d results for %d events", len(results), len(messages)),
})
continue
}
result := results[i]
if result == nil {
msg.callback(nil)
continue
}
// Defensive: warn on ID mismatch but still dispatch by index.
if result.EventId != "" && msg.event.Id != "" && result.EventId != msg.event.Id {
b.log.Warnw("publish result event_id mismatch at index",
"index", i,
"expected", msg.event.Id,
"got", result.EventId,
)
b.metrics.resultsMismatchTotal.Add(context.Background(), 1)
}
if result.Error != nil {
msg.callback(&PublishError{
Code: result.Error.ErrorCode,
Reason: result.Error.Reason,
})
} else {
msg.callback(nil)
}
}
})
}

// grpcFramingOverhead accounts for gRPC framing, HTTP/2 headers, auth tokens,
// tracing metadata, and other per-request overhead not captured by proto.Size.
const grpcFramingOverhead = 10 * 1024 // 10 KiB
Expand All @@ -314,7 +400,7 @@ const grpcFramingOverhead = 10 * 1024 // 10 KiB
// reservation remains meaningful.
const minMaxGRPCRequestSize = 1024 * 1024 // 1 MiB

func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback {
func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int, transactionEnabled bool) [][]*messageWithCallback {
if len(messages) == 0 {
return nil
}
Expand All @@ -326,7 +412,7 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize
current := make([]*messageWithCallback, 0, len(messages))
for _, msg := range messages {
candidate := append(current, msg)
_, candidateBytes := newBatchRequest(candidate)
_, candidateBytes := newBatchRequest(candidate, transactionEnabled)
if len(current) > 0 && candidateBytes > maxRequestSize {
batches = append(batches, current)
current = []*messageWithCallback{msg}
Expand All @@ -340,12 +426,20 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize
return batches
}

func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) {
func newBatchRequest(messages []*messageWithCallback, transactionEnabled bool) (*chipingress.CloudEventBatch, int) {
events := make([]*chipingress.CloudEventPb, len(messages))
for i, msg := range messages {
events[i] = msg.event
}
batchReq := &chipingress.CloudEventBatch{Events: events}
// Always emit PublishOptions so the wire form unambiguously reflects
// client intent. The server treats unset and explicit false identically,
// but explicit-false is defensive against any future server-default drift
// and makes traces/logs self-describing.
te := transactionEnabled
batchReq := &chipingress.CloudEventBatch{
Events: events,
Options: &chipingress.PublishOptions{TransactionEnabled: &te},
}
return batchReq, proto.Size(batchReq)
}

Expand Down Expand Up @@ -417,6 +511,20 @@ func WithLogger(log *zap.SugaredLogger) Opt {
}
}

// WithTransactionEnabled sets PublishOptions.transaction_enabled on every
// batch request. The option is always emitted on the wire so client intent
// is explicit in traces/logs; the server treats unset and explicit false
// identically (partial delivery).
// - false (the default for NewBatchClient): partial delivery. Valid events
// are produced and per-event errors are returned for invalid ones rather
// than failing the entire batch.
// - true: all-or-nothing. Any per-event failure fails the entire batch.
func WithTransactionEnabled(transactionEnabled bool) Opt {
return func(c *Client) {
c.transactionEnabled = transactionEnabled
}
}

func newBatchClientMetrics() (batchClientMetrics, error) {
meter := otel.Meter("chipingress/batch_client")
sendRequestsTotal, err := meter.Int64Counter(
Expand Down Expand Up @@ -467,15 +575,23 @@ func newBatchClientMetrics() (batchClientMetrics, error) {
if err != nil {
return batchClientMetrics{}, err
}
resultsMismatchTotal, err := meter.Int64Counter(
"chip_ingress.batch.results_mismatch_total",
otelmetric.WithDescription("Total publish responses where result count or event IDs did not match the request"),
otelmetric.WithUnit("{mismatch}"),
)
if err != nil {
return batchClientMetrics{}, err
}

return batchClientMetrics{
sendRequestsTotal: sendRequestsTotal,

requestSizeMessages: requestSizeMessages,
requestSizeBytes: requestSizeBytes,
requestLatencyMS: requestLatencyMS,
configInfo: configInfo,
batchSplitsTotal: batchSplitsTotal,
sendRequestsTotal: sendRequestsTotal,
requestSizeMessages: requestSizeMessages,
requestSizeBytes: requestSizeBytes,
requestLatencyMS: requestLatencyMS,
configInfo: configInfo,
batchSplitsTotal: batchSplitsTotal,
resultsMismatchTotal: resultsMismatchTotal,
successStatusAttr: otelmetric.WithAttributeSet(attribute.NewSet(
attribute.String("status", "success"),
)),
Expand All @@ -500,6 +616,7 @@ func (m *batchClientMetrics) recordConfig(ctx context.Context, c *Client) {
attribute.Int64("max_publish_timeout_ms", c.maxPublishTimeout.Milliseconds()),
attribute.Int64("shutdown_timeout_ms", c.shutdownTimeout.Milliseconds()),
attribute.Bool("clone_event", c.cloneEvent),
attribute.Bool("transaction_enabled", c.transactionEnabled),
attribute.Int("max_grpc_request_size_bytes", c.maxGRPCRequestSize),
))
}
Expand Down
Loading
Loading