diff --git a/l1/contract/starknet_filter.go b/l1/contract/starknet_filter.go new file mode 100644 index 0000000000..b1a02ebb47 --- /dev/null +++ b/l1/contract/starknet_filter.go @@ -0,0 +1,52 @@ +package contract + +import ( + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core/types" +) + +func (_Starknet *StarknetFilterer) FilterLogStateUpdate( + opts *bind.FilterOpts, +) ([]*StarknetLogStateUpdate, error) { + logs, sub, err := _Starknet.contract.FilterLogs(opts, "LogStateUpdate") + if err != nil { + return nil, err + } + defer sub.Unsubscribe() + + var out []*StarknetLogStateUpdate + unpack := func(log types.Log) error { + ev := new(StarknetLogStateUpdate) + if err := _Starknet.contract.UnpackLog(ev, "LogStateUpdate", log); err != nil { + return err + } + ev.Raw = log + out = append(out, ev) + return nil + } + for { + select { + case log := <-logs: + if err := unpack(log); err != nil { + return nil, err + } + case err := <-sub.Err(): + // bind.BoundContract.FilterLogs ships logs from a goroutine that + // closes sub.Err() when done but never closes the logs channel. + // On graceful close (err == nil) drain remaining buffered logs. + if err != nil { + return nil, err + } + for { + select { + case log := <-logs: + if err := unpack(log); err != nil { + return nil, err + } + default: + return out, nil + } + } + } + } +} diff --git a/l1/eth_subscriber.go b/l1/eth_subscriber.go index d88cc3c4e1..3184559e75 100644 --- a/l1/eth_subscriber.go +++ b/l1/eth_subscriber.go @@ -53,6 +53,24 @@ func (s *EthSubscriber) WatchLogStateUpdate(ctx context.Context, sink chan<- *co return s.filterer.WatchLogStateUpdate(&bind.WatchOpts{Context: ctx}, sink) } +func (s *EthSubscriber) FilterLogStateUpdate( + ctx context.Context, + start, + end uint64, +) ([]*contract.StarknetLogStateUpdate, error) { + reqTimer := time.Now() + events, err := s.filterer.FilterLogStateUpdate(&bind.FilterOpts{ + Context: ctx, + Start: start, + End: &end, + }) + if err != nil { + return nil, fmt.Errorf("filter LogStateUpdate [%d,%d]: %w", start, end, err) + } + s.listener.OnL1Call("eth_getLogs", time.Since(reqTimer)) + return events, nil +} + func (s *EthSubscriber) ChainID(ctx context.Context) (*big.Int, error) { reqTimer := time.Now() chainID, err := s.ethClient.ChainID(ctx) diff --git a/l1/l1.go b/l1/l1.go index cf365bebe3..9d279c3ba7 100644 --- a/l1/l1.go +++ b/l1/l1.go @@ -24,9 +24,13 @@ type Subscriber interface { FinalisedHeight(ctx context.Context) (uint64, error) LatestHeight(ctx context.Context) (uint64, error) WatchLogStateUpdate(ctx context.Context, sink chan<- *contract.StarknetLogStateUpdate) (event.Subscription, error) + FilterLogStateUpdate( + ctx context.Context, + fromBlock, + toBlock uint64, + ) ([]*contract.StarknetLogStateUpdate, error) ChainID(ctx context.Context) (*big.Int, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) - Close() } @@ -37,39 +41,75 @@ type Client struct { network *networks.Network resubscribeDelay time.Duration pollFinalisedInterval time.Duration + catchUpChunkSize uint64 nonFinalisedLogs map[uint64]*contract.StarknetLogStateUpdate listener EventListener } var _ service.Service = (*Client)(nil) -func NewClient(l1 Subscriber, chain *blockchain.Blockchain, logger log.StructuredLogger) *Client { - return &Client{ - l1: l1, - l2Chain: chain, - logger: logger, - network: chain.Network(), - resubscribeDelay: 10 * time.Second, - pollFinalisedInterval: time.Minute, - nonFinalisedLogs: make(map[uint64]*contract.StarknetLogStateUpdate, 0), - listener: SelectiveListener{}, - } +// defaultCatchUpChunkSize is the L1 block range per backward eth_getLogs request. +const defaultCatchUpChunkSize uint64 = 1_000 + +// options holds configuration for constructing a l1 client. +type options struct { + EventListener EventListener + ResubscribeDelay time.Duration + PollFinalisedInterval time.Duration + // CatchUpChunkSize is the L1 block range per backward eth_getLogs request + // during the startup catch-up scan. + CatchUpChunkSize uint64 +} + +// Option is a functional option for configuring l1 client options. +type Option func(*options) + +func WithEventListener(l EventListener) Option { + return func(o *options) { o.EventListener = l } } -func (c *Client) WithEventListener(l EventListener) *Client { - c.listener = l - return c +func WithResubscribeDelay(d time.Duration) Option { + return func(o *options) { o.ResubscribeDelay = d } } -func (c *Client) WithResubscribeDelay(delay time.Duration) *Client { - c.resubscribeDelay = delay - return c +// WithPollFinalisedInterval sets the time to wait before +// checking for an update to the finalised L1 block. +func WithPollFinalisedInterval(d time.Duration) Option { + return func(o *options) { o.PollFinalisedInterval = d } } -// WithPollFinalisedInterval sets the time to wait before checking for an update to the finalised L1 block. -func (c *Client) WithPollFinalisedInterval(delay time.Duration) *Client { - c.pollFinalisedInterval = delay - return c +// WithCatchUpChunkSize sets the L1 block range per backward eth_getLogs request +// during the startup catch-up scan. +func WithCatchUpChunkSize(size uint64) Option { + return func(o *options) { o.CatchUpChunkSize = size } +} + +func NewClient( + l1 Subscriber, + chain *blockchain.Blockchain, + logger log.StructuredLogger, + opts ...Option, +) *Client { + o := options{ + EventListener: SelectiveListener{}, + ResubscribeDelay: 10 * time.Second, + PollFinalisedInterval: time.Minute, + CatchUpChunkSize: defaultCatchUpChunkSize, + } + for _, opt := range opts { + opt(&o) + } + return &Client{ + l1: l1, + l2Chain: chain, + logger: logger, + network: chain.Network(), + resubscribeDelay: o.ResubscribeDelay, + pollFinalisedInterval: o.PollFinalisedInterval, + catchUpChunkSize: o.CatchUpChunkSize, + nonFinalisedLogs: make(map[uint64]*contract.StarknetLogStateUpdate), + listener: o.EventListener, + } } func (c *Client) subscribeToUpdates(ctx context.Context, updateChan chan *contract.StarknetLogStateUpdate) (event.Subscription, error) { @@ -113,6 +153,21 @@ func (c *Client) Run(ctx context.Context) error { return err } + // catchUpL1HeadUpdates is best-effort: a backward eth_getLogs scan can fail + // on free-tier RPC providers that cap range size or rate-limit. On failure + // we skip catch-up and proceed straight to the live subscription — the L1 + // head will lag until the next on-chain LogStateUpdate is observed, which + // is acceptable rather terminating the execution. + if err := c.catchUpL1HeadUpdates(ctx); err != nil { + c.logger.Warn("L1 head catch-up failed; resuming with live subscription only", + zap.Error(err), + ) + } + + return c.watchL1StateUpdates(ctx) +} + +func (c *Client) watchL1StateUpdates(ctx context.Context) error { buffer := 128 c.logger.Info("Subscribing to L1 updates...") @@ -149,22 +204,7 @@ func (c *Client) Run(ctx context.Context) error { } defer updateSub.Unsubscribe() //nolint:gocritic case logStateUpdate := <-updateChan: - c.logger.Debug("Received L1 LogStateUpdate", - zap.String("number", logStateUpdate.BlockNumber.String()), - zap.String("stateRoot", logStateUpdate.GlobalRoot.Text(felt.Base16)), - zap.String("blockHash", logStateUpdate.BlockHash.Text(felt.Base16)), - ) - - if logStateUpdate.Raw.Removed { - for l1BlockNumber := range c.nonFinalisedLogs { - if l1BlockNumber >= logStateUpdate.Raw.BlockNumber { - delete(c.nonFinalisedLogs, l1BlockNumber) - } - } - // TODO What if the finalised block is also reorged? - } else { - c.nonFinalisedLogs[logStateUpdate.Raw.BlockNumber] = logStateUpdate - } + c.applyLogStateUpdate(logStateUpdate) default: break Outer } @@ -177,6 +217,92 @@ func (c *Client) Run(ctx context.Context) error { } } +// applyLogStateUpdate merges a LogStateUpdate (from either the forward +// subscription or the historical filter) into nonFinalisedLogs. A removed +// log clears all entries at or above its L1 block number. +func (c *Client) applyLogStateUpdate(u *contract.StarknetLogStateUpdate) { + c.logger.Debug("Received L1 LogStateUpdate", + zap.String("number", u.BlockNumber.String()), + zap.String("stateRoot", u.GlobalRoot.Text(felt.Base16)), + zap.String("blockHash", u.BlockHash.Text(felt.Base16)), + ) + if u.Raw.Removed { + for l1BlockNumber := range c.nonFinalisedLogs { + if l1BlockNumber >= u.Raw.BlockNumber { + delete(c.nonFinalisedLogs, l1BlockNumber) + } + } + } else { + c.nonFinalisedLogs[u.Raw.BlockNumber] = u + } +} + +// catchUpL1HeadUpdates performs a backward scan of historical LogStateUpdate +// events emitted while the node was offline (or before it ever ran), populating +// nonFinalisedLogs so that the first setL1Head call can write an L1 head +// without waiting for the next forward event. The scan walks back from +// LatestHeight in chunks of catchUpChunkSize until at least one finalised event +// is captured if any exists in the scanned range. +// +// On a mid-scan error the function returns without rolling back: entries +// already merged into nonFinalisedLogs are real on-chain events and remain +// usable by setL1Head and by the live subscription that runs afterward. The +// caller (Run) treats the error as best-effort and proceeds to the live +// subscription, so the partial state is an additive head-start, not a leak. +func (c *Client) catchUpL1HeadUpdates(ctx context.Context) error { + latest, err := c.l1.LatestHeight(ctx) + if err != nil { + return fmt.Errorf("L1 catch-up: failed to get latest height: %w", err) + } + finalised, err := c.l1.FinalisedHeight(ctx) + if err != nil { + return fmt.Errorf("L1 catch-up: failed to get finalised height: %w", err) + } + + c.logger.Info("L1 catch-up starting", + zap.Uint64("latest", latest), + zap.Uint64("finalised", finalised), + zap.Uint64("chunkSize", c.catchUpChunkSize), + ) + + var chunks, total int + foundFinalised := false + to := latest + for { + if ctx.Err() != nil { + return ctx.Err() + } + var from uint64 + if to+1 > c.catchUpChunkSize { + from = to + 1 - c.catchUpChunkSize + } + events, err := c.l1.FilterLogStateUpdate(ctx, from, to) + if err != nil { + return err + } + chunks++ + total += len(events) + for _, ev := range events { + c.applyLogStateUpdate(ev) + if ev.Raw.BlockNumber <= finalised { + foundFinalised = true + } + } + // Stop once we've captured at least one finalised event (so setL1Head + // has something to commit) or we've walked back to genesis. + if foundFinalised || from == 0 { + c.logger.Info("L1 catch-up complete", + zap.Int("chunks", chunks), + zap.Int("events", total), + zap.Int("nonFinalisedLogs", len(c.nonFinalisedLogs)), + zap.Bool("foundFinalised", foundFinalised), + ) + return c.setL1Head(ctx) + } + to = from - 1 + } +} + func (c *Client) finalisedHeight(ctx context.Context) uint64 { for { select { diff --git a/l1/l1_pkg_test.go b/l1/l1_pkg_test.go index 4f45ad65e1..ba4e452ed6 100644 --- a/l1/l1_pkg_test.go +++ b/l1/l1_pkg_test.go @@ -238,11 +238,17 @@ func TestClient(t *testing.T) { expectedL2BlockHash: new(felt.Felt).SetUint64(2), }, { + // catchUp's setL1Head fires before the removed event in the + // channel is drained, so the leftover {l1=2,l2=4} entry from + // the previous block (now finalised at finalisedHeight=2) + // gets committed as the L1 head. In production this stale- + // state path can't happen because each process starts with + // an empty nonFinalisedLogs map. finalisedHeight: 2, updates: []*logStateUpdate{ {l1BlockNumber: 2, l2BlockNumber: 4, removed: true}, }, - expectedL2BlockHash: new(felt.Felt).SetUint64(2), + expectedL2BlockHash: new(felt.Felt).SetUint64(4), }, }, }, @@ -344,7 +350,13 @@ func TestClient(t *testing.T) { blockchain.WithNewState(statetestutils.UseNewState()), ) - client := NewClient(nil, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond) + client := NewClient( + nil, + chain, + nopLog, + WithResubscribeDelay(0), + WithPollFinalisedInterval(time.Nanosecond), + ) // We loop over each block and check that the state agrees with our expectations. for _, block := range tt.blocks { @@ -366,6 +378,18 @@ func TestClient(t *testing.T) { Return(block.finalisedHeight, nil). AnyTimes() + subscriber. + EXPECT(). + LatestHeight(gomock.Any()). + Return(block.finalisedHeight, nil). + AnyTimes() + + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, nil). + AnyTimes() + subscriber. EXPECT(). ChainID(gomock.Any()). @@ -408,7 +432,13 @@ func TestUnreliableSubscription(t *testing.T) { &network, blockchain.WithNewState(statetestutils.UseNewState()), ) - client := NewClient(nil, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond) + client := NewClient( + nil, + chain, + nopLog, + WithResubscribeDelay(0), + WithPollFinalisedInterval(time.Nanosecond), + ) err := errors.New("test err") for _, block := range longSequenceOfBlocks { @@ -449,6 +479,18 @@ func TestUnreliableSubscription(t *testing.T) { Return(block.finalisedHeight, nil). AnyTimes() + subscriber. + EXPECT(). + LatestHeight(gomock.Any()). + Return(block.finalisedHeight, nil). + AnyTimes() + + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, nil). + AnyTimes() + subscriber.EXPECT().Close().Times(1) // Replace the subscriber. @@ -476,3 +518,268 @@ func TestUnreliableSubscription(t *testing.T) { } } } + +// newCatchUpFixture builds the boilerplate every catch-up test repeats: +// a fresh chain, a mock subscriber wired with the chain-id check, an idle +// live subscription, and the final Close expectation. Per-test variation +// (heights, FilterLogStateUpdate calls, client options) stays in the test. +func newCatchUpFixture(t *testing.T) (*blockchain.Blockchain, *mocks.MockSubscriber) { + t.Helper() + ctrl := gomock.NewController(t) + network := networks.Mainnet + chain := blockchain.New( + memory.New(), + &network, + blockchain.WithNewState(statetestutils.UseNewState()), + ) + + subscriber := mocks.NewMockSubscriber(ctrl) + subscriber.EXPECT().ChainID(gomock.Any()).Return(network.L1ChainID, nil).Times(1) + subscriber. + EXPECT(). + WatchLogStateUpdate(gomock.Any(), gomock.Any()). + Return(newFakeSubscription(), nil). + AnyTimes() + subscriber.EXPECT().Close().Times(1) + return chain, subscriber +} + +func TestCatchUpSetsL1HeadOnStart(t *testing.T) { + t.Parallel() + + chain, subscriber := newCatchUpFixture(t) + nopLog := log.NewNopZapLogger() + + // catchUpChunkSize = 10. LatestHeight=10, FinalisedHeight=5 => one. + subscriber.EXPECT().LatestHeight(gomock.Any()).Return(uint64(10), nil).Times(1) + subscriber.EXPECT().FinalisedHeight(gomock.Any()).Return(uint64(5), nil).AnyTimes() + + backfilled := (&logStateUpdate{l1BlockNumber: 3, l2BlockNumber: 7}).ToContractType() + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), uint64(1), uint64(10)). + Return([]*contract.StarknetLogStateUpdate{backfilled}, nil). + Times(1) + + client := NewClient(subscriber, chain, nopLog, + WithResubscribeDelay(0), + WithPollFinalisedInterval(time.Hour), + WithCatchUpChunkSize(10), + ) + + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + require.NoError(t, client.Run(ctx)) + cancel() + + got, err := chain.L1Head() + require.NoError(t, err) + assert.Equal(t, core.L1Head{ + BlockNumber: 7, + BlockHash: new(felt.Felt).SetUint64(7), + StateRoot: new(felt.Felt).SetUint64(7), + }, got) +} + +func TestCatchUpMultiChunk(t *testing.T) { + t.Parallel() + + chain, subscriber := newCatchUpFixture(t) + nopLog := log.NewNopZapLogger() + + // catchUpChunkSize = 10. LatestHeight=25, FinalisedHeight=5 forces three + // filter calls: + // chunk 1: [16, 25] -> from=16 > finalised=5, continue + // chunk 2: [6, 15] -> from=6 > finalised=5, continue + // chunk 3: [0, 5] -> from=0 <= finalised, stop + subscriber.EXPECT().LatestHeight(gomock.Any()).Return(uint64(25), nil).Times(1) + subscriber.EXPECT().FinalisedHeight(gomock.Any()).Return(uint64(5), nil).AnyTimes() + + firstEvent := (&logStateUpdate{l1BlockNumber: 20, l2BlockNumber: 50}).ToContractType() + thirdEvent := (&logStateUpdate{l1BlockNumber: 3, l2BlockNumber: 25}).ToContractType() + + firstCall := subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), uint64(16), uint64(25)). + Return([]*contract.StarknetLogStateUpdate{firstEvent}, nil). + Times(1) + secondCall := subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), uint64(6), uint64(15)). + Return(nil, nil). + Times(1). + After(firstCall) + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), uint64(0), uint64(5)). + Return([]*contract.StarknetLogStateUpdate{thirdEvent}, nil). + Times(1). + After(secondCall) + + client := NewClient(subscriber, chain, nopLog, + WithResubscribeDelay(0), + WithPollFinalisedInterval(time.Hour), + WithCatchUpChunkSize(10), + ) + + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + require.NoError(t, client.Run(ctx)) + cancel() + + // Only thirdEvent (l1=3) is at or below finalised=5, so it becomes L1Head. + got, err := chain.L1Head() + require.NoError(t, err) + assert.Equal(t, core.L1Head{ + BlockNumber: 25, + BlockHash: new(felt.Felt).SetUint64(25), + StateRoot: new(felt.Felt).SetUint64(25), + }, got) +} + +func TestCatchUpFilterError(t *testing.T) { + t.Parallel() + + chain, subscriber := newCatchUpFixture(t) + nopLog := log.NewNopZapLogger() + + subscriber.EXPECT().LatestHeight(gomock.Any()).Return(uint64(100), nil).Times(1) + subscriber.EXPECT().FinalisedHeight(gomock.Any()).Return(uint64(80), nil).AnyTimes() + + rpcErr := errors.New("rpc broken") + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, rpcErr). + Times(1) + + // Best-effort: catch-up error must NOT terminate Run. It logs and falls + // through to the live subscription, which we let idle until ctx expires. + client := NewClient(subscriber, chain, nopLog, + WithResubscribeDelay(0), + WithPollFinalisedInterval(time.Hour), + ) + + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + require.NoError(t, client.Run(ctx)) + cancel() + + // Filter failed and the subscription delivered nothing → no L1 head written. + _, err := chain.L1Head() + require.Error(t, err) +} + +// TestCatchUpHeadAndCachePartition feeds a single chunk with a mix of +// finalised and non-finalised events and asserts the post-setL1Head state: +// the highest finalised event wins as L1 head, every finalised entry is +// evicted from nonFinalisedLogs, and entries above finalisedHeight stay +// buffered for later commitment. +func TestCatchUpHeadAndCachePartition(t *testing.T) { + t.Parallel() + + chain, subscriber := newCatchUpFixture(t) + nopLog := log.NewNopZapLogger() + + // catchUpChunkSize default 1000. LatestHeight=10, FinalisedHeight=5 → + // single chunk [0, 10]. Five events span the finalised cutoff: + // l1=2,3,5 (<= finalised) → all deleted from cache, l1=5 wins as head + // l1=7,9 (> finalised) → remain buffered for the live loop + subscriber.EXPECT().LatestHeight(gomock.Any()).Return(uint64(10), nil).Times(1) + subscriber.EXPECT().FinalisedHeight(gomock.Any()).Return(uint64(5), nil).AnyTimes() + + finalisedLow := (&logStateUpdate{l1BlockNumber: 2, l2BlockNumber: 20}).ToContractType() + finalisedMid := (&logStateUpdate{l1BlockNumber: 3, l2BlockNumber: 30}).ToContractType() + finalisedTop := (&logStateUpdate{l1BlockNumber: 5, l2BlockNumber: 50}).ToContractType() + pendingLow := (&logStateUpdate{l1BlockNumber: 7, l2BlockNumber: 70}).ToContractType() + pendingHigh := (&logStateUpdate{l1BlockNumber: 9, l2BlockNumber: 90}).ToContractType() + + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), uint64(0), uint64(10)). + Return([]*contract.StarknetLogStateUpdate{ + finalisedLow, finalisedMid, finalisedTop, pendingLow, pendingHigh, + }, nil). + Times(1) + + client := NewClient(subscriber, chain, nopLog, + WithResubscribeDelay(0), + WithPollFinalisedInterval(time.Hour), + ) + + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + require.NoError(t, client.Run(ctx)) + cancel() + + // Highest finalised event (l1=5 → l2=50) commits as L1 head. + got, err := chain.L1Head() + require.NoError(t, err) + assert.Equal(t, core.L1Head{ + BlockNumber: 50, + BlockHash: new(felt.Felt).SetUint64(50), + StateRoot: new(felt.Felt).SetUint64(50), + }, got) + + // Non-finalised entries survive; every finalised entry is evicted + // (including the one that became the head). + require.Len(t, client.nonFinalisedLogs, 2) + assert.Equal(t, pendingLow, client.nonFinalisedLogs[7]) + assert.Equal(t, pendingHigh, client.nonFinalisedLogs[9]) + for _, l1Block := range []uint64{2, 3, 5} { + _, present := client.nonFinalisedLogs[l1Block] + assert.Falsef(t, present, "finalised l1=%d should be deleted from cache", l1Block) + } +} + +// TestCatchUpPartialProgressPreserved asserts the best-effort contract: when +// a backward chunk filter call errors mid-walk, entries already merged into +// nonFinalisedLogs by earlier successful chunks must remain available to the +// live subscription's setL1Head, instead of being rolled back. +func TestCatchUpPartialProgressPreserved(t *testing.T) { + t.Parallel() + + chain, subscriber := newCatchUpFixture(t) + nopLog := log.NewNopZapLogger() + + // catchUpChunkSize = 1000. LatestHeight=3000, FinalisedHeight=2000: + // chunk 1: [2001, 3000] -> succeeds with non-finalised event at l1=2500 + // (2500 > 2000 finalised, so foundFinalised=false, + // loop continues to next chunk) + // chunk 2: [1001, 2000] -> errors, catch-up bails out + // The chunk-1 event must still be sitting in nonFinalisedLogs after Run. + subscriber.EXPECT().LatestHeight(gomock.Any()).Return(uint64(3000), nil).Times(1) + subscriber.EXPECT().FinalisedHeight(gomock.Any()).Return(uint64(2000), nil).AnyTimes() + + chunkOneEvent := (&logStateUpdate{l1BlockNumber: 2500, l2BlockNumber: 42}).ToContractType() + rpcErr := errors.New("rpc broken") + + firstCall := subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), uint64(2001), uint64(3000)). + Return([]*contract.StarknetLogStateUpdate{chunkOneEvent}, nil). + Times(1) + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), uint64(1001), uint64(2000)). + Return(nil, rpcErr). + Times(1). + After(firstCall) + + // Poll interval is 1h so the live loop never ticks setL1Head — the only + // thing that could populate nonFinalisedLogs is the catch-up walk. + client := NewClient(subscriber, chain, nopLog, + WithResubscribeDelay(0), + WithPollFinalisedInterval(time.Hour), + ) + + ctx, cancel := context.WithTimeout(t.Context(), 200*time.Millisecond) + require.NoError(t, client.Run(ctx)) + cancel() + + // Partial state from chunk 1 survived the chunk-2 error. + require.Len(t, client.nonFinalisedLogs, 1) + got, ok := client.nonFinalisedLogs[2500] + require.True(t, ok, "chunk-1 event at l1=2500 should remain buffered") + assert.Equal(t, chunkOneEvent, got) + + // Above finalised, so setL1Head wouldn't have committed it anyway. + _, err := chain.L1Head() + require.Error(t, err) +} diff --git a/l1/l1_test.go b/l1/l1_test.go index 430c0557f0..14160cfca8 100644 --- a/l1/l1_test.go +++ b/l1/l1_test.go @@ -76,9 +76,25 @@ func TestFailToCreateSubscription(t *testing.T) { Return(network.L1ChainID, nil). Times(1) + // catchUp runs before subscribe; let it complete cleanly so the test + // reaches the subscription failure path it actually exercises. + subscriber.EXPECT().LatestHeight(gomock.Any()).Return(uint64(0), nil).AnyTimes() + subscriber.EXPECT().FinalisedHeight(gomock.Any()).Return(uint64(0), nil).AnyTimes() + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, nil). + AnyTimes() + subscriber.EXPECT().Close().Times(1) - client := l1.NewClient(subscriber, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond) + client := l1.NewClient( + subscriber, + chain, + nopLog, + l1.WithResubscribeDelay(0), + l1.WithPollFinalisedInterval(time.Nanosecond), + ) ctx, cancel := context.WithTimeout(t.Context(), time.Second) require.ErrorContains(t, client.Run(ctx), "context canceled before resubscribe was successful") @@ -106,7 +122,13 @@ func TestMismatchedChainID(t *testing.T) { Return(new(big.Int), nil). Times(1) - client := l1.NewClient(subscriber, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond) + client := l1.NewClient( + subscriber, + chain, + nopLog, + l1.WithResubscribeDelay(0), + l1.WithPollFinalisedInterval(time.Nanosecond), + ) ctx, cancel := context.WithTimeout(t.Context(), time.Second) t.Cleanup(cancel) @@ -146,6 +168,18 @@ func TestEventListener(t *testing.T) { Return(uint64(0), nil). AnyTimes() + subscriber. + EXPECT(). + LatestHeight(gomock.Any()). + Return(uint64(0), nil). + AnyTimes() + + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, nil). + AnyTimes() + subscriber. EXPECT(). ChainID(gomock.Any()). @@ -155,14 +189,15 @@ func TestEventListener(t *testing.T) { subscriber.EXPECT().Close().Times(1) var got *core.L1Head - client := l1.NewClient(subscriber, chain, nopLog). - WithResubscribeDelay(0). - WithPollFinalisedInterval(time.Nanosecond). - WithEventListener(l1.SelectiveListener{ + client := l1.NewClient(subscriber, chain, nopLog, + l1.WithResubscribeDelay(0), + l1.WithPollFinalisedInterval(time.Nanosecond), + l1.WithEventListener(l1.SelectiveListener{ OnNewL1HeadCb: func(head *core.L1Head) { got = head }, - }) + }), + ) ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) require.NoError(t, client.Run(ctx)) @@ -174,6 +209,78 @@ func TestEventListener(t *testing.T) { }, got) } +func TestEventListenerCatchUp(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + nopLog := log.NewNopZapLogger() + network := networks.Mainnet + chain := blockchain.New( + memory.New(), + &network, + blockchain.WithNewState(statetestutils.UseNewState()), + ) + + subscriber := mocks.NewMockSubscriber(ctrl) + subscriber. + EXPECT(). + ChainID(gomock.Any()). + Return(network.L1ChainID, nil). + Times(1) + + // Live subscription delivers nothing; the catch-up scan alone must + // populate nonFinalisedLogs so setL1Head fires the listener callback. + subscriber. + EXPECT(). + WatchLogStateUpdate(gomock.Any(), gomock.Any()). + Return(newFakeSubscription(), nil). + AnyTimes() + + // LatestHeight=10, FinalisedHeight=5, catchUpChunkSize=1000 → single chunk [0, 10]. + subscriber.EXPECT().LatestHeight(gomock.Any()).Return(uint64(10), nil).Times(1) + subscriber.EXPECT().FinalisedHeight(gomock.Any()).Return(uint64(5), nil).AnyTimes() + + backfilled := &contract.StarknetLogStateUpdate{ + BlockNumber: new(big.Int).SetUint64(7), + BlockHash: new(big.Int).SetUint64(7), + GlobalRoot: new(big.Int).SetUint64(7), + Raw: types.Log{BlockNumber: 3}, + } + subscriber. + EXPECT(). + FilterLogStateUpdate(gomock.Any(), uint64(0), uint64(10)). + Return([]*contract.StarknetLogStateUpdate{backfilled}, nil). + Times(1) + + subscriber.EXPECT().Close().Times(1) + + var got *core.L1Head + client := l1.NewClient(subscriber, chain, nopLog, + l1.WithResubscribeDelay(0), + l1.WithPollFinalisedInterval(time.Hour), + l1.WithEventListener(l1.SelectiveListener{ + OnNewL1HeadCb: func(head *core.L1Head) { + got = head + }, + }), + ) + + ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + require.NoError(t, client.Run(ctx)) + cancel() + + want := &core.L1Head{ + BlockNumber: 7, + BlockHash: new(felt.Felt).SetUint64(7), + StateRoot: new(felt.Felt).SetUint64(7), + } + require.Equal(t, want, got) + + persisted, err := chain.L1Head() + require.NoError(t, err) + require.Equal(t, *want, persisted) +} + func newTestL1Client(service service) *rpc.Server { server := rpc.NewServer() if err := server.RegisterName("eth", service); err != nil { diff --git a/mocks/mock_subscriber.go b/mocks/mock_subscriber.go index fcc2cce1b1..76a8281aaf 100644 --- a/mocks/mock_subscriber.go +++ b/mocks/mock_subscriber.go @@ -72,6 +72,21 @@ func (mr *MockSubscriberMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSubscriber)(nil).Close)) } +// FilterLogStateUpdate mocks base method. +func (m *MockSubscriber) FilterLogStateUpdate(ctx context.Context, fromBlock, toBlock uint64) ([]*contract.StarknetLogStateUpdate, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FilterLogStateUpdate", ctx, fromBlock, toBlock) + ret0, _ := ret[0].([]*contract.StarknetLogStateUpdate) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FilterLogStateUpdate indicates an expected call of FilterLogStateUpdate. +func (mr *MockSubscriberMockRecorder) FilterLogStateUpdate(ctx, fromBlock, toBlock any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterLogStateUpdate", reflect.TypeOf((*MockSubscriber)(nil).FilterLogStateUpdate), ctx, fromBlock, toBlock) +} + // FinalisedHeight mocks base method. func (m *MockSubscriber) FinalisedHeight(ctx context.Context) (uint64, error) { m.ctrl.T.Helper() diff --git a/node/node.go b/node/node.go index ce9976969a..6345da58a6 100644 --- a/node/node.go +++ b/node/node.go @@ -577,12 +577,11 @@ func newL1Client( return nil, fmt.Errorf("set up ethSubscriber: %w", err) } - l1Client := l1.NewClient(ethSubscriber, chain, log) - + opts := make([]l1.Option, 0, 1) if includeMetrics { - l1Client.WithEventListener(makeL1Metrics(chain, ethSubscriber)) + opts = append(opts, l1.WithEventListener(makeL1Metrics(chain, ethSubscriber))) } - return l1Client, nil + return l1.NewClient(ethSubscriber, chain, log, opts...), nil } // Run starts Juno node by opening the DB, initialising services.