Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions nonvalidator/chain_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var genesis = testutil.NewTestBlock(common.ProtocolMetadata{
type messageInfo struct {
msg *common.Message
from common.NodeID
to common.NodeID
}

// testChain is a helper that book-keeps the current chain-tip, alongside any
Expand Down Expand Up @@ -186,8 +187,8 @@ func (tc *testChain) signatureAggregatorCreator(nodes []common.Node) common.Sign
}
}

// addEpochs adds sealing blocks at epochs, and normal blocks in between
func (tc *testChain) addEpochs(epochs ...uint64) {
// indexEpochs indexes sealing blocks at epochs, and normal blocks in between
func (tc *testChain) indexEpochs(epochs ...uint64) {
// ensure that the new epoch we are adding is not already indexed
require.Greater(tc.t, epochs[0], tc.seq)

Expand Down
211 changes: 140 additions & 71 deletions nonvalidator/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,111 +9,180 @@ import (
"testing"

"github.com/ava-labs/simplex/common"
"github.com/ava-labs/simplex/simplex"
"github.com/ava-labs/simplex/testutil"
"github.com/stretchr/testify/require"
)

// nonValidatorResponderComm implements common.Communication and is used during tests
// to create responses to any requests a node sends or broadcasts.
type nonValidatorResponderComm struct {
t *testing.T
// messageQueue keeps a queue of messages
type messageQueue struct {
responsesLock sync.Mutex
responses []*messageInfo
}

func (m *messageQueue) clearResponses() {
m.responsesLock.Lock()
defer m.responsesLock.Unlock()
m.responses = []*messageInfo{}
}

func (m *messageQueue) enqueue(mi *messageInfo) {
m.responsesLock.Lock()
defer m.responsesLock.Unlock()
m.responses = append(m.responses, mi)
}

// storage is used to create the responses
storage common.Storage
func (m *messageQueue) popResponse() (*messageInfo, bool) {
m.responsesLock.Lock()
defer m.responsesLock.Unlock()
if len(m.responses) == 0 {
return nil, false
}
msg := m.responses[0]
m.responses = m.responses[1:]
return msg, true
}

// routerComm appends messages being sent or broadcast to the message queue.
type routerComm struct {
t *testing.T

// nodes should contain the validator set of the highest epoch
nodes common.Nodes

// ID is the NodeID of the non-validator using this comm. Broadcasts
// pick the first node in `nodes` that is not equal to ID as the
// simulated responder.
// ID is the sender of messages and is the Node using the struct.
ID common.NodeID

// responses is the queue of synthesized responses. Tests will pop messages
// and feed entries into NonValidator.HandleMessage.
responsesLock sync.Mutex
responses []*messageInfo
messageQueue *messageQueue
}

func newTestResponder(t *testing.T, myNodeID common.NodeID, tc *testChain) *nonValidatorResponderComm {
return &nonValidatorResponderComm{
nodes: tc.nodes(),
t: t,
ID: myNodeID,
storage: tc,
}
type testEpochs struct {
t *testing.T
epochs []*simplex.Epoch
}

func (r *nonValidatorResponderComm) Validators() common.Nodes { return r.nodes }
func newTestEpochs(tc *testChain, msgQueue *messageQueue, maxSeqWindow uint64) *testEpochs {
nodes := tc.nodes()
epochs := make([]*simplex.Epoch, 0, len(nodes))

// Enqueues a response coming from `destination`.
func (r *nonValidatorResponderComm) Send(msg *common.Message, destination common.NodeID) {
r.handle(msg, destination)
}
for _, node := range nodes {
epochNodeID := node.Id

// Enqueues responses coming from all other nodes in the network.
func (r *nonValidatorResponderComm) Broadcast(msg *common.Message) {

for _, n := range r.nodes {
if bytes.Equal(n.Id, r.ID) {
continue
comm := &routerComm{
nodes: nodes,
t: tc.t,
ID: epochNodeID,
messageQueue: msgQueue,
}

r.handle(msg, n.Id)
conf, _, _ := testutil.DefaultTestNodeEpochConfig(tc.t, epochNodeID, comm, testutil.NewTestBlockBuilder())
conf.MaxRoundWindow = maxSeqWindow
conf.Storage = tc
conf.SignatureAggregatorCreator = tc.signatureAggregatorCreator
conf.ReplicationEnabled = true

epoch, err := simplex.NewEpoch(conf)
require.NoError(tc.t, err)

epochs = append(epochs, epoch)
}

return &testEpochs{
t: tc.t,
epochs: epochs,
}
}

func (r *nonValidatorResponderComm) handle(msg *common.Message, from common.NodeID) {
switch {
case msg.ReplicationRequest != nil:
r.respondToReplicationRequest(msg.ReplicationRequest, from)
// start starts every epoch in the network, failing the test if any epoch
// fails to start.
func (e *testEpochs) start() {
for _, epoch := range e.epochs {
require.NoError(e.t, epoch.Start())
}
}

func (r *nonValidatorResponderComm) clearResponses() {
r.responsesLock.Lock()
defer r.responsesLock.Unlock()
r.responses = []*messageInfo{}
// stop stops every epoch in the network.
func (e *testEpochs) stop() {
for _, epoch := range e.epochs {
epoch.Stop()
}
}

func (r *nonValidatorResponderComm) respondToReplicationRequest(req *common.ReplicationRequest, from common.NodeID) {
resp := &common.ReplicationResponse{}
// handleMessage routes messages between the non-validator and its epochs: a request
// originating from the non-validator is delivered to the addressed epoch, while
// a response from an epoch is delivered back to the non-validator.
func handleMessage(epochs *testEpochs, nv *NonValidator, mi *messageInfo) {
if !bytes.Equal(mi.from, nv.ID) {
// a response from an epoch: deliver it to the non-validator.
require.NoError(epochs.t, nv.HandleMessage(mi.msg, mi.from))
return
}

for _, seq := range req.Seqs {
block, fin, err := r.storage.Retrieve(seq)
if err == nil {
resp.Data = append(resp.Data, common.QuorumRound{Block: block.(common.Block), Finalization: &fin})
// a request from the non-validator: route it to the addressed epoch.
for _, epoch := range epochs.epochs {
if bytes.Equal(epoch.ID, mi.to) {
require.NoError(epochs.t, epoch.HandleMessage(mi.msg, mi.from))
return
}
}
require.Failf(epochs.t, "no epoch for destination", "destination %x", mi.to)
}

func (r *routerComm) Validators() common.Nodes { return r.nodes }

// Enqueues a message sent from this node to `destination`.
func (r *routerComm) Send(msg *common.Message, destination common.NodeID) {
r.handle(msg, destination)
}

if req.LatestFinalizedSeq > 0 && r.storage.NumBlocks() > 0 {
numBlocks := r.storage.NumBlocks()
if req.LatestFinalizedSeq < numBlocks-1 {
block, fin, err := r.storage.Retrieve(numBlocks - 1)
if err == nil {
resp.LatestSeq = &common.QuorumRound{Block: block.(common.Block), Finalization: &fin}
}
// Enqueues a copy of the message addressed to every other node in the network.
func (r *routerComm) Broadcast(msg *common.Message) {
for _, n := range r.nodes {
if bytes.Equal(n.Id, r.ID) {
continue
}
}

if len(resp.Data) == 0 && resp.LatestSeq == nil {
return
r.handle(msg, n.Id)
}

r.enqueue(&messageInfo{msg: &common.Message{ReplicationResponse: resp}, from: from})
}

func (r *nonValidatorResponderComm) enqueue(m *messageInfo) {
r.responsesLock.Lock()
defer r.responsesLock.Unlock()
r.responses = append(r.responses, m)
func (r *routerComm) handle(msg *common.Message, to common.NodeID) {
switch {
case msg.VerifiedReplicationResponse != nil:
// Outgoing responses are of the verified type, but incoming responses
// are of the unverified type, so we translate before enqueuing.
vrr := msg.VerifiedReplicationResponse
data := make([]common.QuorumRound, 0, len(vrr.Data))
for _, vqr := range vrr.Data {
data = append(data, *verifiedQRtoQR(&vqr))
}

msg = &common.Message{
ReplicationResponse: &common.ReplicationResponse{
Data: data,
LatestRound: verifiedQRtoQR(vrr.LatestRound),
LatestSeq: verifiedQRtoQR(vrr.LatestFinalizedSeq),
},
}
r.messageQueue.enqueue(&messageInfo{msg: msg, from: r.ID, to: to})
default:
r.messageQueue.enqueue(&messageInfo{msg: msg, from: r.ID, to: to})
}
}

func (r *nonValidatorResponderComm) popResponse() (*messageInfo, bool) {
r.responsesLock.Lock()
defer r.responsesLock.Unlock()
if len(r.responses) == 0 {
return nil, false
func verifiedQRtoQR(vqr *common.VerifiedQuorumRound) *common.QuorumRound {
if vqr == nil {
return nil
}

qr := &common.QuorumRound{
Notarization: vqr.Notarization,
Finalization: vqr.Finalization,
EmptyNotarization: vqr.EmptyNotarization,
}
m := r.responses[0]
r.responses = r.responses[1:]
return m, true

if vqr.VerifiedBlock != nil {
qr.Block = vqr.VerifiedBlock.(common.Block)
}

return qr
}
Loading
Loading