Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
bba88a2
chip ingress: consolidate initial batch emitter setup
pkcll May 11, 2026
1024a05
Enable ChipIngressBatchEmitterEnabled by default
pkcll May 11, 2026
d9a2229
Update go.mod/go.sum for chainlink-common and chipingress
pkcll May 11, 2026
564f108
Move beholder lifecycle to Shell and set ChipIngressLogger
pkcll May 11, 2026
fb6e575
Add ChipIngress batching smoke test
pkcll May 11, 2026
5540c3f
Add CHiP sink assertion to cron invalid-schedule regression test
pkcll May 11, 2026
5414ff6
Merge remote-tracking branch 'origin/develop' into infoplat-3436-chip…
pkcll May 11, 2026
28f3514
Update chainlink-common/chipingress to infoplat-3436 branch heads
pkcll May 11, 2026
caa4a86
Refactor beholder lifecycle to Shell field and fix test coverage
pkcll May 12, 2026
1197c48
Merge origin/develop into infoplat-3436-chipingress-publishBatch
pkcll May 12, 2026
3dd6bdf
chore: shorten newBeholderClient comment
pkcll May 12, 2026
d22e6ac
chore: bump chainlink-common chipingress batching deps
pkcll May 15, 2026
d34935e
Merge origin/develop into infoplat-3436-chipingress-publishBatch
pkcll May 15, 2026
5ded6fe
gomodtidy
pkcll May 15, 2026
28c9556
fix: resolve lint errors in beholder lifecycle code
pkcll May 15, 2026
29a66b5
refactor: assign no-op beholder client when telemetry disabled
pkcll May 15, 2026
092cd97
Merge branch 'develop' of github.com:smartcontractkit/chainlink into …
pkcll May 15, 2026
42f860c
fix: update tests for no-op beholder client and ChipIngressBatchEmitt…
pkcll May 15, 2026
3aa9650
chore: bump chipingress to main, chainlink-common to infoplat-3436-ch…
pkcll May 15, 2026
cfa96b7
fix: remove duplicate batch forwarding in chip-testsink
pkcll May 15, 2026
915b69d
fix: set telemetry endpoint in beholder lifecycle tests
pkcll May 15, 2026
da809ea
chore: bump chainlink-common for chip ingress batching
pkcll May 15, 2026
b12be08
Merge origin/develop into infoplat-3436-chipingress-publishBatch
pkcll May 18, 2026
6d6d44a
Bump chainlink-common to infoplat-3436-chipingress-batching-part-2, c…
pkcll May 18, 2026
2b40419
gomodtidy
pkcll May 18, 2026
dd48333
Bump chainlink-common to infoplat-3436-chipingress-batching-part-2 (2…
pkcll May 18, 2026
4dd28ac
Bump chainlink-common to infoplat-3436-chipingress-batching-part-2 (2…
pkcll May 18, 2026
c1dcb8a
Merge origin/develop into infoplat-3436-chipingress-publishBatch
pkcll May 18, 2026
fe4d4bb
Bump chainlink-common to infoplat-3436-chipingress-batching-part-2 (2…
pkcll May 18, 2026
44240c0
Bump chainlink-common from main (20260518203858, PR #2059)
pkcll May 18, 2026
22ee17e
Remove nil guard on BeholderClient.Close since it always defaults to …
pkcll May 19, 2026
c0349d4
Fix shell_local_test: ensure noop BeholderClient before AfterNode cle…
pkcll May 19, 2026
2eb857d
Merge branch 'develop' into infoplat-3436-chipingress-publishBatch
pkcll May 19, 2026
23f73b8
gomodtidy
pkcll May 19, 2026
03a38d6
Merge origin/develop with latest chainlink-common and chipingress
pkcll May 20, 2026
a757214
Run go mod tidy
pkcll May 20, 2026
82556f5
Remove empty chip ingress batching test stubs
pkcll May 20, 2026
06d6a77
Bump chainlink-common and chipingress to main
pkcll May 20, 2026
a8337eb
Restore ChipIngressBatchEmitterEnabled config lost during merge
pkcll May 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 86 additions & 59 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func metricViews() []sdkmetric.View {
)
}

func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTelemetry config.Telemetry, lggr logger.Logger, csaPubKeyHex string, beholderAuthHeaders map[string]string) error {
func initGlobals(cfgProm config.Prometheus, cfgTelemetry config.Telemetry, cfgTracing config.Tracing, lggr logger.Logger, beholderClient *beholder.Client) error {
// Avoid double initializations, but does not prevent relay methods from being called multiple times.
var err error
initGlobalsOnce.Do(func() {
Expand All @@ -95,71 +95,97 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme
lggr.Errorw("Telemetry error", "err", err)
}))

tracingCfg := loop.TracingConfig{
Enabled: cfgTracing.Enabled(),
CollectorTarget: cfgTracing.CollectorTarget(),
NodeAttributes: cfgTracing.Attributes(),
SamplingRatio: cfgTracing.SamplingRatio(),
TLSCertPath: cfgTracing.TLSCertPath(),
OnDialError: func(error) { lggr.Errorw("Failed to dial", "err", err) },
}
if !cfgTelemetry.Enabled() {
return loop.SetupTracing(tracingCfg)
}

var attributes []attribute.KeyValue
if tracingCfg.Enabled {
attributes = tracingCfg.Attributes()
}
for k, v := range cfgTelemetry.ResourceAttributes() {
attributes = append(attributes, attribute.String(k, v))
return loop.SetupTracing(tracingConfig(cfgTracing, lggr))
}

clientCfg := beholder.Config{
InsecureConnection: cfgTelemetry.InsecureConnection(),
CACertFile: cfgTelemetry.CACertFile(),
OtelExporterGRPCEndpoint: cfgTelemetry.OtelExporterGRPCEndpoint(),
ResourceAttributes: attributes,
TraceSampleRatio: cfgTelemetry.TraceSampleRatio(),
EmitterBatchProcessor: cfgTelemetry.EmitterBatchProcessor(),
EmitterExportTimeout: cfgTelemetry.EmitterExportTimeout(),
AuthPublicKeyHex: csaPubKeyHex,
AuthHeaders: beholderAuthHeaders,
AuthHeadersTTL: cfgTelemetry.AuthHeadersTTL(),
ChipIngressEmitterEnabled: cfgTelemetry.ChipIngressEndpoint() != "",
ChipIngressEmitterGRPCEndpoint: cfgTelemetry.ChipIngressEndpoint(),
ChipIngressInsecureConnection: cfgTelemetry.ChipIngressInsecureConnection(),
LogStreamingEnabled: cfgTelemetry.LogStreamingEnabled(),
LogLevel: cfgTelemetry.LogLevel(),
LogBatchProcessor: cfgTelemetry.LogBatchProcessor(),
LogExportTimeout: cfgTelemetry.LogExportTimeout(),
LogExportMaxBatchSize: cfgTelemetry.LogExportMaxBatchSize(),
LogExportInterval: cfgTelemetry.LogExportInterval(),
LogMaxQueueSize: cfgTelemetry.LogMaxQueueSize(),
// note: due to the OTEL specification, all histogram buckets
// must be defined when the beholder client is created
MetricViews: metricViews(),
if beholderClient != nil {
beholder.SetClient(beholderClient)
beholder.SetGlobalOtelProviders()
}

if tracingCfg.Enabled {
clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter()
if err != nil {
return err
}
}
var beholderClient *beholder.Client
beholderClient, err = beholder.NewClient(clientCfg)
if err != nil {
return err
}
beholder.SetClient(beholderClient)
beholder.SetGlobalOtelProviders()
return nil
}()
})
return err
}

func tracingConfig(cfgTracing config.Tracing, lggr logger.Logger) loop.TracingConfig {
return loop.TracingConfig{
Enabled: cfgTracing.Enabled(),
CollectorTarget: cfgTracing.CollectorTarget(),
NodeAttributes: cfgTracing.Attributes(),
SamplingRatio: cfgTracing.SamplingRatio(),
TLSCertPath: cfgTracing.TLSCertPath(),
OnDialError: func(e error) { lggr.Errorw("Failed to dial", "err", e) },
}
}

// newBeholderClient builds a Beholder client from tracing/telemetry config
// and sets the CSA signer used for auth header refresh.
func newBeholderClient(
lggr logger.Logger,
keyStore keystore.Master,
cfgTracing config.Tracing,
cfgTelemetry config.Telemetry,
csaPubKeyHex string,
beholderAuthHeaders map[string]string,
) (*beholder.Client, error) {
attributes := make([]attribute.KeyValue, 0, len(cfgTelemetry.ResourceAttributes()))
for k, v := range cfgTelemetry.ResourceAttributes() {
attributes = append(attributes, attribute.String(k, v))
}

clientCfg := beholder.Config{
InsecureConnection: cfgTelemetry.InsecureConnection(),
CACertFile: cfgTelemetry.CACertFile(),
OtelExporterGRPCEndpoint: cfgTelemetry.OtelExporterGRPCEndpoint(),
ResourceAttributes: attributes,
TraceSampleRatio: cfgTelemetry.TraceSampleRatio(),
EmitterBatchProcessor: cfgTelemetry.EmitterBatchProcessor(),
EmitterExportTimeout: cfgTelemetry.EmitterExportTimeout(),
AuthPublicKeyHex: csaPubKeyHex,
AuthHeaders: beholderAuthHeaders,
AuthHeadersTTL: cfgTelemetry.AuthHeadersTTL(),
ChipIngressEmitterEnabled: cfgTelemetry.ChipIngressEndpoint() != "",
ChipIngressEmitterGRPCEndpoint: cfgTelemetry.ChipIngressEndpoint(),
ChipIngressInsecureConnection: cfgTelemetry.ChipIngressInsecureConnection(),
ChipIngressBatchEmitterEnabled: cfgTelemetry.ChipIngressBatchEmitterEnabled(),
ChipIngressLogger: lggr,
LogStreamingEnabled: cfgTelemetry.LogStreamingEnabled(),
LogLevel: cfgTelemetry.LogLevel(),
LogBatchProcessor: cfgTelemetry.LogBatchProcessor(),
LogExportTimeout: cfgTelemetry.LogExportTimeout(),
LogExportMaxBatchSize: cfgTelemetry.LogExportMaxBatchSize(),
LogExportInterval: cfgTelemetry.LogExportInterval(),
LogMaxQueueSize: cfgTelemetry.LogMaxQueueSize(),
// Due to OpenTelemetry semantics, histogram bucket boundaries must be set
// when the Beholder client is constructed.
MetricViews: metricViews(),
}

if cfgTracing.Enabled() {
tracingCfg := tracingConfig(cfgTracing, lggr)
// add tracing attributes to resource attributes
clientCfg.ResourceAttributes = append(clientCfg.ResourceAttributes, tracingCfg.Attributes()...)

var err error
clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter()
if err != nil {
return nil, err
}
}
beholderClient, err := beholder.NewClient(clientCfg)
if err != nil {
return nil, err
}

// Set the signer used to refresh auth headers when AuthHeadersTTL is non-zero.
// TTL 0 means static headers only; the signer will not run.
beholderClient.SetSigner(&keystore.CSASigner{CSA: keyStore.CSA()})

return beholderClient, nil
}

// ErrNoAPICredentialsAvailable is returned when not run from a terminal
// and no API credentials have been provided
var ErrNoAPICredentialsAvailable = errors.New("API credentials must be supplied")
Expand Down Expand Up @@ -188,9 +214,10 @@ type Shell struct {
secretsFiles []string
secretsFileIsSet bool

LDB pg.LockedDB // initialized in BeforeNode
DS sqlutil.DataSource // initialized in BeforeNode
KeyStore keystore.Master // initialized in BeforeNode
LDB pg.LockedDB // initialized in BeforeNode
DS sqlutil.DataSource // initialized in BeforeNode
KeyStore keystore.Master // initialized in BeforeNode
BeholderClient *beholder.Client // initialized in BeforeNode

CleanupOnce sync.Once // ensures cleanup happens exactly once
}
Expand Down
61 changes: 43 additions & 18 deletions core/cmd/shell_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1196,33 +1196,55 @@ func (s *Shell) beforeNode(c *cli.Context) error {
return fmt.Errorf("failed to build Beholder auth: %w", err)
}

// Initialize globals with beholder and telemetry
err = initGlobals(s.Config.Prometheus(), s.Config.Tracing(), s.Config.Telemetry(), s.Logger, csaPubKeyHex, beholderAuthHeaders)
if err != nil {
// Build Beholder client: a real one when telemetry is enabled, otherwise a
// no-op so that downstream code never needs to nil-check.
if s.Config.Telemetry().Enabled() {
var beholderErr error
s.BeholderClient, beholderErr = newBeholderClient(s.Logger, keyStore, s.Config.Tracing(), s.Config.Telemetry(), csaPubKeyHex, beholderAuthHeaders)
if beholderErr != nil {
return fmt.Errorf("failed creating beholder client: %w", beholderErr)
}
} else {
s.BeholderClient = beholder.NewNoopClient()
}

// Prometheus, grpc, tracing, and (when telemetry is on) Beholder OTel globals.
if err = initGlobals(s.Config.Prometheus(), s.Config.Telemetry(), s.Config.Tracing(), s.Logger, s.BeholderClient); err != nil {
return fmt.Errorf("failed initializing globals: %w", err)
}

// Set the signing mechanism for beholder auth headers
// if the TTL is 0, we will use the static headers, and this signer will never be called.
beholder.GetClient().SetSigner(&keystore.CSASigner{CSA: keyStore.CSA()})
// Emit node configuration through beholder
s.EmitNodeConfig(ctx)
// If log streaming is enabled swap core to add Otel
if s.Config.Telemetry().LogStreamingEnabled() {
if s.SetOtelCore == nil {
return errors.New("Shell.SetOtelCore is nil")
}
otelLogger := beholder.GetLogger()
logLevel := s.Config.Telemetry().LogLevel()
otelCore := otelzap.NewCore(otelLogger, otelzap.WithLevel(logLevel))
// Wire log streaming before Start so ChipIngressLogger and other internals
// see the OTel-backed logger core.
if err = s.setupLogStreaming(); err != nil {
return err
}

// Start the beholder client after log streaming is wired.
if err = s.BeholderClient.Start(ctx); err != nil {
return fmt.Errorf("failed to start beholder client: %w", err)
}

s.SetOtelCore(otelCore)
lggr.Info("Log streaming enabled")
if s.Config.Telemetry().Enabled() {
s.EmitNodeConfig(ctx)
}

return nil
}

func (s *Shell) setupLogStreaming() error {
if !s.Config.Telemetry().LogStreamingEnabled() {
return nil
}
if s.SetOtelCore == nil {
return errors.New("Shell.SetOtelCore is nil")
}
logLevel := s.Config.Telemetry().LogLevel()
otelCore := otelzap.NewCore(s.BeholderClient.Logger, otelzap.WithLevel(logLevel))
s.SetOtelCore(otelCore)
s.Logger.Info("Log streaming enabled")
return nil
}

// BeforeNode initializes database, keystore, logger, and beholder for node startup.
// This is used as a Before hook in CLI commands that require these components.
func (s *Shell) BeforeNode(c *cli.Context) error {
Expand All @@ -1247,6 +1269,9 @@ func (s *Shell) afterNode(lggr logger.SugaredLogger) {
log.Printf("Failed to close Logger: %v", err)
}
}
if err := s.BeholderClient.Close(); err != nil {
log.Printf("Failed to close Beholder client: %v", err)
}
})
}

Expand Down
Loading
Loading