diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go
index 4d95a7d7e5..6f57fe6f13 100644
--- a/block/internal/cache/manager.go
+++ b/block/internal/cache/manager.go
@@ -77,10 +77,14 @@ type PendingManager interface {
 	GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error)
 	SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
 	GetLastSubmittedHeaderHeight() uint64
+	ResetInFlightHeaderRange(start, end uint64)
 	SetLastSubmittedDataHeight(ctx context.Context, height uint64)
 	GetLastSubmittedDataHeight() uint64
+	ResetInFlightDataRange(start, end uint64)
 	NumPendingHeaders() uint64
 	NumPendingData() uint64
+	NumPendingHeadersTotal() uint64
+	NumPendingDataTotal() uint64
 }
 
 // Manager combines CacheManager and PendingManager.
@@ -311,6 +315,10 @@ func (m *implementation) SetLastSubmittedHeaderHeight(ctx context.Context, heigh
 	m.pendingHeaders.SetLastSubmittedHeaderHeight(ctx, height)
 }
 
+func (m *implementation) ResetInFlightHeaderRange(start, end uint64) {
+	m.pendingHeaders.ResetInFlightHeaderRange(start, end)
+}
+
 func (m *implementation) GetLastSubmittedDataHeight() uint64 {
 	return m.pendingData.GetLastSubmittedDataHeight()
 }
@@ -319,6 +327,10 @@ func (m *implementation) SetLastSubmittedDataHeight(ctx context.Context, height
 	m.pendingData.SetLastSubmittedDataHeight(ctx, height)
 }
 
+func (m *implementation) ResetInFlightDataRange(start, end uint64) {
+	m.pendingData.ResetInFlightDataRange(start, end)
+}
+
 func (m *implementation) NumPendingHeaders() uint64 {
 	return m.pendingHeaders.NumPendingHeaders()
 }
@@ -327,6 +339,14 @@ func (m *implementation) NumPendingData() uint64 {
 	return m.pendingData.NumPendingData()
 }
 
+func (m *implementation) NumPendingHeadersTotal() uint64 {
+	return m.pendingHeaders.NumPendingHeadersTotal()
+}
+
+func (m *implementation) NumPendingDataTotal() uint64 {
+	return m.pendingData.NumPendingDataTotal()
+}
+
 // SetPendingEvent sets the event at the specified height.
func (m *implementation) SetPendingEvent(height uint64, event *common.DAHeightEvent) { m.pendingMu.Lock() diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index fa5aebf34b..7b3fac74c2 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -221,6 +221,12 @@ func TestPendingHeadersAndData_Flow(t *testing.T) { // update last submitted heights and re-check cm.SetLastSubmittedHeaderHeight(ctx, 1) cm.SetLastSubmittedDataHeight(ctx, 2) + cm.ResetInFlightHeaderRange(1, 3) + cm.ResetInFlightDataRange(2, 3) + + // numPending views (before getPending claims items) + assert.Equal(t, uint64(2), cm.NumPendingHeaders()) + assert.Equal(t, uint64(1), cm.NumPendingData()) headers, _, err = cm.GetPendingHeaders(ctx) require.NoError(t, err) @@ -231,10 +237,6 @@ func TestPendingHeadersAndData_Flow(t *testing.T) { require.NoError(t, err) require.Len(t, signedData, 1) assert.Equal(t, uint64(3), signedData[0].Height()) - - // numPending views - assert.Equal(t, uint64(2), cm.NumPendingHeaders()) - assert.Equal(t, uint64(1), cm.NumPendingData()) } func TestManager_TxOperations(t *testing.T) { diff --git a/block/internal/cache/pending_base.go b/block/internal/cache/pending_base.go index 5dc239b8e9..17eba6c7f1 100644 --- a/block/internal/cache/pending_base.go +++ b/block/internal/cache/pending_base.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "fmt" + "slices" "sync" "sync/atomic" @@ -18,6 +19,13 @@ import ( // DefaultPendingCacheSize is the default size for the pending items cache. const DefaultPendingCacheSize = 200_000 +// inFlightClaim tracks a contiguous range of heights claimed by getPending +// for DA submission. Claims prevent concurrent getPending calls from +// returning the same items. On submission failure, resetInFlightRange +// removes the claim; if the range is below lastHeight it is added to gaps +// so the items can be re-fetched. +type inFlightClaim struct{ start, end uint64 } + // pendingBase is a generic struct for tracking items (headers, data, etc.) // that need to be published to the DA layer in order. It handles persistence // of the last submitted height and provides methods for retrieving pending items. @@ -28,14 +36,19 @@ type pendingBase[T any] struct { fetch func(ctx context.Context, store store.Store, height uint64) (T, error) lastHeight atomic.Uint64 + // inFlightMu protects inFlightClaims and gaps. + inFlightMu sync.Mutex + inFlightClaims []inFlightClaim // sorted by start + // gaps holds ranges that failed after a later submission advanced lastHeight + // past them. getPending checks gaps first so these items are retried. + gaps []inFlightClaim // sorted by start + // Pending items cache to avoid re-fetching all items on every call. - // We cache the items themselves, keyed by height. pendingCache *lru.Cache[uint64, T] mu sync.Mutex // Protects getPending logic } -// newPendingBase constructs a new pendingBase for a given type. func newPendingBase[T any](store store.Store, logger zerolog.Logger, metaKey string, fetch func(ctx context.Context, store store.Store, height uint64) (T, error)) (*pendingBase[T], error) { pendingCache, err := lru.New[uint64, T](DefaultPendingCacheSize) if err != nil { @@ -57,6 +70,9 @@ func newPendingBase[T any](store store.Store, logger zerolog.Logger, metaKey str // getPending returns a sorted slice of pending items of type T. // It caches fetched items to avoid re-fetching on subsequent calls. 
+// Returned items are registered as an in-flight claim so that concurrent +// callers do not receive the same items. On failure, the caller must call +// resetInFlightRange with the same [start, end] to re-expose the items. func (pb *pendingBase[T]) getPending(ctx context.Context) ([]T, error) { pb.mu.Lock() defer pb.mu.Unlock() @@ -66,48 +82,44 @@ func (pb *pendingBase[T]) getPending(ctx context.Context) ([]T, error) { if err != nil { return nil, err } - if lastSubmitted == storeHeight { - return nil, nil - } if lastSubmitted > storeHeight { - return nil, fmt.Errorf("height of last submitted item (%d) is greater than height of last item (%d)", lastSubmitted, storeHeight) + return nil, fmt.Errorf("last submitted height (%d) is greater than store height (%d)", lastSubmitted, storeHeight) } - // Limit the number of items to return based on cache capacity. - // This prevents the LRU from evicting entries we need, which would cause re-fetches. - pendingCount := storeHeight - lastSubmitted - endHeight := storeHeight - if pendingCount > DefaultPendingCacheSize { - endHeight = lastSubmitted + DefaultPendingCacheSize + pb.inFlightMu.Lock() + rangeStart, rangeEnd := findAvailableRange(pb.gaps, pb.inFlightClaims, lastSubmitted, storeHeight) + pb.inFlightMu.Unlock() + + if rangeStart == 0 || rangeStart > rangeEnd { + return nil, nil } - // Fetch only items that are not already in cache - for h := lastSubmitted + 1; h <= endHeight; h++ { - if _, ok := pb.pendingCache.Peek(h); ok { - continue // Already cached, skip fetching + // Cap range to cache capacity + if rangeEnd-rangeStart+1 > uint64(DefaultPendingCacheSize) { + rangeEnd = rangeStart + uint64(DefaultPendingCacheSize) - 1 + } + + pending := make([]T, 0, rangeEnd-rangeStart+1) + for h := rangeStart; h <= rangeEnd; h++ { + if item, ok := pb.pendingCache.Get(h); ok { + pending = append(pending, item) + continue } item, err := pb.fetch(ctx, pb.store, h) if err != nil { - return nil, err + return pending, err } pb.pendingCache.Add(h, item) + pending = append(pending, item) } - // Build the result slice from cache (only up to endHeight) - pending := make([]T, 0, endHeight-lastSubmitted) - for h := lastSubmitted + 1; h <= endHeight; h++ { - if item, ok := pb.pendingCache.Get(h); ok { - pending = append(pending, item) - } else { - // This shouldn't happen, but fetch if missing - item, err := pb.fetch(ctx, pb.store, h) - if err != nil { - return pending, err - } - pb.pendingCache.Add(h, item) - pending = append(pending, item) - } + if len(pending) > 0 { + pb.inFlightMu.Lock() + pb.inFlightClaims = insertClaim(pb.inFlightClaims, inFlightClaim{start: rangeStart, end: rangeEnd}) + pb.gaps = removeGapRange(pb.gaps, rangeStart, rangeEnd) + pb.inFlightMu.Unlock() } + return pending, nil } @@ -117,7 +129,36 @@ func (pb *pendingBase[T]) numPending() uint64 { pb.logger.Error().Err(err).Msg("failed to get height in numPending") return 0 } - return height - pb.lastHeight.Load() + + lastSubmitted := pb.lastHeight.Load() + + pb.inFlightMu.Lock() + var count uint64 + for _, gap := range pb.gaps { + count += countUnclaimed(gap.start, gap.end, pb.inFlightClaims) + } + if height > lastSubmitted { + count += countUnclaimed(lastSubmitted+1, height, pb.inFlightClaims) + } + pb.inFlightMu.Unlock() + + return count +} + +func (pb *pendingBase[T]) numPendingTotal() uint64 { + height, err := pb.store.Height(context.Background()) + if err != nil { + pb.logger.Error().Err(err).Msg("failed to get height in numPendingTotal") + return 0 + } + + lastSubmitted := 
pb.lastHeight.Load() + + if height <= lastSubmitted { + return 0 + } + + return height - lastSubmitted } func (pb *pendingBase[T]) getLastSubmittedHeight() uint64 { @@ -133,10 +174,55 @@ func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSub if err != nil { pb.logger.Error().Err(err).Msg("failed to store height of latest item submitted to DA") } - // Note: We don't explicitly clear submitted entries from the cache here. - // Since getPending() only iterates from lastSubmitted+1, old entries are simply - // never accessed. The LRU will naturally evict them when capacity is reached. - // This avoids O(N) iteration over the cache on every submission. + } + + pb.inFlightMu.Lock() + pb.inFlightClaims = trimClaimsBelow(pb.inFlightClaims, newLastSubmittedHeight) + pb.gaps = trimGapsBelow(pb.gaps, newLastSubmittedHeight) + pb.inFlightMu.Unlock() +} + +// resetInFlightRange removes any in-flight claim overlapping [start, end]. +// If the claim has been trimmed by setLastSubmittedHeight (partial success), +// the trimmed claim range is used for gap computation to avoid re-exposing +// already-submitted items. If no claim is found (removed by a concurrent +// setLastSubmittedHeight), the caller's range is used instead. +func (pb *pendingBase[T]) resetInFlightRange(start, end uint64) { + pb.inFlightMu.Lock() + defer pb.inFlightMu.Unlock() + + var removedClaim *inFlightClaim + n := 0 + for _, c := range pb.inFlightClaims { + if c.end < start || c.start > end { + pb.inFlightClaims[n] = c + n++ + } else { + cc := c + removedClaim = &cc + } + } + pb.inFlightClaims = pb.inFlightClaims[:n] + + currentLast := pb.lastHeight.Load() + + var gapStart, gapEnd uint64 + if removedClaim != nil { + gapStart = removedClaim.start + gapEnd = removedClaim.end + } else { + gapStart = start + gapEnd = end + } + + if gapStart > currentLast { + return + } + if gapEnd > currentLast { + gapEnd = currentLast + } + if gapStart <= gapEnd { + pb.gaps = insertClaim(pb.gaps, inFlightClaim{start: gapStart, end: gapEnd}) } } @@ -158,3 +244,131 @@ func (pb *pendingBase[T]) init() error { pb.lastHeight.CompareAndSwap(0, lsh) return nil } + +// --------------------------------------------------------------------------- +// Helper functions for claim / gap bookkeeping +// --------------------------------------------------------------------------- + +// findAvailableRange returns the first contiguous range of heights that are +// not covered by any active claim. Gaps take priority; then items above +// lastHeight. Returns (0, 0) when nothing is available. +func findAvailableRange(gaps, claims []inFlightClaim, lastHeight, storeHeight uint64) (uint64, uint64) { + // Check gaps first + for _, gap := range gaps { + s, e := firstUnclaimed(gap.start, gap.end, claims) + if s <= e { + return s, e + } + } + // Items above lastHeight + if lastHeight < storeHeight { + s, e := firstUnclaimed(lastHeight+1, storeHeight, claims) + return s, e + } + return 0, 0 +} + +// firstUnclaimed returns the first contiguous unclaimed sub-range within [lo, hi]. 
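+// It assumes claims is sorted by start. When every height in [lo, hi] is
+// already claimed it returns (0, 0); heights are 1-based, so callers treat
+// a zero start as "nothing available".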
+func firstUnclaimed(lo, hi uint64, claims []inFlightClaim) (uint64, uint64) { + h := lo + ci := 0 + for ci < len(claims) && claims[ci].end < h { + ci++ + } + // Skip past any claim that covers h + for ci < len(claims) && h >= claims[ci].start && h <= claims[ci].end { + h = claims[ci].end + 1 + ci++ + for ci < len(claims) && claims[ci].end < h { + ci++ + } + } + if h > hi { + return 0, 0 + } + end := hi + if ci < len(claims) && claims[ci].start <= hi { + end = claims[ci].start - 1 + } + return h, end +} + +// countUnclaimed counts heights in [lo, hi] not covered by any claim. +func countUnclaimed(lo, hi uint64, claims []inFlightClaim) uint64 { + if lo > hi { + return 0 + } + total := hi - lo + 1 + for _, c := range claims { + if c.end < lo || c.start > hi { + continue + } + ovStart := max(c.start, lo) + ovEnd := min(c.end, hi) + total -= ovEnd - ovStart + 1 + } + return total +} + +// insertClaim inserts a claim into a sorted-by-start slice, keeping it sorted. +func insertClaim(sorted []inFlightClaim, c inFlightClaim) []inFlightClaim { + idx, _ := slices.BinarySearchFunc(sorted, c.start, func(e inFlightClaim, v uint64) int { + if e.start < v { + return -1 + } + if e.start > v { + return 1 + } + return 0 + }) + return slices.Insert(sorted, idx, c) +} + +// removeGapRange removes or trims gaps covered by [start, end]. +func removeGapRange(gaps []inFlightClaim, start, end uint64) []inFlightClaim { + var result []inFlightClaim + for _, g := range gaps { + if g.end < start || g.start > end { + result = append(result, g) + continue + } + // Partial overlap: keep portions outside [start, end] + if g.start < start { + result = append(result, inFlightClaim{start: g.start, end: start - 1}) + } + if g.end > end { + result = append(result, inFlightClaim{start: end + 1, end: g.end}) + } + } + return result +} + +// trimClaimsBelow removes the portion of each claim that is <= height. +func trimClaimsBelow(claims []inFlightClaim, height uint64) []inFlightClaim { + result := claims[:0] + for _, c := range claims { + if c.end <= height { + continue + } + if c.start <= height { + c.start = height + 1 + } + result = append(result, c) + } + return result +} + +// trimGapsBelow removes gaps fully below height and trims partial overlaps. 
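+// It mirrors trimClaimsBelow above; both run under inFlightMu when
+// setLastSubmittedHeight advances the submitted height.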
+func trimGapsBelow(gaps []inFlightClaim, height uint64) []inFlightClaim { + result := gaps[:0] + for _, g := range gaps { + if g.end <= height { + continue + } + if g.start <= height { + g.start = height + 1 + } + result = append(result, g) + } + return result +} diff --git a/block/internal/cache/pending_base_test.go b/block/internal/cache/pending_base_test.go index eb9734a035..d15af428ed 100644 --- a/block/internal/cache/pending_base_test.go +++ b/block/internal/cache/pending_base_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" ) func TestPendingBase_ErrorConditions(t *testing.T) { @@ -27,12 +28,10 @@ func TestPendingBase_ErrorConditions(t *testing.T) { require.Error(t, err) // 2) lastSubmitted > height yields error from getPending - // reset metadata to a valid higher value than store height bz := make([]byte, 8) binary.LittleEndian.PutUint64(bz, 5) require.NoError(t, st.SetMetadata(ctx, store.LastSubmittedHeaderHeightKey, bz)) - // ensure store height stays lower (0) ph, err := NewPendingHeaders(st, logger) require.NoError(t, err) pending, _, err := ph.GetPendingHeaders(ctx) @@ -40,7 +39,7 @@ func TestPendingBase_ErrorConditions(t *testing.T) { assert.Len(t, pending, 0) // 3) NewPendingData shares same behavior - err = st.SetMetadata(ctx, LastSubmittedDataHeightKey, []byte{0xFF}) // invalid length + err = st.SetMetadata(ctx, LastSubmittedDataHeightKey, []byte{0xFF}) require.NoError(t, err) _, err = NewPendingData(st, logger) require.Error(t, err) @@ -57,14 +56,12 @@ func TestPendingBase_PersistLastSubmitted(t *testing.T) { ph, err := NewPendingHeaders(st, logger) require.NoError(t, err) - // store height 3 to make numPending meaningful batch, err := st.NewBatch(ctx) require.NoError(t, err) require.NoError(t, batch.SetHeight(3)) require.NoError(t, batch.Commit()) assert.Equal(t, uint64(3), ph.NumPendingHeaders()) - // set last submitted higher and ensure metadata is written ph.SetLastSubmittedHeaderHeight(ctx, 2) raw, err := st.GetMetadata(ctx, store.LastSubmittedHeaderHeightKey) require.NoError(t, err) @@ -72,9 +69,211 @@ func TestPendingBase_PersistLastSubmitted(t *testing.T) { lsh := binary.LittleEndian.Uint64(raw) assert.Equal(t, uint64(2), lsh) - // setting a lower height should not overwrite ph.SetLastSubmittedHeaderHeight(ctx, 1) raw2, err := st.GetMetadata(ctx, store.LastSubmittedHeaderHeightKey) require.NoError(t, err) assert.Equal(t, raw, raw2) } + +func TestPendingBase_InFlightClaim(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := testMemStore(t) + chainID := "inflight" + + for _, h := range []uint64{1, 2, 3, 4, 5} { + hdr, data := types.GetRandomBlock(h, int(h-1), chainID) + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(hdr, data, &types.Signature{})) + require.NoError(t, batch.SetHeight(h)) + require.NoError(t, batch.Commit()) + } + + ph, err := NewPendingHeaders(st, zerolog.Nop()) + require.NoError(t, err) + + // Claim all 5 items + headers, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + require.Len(t, headers, 5) + + // All claimed, nothing pending + assert.Equal(t, uint64(0), ph.NumPendingHeaders()) + + // Second call returns nothing (all claimed) + headers2, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + assert.Empty(t, headers2) + + // Simulate failure: reset the claim range + ph.ResetInFlightHeaderRange(1, 5) + + // Items are available again + assert.Equal(t, 
uint64(5), ph.NumPendingHeaders()) + headers3, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + require.Len(t, headers3, 5) +} + +func TestPendingBase_InFlightPartialAdvance(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := testMemStore(t) + chainID := "inflight-partial" + + for _, h := range []uint64{1, 2, 3, 4, 5} { + hdr, data := types.GetRandomBlock(h, int(h-1), chainID) + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(hdr, data, &types.Signature{})) + require.NoError(t, batch.SetHeight(h)) + require.NoError(t, batch.Commit()) + } + + ph, err := NewPendingHeaders(st, zerolog.Nop()) + require.NoError(t, err) + + // Claim all items [1..5] + headers, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + require.Len(t, headers, 5) + + // Partial success: [1..3] submitted, claim trimmed to [4..5] + ph.SetLastSubmittedHeaderHeight(ctx, 3) + + // Claim [4..5] still active, so getPending returns nothing + headers2, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + assert.Empty(t, headers2) + assert.Equal(t, uint64(0), ph.NumPendingHeaders()) + + // Add new items at heights 6-8 + for _, h := range []uint64{6, 7, 8} { + hdr, data := types.GetRandomBlock(h, int(h-1), chainID) + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(hdr, data, &types.Signature{})) + require.NoError(t, batch.SetHeight(h)) + require.NoError(t, batch.Commit()) + } + + // Claim [4..5] still active, but items [6..8] are available + assert.Equal(t, uint64(3), ph.NumPendingHeaders()) + headers3, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + require.Len(t, headers3, 3) + assert.Equal(t, uint64(6), headers3[0].Height()) + assert.Equal(t, uint64(8), headers3[2].Height()) + + // Retry of [4..5] succeeds + ph.SetLastSubmittedHeaderHeight(ctx, 5) + + // Claims [6..8] active, lastHeight=5, all covered + assert.Equal(t, uint64(0), ph.NumPendingHeaders()) +} + +func TestPendingBase_InFlightGapReexposure(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := testMemStore(t) + chainID := "inflight-gap" + + // Start with items [1..5] + for _, h := range []uint64{1, 2, 3, 4, 5} { + hdr, data := types.GetRandomBlock(h, int(h-1), chainID) + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(hdr, data, &types.Signature{})) + require.NoError(t, batch.SetHeight(h)) + require.NoError(t, batch.Commit()) + } + + ph, err := NewPendingHeaders(st, zerolog.Nop()) + require.NoError(t, err) + + // Claim A: all available items [1..5] + hA, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + require.Len(t, hA, 5) + + // Add items [6..15] + for _, h := range []uint64{6, 7, 8, 9, 10, 11, 12, 13, 14, 15} { + hdr, data := types.GetRandomBlock(h, int(h-1), chainID) + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(hdr, data, &types.Signature{})) + require.NoError(t, batch.SetHeight(h)) + require.NoError(t, batch.Commit()) + } + + // Claim B: items [6..15] (claim A still covers [1..5]) + hB, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + require.Len(t, hB, 10) + assert.Equal(t, uint64(6), hB[0].Height()) + + // A succeeds + ph.SetLastSubmittedHeaderHeight(ctx, 5) + // B succeeds (lastHeight jumps to 15) + ph.SetLastSubmittedHeaderHeight(ctx, 15) + + // Now simulate: a retry of items [8..10] from claim B had failed earlier + // and the retry loop also 
fails. Reset the sub-range. + // Since [8..10] is below lastHeight=15, it becomes a gap. + ph.ResetInFlightHeaderRange(8, 10) + + // Gap [8..10] should be available + assert.Equal(t, uint64(3), ph.NumPendingHeaders()) + + hRetry, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + require.Len(t, hRetry, 3) + assert.Equal(t, uint64(8), hRetry[0].Height()) + assert.Equal(t, uint64(10), hRetry[2].Height()) +} + +func TestPendingBase_InFlightResetOnFailure(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := testMemStore(t) + chainID := "inflight-reset" + + for _, h := range []uint64{1, 2, 3} { + hdr, data := types.GetRandomBlock(h, int(h-1), chainID) + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(hdr, data, &types.Signature{})) + require.NoError(t, batch.SetHeight(h)) + require.NoError(t, batch.Commit()) + } + + ph, err := NewPendingHeaders(st, zerolog.Nop()) + require.NoError(t, err) + + // Claim all items + _, _, err = ph.GetPendingHeaders(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(0), ph.NumPendingHeaders()) + + // Simulate failure and reset + ph.ResetInFlightHeaderRange(1, 3) + assert.Equal(t, uint64(3), ph.NumPendingHeaders()) + + // Claim again + headers, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + require.Len(t, headers, 3) + + // Partial success: [1] submitted, claim trimmed to [2..3] + ph.SetLastSubmittedHeaderHeight(ctx, 1) + // Failure of remaining [2..3]: reset the trimmed claim + ph.ResetInFlightHeaderRange(2, 3) + + // Items from height 2 onward should be available + assert.Equal(t, uint64(2), ph.NumPendingHeaders()) + headers2, _, err := ph.GetPendingHeaders(ctx) + require.NoError(t, err) + require.Len(t, headers2, 2) + assert.Equal(t, uint64(2), headers2[0].Height()) +} diff --git a/block/internal/cache/pending_data.go b/block/internal/cache/pending_data.go index b31334a416..1550f1e1af 100644 --- a/block/internal/cache/pending_data.go +++ b/block/internal/cache/pending_data.go @@ -81,10 +81,19 @@ func (pd *PendingData) NumPendingData() uint64 { return pd.base.numPending() } +func (pd *PendingData) NumPendingDataTotal() uint64 { + pd.advancePastEmptyData(context.Background()) + return pd.base.numPendingTotal() +} + func (pd *PendingData) SetLastSubmittedDataHeight(ctx context.Context, newLastSubmittedDataHeight uint64) { pd.base.setLastSubmittedHeight(ctx, newLastSubmittedDataHeight) } +func (pd *PendingData) ResetInFlightDataRange(start, end uint64) { + pd.base.resetInFlightRange(start, end) +} + // advancePastEmptyData advances lastSubmittedDataHeight past any consecutive empty data blocks. // This ensures that NumPendingData doesn't count empty data that won't be published to DA. 
func (pd *PendingData) advancePastEmptyData(ctx context.Context) { diff --git a/block/internal/cache/pending_data_test.go b/block/internal/cache/pending_data_test.go index 06e1dd9921..97630bb1d5 100644 --- a/block/internal/cache/pending_data_test.go +++ b/block/internal/cache/pending_data_test.go @@ -47,6 +47,7 @@ func TestPendingData_BasicFlow(t *testing.T) { // set last submitted and verify persistence pendingData.SetLastSubmittedDataHeight(ctx, 1) + pendingData.ResetInFlightDataRange(1, 3) metadataRaw, err := store.GetMetadata(ctx, LastSubmittedDataHeightKey) require.NoError(t, err) require.Len(t, metadataRaw, 8) diff --git a/block/internal/cache/pending_headers.go b/block/internal/cache/pending_headers.go index d12f9627af..8d9810678b 100644 --- a/block/internal/cache/pending_headers.go +++ b/block/internal/cache/pending_headers.go @@ -76,10 +76,18 @@ func (ph *PendingHeaders) NumPendingHeaders() uint64 { return ph.base.numPending() } +func (ph *PendingHeaders) NumPendingHeadersTotal() uint64 { + return ph.base.numPendingTotal() +} + func (ph *PendingHeaders) SetLastSubmittedHeaderHeight(ctx context.Context, newLastSubmittedHeaderHeight uint64) { ph.base.setLastSubmittedHeight(ctx, newLastSubmittedHeaderHeight) } +func (ph *PendingHeaders) ResetInFlightHeaderRange(start, end uint64) { + ph.base.resetInFlightRange(start, end) +} + func (ph *PendingHeaders) GetLastSubmittedHeaderHeight() uint64 { return ph.base.getLastSubmittedHeight() } diff --git a/block/internal/cache/pending_headers_test.go b/block/internal/cache/pending_headers_test.go index 8691f3a574..23ac729548 100644 --- a/block/internal/cache/pending_headers_test.go +++ b/block/internal/cache/pending_headers_test.go @@ -47,6 +47,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) { // advance last submitted height and verify persistence + filtering pendingHeaders.SetLastSubmittedHeaderHeight(ctx, 2) + pendingHeaders.ResetInFlightHeaderRange(1, 3) metadataRaw, err := store.GetMetadata(ctx, storepkg.LastSubmittedHeaderHeightKey) require.NoError(t, err) require.Len(t, metadataRaw, 8) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index f5be5e1b40..d7a562d264 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -484,8 +484,8 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { shouldCheck := e.config.Node.MaxPendingHeadersAndData <= pendingCheckInterval || e.pendingCheckCounter%pendingCheckInterval == 0 if shouldCheck { - pendingHeaders := e.cache.NumPendingHeaders() - pendingData := e.cache.NumPendingData() + pendingHeaders := e.cache.NumPendingHeadersTotal() + pendingData := e.cache.NumPendingDataTotal() if pendingHeaders >= e.config.Node.MaxPendingHeadersAndData || pendingData >= e.config.Node.MaxPendingHeadersAndData { e.logger.Warn(). diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 83f56d9cb5..40f3be50ff 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -122,8 +122,7 @@ type DASubmitter struct { addressSelector pkgda.AddressSelector // envelopeCache caches fully signed DA envelopes by height to avoid re-signing on retries - envelopeCache *lru.Cache[uint64, []byte] - envelopeCacheMu sync.RWMutex + envelopeCache *lru.Cache[uint64, []byte] // lastSubmittedHeight tracks the last successfully submitted height for lazy cache invalidation. // This avoids O(N) iteration over the cache on every submission. 
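The lazy invalidation mentioned here is a single height comparison rather than a cache sweep; a minimal sketch of the invariant that the getCachedEnvelope/setCachedEnvelope guards later in this diff enforce (the helper name is illustrative, not part of the change):

// staleEnvelope reports whether a cached envelope can be ignored: heights at
// or below lastSubmittedHeight were already accepted by the DA layer, so the
// accessors treat them as absent instead of sweeping the LRU on every
// submission (an O(1) check instead of O(N) cleanup).
func staleEnvelope(height, lastSubmittedHeight uint64) bool {
	return height <= lastSubmittedHeight
}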
@@ -131,6 +130,9 @@ type DASubmitter struct {
 
 	// signingWorkers is the number of parallel workers for signing
 	signingWorkers int
+
+	// wg tracks async submit goroutines; Close waits for them (sync.WaitGroup.Go requires Go 1.25+)
+	wg sync.WaitGroup
 }
 
 // NewDASubmitter creates a new DA submitter
@@ -194,22 +195,13 @@ func NewDASubmitter(
 	}
 }
 
-// recordFailure records a DA submission failure in metrics
-func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) {
-	counter, ok := s.metrics.DASubmitterFailures[reason]
-	if !ok {
-		s.logger.Warn().Str("reason", string(reason)).Msg("unregistered failure reason, metric not recorded")
-		return
-	}
-	counter.Add(1)
-
-	if gauge, ok := s.metrics.DASubmitterLastFailure[reason]; ok {
-		gauge.Set(float64(time.Now().Unix()))
-	}
+// Close waits for all in-flight DA submissions to finish.
+func (s *DASubmitter) Close() {
+	s.wg.Wait()
 }
 
-// SubmitHeaders submits pending headers to DA layer
-func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
+// SubmitHeaders signs pending headers and submits them to the DA layer asynchronously, reporting the outcome via onSubmitSuccess/onSubmitError
+func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer, onSubmitSuccess func(), onSubmitError func(error)) error {
 	if len(headers) == 0 {
 		return nil
 	}
@@ -230,28 +223,274 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed
 		return err
 	}
 
-	return submitToDA(s, ctx, headers, envelopes,
-		func(submitted []*types.SignedHeader, res *datypes.ResultSubmit) {
-			heights := make([]uint64, len(submitted))
-			for i, header := range submitted {
-				cache.SetHeaderDAIncluded(header.Hash().String(), res.Height, header.Height())
-				heights[i] = header.Height()
+	postSubmit := s.makeHeaderPostSubmit(ctx, cache)
+	namespace := s.client.GetHeaderNamespace()
+
+	s.wg.Go(func() {
+		s.submitWithRetry(ctx, envelopes, namespace, func(submittedCount int, daHeight uint64) {
+			if submittedCount > 0 {
+				postSubmit(headers[:submittedCount], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}})
+			}
+			if onSubmitSuccess != nil {
+				onSubmitSuccess()
+			}
+		}, onSubmitError, "header")
+	})
+
+	return nil
+}
+
+func (s *DASubmitter) makeHeaderPostSubmit(ctx context.Context, cache cache.Manager) func([]*types.SignedHeader, *datypes.ResultSubmit) {
+	return func(submitted []*types.SignedHeader, res *datypes.ResultSubmit) {
+		heights := make([]uint64, len(submitted))
+		for i, header := range submitted {
+			cache.SetHeaderDAIncluded(header.Hash().String(), res.Height, header.Height())
+			heights[i] = header.Height()
+		}
+		if err := s.headerDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil {
+			s.logger.Error().Err(err).Msg("failed to append da height hint in header p2p store")
+		}
+		if l := len(submitted); l > 0 {
+			lastHeight := submitted[l-1].Height()
+			cache.SetLastSubmittedHeaderHeight(ctx, lastHeight)
+			s.lastSubmittedHeight.Store(lastHeight)
+		}
+	}
+}
+
+// SubmitData signs pending data and submits it to the DA layer asynchronously, reporting the outcome via onSubmitSuccess/onSubmitError
+func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis, onSubmitSuccess func(), onSubmitError func(error)) error {
+	if len(unsignedDataList) == 0 {
+		return nil
+	}
+
+	if len(marshalledData) != len(unsignedDataList) {
+		return fmt.Errorf("marshalledData length (%d) does not match unsignedDataList length (%d)", len(marshalledData), len(unsignedDataList))
+	}
+
+	// Sign the data (cache returns unsigned SignedData
structs) + signedDataList, signedDataListBz, err := s.signData(ctx, unsignedDataList, marshalledData, signer, genesis) + if err != nil { + return fmt.Errorf("failed to sign data: %w", err) + } + + if len(signedDataList) == 0 { + return nil + } + + s.logger.Info().Int("count", len(signedDataList)).Msg("submitting data to DA") + + postSubmit := s.makeDataPostSubmit(ctx, cache) + namespace := s.client.GetDataNamespace() + + s.wg.Go(func() { + s.submitWithRetry(ctx, signedDataListBz, namespace, func(submittedCount int, daHeight uint64) { + if submittedCount > 0 { + postSubmit(signedDataList[:submittedCount], &datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: uint64(submittedCount), Height: daHeight}}) + } + if onSubmitSuccess != nil { + onSubmitSuccess() + } + }, onSubmitError, "data") + }) + + return nil +} + +func (s *DASubmitter) makeDataPostSubmit(ctx context.Context, cache cache.Manager) func([]*types.SignedData, *datypes.ResultSubmit) { + return func(submitted []*types.SignedData, res *datypes.ResultSubmit) { + heights := make([]uint64, len(submitted)) + for i, sd := range submitted { + cache.SetDataDAIncluded(sd.Data.DACommitment().String(), res.Height, sd.Height()) + heights[i] = sd.Height() + } + if err := s.dataDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil { + s.logger.Error().Err(err).Msg("failed to append da height hint in data p2p store") + } + if l := len(submitted); l > 0 { + lastHeight := submitted[l-1].Height() + cache.SetLastSubmittedDataHeight(ctx, lastHeight) + } + } +} + +func (s *DASubmitter) submitWithRetry( + ctx context.Context, + marshaled [][]byte, + namespace []byte, + onSuccess func(submittedCount int, daHeight uint64), + onError func(error), + itemType string, +) { + pol := defaultRetryPolicy(s.config.DA.MaxSubmitAttempts, s.config.DA.BlockTime.Duration) + options := []byte(s.config.DA.SubmitOptions) + + if len(marshaled) == 0 { + if onError != nil { + onError(nil) + } + return + } + + limitedMarshaled, oversized := limitBatchBySizeBytes(marshaled, pol.MaxBlobBytes) + if oversized { + s.logger.Error(). + Str("itemType", itemType). + Uint64("maxBlobBytes", pol.MaxBlobBytes). 
+ Msg("CRITICAL: item exceeds maximum blob size") + if onError != nil { + onError(common.ErrOversizedItem) + } + return + } + marshaled = limitedMarshaled + + rs := retryState{} + + // Start the retry loop + for rs.Attempt < pol.MaxAttempts { + // Record resend metric for retry attempts (not the first attempt) + if rs.Attempt > 0 { + s.metrics.DASubmitterResends.Add(1) + } + + if err := waitForBackoffOrContext(ctx, rs.Backoff); err != nil { + if onError != nil { + onError(nil) } - if err := s.headerDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil { - s.logger.Error().Err(err).Msg("failed to append da height hint in header p2p store") - // ignoring error here, since we don't want to block the block submission' + return + } + + // Select signing address and merge with options + signingAddress := s.addressSelector.Next() + mergedOptions, err := mergeSubmitOptions(options, signingAddress) + if err != nil { + s.logger.Error().Err(err).Msg("failed to merge submit options with signing address") + if onError != nil { + onError(err) } - if l := len(submitted); l > 0 { - lastHeight := submitted[l-1].Height() - cache.SetLastSubmittedHeaderHeight(ctx, lastHeight) - // Update last submitted height for lazy cache invalidation (O(1) instead of O(N)) - s.lastSubmittedHeight.Store(lastHeight) + return + } + + // Perform submission + start := time.Now() + res := s.client.Submit(ctx, marshaled, -1, namespace, mergedOptions) + s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got Submit response") + + // Record submission result for observability + if vis := server.GetDAVisualizationServer(); vis != nil { + vis.RecordSubmission(&res, 0, uint64(len(marshaled)), namespace) + } + + switch res.Code { + case datypes.StatusSuccess: + submitted := int(res.SubmittedCount) + if onSuccess != nil { + onSuccess(submitted, res.Height) + } + s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer") + if submitted == len(marshaled) { + return + } + // partial success: advance window + marshaled = marshaled[submitted:] + rs.Next(reasonSuccess, pol) + + case datypes.StatusTooBig: + // Record failure metric + s.recordFailure(common.DASubmitterFailureReasonTooBig) + // Iteratively halve until it fits or single-item too big + if len(marshaled) == 1 { + s.logger.Error(). + Str("itemType", itemType). 
+ Msg("CRITICAL: single item exceeds DA blob size limit") + if onError != nil { + onError(common.ErrOversizedItem) + } + return + } + half := len(marshaled) / 2 + if half == 0 { + half = 1 + } + marshaled = marshaled[:half] + s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying") + rs.Next(reasonTooBig, pol) + + case datypes.StatusNotIncludedInBlock: + // Record failure metric + s.recordFailure(common.DASubmitterFailureReasonNotIncludedInBlock) + s.logger.Info().Dur("backoff", pol.MaxBackoff).Msg("retrying due to mempool state") + rs.Next(reasonMempool, pol) + + case datypes.StatusAlreadyInMempool: + // Record failure metric + s.recordFailure(common.DASubmitterFailureReasonAlreadyInMempool) + s.logger.Info().Dur("backoff", pol.MaxBackoff).Msg("retrying due to mempool state") + rs.Next(reasonMempool, pol) + + case datypes.StatusContextCanceled: + // Record failure metric + s.recordFailure(common.DASubmitterFailureReasonContextCanceled) + s.logger.Info().Msg("DA layer submission canceled due to context cancellation") + if onError != nil { + onError(nil) } - }, - "header", - s.client.GetHeaderNamespace(), - []byte(s.config.DA.SubmitOptions), - ) + return + + default: + // Record failure metric + s.recordFailure(common.DASubmitterFailureReasonUnknown) + s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed") + rs.Next(reasonFailure, pol) + } + } + + // Final failure after max attempts + s.recordFailure(common.DASubmitterFailureReasonTimeout) + s.logger.Error().Str("itemType", itemType).Int("attempts", rs.Attempt).Msg("failed to submit all items to DA layer after max attempts") + if onError != nil { + onError(fmt.Errorf("failed to submit after %d attempts", rs.Attempt)) + } +} + +// limitBatchBySizeBytes returns a prefix of marshaled blobs whose total size does not exceed maxBytes. +// If the first blob exceeds maxBytes, it returns (nil, true) to indicate an unrecoverable oversized item. +func limitBatchBySizeBytes(marshaled [][]byte, maxBytes uint64) ([][]byte, bool) { + total := uint64(0) + count := 0 + for i, b := range marshaled { + sz := uint64(len(b)) + if sz > maxBytes { + if i == 0 { + return nil, true + } + break + } + if total+sz > maxBytes { + break + } + total += sz + count++ + } + if count == 0 { + return nil, true + } + return marshaled[:count], false +} + +// recordFailure records a DA submission failure in metrics +func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) { + counter, ok := s.metrics.DASubmitterFailures[reason] + if !ok { + s.logger.Warn().Str("reason", string(reason)).Msg("unregistered failure reason, metric not recorded") + return + } + counter.Add(1) + + if gauge, ok := s.metrics.DASubmitterLastFailure[reason]; ok { + gauge.Set(float64(time.Now().Unix())) + } } // createDAEnvelopes creates signed DA envelopes for the given headers. 
@@ -391,9 +628,6 @@ func (s *DASubmitter) getCachedEnvelope(height uint64) []byte {
 	if height <= s.lastSubmittedHeight.Load() {
 		return nil
 	}
-	s.envelopeCacheMu.RLock()
-	defer s.envelopeCacheMu.RUnlock()
-
 	if envelope, ok := s.envelopeCache.Get(height); ok {
 		return envelope
 	}
@@ -410,56 +644,9 @@ func (s *DASubmitter) setCachedEnvelope(height uint64, envelope []byte) {
 	if height <= s.lastSubmittedHeight.Load() {
 		return
 	}
-	s.envelopeCacheMu.Lock()
-	defer s.envelopeCacheMu.Unlock()
-
 	s.envelopeCache.Add(height, envelope)
 }
 
-// SubmitData submits pending data to DA layer
-func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
-	if len(unsignedDataList) == 0 {
-		return nil
-	}
-
-	if len(marshalledData) != len(unsignedDataList) {
-		return fmt.Errorf("marshalledData length (%d) does not match unsignedDataList length (%d)", len(marshalledData), len(unsignedDataList))
-	}
-
-	// Sign the data (cache returns unsigned SignedData structs)
-	signedDataList, signedDataListBz, err := s.signData(ctx, unsignedDataList, marshalledData, signer, genesis)
-	if err != nil {
-		return fmt.Errorf("failed to sign data: %w", err)
-	}
-
-	if len(signedDataList) == 0 {
-		return nil // No non-empty data to submit
-	}
-
-	s.logger.Info().Int("count", len(signedDataList)).Msg("submitting data to DA")
-
-	return submitToDA(s, ctx, signedDataList, signedDataListBz,
-		func(submitted []*types.SignedData, res *datypes.ResultSubmit) {
-			heights := make([]uint64, len(submitted))
-			for i, sd := range submitted {
-				cache.SetDataDAIncluded(sd.Data.DACommitment().String(), res.Height, sd.Height())
-				heights[i] = sd.Height()
-			}
-			if err := s.dataDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil {
-				s.logger.Error().Err(err).Msg("failed to append da height hint in data p2p store")
-				// ignoring error here, since we don't want to block the block submission'
-			}
-			if l := len(submitted); l > 0 {
-				lastHeight := submitted[l-1].Height()
-				cache.SetLastSubmittedDataHeight(ctx, lastHeight)
-			}
-		},
-		"data",
-		s.client.GetDataNamespace(),
-		[]byte(s.config.DA.SubmitOptions),
-	)
-}
-
 // signData signs unsigned SignedData structs returned from cache
 func (s *DASubmitter) signData(ctx context.Context, unsignedDataList []*types.SignedData, unsignedDataListBz [][]byte, signer signer.Signer, genesis genesis.Genesis) ([]*types.SignedData, [][]byte, error) {
 	if signer == nil {
@@ -556,163 +743,6 @@ func mergeSubmitOptions(baseOptions []byte, signingAddress string) ([]byte, erro
 	return mergedOptions, nil
 }
 
-// submitToDA is a generic helper for submitting items to the DA layer with retry, backoff, and gas price logic.
-func submitToDA[T any]( - s *DASubmitter, - ctx context.Context, - items []T, - marshaled [][]byte, - postSubmit func([]T, *datypes.ResultSubmit), - itemType string, - namespace []byte, - options []byte, -) error { - if len(items) != len(marshaled) { - return fmt.Errorf("items length (%d) does not match marshaled length (%d)", len(items), len(marshaled)) - } - - pol := defaultRetryPolicy(s.config.DA.MaxSubmitAttempts, s.config.DA.BlockTime.Duration) - - rs := retryState{Attempt: 0, Backoff: 0} - - // Limit this submission to a single size-capped batch - if len(marshaled) > 0 { - batchItems, batchMarshaled, err := limitBatchBySize(items, marshaled, pol.MaxBlobBytes) - if err != nil { - s.logger.Error(). - Str("itemType", itemType). - Uint64("maxBlobBytes", pol.MaxBlobBytes). - Err(err). - Msg("CRITICAL: Unrecoverable error - item exceeds maximum blob size") - return fmt.Errorf("unrecoverable error: no %s items fit within max blob size: %w", itemType, err) - } - items = batchItems - marshaled = batchMarshaled - } - - // Start the retry loop - for rs.Attempt < pol.MaxAttempts { - // Record resend metric for retry attempts (not the first attempt) - if rs.Attempt > 0 { - s.metrics.DASubmitterResends.Add(1) - } - - if err := waitForBackoffOrContext(ctx, rs.Backoff); err != nil { - return err - } - - // Select signing address and merge with options - signingAddress := s.addressSelector.Next() - mergedOptions, err := mergeSubmitOptions(options, signingAddress) - if err != nil { - s.logger.Error().Err(err).Msg("failed to merge submit options with signing address") - return fmt.Errorf("failed to merge submit options: %w", err) - } - - if signingAddress != "" { - s.logger.Debug().Str("signingAddress", signingAddress).Msg("using signing address for DA submission") - } - - // Perform submission - start := time.Now() - res := s.client.Submit(ctx, marshaled, -1, namespace, mergedOptions) - s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got SubmitWithHelpers response from celestia") - - // Record submission result for observability - if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { - daVisualizationServer.RecordSubmission(&res, 0, uint64(len(items)), namespace) - } - - switch res.Code { - case datypes.StatusSuccess: - submitted := items[:res.SubmittedCount] - postSubmit(submitted, &res) - s.logger.Info().Str("itemType", itemType).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer") - if int(res.SubmittedCount) == len(items) { - rs.Next(reasonSuccess, pol) - return nil - } - // partial success: advance window - items = items[res.SubmittedCount:] - marshaled = marshaled[res.SubmittedCount:] - rs.Next(reasonSuccess, pol) - - case datypes.StatusTooBig: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonTooBig) - // Iteratively halve until it fits or single-item too big - if len(items) == 1 { - s.logger.Error(). - Str("itemType", itemType). - Uint64("maxBlobBytes", pol.MaxBlobBytes). 
- Msg("CRITICAL: Unrecoverable error - single item exceeds DA blob size limit") - return fmt.Errorf("unrecoverable error: %w: single %s item exceeds DA blob size limit", common.ErrOversizedItem, itemType) - } - half := len(items) / 2 - if half == 0 { - half = 1 - } - items = items[:half] - marshaled = marshaled[:half] - s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying") - rs.Next(reasonTooBig, pol) - - case datypes.StatusNotIncludedInBlock: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonNotIncludedInBlock) - s.logger.Info().Dur("backoff", pol.MaxBackoff).Msg("retrying due to mempool state") - rs.Next(reasonMempool, pol) - - case datypes.StatusAlreadyInMempool: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonAlreadyInMempool) - s.logger.Info().Dur("backoff", pol.MaxBackoff).Msg("retrying due to mempool state") - rs.Next(reasonMempool, pol) - - case datypes.StatusContextCanceled: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonContextCanceled) - s.logger.Info().Msg("DA layer submission canceled due to context cancellation") - return context.Canceled - - default: - // Record failure metric - s.recordFailure(common.DASubmitterFailureReasonUnknown) - s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed") - rs.Next(reasonFailure, pol) - } - } - - // Final failure after max attempts - s.recordFailure(common.DASubmitterFailureReasonTimeout) - return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt) -} - -// limitBatchBySize returns a prefix of items whose total marshaled size does not exceed maxBytes. -// If the first item exceeds maxBytes, it returns ErrOversizedItem which is unrecoverable. 
-func limitBatchBySize[T any](items []T, marshaled [][]byte, maxBytes uint64) ([]T, [][]byte, error) { - total := uint64(0) - count := 0 - for i := range items { - sz := uint64(len(marshaled[i])) - if sz > maxBytes { - if i == 0 { - return nil, nil, fmt.Errorf("%w: item size %d exceeds max %d", common.ErrOversizedItem, sz, maxBytes) - } - break - } - if total+sz > maxBytes { - break - } - total += sz - count++ - } - if count == 0 { - return nil, nil, fmt.Errorf("no items fit within %d bytes", maxBytes) - } - return items[:count], marshaled[:count], nil -} - func waitForBackoffOrContext(ctx context.Context, backoff time.Duration) error { if backoff <= 0 { select { diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index b2c4efcd20..323ddafddf 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -101,11 +101,13 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( // Submit headers and data - cache returns both items and marshalled bytes headers, marshalledHeaders, err := cm.GetPendingHeaders(context.Background()) require.NoError(t, err) - require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n)) + require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n, nil, nil)) dataList, marshalledData, err := cm.GetPendingData(context.Background()) require.NoError(t, err) - require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen)) + require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen, nil, nil)) + + daSubmitter.Close() // After submission, inclusion markers should be set _, ok := cm.GetHeaderDAIncludedByHeight(1) diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index 2d79208e92..021e551210 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -1,13 +1,12 @@ package submitting import ( - "context" "testing" "time" "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/config" @@ -16,10 +15,9 @@ import ( "github.com/evstack/ev-node/test/mocks" ) -// helper to build a basic submitter with provided DA mock client and config overrides -func newTestSubmitter(t *testing.T, mockClient *mocks.MockClient, override func(*config.Config)) *DASubmitter { +func newTestBatchSubmitter(t *testing.T, mockClient *mocks.MockClient, override func(*config.Config)) *DASubmitter { + t.Helper() cfg := config.Config{} - // Keep retries small and backoffs minimal cfg.DA.BlockTime.Duration = 1 * time.Millisecond cfg.DA.MaxSubmitAttempts = 3 cfg.DA.SubmitOptions = "opts" @@ -35,115 +33,69 @@ func newTestSubmitter(t *testing.T, mockClient *mocks.MockClient, override func( mockClient.On("GetDataNamespace").Return([]byte(cfg.DA.DataNamespace)).Maybe() mockClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() mockClient.On("HasForcedInclusionNamespace").Return(false).Maybe() - return NewDASubmitter(mockClient, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop(), nil, nil) + return 
NewDASubmitter(mockClient, cfg, genesis.Genesis{}, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop(), nil, nil) } -func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) { +func TestSubmitWithRetry_MempoolRetry_Succeeds(t *testing.T) { t.Parallel() client := mocks.NewMockClient(t) - nsBz := datypes.NamespaceFromString("ns").Bytes() - opts := []byte("opts") - var usedGas []float64 - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { - usedGas = append(usedGas, args.Get(2).(float64)) - }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusNotIncludedInBlock, SubmittedCount: 0}}). + client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, mock.Anything). + Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusNotIncludedInBlock}}). Once() - ids := [][]byte{[]byte("id1"), []byte("id2"), []byte("id3")} - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { - usedGas = append(usedGas, args.Get(2).(float64)) - }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}). + client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, mock.Anything). + Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: 2}}). Once() - s := newTestSubmitter(t, client, nil) + s := newTestBatchSubmitter(t, client, nil) + defer s.Close() - items := []string{"a", "b", "c"} - marshalledItems := make([][]byte, len(items)) - for idx, item := range items { - marshalledItems[idx] = []byte(item) - } + var submittedCount int + s.submitWithRetry(t.Context(), [][]byte{[]byte("a"), []byte("b")}, nsBz, func(count int, _ uint64) { + submittedCount = count + }, nil, "item") - ctx := context.Background() - err := submitToDA[string]( - s, - ctx, - items, - marshalledItems, - func(_ []string, _ *datypes.ResultSubmit) {}, - "item", - nsBz, - opts, - ) - assert.NoError(t, err) - - // Sentinel value is preserved on retry - assert.Equal(t, []float64{-1, -1}, usedGas) + require.Equal(t, 2, submittedCount) + client.AssertExpectations(t) } -func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) { +func TestSubmitWithRetry_UnknownError_RetriesThenSucceeds(t *testing.T) { t.Parallel() client := mocks.NewMockClient(t) - nsBz := datypes.NamespaceFromString("ns").Bytes() - opts := []byte("opts") - var usedGas []float64 - - // First attempt: unknown failure -> reasonFailure, gas unchanged for next attempt - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }). + client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, mock.Anything). Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "boom"}}). Once() - // Second attempt: same gas, success - ids := [][]byte{[]byte("id1")} - client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts). - Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}). 
+ client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, mock.Anything). + Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: 1}}). Once() - s := newTestSubmitter(t, client, nil) + s := newTestBatchSubmitter(t, client, nil) + defer s.Close() - items := []string{"x"} - marshalledItems := make([][]byte, len(items)) - for idx, item := range items { - marshalledItems[idx] = []byte(item) - } + var submittedCount int + s.submitWithRetry(t.Context(), [][]byte{[]byte("x")}, nsBz, func(count int, _ uint64) { + submittedCount = count + }, nil, "item") - ctx := context.Background() - err := submitToDA[string]( - s, - ctx, - items, - marshalledItems, - func(_ []string, _ *datypes.ResultSubmit) {}, - "item", - nsBz, - opts, - ) - assert.NoError(t, err) - assert.Equal(t, []float64{-1, -1}, usedGas) + require.Equal(t, 1, submittedCount) + client.AssertExpectations(t) } -func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) { +func TestSubmitWithRetry_TooBig_HalvesBatch(t *testing.T) { t.Parallel() client := mocks.NewMockClient(t) - nsBz := datypes.NamespaceFromString("ns").Bytes() - - opts := []byte("opts") var batchSizes []int - client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts). + client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, mock.Anything). Run(func(args mock.Arguments) { blobs := args.Get(1).([][]byte) batchSizes = append(batchSizes, len(blobs)) @@ -151,121 +103,152 @@ func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) { Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusTooBig}}). Once() - ids := [][]byte{[]byte("id1"), []byte("id2")} - client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts). + client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, mock.Anything). Run(func(args mock.Arguments) { blobs := args.Get(1).([][]byte) batchSizes = append(batchSizes, len(blobs)) }). - Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}). + Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: 2}}). Once() - s := newTestSubmitter(t, client, nil) + s := newTestBatchSubmitter(t, client, nil) + defer s.Close() - items := []string{"a", "b", "c", "d"} - marshalledItems := make([][]byte, len(items)) - for idx, item := range items { - marshalledItems[idx] = []byte(item) - } + s.submitWithRetry(t.Context(), [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d")}, nsBz, nil, nil, "item") - ctx := context.Background() - err := submitToDA[string]( - s, - ctx, - items, - marshalledItems, - func(_ []string, _ *datypes.ResultSubmit) {}, - "item", - nsBz, - opts, - ) - assert.NoError(t, err) - assert.Equal(t, []int{4, 2}, batchSizes) + require.Equal(t, []int{4, 2}, batchSizes) + client.AssertExpectations(t) } -func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) { +func TestSubmitWithRetry_PartialSuccess_Advances(t *testing.T) { t.Parallel() client := mocks.NewMockClient(t) + nsBz := datypes.NamespaceFromString("ns").Bytes() + client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, mock.Anything). + Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: 2}}). + Once() + + client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, mock.Anything). 
+		Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: 1}}).
+		Once()
+
+	s := newTestBatchSubmitter(t, client, nil)
+	defer s.Close()
+
+	var totalSubmitted int
+	s.submitWithRetry(t.Context(), [][]byte{[]byte("a"), []byte("b"), []byte("c")}, nsBz, func(count int, _ uint64) {
+		totalSubmitted += count
+	}, nil, "item")
+
+	require.Equal(t, 3, totalSubmitted)
+	client.AssertExpectations(t)
+}
+
+func TestSubmitWithRetry_MaxAttempts_Exhausted(t *testing.T) {
+	t.Parallel()
+
+	client := mocks.NewMockClient(t)
 	nsBz := datypes.NamespaceFromString("ns").Bytes()
-	opts := []byte("opts")
-	var usedGas []float64
+	client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, mock.Anything).
+		Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "fail"}}).
+		Times(3)
+
+	s := newTestBatchSubmitter(t, client, nil)
+	defer s.Close()
+
+	var errReceived error
+	s.submitWithRetry(t.Context(), [][]byte{[]byte("a")}, nsBz, nil, func(err error) {
+		errReceived = err
+	}, "item")
 
-	client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts).
-		Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }).
+	require.Error(t, errReceived)
+	client.AssertExpectations(t)
+}
+
+func TestSubmitWithRetry_AlreadyInMempool_Retries(t *testing.T) {
+	t.Parallel()
+
+	client := mocks.NewMockClient(t)
+	nsBz := datypes.NamespaceFromString("ns").Bytes()
+
+	client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, mock.Anything).
 		Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusAlreadyInMempool}}).
 		Once()
-	ids := [][]byte{[]byte("id1")}
-	client.On("Submit", mock.Anything, mock.Anything, mock.AnythingOfType("float64"), nsBz, opts).
-		Run(func(args mock.Arguments) { usedGas = append(usedGas, args.Get(2).(float64)) }).
-		Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: ids, SubmittedCount: uint64(len(ids))}}).
+	client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, mock.Anything).
+		Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, SubmittedCount: 1}}).
 		Once()
 
-	s := newTestSubmitter(t, client, nil)
+	s := newTestBatchSubmitter(t, client, nil)
+	defer s.Close()
 
-	items := []string{"only"}
-	marshalledItems := make([][]byte, len(items))
-	for idx, item := range items {
-		marshalledItems[idx] = []byte(item)
-	}
+	var submittedCount int
+	s.submitWithRetry(t.Context(), [][]byte{[]byte("a")}, nsBz, func(count int, _ uint64) {
+		submittedCount = count
+	}, nil, "item")
 
-	ctx := context.Background()
-	err := submitToDA[string](
-		s,
-		ctx,
-		items,
-		marshalledItems,
-		func(_ []string, _ *datypes.ResultSubmit) {},
-		"item",
-		nsBz,
-		opts,
-	)
-	assert.NoError(t, err)
-	assert.Equal(t, []float64{-1, -1}, usedGas)
+	require.Equal(t, 1, submittedCount)
+	client.AssertExpectations(t)
 }
 
-func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) {
+func TestSubmitWithRetry_EmptyBatch_Noop(t *testing.T) {
 	t.Parallel()
 
 	client := mocks.NewMockClient(t)
+	s := newTestBatchSubmitter(t, client, nil)
+	defer s.Close()
 
-	nsBz := datypes.NamespaceFromString("ns").Bytes()
+	var errReceived error
+	s.submitWithRetry(t.Context(), nil, nil, nil, func(err error) {
+		errReceived = err
+	}, "item")
 
-	opts := []byte("opts")
-	var totalSubmitted int
+	require.NoError(t, errReceived)
+}
 
-	firstIDs := [][]byte{[]byte("id1"), []byte("id2")}
-	client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts).
-		Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: firstIDs, SubmittedCount: uint64(len(firstIDs))}}).
+func TestSubmitWithRetry_ContextCanceled_Stops(t *testing.T) {
+	t.Parallel()
+
+	client := mocks.NewMockClient(t)
+	nsBz := datypes.NamespaceFromString("ns").Bytes()
+
+	client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, mock.Anything).
+		Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusContextCanceled}}).
 		Once()
 
-	secondIDs := [][]byte{[]byte("id3")}
-	client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, opts).
-		Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: secondIDs, SubmittedCount: uint64(len(secondIDs))}}).
+	s := newTestBatchSubmitter(t, client, nil)
+	defer s.Close()
+
+	var errReceived error
+	s.submitWithRetry(t.Context(), [][]byte{[]byte("a")}, nsBz, nil, func(err error) {
+		errReceived = err
+	}, "item")
+
+	require.NoError(t, errReceived)
+	client.AssertExpectations(t)
+}
+
+func TestSubmitWithRetry_SingleItemTooBig_Fails(t *testing.T) {
+	t.Parallel()
+
+	client := mocks.NewMockClient(t)
+	nsBz := datypes.NamespaceFromString("ns").Bytes()
+
+	client.On("Submit", mock.Anything, mock.Anything, mock.Anything, nsBz, mock.Anything).
+		Return(datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusTooBig}}).
 		Once()
 
-	s := newTestSubmitter(t, client, nil)
+	s := newTestBatchSubmitter(t, client, nil)
+	defer s.Close()
 
-	items := []string{"a", "b", "c"}
-	marshalledItems := make([][]byte, len(items))
-	for idx, item := range items {
-		marshalledItems[idx] = []byte(item)
-	}
+	var errReceived error
+	s.submitWithRetry(t.Context(), [][]byte{[]byte("a")}, nsBz, nil, func(err error) {
+		errReceived = err
+	}, "item")
 
-	ctx := context.Background()
-	err := submitToDA[string](
-		s,
-		ctx,
-		items,
-		marshalledItems,
-		func(submitted []string, _ *datypes.ResultSubmit) { totalSubmitted += len(submitted) },
-		"item",
-		nsBz,
-		opts,
-	)
-	assert.NoError(t, err)
-	assert.Equal(t, 3, totalSubmitted)
+	require.ErrorIs(t, errReceived, common.ErrOversizedItem)
+	client.AssertExpectations(t)
 }
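Taken together, the rewritten tests above pin down the retry policy of the new batch submitter: partial successes advance the window, StatusTooBig halves the batch (and fails a single oversized blob with common.ErrOversizedItem), mempool and unknown errors are retried a bounded number of times, and context cancellation stops quietly without reporting an error. The following is a behavioral sketch of one policy consistent with those tests; the status constants, the maxAttempts bound, and the function shape are illustrative stand-ins, not the production submitWithRetry:

// Behavioral sketch only; not the production submitWithRetry. The names
// below are assumptions chosen to mirror the outcomes the tests assert.
package sketch

import (
	"errors"
	"fmt"
)

const (
	statusSuccess = iota
	statusTooBig
	statusContextCanceled
	statusRetryable // stands in for mempool and unknown errors
)

var errOversizedItem = errors.New("single item exceeds DA blob size limit")

func submitWithRetrySketch(
	submit func(blobs [][]byte) (code, submitted int),
	blobs [][]byte,
	maxAttempts int,
	onSuccess func(count int), // may be nil, as in the tests
	onError func(error), // may be nil, as in the tests
) {
	fail := func(err error) {
		if onError != nil {
			onError(err)
		}
	}
	batch, attempts, halved := len(blobs), 0, false
	for len(blobs) > 0 {
		code, n := submit(blobs[:batch])
		switch code {
		case statusSuccess:
			if onSuccess != nil {
				onSuccess(n)
			}
			blobs = blobs[n:] // partial success advances the window
			if halved {
				return // remainder stays pending for the next submission cycle
			}
			if batch > len(blobs) {
				batch = len(blobs)
			}
			attempts = 0
		case statusTooBig:
			if batch == 1 {
				fail(errOversizedItem) // a single oversized blob can never fit
				return
			}
			batch /= 2 // halve and retry
			halved = true
		case statusContextCanceled:
			return // shutdown is not a submission error
		default: // retryable (mempool, unknown): same batch, bounded attempts
			attempts++
			if attempts >= maxAttempts {
				fail(fmt.Errorf("giving up after %d attempts", attempts))
				return
			}
		}
	}
}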
diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go
index d25786018b..df67a9fe3b 100644
--- a/block/internal/submitting/da_submitter_test.go
+++ b/block/internal/submitting/da_submitter_test.go
@@ -35,18 +35,15 @@ const (
 func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manager, *mocks.MockClient, genesis.Genesis) {
 	t.Helper()
 
-	// Create store and cache
 	ds := sync.MutexWrap(datastore.NewMapDatastore())
 	st := store.New(ds)
 	cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
 	require.NoError(t, err)
 
-	// Create config
 	cfg := config.DefaultConfig()
 	cfg.DA.Namespace = testHeaderNamespace
 	cfg.DA.DataNamespace = testDataNamespace
 
-	// Mock DA client
 	mockDA := mocks.NewMockClient(t)
 	headerNamespace := datypes.NamespaceFromString(cfg.DA.Namespace).Bytes()
 	dataNamespace := datypes.NamespaceFromString(cfg.DA.DataNamespace).Bytes()
@@ -55,7 +52,6 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage
 	mockDA.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
 	mockDA.On("HasForcedInclusionNamespace").Return(false).Maybe()
 
-	// Create genesis
 	gen := genesis.Genesis{
 		ChainID:       "test-chain",
 		InitialHeight: 1,
@@ -63,7 +59,6 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage
 		ProposerAddress: []byte("test-proposer"),
 	}
 
-	// Create DA submitter
 	daSubmitter := NewDASubmitter(
 		mockDA,
 		cfg,
@@ -218,10 +213,10 @@ func TestDASubmitter_SubmitHeaders_Success(t *testing.T) {
 	// Get headers from cache and submit
 	headers, marshalledHeaders, err := cm.GetPendingHeaders(ctx)
 	require.NoError(t, err)
-	err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer)
+	err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer, nil, nil)
 	require.NoError(t, err)
+	submitter.Close()
 
-	// Verify headers are marked as DA included
 	_, ok1 := cm.GetHeaderDAIncludedByHeight(1)
 	assert.True(t, ok1)
 	_, ok2 := cm.GetHeaderDAIncludedByHeight(2)
@@ -238,7 +233,7 @@ func TestDASubmitter_SubmitHeaders_NoPendingHeaders(t *testing.T) {
 	// Get headers from cache (should be empty) and submit
 	headers, marshalledHeaders, err := cm.GetPendingHeaders(ctx)
 	require.NoError(t, err)
-	err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer)
+	err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer, nil, nil)
 	require.NoError(t, err) // Should succeed with no action
 	mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything)
 }
@@ -333,8 +328,9 @@ func TestDASubmitter_SubmitData_Success(t *testing.T) {
 	// Get data from cache and submit
 	signedDataList, marshalledData, err := cm.GetPendingData(ctx)
 	require.NoError(t, err)
-	err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen)
+	err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen, nil, nil)
 	require.NoError(t, err)
+	submitter.Close()
 
 	// Verify data is marked as DA included
 	_, ok := cm.GetDataDAIncludedByHeight(1)
@@ -387,7 +383,7 @@ func TestDASubmitter_SubmitData_SkipsEmptyData(t *testing.T) {
 	// Get data from cache and submit
 	signedDataList, marshalledData, err := cm.GetPendingData(ctx)
 	require.NoError(t, err)
-	err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen)
+	err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, signer, gen, nil, nil)
 	require.NoError(t, err)
 
 	mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything)
@@ -406,7 +402,7 @@ func TestDASubmitter_SubmitData_NoPendingData(t *testing.T) {
 	// Get data from cache (should be empty) and submit
 	dataList, marshalledData, err := cm.GetPendingData(ctx)
 	require.NoError(t, err)
-	err = submitter.SubmitData(ctx, dataList, marshalledData, cm, signer, gen)
+	err = submitter.SubmitData(ctx, dataList, marshalledData, cm, signer, gen, nil, nil)
 	require.NoError(t, err) // Should succeed with no action
 	mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything)
 }
@@ -447,7 +443,7 @@ func TestDASubmitter_SubmitData_NilSigner(t *testing.T) {
 	// Get data from cache and submit with nil signer - should fail
 	signedDataList, marshalledData, err := cm.GetPendingData(ctx)
 	require.NoError(t, err)
-	err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, nil, gen)
+	err = submitter.SubmitData(ctx, signedDataList, marshalledData, cm, nil, gen, nil, nil)
 	require.Error(t, err)
 	assert.Contains(t, err.Error(), "signer is nil")
 	mockDA.AssertNotCalled(t, "Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything)
diff --git a/block/internal/submitting/da_submitter_tracing.go b/block/internal/submitting/da_submitter_tracing.go
index e3c531fcf8..f516694fea 100644
--- a/block/internal/submitting/da_submitter_tracing.go
+++ b/block/internal/submitting/da_submitter_tracing.go
@@ -30,7 +30,7 @@ func WithTracingDASubmitter(inner DASubmitterAPI) DASubmitterAPI {
 	}
 }
 
-func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
+func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer, onSubmitSuccess func(), onSubmitError func(error)) error {
 	ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitHeaders",
 		trace.WithAttributes(
 			attribute.Int("header.count", len(headers)),
@@ -38,14 +38,12 @@ func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.
 	)
 	defer span.End()
 
-	// calculate total size
 	var totalBytes int
 	for _, h := range marshalledHeaders {
 		totalBytes += len(h)
 	}
 	span.SetAttributes(attribute.Int("header.total_bytes", totalBytes))
 
-	// add height range if headers present
 	if len(headers) > 0 {
 		span.SetAttributes(
 			attribute.Int64("header.start_height", int64(headers[0].Height())),
@@ -53,7 +51,7 @@ func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.
 		)
 	}
 
-	err := t.inner.SubmitHeaders(ctx, headers, marshalledHeaders, cache, signer)
+	err := t.inner.SubmitHeaders(ctx, headers, marshalledHeaders, cache, signer, onSubmitSuccess, onSubmitError)
 	if err != nil {
 		span.RecordError(err)
 		span.SetStatus(codes.Error, err.Error())
@@ -63,7 +61,7 @@ func (t *tracedDASubmitter) SubmitHeaders(ctx context.Context, headers []*types.
 	return nil
 }
 
-func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
+func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis, onSubmitSuccess func(), onSubmitError func(error)) error {
 	ctx, span := t.tracer.Start(ctx, "DASubmitter.SubmitData",
 		trace.WithAttributes(
 			attribute.Int("data.count", len(signedDataList)),
@@ -71,14 +69,12 @@ func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*ty
 	)
 	defer span.End()
 
-	// calculate total size
 	var totalBytes int
 	for _, d := range marshalledData {
 		totalBytes += len(d)
 	}
 	span.SetAttributes(attribute.Int("data.total_bytes", totalBytes))
 
-	// add height range if data present
 	if len(signedDataList) > 0 {
 		span.SetAttributes(
 			attribute.Int64("data.start_height", int64(signedDataList[0].Height())),
@@ -86,7 +82,7 @@ func (t *tracedDASubmitter) SubmitData(ctx context.Context, signedDataList []*ty
 		)
 	}
 
-	err := t.inner.SubmitData(ctx, signedDataList, marshalledData, cache, signer, genesis)
+	err := t.inner.SubmitData(ctx, signedDataList, marshalledData, cache, signer, genesis, onSubmitSuccess, onSubmitError)
 	if err != nil {
 		span.RecordError(err)
 		span.SetStatus(codes.Error, err.Error())
@@ -95,3 +91,7 @@
 	return nil
 }
+
+func (t *tracedDASubmitter) Close() {
+	t.inner.Close()
+}
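One subtlety in the traced decorator above: because SubmitHeaders and SubmitData now return once the batch has been handed off, a span brackets only that synchronous step, and the asynchronous outcome surfaces solely through the forwarded callbacks. A hedged usage fragment (the variable names and callback bodies are placeholders, not code from this PR):

	// inner, ctx, headers, marshalledHeaders, cm and signer are assumed to be
	// wired up elsewhere; only WithTracingDASubmitter is the real constructor.
	traced := WithTracingDASubmitter(inner)
	err := traced.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer,
		func() { /* async submit succeeded; record end-to-end latency here */ },
		func(err error) { /* async submit failed; the span has already ended */ },
	)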
diff --git a/block/internal/submitting/da_submitter_tracing_test.go b/block/internal/submitting/da_submitter_tracing_test.go
index 6edc5c5ec1..60df24d468 100644
--- a/block/internal/submitting/da_submitter_tracing_test.go
+++ b/block/internal/submitting/da_submitter_tracing_test.go
@@ -19,24 +19,26 @@ import (
 )
 
 type mockDASubmitterAPI struct {
-	submitHeadersFn func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error
-	submitDataFn    func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error
+	submitHeadersFn func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer, onSubmitSuccess func(), onSubmitError func(error)) error
+	submitDataFn    func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis, onSubmitSuccess func(), onSubmitError func(error)) error
 }
 
-func (m *mockDASubmitterAPI) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
+func (m *mockDASubmitterAPI) SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer, onSubmitSuccess func(), onSubmitError func(error)) error {
 	if m.submitHeadersFn != nil {
-		return m.submitHeadersFn(ctx, headers, marshalledHeaders, cache, signer)
+		return m.submitHeadersFn(ctx, headers, marshalledHeaders, cache, signer, onSubmitSuccess, onSubmitError)
 	}
 	return nil
 }
 
-func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
+func (m *mockDASubmitterAPI) SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis, onSubmitSuccess func(), onSubmitError func(error)) error {
 	if m.submitDataFn != nil {
-		return m.submitDataFn(ctx, signedDataList, marshalledData, cache, signer, genesis)
+		return m.submitDataFn(ctx, signedDataList, marshalledData, cache, signer, genesis, onSubmitSuccess, onSubmitError)
 	}
 	return nil
 }
 
+func (m *mockDASubmitterAPI) Close() {}
+
 func setupDASubmitterTrace(t *testing.T, inner DASubmitterAPI) (DASubmitterAPI, *tracetest.SpanRecorder) {
 	t.Helper()
 	sr := tracetest.NewSpanRecorder()
@@ -48,7 +50,7 @@ func setupDASubmitterTrace(t *testing.T, inner DASubmitterAPI) (DASubmitterAPI,
 
 func TestTracedDASubmitter_SubmitHeaders_Success(t *testing.T) {
 	mock := &mockDASubmitterAPI{
-		submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
+		submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer, onSubmitSuccess func(), onSubmitError func(error)) error {
 			return nil
 		},
 	}
@@ -66,7 +68,7 @@ func TestTracedDASubmitter_SubmitHeaders_Success(t *testing.T) {
 		[]byte("header3"),
 	}
 
-	err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil)
+	err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil, nil, nil)
 	require.NoError(t, err)
 
 	spans := sr.Ended()
@@ -77,7 +79,7 @@ func TestTracedDASubmitter_SubmitHeaders_Success(t *testing.T) {
 
 	attrs := span.Attributes()
 	testutil.RequireAttribute(t, attrs, "header.count", 3)
-	testutil.RequireAttribute(t, attrs, "header.total_bytes", 21) // 7+7+7
+	testutil.RequireAttribute(t, attrs, "header.total_bytes", 21)
 	testutil.RequireAttribute(t, attrs, "header.start_height", int64(100))
 	testutil.RequireAttribute(t, attrs, "header.end_height", int64(102))
 }
@@ -85,7 +87,7 @@ func TestTracedDASubmitter_SubmitHeaders_Success(t *testing.T) {
 func TestTracedDASubmitter_SubmitHeaders_Error(t *testing.T) {
 	expectedErr := errors.New("DA submission failed")
 	mock := &mockDASubmitterAPI{
-		submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
+		submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer, onSubmitSuccess func(), onSubmitError func(error)) error {
 			return expectedErr
 		},
 	}
@@ -97,7 +99,7 @@ func TestTracedDASubmitter_SubmitHeaders_Error(t *testing.T) {
 	}
 	marshalledHeaders := [][]byte{[]byte("header1")}
 
-	err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil)
+	err := submitter.SubmitHeaders(ctx, headers, marshalledHeaders, nil, nil, nil, nil)
 	require.Error(t, err)
 	require.Equal(t, expectedErr, err)
 
@@ -110,14 +112,14 @@ func TestTracedDASubmitter_SubmitHeaders_Error(t *testing.T) {
 func TestTracedDASubmitter_SubmitHeaders_Empty(t *testing.T) {
 	mock := &mockDASubmitterAPI{
-		submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error {
+		submitHeadersFn: func(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer, onSubmitSuccess func(), onSubmitError func(error)) error {
 			return nil
 		},
 	}
 	submitter, sr := setupDASubmitterTrace(t, mock)
 	ctx := context.Background()
 
-	err := submitter.SubmitHeaders(ctx, []*types.SignedHeader{}, [][]byte{}, nil, nil)
+	err := submitter.SubmitHeaders(ctx, []*types.SignedHeader{}, [][]byte{}, nil, nil, nil, nil)
 	require.NoError(t, err)
 
 	spans := sr.Ended()
@@ -131,7 +133,7 @@ func TestTracedDASubmitter_SubmitHeaders_Empty(t *testing.T) {
 
 func TestTracedDASubmitter_SubmitData_Success(t *testing.T) {
 	mock := &mockDASubmitterAPI{
-		submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
+		submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis, onSubmitSuccess func(), onSubmitError func(error)) error {
 			return nil
 		},
 	}
@@ -147,7 +149,7 @@ func TestTracedDASubmitter_SubmitData_Success(t *testing.T) {
 		[]byte("data2data2"),
 	}
 
-	err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{})
+	err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}, nil, nil)
 	require.NoError(t, err)
 
 	spans := sr.Ended()
@@ -158,7 +160,7 @@ func TestTracedDASubmitter_SubmitData_Success(t *testing.T) {
 
 	attrs := span.Attributes()
 	testutil.RequireAttribute(t, attrs, "data.count", 2)
-	testutil.RequireAttribute(t, attrs, "data.total_bytes", 20) // 10+10
+	testutil.RequireAttribute(t, attrs, "data.total_bytes", 20)
 	testutil.RequireAttribute(t, attrs, "data.start_height", int64(100))
 	testutil.RequireAttribute(t, attrs, "data.end_height", int64(101))
 }
@@ -166,7 +168,7 @@ func TestTracedDASubmitter_SubmitData_Success(t *testing.T) {
 func TestTracedDASubmitter_SubmitData_Error(t *testing.T) {
 	expectedErr := errors.New("data submission failed")
 	mock := &mockDASubmitterAPI{
-		submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
+		submitDataFn: func(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis, onSubmitSuccess func(), onSubmitError func(error)) error {
 			return expectedErr
 		},
 	}
@@ -178,7 +180,7 @@ func TestTracedDASubmitter_SubmitData_Error(t *testing.T) {
 	}
 	marshalledData := [][]byte{[]byte("data1")}
 
-	err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{})
+	err := submitter.SubmitData(ctx, signedDataList, marshalledData, nil, nil, genesis.Genesis{}, nil, nil)
 	require.Error(t, err)
 	require.Equal(t, expectedErr, err)
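Before the interface change below, a nil error from SubmitHeaders or SubmitData meant the batch had reached the DA layer; with the widened signatures it only confirms the hand-off, and the real outcome arrives through the two callbacks. A sketch of the calling convention, written against the package's own types (the function itself is illustrative and not part of this PR):

	// enqueueHeaders is a hypothetical in-package helper; DASubmitterAPI,
	// cache.Manager, signer.Signer and types.SignedHeader are the real types.
	func enqueueHeaders(ctx context.Context, api DASubmitterAPI, cm cache.Manager, sg signer.Signer,
		headers []*types.SignedHeader, marshalled [][]byte) error {
		onSuccess := func() {
			// e.g. record the submission timestamp for the batching strategy
		}
		onError := func(err error) {
			// e.g. release the claimed height range, escalate ErrOversizedItem
		}
		// nil here means "enqueued", not "submitted"
		return api.SubmitHeaders(ctx, headers, marshalled, cm, sg, onSuccess, onError)
	}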
diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go
index 25dcd781a1..17574a0e67 100644
--- a/block/internal/submitting/submitter.go
+++ b/block/internal/submitting/submitter.go
@@ -25,8 +25,9 @@ import (
 // DASubmitterAPI defines minimal methods needed by Submitter for DA submissions.
 type DASubmitterAPI interface {
-	SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer) error
-	SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error
+	SubmitHeaders(ctx context.Context, headers []*types.SignedHeader, marshalledHeaders [][]byte, cache cache.Manager, signer signer.Signer, onSubmitSuccess func(), onSubmitError func(error)) error
+	SubmitData(ctx context.Context, signedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis, onSubmitSuccess func(), onSubmitError func(error)) error
+	Close()
 }
 
 // Submitter handles DA submission and inclusion processing for both sync and aggregator nodes
@@ -153,6 +154,7 @@ func (s *Submitter) Stop() error {
 	if s.cancel != nil {
 		s.cancel()
 	}
+	s.daSubmitter.Close()
 
 	// Wait for goroutines to finish with a timeout to prevent hanging
 	done := make(chan struct{})
 	go func() {
@@ -206,9 +208,15 @@ func (s *Submitter) daSubmissionLoop() {
 			// Get headers with marshalled bytes from cache
 			headers, marshalledHeaders, err := s.cache.GetPendingHeaders(s.ctx)
 			if err != nil {
+				if len(headers) > 0 {
+					s.cache.ResetInFlightHeaderRange(headers[0].Height(), headers[len(headers)-1].Height())
+				}
 				s.logger.Error().Err(err).Msg("failed to get pending headers for batching decision")
 				return
 			}
+			if len(headers) == 0 {
+				return
+			}
 
 			// Calculate total size (excluding signature)
 			totalSize := uint64(0)
@@ -223,26 +231,38 @@ func (s *Submitter) daSubmissionLoop() {
 				timeSinceLastSubmit,
 			)
 
-			if shouldSubmit {
-				s.logger.Debug().
-					Time("t", time.Now()).
-					Uint64("headers", headersNb).
-					Uint64("total_size_kb", totalSize/1024).
-					Dur("time_since_last", timeSinceLastSubmit).
-					Msg("batching strategy triggered header submission")
-
-				if err := s.daSubmitter.SubmitHeaders(s.ctx, headers, marshalledHeaders, s.cache, s.signer); err != nil {
-					// Check for unrecoverable errors that indicate a critical issue
-					if errors.Is(err, common.ErrOversizedItem) {
-						s.logger.Error().Err(err).
-							Msg("CRITICAL: Header exceeds DA blob size limit - halting to prevent live lock")
-						s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err))
-						return
-					}
-					s.logger.Error().Err(err).Msg("failed to submit headers")
-				} else {
-					s.lastHeaderSubmit.Store(time.Now().UnixNano())
+			if !shouldSubmit {
+				if len(headers) > 0 {
+					s.cache.ResetInFlightHeaderRange(headers[0].Height(), headers[len(headers)-1].Height())
+				}
+				return
+			}
+
+			s.logger.Debug().
+				Time("t", time.Now()).
+				Uint64("headers", headersNb).
+				Uint64("total_size_kb", totalSize/1024).
+				Dur("time_since_last", timeSinceLastSubmit).
+				Msg("batching strategy triggered header submission")
+
+			onSuccess := func() { s.lastHeaderSubmit.Store(time.Now().UnixNano()) }
+			onError := func(err error) {
+				if len(headers) > 0 {
+					s.cache.ResetInFlightHeaderRange(headers[0].Height(), headers[len(headers)-1].Height())
+				}
+				if errors.Is(err, common.ErrOversizedItem) {
+					s.logger.Error().Err(err).
+ Msg("CRITICAL: Header exceeds DA blob size limit - halting to prevent live lock") + s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err)) + return + } + s.logger.Error().Err(err).Msg("failed to submit headers") + } + if err := s.daSubmitter.SubmitHeaders(s.ctx, headers, marshalledHeaders, s.cache, s.signer, onSuccess, onError); err != nil { + if len(headers) > 0 { + s.cache.ResetInFlightHeaderRange(headers[0].Height(), headers[len(headers)-1].Height()) } + s.logger.Error().Err(err).Msg("failed to enqueue header submission") } }() } @@ -266,9 +286,15 @@ func (s *Submitter) daSubmissionLoop() { // Get data with marshalled bytes from cache signedDataList, marshalledData, err := s.cache.GetPendingData(s.ctx) if err != nil { + if len(signedDataList) > 0 { + s.cache.ResetInFlightDataRange(signedDataList[0].Height(), signedDataList[len(signedDataList)-1].Height()) + } s.logger.Error().Err(err).Msg("failed to get pending data for batching decision") return } + if len(signedDataList) == 0 { + return + } // Calculate total size (excluding signature) totalSize := uint64(0) @@ -283,26 +309,38 @@ func (s *Submitter) daSubmissionLoop() { timeSinceLastSubmit, ) - if shouldSubmit { - s.logger.Debug(). - Time("t", time.Now()). - Uint64("data", dataNb). - Uint64("total_size_kb", totalSize/1024). - Dur("time_since_last", timeSinceLastSubmit). - Msg("batching strategy triggered data submission") - - if err := s.daSubmitter.SubmitData(s.ctx, signedDataList, marshalledData, s.cache, s.signer, s.genesis); err != nil { - // Check for unrecoverable errors that indicate a critical issue - if errors.Is(err, common.ErrOversizedItem) { - s.logger.Error().Err(err). - Msg("CRITICAL: Data exceeds DA blob size limit - halting to prevent live lock") - s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err)) - return - } - s.logger.Error().Err(err).Msg("failed to submit data") - } else { - s.lastDataSubmit.Store(time.Now().UnixNano()) + if !shouldSubmit { + if len(signedDataList) > 0 { + s.cache.ResetInFlightDataRange(signedDataList[0].Height(), signedDataList[len(signedDataList)-1].Height()) + } + return + } + + s.logger.Debug(). + Time("t", time.Now()). + Uint64("data", dataNb). + Uint64("total_size_kb", totalSize/1024). + Dur("time_since_last", timeSinceLastSubmit). + Msg("batching strategy triggered data submission") + + onSuccess := func() { s.lastDataSubmit.Store(time.Now().UnixNano()) } + onError := func(err error) { + if len(signedDataList) > 0 { + s.cache.ResetInFlightDataRange(signedDataList[0].Height(), signedDataList[len(signedDataList)-1].Height()) + } + if errors.Is(err, common.ErrOversizedItem) { + s.logger.Error().Err(err). 
+ Msg("CRITICAL: Data exceeds DA blob size limit - halting to prevent live lock") + s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err)) + return + } + s.logger.Error().Err(err).Msg("failed to submit data") + } + if err := s.daSubmitter.SubmitData(s.ctx, signedDataList, marshalledData, s.cache, s.signer, s.genesis, onSuccess, onError); err != nil { + if len(signedDataList) > 0 { + s.cache.ResetInFlightDataRange(signedDataList[0].Height(), signedDataList[len(signedDataList)-1].Height()) } + s.logger.Error().Err(err).Msg("failed to enqueue data submission") } }() } diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index b1e2d2e988..401f2619fc 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -421,7 +421,7 @@ type fakeDASubmitter struct { chData chan struct{} } -func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ []*types.SignedHeader, _ [][]byte, _ cache.Manager, _ signer.Signer) error { +func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ []*types.SignedHeader, _ [][]byte, _ cache.Manager, _ signer.Signer, _ func(), _ func(error)) error { select { case f.chHdr <- struct{}{}: default: @@ -429,7 +429,7 @@ func (f *fakeDASubmitter) SubmitHeaders(ctx context.Context, _ []*types.SignedHe return nil } -func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData, _ [][]byte, _ cache.Manager, _ signer.Signer, _ genesis.Genesis) error { +func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData, _ [][]byte, _ cache.Manager, _ signer.Signer, _ genesis.Genesis, _ func(), _ func(error)) error { select { case f.chData <- struct{}{}: default: @@ -437,6 +437,8 @@ func (f *fakeDASubmitter) SubmitData(ctx context.Context, _ []*types.SignedData, return nil } +func (f *fakeDASubmitter) Close() {} + // fakeSigner implements signer.Signer with deterministic behavior for tests. type fakeSigner struct{}