From 415fd8e05b2150f322730b35def3e3b07954d0ee Mon Sep 17 00:00:00 2001 From: infrmtcs Date: Tue, 7 Oct 2025 15:46:03 +0700 Subject: [PATCH 1/2] feat: Integrate Sync P2P protocol to consensus --- consensus/consensus.go | 10 +- consensus/consensus_test.go | 92 +++++-- consensus/driver/commit_listener.go | 2 - consensus/driver/driver.go | 141 +++++++--- consensus/driver/driver_test.go | 3 + consensus/mock.go | 19 +- consensus/mocks/mock_application.go | 69 ----- consensus/mocks/mock_commit_listener.go | 69 ----- consensus/mocks/mock_state_machine.go | 28 ++ consensus/p2p/buffered/proto_broadcaster.go | 5 +- .../p2p/buffered/rebroadcast_strategy.go | 2 +- consensus/p2p/vote/vote_broadcasters.go | 6 +- consensus/sync/mock_p2p_test.go | 31 --- consensus/sync/node_test.go | 30 --- consensus/sync/sync.go | 148 +++++------ consensus/sync/sync_test.go | 245 +++++++++--------- consensus/tendermint/process.go | 29 ++- consensus/tendermint/tendermint.go | 20 +- p2p/pubsub/pubsub.go | 1 + p2p/pubsub/testutils/adjacent_nodes.go | 86 ++++-- p2p/pubsub/testutils/pubsub.go | 20 +- 21 files changed, 553 insertions(+), 503 deletions(-) delete mode 100644 consensus/mocks/mock_application.go delete mode 100644 consensus/mocks/mock_commit_listener.go delete mode 100644 consensus/sync/mock_p2p_test.go delete mode 100644 consensus/sync/node_test.go diff --git a/consensus/consensus.go b/consensus/consensus.go index 930f1c46ee..7b75e27abb 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -12,11 +12,13 @@ import ( "github.com/NethermindEth/juno/consensus/proposal" "github.com/NethermindEth/juno/consensus/proposer" "github.com/NethermindEth/juno/consensus/starknet" + consensusSync "github.com/NethermindEth/juno/consensus/sync" "github.com/NethermindEth/juno/consensus/tendermint" "github.com/NethermindEth/juno/consensus/types" "github.com/NethermindEth/juno/consensus/votecounter" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/p2p/sync" "github.com/NethermindEth/juno/utils" "github.com/NethermindEth/juno/vm" "github.com/libp2p/go-libp2p/core/host" @@ -36,6 +38,7 @@ func Init( database db.KeyValueStore, blockchain *blockchain.Blockchain, vm vm.VM, + blockFetcher *sync.BlockFetcher, nodeAddress *starknet.Address, validators votecounter.Validators[starknet.Address], timeoutFn driver.TimeoutFn, @@ -49,7 +52,7 @@ func Init( tendermintDB := consensusDB.NewTendermintDB[starknet.Value, starknet.Hash, starknet.Address](database) - executor := builder.NewExecutor(blockchain, vm, logger, false, true) // TODO: We're currently skipping signature validation + executor := builder.NewExecutor(blockchain, vm, logger, false, false) builder := builder.New(blockchain, executor) proposalStore := proposal.ProposalStore[starknet.Hash]{} @@ -59,6 +62,9 @@ func Init( p2p := p2p.New(host, logger, &builder, &proposalStore, currentHeight, &config.DefaultBufferSizes, bootstrapPeersFn) commitListener := driver.NewCommitListener(logger, &proposalStore, proposer, p2p) + + messageExtractor := consensusSync.New(validators, toValue, &proposalStore) + driver := driver.New( logger, tendermintDB, @@ -66,6 +72,8 @@ func Init( commitListener, p2p.Broadcasters(), p2p.Listeners(), + blockFetcher, + &messageExtractor, timeoutFn, ) diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index b06651ce8a..020b6e802d 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -3,6 +3,9 @@ package consensus_test import ( "fmt" goitre "iter" + "maps" + "slices" + gosync "sync" "testing" "time" @@ -10,17 +13,19 @@ import ( "github.com/NethermindEth/juno/consensus" "github.com/NethermindEth/juno/consensus/driver" "github.com/NethermindEth/juno/consensus/starknet" - "github.com/NethermindEth/juno/consensus/types" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db/memory" "github.com/NethermindEth/juno/genesis" + "github.com/NethermindEth/juno/p2p/dht" "github.com/NethermindEth/juno/p2p/pubsub/testutils" + "github.com/NethermindEth/juno/p2p/server" + "github.com/NethermindEth/juno/p2p/starknetp2p" + p2psync "github.com/NethermindEth/juno/p2p/sync" "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" "github.com/NethermindEth/juno/vm" "github.com/sourcegraph/conc" - "github.com/sourcegraph/conc/iter" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -28,7 +33,7 @@ import ( const ( commitBufferSize = 1024 - maxRoundWait = types.Round(5) + maxCommitWait = 5 * time.Minute ) var ( @@ -98,7 +103,8 @@ func loadGenesis( func initNode( t *testing.T, index int, - node *testutils.Node, + consensusNode *testutils.Node, + syncNode *testutils.Node, logger *utils.ZapLogger, commits chan commit, cfg *testConfig, @@ -120,20 +126,41 @@ func initNode( } vm := vm.New(&chainInfo, false, logger) + blockFetcher := p2psync.NewBlockFetcher(bc, syncNode.Host, &network, logger) + syncServer := server.New(syncNode.Host, bc, logger) + services, err := consensus.Init( - node.Host, + consensusNode.Host, logger, consensusDB, bc, vm, + &blockFetcher, &mockServices.NodeAddress, mockServices.Validators, mockServices.TimeoutFn, - node.GetBootstrapPeers, + consensusNode.GetBootstrapPeers, ) require.NoError(t, err) wg := conc.NewWaitGroup() + wg.Go(func() { + dht, err := dht.New( + t.Context(), + syncNode.Host, + &network, + starknetp2p.SyncProtocolID, + syncNode.GetBootstrapPeers, + ) + require.NoError(t, err) + require.NoError(t, dht.Bootstrap(t.Context())) + t.Cleanup(func() { + require.NoError(t, dht.Close()) + }) + }) + wg.Go(func() { + require.NoError(t, syncServer.Run(t.Context())) + }) wg.Go(func() { require.NoError(t, services.Proposer.Run(t.Context())) }) @@ -181,14 +208,9 @@ func writeBlock( } } -func commitStream(t *testing.T, nodeCount int, commits chan commit) goitre.Seq[commit] { +func commitStream(t *testing.T, commits chan commit) goitre.Seq[commit] { t.Helper() - timeoutFn := consensus.MockTimeoutFn(nodeCount) - var maxCommitWait time.Duration - for round := range maxRoundWait { - maxCommitWait += timeoutFn(types.StepPropose, round) + timeoutFn(types.StepPrevote, round) + timeoutFn(types.StepPrecommit, round) - } return func(yield func(commit) bool) { t.Helper() for { @@ -217,7 +239,7 @@ func assertCommits(t *testing.T, commits chan commit, cfg testConfig, logger *ut commitCount := make(map[uint64]int) nextHeight := uint64(1) - for commit := range commitStream(t, cfg.nodeCount, commits) { + for commit := range commitStream(t, commits) { blockNumber := commit.committedBlock.Block.Number blockHash := commit.committedBlock.Block.Hash @@ -265,6 +287,24 @@ func assertCommits(t *testing.T, commits chan commit, cfg testConfig, logger *ut } } +func setupNodes( + t *testing.T, + msg string, + logger utils.Logger, + cfg *testConfig, + honestNodeCount int, +) testutils.Nodes { + network := cfg.networkSetup(honestNodeCount) + for from, edges := range network { + logger.Info( + msg, + zap.Int("from", from), + zap.Any("to", slices.Sorted(maps.Keys(edges))), + ) + } + return testutils.BuildNetworks(t, network) +} + func runTest(t *testing.T, cfg testConfig) { t.Helper() logger, err := utils.NewZapLogger(utils.NewLogLevel(logLevel), true) @@ -276,12 +316,26 @@ func runTest(t *testing.T, cfg testConfig) { commits := make(chan commit, commitBufferSize) - nodes := testutils.BuildNetworks(t, cfg.networkSetup(honestNodeCount)) + consensusNodes := setupNodes(t, "consensus network", logger, &cfg, honestNodeCount) + syncNodes := setupNodes(t, "sync network", logger, &cfg, honestNodeCount) - iterator := iter.Iterator[testutils.Node]{MaxGoroutines: honestNodeCount} - iterator.ForEachIdx(nodes, func(i int, node *testutils.Node) { - initNode(t, i, node, logger, commits, &cfg, genesisDiff, genesisClasses) - }) + wg := gosync.WaitGroup{} + t.Cleanup(wg.Wait) + for i := range honestNodeCount { + wg.Go(func() { + initNode( + t, + i, + &consensusNodes[i], + &syncNodes[i], + logger, + commits, + &cfg, + genesisDiff, + genesisClasses, + ) + }) + } assertCommits(t, commits, cfg, logger) } @@ -306,7 +360,7 @@ func runWithAllHonestAndSilentFaultyNodes(t *testing.T, cfg testConfig) { func TestTendermintCluster(t *testing.T) { runWithAllHonestAndSilentFaultyNodes(t, testConfig{ nodeCount: 4, - targetHeight: 10, + targetHeight: 60, networkSetup: testutils.LineNetworkConfig, }) diff --git a/consensus/driver/commit_listener.go b/consensus/driver/commit_listener.go index 99cb4fa548..021358f8a5 100644 --- a/consensus/driver/commit_listener.go +++ b/consensus/driver/commit_listener.go @@ -16,8 +16,6 @@ type CommitHook[V types.Hashable[H], H types.Hash] interface { } // CommitListener is a component that is used to notify different components that a new committed block is available. -// -//go:generate mockgen -destination=../mocks/mock_commit_listener.go -package=mocks github.com/NethermindEth/juno/consensus/driver CommitListener type CommitListener[V types.Hashable[H], H types.Hash] interface { CommitHook[V, H] // Listen returns a channel that will receive committed blocks. diff --git a/consensus/driver/driver.go b/consensus/driver/driver.go index 91cf1c14ea..6a4a0e7239 100644 --- a/consensus/driver/driver.go +++ b/consensus/driver/driver.go @@ -3,13 +3,16 @@ package driver import ( "context" "fmt" + gosync "sync" "time" "github.com/NethermindEth/juno/consensus/db" "github.com/NethermindEth/juno/consensus/p2p" + consensusSync "github.com/NethermindEth/juno/consensus/sync" "github.com/NethermindEth/juno/consensus/tendermint" "github.com/NethermindEth/juno/consensus/types" "github.com/NethermindEth/juno/consensus/types/actions" + "github.com/NethermindEth/juno/p2p/sync" "github.com/NethermindEth/juno/utils" "go.uber.org/zap" ) @@ -17,17 +20,21 @@ import ( type TimeoutFn func(step types.Step, round types.Round) time.Duration type Driver[V types.Hashable[H], H types.Hash, A types.Addr] struct { - log utils.Logger - db db.TendermintDB[V, H, A] - stateMachine tendermint.StateMachine[V, H, A] - commitListener CommitListener[V, H] - broadcasters p2p.Broadcasters[V, H, A] - listeners p2p.Listeners[V, H, A] - - getTimeout TimeoutFn + log utils.Logger + db db.TendermintDB[V, H, A] + stateMachine tendermint.StateMachine[V, H, A] + commitListener CommitListener[V, H] + broadcasters p2p.Broadcasters[V, H, A] + listeners p2p.Listeners[V, H, A] + blockFetcher *sync.BlockFetcher + messageExtractor *consensusSync.MessageExtractor[V, H, A] + getTimeout TimeoutFn scheduledTms map[types.Timeout]*time.Timer timeoutsCh chan types.Timeout + syncListener chan sync.BlockBody + lastQuorum types.Height + wg gosync.WaitGroup } func New[V types.Hashable[H], H types.Hash, A types.Addr]( @@ -37,18 +44,24 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr]( commitListener CommitListener[V, H], broadcasters p2p.Broadcasters[V, H, A], listeners p2p.Listeners[V, H, A], + blockFetcher *sync.BlockFetcher, + messageExtractor *consensusSync.MessageExtractor[V, H, A], getTimeout TimeoutFn, ) Driver[V, H, A] { return Driver[V, H, A]{ - log: log, - db: db, - stateMachine: stateMachine, - commitListener: commitListener, - broadcasters: broadcasters, - listeners: listeners, - getTimeout: getTimeout, - scheduledTms: make(map[types.Timeout]*time.Timer), - timeoutsCh: make(chan types.Timeout), + log: log, + db: db, + stateMachine: stateMachine, + commitListener: commitListener, + broadcasters: broadcasters, + listeners: listeners, + blockFetcher: blockFetcher, + messageExtractor: messageExtractor, + getTimeout: getTimeout, + scheduledTms: make(map[types.Timeout]*time.Timer), + timeoutsCh: make(chan types.Timeout), + syncListener: make(chan sync.BlockBody, 1), + wg: gosync.WaitGroup{}, } } @@ -85,7 +98,9 @@ func (d *Driver[V, H, A]) replay(ctx context.Context) error { return nil } +//nolint:gocyclo // This is having higher complexity due to adding ok check for unmanaged channels. func (d *Driver[V, H, A]) listen(ctx context.Context) error { + defer d.wg.Wait() for { select { case <-ctx.Done(): @@ -124,6 +139,17 @@ func (d *Driver[V, H, A]) listen(ctx context.Context) error { return nil } actions = d.stateMachine.ProcessPrecommit(p) + case p, ok := <-d.syncListener: + if !ok { + return nil + } + + if p.Err != nil { + d.syncCurrentHeight(ctx) + } else { + proposal, precommits := d.messageExtractor.Extract(&p) + actions = d.stateMachine.ProcessSync(&proposal, precommits) + } } isCommitted, err = d.execute(ctx, false, actions) @@ -131,6 +157,8 @@ func (d *Driver[V, H, A]) listen(ctx context.Context) error { return err } } + + d.syncCurrentHeight(ctx) } } @@ -168,24 +196,13 @@ func (d *Driver[V, H, A]) execute( d.broadcasters.PrecommitBroadcaster.Broadcast(ctx, (*types.Precommit[H, A])(action)) case *actions.ScheduleTimeout: - d.setTimeout(ctx, types.Timeout(*action)) + d.scheduleTimeout(ctx, types.Timeout(*action)) case *actions.Commit[V, H, A]: - d.log.Debug( - "Committing", - zap.Uint("height", uint(action.Height)), - zap.Int("round", int(action.Round)), - ) - d.commitListener.OnCommit(ctx, action.Height, *action.Value) - - if err := d.db.DeleteWALEntries(action.Height); err != nil { - return true, fmt.Errorf("failed to delete WAL messages during commit: %w", err) - } - - return true, nil + return true, d.commit(ctx, action) case *actions.TriggerSync: - d.triggerSync(*action) + d.triggerSync(ctx, *action) default: return false, fmt.Errorf("unexpected action type: %T", action) @@ -194,11 +211,7 @@ func (d *Driver[V, H, A]) execute( return false, nil } -func (d *Driver[V, H, A]) triggerSync(sync actions.TriggerSync) { - // TODO: Implement this -} - -func (d *Driver[V, H, A]) setTimeout(ctx context.Context, timeout types.Timeout) { +func (d *Driver[V, H, A]) scheduleTimeout(ctx context.Context, timeout types.Timeout) { d.scheduledTms[timeout] = time.AfterFunc(d.getTimeout(timeout.Step, timeout.Round), func() { select { case <-ctx.Done(): @@ -206,3 +219,59 @@ func (d *Driver[V, H, A]) setTimeout(ctx context.Context, timeout types.Timeout) } }) } + +func (d *Driver[V, H, A]) commit(ctx context.Context, commit *actions.Commit[V, H, A]) error { + d.log.Debug( + "Committing", + zap.Uint("height", uint(commit.Height)), + zap.Int("round", int(commit.Round)), + ) + d.commitListener.OnCommit(ctx, commit.Height, *commit.Value) + + if err := d.db.DeleteWALEntries(commit.Height); err != nil { + return fmt.Errorf("failed to delete WAL messages during commit: %w", err) + } + + return nil +} + +func (d *Driver[V, H, A]) triggerSync(ctx context.Context, triggerSync actions.TriggerSync) { + currentlyHasFutureQuorum := d.hasFutureQuorum() + // TODO: Temporary workaround to only trigger the next height, because the sync process + // doesn't support triggering multiple heights at once. + d.lastQuorum = max(d.lastQuorum, triggerSync.End) + + // Only trigger sync if haven't triggered yet. + if !currentlyHasFutureQuorum { + d.syncCurrentHeight(ctx) + } +} + +// TODO: Temporary workaround to only trigger the next height, because the sync process currently +// doesn't support triggering multiple heights at once. +func (d *Driver[V, H, A]) syncCurrentHeight(ctx context.Context) { + height := d.stateMachine.Height() + + if !d.hasFutureQuorum() { + return + } + + d.wg.Go(func() { + select { + case <-ctx.Done(): + return + default: + } + + if err := d.blockFetcher.ProcessBlock(ctx, uint64(height), d.syncListener); err != nil { + select { + case <-ctx.Done(): + case d.syncListener <- sync.BlockBody{Err: err}: + } + } + }) +} + +func (d *Driver[V, H, A]) hasFutureQuorum() bool { + return d.lastQuorum > d.stateMachine.Height() +} diff --git a/consensus/driver/driver_test.go b/consensus/driver/driver_test.go index 8c928682db..be69051c2f 100644 --- a/consensus/driver/driver_test.go +++ b/consensus/driver/driver_test.go @@ -178,6 +178,8 @@ func TestDriver(t *testing.T) { newMockCommitListener(t, &commitAction), broadcasters, listeners, + nil, // TODO: Add tests for trigger sync + nil, // TODO: Add tests for trigger sync mockTimeoutFn, ) @@ -194,6 +196,7 @@ func TestDriver(t *testing.T) { // timeouts to be scheduled (`toAction`). These timeouts will then be triggered (`ProcessTimeout`). // We force the stateMachine to return a random set of actions (`generateAndRegisterRandomActions`) here just to test that // the driver will actually receive them. + stateMachine.EXPECT().Height().AnyTimes() stateMachine.EXPECT().ProcessStart(types.Round(0)).Return( append(generateAndRegisterRandomActions(random, expectedBroadcast), toAction(inputTimeoutProposal)), ) diff --git a/consensus/mock.go b/consensus/mock.go index 5ba91ed55b..712a955728 100644 --- a/consensus/mock.go +++ b/consensus/mock.go @@ -8,6 +8,7 @@ import ( "github.com/NethermindEth/juno/consensus/driver" "github.com/NethermindEth/juno/consensus/starknet" + consensusSync "github.com/NethermindEth/juno/consensus/sync" "github.com/NethermindEth/juno/consensus/types" "github.com/NethermindEth/juno/consensus/votecounter" "github.com/NethermindEth/juno/core/felt" @@ -15,8 +16,7 @@ import ( ) const ( - timeoutBase = 500 * time.Microsecond - timeoutRoundFactor = 100 * time.Millisecond + timeoutRoundFactor = 10 * time.Millisecond ) type MockServices struct { @@ -31,7 +31,7 @@ func InitMockServices(hiSeed, loSeed uint64, nodeIndex, nodeCount int) MockServi PrivateKey: mockKey(nodeIndex), NodeAddress: mockNodeAddress(nodeIndex), Validators: newMockValidators(hiSeed, loSeed, nodeCount), - TimeoutFn: MockTimeoutFn(nodeCount), + TimeoutFn: mockTimeoutFn(nodeCount), } } @@ -47,7 +47,14 @@ func (n mockValidators) TotalVotingPower(height types.Height) types.VotingPower return types.VotingPower(len(n)) } +// Currently we mock one voting power for all validators, to be removed once +// we can query the voting power from the staking contracts. +// The special case is for precommits from sync protocol, to be removed once +// we can extract precommits from the sync protocol messages. func (n mockValidators) ValidatorVotingPower(height types.Height, addr *starknet.Address) types.VotingPower { + if addr != nil && *addr == consensusSync.SyncProtocolPrecommitSender { + return types.VotingPower(len(n)) + } return types.VotingPower(1) } @@ -61,11 +68,9 @@ func mockNodeAddress(i int) starknet.Address { return felt.FromUint64[starknet.Address](uint64(i)) } -func MockTimeoutFn(nodeCount int) func(types.Step, types.Round) time.Duration { +func mockTimeoutFn(nodeCount int) func(types.Step, types.Round) time.Duration { return func(step types.Step, round types.Round) time.Duration { - // Total number of messages are N^2, so the load is roughly proportional to O(N^2) - // Every round increases the timeout by timeoutRoundFactor. It also guarantees that the timeout will be at least timeoutRoundFactor - delta := time.Duration(nodeCount*nodeCount)*timeoutBase + time.Duration(round+1)*timeoutRoundFactor + delta := (time.Duration(round) + time.Duration(nodeCount)) * timeoutRoundFactor // The formulae follow the lemma in the paper switch step { diff --git a/consensus/mocks/mock_application.go b/consensus/mocks/mock_application.go deleted file mode 100644 index 976b7d30de..0000000000 --- a/consensus/mocks/mock_application.go +++ /dev/null @@ -1,69 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/NethermindEth/juno/consensus/tendermint (interfaces: Application) -// -// Generated by this command: -// -// mockgen -destination=../mocks/mock_application.go -package=mocks github.com/NethermindEth/juno/consensus/tendermint Application -// - -// Package mocks is a generated GoMock package. -package mocks - -import ( - reflect "reflect" - - types "github.com/NethermindEth/juno/consensus/types" - gomock "go.uber.org/mock/gomock" -) - -// MockApplication is a mock of Application interface. -type MockApplication[V types.Hashable[H], H types.Hash] struct { - ctrl *gomock.Controller - recorder *MockApplicationMockRecorder[V, H] - isgomock struct{} -} - -// MockApplicationMockRecorder is the mock recorder for MockApplication. -type MockApplicationMockRecorder[V types.Hashable[H], H types.Hash] struct { - mock *MockApplication[V, H] -} - -// NewMockApplication creates a new mock instance. -func NewMockApplication[V types.Hashable[H], H types.Hash](ctrl *gomock.Controller) *MockApplication[V, H] { - mock := &MockApplication[V, H]{ctrl: ctrl} - mock.recorder = &MockApplicationMockRecorder[V, H]{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockApplication[V, H]) EXPECT() *MockApplicationMockRecorder[V, H] { - return m.recorder -} - -// Valid mocks base method. -func (m *MockApplication[V, H]) Valid(arg0 V) bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Valid", arg0) - ret0, _ := ret[0].(bool) - return ret0 -} - -// Valid indicates an expected call of Valid. -func (mr *MockApplicationMockRecorder[V, H]) Valid(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Valid", reflect.TypeOf((*MockApplication[V, H])(nil).Valid), arg0) -} - -// Value mocks base method. -func (m *MockApplication[V, H]) Value() V { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Value") - ret0, _ := ret[0].(V) - return ret0 -} - -// Value indicates an expected call of Value. -func (mr *MockApplicationMockRecorder[V, H]) Value() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Value", reflect.TypeOf((*MockApplication[V, H])(nil).Value)) -} diff --git a/consensus/mocks/mock_commit_listener.go b/consensus/mocks/mock_commit_listener.go deleted file mode 100644 index 5740bcccc0..0000000000 --- a/consensus/mocks/mock_commit_listener.go +++ /dev/null @@ -1,69 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/NethermindEth/juno/consensus/driver (interfaces: CommitListener) -// -// Generated by this command: -// -// mockgen -destination=../mocks/mock_commit_listener.go -package=mocks github.com/NethermindEth/juno/consensus/driver CommitListener -// - -// Package mocks is a generated GoMock package. -package mocks - -import ( - context "context" - reflect "reflect" - - types "github.com/NethermindEth/juno/consensus/types" - sync "github.com/NethermindEth/juno/sync" - gomock "go.uber.org/mock/gomock" -) - -// MockCommitListener is a mock of CommitListener interface. -type MockCommitListener[V types.Hashable[H], H types.Hash] struct { - ctrl *gomock.Controller - recorder *MockCommitListenerMockRecorder[V, H] - isgomock struct{} -} - -// MockCommitListenerMockRecorder is the mock recorder for MockCommitListener. -type MockCommitListenerMockRecorder[V types.Hashable[H], H types.Hash] struct { - mock *MockCommitListener[V, H] -} - -// NewMockCommitListener creates a new mock instance. -func NewMockCommitListener[V types.Hashable[H], H types.Hash](ctrl *gomock.Controller) *MockCommitListener[V, H] { - mock := &MockCommitListener[V, H]{ctrl: ctrl} - mock.recorder = &MockCommitListenerMockRecorder[V, H]{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockCommitListener[V, H]) EXPECT() *MockCommitListenerMockRecorder[V, H] { - return m.recorder -} - -// Listen mocks base method. -func (m *MockCommitListener[V, H]) Listen() <-chan sync.CommittedBlock { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Listen") - ret0, _ := ret[0].(<-chan sync.CommittedBlock) - return ret0 -} - -// Listen indicates an expected call of Listen. -func (mr *MockCommitListenerMockRecorder[V, H]) Listen() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Listen", reflect.TypeOf((*MockCommitListener[V, H])(nil).Listen)) -} - -// OnCommit mocks base method. -func (m *MockCommitListener[V, H]) OnCommit(arg0 context.Context, arg1 types.Height, arg2 V) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "OnCommit", arg0, arg1, arg2) -} - -// OnCommit indicates an expected call of OnCommit. -func (mr *MockCommitListenerMockRecorder[V, H]) OnCommit(arg0, arg1, arg2 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnCommit", reflect.TypeOf((*MockCommitListener[V, H])(nil).OnCommit), arg0, arg1, arg2) -} diff --git a/consensus/mocks/mock_state_machine.go b/consensus/mocks/mock_state_machine.go index 64d44d4cd9..e22fe3cb71 100644 --- a/consensus/mocks/mock_state_machine.go +++ b/consensus/mocks/mock_state_machine.go @@ -42,6 +42,20 @@ func (m *MockStateMachine[V, H, A]) EXPECT() *MockStateMachineMockRecorder[V, H, return m.recorder } +// Height mocks base method. +func (m *MockStateMachine[V, H, A]) Height() types.Height { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Height") + ret0, _ := ret[0].(types.Height) + return ret0 +} + +// Height indicates an expected call of Height. +func (mr *MockStateMachineMockRecorder[V, H, A]) Height() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Height", reflect.TypeOf((*MockStateMachine[V, H, A])(nil).Height)) +} + // ProcessPrecommit mocks base method. func (m *MockStateMachine[V, H, A]) ProcessPrecommit(arg0 *types.Precommit[H, A]) []actions.Action[V, H, A] { m.ctrl.T.Helper() @@ -98,6 +112,20 @@ func (mr *MockStateMachineMockRecorder[V, H, A]) ProcessStart(arg0 any) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessStart", reflect.TypeOf((*MockStateMachine[V, H, A])(nil).ProcessStart), arg0) } +// ProcessSync mocks base method. +func (m *MockStateMachine[V, H, A]) ProcessSync(arg0 *types.Proposal[V, H, A], arg1 []types.Precommit[H, A]) []actions.Action[V, H, A] { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ProcessSync", arg0, arg1) + ret0, _ := ret[0].([]actions.Action[V, H, A]) + return ret0 +} + +// ProcessSync indicates an expected call of ProcessSync. +func (mr *MockStateMachineMockRecorder[V, H, A]) ProcessSync(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessSync", reflect.TypeOf((*MockStateMachine[V, H, A])(nil).ProcessSync), arg0, arg1) +} + // ProcessTimeout mocks base method. func (m *MockStateMachine[V, H, A]) ProcessTimeout(arg0 types.Timeout) []actions.Action[V, H, A] { m.ctrl.T.Helper() diff --git a/consensus/p2p/buffered/proto_broadcaster.go b/consensus/p2p/buffered/proto_broadcaster.go index 955d07c508..9b4b97d720 100644 --- a/consensus/p2p/buffered/proto_broadcaster.go +++ b/consensus/p2p/buffered/proto_broadcaster.go @@ -41,7 +41,6 @@ func (b ProtoBroadcaster[M]) Broadcast(ctx context.Context, msg M) { } func (b ProtoBroadcaster[M]) Loop(ctx context.Context, topic *pubsub.Topic) { - readinessOpt := pubsub.WithReadiness(pubsub.MinTopicSize(1)) var rebroadcasted rebroadcastMessages time.Sleep(pubsub.GossipSubHeartbeatInitialDelay * 2) @@ -58,7 +57,7 @@ func (b ProtoBroadcaster[M]) Loop(ctx context.Context, topic *pubsub.Topic) { } for { - if err := topic.Publish(ctx, msgBytes, readinessOpt); err != nil && !errors.Is(err, context.Canceled) { + if err := topic.Publish(ctx, msgBytes); err != nil && !errors.Is(err, context.Canceled) { b.log.Error("unable to send message", zap.Error(err)) time.Sleep(b.retryInterval) continue @@ -71,7 +70,7 @@ func (b ProtoBroadcaster[M]) Loop(ctx context.Context, topic *pubsub.Topic) { } case <-rebroadcasted.trigger: for msgBytes := range rebroadcasted.messages { - if err := topic.Publish(ctx, msgBytes, readinessOpt); err != nil && !errors.Is(err, context.Canceled) { + if err := topic.Publish(ctx, msgBytes); err != nil && !errors.Is(err, context.Canceled) { b.log.Error("unable to rebroadcast message", zap.Error(err)) } } diff --git a/consensus/p2p/buffered/rebroadcast_strategy.go b/consensus/p2p/buffered/rebroadcast_strategy.go index 4bdc7d1184..13b7bcb016 100644 --- a/consensus/p2p/buffered/rebroadcast_strategy.go +++ b/consensus/p2p/buffered/rebroadcast_strategy.go @@ -36,7 +36,7 @@ func (s *rebroadcastStrategy[M, K]) Receive(msg M, msgBytes []byte) rebroadcastM s.cache[key] = msgBytes return rebroadcastMessages{ - trigger: time.After(s.rebroadcastInterval), + trigger: time.Tick(s.rebroadcastInterval), messages: maps.Values(s.cache), } } diff --git a/consensus/p2p/vote/vote_broadcasters.go b/consensus/p2p/vote/vote_broadcasters.go index dfe36da5f5..af595a8c4b 100644 --- a/consensus/p2p/vote/vote_broadcasters.go +++ b/consensus/p2p/vote/vote_broadcasters.go @@ -25,13 +25,11 @@ func NewVoteBroadcaster[H types.Hash, A types.Addr]( return voteBroadcaster[H, A]{ log: log, voteAdapter: voteAdapter, - ProtoBroadcaster: buffered.NewProtoBroadcaster( + ProtoBroadcaster: buffered.NewProtoBroadcaster[*consensus.Vote]( log, bufferSizeConfig.VoteProtoBroadcaster, bufferSizeConfig.RetryInterval, - buffered.NewRebroadcastStrategy(bufferSizeConfig.RebroadcastInterval, func(msg *consensus.Vote) consensus.Vote_VoteType { - return msg.VoteType - }), + nil, ), } } diff --git a/consensus/sync/mock_p2p_test.go b/consensus/sync/mock_p2p_test.go deleted file mode 100644 index 3af1e0d299..0000000000 --- a/consensus/sync/mock_p2p_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package sync_test - -import ( - "context" - "sync" -) - -type mockListener[M any] struct { - ch chan M -} - -func newMockListener[M any](ch chan M) *mockListener[M] { - return &mockListener[M]{ - ch: ch, - } -} - -func (m *mockListener[M]) Listen() <-chan M { - return m.ch -} - -type mockBroadcaster[M any] struct { - mu sync.Mutex - broadcastedMessages []M -} - -func (m *mockBroadcaster[M]) Broadcast(ctx context.Context, msg M) { - m.mu.Lock() - defer m.mu.Unlock() - m.broadcastedMessages = append(m.broadcastedMessages, msg) -} diff --git a/consensus/sync/node_test.go b/consensus/sync/node_test.go deleted file mode 100644 index c5bec2a638..0000000000 --- a/consensus/sync/node_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package sync_test - -import ( - "math/rand/v2" - - "github.com/NethermindEth/juno/consensus/starknet" - "github.com/NethermindEth/juno/consensus/types" - "github.com/NethermindEth/juno/core/felt" -) - -type nodes []int - -func newNodes(nodeCount int) nodes { - return nodes(rand.Perm(nodeCount)) -} - -func (n nodes) TotalVotingPower(height types.Height) types.VotingPower { - return types.VotingPower(len(n)) -} - -func (n nodes) ValidatorVotingPower(height types.Height, addr *starknet.Address) types.VotingPower { - return types.VotingPower(1) -} - -// Randomised proposer selection, with prime coefficients so that for each height, the order of proposers is different. -func (n nodes) Proposer(height types.Height, round types.Round) starknet.Address { - nodeIndex := (int(height)*17 + int(round)*31) % len(n) - nodeID := n[nodeIndex] - return felt.FromUint64[starknet.Address](uint64(nodeID)) -} diff --git a/consensus/sync/sync.go b/consensus/sync/sync.go index 00f3bb0b6d..57fc0d55d0 100755 --- a/consensus/sync/sync.go +++ b/consensus/sync/sync.go @@ -1,104 +1,106 @@ package sync import ( - "context" + "math" "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/builder" "github.com/NethermindEth/juno/consensus/proposal" + "github.com/NethermindEth/juno/consensus/starknet" "github.com/NethermindEth/juno/consensus/types" + "github.com/NethermindEth/juno/consensus/votecounter" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/p2p/sync" ) -const syncRoundPlaceHolder = 0 // Todo: We use this value until the round is added to the spec +// TODO: We use this value until the round is added to the spec +const ValidRoundPlaceholder = -1 -type Sync[V types.Hashable[H], H types.Hash, A types.Addr] struct { - blockListener <-chan sync.BlockBody // sync service to be run separately - driverProposalCh chan<- *types.Proposal[V, H, A] - driverPrecommitCh chan<- *types.Precommit[H, A] - // Todo: for now we can forge the precommit votes of our peers - // In practice, this information needs to be exposed by peers. - getPrecommits func(*sync.BlockBody) []types.Precommit[H, A] +// TODO: Remove this once we can extract precommits from the sync protocol messages. +var SyncProtocolPrecommitSender = felt.FromUint64[starknet.Address](math.MaxUint64) + +type MessageExtractor[V types.Hashable[H], H types.Hash, A types.Addr] struct { + validators votecounter.Validators[A] toValue func(*felt.Felt) V proposalStore *proposal.ProposalStore[H] } func New[V types.Hashable[H], H types.Hash, A types.Addr]( - blockListener <-chan sync.BlockBody, - driverProposalCh chan<- *types.Proposal[V, H, A], - driverPrecommitCh chan<- *types.Precommit[H, A], - getPrecommits func(*sync.BlockBody) []types.Precommit[H, A], + validators votecounter.Validators[A], toValue func(*felt.Felt) V, proposalStore *proposal.ProposalStore[H], -) Sync[V, H, A] { - return Sync[V, H, A]{ - blockListener: blockListener, - driverProposalCh: driverProposalCh, - driverPrecommitCh: driverPrecommitCh, - getPrecommits: getPrecommits, - toValue: toValue, - proposalStore: proposalStore, +) MessageExtractor[V, H, A] { + return MessageExtractor[V, H, A]{ + validators: validators, + toValue: toValue, + proposalStore: proposalStore, } } -func (s *Sync[V, H, A]) Run(ctx context.Context) error { +// TODO: This is a temporary solution to find the round, because currently the specs do not have +// the round. This is dangerous because it's not guaranteed to find the round. +// We need eventually remove this. +func (s *MessageExtractor[V, H, A]) findRound(height types.Height, sender A) types.Round { + var round types.Round for { - select { - case <-ctx.Done(): - return nil - case committedBlock, ok := <-s.blockListener: - if !ok { - return nil - } + if s.validators.Proposer(height, round) == sender { + return round + } + round++ + } +} - msgV := s.toValue(committedBlock.Block.Hash) - msgH := msgV.Hash() - concatCommitments := core.ConcatCounts( - committedBlock.Block.TransactionCount, - committedBlock.Block.EventCount, - committedBlock.StateUpdate.StateDiff.Length(), - committedBlock.Block.L1DAMode, - ) - buildResult := builder.BuildResult{ - Preconfirmed: &core.PreConfirmed{ - Block: committedBlock.Block, - StateUpdate: committedBlock.StateUpdate, - NewClasses: committedBlock.NewClasses, - }, - SimulateResult: &blockchain.SimulateResult{ - BlockCommitments: committedBlock.Commitments, - ConcatCount: concatCommitments, - }, - L2GasConsumed: committedBlock.Block.L2GasConsumed(), - } - s.proposalStore.Store(msgH, &buildResult) +func (s *MessageExtractor[V, H, A]) Extract( + committedBlock *sync.BlockBody, +) (types.Proposal[V, H, A], []types.Precommit[H, A]) { + msgV := s.toValue(committedBlock.Block.Hash) + msgH := msgV.Hash() + concatCommitments := core.ConcatCounts( + committedBlock.Block.TransactionCount, + committedBlock.Block.EventCount, + committedBlock.StateUpdate.StateDiff.Length(), + committedBlock.Block.L1DAMode, + ) + buildResult := builder.BuildResult{ + Preconfirmed: &core.PreConfirmed{ + Block: committedBlock.Block, + StateUpdate: committedBlock.StateUpdate, + NewClasses: committedBlock.NewClasses, + }, + SimulateResult: &blockchain.SimulateResult{ + BlockCommitments: committedBlock.Commitments, + ConcatCount: concatCommitments, + }, + L2GasConsumed: committedBlock.Block.L2GasConsumed(), + } + s.proposalStore.Store(msgH, &buildResult) - precommits := s.getPrecommits(&committedBlock) - for _, precommit := range precommits { - select { - case <-ctx.Done(): - return nil - case s.driverPrecommitCh <- &precommit: - } - } + round := s.findRound( + types.Height(committedBlock.Block.Number), + A(*committedBlock.Block.SequencerAddress), + ) - proposal := types.Proposal[V, H, A]{ - MessageHeader: types.MessageHeader[A]{ - Height: types.Height(committedBlock.Block.Number), - Round: syncRoundPlaceHolder, - Sender: A(*committedBlock.Block.SequencerAddress), - }, - ValidRound: -1, - Value: &msgV, - } + proposal := types.Proposal[V, H, A]{ + MessageHeader: types.MessageHeader[A]{ + Height: types.Height(committedBlock.Block.Number), + Round: round, + Sender: A(*committedBlock.Block.SequencerAddress), + }, + ValidRound: ValidRoundPlaceholder, + Value: &msgV, + } - select { - case <-ctx.Done(): - return nil - case s.driverProposalCh <- &proposal: - } - } + precommits := []types.Precommit[H, A]{ + { + MessageHeader: types.MessageHeader[A]{ + Height: types.Height(committedBlock.Block.Number), + Round: round, + Sender: A(SyncProtocolPrecommitSender), + }, + ID: &msgH, + }, } + + return proposal, precommits } diff --git a/consensus/sync/sync_test.go b/consensus/sync/sync_test.go index 84904196b7..ca953e17fd 100755 --- a/consensus/sync/sync_test.go +++ b/consensus/sync/sync_test.go @@ -1,168 +1,169 @@ package sync_test import ( - "context" + "math/rand/v2" "testing" "time" - "github.com/NethermindEth/juno/consensus/db" - "github.com/NethermindEth/juno/consensus/driver" - "github.com/NethermindEth/juno/consensus/mocks" - "github.com/NethermindEth/juno/consensus/p2p" + "github.com/NethermindEth/juno/consensus" "github.com/NethermindEth/juno/consensus/proposal" "github.com/NethermindEth/juno/consensus/starknet" consensusSync "github.com/NethermindEth/juno/consensus/sync" "github.com/NethermindEth/juno/consensus/tendermint" "github.com/NethermindEth/juno/consensus/types" + "github.com/NethermindEth/juno/consensus/types/actions" + "github.com/NethermindEth/juno/consensus/votecounter" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" - "github.com/NethermindEth/juno/db/pebblev2" "github.com/NethermindEth/juno/p2p/sync" "github.com/NethermindEth/juno/utils" + "github.com/bits-and-blooms/bloom/v3" "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" ) -var ( - comittedHeight = -1 - blockID = uint64(9) +const ( + nodeCount = 4 + nodeIndex = nodeCount + 1 // This is to guarantee that this node is not the proposer ) -type ( - listeners = p2p.Listeners[starknet.Value, starknet.Hash, starknet.Address] - broadcasters = p2p.Broadcasters[starknet.Value, starknet.Hash, starknet.Address] -) - -func mockTimeoutFn(step types.Step, round types.Round) time.Duration { - return 10 * time.Second +type app struct { + cur uint64 } -func TestSync(t *testing.T) { - ctrl := gomock.NewController(t) - - ctx, cancel := context.WithCancel(t.Context()) - - nodeAddr := felt.FromUint64[starknet.Address](123) - logger := utils.NewNopZapLogger() +func (a *app) Value() starknet.Value { + a.cur = (a.cur + 1) % 100 + return felt.FromUint64[starknet.Value](a.cur) +} - dbPath := t.TempDir() - testDB, err := pebblev2.New(dbPath) - require.NoError(t, err) +func (a *app) Valid(v starknet.Value) bool { + return true +} - tmDB := db.NewTendermintDB[starknet.Value, starknet.Hash, starknet.Address](testDB) - require.NotNil(t, tmDB) +func TestMessageExtractor(t *testing.T) { + allNodes := consensus.InitMockServices(0, 0, nodeIndex, nodeCount) proposalStore := proposal.ProposalStore[starknet.Hash]{} - allNodes := newNodes(4) - mockApp := mocks.NewMockApplication[starknet.Value, starknet.Hash](ctrl) - mockApp.EXPECT().Valid(gomock.Any()).AnyTimes().Return(true) - mockCommitListener := mocks.NewMockCommitListener[starknet.Value, starknet.Hash](ctrl) - - mockCommitListener.EXPECT(). - OnCommit(gomock.Any(), types.Height(0), gomock.Any()). - Do(func(_ any, newHeight types.Height, _ any) { - comittedHeight = int(newHeight) - cancel() - }) - - stateMachine := tendermint.New(logger, nodeAddr, mockApp, allNodes, types.Height(0)) - - proposalCh := make(chan *starknet.Proposal) - prevoteCh := make(chan *starknet.Prevote) - precommitCh := make(chan *starknet.Precommit) - - listeners := listeners{ - ProposalListener: newMockListener(proposalCh), - PrevoteListener: newMockListener(prevoteCh), - PrecommitListener: newMockListener(precommitCh), - } - broadcasters := broadcasters{ - ProposalBroadcaster: &mockBroadcaster[*starknet.Proposal]{}, - PrevoteBroadcaster: &mockBroadcaster[*starknet.Prevote]{}, - PrecommitBroadcaster: &mockBroadcaster[*starknet.Precommit]{}, - } - - driver := driver.New( - utils.NewNopZapLogger(), - tmDB, - stateMachine, - mockCommitListener, - broadcasters, - listeners, - mockTimeoutFn, + messageExtractor := consensusSync.New[starknet.Value, starknet.Hash, starknet.Address]( + allNodes.Validators, + toValue, + &proposalStore, ) - go func() { - require.NoError(t, driver.Run(ctx)) - }() - - mockInCh := make(chan sync.BlockBody) - - consensusSyncService := consensusSync.New(mockInCh, proposalCh, precommitCh, getPrecommits, toValue, &proposalStore) - - block0 := getCommittedBlock(allNodes) - block0Hash := block0.Block.Hash - valueHash := toValue(block0Hash).Hash() - go func() { - // P2P Sync service inserts block into mockInCh - mockInCh <- block0 - }() - - require.NoError(t, consensusSyncService.Run(ctx)) - require.NotEmpty(t, proposalStore.Get(valueHash)) // Ensure the Driver sees the correct proposal - require.NotEqual(t, comittedHeight, -1, "expected a block to be committed") + blockBody, expectedProposal, expectedPrecommits := getTestData(allNodes.Validators) + actualProposal, actualPrecommits := messageExtractor.Extract(&blockBody) + + t.Run(("Verify stored build result"), func(t *testing.T) { + buildResult := proposalStore.Get(actualProposal.Value.Hash()) + require.NotEmpty(t, buildResult) + require.Equal(t, buildResult.Preconfirmed.Block, blockBody.Block) + require.Equal(t, buildResult.Preconfirmed.StateUpdate, blockBody.StateUpdate) + require.Equal(t, buildResult.Preconfirmed.NewClasses, blockBody.NewClasses) + require.Equal(t, buildResult.SimulateResult.BlockCommitments, blockBody.Commitments) + require.Equal(t, buildResult.L2GasConsumed, blockBody.Block.L2GasConsumed()) + }) + + t.Run(("Verify proposal"), func(t *testing.T) { + require.Equal(t, expectedProposal, actualProposal) + }) + + t.Run(("Verify precommits"), func(t *testing.T) { + require.Equal(t, expectedPrecommits, actualPrecommits) + }) + + t.Run(("State machine should be able to commit"), func(t *testing.T) { + logger := utils.NewNopZapLogger() + nodeAddr := felt.FromUint64[starknet.Address](uint64(nodeCount) + 1) + + stateMachine := tendermint.New[starknet.Value, starknet.Hash, starknet.Address]( + logger, + nodeAddr, + &app{}, + allNodes.Validators, + actualProposal.Height, + ) + stateMachine.ProcessStart(0) + + resultActions := stateMachine.ProcessSync(&actualProposal, actualPrecommits) + expectedCommit := actions.Commit[starknet.Value, starknet.Hash, starknet.Address]( + expectedProposal, + ) + require.Contains(t, resultActions, &expectedCommit) + }) } -func getCommittedBlock(allNodes nodes) sync.BlockBody { - proposerAddr := allNodes.Proposer(types.Height(0), types.Round(0)) - return sync.BlockBody{ +func getTestData( + validators votecounter.Validators[starknet.Address], +) (sync.BlockBody, starknet.Proposal, []starknet.Precommit) { + height := types.Height(rand.Uint64N(1000000)) + round := types.Round(rand.IntN(nodeCount)) + blockHash := felt.Random[felt.Felt]() + proposerAddr := validators.Proposer(height, round) + blockBody := sync.BlockBody{ Block: &core.Block{ Header: &core.Header{ - Hash: felt.NewFromUint64[felt.Felt](blockID), - TransactionCount: 2, - EventCount: 3, + Hash: &blockHash, + ParentHash: felt.NewRandom[felt.Felt](), + Number: uint64(height), + GlobalStateRoot: felt.NewRandom[felt.Felt](), SequencerAddress: (*felt.Felt)(&proposerAddr), - Number: 0, + TransactionCount: rand.Uint64N(100), + EventCount: rand.Uint64N(1000), + Timestamp: uint64(time.Now().Unix()), + ProtocolVersion: "0.14.0", + EventsBloom: &bloom.BloomFilter{}, + L1GasPriceETH: felt.NewRandom[felt.Felt](), + Signatures: make([][]*felt.Felt, 100), + L1GasPriceSTRK: felt.NewRandom[felt.Felt](), + L1DAMode: core.Blob, + L1DataGasPrice: &core.GasPrice{ + PriceInWei: felt.NewRandom[felt.Felt](), + PriceInFri: felt.NewRandom[felt.Felt](), + }, + L2GasPrice: &core.GasPrice{ + PriceInWei: felt.NewRandom[felt.Felt](), + PriceInFri: felt.NewRandom[felt.Felt](), + }, }, }, StateUpdate: &core.StateUpdate{ - StateDiff: &core.StateDiff{}, + BlockHash: &blockHash, + NewRoot: felt.NewRandom[felt.Felt](), + OldRoot: felt.NewRandom[felt.Felt](), + StateDiff: utils.HeapPtr(core.EmptyStateDiff()), + }, + NewClasses: make(map[felt.Felt]core.ClassDefinition), + Commitments: &core.BlockCommitments{ + TransactionCommitment: felt.NewRandom[felt.Felt](), + EventCommitment: felt.NewRandom[felt.Felt](), + ReceiptCommitment: felt.NewRandom[felt.Felt](), + StateDiffCommitment: felt.NewRandom[felt.Felt](), }, - NewClasses: make(map[felt.Felt]core.ClassDefinition), - Commitments: &core.BlockCommitments{}, } -} -func toValue(in *felt.Felt) starknet.Value { - return starknet.Value(*in) -} - -func getPrecommits(*sync.BlockBody) []starknet.Precommit { - blockID := felt.FromUint64[starknet.Hash](blockID) - return []starknet.Precommit{ - { - MessageHeader: types.MessageHeader[starknet.Address]{ - Height: types.Height(0), - Round: types.Round(0), - Sender: felt.FromUint64[starknet.Address](1), - }, - ID: &blockID, - }, - { - MessageHeader: types.MessageHeader[starknet.Address]{ - Height: types.Height(0), - Round: types.Round(0), - Sender: felt.FromUint64[starknet.Address](2), - }, - ID: &blockID, + proposal := starknet.Proposal{ + MessageHeader: types.MessageHeader[starknet.Address]{ + Height: height, + Round: round, + Sender: proposerAddr, }, + ValidRound: consensusSync.ValidRoundPlaceholder, + Value: (*starknet.Value)(&blockHash), + } + + precommits := []starknet.Precommit{ { MessageHeader: types.MessageHeader[starknet.Address]{ - Height: types.Height(0), - Round: types.Round(0), - Sender: felt.FromUint64[starknet.Address](3), + Height: height, + Round: round, + Sender: consensusSync.SyncProtocolPrecommitSender, }, - ID: &blockID, + ID: (*starknet.Hash)(&blockHash), }, } + + return blockBody, proposal, precommits +} + +func toValue(in *felt.Felt) starknet.Value { + return starknet.Value(*in) } diff --git a/consensus/tendermint/process.go b/consensus/tendermint/process.go index 16bd84248d..ff9ae4e62e 100644 --- a/consensus/tendermint/process.go +++ b/consensus/tendermint/process.go @@ -48,6 +48,17 @@ func (s *stateMachine[V, H, A]) ProcessPrecommit(p *types.Precommit[H, A]) []act return s.processMessage(p, (*wal.Precommit[H, A])(p)) } +func (s *stateMachine[V, H, A]) ProcessSync( + proposal *types.Proposal[V, H, A], + precommits []types.Precommit[H, A], +) []actions.Action[V, H, A] { + actions := s.ProcessProposal(proposal) + for _, precommit := range precommits { + actions = append(actions, s.ProcessPrecommit(&precommit)...) + } + return actions +} + func (s *stateMachine[V, H, A]) hasFuturePrecommitQuorum(p *types.Precommit[H, A]) bool { return p.ID != nil && p.Height > s.state.height && @@ -56,18 +67,14 @@ func (s *stateMachine[V, H, A]) hasFuturePrecommitQuorum(p *types.Precommit[H, A } func (s *stateMachine[V, H, A]) triggerSync(p *types.Precommit[H, A]) []actions.Action[V, H, A] { - syncStart := max(s.lastTriggerSync+1, s.state.height) - s.lastTriggerSync = p.Height - - return []actions.Action[V, H, A]{ - &actions.WriteWAL[V, H, A]{ - Entry: (*wal.Precommit[H, A])(p), - }, - &actions.TriggerSync{ - Start: syncStart, - End: p.Height, - }, + s.lastQuorum = max(s.lastQuorum, p.Height) + triggerSync := actions.TriggerSync{ + Start: max(s.lastTriggerSync+1, s.state.height), + End: s.lastQuorum, } + s.lastTriggerSync = triggerSync.End + + return []actions.Action[V, H, A]{&triggerSync} } func (s *stateMachine[V, H, A]) processMessage( diff --git a/consensus/tendermint/tendermint.go b/consensus/tendermint/tendermint.go index 70a1c839fc..b68dda28af 100644 --- a/consensus/tendermint/tendermint.go +++ b/consensus/tendermint/tendermint.go @@ -5,10 +5,11 @@ import ( "github.com/NethermindEth/juno/consensus/types/actions" "github.com/NethermindEth/juno/consensus/types/wal" "github.com/NethermindEth/juno/consensus/votecounter" + "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/utils" + "go.uber.org/zap" ) -//go:generate mockgen -destination=../mocks/mock_application.go -package=mocks github.com/NethermindEth/juno/consensus/tendermint Application type Application[V types.Hashable[H], H types.Hash] interface { // Value returns the value to the Tendermint consensus algorithm which can be proposed to other validators. Value() V @@ -25,12 +26,14 @@ type Slasher[M types.Message[V, H, A], V types.Hashable[H], H types.Hash, A type //go:generate mockgen -destination=../mocks/mock_state_machine.go -package=mocks github.com/NethermindEth/juno/consensus/tendermint StateMachine type StateMachine[V types.Hashable[H], H types.Hash, A types.Addr] interface { + Height() types.Height ProcessStart(types.Round) []actions.Action[V, H, A] ProcessTimeout(types.Timeout) []actions.Action[V, H, A] ProcessProposal(*types.Proposal[V, H, A]) []actions.Action[V, H, A] ProcessPrevote(*types.Prevote[H, A]) []actions.Action[V, H, A] ProcessPrecommit(*types.Precommit[H, A]) []actions.Action[V, H, A] ProcessWAL(wal.Entry[V, H, A]) []actions.Action[V, H, A] + ProcessSync(*types.Proposal[V, H, A], []types.Precommit[H, A]) []actions.Action[V, H, A] } type stateMachine[V types.Hashable[H], H types.Hash, A types.Addr] struct { @@ -41,6 +44,7 @@ type stateMachine[V types.Hashable[H], H types.Hash, A types.Addr] struct { application Application[V, H] isHeightStarted bool lastTriggerSync types.Height + lastQuorum types.Height } type state[V types.Hashable[H], H types.Hash] struct { @@ -79,6 +83,10 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr]( } } +func (s *stateMachine[V, H, A]) Height() types.Height { + return s.state.height +} + type CachedProposal[V types.Hashable[H], H types.Hash, A types.Addr] struct { types.Proposal[V, H, A] Valid bool @@ -97,6 +105,16 @@ func (s *stateMachine[V, H, A]) resetState(round types.Round) { func (s *stateMachine[V, H, A]) startRound(r types.Round) actions.Action[V, H, A] { s.resetState(r) + if r > 0 { + proposer := felt.Felt(s.voteCounter.Proposer(r - 1)) + s.log.Debug( + "Failed round", + zap.Uint("height", uint(s.state.height)), + zap.Int("round", int(r-1)), + zap.Stringer("proposer", &proposer), + ) + } + if p := s.voteCounter.Proposer(r); p == s.nodeAddr { var proposalValue *V if s.state.validValue != nil { diff --git a/p2p/pubsub/pubsub.go b/p2p/pubsub/pubsub.go index 9e6f221ae0..206b0e4d4f 100644 --- a/p2p/pubsub/pubsub.go +++ b/p2p/pubsub/pubsub.go @@ -55,6 +55,7 @@ func Run( pubSub, err := pubsub.NewGossipSub( ctx, host, + pubsub.WithFloodPublish(true), pubsub.WithPeerOutboundQueueSize(pubSubQueueSize), pubsub.WithValidateQueueSize(pubSubQueueSize), pubsub.WithDiscovery(routing.NewRoutingDiscovery(dht)), diff --git a/p2p/pubsub/testutils/adjacent_nodes.go b/p2p/pubsub/testutils/adjacent_nodes.go index 7d147b9916..6b32d3c281 100644 --- a/p2p/pubsub/testutils/adjacent_nodes.go +++ b/p2p/pubsub/testutils/adjacent_nodes.go @@ -2,6 +2,10 @@ package testutils import "math/rand" +// Because the bootstrap peers process only connect to up to 2 nodes +// See https://github.com/libp2p/go-libp2p-kad-dht/blob/v0.35.1/dht.go#L530-L541 +const maxBootstrappers = 2 + // AdjacentNodes is a slice of N sets, each representing the nodes adjacent to the node at the index. type AdjacentNodes []map[int]struct{} @@ -38,45 +42,85 @@ func (a AdjacentNodes) ConnectRandomDirection(from, to int) bool { return true } +func (a AdjacentNodes) Validate() bool { + connected := make([]map[int]struct{}, len(a)) + for from := range a { + connected[from] = make(map[int]struct{}) + } + + for from, edges := range a { + // Ignore all outgoing edges to nodes that have more than maxBootstrappers bootstrap peers. + if len(edges) > maxBootstrappers { + continue + } + + for to := range edges { + connected[from][to] = struct{}{} + connected[to][from] = struct{}{} + } + } + + // Simple BFS to check if all nodes are connected + visited := map[int]struct{}{0: {}} + queue := []int{0} + for len(queue) > 0 { + from := queue[0] + queue = queue[1:] + for to := range connected[from] { + if _, ok := visited[to]; !ok { + visited[to] = struct{}{} + queue = append(queue, to) + } + } + } + + return len(visited) == len(a) +} + // NetworkConfigFn is a function that returns an AdjacentNodes for a given number of nodes. type NetworkConfigFn func(int) AdjacentNodes -func LineNetworkConfig(n int) AdjacentNodes { - adjacentNodes := NewAdjacentNodes(n) +func newNetworkConfigFn(f func(AdjacentNodes)) NetworkConfigFn { + return func(n int) AdjacentNodes { + for { + adjacentNodes := NewAdjacentNodes(n) + f(adjacentNodes) + if adjacentNodes.Validate() { + return adjacentNodes + } + } + } +} +var LineNetworkConfig = newNetworkConfigFn(func(adjacentNodes AdjacentNodes) { + n := len(adjacentNodes) for i := 0; i+1 < n; i++ { adjacentNodes.Connect(i, i+1) } - - return adjacentNodes -} +}) //nolint:gosec // The whole package is for testing purpose only, so it's safe to use weak random. -func SmallWorldNetworkConfig(n int) AdjacentNodes { - adjacentNodes := NewAdjacentNodes(n) +var SmallWorldNetworkConfig = newNetworkConfigFn(func(adjacentNodes AdjacentNodes) { + const ( + rateOfNeighborConnections = 0.2 + rateOfRandomConnections = 0.2 + ) - rateOfNeighborConnections := 0.25 + n := len(adjacentNodes) for i := range n { // Connect to the next node adjacentNodes.ConnectRandomDirection(i, (i+1)%n) - // 50% chance to connect to a node 2-5 hops away - for distance := 2; distance < 6; distance++ { + // 20% chance to connect to a node 2-3 hops away + for distance := 2; distance < 4; distance++ { if rand.Float64() < rateOfNeighborConnections { adjacentNodes.ConnectRandomDirection(i, (i+distance)%n) } } - } - for i := range n { - // Connect to 2 random nodes - for range 2 { - randomNode := rand.Intn(n) - for !adjacentNodes.ConnectRandomDirection(i, randomNode) { - randomNode = rand.Intn(n) - } + // 20% chance to connect to a random node + if rand.Float64() < rateOfRandomConnections { + adjacentNodes.ConnectRandomDirection(i, rand.Intn(n)) } } - - return adjacentNodes -} +}) diff --git a/p2p/pubsub/testutils/pubsub.go b/p2p/pubsub/testutils/pubsub.go index 9580aa5772..faab2fe874 100644 --- a/p2p/pubsub/testutils/pubsub.go +++ b/p2p/pubsub/testutils/pubsub.go @@ -3,6 +3,7 @@ package testutils import ( "bytes" "crypto/ed25519" + "sync" "testing" "github.com/NethermindEth/juno/consensus/p2p/config" @@ -13,7 +14,6 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/sourcegraph/conc" "github.com/sourcegraph/conc/iter" "github.com/stretchr/testify/require" ) @@ -32,7 +32,7 @@ func BuildNetworks( adjacentNodes AdjacentNodes, ) Nodes { nodes := make([]Node, len(adjacentNodes)) - wg := conc.NewWaitGroup() + wg := sync.WaitGroup{} for i := range nodes { wg.Go(func() { var err error @@ -45,7 +45,17 @@ func BuildNetworks( } wg.Wait() - wg = conc.NewWaitGroup() + // This is a workaround to make sure that all `GetBootstrapPeers` are executed + // concurrently once, which ensures that all nodes use the bootstrap peers. + onces := make([]sync.Once, len(nodes)) + ready := make(chan struct{}) + bootstrapWg := sync.WaitGroup{} + bootstrapWg.Add(len(nodes)) + go func() { + bootstrapWg.Wait() + close(ready) + }() + for i := range nodes { wg.Go(func() { peers := make([]peer.AddrInfo, 0, len(adjacentNodes[i])) @@ -56,6 +66,10 @@ func BuildNetworks( }) } nodes[i].GetBootstrapPeers = func() []peer.AddrInfo { + onces[i].Do(func() { + bootstrapWg.Done() + }) + <-ready return peers } }) From d4c3844035cf451dbf03dab7c20d14059d8f7075 Mon Sep 17 00:00:00 2001 From: infrmtcs Date: Wed, 16 Jul 2025 10:18:35 +0700 Subject: [PATCH 2/2] feat: Consensus initialization --- .dockerignore | 7 +- cmd/juno/juno.go | 66 +++++++++++---- consensus-tests/Dockerfile.shooter | 3 + consensus-tests/README.md | 43 ++++++++++ consensus-tests/all.sh | 20 +++++ consensus-tests/docker-compose.yaml | 100 ++++++++++++++++++++++ consensus-tests/setup.sh | 36 ++++++++ consensus-tests/shoot.sh | 35 ++++++++ node/node.go | 127 ++++++++++++++++++++++++---- 9 files changed, 404 insertions(+), 33 deletions(-) create mode 100644 consensus-tests/Dockerfile.shooter create mode 100644 consensus-tests/README.md create mode 100755 consensus-tests/all.sh create mode 100644 consensus-tests/docker-compose.yaml create mode 100755 consensus-tests/setup.sh create mode 100755 consensus-tests/shoot.sh diff --git a/.dockerignore b/.dockerignore index 26be94bb80..23f10f027d 100644 --- a/.dockerignore +++ b/.dockerignore @@ -23,4 +23,9 @@ CODE_OF_CONDUCT.md CONTRIBUTING.md # Scripts -scripts/ \ No newline at end of file +scripts/ + +.devbox/ +.plugins/ +tmp/ +snapshots/ diff --git a/cmd/juno/juno.go b/cmd/juno/juno.go index c1e0eb389f..c8ddb9d1bf 100644 --- a/cmd/juno/juno.go +++ b/cmd/juno/juno.go @@ -62,6 +62,15 @@ const ( p2pPeersF = "p2p-peers" p2pFeederNodeF = "p2p-feeder-node" p2pPrivateKey = "p2p-private-key" + consensusF = "consensus" + consensusAddrF = "consensus-addr" + consensusPeersF = "consensus-peers" + consensusDBPathF = "consensus-db-path" + consensusMockIndexF = "consensus-mock-index" // TODO: remove this + consensusMockCountF = "consensus-mock-count" // TODO: remove this + mempoolF = "mempool" + mempoolAddrF = "mempool-addr" + mempoolPeersF = "mempool-peers" metricsF = "metrics" metricsHostF = "metrics-host" metricsPortF = "metrics-port" @@ -123,6 +132,15 @@ const ( defaultP2pPeers = "" defaultP2pFeederNode = false defaultP2pPrivateKey = "" + defaultConsensus = false + defaultConsensusAddr = "" + defaultConsensusPeers = "" + defaultConsensusDBPath = "" + defaultConsensusMockIndex = 0 // TODO: remove this + defaultConsensusMockCount = 0 // TODO: remove this + defaultMempool = false + defaultMempoolAddr = "" + defaultMempoolPeers = "" defaultMetrics = false defaultMetricsPort = 9090 defaultGRPC = false @@ -194,21 +212,30 @@ const ( "These peers can be either Feeder or regular nodes." p2pFeederNodeUsage = "EXPERIMENTAL: Run juno as a feeder node which will only sync from feeder gateway and gossip the new" + " blocks to the network." - p2pPrivateKeyUsage = "EXPERIMENTAL: Hexadecimal representation of a private key on the Ed25519 elliptic curve." - metricsUsage = "Enables the Prometheus metrics endpoint on the default port." - metricsHostUsage = "The interface on which the Prometheus endpoint will listen for requests." - metricsPortUsage = "The port on which the Prometheus endpoint will listen for requests." - grpcUsage = "Enable the HTTP gRPC server on the default port." - grpcHostUsage = "The interface on which the gRPC server will listen for requests." - grpcPortUsage = "The port on which the gRPC server will listen for requests." - maxVMsUsage = "Maximum number for VM instances to be used for RPC calls concurrently" - maxVMQueueUsage = "Maximum number for requests to queue after reaching max-vms before starting to reject incoming requests" - remoteDBUsage = "gRPC URL of a remote Juno node" - rpcMaxBlockScanUsage = "Maximum number of blocks scanned in single starknet_getEvents call" - dbCacheSizeUsage = "Determines the amount of memory (in megabytes) allocated for caching data in the database." - dbMaxHandlesUsage = "A soft limit on the number of open files that can be used by the DB" - gwAPIKeyUsage = "API key for gateway endpoints to avoid throttling" //nolint: gosec - gwTimeoutsUsage = "Timeouts for requests made to the gateway. Can be specified in three ways:\n" + + p2pPrivateKeyUsage = "EXPERIMENTAL: Hexadecimal representation of a private key on the Ed25519 elliptic curve." + consensusUsage = "EXPERIMENTAL: Enables the consensus server." + consensusAddrUsage = "EXPERIMENTAL: Specify the address of the consensus server." + consensusPeersUsage = "EXPERIMENTAL: Specify list of consensus peers split by a comma." + consensusDBPathUsage = "EXPERIMENTAL: Specify the path to the consensus database." + consensusMockIndexUsage = "EXPERIMENTAL: Specify the index of the mock consensus server to use." // TODO: remove this + consensusMockCountUsage = "EXPERIMENTAL: Specify the number of mock consensus servers to use." // TODO: remove this + mempoolUsage = "EXPERIMENTAL: Enables the mempool server." + mempoolAddrUsage = "EXPERIMENTAL: Specify the address of the mempool server." + mempoolPeersUsage = "EXPERIMENTAL: Specify list of mempool peers split by a comma." + metricsUsage = "Enables the Prometheus metrics endpoint on the default port." + metricsHostUsage = "The interface on which the Prometheus endpoint will listen for requests." + metricsPortUsage = "The port on which the Prometheus endpoint will listen for requests." + grpcUsage = "Enable the HTTP gRPC server on the default port." + grpcHostUsage = "The interface on which the gRPC server will listen for requests." + grpcPortUsage = "The port on which the gRPC server will listen for requests." + maxVMsUsage = "Maximum number for VM instances to be used for RPC calls concurrently" + maxVMQueueUsage = "Maximum number for requests to queue after reaching max-vms before starting to reject incoming requests" + remoteDBUsage = "gRPC URL of a remote Juno node" + rpcMaxBlockScanUsage = "Maximum number of blocks scanned in single starknet_getEvents call" + dbCacheSizeUsage = "Determines the amount of memory (in megabytes) allocated for caching data in the database." + dbMaxHandlesUsage = "A soft limit on the number of open files that can be used by the DB" + gwAPIKeyUsage = "API key for gateway endpoints to avoid throttling" //nolint: gosec + gwTimeoutsUsage = "Timeouts for requests made to the gateway. Can be specified in three ways:\n" + "- Single value (e.g. '5s'): After each failure, the timeout will increase dynamically.\n" + "- Comma-separated list (e.g. '5s,10s,20s'): Each value will be used in sequence after failures.\n" + "- Single value with trailing comma (e.g. '5s,'): Uses a fixed timeout without dynamic adjustment." @@ -411,6 +438,15 @@ func NewCmd(config *node.Config, run func(*cobra.Command, []string) error) *cobr junoCmd.Flags().String(p2pPeersF, defaultP2pPeers, p2pPeersUsage) junoCmd.Flags().Bool(p2pFeederNodeF, defaultP2pFeederNode, p2pFeederNodeUsage) junoCmd.Flags().String(p2pPrivateKey, defaultP2pPrivateKey, p2pPrivateKeyUsage) + junoCmd.Flags().Bool(consensusF, defaultConsensus, consensusUsage) + junoCmd.Flags().String(consensusAddrF, defaultConsensusAddr, consensusAddrUsage) + junoCmd.Flags().String(consensusPeersF, defaultConsensusPeers, consensusPeersUsage) + junoCmd.Flags().String(consensusDBPathF, defaultConsensusDBPath, consensusDBPathUsage) + junoCmd.Flags().Int(consensusMockIndexF, defaultConsensusMockIndex, consensusMockIndexUsage) // TODO: remove this + junoCmd.Flags().Int(consensusMockCountF, defaultConsensusMockCount, consensusMockCountUsage) // TODO: remove this + junoCmd.Flags().Bool(mempoolF, defaultMempool, mempoolUsage) + junoCmd.Flags().String(mempoolAddrF, defaultMempoolAddr, mempoolAddrUsage) + junoCmd.Flags().String(mempoolPeersF, defaultMempoolPeers, mempoolPeersUsage) junoCmd.Flags().Bool(metricsF, defaultMetrics, metricsUsage) junoCmd.Flags().String(metricsHostF, defaultHost, metricsHostUsage) junoCmd.Flags().Uint16(metricsPortF, defaultMetricsPort, metricsPortUsage) diff --git a/consensus-tests/Dockerfile.shooter b/consensus-tests/Dockerfile.shooter new file mode 100644 index 0000000000..f31e4eecf7 --- /dev/null +++ b/consensus-tests/Dockerfile.shooter @@ -0,0 +1,3 @@ +FROM starknet/starkli:0.4.1 + +RUN apk add --no-cache bash curl jq diff --git a/consensus-tests/README.md b/consensus-tests/README.md new file mode 100644 index 0000000000..1332faca30 --- /dev/null +++ b/consensus-tests/README.md @@ -0,0 +1,43 @@ +# Juno Consensus Test + +This is an experimental test setup for running 4 Juno nodes (0-3) with Tendermint consensus algorithm to observe network performance under load. + +**Node topology**: Node 1 → Node 0, Node 2 → Node 1, Node 3 → Node 2 + +## Test Flow + +1. **Node Setup**: 4 Juno nodes are initialized and connected in sequence using Docker containers +2. **Account Setup**: A setup service creates `SHOOTER_COUNT` accounts, each funded with ETH and STRK tokens +3. **Load Testing**: `SHOOTER_COUNT` shooter instances each send 1,000 transactions concurrently, load balanced across all 4 nodes + +## Prerequisites + +Docker and Docker Compose installed. + +## Usage + +Set the compose file location: +```bash +export COMPOSE_FILE=consensus-tests/docker-compose.yaml +``` + +Run the test: +```bash +docker compose up -d --build +``` + +View logs: +```bash +docker compose logs -f +``` + +Stop and cleanup: +```bash +docker compose down -v +``` + +## Configuration + +- `SHOOTER_COUNT`: Number of shooter instances and funded accounts to create (default: 4) + - Total transactions = `SHOOTER_COUNT × 1,000` + - Adjust based on your system resources \ No newline at end of file diff --git a/consensus-tests/all.sh b/consensus-tests/all.sh new file mode 100755 index 0000000000..31bd024b54 --- /dev/null +++ b/consensus-tests/all.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +DATA_DIR=${DATA_DIR:-"consensus-tests"} +export STARKNET_RPC="http://node0:6060" + +sleep 2 # TODO: Figure out why we need to wait + +shooters="$1" + +root_address="0x406a8f52e741619b17410fc90774e4b36f968e1a71ae06baacfe1f55d987923" +export ROOT_PRIVATE_KEY="0x3a4791edf67fa0b32b812e41bc8bc4e9b79915412b1331f7669cbe23e97e15a" + +until starkli account fetch "$root_address" --force --output "$DATA_DIR/root.json"; do + echo "Failed to fetch root account, retrying..." + sleep 1 +done + +for ((i = 1; i <= shooters; i++)); do + consensus-tests/setup.sh "$i" +done diff --git a/consensus-tests/docker-compose.yaml b/consensus-tests/docker-compose.yaml new file mode 100644 index 0000000000..51efc1e30a --- /dev/null +++ b/consensus-tests/docker-compose.yaml @@ -0,0 +1,100 @@ +x-node: &node + image: nethermind/juno:consensus + command: [ + "--http", + "--http-host", "0.0.0.0", + "--network", "sepolia", + "--log-level", "debug", + "--disable-l1-verification", + "--db-path", "/db/main", + "--seq-genesis-file", "/genesis/genesis_prefund_accounts.json", + "--consensus", + "--consensus-db-path", "/db/consensus", + "--consensus-addr", "/ip4/0.0.0.0/tcp/7070", + "--consensus-mock-count", "4", + "--mempool", + "--mempool-addr", "/ip4/0.0.0.0/tcp/7071", + ] + configs: + - genesis + +services: + node0: + <<: *node + ports: + - "6060:6060" + environment: + JUNO_CONSENSUS_MOCK_INDEX: 0 + JUNO_CONSENSUS_PEERS: + JUNO_MEMPOOL_PEERS: + + node1: + <<: *node + ports: + - "6061:6060" + environment: + JUNO_CONSENSUS_MOCK_INDEX: 1 + JUNO_CONSENSUS_PEERS: /dns4/node0/tcp/7070/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN + JUNO_MEMPOOL_PEERS: /dns4/node0/tcp/7071/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN + + node2: + <<: *node + ports: + - "6062:6060" + environment: + JUNO_CONSENSUS_MOCK_INDEX: 2 + JUNO_CONSENSUS_PEERS: /dns4/node1/tcp/7070/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X + JUNO_MEMPOOL_PEERS: /dns4/node1/tcp/7071/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X + + node3: + <<: *node + ports: + - "6063:6060" + environment: + JUNO_CONSENSUS_MOCK_INDEX: 3 + JUNO_CONSENSUS_PEERS: /dns4/node2/tcp/7070/p2p/12D3KooWH3uVF6wv47WnArKHk5p6cvgCJEb74UTmxztmQDc298L3 + JUNO_MEMPOOL_PEERS: /dns4/node2/tcp/7071/p2p/12D3KooWH3uVF6wv47WnArKHk5p6cvgCJEb74UTmxztmQDc298L3 + + setup: + build: + context: . + dockerfile: Dockerfile.shooter + entrypoint: ["/consensus-tests/all.sh", "${SHOOTER_COUNT:-4}"] + environment: + DATA_DIR: /data + depends_on: + - node0 + configs: + - consensus-tests + volumes: + - data:/data + + shooters: + build: + context: . + dockerfile: Dockerfile.shooter + entrypoint: ["/consensus-tests/shoot.sh"] + environment: + DATA_DIR: /data + depends_on: + setup: + condition: service_completed_successfully + webdis: + condition: service_started + configs: + - consensus-tests + volumes: + - data:/data + scale: ${SHOOTER_COUNT:-4} + + webdis: + image: nicolas/webdis:latest + +configs: + genesis: + file: ../genesis + consensus-tests: + file: ./ + +volumes: + data: \ No newline at end of file diff --git a/consensus-tests/setup.sh b/consensus-tests/setup.sh new file mode 100755 index 0000000000..9ee962e1a6 --- /dev/null +++ b/consensus-tests/setup.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +DATA_DIR=${DATA_DIR:-"consensus-tests"} +INDEX="$1" +TARGET="$((INDEX % 4))" +export STARKNET_RPC="http://node$TARGET:6060" + +echo "Generate a new keypair for account #$INDEX" +KEYPAIR=$(starkli signer gen-keypair) +export PRIVATE_KEY=$(starkli signer gen-keypair | awk '/Private key/ {print $4}') + +echo "Private key #$INDEX: $PRIVATE_KEY" + +echo "Init account #$INDEX" +ADDRESS=$(starkli account oz init "$DATA_DIR/$INDEX.json" --force --private-key "$PRIVATE_KEY" --class-hash "0x61dac032f228abef9c6626f995015233097ae253a7f72d68552db02f2971b8f" 2>&1 | tee /dev/stderr | tr -d ' ' | grep "^0x") + +echo "Fund ETH to account #$INDEX" +until starkli invoke eth transfer "$ADDRESS" u256:100000000000000000 --account "$DATA_DIR/root.json" --private-key "$ROOT_PRIVATE_KEY" --watch --poll-interval 500; do + echo "Failed to fund ETH to account #$INDEX, retrying..." + sleep 1 +done + +echo "Fund STRK to account #$INDEX" +until starkli invoke strk transfer "$ADDRESS" u256:100000000000000000 --account "$DATA_DIR/root.json" --private-key "$ROOT_PRIVATE_KEY" --watch --poll-interval 500; do + echo "Failed to fund STRK to account #$INDEX, retrying..." + sleep 1 +done + +echo "Deploy account #$INDEX" +until yes "" | starkli account deploy "$DATA_DIR/$INDEX.json" --private-key "$PRIVATE_KEY" --poll-interval 500; do + echo "Failed to deploy account #$INDEX, retrying..." + sleep 1 +done + +echo "Write private key to be reused" +echo "$PRIVATE_KEY" > "$DATA_DIR/$INDEX.key" diff --git a/consensus-tests/shoot.sh b/consensus-tests/shoot.sh new file mode 100755 index 0000000000..24088044a4 --- /dev/null +++ b/consensus-tests/shoot.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +DATA_DIR=${DATA_DIR:-"consensus-tests"} + +if [ -z "$1" ]; then + INDEX=$(curl --silent 'http://webdis:7379/INCR/lock' | jq '.INCR') +else + INDEX="$1" +fi + +echo "Shooting account #$INDEX" + +PORT="$((6060 + INDEX % 4))" +TARGET="$((INDEX % 4))" +export STARKNET_RPC="http://node$TARGET:6060" + +PRIVATE_KEY=$(cat "$DATA_DIR/$INDEX.key") + +for i in {1..1000}; do + starkli invoke eth transfer 0x83afd3f4caedc6eebf44246fe54e38c95e3179a5ec9ea81740eca5b482d12e 1 0 \ + --account "$DATA_DIR/$INDEX.json" \ + --private-key "$PRIVATE_KEY" \ + --l1-gas 1000000 \ + --l1-gas-price-raw 1 \ + --l1-data-gas 1000000 \ + --l1-data-gas-price-raw 1 \ + --l2-gas 1000000 \ + --l2-gas-price-raw 1 \ + --nonce "$i" \ + >/dev/null 2>&1 + + if [ $((i % 100)) -eq 0 ]; then + echo "Account #$INDEX sent $i transactions" + fi +done diff --git a/node/node.go b/node/node.go index 1c9743b761..a68bc71284 100644 --- a/node/node.go +++ b/node/node.go @@ -17,6 +17,9 @@ import ( "github.com/NethermindEth/juno/builder" "github.com/NethermindEth/juno/clients/feeder" "github.com/NethermindEth/juno/clients/gateway" + "github.com/NethermindEth/juno/consensus" + "github.com/NethermindEth/juno/consensus/datasource" + "github.com/NethermindEth/juno/consensus/p2p/config" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" @@ -25,7 +28,11 @@ import ( "github.com/NethermindEth/juno/jsonrpc" "github.com/NethermindEth/juno/l1" "github.com/NethermindEth/juno/mempool" + mempoolp2p "github.com/NethermindEth/juno/mempool/p2p" "github.com/NethermindEth/juno/p2p" + "github.com/NethermindEth/juno/p2p/dht" + "github.com/NethermindEth/juno/p2p/pubsub" + p2psync "github.com/NethermindEth/juno/p2p/sync" "github.com/NethermindEth/juno/plugin" "github.com/NethermindEth/juno/rpc" "github.com/NethermindEth/juno/rpc/rpccore" @@ -38,6 +45,7 @@ import ( "github.com/NethermindEth/juno/validator" "github.com/NethermindEth/juno/vm" "github.com/consensys/gnark-crypto/ecc/stark-curve/ecdsa" + "github.com/libp2p/go-libp2p/core/peer" "github.com/mitchellh/mapstructure" "github.com/sourcegraph/conc" "go.uber.org/zap" @@ -104,6 +112,17 @@ type Config struct { RPCCallMaxGas uint64 `mapstructure:"rpc-call-max-gas"` ReadinessBlockTolerance uint `mapstructure:"readiness-block-tolerance"` + Consensus bool `mapstructure:"consensus"` + ConsensusAddr string `mapstructure:"consensus-addr"` + ConsensusPeers string `mapstructure:"consensus-peers"` + ConsensusDBPath string `mapstructure:"consensus-db-path"` + ConsensusMockIndex int `mapstructure:"consensus-mock-index"` // TODO: remove this + ConsensusMockCount int `mapstructure:"consensus-mock-count"` // TODO: remove this + + Mempool bool `mapstructure:"mempool"` + MempoolAddr string `mapstructure:"mempool-addr"` + MempoolPeers string `mapstructure:"mempool-peers"` + SubmittedTransactionsCacheSize uint `mapstructure:"submitted-transactions-cache-size"` SubmittedTransactionsCacheEntryTTL time.Duration `mapstructure:"submitted-transactions-cache-entry-ttl"` @@ -275,30 +294,21 @@ func New(cfg *Config, version string, logLevel *utils.LogLevel) (*Node, error) { } nodeVM = vm.New(&chainInfo, false, log) throttledVM = NewThrottledVM(nodeVM, cfg.MaxVMs, int32(cfg.MaxVMQueue)) - feederGatewayDataSource := sync.NewFeederGatewayDataSource(chain, adaptfeeder.New(client)) - synchronizer = sync.New( - chain, - feederGatewayDataSource, - log, - cfg.PendingPollInterval, - cfg.PreConfirmedPollInterval, - dbIsRemote, - database, - ) - synchronizer.WithPlugin(junoPlugin) - gatewayClient = gateway.NewClient(cfg.Network.GatewayURL, log).WithUserAgent(ua).WithAPIKey(cfg.GatewayAPIKey) + var syncDataSource sync.DataSource + var mempool mempool.Pool + if cfg.P2P { if cfg.Network == utils.Mainnet { return nil, fmt.Errorf("P2P cannot be used on %v network", utils.Mainnet) } log.Warn("P2P features enabled. Please note P2P is in experimental stage") - if !cfg.P2PFeederNode { - // Do not start the feeder synchronisation - synchronizer = nil + if cfg.P2PFeederNode { + syncDataSource = sync.NewFeederGatewayDataSource(chain, adaptfeeder.New(client)) } + p2pService, err = p2p.New(cfg.P2PAddr, cfg.P2PPublicAddr, version, cfg.P2PPeers, cfg.P2PPrivateKey, cfg.P2PFeederNode, chain, &cfg.Network, log, database) if err != nil { @@ -306,16 +316,99 @@ func New(cfg *Config, version string, logLevel *utils.LogLevel) (*Node, error) { } services = append(services, p2pService) + } else if cfg.Consensus { + // TODO: Replace these mock components with the actual ones + mockConsensusServices := consensus.InitMockServices(0, 0, cfg.ConsensusMockIndex, cfg.ConsensusMockCount) + + consensusDB, err := pebblev2.New( + cfg.ConsensusDBPath, + pebblev2.WithCacheSize(cfg.DBCacheSize), + pebblev2.WithMaxOpenFiles(cfg.DBMaxHandles), + pebblev2.WithLogger(cfg.Colour), + ) + if err != nil { + return nil, fmt.Errorf("open consensus DB: %w", err) + } + + consensusHost, err := pubsub.GetHost(mockConsensusServices.PrivateKey, cfg.ConsensusAddr) + if err != nil { + return nil, fmt.Errorf("get consensus host: %w", err) + } + + consensusPeers, err := dht.ExtractPeers(cfg.ConsensusPeers) + if err != nil { + return nil, fmt.Errorf("extract consensus peers: %w", err) + } + + blockFetcher := p2psync.NewBlockFetcher(chain, consensusHost, &cfg.Network, log) + + consensusServices, err := consensus.Init( + consensusHost, + log, + consensusDB, + chain, + nodeVM, + &blockFetcher, + &mockConsensusServices.NodeAddress, + mockConsensusServices.Validators, + mockConsensusServices.TimeoutFn, + func() []peer.AddrInfo { + return consensusPeers + }, + ) + if err != nil { + return nil, fmt.Errorf("init consensus services: %w", err) + } + + dataSource := datasource.New(consensusServices.CommitListener, consensusServices.Proposer) + + syncDataSource = dataSource + + if cfg.Mempool { + mempoolHost, err := pubsub.GetHost(mockConsensusServices.PrivateKey, cfg.MempoolAddr) + if err != nil { + return nil, fmt.Errorf("get mempool host: %w", err) + } + + mempoolPeers, err := dht.ExtractPeers(cfg.MempoolPeers) + if err != nil { + return nil, fmt.Errorf("extract mempool peers: %w", err) + } + + mempoolp2p := mempoolp2p.New( + &cfg.Network, + mempoolHost, + log, + consensusServices.Proposer, + &config.DefaultBufferSizes, + func() []peer.AddrInfo { + return mempoolPeers + }, + ) + mempool = mempoolp2p + services = append(services, mempoolp2p) + } else { + mempool = consensusServices.Proposer + } + + gatewayClient = nil + + services = append(services, consensusServices.Proposer, consensusServices.P2P, consensusServices.Driver, dataSource) + } else { + syncDataSource = sync.NewFeederGatewayDataSource(chain, adaptfeeder.New(client)) } var syncReader sync.Reader = &sync.NoopSynchronizer{} - if synchronizer != nil { + if syncDataSource != nil { + synchronizer = sync.New(chain, syncDataSource, log, cfg.PendingPollInterval, cfg.PreConfirmedPollInterval, dbIsRemote, database) + synchronizer.WithPlugin(junoPlugin) syncReader = synchronizer } submittedTransactionsCache := rpccore.NewTransactionCache(cfg.SubmittedTransactionsCacheEntryTTL, cfg.SubmittedTransactionsCacheSize) services = append(services, submittedTransactionsCache) rpcHandler = rpc.New(chain, syncReader, throttledVM, version, log, &cfg.Network). + WithMempool(mempool). WithGateway(gatewayClient). WithFeeder(client). WithSubmittedTransactionsCache(submittedTransactionsCache). @@ -547,7 +640,7 @@ func (n *Node) Run(ctx context.Context) { return } - if n.cfg.Sequencer { + if n.cfg.Sequencer || n.cfg.Consensus { // Custom networks are not supported in sequencer mode yet if !slices.Contains(utils.KnownNetworkNames, n.cfg.Network.Name) { n.log.Error("Custom networks are not supported in sequencer mode yet")