diff --git a/common/configtx/compare.go b/common/configtx/compare.go
index 074660ec2a3..420da396dc6 100644
--- a/common/configtx/compare.go
+++ b/common/configtx/compare.go
@@ -91,69 +91,16 @@ func equalConfigPolicies(lhs, rhs *cb.ConfigPolicy) bool {
 		bytes.Equal(lhs.Policy.Value, rhs.Policy.Value)
 }
 
-// The subset functions check if inner is a subset of outer
-// TODO, try to consolidate these three methods into one, as the code
-// contents are the same, but the function signatures need to be different
-func subsetOfGroups(inner, outer map[string]*cb.ConfigGroup) bool {
-	// The empty set is a subset of all sets
-	if len(inner) == 0 {
-		return true
-	}
-
-	// If inner has more elements than outer, it cannot be a subset
-	if len(inner) > len(outer) {
-		return false
-	}
-
-	// If any element in inner is not in outer, it is not a subset
-	for key := range inner {
-		if _, ok := outer[key]; !ok {
-			return false
-		}
-	}
-
-	return true
-}
-
-func subsetOfPolicies(inner, outer map[string]*cb.ConfigPolicy) bool {
-	// The empty set is a subset of all sets
-	if len(inner) == 0 {
-		return true
-	}
-
-	// If inner has more elements than outer, it cannot be a subset
+// subsetOf checks if every key in inner is also present in outer.
+func subsetOf[V any](inner, outer map[string]V) bool {
 	if len(inner) > len(outer) {
 		return false
 	}
-
-	// If any element in inner is not in outer, it is not a subset
 	for key := range inner {
 		if _, ok := outer[key]; !ok {
 			return false
 		}
 	}
-
-	return true
-}
-
-func subsetOfValues(inner, outer map[string]*cb.ConfigValue) bool {
-	// The empty set is a subset of all sets
-	if len(inner) == 0 {
-		return true
-	}
-
-	// If inner has more elements than outer, it cannot be a subset
-	if len(inner) > len(outer) {
-		return false
-	}
-
-	// If any element in inner is not in outer, it is not a subset
-	for key := range inner {
-		if _, ok := outer[key]; !ok {
-			return false
-		}
-	}
-
 	return true
 }
 
@@ -163,12 +110,12 @@ func equalConfigGroup(lhs, rhs *cb.ConfigGroup) bool {
 		return false
 	}
 
-	if !subsetOfGroups(lhs.Groups, rhs.Groups) ||
-		!subsetOfGroups(rhs.Groups, lhs.Groups) ||
-		!subsetOfPolicies(lhs.Policies, rhs.Policies) ||
-		!subsetOfPolicies(rhs.Policies, lhs.Policies) ||
-		!subsetOfValues(lhs.Values, rhs.Values) ||
-		!subsetOfValues(rhs.Values, lhs.Values) {
+	if !subsetOf(lhs.Groups, rhs.Groups) ||
+		!subsetOf(rhs.Groups, lhs.Groups) ||
+		!subsetOf(lhs.Policies, rhs.Policies) ||
+		!subsetOf(rhs.Policies, lhs.Policies) ||
+		!subsetOf(lhs.Values, rhs.Values) ||
+		!subsetOf(rhs.Values, lhs.Values) {
 		return false
 	}
 
diff --git a/gossip/privdata/distributor.go b/gossip/privdata/distributor.go
index 3430097b542..0af3de5d10a 100644
--- a/gossip/privdata/distributor.go
+++ b/gossip/privdata/distributor.go
@@ -139,11 +139,11 @@ func NewDistributor(chainID string, gossip gossipAdapter, factory CollectionAcce
 
 // Distribute broadcast reliably private data read write set based on policies
 func (d *distributorImpl) Distribute(txID string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
-	disseminationPlan, err := d.computeDisseminationPlan(txID, privData, blkHt)
+	disseminationPlans, err := d.computeDisseminationPlan(txID, privData, blkHt)
 	if err != nil {
 		return errors.WithStack(err)
 	}
-	return d.disseminate(disseminationPlan)
+	return d.disseminate(disseminationPlans)
 }
 
 type dissemination struct {
@@ -151,11 +151,17 @@ type dissemination struct {
 	criteria gossipgossip.SendCriteria
 }
 
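+// collectionDisseminationPlan holds the planned sends for a single collection and the number of acknowledgements that must succeed.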
+type collectionDisseminationPlan struct {
+	plan              []*dissemination
+	requiredPeerCount int
+}
+
 func (d *distributorImpl) computeDisseminationPlan(txID string,
 	privDataWithConfig *transientstore.TxPvtReadWriteSetWithConfigInfo,
-	blkHt uint64) ([]*dissemination, error) {
+	blkHt uint64) ([]*collectionDisseminationPlan, error) {
 	privData := privDataWithConfig.PvtRwset
-	var disseminationPlan []*dissemination
+	var disseminationPlans []*collectionDisseminationPlan
 	for _, pvtRwset := range privData.NsPvtRwset {
 		namespace := pvtRwset.Namespace
 		configPackage, found := privDataWithConfig.CollectionConfigs[namespace]
@@ -194,10 +200,10 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
 			if err != nil {
 				return nil, errors.WithMessagef(err, "could not build private data dissemination plan for chaincode %s and collection %s", namespace, collectionName)
 			}
-			disseminationPlan = append(disseminationPlan, dPlan...)
+			disseminationPlans = append(disseminationPlans, dPlan)
 		}
 	}
-	return disseminationPlan, nil
+	return disseminationPlans, nil
 }
 
 func (d *distributorImpl) getCollectionConfig(config *peer.CollectionConfigPackage, collection *rwset.CollectionPvtReadWriteSet) (*peer.CollectionConfig, error) {
@@ -211,7 +217,7 @@ func (d *distributorImpl) getCollectionConfig(config *peer.CollectionConfigPacka
 	return nil, errors.New(fmt.Sprint("no configuration for collection", collection.CollectionName, "found"))
 }
 
-func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *protoext.SignedGossipMessage) ([]*dissemination, error) {
+func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *protoext.SignedGossipMessage) (*collectionDisseminationPlan, error) {
 	var disseminationPlan []*dissemination
 
 	routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChannelID(d.chainID), func(signature api.PeerSignature) bool {
@@ -249,10 +255,8 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 		peerEndpoints[string(peer.PKIid)] = epToAdd
 	}
 
-	// Initialize maximumPeerRemainingCount and requiredPeerRemainingCount,
-	// these will be decremented until we've selected enough peers for dissemination
+	// maximumPeerRemainingCount is decremented as peers are selected across orgs.
 	maximumPeerRemainingCount := colAP.MaximumPeerCount()
-	requiredPeerRemainingCount := colAP.RequiredPeerCount()
 
 	remainingPeersAcrossOrgs := []api.PeerIdentityInfo{}
 	selectedPeerEndpointsForDebug := []string{}
@@ -264,14 +268,6 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 	// PHASE 1 - Select one peer from each eligible org
 	if maximumPeerRemainingCount > 0 {
 		for _, selectionPeersForOrg := range identitySetsByOrg {
-
-			// Peers are tagged as a required peer (acksRequired=1) for RequiredPeerCount up front before dissemination.
-			// TODO It would be better to attempt dissemination to MaxPeerCount first, and then verify that enough sends were acknowledged to meet RequiredPeerCount.
-			acksRequired := 1
-			if requiredPeerRemainingCount == 0 {
-				acksRequired = 0
-			}
-
 			selectedPeerIndex := r.IntN(len(selectionPeersForOrg))
 			peer2SendPerOrg := selectionPeersForOrg[selectedPeerIndex]
 			selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2SendPerOrg.PKIId)])
@@ -279,7 +275,7 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 				Timeout:  d.pushAckTimeout,
 				Channel:  gossipCommon.ChannelID(d.chainID),
 				MaxPeers: 1,
-				MinAck:   acksRequired,
+				MinAck:   1,
 				IsEligible: func(member discovery.NetworkMember) bool {
 					return bytes.Equal(member.PKIid, peer2SendPerOrg.PKIId)
 				},
@@ -299,15 +295,14 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 				}
 			}
 
-			if requiredPeerRemainingCount > 0 {
-				requiredPeerRemainingCount--
-			}
-
 			maximumPeerRemainingCount--
 			if maximumPeerRemainingCount == 0 {
 				d.logger.Debug("MaximumPeerCount satisfied")
 				d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
-				return disseminationPlan, nil
+				return &collectionDisseminationPlan{
+					plan:              disseminationPlan,
+					requiredPeerCount: colAP.RequiredPeerCount(),
+				}, nil
 			}
 		}
 	}
@@ -318,10 +313,6 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 		d.logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect)
 	}
 	for maximumPeerRemainingCount > 0 && len(remainingPeersAcrossOrgs) > 0 {
-		required := 1
-		if requiredPeerRemainingCount == 0 {
-			required = 0
-		}
 		selectedPeerIndex := r.IntN(len(remainingPeersAcrossOrgs))
 		peer2Send := remainingPeersAcrossOrgs[selectedPeerIndex]
 		selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2Send.PKIId)])
@@ -329,7 +320,7 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 			Timeout:  d.pushAckTimeout,
 			Channel:  gossipCommon.ChannelID(d.chainID),
 			MaxPeers: 1,
-			MinAck:   required,
+			MinAck:   1,
 			IsEligible: func(member discovery.NetworkMember) bool {
 				return bytes.Equal(member.PKIid, peer2Send.PKIId)
 			},
@@ -341,9 +332,6 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 				GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
 			},
 		})
-		if requiredPeerRemainingCount > 0 {
-			requiredPeerRemainingCount--
-		}
 
 		maximumPeerRemainingCount--
 
@@ -352,7 +340,10 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
 	}
 
 	d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
-	return disseminationPlan, nil
+	return &collectionDisseminationPlan{
+		plan:              disseminationPlan,
+		requiredPeerCount: colAP.RequiredPeerCount(),
+	}, nil
 }
 
 // identitiesOfEligiblePeersByOrg returns the peers eligible for a collection (aka PeerIdentitySet) grouped in a hash map keyed by orgid
@@ -385,27 +376,47 @@ func (d *distributorImpl) eligiblePeersOfChannel(routingFilter filter.RoutingFil
 	return eligiblePeers
 }
 
-func (d *distributorImpl) disseminate(disseminationPlan []*dissemination) error {
-	var failures uint32
+func (d *distributorImpl) disseminate(disseminationPlans []*collectionDisseminationPlan) error {
 	var wg sync.WaitGroup
-	wg.Add(len(disseminationPlan))
 	start := time.Now()
-	for _, dis := range disseminationPlan {
-		go func(dis *dissemination) {
-			defer wg.Done()
-			defer d.reportSendDuration(start)
-			err := d.SendByCriteria(dis.msg, dis.criteria)
-			if err != nil {
-				atomic.AddUint32(&failures, 1)
-				m := dis.msg.GetPrivateData().Payload
-				d.logger.Error("Failed disseminating private RWSet for TxID", m.TxId, ", namespace", m.Namespace, "collection", m.CollectionName, ":", err)
-			}
-		}(dis)
+
+	// successCounts tracks how many sends succeeded per collection plan (by index).
+	successCounts := make([]uint32, len(disseminationPlans))
+
+	for idx, colPlan := range disseminationPlans {
+		for _, dis := range colPlan.plan {
+			wg.Add(1)
+			go func(planIdx int, dis *dissemination) {
+				defer wg.Done()
+				defer d.reportSendDuration(start)
+				err := d.SendByCriteria(dis.msg, dis.criteria)
+				if err != nil {
+					m := dis.msg.GetPrivateData().Payload
+					d.logger.Error("Failed disseminating private RWSet for TxID", m.TxId, ", namespace", m.Namespace, "collection", m.CollectionName, ":", err)
+				} else {
+					atomic.AddUint32(&successCounts[planIdx], 1)
+				}
+			}(idx, dis)
+		}
 	}
 	wg.Wait()
-	failureCount := atomic.LoadUint32(&failures)
-	if failureCount != 0 {
-		return errors.Errorf("Failed disseminating %d out of %d private dissemination plans", failureCount, len(disseminationPlan))
+
+	// Check that each collection received at least RequiredPeerCount acknowledgements.
+	var collectionFailures int
+	for idx, colPlan := range disseminationPlans {
+		successes := int(atomic.LoadUint32(&successCounts[idx]))
+		if successes < colPlan.requiredPeerCount {
+			if len(colPlan.plan) > 0 {
+				m := colPlan.plan[0].msg.GetPrivateData().Payload
+				d.logger.Errorf("Failed to meet required peer count for TxID [%s] collection [%s] namespace [%s]: needed %d acknowledgements, got %d",
+					m.TxId, m.CollectionName, m.Namespace, colPlan.requiredPeerCount, successes)
+			}
+			collectionFailures++
+		}
+	}
+
+	if collectionFailures != 0 {
+		return errors.Errorf("Failed disseminating to required peer count for %d out of %d private data dissemination plans", collectionFailures, len(disseminationPlans))
 	}
 	return nil
 }
diff --git a/gossip/privdata/distributor_test.go b/gossip/privdata/distributor_test.go
index 88d56dca09a..fbadf49f4d5 100644
--- a/gossip/privdata/distributor_test.go
+++ b/gossip/privdata/distributor_test.go
@@ -188,9 +188,9 @@ func TestDistributor(t *testing.T) {
 	require.Equal(t, 2, expectedMaxCount["ns1~c1"])
 	require.Equal(t, 2, expectedMaxCount["ns2~c2"])
 
-	// and MinAck is minInternalPeers which is 1
-	require.Equal(t, 1, expectedMinAck["ns1~c1"])
-	require.Equal(t, 1, expectedMinAck["ns2~c2"])
+	// All sends use MinAck=1, so the per-collection sum over two sends is 2; RequiredPeerCount is verified in aggregate after dissemination.
+	require.Equal(t, 2, expectedMinAck["ns1~c1"])
+	require.Equal(t, 2, expectedMinAck["ns2~c2"])
 
 	// Channel is empty after we read 8 times from it
 	require.Len(t, sendings, 0)
@@ -231,11 +231,85 @@ func TestDistributor(t *testing.T) {
 		},
 	}, 0)
 	require.Error(t, err)
-	require.Contains(t, err.Error(), "Failed disseminating 2 out of 2 private dissemination plans")
+	require.Contains(t, err.Error(), "Failed disseminating to required peer count for 2 out of 2 private data dissemination plans")
 	require.Equal(t,
 		[]string{"channel", channelID},
 		testMetricProvider.FakeSendDuration.WithArgsForCall(0),
 	)
-	require.True(t, testMetricProvider.FakeSendDuration.ObserveArgsForCall(0) > 0)
+	require.Positive(t, testMetricProvider.FakeSendDuration.ObserveCallCount())
+}
+
+// Test for partial success: RequiredPeerCount=1, MaximumPeerCount=2
+func TestDistributorPartialSuccess(t *testing.T) {
+	channelID := "test"
+
+	g := &gossipMock{
+		Mock: mock.Mock{},
+		PeerSignature: api.PeerSignature{
+			Signature:    []byte{3, 4, 5},
+			Message:      []byte{6, 7, 8},
+			PeerIdentity: []byte{0, 1, 2},
+		},
+	}
+	g.On("PeersOfChannel", gcommon.ChannelID(channelID)).Return([]discovery.NetworkMember{
+		{PKIid: gcommon.PKIidType{1}},
+		{PKIid: gcommon.PKIidType{2}},
+	})
+	g.On("IdentityInfo").Return(api.PeerIdentitySet{
+		{PKIId: gcommon.PKIidType{1}, Organization: api.OrgIdentityType("org1")},
+		{PKIId: gcommon.PKIidType{2}, Organization: api.OrgIdentityType("org2")},
+	})
+
+	// RequiredPeerCount=1, MaximumPeerCount=2: only one of the two peers must acknowledge.
+	colConfig := &peer.CollectionConfig{
+		Payload: &peer.CollectionConfig_StaticCollectionConfig{
+			StaticCollectionConfig: &peer.StaticCollectionConfig{
+				Name:              "c1",
+				RequiredPeerCount: 1,
+				MaximumPeerCount:  2,
+			},
+		},
+	}
+	policyMock := &mocks2.CollectionAccessPolicy{}
+	Setup(policyMock, 1, 2, func(_ protoutil.SignedData) bool {
+		return true
+	}, map[string]struct{}{"org1": {}, "org2": {}}, false)
+
+	accessFactoryMock := &mocks2.CollectionAccessFactory{}
+	accessFactoryMock.On("AccessPolicy", colConfig, channelID).Return(policyMock, nil)
+
+	testMetricProvider := mocks.TestUtilConstructMetricProvider()
+	m := metrics.NewGossipMetrics(testMetricProvider.FakeProvider).PrivdataMetrics
+	d := NewDistributor(channelID, g, accessFactoryMock, m, 0)
+
+	pvtData := (&pvtDataFactory{}).addRWSet().addNSRWSet("ns1", "c1").create()
+	txData := &transientstore.TxPvtReadWriteSetWithConfigInfo{
+		PvtRwset: pvtData[0].WriteSet,
+		CollectionConfigs: map[string]*peer.CollectionConfigPackage{
+			"ns1": {Config: []*peer.CollectionConfig{colConfig}},
+		},
+	}
+
+	// Case 1: one of the two sends fails, but RequiredPeerCount=1 is still met, so no error.
+	g.On("SendByCriteria", mock.Anything, mock.Anything).Return(errors.New("peer unavailable")).Once()
+	g.On("SendByCriteria", mock.Anything, mock.Anything).Return(nil)
+	err := d.Distribute("tx1", txData, 0)
+	require.NoError(t, err, "should succeed when at least RequiredPeerCount peers acknowledged")
+
+	// Case 2: both sends fail, so RequiredPeerCount=1 is not met and Distribute returns an error.
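+	// Reset the mock so the SendByCriteria expectations from case 1 do not leak into case 2.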
+	g.Mock = mock.Mock{}
+	g.On("PeersOfChannel", gcommon.ChannelID(channelID)).Return([]discovery.NetworkMember{
+		{PKIid: gcommon.PKIidType{1}},
+		{PKIid: gcommon.PKIidType{2}},
+	})
+	g.On("IdentityInfo").Return(api.PeerIdentitySet{
+		{PKIId: gcommon.PKIidType{1}, Organization: api.OrgIdentityType("org1")},
+		{PKIId: gcommon.PKIidType{2}, Organization: api.OrgIdentityType("org2")},
+	})
+	g.On("SendByCriteria", mock.Anything, mock.Anything).Return(errors.New("peer unavailable"))
+	err = d.Distribute("tx2", txData, 0)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "Failed disseminating to required peer count for 1 out of 1")
 }