From c30255cd1a7ce646dcc08ede47baab045d5844e8 Mon Sep 17 00:00:00 2001 From: Ady0333 Date: Sun, 15 Feb 2026 02:32:23 +0530 Subject: [PATCH] Fix orphaned pvtdata missing entries from stale iterator during elg conversion Re-create the LevelDB iterator after releasing and re-acquiring purgerLock in processCollElgEvents. The old snapshot-based iterator yields entries deleted by the purger during the unlock window, creating eligible missing data entries with no expiry entry that are never cleaned up. Signed-off-by: Ady0333 --- core/ledger/pvtdatastorage/store.go | 7 +++ core/ledger/pvtdatastorage/store_test.go | 63 ++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index eb1f9d1ffa9..31a0659ceda 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -1054,6 +1054,13 @@ func (s *Store) processCollElgEvents() error { s.purgerLock.Unlock() time.Sleep(sleepTime * time.Millisecond) s.purgerLock.Lock() + // Re-create the iterator so it reflects any deletes + // made by the purger while the lock was released. + collItr.Release() + collItr, err = s.db.GetIterator(startKey, endKey) + if err != nil { + return err + } } } // entry loop diff --git a/core/ledger/pvtdatastorage/store_test.go b/core/ledger/pvtdatastorage/store_test.go index 3ddf65cbaae..1e1413ca5da 100644 --- a/core/ledger/pvtdatastorage/store_test.go +++ b/core/ledger/pvtdatastorage/store_test.go @@ -1765,6 +1765,69 @@ func testutilWaitForCollElgProcToFinish(s *Store) { s.collElgProcSync.waitForDone() } +func TestProcessCollElgEventsIteratorRecreatedAfterPurge(t *testing.T) { + btlPolicy := btltestutil.SampleBTLPolicy(map[[2]string]uint64{ + {"ns-1", "coll-1"}: 1, + }) + conf := pvtDataConf() + conf.MaxBatchSize = 1 + conf.BatchesInterval = 500 + + env := NewTestStoreEnv(t, "TestCollElgStaleIter", btlPolicy, conf) + defer env.Cleanup() + s := env.TestStore + + s.purgeInterval = math.MaxUint64 + + s.purgerLock.Lock() + s.purgerLock.Unlock() //lint:ignore SA2001 syncpoint + + require.NoError(t, s.Commit(0, nil, nil, nil)) + + const numBlocks = uint64(10) + for blk := uint64(1); blk <= numBlocks; blk++ { + missingData := make(ledger.TxMissingPvtData) + missingData.Add(1, "ns-1", "coll-1", false) + require.NoError(t, s.Commit(blk, nil, missingData, nil)) + } + + const lastBlock = uint64(15) + for blk := numBlocks + 1; blk <= lastBlock; blk++ { + require.NoError(t, s.Commit(blk, nil, nil, nil)) + } + + { + key := encodeCollElgKey(lastBlock) + m := newCollElgInfo(map[string][]string{"ns-1": []string{"coll-1"}}) + val, err := encodeCollElgVal(m) + require.NoError(t, err) + b := s.db.NewUpdateBatch() + b.Put(key, val) + require.NoError(t, s.db.WriteBatch(b, true)) + } + + procErr := make(chan error, 1) + go func() { + procErr <- s.processCollElgEvents() + }() + + time.Sleep(100 * time.Millisecond) + + s.purgerLock.Lock() + require.NoError(t, s.purgeExpiredData(0, lastBlock)) + s.purgerLock.Unlock() + + require.NoError(t, <-procErr) + + for blk := uint64(1); blk <= numBlocks; blk++ { + k := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: blk}} + require.False(t, + testElgPrioMissingDataKeyExists(t, s, k), + "orphaned eligible prioritized missing data entry must not exist for expired block %d", blk, + ) + } +} + func produceSamplePvtdata(t *testing.T, txNum uint64, nsColls []string) *ledger.TxPvtData { builder := rwsetutil.NewRWSetBuilder() for _, nsColl := range nsColls {