Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 8 additions & 61 deletions common/configtx/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,69 +91,16 @@ func equalConfigPolicies(lhs, rhs *cb.ConfigPolicy) bool {
bytes.Equal(lhs.Policy.Value, rhs.Policy.Value)
}

// subsetOfGroups reports whether inner is a subset of outer when the two
// maps are compared by key (the values are not inspected).
func subsetOfGroups(inner, outer map[string]*cb.ConfigGroup) bool {
	// A larger map can never be contained in a smaller one; this also
	// makes an empty inner trivially a subset (the loop simply never runs).
	if len(outer) < len(inner) {
		return false
	}
	// Every key of inner must also appear in outer.
	for name := range inner {
		if _, present := outer[name]; !present {
			return false
		}
	}
	return true
}

// subsetOf checks if every key in inner is also present in outer.
// The empty set is trivially a subset of any set; the length guard and
// the range loop cover that case without special handling.
func subsetOf[V any](inner, outer map[string]V) bool {
	// If inner has more elements than outer, it cannot be a subset.
	if len(inner) > len(outer) {
		return false
	}

	// If any element in inner is not in outer, it is not a subset.
	for key := range inner {
		if _, ok := outer[key]; !ok {
			return false
		}
	}

	return true
}

// subsetOfValues reports whether the key set of inner is contained in the
// key set of outer; values themselves are never compared.
func subsetOfValues(inner, outer map[string]*cb.ConfigValue) bool {
	if len(inner) > len(outer) {
		// inner has more keys than outer, so containment is impossible.
		// This check also lets an empty inner fall through to "true".
		return false
	}
	for k := range inner {
		_, ok := outer[k]
		if !ok {
			return false
		}
	}
	return true
}

Expand All @@ -163,12 +110,12 @@ func equalConfigGroup(lhs, rhs *cb.ConfigGroup) bool {
return false
}

if !subsetOfGroups(lhs.Groups, rhs.Groups) ||
!subsetOfGroups(rhs.Groups, lhs.Groups) ||
!subsetOfPolicies(lhs.Policies, rhs.Policies) ||
!subsetOfPolicies(rhs.Policies, lhs.Policies) ||
!subsetOfValues(lhs.Values, rhs.Values) ||
!subsetOfValues(rhs.Values, lhs.Values) {
if !subsetOf(lhs.Groups, rhs.Groups) ||
!subsetOf(rhs.Groups, lhs.Groups) ||
!subsetOf(lhs.Policies, rhs.Policies) ||
!subsetOf(rhs.Policies, lhs.Policies) ||
!subsetOf(lhs.Values, rhs.Values) ||
!subsetOf(rhs.Values, lhs.Values) {
return false
}

Expand Down
110 changes: 60 additions & 50 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,28 @@ func NewDistributor(chainID string, gossip gossipAdapter, factory CollectionAcce

// Distribute broadcast reliably private data read write set based on policies
func (d *distributorImpl) Distribute(txID string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
	// Compute one dissemination plan per collection, then execute them all.
	disseminationPlans, err := d.computeDisseminationPlan(txID, privData, blkHt)
	if err != nil {
		return errors.WithStack(err)
	}
	return d.disseminate(disseminationPlans)
}

// dissemination couples a single signed gossip message with the send
// criteria controlling how it is delivered (see gossipgossip.SendCriteria).
type dissemination struct {
msg *protoext.SignedGossipMessage
criteria gossipgossip.SendCriteria
}

// collectionDisseminationPlan groups the individual send operations for one
// collection together with the minimum number of acknowledged sends
// (RequiredPeerCount) the collection must achieve for dissemination to be
// considered successful.
type collectionDisseminationPlan struct {
plan []*dissemination
requiredPeerCount int
}

func (d *distributorImpl) computeDisseminationPlan(txID string,
privDataWithConfig *transientstore.TxPvtReadWriteSetWithConfigInfo,
blkHt uint64) ([]*dissemination, error) {
blkHt uint64) ([]*collectionDisseminationPlan, error) {
privData := privDataWithConfig.PvtRwset
var disseminationPlan []*dissemination
var disseminationPlans []*collectionDisseminationPlan
for _, pvtRwset := range privData.NsPvtRwset {
namespace := pvtRwset.Namespace
configPackage, found := privDataWithConfig.CollectionConfigs[namespace]
Expand Down Expand Up @@ -194,10 +199,10 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
if err != nil {
return nil, errors.WithMessagef(err, "could not build private data dissemination plan for chaincode %s and collection %s", namespace, collectionName)
}
disseminationPlan = append(disseminationPlan, dPlan...)
disseminationPlans = append(disseminationPlans, dPlan)
}
}
return disseminationPlan, nil
return disseminationPlans, nil
}

func (d *distributorImpl) getCollectionConfig(config *peer.CollectionConfigPackage, collection *rwset.CollectionPvtReadWriteSet) (*peer.CollectionConfig, error) {
Expand All @@ -211,7 +216,7 @@ func (d *distributorImpl) getCollectionConfig(config *peer.CollectionConfigPacka
return nil, errors.New(fmt.Sprint("no configuration for collection", collection.CollectionName, "found"))
}

func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *protoext.SignedGossipMessage) ([]*dissemination, error) {
func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *protoext.SignedGossipMessage) (*collectionDisseminationPlan, error) {
var disseminationPlan []*dissemination

routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChannelID(d.chainID), func(signature api.PeerSignature) bool {
Expand Down Expand Up @@ -249,10 +254,8 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
peerEndpoints[string(peer.PKIid)] = epToAdd
}

// Initialize maximumPeerRemainingCount and requiredPeerRemainingCount,
// these will be decremented until we've selected enough peers for dissemination
// maximumPeerRemainingCount is decremented as peers are selected across orgs.
maximumPeerRemainingCount := colAP.MaximumPeerCount()
requiredPeerRemainingCount := colAP.RequiredPeerCount()

remainingPeersAcrossOrgs := []api.PeerIdentityInfo{}
selectedPeerEndpointsForDebug := []string{}
Expand All @@ -264,22 +267,14 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
// PHASE 1 - Select one peer from each eligible org
if maximumPeerRemainingCount > 0 {
for _, selectionPeersForOrg := range identitySetsByOrg {

// Peers are tagged as a required peer (acksRequired=1) for RequiredPeerCount up front before dissemination.
// TODO It would be better to attempt dissemination to MaxPeerCount first, and then verify that enough sends were acknowledged to meet RequiredPeerCount.
acksRequired := 1
if requiredPeerRemainingCount == 0 {
acksRequired = 0
}

selectedPeerIndex := r.IntN(len(selectionPeersForOrg))
peer2SendPerOrg := selectionPeersForOrg[selectedPeerIndex]
selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2SendPerOrg.PKIId)])
sc := gossipgossip.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChannelID(d.chainID),
MaxPeers: 1,
MinAck: acksRequired,
MinAck: 1,
IsEligible: func(member discovery.NetworkMember) bool {
return bytes.Equal(member.PKIid, peer2SendPerOrg.PKIId)
},
Expand All @@ -299,15 +294,14 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
}
}

if requiredPeerRemainingCount > 0 {
requiredPeerRemainingCount--
}

maximumPeerRemainingCount--
if maximumPeerRemainingCount == 0 {
d.logger.Debug("MaximumPeerCount satisfied")
d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
return disseminationPlan, nil
return &collectionDisseminationPlan{
plan: disseminationPlan,
requiredPeerCount: colAP.RequiredPeerCount(),
}, nil
}
}
}
Expand All @@ -318,18 +312,14 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
d.logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect)
}
for maximumPeerRemainingCount > 0 && len(remainingPeersAcrossOrgs) > 0 {
required := 1
if requiredPeerRemainingCount == 0 {
required = 0
}
selectedPeerIndex := r.IntN(len(remainingPeersAcrossOrgs))
peer2Send := remainingPeersAcrossOrgs[selectedPeerIndex]
selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2Send.PKIId)])
sc := gossipgossip.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChannelID(d.chainID),
MaxPeers: 1,
MinAck: required,
MinAck: 1,
IsEligible: func(member discovery.NetworkMember) bool {
return bytes.Equal(member.PKIid, peer2Send.PKIId)
},
Expand All @@ -341,9 +331,6 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
},
})
if requiredPeerRemainingCount > 0 {
requiredPeerRemainingCount--
}

maximumPeerRemainingCount--

Expand All @@ -352,7 +339,10 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
}

d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
return disseminationPlan, nil
return &collectionDisseminationPlan{
plan: disseminationPlan,
requiredPeerCount: colAP.RequiredPeerCount(),
}, nil
}

// identitiesOfEligiblePeersByOrg returns the peers eligible for a collection (aka PeerIdentitySet) grouped in a hash map keyed by orgid
Expand Down Expand Up @@ -385,27 +375,47 @@ func (d *distributorImpl) eligiblePeersOfChannel(routingFilter filter.RoutingFil
return eligiblePeers
}

func (d *distributorImpl) disseminate(disseminationPlan []*dissemination) error {
var failures uint32
func (d *distributorImpl) disseminate(disseminationPlans []*collectionDisseminationPlan) error {
var wg sync.WaitGroup
wg.Add(len(disseminationPlan))
start := time.Now()
for _, dis := range disseminationPlan {
go func(dis *dissemination) {
defer wg.Done()
defer d.reportSendDuration(start)
err := d.SendByCriteria(dis.msg, dis.criteria)
if err != nil {
atomic.AddUint32(&failures, 1)
m := dis.msg.GetPrivateData().Payload
d.logger.Error("Failed disseminating private RWSet for TxID", m.TxId, ", namespace", m.Namespace, "collection", m.CollectionName, ":", err)
}
}(dis)

// successCounts tracks how many sends succeeded per collection plan (by index).
successCounts := make([]uint32, len(disseminationPlans))

for idx, colPlan := range disseminationPlans {
for _, dis := range colPlan.plan {
wg.Add(1)
go func(planIdx int, dis *dissemination) {
defer wg.Done()
defer d.reportSendDuration(start)
err := d.SendByCriteria(dis.msg, dis.criteria)
if err != nil {
m := dis.msg.GetPrivateData().Payload
d.logger.Error("Failed disseminating private RWSet for TxID", m.TxId, ", namespace", m.Namespace, "collection", m.CollectionName, ":", err)
} else {
atomic.AddUint32(&successCounts[planIdx], 1)
}
}(idx, dis)
}
}
wg.Wait()
failureCount := atomic.LoadUint32(&failures)
if failureCount != 0 {
return errors.Errorf("Failed disseminating %d out of %d private dissemination plans", failureCount, len(disseminationPlan))

// Check that each collection received at least RequiredPeerCount acknowledgements.
var collectionFailures int
for idx, colPlan := range disseminationPlans {
successes := int(atomic.LoadUint32(&successCounts[idx]))
if successes < colPlan.requiredPeerCount {
if len(colPlan.plan) > 0 {
m := colPlan.plan[0].msg.GetPrivateData().Payload
d.logger.Errorf("Failed to meet required peer count for TxID [%s] collection [%s] namespace [%s]: needed %d acknowledgements, got %d",
m.TxId, m.CollectionName, m.Namespace, colPlan.requiredPeerCount, successes)
}
collectionFailures++
}
}

if collectionFailures != 0 {
return errors.Errorf("Failed disseminating to required peer count for %d out of %d private data dissemination plans", collectionFailures, len(disseminationPlans))
}
return nil
}
Expand Down
Loading