Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions l1/contract/starknet_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package contract

import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
)

func (_Starknet *StarknetFilterer) FilterLogStateUpdate(
opts *bind.FilterOpts,
) ([]*StarknetLogStateUpdate, error) {
logs, sub, err := _Starknet.contract.FilterLogs(opts, "LogStateUpdate")
if err != nil {
return nil, err
}
defer sub.Unsubscribe()

var out []*StarknetLogStateUpdate
unpack := func(log types.Log) error {
ev := new(StarknetLogStateUpdate)
if err := _Starknet.contract.UnpackLog(ev, "LogStateUpdate", log); err != nil {
return err
}
ev.Raw = log
out = append(out, ev)
return nil
}
for {
select {
case log := <-logs:
if err := unpack(log); err != nil {
return nil, err
}
case err := <-sub.Err():
// bind.BoundContract.FilterLogs ships logs from a goroutine that
// closes sub.Err() when done but never closes the logs channel.
// On graceful close (err == nil) drain remaining buffered logs.
if err != nil {
return nil, err
}
for {
select {
case log := <-logs:
if err := unpack(log); err != nil {
return nil, err
}
default:
return out, nil
}
}
}
}
}
18 changes: 18 additions & 0 deletions l1/eth_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ func (s *EthSubscriber) WatchLogStateUpdate(ctx context.Context, sink chan<- *co
return s.filterer.WatchLogStateUpdate(&bind.WatchOpts{Context: ctx}, sink)
}

func (s *EthSubscriber) FilterLogStateUpdate(
ctx context.Context,
start,
end uint64,
) ([]*contract.StarknetLogStateUpdate, error) {
reqTimer := time.Now()
events, err := s.filterer.FilterLogStateUpdate(&bind.FilterOpts{
Context: ctx,
Start: start,
End: &end,
})
if err != nil {
return nil, fmt.Errorf("filter LogStateUpdate [%d,%d]: %w", start, end, err)
}
s.listener.OnL1Call("eth_getLogs", time.Since(reqTimer))
return events, nil
}

func (s *EthSubscriber) ChainID(ctx context.Context) (*big.Int, error) {
reqTimer := time.Now()
chainID, err := s.ethClient.ChainID(ctx)
Expand Down
202 changes: 164 additions & 38 deletions l1/l1.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ type Subscriber interface {
FinalisedHeight(ctx context.Context) (uint64, error)
LatestHeight(ctx context.Context) (uint64, error)
WatchLogStateUpdate(ctx context.Context, sink chan<- *contract.StarknetLogStateUpdate) (event.Subscription, error)
FilterLogStateUpdate(
ctx context.Context,
fromBlock,
toBlock uint64,
) ([]*contract.StarknetLogStateUpdate, error)
ChainID(ctx context.Context) (*big.Int, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)

Close()
}

Expand All @@ -37,39 +41,75 @@ type Client struct {
network *networks.Network
resubscribeDelay time.Duration
pollFinalisedInterval time.Duration
catchUpChunkSize uint64
nonFinalisedLogs map[uint64]*contract.StarknetLogStateUpdate
listener EventListener
}

var _ service.Service = (*Client)(nil)

func NewClient(l1 Subscriber, chain *blockchain.Blockchain, logger log.StructuredLogger) *Client {
return &Client{
l1: l1,
l2Chain: chain,
logger: logger,
network: chain.Network(),
resubscribeDelay: 10 * time.Second,
pollFinalisedInterval: time.Minute,
nonFinalisedLogs: make(map[uint64]*contract.StarknetLogStateUpdate, 0),
listener: SelectiveListener{},
}
// defaultCatchUpChunkSize is the L1 block range per backward eth_getLogs request.
const defaultCatchUpChunkSize uint64 = 1_000

// options holds configuration for constructing a l1 client.
type options struct {
EventListener EventListener
ResubscribeDelay time.Duration
PollFinalisedInterval time.Duration
// CatchUpChunkSize is the L1 block range per backward eth_getLogs request
// during the startup catch-up scan.
CatchUpChunkSize uint64
}

// Option is a functional option for configuring l1 client options.
type Option func(*options)

func WithEventListener(l EventListener) Option {
return func(o *options) { o.EventListener = l }
}

func (c *Client) WithEventListener(l EventListener) *Client {
c.listener = l
return c
func WithResubscribeDelay(d time.Duration) Option {
return func(o *options) { o.ResubscribeDelay = d }
}

func (c *Client) WithResubscribeDelay(delay time.Duration) *Client {
c.resubscribeDelay = delay
return c
// WithPollFinalisedInterval sets the time to wait before
// checking for an update to the finalised L1 block.
func WithPollFinalisedInterval(d time.Duration) Option {
return func(o *options) { o.PollFinalisedInterval = d }
}

// WithPollFinalisedInterval sets the time to wait before checking for an update to the finalised L1 block.
func (c *Client) WithPollFinalisedInterval(delay time.Duration) *Client {
c.pollFinalisedInterval = delay
return c
// WithCatchUpChunkSize sets the L1 block range per backward eth_getLogs request
// during the startup catch-up scan.
func WithCatchUpChunkSize(size uint64) Option {
return func(o *options) { o.CatchUpChunkSize = size }
}

func NewClient(
l1 Subscriber,
chain *blockchain.Blockchain,
logger log.StructuredLogger,
opts ...Option,
) *Client {
o := options{
EventListener: SelectiveListener{},
ResubscribeDelay: 10 * time.Second,
PollFinalisedInterval: time.Minute,
CatchUpChunkSize: defaultCatchUpChunkSize,
}
for _, opt := range opts {
opt(&o)
}
return &Client{
l1: l1,
l2Chain: chain,
logger: logger,
network: chain.Network(),
resubscribeDelay: o.ResubscribeDelay,
pollFinalisedInterval: o.PollFinalisedInterval,
catchUpChunkSize: o.CatchUpChunkSize,
nonFinalisedLogs: make(map[uint64]*contract.StarknetLogStateUpdate),
listener: o.EventListener,
}
}

func (c *Client) subscribeToUpdates(ctx context.Context, updateChan chan *contract.StarknetLogStateUpdate) (event.Subscription, error) {
Expand Down Expand Up @@ -113,6 +153,21 @@ func (c *Client) Run(ctx context.Context) error {
return err
}

// catchUpL1HeadUpdates is best-effort: a backward eth_getLogs scan can fail
// on free-tier RPC providers that cap range size or rate-limit. On failure
// we skip catch-up and proceed straight to the live subscription — the L1
// head will lag until the next on-chain LogStateUpdate is observed, which
// is acceptable rather terminating the execution.
if err := c.catchUpL1HeadUpdates(ctx); err != nil {
c.logger.Warn("L1 head catch-up failed; resuming with live subscription only",
zap.Error(err),
)
}

return c.watchL1StateUpdates(ctx)
}

func (c *Client) watchL1StateUpdates(ctx context.Context) error {
buffer := 128

c.logger.Info("Subscribing to L1 updates...")
Expand Down Expand Up @@ -149,22 +204,7 @@ func (c *Client) Run(ctx context.Context) error {
}
defer updateSub.Unsubscribe() //nolint:gocritic
case logStateUpdate := <-updateChan:
c.logger.Debug("Received L1 LogStateUpdate",
zap.String("number", logStateUpdate.BlockNumber.String()),
zap.String("stateRoot", logStateUpdate.GlobalRoot.Text(felt.Base16)),
zap.String("blockHash", logStateUpdate.BlockHash.Text(felt.Base16)),
)

if logStateUpdate.Raw.Removed {
for l1BlockNumber := range c.nonFinalisedLogs {
if l1BlockNumber >= logStateUpdate.Raw.BlockNumber {
delete(c.nonFinalisedLogs, l1BlockNumber)
}
}
// TODO What if the finalised block is also reorged?
} else {
c.nonFinalisedLogs[logStateUpdate.Raw.BlockNumber] = logStateUpdate
}
c.applyLogStateUpdate(logStateUpdate)
default:
break Outer
}
Expand All @@ -177,6 +217,92 @@ func (c *Client) Run(ctx context.Context) error {
}
}

// applyLogStateUpdate merges a LogStateUpdate (from either the forward
// subscription or the historical filter) into nonFinalisedLogs. A removed
// log clears all entries at or above its L1 block number.
func (c *Client) applyLogStateUpdate(u *contract.StarknetLogStateUpdate) {
c.logger.Debug("Received L1 LogStateUpdate",
zap.String("number", u.BlockNumber.String()),
zap.String("stateRoot", u.GlobalRoot.Text(felt.Base16)),
zap.String("blockHash", u.BlockHash.Text(felt.Base16)),
)
if u.Raw.Removed {
for l1BlockNumber := range c.nonFinalisedLogs {
if l1BlockNumber >= u.Raw.BlockNumber {
delete(c.nonFinalisedLogs, l1BlockNumber)
}
}
} else {
c.nonFinalisedLogs[u.Raw.BlockNumber] = u
}
}

// catchUpL1HeadUpdates performs a backward scan of historical LogStateUpdate
// events emitted while the node was offline (or before it ever ran), populating
// nonFinalisedLogs so that the first setL1Head call can write an L1 head
// without waiting for the next forward event. The scan walks back from
// LatestHeight in chunks of catchUpChunkSize until at least one finalised event
// is captured if any exists in the scanned range.
//
// On a mid-scan error the function returns without rolling back: entries
// already merged into nonFinalisedLogs are real on-chain events and remain
// usable by setL1Head and by the live subscription that runs afterward. The
// caller (Run) treats the error as best-effort and proceeds to the live
// subscription, so the partial state is an additive head-start, not a leak.
func (c *Client) catchUpL1HeadUpdates(ctx context.Context) error {
latest, err := c.l1.LatestHeight(ctx)
if err != nil {
return fmt.Errorf("L1 catch-up: failed to get latest height: %w", err)
}
finalised, err := c.l1.FinalisedHeight(ctx)
if err != nil {
return fmt.Errorf("L1 catch-up: failed to get finalised height: %w", err)
}
Comment thread
thiagodeev marked this conversation as resolved.

c.logger.Info("L1 catch-up starting",
zap.Uint64("latest", latest),
zap.Uint64("finalised", finalised),
zap.Uint64("chunkSize", c.catchUpChunkSize),
)

var chunks, total int
foundFinalised := false
to := latest
for {
if ctx.Err() != nil {
return ctx.Err()
}
var from uint64
if to+1 > c.catchUpChunkSize {
from = to + 1 - c.catchUpChunkSize
}
events, err := c.l1.FilterLogStateUpdate(ctx, from, to)
if err != nil {
return err
}
chunks++
total += len(events)
for _, ev := range events {
c.applyLogStateUpdate(ev)
if ev.Raw.BlockNumber <= finalised {
foundFinalised = true
}
}
// Stop once we've captured at least one finalised event (so setL1Head
// has something to commit) or we've walked back to genesis.
if foundFinalised || from == 0 {
c.logger.Info("L1 catch-up complete",
zap.Int("chunks", chunks),
zap.Int("events", total),
zap.Int("nonFinalisedLogs", len(c.nonFinalisedLogs)),
zap.Bool("foundFinalised", foundFinalised),
)
return c.setL1Head(ctx)
}
to = from - 1
}
}

func (c *Client) finalisedHeight(ctx context.Context) uint64 {
for {
select {
Expand Down
Loading
Loading