Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/scylladb/go-reflectx v1.0.1
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chain-selectors v1.0.100
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528204832-58c7145c53f8
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

195 changes: 195 additions & 0 deletions pkg/durableemitter/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package durableemitter

import (
"context"
"crypto/ed25519"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"maps"
"sync"
"sync/atomic"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

const (
authHeaderKey = "X-Beholder-Node-Auth-Token"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This file is just example code to show what we would need to do to achieve similar behaviour to beholder for injecting signer post initialization

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkcll for some context - the rotating signer works today, but we can do better by upgrading to per message auth for ChIP as there is no passthrough.

If, however, we believe that a streaming API is the best long term path for the durable emitter, than the rotating auth will be useful as generating a hash poses concurrency challenges.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upgrading to per message auth for ChIP
Signing every message adds CPU/latency on the publish hot path. Rotating signer is sufficient and gives us enough security IMHO.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New commit seems simple enough for me.

authHeaderVersion = "2"
)

// Signer signs auth header payloads using the node's CSA key.
type Signer interface {
Sign(ctx context.Context, keyID string, data []byte) ([]byte, error)
}

// AuthConfig configures chip ingress auth headers for DurableEmitter clients.
type AuthConfig struct {
AuthHeaders map[string]string
AuthHeadersTTL time.Duration
AuthPublicKeyHex string
// AuthKeySigner may be nil at init time for LOOP plugins; call SetGlobalSigner
// after the CSA keystore is available.
AuthKeySigner Signer
}

type lazySigner struct {
signer atomic.Pointer[Signer]
}

func (l *lazySigner) Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) {
s := l.signer.Load()
if s == nil {
return nil, errors.New("keystore not yet available for signing")
}
return (*s).Sign(ctx, keyID, data)
}

func (l *lazySigner) Set(signer Signer) {
l.signer.Store(&signer)
}

func (l *lazySigner) IsSet() bool {
return l.signer.Load() != nil
}

type staticHeaderProvider struct {
headers map[string]string
}

func (p *staticHeaderProvider) Headers(_ context.Context) (map[string]string, error) {
return p.headers, nil
}

type rotatingHeaderProvider struct {
csaPubKey ed25519.PublicKey
signer Signer
signerTimeout time.Duration
headers atomic.Value // map[string]string
ttl time.Duration
lastUpdatedNanos atomic.Int64
lazy *lazySigner
mu sync.Mutex
}

func (p *rotatingHeaderProvider) SetSigner(signer Signer) {
if p.lazy != nil {
p.lazy.Set(signer)
}
}

func (p *rotatingHeaderProvider) IsSignerSet() bool {
return p.lazy != nil && p.lazy.IsSet()
}

func (p *rotatingHeaderProvider) Headers(ctx context.Context) (map[string]string, error) {
returnHeader := make(map[string]string)
lastUpdated := time.Unix(0, p.lastUpdatedNanos.Load())

if time.Since(lastUpdated) > p.ttl {
p.mu.Lock()
defer p.mu.Unlock()

lastUpdated = time.Unix(0, p.lastUpdatedNanos.Load())
if time.Since(lastUpdated) < p.ttl {
maps.Copy(returnHeader, p.headers.Load().(map[string]string))
return returnHeader, nil
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, p.signerTimeout)
defer cancel()

ts := time.Now()
newHeaders, err := newAuthHeaderV2(ctxWithTimeout, p.csaPubKey, p.signer, ts)
if err != nil {
return nil, fmt.Errorf("durableemitter: failed to create auth header: %w", err)
}

p.headers.Store(newHeaders)
p.lastUpdatedNanos.Store(ts.UnixNano())
}

maps.Copy(returnHeader, p.headers.Load().(map[string]string))
return returnHeader, nil
}

var globalRotatingAuth atomic.Pointer[rotatingHeaderProvider]

// NewAuthHeaderProvider builds a HeaderProvider for DurableEmitter chip ingress clients.
//
// Static mode (AuthHeadersTTL == 0): returns fixed AuthHeaders.
// Rotating mode (AuthHeadersTTL > 0): uses initial AuthHeaders until TTL expires, then signs fresh headers.
func NewAuthHeaderProvider(cfg AuthConfig) (chipingress.HeaderProvider, error) {
if cfg.AuthHeadersTTL > 0 {
if cfg.AuthPublicKeyHex == "" {
return nil, errors.New("auth: public key hex required for rotating auth (TTL > 0)")
}
if cfg.AuthHeadersTTL < 10*time.Minute {
return nil, errors.New("auth: headers TTL must be at least 10 minutes")
}

pubKey, err := hex.DecodeString(cfg.AuthPublicKeyHex)
if err != nil {
return nil, fmt.Errorf("auth: failed to decode public key hex: %w", err)
}

lazy := &lazySigner{}
if cfg.AuthKeySigner != nil {
lazy.Set(cfg.AuthKeySigner)
}

provider := &rotatingHeaderProvider{
csaPubKey: ed25519.PublicKey(pubKey),
signer: lazy,
signerTimeout: 5 * time.Second,
ttl: cfg.AuthHeadersTTL,
lazy: lazy,
}

headers := make(map[string]string)
if len(cfg.AuthHeaders) > 0 {
headers = cfg.AuthHeaders
provider.lastUpdatedNanos.Store(time.Now().UnixNano())
}
provider.headers.Store(headers)

globalRotatingAuth.Store(provider)
return provider, nil
}

if len(cfg.AuthHeaders) == 0 {
return nil, nil
}
return &staticHeaderProvider{headers: cfg.AuthHeaders}, nil
}

// SetGlobalSigner injects the CSA keystore used to refresh rotating chip ingress auth headers.
// No-op when rotating auth is not configured.
func SetGlobalSigner(signer Signer) {
if provider := globalRotatingAuth.Load(); provider != nil {
provider.SetSigner(signer)
}
}

// IsGlobalSignerSet reports whether rotating DurableEmitter auth has a signer configured.
func IsGlobalSignerSet() bool {
provider := globalRotatingAuth.Load()
return provider != nil && provider.IsSignerSet()
}

func newAuthHeaderV2(ctx context.Context, pubKey ed25519.PublicKey, signer Signer, ts time.Time) (map[string]string, error) {
tsBytes := make([]byte, 8)
binary.BigEndian.PutUint64(tsBytes, uint64(ts.UnixNano()))
msgBytes := append(pubKey, tsBytes...)

signature, err := signer.Sign(ctx, fmt.Sprintf("%x", pubKey), msgBytes)
if err != nil {
return nil, fmt.Errorf("durableemitter: failed to sign auth header: %w", err)
}

return map[string]string{
authHeaderKey: fmt.Sprintf("%s:%x:%d:%x", authHeaderVersion, pubKey, ts.UnixNano(), signature),
}, nil
}
91 changes: 91 additions & 0 deletions pkg/durableemitter/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package durableemitter_test

import (
"context"
"crypto/ed25519"
"encoding/hex"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/durableemitter"
)

type mockSigner struct{}

func (mockSigner) Sign(_ context.Context, _ string, _ []byte) ([]byte, error) {
return []byte("signature"), nil
}

func TestNewAuthHeaderProvider_Static(t *testing.T) {
t.Parallel()

headers := map[string]string{"X-Beholder-Node-Auth-Token": "static"}
provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{
AuthHeaders: headers,
})
require.NoError(t, err)

got, err := provider.Headers(t.Context())
require.NoError(t, err)
require.Equal(t, headers, got)
}

func TestNewAuthHeaderProvider_RotatingDeferredSigner(t *testing.T) {
t.Parallel()

pubKey, _, err := ed25519.GenerateKey(nil)
require.NoError(t, err)

initial := map[string]string{"X-Beholder-Node-Auth-Token": "initial"}
provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{
AuthHeaders: initial,
AuthHeadersTTL: 10 * time.Minute,
AuthPublicKeyHex: hex.EncodeToString(pubKey),
})
require.NoError(t, err)
require.False(t, durableemitter.IsGlobalSignerSet())

got, err := provider.Headers(t.Context())
require.NoError(t, err)
require.Equal(t, initial, got)

durableemitter.SetGlobalSigner(mockSigner{})
require.True(t, durableemitter.IsGlobalSignerSet())
}

func TestNewAuthHeaderProvider_RotatingWithSigner(t *testing.T) {
t.Parallel()

pubKey, _, err := ed25519.GenerateKey(nil)
require.NoError(t, err)

provider, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{
AuthHeaders: map[string]string{"X-Beholder-Node-Auth-Token": "initial"},
AuthHeadersTTL: 10 * time.Minute,
AuthPublicKeyHex: hex.EncodeToString(pubKey),
AuthKeySigner: mockSigner{},
})
require.NoError(t, err)
require.True(t, durableemitter.IsGlobalSignerSet())

got, err := provider.Headers(t.Context())
require.NoError(t, err)
require.NotEmpty(t, got["X-Beholder-Node-Auth-Token"])
}

func TestNewAuthHeaderProvider_Validation(t *testing.T) {
t.Parallel()

_, err := durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{
AuthHeadersTTL: 10 * time.Minute,
})
require.ErrorContains(t, err, "public key hex required")

_, err = durableemitter.NewAuthHeaderProvider(durableemitter.AuthConfig{
AuthHeadersTTL: time.Minute,
AuthPublicKeyHex: "abcd",
})
require.ErrorContains(t, err, "at least 10 minutes")
}
Loading
Loading