Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions block/internal/cache/generic_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ func (c *Cache) isSeen(hash string) bool {
return c.hashes[hash]
}

// areSeen checks which hashes have been seen. Returns a boolean slice
// parallel to the input where result[i] is true if hashes[i] is in the
// cache. Acquires the read lock once for the entire batch.
// areSeen reports, for each input hash, whether it is present in the
// cache. The returned slice is index-aligned with hashes (out[i]
// corresponds to hashes[i]). The read lock is taken a single time for
// the whole batch rather than once per lookup.
func (c *Cache) areSeen(hashes []string) []bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make([]bool, len(hashes))
	for idx := range hashes {
		out[idx] = c.hashes[hashes[idx]]
	}
	return out
}

func (c *Cache) setSeen(hash string, height uint64) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -75,6 +88,29 @@ func (c *Cache) removeSeen(hash string) {
delete(c.hashes, hash)
}

// setSeenBatch marks all hashes as seen under a single write lock.
// For height 0 (transactions), the hashByHeight bookkeeping is skipped
// since all txs share the same sentinel height — the map lookup and
// overwrite on every entry is pure overhead with no benefit.
// setSeenBatch marks every hash in hashes as seen, acquiring the write
// lock once for the entire batch.
//
// Height 0 is the sentinel used for transactions; in that case the
// hashByHeight bookkeeping is skipped entirely, since all txs share the
// sentinel height and the map writes would be pure overhead.
func (c *Cache) setSeenBatch(hashes []string, height uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, h := range hashes {
		c.hashes[h] = true
	}
	// For a real block height, record the hash->height association.
	// Only the final element is written: the previous per-entry
	// overwrite loop had the same end state (last hash wins), and its
	// `existing == h` branch was a value no-op, so both are dropped.
	if height != 0 && len(hashes) > 0 {
		c.hashByHeight[height] = hashes[len(hashes)-1]
	}
}

func (c *Cache) getDAIncluded(hash string) (uint64, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
7 changes: 6 additions & 1 deletion block/internal/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type CacheManager interface {

// Transaction operations
IsTxSeen(hash string) bool
AreTxsSeen(hashes []string) []bool
SetTxSeen(hash string)
SetTxsSeen(hashes []string)
CleanupOldTxs(olderThan time.Duration) int
Expand Down Expand Up @@ -204,6 +205,10 @@ func (m *implementation) IsTxSeen(hash string) bool {
return m.txCache.isSeen(hash)
}

// AreTxsSeen reports which of the given transaction hashes are already
// known to the tx cache; the result is index-aligned with the input.
func (m *implementation) AreTxsSeen(hashes []string) []bool {
	seen := m.txCache.areSeen(hashes)
	return seen
}

func (m *implementation) SetTxSeen(hash string) {
// Use 0 as height since transactions don't have a block height yet
m.txCache.setSeen(hash, 0)
Expand All @@ -212,9 +217,9 @@ func (m *implementation) SetTxSeen(hash string) {
}

// SetTxsSeen marks the given transaction hashes as seen in one batched
// cache update and records the observation time for each hash. Height 0
// is the sentinel used because these transactions have no block height yet.
func (m *implementation) SetTxsSeen(hashes []string) {
	m.txCache.setSeenBatch(hashes, 0)
	observed := time.Now()
	for _, h := range hashes {
		m.txTimestamps.Store(h, observed)
	}
}
Expand Down
42 changes: 42 additions & 0 deletions block/internal/reaping/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,48 @@ func BenchmarkReaperFlow_Sustained(b *testing.B) {
})
}

func BenchmarkReaperFlow_DrainOnly(b *testing.B) {
for _, batchSize := range []int{100, 500, 1000} {
for _, txSize := range []int{256, 4096} {
name := fmt.Sprintf("batch=%d/txSize=%d", batchSize, txSize)
b.Run(name, func(b *testing.B) {
exec := &infiniteExecutor{}
seq := &countingSequencer{}
cm := newBenchCache(b)

r, err := NewReaper(
exec, seq,
genesis.Genesis{ChainID: "bench"},
zerolog.Nop(), cm,
10*time.Millisecond,
func() {},
)
if err != nil {
b.Fatal(err)
}

txs := make([][]byte, batchSize)
for i := range txs {
txs[i] = make([]byte, txSize)
_, _ = rand.Read(txs[i])
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
exec.mu.Lock()
exec.batch = txs
exec.mu.Unlock()
r.drainMempool(nil)
}
b.StopTimer()

total := seq.submitted.Load()
b.ReportMetric(float64(total)/b.Elapsed().Seconds(), "txs/sec")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
})
}
}
}

func BenchmarkReaperFlow_StartStop(b *testing.B) {
b.Run("lifecycle", func(b *testing.B) {
exec := &infiniteExecutor{}
Expand Down
69 changes: 29 additions & 40 deletions block/internal/reaping/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ func (r *Reaper) Stop() error {
return nil
}

type pendingTx struct {
tx []byte
hash string
}

func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
var totalSubmitted int

Expand Down Expand Up @@ -175,16 +170,32 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
break
}

filtered := r.filterNewTxs(txs)
if len(filtered) == 0 {
hashes := hashTxs(txs)
seen := r.cache.AreTxsSeen(hashes)

newTxs := make([][]byte, 0, len(txs))
newHashes := make([]string, 0, len(txs))
for i, tx := range txs {
if !seen[i] {
newTxs = append(newTxs, tx)
newHashes = append(newHashes, hashes[i])
}
}

if len(newTxs) == 0 {
break
}

n, err := r.submitFiltered(filtered)
_, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
Id: []byte(r.chainID),
Batch: &coresequencer.Batch{Transactions: newTxs},
})
if err != nil {
return totalSubmitted > 0, err
return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
}
totalSubmitted += n

r.cache.SetTxsSeen(newHashes)
totalSubmitted += len(newTxs)
}

if totalSubmitted > 0 {
Expand All @@ -194,38 +205,16 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
return totalSubmitted > 0, nil
}

// hashTxs computes the hex-encoded SHA-256 digest of each transaction.
// The returned slice is index-aligned with txs (hashes[i] is the digest
// of txs[i]); an empty input yields an empty, non-nil slice.
func hashTxs(txs [][]byte) []string {
	hashes := make([]string, len(txs))
	for i, tx := range txs {
		sum := sha256.Sum256(tx)
		hashes[i] = hex.EncodeToString(sum[:])
	}
	return hashes
}

// hashTx returns the hex-encoded SHA-256 digest of a single transaction.
func hashTx(tx []byte) string {
	sum := sha256.Sum256(tx)
	return hex.EncodeToString(sum[:])
}
Loading