From 7f5251ddd1fbd5d1695e6181a12e9b19538254c3 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 24 Apr 2026 17:30:28 +0200 Subject: [PATCH 1/4] perf: reaper --- block/internal/cache/generic_cache.go | 36 ++++++++++++++ block/internal/cache/manager.go | 7 ++- block/internal/reaping/bench_test.go | 42 ++++++++++++++++ block/internal/reaping/reaper.go | 69 +++++++++++---------------- 4 files changed, 113 insertions(+), 41 deletions(-) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index 5e9f5ff4f5..7e814074e3 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -58,6 +58,19 @@ func (c *Cache) isSeen(hash string) bool { return c.hashes[hash] } +// areSeen checks which hashes have been seen. Returns a boolean slice +// parallel to the input where result[i] is true if hashes[i] is in the +// cache. Acquires the read lock once for the entire batch. +func (c *Cache) areSeen(hashes []string) []bool { + c.mu.RLock() + defer c.mu.RUnlock() + result := make([]bool, len(hashes)) + for i, h := range hashes { + result[i] = c.hashes[h] + } + return result +} + func (c *Cache) setSeen(hash string, height uint64) { c.mu.Lock() defer c.mu.Unlock() @@ -75,6 +88,29 @@ func (c *Cache) removeSeen(hash string) { delete(c.hashes, hash) } +// setSeenBatch marks all hashes as seen under a single write lock. +// For height 0 (transactions), the hashByHeight bookkeeping is skipped +// since all txs share the same sentinel height — the map lookup and +// overwrite on every entry is pure overhead with no benefit. +func (c *Cache) setSeenBatch(hashes []string, height uint64) { + c.mu.Lock() + defer c.mu.Unlock() + if height == 0 { + for _, h := range hashes { + c.hashes[h] = true + } + return + } + for _, h := range hashes { + if existing, ok := c.hashByHeight[height]; ok && existing == h { + c.hashes[existing] = true + continue + } + c.hashes[h] = true + c.hashByHeight[height] = h + } +} + func (c *Cache) getDAIncluded(hash string) (uint64, bool) { c.mu.RLock() defer c.mu.RUnlock() diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index a91b1410d7..e907002600 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -52,6 +52,7 @@ type CacheManager interface { // Transaction operations IsTxSeen(hash string) bool + AreTxsSeen(hashes []string) []bool SetTxSeen(hash string) SetTxsSeen(hashes []string) CleanupOldTxs(olderThan time.Duration) int @@ -204,6 +205,10 @@ func (m *implementation) IsTxSeen(hash string) bool { return m.txCache.isSeen(hash) } +func (m *implementation) AreTxsSeen(hashes []string) []bool { + return m.txCache.areSeen(hashes) +} + func (m *implementation) SetTxSeen(hash string) { // Use 0 as height since transactions don't have a block height yet m.txCache.setSeen(hash, 0) @@ -212,9 +217,9 @@ func (m *implementation) SetTxSeen(hash string) { } func (m *implementation) SetTxsSeen(hashes []string) { + m.txCache.setSeenBatch(hashes, 0) now := time.Now() for _, hash := range hashes { - m.txCache.setSeen(hash, 0) m.txTimestamps.Store(hash, now) } } diff --git a/block/internal/reaping/bench_test.go b/block/internal/reaping/bench_test.go index 5ec0aaa69d..829eab6427 100644 --- a/block/internal/reaping/bench_test.go +++ b/block/internal/reaping/bench_test.go @@ -246,6 +246,48 @@ func BenchmarkReaperFlow_Sustained(b *testing.B) { }) } +func BenchmarkReaperFlow_DrainOnly(b *testing.B) { + for _, batchSize := range []int{100, 500, 1000} { + for _, txSize := range []int{256, 4096} { + name := fmt.Sprintf("batch=%d/txSize=%d", batchSize, txSize) + b.Run(name, func(b *testing.B) { + exec := &infiniteExecutor{} + seq := &countingSequencer{} + cm := newBenchCache(b) + + r, err := NewReaper( + exec, seq, + genesis.Genesis{ChainID: "bench"}, + zerolog.Nop(), cm, + 10*time.Millisecond, + func() {}, + ) + if err != nil { + b.Fatal(err) + } + + txs := make([][]byte, batchSize) + for i := range txs { + txs[i] = make([]byte, txSize) + _, _ = rand.Read(txs[i]) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + exec.mu.Lock() + exec.batch = txs + exec.mu.Unlock() + r.drainMempool(nil) + } + b.StopTimer() + + total := seq.submitted.Load() + b.ReportMetric(float64(total)/b.Elapsed().Seconds(), "txs/sec") + }) + } + } +} + func BenchmarkReaperFlow_StartStop(b *testing.B) { b.Run("lifecycle", func(b *testing.B) { exec := &infiniteExecutor{} diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 796505e462..d35dbfff3e 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -143,11 +143,6 @@ func (r *Reaper) Stop() error { return nil } -type pendingTx struct { - tx []byte - hash string -} - func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { var totalSubmitted int @@ -175,16 +170,32 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { break } - filtered := r.filterNewTxs(txs) - if len(filtered) == 0 { + hashes := hashTxs(txs) + seen := r.cache.AreTxsSeen(hashes) + + newTxs := make([][]byte, 0, len(txs)) + newHashes := make([]string, 0, len(txs)) + for i, tx := range txs { + if !seen[i] { + newTxs = append(newTxs, tx) + newHashes = append(newHashes, hashes[i]) + } + } + + if len(newTxs) == 0 { break } - n, err := r.submitFiltered(filtered) + _, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ + Id: []byte(r.chainID), + Batch: &coresequencer.Batch{Transactions: newTxs}, + }) if err != nil { - return totalSubmitted > 0, err + return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) } - totalSubmitted += n + + r.cache.SetTxsSeen(newHashes) + totalSubmitted += len(newTxs) } if totalSubmitted > 0 { @@ -194,38 +205,16 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { return totalSubmitted > 0, nil } -func (r *Reaper) filterNewTxs(txs [][]byte) []pendingTx { - pending := make([]pendingTx, 0, len(txs)) - for _, tx := range txs { - h := hashTx(tx) - if !r.cache.IsTxSeen(h) { - pending = append(pending, pendingTx{tx: tx, hash: h}) - } - } - return pending -} - -func (r *Reaper) submitFiltered(batch []pendingTx) (int, error) { - txs := make([][]byte, len(batch)) - hashes := make([]string, len(batch)) - for i, p := range batch { - txs[i] = p.tx - hashes[i] = p.hash +func hashTxs(txs [][]byte) []string { + hashes := make([]string, len(txs)) + for i, tx := range txs { + h := sha256.Sum256(tx) + hashes[i] = hex.EncodeToString(h[:]) } - - _, err := r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ - Id: []byte(r.chainID), - Batch: &coresequencer.Batch{Transactions: txs}, - }) - if err != nil { - return 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) - } - - r.cache.SetTxsSeen(hashes) - return len(txs), nil + return hashes } func hashTx(tx []byte) string { - hash := sha256.Sum256(tx) - return hex.EncodeToString(hash[:]) + h := sha256.Sum256(tx) + return hex.EncodeToString(h[:]) } From a9af66eef3c2a8d4c2140060942d363f44c1248f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 24 Apr 2026 19:49:35 +0200 Subject: [PATCH 2/4] rm --- block/internal/reaping/bench_test.go | 42 ---------------------------- 1 file changed, 42 deletions(-) diff --git a/block/internal/reaping/bench_test.go b/block/internal/reaping/bench_test.go index 829eab6427..5ec0aaa69d 100644 --- a/block/internal/reaping/bench_test.go +++ b/block/internal/reaping/bench_test.go @@ -246,48 +246,6 @@ func BenchmarkReaperFlow_Sustained(b *testing.B) { }) } -func BenchmarkReaperFlow_DrainOnly(b *testing.B) { - for _, batchSize := range []int{100, 500, 1000} { - for _, txSize := range []int{256, 4096} { - name := fmt.Sprintf("batch=%d/txSize=%d", batchSize, txSize) - b.Run(name, func(b *testing.B) { - exec := &infiniteExecutor{} - seq := &countingSequencer{} - cm := newBenchCache(b) - - r, err := NewReaper( - exec, seq, - genesis.Genesis{ChainID: "bench"}, - zerolog.Nop(), cm, - 10*time.Millisecond, - func() {}, - ) - if err != nil { - b.Fatal(err) - } - - txs := make([][]byte, batchSize) - for i := range txs { - txs[i] = make([]byte, txSize) - _, _ = rand.Read(txs[i]) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - exec.mu.Lock() - exec.batch = txs - exec.mu.Unlock() - r.drainMempool(nil) - } - b.StopTimer() - - total := seq.submitted.Load() - b.ReportMetric(float64(total)/b.Elapsed().Seconds(), "txs/sec") - }) - } - } -} - func BenchmarkReaperFlow_StartStop(b *testing.B) { b.Run("lifecycle", func(b *testing.B) { exec := &infiniteExecutor{} From edf5525d9218649a7a81239a970ca15ac7136c30 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 24 Apr 2026 19:50:28 +0200 Subject: [PATCH 3/4] cl --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73e5429c8a..0097fb829b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changes + +- Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286) + ## v1.1.1 ### Changes From a790d8bb1ca0f53bf386715c88b19fcd779ac71c Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 24 Apr 2026 19:51:40 +0200 Subject: [PATCH 4/4] cmt --- block/internal/cache/generic_cache.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index 7e814074e3..96ee82d8b2 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -101,6 +101,8 @@ func (c *Cache) setSeenBatch(hashes []string, height uint64) { } return } + + // currently not used, but there for compleness against setSeen for _, h := range hashes { if existing, ok := c.hashByHeight[height]; ok && existing == h { c.hashes[existing] = true