From 3961b146537246594327139cae35f960e2bbc536 Mon Sep 17 00:00:00 2001 From: amit-momin Date: Fri, 29 May 2026 17:06:18 -0500 Subject: [PATCH] Added MinResponsesToAggregate config --- pkg/capabilities/capabilities.go | 6 ++++++ pkg/capabilities/pb/registry.pb.go | 21 +++++++++++++++---- pkg/capabilities/pb/registry.proto | 6 ++++++ ...ocr3_chain_capabilities_config_types.pb.go | 20 ++++++++++++++---- ...ocr3_chain_capabilities_config_types.proto | 5 +++++ .../capability/capabilities_registry.go | 3 +++ 6 files changed, 53 insertions(+), 8 deletions(-) diff --git a/pkg/capabilities/capabilities.go b/pkg/capabilities/capabilities.go index 47c60a285a..1a62b55e50 100644 --- a/pkg/capabilities/capabilities.go +++ b/pkg/capabilities/capabilities.go @@ -664,6 +664,12 @@ type RemoteExecutableConfig struct { RequestTimeout time.Duration ServerMaxParallelRequests uint32 RequestHasherType RequestHasherType + + // MinResponsesToAggregate is the minimum number of chain-capability DON nodes that must + // return identical responses before the workflow DON accepts the read result. + // 0 defaults to F+1 of the remote DON. + // Must satisfy F+1 <= MinResponsesToAggregate <= N when non-zero. + MinResponsesToAggregate uint32 } // NOTE: consider splitting this config into values stored in Registry (KS-118) diff --git a/pkg/capabilities/pb/registry.pb.go b/pkg/capabilities/pb/registry.pb.go index 2b395859d8..6392021058 100644 --- a/pkg/capabilities/pb/registry.pb.go +++ b/pkg/capabilities/pb/registry.pb.go @@ -304,8 +304,13 @@ type RemoteExecutableConfig struct { RequestTimeout *durationpb.Duration `protobuf:"bytes,6,opt,name=request_timeout,json=requestTimeout,proto3" json:"request_timeout,omitempty"` ServerMaxParallelRequests uint32 `protobuf:"varint,7,opt,name=server_max_parallel_requests,json=serverMaxParallelRequests,proto3" json:"server_max_parallel_requests,omitempty"` RequestHasherType RequestHasherType `protobuf:"varint,8,opt,name=request_hasher_type,json=requestHasherType,proto3,enum=loop.RequestHasherType" json:"request_hasher_type,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // minResponsesToAggregate is the minimum number of chain-capability DON nodes that must + // return identical responses before the workflow DON accepts the read result. + // 0 defaults to F+1 of the remote DON + // Must satisfy F+1 <= minResponsesToAggregate <= N when non-zero. + MinResponsesToAggregate uint32 `protobuf:"varint,9,opt,name=minResponsesToAggregate,proto3" json:"minResponsesToAggregate,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *RemoteExecutableConfig) Reset() { @@ -380,6 +385,13 @@ func (x *RemoteExecutableConfig) GetRequestHasherType() RequestHasherType { return RequestHasherType_Simple } +func (x *RemoteExecutableConfig) GetMinResponsesToAggregate() uint32 { + if x != nil { + return x.MinResponsesToAggregate + } + return 0 +} + type AggregatorConfig struct { state protoimpl.MessageState `protogen:"open.v1"` AggregatorType AggregatorType `protobuf:"varint,1,opt,name=aggregator_type,json=aggregatorType,proto3,enum=loop.AggregatorType" json:"aggregator_type,omitempty"` @@ -797,7 +809,7 @@ const file_registry_proto_rawDesc = "" + "\fmaxBatchSize\x18\x05 \x01(\rR\fmaxBatchSize\x12O\n" + "\x15batchCollectionPeriod\x18\x06 \x01(\v2\x19.google.protobuf.DurationR\x15batchCollectionPeriod\"Z\n" + "\x12RemoteTargetConfig\x12D\n" + - "\x1drequestHashExcludedAttributes\x18\x01 \x03(\tR\x1drequestHashExcludedAttributes\"\xc5\x03\n" + + "\x1drequestHashExcludedAttributes\x18\x01 \x03(\tR\x1drequestHashExcludedAttributes\"\xff\x03\n" + "\x16RemoteExecutableConfig\x12D\n" + "\x1drequestHashExcludedAttributes\x18\x01 \x03(\tR\x1drequestHashExcludedAttributes\x12O\n" + "\x15transmission_schedule\x18\x04 \x01(\x0e2\x1a.loop.TransmissionScheduleR\x14transmissionSchedule\x12:\n" + @@ -805,7 +817,8 @@ const file_registry_proto_rawDesc = "" + "deltaStage\x12B\n" + "\x0frequest_timeout\x18\x06 \x01(\v2\x19.google.protobuf.DurationR\x0erequestTimeout\x12?\n" + "\x1cserver_max_parallel_requests\x18\a \x01(\rR\x19serverMaxParallelRequests\x12G\n" + - "\x13request_hasher_type\x18\b \x01(\x0e2\x17.loop.RequestHasherTypeR\x11requestHasherTypeJ\x04\b\x02\x10\x03J\x04\b\x03\x10\x04\"Q\n" + + "\x13request_hasher_type\x18\b \x01(\x0e2\x17.loop.RequestHasherTypeR\x11requestHasherType\x128\n" + + "\x17minResponsesToAggregate\x18\t \x01(\rR\x17minResponsesToAggregateJ\x04\b\x02\x10\x03J\x04\b\x03\x10\x04\"Q\n" + "\x10AggregatorConfig\x12=\n" + "\x0faggregator_type\x18\x01 \x01(\x0e2\x14.loop.AggregatorTypeR\x0eaggregatorType\"\x99\x02\n" + "\x16CapabilityMethodConfig\x12O\n" + diff --git a/pkg/capabilities/pb/registry.proto b/pkg/capabilities/pb/registry.proto index fdd6d834bb..98f79b38c9 100644 --- a/pkg/capabilities/pb/registry.proto +++ b/pkg/capabilities/pb/registry.proto @@ -52,6 +52,12 @@ message RemoteExecutableConfig { google.protobuf.Duration request_timeout = 6; uint32 server_max_parallel_requests = 7; RequestHasherType request_hasher_type = 8; + + // minResponsesToAggregate is the minimum number of chain-capability DON nodes that must + // return identical responses before the workflow DON accepts the read result. + // 0 defaults to F+1 of the remote DON + // Must satisfy F+1 <= minResponsesToAggregate <= N when non-zero. + uint32 minResponsesToAggregate = 9; } message AggregatorConfig { diff --git a/pkg/capabilities/v2/chain-capabilities/consensus/ocr3/types/ocr3_chain_capabilities_config_types.pb.go b/pkg/capabilities/v2/chain-capabilities/consensus/ocr3/types/ocr3_chain_capabilities_config_types.pb.go index 47e697bf45..4fdb5f02c3 100644 --- a/pkg/capabilities/v2/chain-capabilities/consensus/ocr3/types/ocr3_chain_capabilities_config_types.pb.go +++ b/pkg/capabilities/v2/chain-capabilities/consensus/ocr3/types/ocr3_chain_capabilities_config_types.pb.go @@ -31,8 +31,12 @@ type ReportingPluginConfig struct { MaxReportLengthBytes uint32 `protobuf:"varint,4,opt,name=maxReportLengthBytes,proto3" json:"maxReportLengthBytes,omitempty"` MaxReportCount uint32 `protobuf:"varint,5,opt,name=maxReportCount,proto3" json:"maxReportCount,omitempty"` MaxBatchSize uint32 `protobuf:"varint,6,opt,name=maxBatchSize,proto3" json:"maxBatchSize,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // minResponsesToAggregate is the minimum number of nodes that must report + // identical observations for a value to be accepted as the read result. + // 0 defaults to F+1. Must satisfy F+1 <= minResponsesToAggregate <= N when non-zero. + MinResponsesToAggregate uint32 `protobuf:"varint,7,opt,name=minResponsesToAggregate,proto3" json:"minResponsesToAggregate,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ReportingPluginConfig) Reset() { @@ -107,18 +111,26 @@ func (x *ReportingPluginConfig) GetMaxBatchSize() uint32 { return 0 } +func (x *ReportingPluginConfig) GetMinResponsesToAggregate() uint32 { + if x != nil { + return x.MinResponsesToAggregate + } + return 0 +} + var File_ocr3_chain_capabilities_config_types_proto protoreflect.FileDescriptor const file_ocr3_chain_capabilities_config_types_proto_rawDesc = "" + "\n" + - "*ocr3_chain_capabilities_config_types.proto\x12$chain_capabilities_ocr3_config_types\"\xbd\x02\n" + + "*ocr3_chain_capabilities_config_types.proto\x12$chain_capabilities_ocr3_config_types\"\xf7\x02\n" + "\x15ReportingPluginConfig\x120\n" + "\x13maxQueryLengthBytes\x18\x01 \x01(\rR\x13maxQueryLengthBytes\x12<\n" + "\x19maxObservationLengthBytes\x18\x02 \x01(\rR\x19maxObservationLengthBytes\x124\n" + "\x15maxOutcomeLengthBytes\x18\x03 \x01(\rR\x15maxOutcomeLengthBytes\x122\n" + "\x14maxReportLengthBytes\x18\x04 \x01(\rR\x14maxReportLengthBytes\x12&\n" + "\x0emaxReportCount\x18\x05 \x01(\rR\x0emaxReportCount\x12\"\n" + - "\fmaxBatchSize\x18\x06 \x01(\rR\fmaxBatchSizeB9Z7capabilities/v2/chain-capabilities/consensus/ocr3/typesb\x06proto3" + "\fmaxBatchSize\x18\x06 \x01(\rR\fmaxBatchSize\x128\n" + + "\x17minResponsesToAggregate\x18\a \x01(\rR\x17minResponsesToAggregateB9Z7capabilities/v2/chain-capabilities/consensus/ocr3/typesb\x06proto3" var ( file_ocr3_chain_capabilities_config_types_proto_rawDescOnce sync.Once diff --git a/pkg/capabilities/v2/chain-capabilities/consensus/ocr3/types/ocr3_chain_capabilities_config_types.proto b/pkg/capabilities/v2/chain-capabilities/consensus/ocr3/types/ocr3_chain_capabilities_config_types.proto index 854372b769..4f937659c5 100644 --- a/pkg/capabilities/v2/chain-capabilities/consensus/ocr3/types/ocr3_chain_capabilities_config_types.proto +++ b/pkg/capabilities/v2/chain-capabilities/consensus/ocr3/types/ocr3_chain_capabilities_config_types.proto @@ -14,4 +14,9 @@ message ReportingPluginConfig { uint32 maxReportCount = 5; uint32 maxBatchSize = 6; + + // minResponsesToAggregate is the minimum number of nodes that must report + // identical observations for a value to be accepted as the read result. + // 0 defaults to F+1. Must satisfy F+1 <= minResponsesToAggregate <= N when non-zero. + uint32 minResponsesToAggregate = 7; } \ No newline at end of file diff --git a/pkg/loop/internal/core/services/capability/capabilities_registry.go b/pkg/loop/internal/core/services/capability/capabilities_registry.go index b398ea8c93..a85d85c258 100644 --- a/pkg/loop/internal/core/services/capability/capabilities_registry.go +++ b/pkg/loop/internal/core/services/capability/capabilities_registry.go @@ -235,6 +235,7 @@ func decodeRemoteExecutableConfig(prtc *capabilitiespb.RemoteExecutableConfig) * remoteExecutableConfig.RequestTimeout = prtc.RequestTimeout.AsDuration() remoteExecutableConfig.ServerMaxParallelRequests = prtc.ServerMaxParallelRequests remoteExecutableConfig.RequestHasherType = capabilities.RequestHasherType(prtc.RequestHasherType) + remoteExecutableConfig.MinResponsesToAggregate = prtc.MinResponsesToAggregate return remoteExecutableConfig } @@ -451,6 +452,7 @@ func (c *capabilitiesRegistryServer) ConfigForCapability(ctx context.Context, re RequestTimeout: durationpb.New(cc.RemoteExecutableConfig.RequestTimeout), ServerMaxParallelRequests: cc.RemoteExecutableConfig.ServerMaxParallelRequests, RequestHasherType: capabilitiespb.RequestHasherType(cc.RemoteExecutableConfig.RequestHasherType), + MinResponsesToAggregate: cc.RemoteExecutableConfig.MinResponsesToAggregate, }, } } @@ -485,6 +487,7 @@ func (c *capabilitiesRegistryServer) ConfigForCapability(ctx context.Context, re RequestTimeout: durationpb.New(mConfig.RemoteExecutableConfig.RequestTimeout), ServerMaxParallelRequests: mConfig.RemoteExecutableConfig.ServerMaxParallelRequests, RequestHasherType: capabilitiespb.RequestHasherType(mConfig.RemoteExecutableConfig.RequestHasherType), + MinResponsesToAggregate: mConfig.RemoteExecutableConfig.MinResponsesToAggregate, }, } }