Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions nonvalidator/chain_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var genesis = testutil.NewTestBlock(common.ProtocolMetadata{
type messageInfo struct {
msg *common.Message
from common.NodeID
to common.NodeID
}

// testChain is a helper that book-keeps the current chain-tip, alongside any
Expand Down Expand Up @@ -186,8 +187,8 @@ func (tc *testChain) signatureAggregatorCreator(nodes []common.Node) common.Sign
}
}

// addEpochs adds sealing blocks at epochs, and normal blocks in between
func (tc *testChain) addEpochs(epochs ...uint64) {
// indexEpochs indexes sealing blocks at epochs, and normal blocks in between
func (tc *testChain) indexEpochs(epochs ...uint64) {
// ensure that the new epoch we are adding is not already indexed
require.Greater(tc.t, epochs[0], tc.seq)

Expand Down
211 changes: 140 additions & 71 deletions nonvalidator/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,111 +9,180 @@ import (
"testing"

"github.com/ava-labs/simplex/common"
"github.com/ava-labs/simplex/simplex"
"github.com/ava-labs/simplex/testutil"
"github.com/stretchr/testify/require"
)

// nonValidatorResponderComm implements common.Communication and is used during tests
// to create responses to any requests a node sends or broadcasts.
type nonValidatorResponderComm struct {
t *testing.T
// messageQueue keeps a queue of messages
type messageQueue struct {
responsesLock sync.Mutex
responses []*messageInfo
}

func (m *messageQueue) clearResponses() {
m.responsesLock.Lock()
defer m.responsesLock.Unlock()
m.responses = []*messageInfo{}
}

func (m *messageQueue) enqueue(mi *messageInfo) {
m.responsesLock.Lock()
defer m.responsesLock.Unlock()
m.responses = append(m.responses, mi)
}

// storage is used to create the responses
storage common.Storage
func (m *messageQueue) popResponse() (*messageInfo, bool) {
m.responsesLock.Lock()
defer m.responsesLock.Unlock()
if len(m.responses) == 0 {
return nil, false
}
msg := m.responses[0]
m.responses = m.responses[1:]
return msg, true
}

// routerComm appends messages being sent or broadcast to the message queue.
type routerComm struct {
t *testing.T

// nodes should contain the validator set of the highest epoch
nodes common.Nodes

// ID is the NodeID of the non-validator using this comm. Broadcasts
// pick the first node in `nodes` that is not equal to ID as the
// simulated responder.
// ID is the sender of messages and is the Node using the struct.
ID common.NodeID

// responses is the queue of synthesized responses. Tests will pop messages
// and feed entries into NonValidator.HandleMessage.
responsesLock sync.Mutex
responses []*messageInfo
messageQueue *messageQueue
}

func newTestResponder(t *testing.T, myNodeID common.NodeID, tc *testChain) *nonValidatorResponderComm {
return &nonValidatorResponderComm{
nodes: tc.nodes(),
t: t,
ID: myNodeID,
storage: tc,
}
type testEpochs struct {
t *testing.T
epochs []*simplex.Epoch
}

func (r *nonValidatorResponderComm) Validators() common.Nodes { return r.nodes }
func newTestEpochs(tc *testChain, msgQueue *messageQueue, maxSeqWindow uint64) *testEpochs {
nodes := tc.nodes()
epochs := make([]*simplex.Epoch, 0, len(nodes))

// Enqueues a response coming from `destination`.
func (r *nonValidatorResponderComm) Send(msg *common.Message, destination common.NodeID) {
r.handle(msg, destination)
}
for _, node := range nodes {
epochNodeID := node.Id

// Enqueues responses coming from all other nodes in the network.
func (r *nonValidatorResponderComm) Broadcast(msg *common.Message) {

for _, n := range r.nodes {
if bytes.Equal(n.Id, r.ID) {
continue
comm := &routerComm{
nodes: nodes,
t: tc.t,
ID: epochNodeID,
messageQueue: msgQueue,
}

r.handle(msg, n.Id)
conf, _, _ := testutil.DefaultTestNodeEpochConfig(tc.t, epochNodeID, comm, testutil.NewTestBlockBuilder())
conf.MaxRoundWindow = maxSeqWindow
conf.Storage = tc
conf.SignatureAggregatorCreator = tc.signatureAggregatorCreator
conf.ReplicationEnabled = true

epoch, err := simplex.NewEpoch(conf)
require.NoError(tc.t, err)

epochs = append(epochs, epoch)
}

return &testEpochs{
t: tc.t,
epochs: epochs,
}
}

func (r *nonValidatorResponderComm) handle(msg *common.Message, from common.NodeID) {
switch {
case msg.ReplicationRequest != nil:
r.respondToReplicationRequest(msg.ReplicationRequest, from)
// start starts every epoch in the network, failing the test if any epoch
// fails to start.
func (e *testEpochs) start() {
for _, epoch := range e.epochs {
require.NoError(e.t, epoch.Start())
}
}

func (r *nonValidatorResponderComm) clearResponses() {
r.responsesLock.Lock()
defer r.responsesLock.Unlock()
r.responses = []*messageInfo{}
// stop stops every epoch in the network.
func (e *testEpochs) stop() {
for _, epoch := range e.epochs {
epoch.Stop()
}
}

func (r *nonValidatorResponderComm) respondToReplicationRequest(req *common.ReplicationRequest, from common.NodeID) {
resp := &common.ReplicationResponse{}
// handleMessage routes messages between the non-validator and its epochs: a request
// originating from the non-validator is delivered to the addressed epoch, while
// a response from an epoch is delivered back to the non-validator.
func handleMessage(epochs *testEpochs, nv *NonValidator, mi *messageInfo) {
if !bytes.Equal(mi.from, nv.ID) {
// a response from an epoch: deliver it to the non-validator.
require.NoError(epochs.t, nv.HandleMessage(mi.msg, mi.from))
return
}

for _, seq := range req.Seqs {
block, fin, err := r.storage.Retrieve(seq)
if err == nil {
resp.Data = append(resp.Data, common.QuorumRound{Block: block.(common.Block), Finalization: &fin})
// a request from the non-validator: route it to the addressed epoch.
for _, epoch := range epochs.epochs {
if bytes.Equal(epoch.ID, mi.to) {
require.NoError(epochs.t, epoch.HandleMessage(mi.msg, mi.from))
return
}
}
require.Failf(epochs.t, "no epoch for destination", "destination %x", mi.to)
}

func (r *routerComm) Validators() common.Nodes { return r.nodes }

// Enqueues a message sent from this node to `destination`.
func (r *routerComm) Send(msg *common.Message, destination common.NodeID) {
r.handle(msg, destination)
}

if req.LatestFinalizedSeq > 0 && r.storage.NumBlocks() > 0 {
numBlocks := r.storage.NumBlocks()
if req.LatestFinalizedSeq < numBlocks-1 {
block, fin, err := r.storage.Retrieve(numBlocks - 1)
if err == nil {
resp.LatestSeq = &common.QuorumRound{Block: block.(common.Block), Finalization: &fin}
}
// Enqueues a copy of the message addressed to every other node in the network.
func (r *routerComm) Broadcast(msg *common.Message) {
for _, n := range r.nodes {
if bytes.Equal(n.Id, r.ID) {
continue
}
}

if len(resp.Data) == 0 && resp.LatestSeq == nil {
return
r.handle(msg, n.Id)
}

r.enqueue(&messageInfo{msg: &common.Message{ReplicationResponse: resp}, from: from})
}

func (r *nonValidatorResponderComm) enqueue(m *messageInfo) {
r.responsesLock.Lock()
defer r.responsesLock.Unlock()
r.responses = append(r.responses, m)
func (r *routerComm) handle(msg *common.Message, to common.NodeID) {
switch {
case msg.VerifiedReplicationResponse != nil:
// Outgoing responses are of the verified type, but incoming responses
// are of the unverified type, so we translate before enqueuing.
vrr := msg.VerifiedReplicationResponse
data := make([]common.QuorumRound, 0, len(vrr.Data))
for _, vqr := range vrr.Data {
data = append(data, *verifiedQRtoQR(&vqr))
}

msg = &common.Message{
ReplicationResponse: &common.ReplicationResponse{
Data: data,
LatestRound: verifiedQRtoQR(vrr.LatestRound),
LatestSeq: verifiedQRtoQR(vrr.LatestFinalizedSeq),
},
}
r.messageQueue.enqueue(&messageInfo{msg: msg, from: r.ID, to: to})
default:
r.messageQueue.enqueue(&messageInfo{msg: msg, from: r.ID, to: to})
}
}

func (r *nonValidatorResponderComm) popResponse() (*messageInfo, bool) {
r.responsesLock.Lock()
defer r.responsesLock.Unlock()
if len(r.responses) == 0 {
return nil, false
func verifiedQRtoQR(vqr *common.VerifiedQuorumRound) *common.QuorumRound {
if vqr == nil {
return nil
}

qr := &common.QuorumRound{
Notarization: vqr.Notarization,
Finalization: vqr.Finalization,
EmptyNotarization: vqr.EmptyNotarization,
}
m := r.responses[0]
r.responses = r.responses[1:]
return m, true

if vqr.VerifiedBlock != nil {
qr.Block = vqr.VerifiedBlock.(common.Block)
}

return qr
}
Loading
Loading