Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions migration/historyprunner/committer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package historyprunner

import (
	"fmt"

	"github.com/NethermindEth/juno/db"
	"github.com/NethermindEth/juno/migration/pipeline"
	"github.com/NethermindEth/juno/migration/semaphore"
	"github.com/NethermindEth/juno/utils/log"
	"go.uber.org/zap"
)

// committer is a pipeline stage that writes batches to the database and
// releases the batch semaphore slot.
type committer struct {
	logger         log.StructuredLogger                  // used for debug logs around each batch write
	batchSemaphore semaphore.ResourceSemaphore[db.Batch] // one slot released per batch written (see Run)
}
Comment thread
rodrodros marked this conversation as resolved.

// Compile-time check that committer satisfies the pipeline stage contract.
var _ pipeline.State[db.Batch, struct{}] = (*committer)(nil)

// newCommitter builds a committer stage from the given logger and the
// semaphore guarding batch allocation.
func newCommitter(
	logger log.StructuredLogger,
	batchSemaphore semaphore.ResourceSemaphore[db.Batch],
) *committer {
	c := committer{
		logger:         logger,
		batchSemaphore: batchSemaphore,
	}
	return &c
}

// Run writes batch to the database and, on success, releases the batch
// semaphore slot so a new batch may be allocated upstream.
//
// Fixes over the previous version:
//   - the "wrote batch" debug log was deferred, so it fired even when
//     batch.Write failed — it is now emitted only after a successful write;
//   - the write error is wrapped with context instead of returned bare,
//     matching the error-wrapping convention requested in review.
//
// The batch size is captured before Write in case the batch resets its
// internal counters after committing.
func (c *committer) Run(_ int, batch db.Batch, _ chan<- struct{}) error {
	size := batch.Size()
	c.logger.Debug(
		"writing batch",
		zap.Int("batch_size", size),
	)

	if err := batch.Write(); err != nil {
		return fmt.Errorf("writing batch of size %d: %w", size, err)
	}

	c.logger.Debug(
		"wrote batch",
		zap.Int("batch_size", size),
	)

	// NOTE(review): the slot is intentionally not released on error — the
	// pipeline aborts on a stage error, so the slot is never reused. Confirm
	// against the pipeline's shutdown path.
	c.batchSemaphore.Put()
	return nil
}

// Done implements pipeline.State. The committer writes each batch as it
// arrives in Run and holds no buffered state, so there is nothing to flush
// when the stage finishes.
func (c *committer) Done(int, chan<- struct{}) error {
	return nil
}
Comment thread
rodrodros marked this conversation as resolved.
106 changes: 106 additions & 0 deletions migration/historyprunner/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package historyprunner

import (
"encoding/binary"
"fmt"

"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/db"
)

// copyScratch holds the reusable byte buffers for one worker's copy
// operations. Each worker owns one copyScratch; buffers are overwritten on
// every entry and consumed (via batch.Put) before the next iteration
// touches them, so reuse is safe. Allocated once at stager/restorer
// construction.
type copyScratch struct {
	historyKey [historyKeyBufLen]byte  // sized for the largest history key (storage: 73 bytes)
	scratchKey [scratchKeyBufLen]byte  // sized for the largest scratch key (storage: 74 bytes)
	value      [historyValueLen]byte   // felt-sized (32-byte) value payload staging area
}

// copyDirection selects which way copyStateHistory moves entries between
// the live history buckets and the Temporary scratch namespace.
type copyDirection bool

const (
	historyToScratch copyDirection = false // phase 1: stage history entries into scratch
	scratchToHistory copyDirection = true  // phase 2: restore scratch entries into history
)
Comment thread
rodrodros marked this conversation as resolved.

func copyStateHistory(
reader db.KeyValueReader,
batch db.Batch,
scratch *copyScratch,
diff *core.StateDiff,
blockNum uint64,
direction copyDirection,
) error {
var blockBE [blockNumberSuffixLen]byte
binary.BigEndian.PutUint64(blockBE[:], blockNum)

move := func(historyKey, scratchKey []byte) error {
src, dst := historyKey, scratchKey
if direction == scratchToHistory {
src, dst = dst, src
}
return copyValue(reader, batch, scratch.value[:], src, dst)
}

for addr, slots := range diff.StorageDiffs {
for slot := range slots {
if err := move(
fillStorageHistoryKey(scratch.historyKey[:], &addr, &slot, blockBE),
fillStorageScratchKey(scratch.scratchKey[:], &addr, &slot, blockBE),
); err != nil {
return err
}
}
}

for addr := range diff.Nonces {
if err := move(
fillNonceHistoryKey(scratch.historyKey[:], &addr, blockBE),
fillNonceScratchKey(scratch.scratchKey[:], &addr, blockBE),
); err != nil {
return err
}
}

for addr := range diff.ReplacedClasses {
if err := move(
fillClassHashHistoryKey(scratch.historyKey[:], &addr, blockBE),
fillClassHashScratchKey(scratch.scratchKey[:], &addr, blockBE),
); err != nil {
return err
Comment on lines +54 to +73
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error wrapper for this as well please

}
}

return nil
}

// copyValue reads the value at source into buf, then writes buf under dest.
// buf is the caller's reusable felt-sized scratch (32 bytes); the batch
// copies buf during Put, so the caller may overwrite it afterwards.
func copyValue(
	reader db.KeyValueReader,
	writer db.KeyValueWriter,
	buf,
	source,
	dest []byte,
) error {
	// readIntoBuf validates the stored value's size before snapshotting it
	// into the caller's scratch buffer.
	readIntoBuf := func(data []byte) error {
		if len(data) != len(buf) {
			return fmt.Errorf(
				"history value size mismatch at key %x: want %d bytes, got %d",
				source, len(buf), len(data),
			)
		}
		copy(buf, data)
		return nil
	}

	if err := reader.Get(source, readIntoBuf); err != nil {
		return err
	}
	return writer.Put(dest, buf)
}
149 changes: 149 additions & 0 deletions migration/historyprunner/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package historyprunner

import (
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
)

// migrationScratchTag is the leading byte of every history-pruner scratch
// key. We reuse db.Temporary, the canonical scratch bucket also used by
// deprecated migrations. Production iterators scoped to a single-byte bucket
// prefix (e.g. NewIterator([]byte{14}, ...) for ContractStorageHistory)
// cannot observe scratch keys.
var migrationScratchTag = byte(db.Temporary)

// Scratch keys carry their source bucket tag as byte 1, making them
// self-describing:
//
//	scratchKey = [Temporary][originalBucketTag][...rest of original key after
//	its own bucket tag byte]
//
// Concretely:
//
//	storage   : [Temporary][14][addr:32][slot:32][block:8] = 74 bytes
//	nonce     : [Temporary][15][addr:32][block:8]          = 42 bytes
//	classhash : [Temporary][16][addr:32][block:8]          = 42 bytes
//
// Phase 2 (scratch → history) needs no out-of-band mapping: read the
// original tag from byte 1 of the scratch key, prepend it to the rest, done.
const scratchPrefixLen = 2

// blockNumberSuffixLen is the big-endian uint64 block-number suffix shared
// by every history and scratch key.
const blockNumberSuffixLen = 8

// History-bucket key sizes. These mirror the production schema:
//
//	storage   : [bucket][addr:32][slot:32][block:8] = 73
//	nonce     : [bucket][addr:32][block:8]          = 41
//	classhash : [bucket][addr:32][block:8]          = 41
const (
	storageHistoryKeyLen   = 1 + 2*felt.Bytes + blockNumberSuffixLen
	nonceHistoryKeyLen     = 1 + felt.Bytes + blockNumberSuffixLen
	classHashHistoryKeyLen = 1 + felt.Bytes + blockNumberSuffixLen

	// historyKeyBufLen sizes a buffer big enough for any history-bucket key
	// in this migration — caller can reuse one buffer across all three.
	historyKeyBufLen = storageHistoryKeyLen

	// historyValueLen is the felt-sized payload of every history bucket.
	historyValueLen = felt.Bytes

	// Scratch keys are the history keys plus the one-byte Temporary prefix.
	storageScratchKeyLen   = 1 + storageHistoryKeyLen
	nonceScratchKeyLen     = 1 + nonceHistoryKeyLen
	classHashScratchKeyLen = 1 + classHashHistoryKeyLen

	// scratchKeyBufLen sizes a buffer big enough for any scratch key —
	// callers can reuse one buffer across all three sub-namespaces.
	scratchKeyBufLen = storageScratchKeyLen
)

// fillStorageScratchKey writes [Temporary][14][addr:32][slot:32][block:8]
// into buf and returns buf[:74]. buf must have cap ≥ storageScratchKeyLen.
func fillStorageScratchKey(
	buf []byte,
	addr, slot *felt.Felt,
	blockBE [blockNumberSuffixLen]byte,
) []byte {
	key := buf[:storageScratchKeyLen]
	key[0] = migrationScratchTag
	key[1] = byte(db.ContractStorageHistory)
	a, s := addr.Bytes(), slot.Bytes()
	// Lay out the payload with a running offset: address, slot, then block.
	off := scratchPrefixLen
	off += copy(key[off:], a[:])
	off += copy(key[off:], s[:])
	copy(key[off:], blockBE[:])
	return key
}

// fillNonceScratchKey writes [Temporary][15][addr:32][block:8] into buf and
// returns the filled prefix. buf must have cap ≥ nonceScratchKeyLen.
func fillNonceScratchKey(
	buf []byte,
	addr *felt.Felt,
	blockBE [blockNumberSuffixLen]byte,
) []byte {
	key := buf[:nonceScratchKeyLen]
	key[0] = migrationScratchTag
	key[1] = byte(db.ContractNonceHistory)
	a := addr.Bytes()
	// Address then block number, laid out with a running offset.
	off := scratchPrefixLen
	off += copy(key[off:], a[:])
	copy(key[off:], blockBE[:])
	return key
}

// fillClassHashScratchKey writes [Temporary][16][addr:32][block:8] into buf
// and returns the filled prefix. buf must have cap ≥ classHashScratchKeyLen.
func fillClassHashScratchKey(
	buf []byte,
	addr *felt.Felt,
	blockBE [blockNumberSuffixLen]byte,
) []byte {
	key := buf[:classHashScratchKeyLen]
	key[0] = migrationScratchTag
	key[1] = byte(db.ContractClassHashHistory)
	a := addr.Bytes()
	// Address then block number, laid out with a running offset.
	off := scratchPrefixLen
	off += copy(key[off:], a[:])
	copy(key[off:], blockBE[:])
	return key
}

// fillStorageHistoryKey writes [bucket][addr:32][slot:32][block:8] into buf
// and returns the filled prefix. buf must have cap ≥ storageHistoryKeyLen.
func fillStorageHistoryKey(
	buf []byte,
	addr, slot *felt.Felt,
	blockBE [blockNumberSuffixLen]byte,
) []byte {
	key := buf[:storageHistoryKeyLen]
	key[0] = byte(db.ContractStorageHistory)
	a, s := addr.Bytes(), slot.Bytes()
	// Address, slot, then block number, laid out with a running offset.
	off := 1
	off += copy(key[off:], a[:])
	off += copy(key[off:], s[:])
	copy(key[off:], blockBE[:])
	return key
}

// fillNonceHistoryKey writes [bucket][addr:32][block:8] into buf and returns
// the filled prefix. buf must have cap ≥ nonceHistoryKeyLen.
func fillNonceHistoryKey(
	buf []byte,
	addr *felt.Felt,
	blockBE [blockNumberSuffixLen]byte,
) []byte {
	key := buf[:nonceHistoryKeyLen]
	key[0] = byte(db.ContractNonceHistory)
	a := addr.Bytes()
	// Address then block number, laid out with a running offset.
	off := 1
	off += copy(key[off:], a[:])
	copy(key[off:], blockBE[:])
	return key
}

// fillClassHashHistoryKey writes [bucket][addr:32][block:8] into buf and
// returns the filled prefix. buf must have cap ≥ classHashHistoryKeyLen.
func fillClassHashHistoryKey(
	buf []byte,
	addr *felt.Felt,
	blockBE [blockNumberSuffixLen]byte,
) []byte {
	key := buf[:classHashHistoryKeyLen]
	key[0] = byte(db.ContractClassHashHistory)
	a := addr.Bytes()
	// Address then block number, laid out with a running offset.
	off := 1
	off += copy(key[off:], a[:])
	copy(key[off:], blockBE[:])
	return key
}
Loading
Loading