diff --git a/service/matching/handler.go b/service/matching/handler.go index 8afa8ac8b7..59b0f1a7cc 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -590,12 +590,13 @@ func (h *Handler) ListNexusEndpoints(ctx context.Context, request *matchingservi // RecordWorkerHeartbeat receive heartbeat request from the worker. func (h *Handler) RecordWorkerHeartbeat( - _ context.Context, request *matchingservice.RecordWorkerHeartbeatRequest, + ctx context.Context, request *matchingservice.RecordWorkerHeartbeatRequest, ) (*matchingservice.RecordWorkerHeartbeatResponse, error) { nsID := namespace.ID(request.GetNamespaceId()) nsName := h.namespaceName(nsID) + principal := headers.GetPrincipal(ctx) - h.workersRegistry.RecordWorkerHeartbeats(nsID, nsName, request.GetHeartbeartRequest().GetWorkerHeartbeat()) + h.workersRegistry.RecordWorkerHeartbeats(nsID, nsName, principal, request.GetHeartbeartRequest().GetWorkerHeartbeat()) return &matchingservice.RecordWorkerHeartbeatResponse{}, nil } diff --git a/service/matching/workers/registry.go b/service/matching/workers/registry.go index 1887ffc469..510a7748ac 100644 --- a/service/matching/workers/registry.go +++ b/service/matching/workers/registry.go @@ -1,6 +1,7 @@ package workers import ( + commonpb "go.temporal.io/api/common/v1" workerpb "go.temporal.io/api/worker/v1" "go.temporal.io/server/common/namespace" ) @@ -21,7 +22,7 @@ type ( } Registry interface { - RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, workerHeartbeat []*workerpb.WorkerHeartbeat) + RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, principal *commonpb.Principal, workerHeartbeat []*workerpb.WorkerHeartbeat) ListWorkers(nsID namespace.ID, params ListWorkersParams) (ListWorkersResponse, error) DescribeWorker(nsID namespace.ID, workerInstanceKey string) (*workerpb.WorkerHeartbeat, error) } diff --git a/service/matching/workers/registry_impl.go b/service/matching/workers/registry_impl.go index 593e5d8aa5..f0f35aac6c 100644 --- a/service/matching/workers/registry_impl.go +++ b/service/matching/workers/registry_impl.go @@ -10,9 +10,11 @@ import ( "sync/atomic" "time" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" workerpb "go.temporal.io/api/worker/v1" + "go.temporal.io/server/common/authorization" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" @@ -82,7 +84,7 @@ func newBucket() *bucket { // upsertHeartbeats inserts or refreshes a WorkerHeartbeat under the given namespace. // Returns the count of added and removed entries separately. // Workers with WORKER_STATUS_SHUTDOWN are immediately removed from the registry. -func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.WorkerHeartbeat) (added int64, removed int64) { +func (b *bucket) upsertHeartbeats(nsID namespace.ID, principal *commonpb.Principal, heartbeats []*workerpb.WorkerHeartbeat) (added int64, removed int64) { now := time.Now() b.mu.Lock() @@ -107,7 +109,7 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work continue } - isSystemWorker := primitives.IsInternalTaskQueue(hb.GetTaskQueue()) + isSystemWorker := isSystemWorker(principal, hb.GetTaskQueue()) // Normal upsert if e, exists := mp[key]; exists { @@ -257,9 +259,9 @@ func (m *registryImpl) getBucket(nsID namespace.ID) *bucket { // upsertHeartbeat records or refreshes a WorkerHeartbeat under the given namespace. // New entries increment the global counter. -func (m *registryImpl) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.WorkerHeartbeat) { +func (m *registryImpl) upsertHeartbeats(nsID namespace.ID, principal *commonpb.Principal, heartbeats []*workerpb.WorkerHeartbeat) { b := m.getBucket(nsID) - added, removed := b.upsertHeartbeats(nsID, heartbeats) + added, removed := b.upsertHeartbeats(nsID, principal, heartbeats) m.total.Add(added - removed) if added > 0 { metrics.WorkerRegistryWorkersAdded.With(m.metricsHandler).Record(added) @@ -375,8 +377,8 @@ func (m *registryImpl) Stop() { close(m.quit) } -func (m *registryImpl) RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, workerHeartbeat []*workerpb.WorkerHeartbeat) { - m.upsertHeartbeats(nsID, workerHeartbeat) +func (m *registryImpl) RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, principal *commonpb.Principal, workerHeartbeat []*workerpb.WorkerHeartbeat) { + m.upsertHeartbeats(nsID, principal, workerHeartbeat) m.metricsEmitter.emit(nsID, nsName, workerHeartbeat) } @@ -477,3 +479,14 @@ func (m *registryImpl) DescribeWorker(nsID namespace.ID, workerInstanceKey strin } return b.getWorkerHeartbeat(nsID, workerInstanceKey) } + +// isSystemWorker determines if a worker is a system worker. +// If a principal is available, it checks whether the principal identifies +// the Temporal server itself (type="temporal", name="internal"). Otherwise, +// it falls back to checking the task queue name prefix. +func isSystemWorker(principal *commonpb.Principal, taskQueue string) bool { + if principal != nil { + return principal.GetType() == authorization.InternalPrincipalType && principal.GetName() == authorization.InternalPrincipalName + } + return primitives.IsInternalTaskQueue(taskQueue) +} diff --git a/service/matching/workers/registry_impl_test.go b/service/matching/workers/registry_impl_test.go index 22c81acc9a..69ef63b331 100644 --- a/service/matching/workers/registry_impl_test.go +++ b/service/matching/workers/registry_impl_test.go @@ -9,8 +9,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" workerpb "go.temporal.io/api/worker/v1" + "go.temporal.io/server/common/authorization" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/metrics/metricstest" @@ -47,7 +49,7 @@ func TestUpdateAndListNamespace(t *testing.T) { // Add some heartbeats hb1 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerA", Status: enumspb.WORKER_STATUS_RUNNING} hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerB", Status: enumspb.WORKER_STATUS_RUNNING} - m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2}) + m.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{hb1, hb2}) list = m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue) // Order is not guaranteed; check contents by keys @@ -92,7 +94,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) { // Add two running workers hb1 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "worker1", Status: enumspb.WORKER_STATUS_RUNNING} hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "worker2", Status: enumspb.WORKER_STATUS_RUNNING} - m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2}) + m.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{hb1, hb2}) // Verify both workers are registered list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue) @@ -101,7 +103,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) { // Worker1 sends shutdown status hbShutdown := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "worker1", Status: enumspb.WORKER_STATUS_SHUTDOWN} - m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hbShutdown}) + m.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{hbShutdown}) // Verify only worker1 is removed, worker2 remains list = m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue) @@ -139,7 +141,7 @@ func TestShutdownStatusForNonExistentWorker(t *testing.T) { // Send shutdown for non-existent worker - should be a no-op hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "unknown", Status: enumspb.WORKER_STATUS_SHUTDOWN} - m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) // Verify nothing happened list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue) @@ -165,7 +167,7 @@ func TestListNamespacePredicate(t *testing.T) { for i := 1; i <= 5; i++ { key := fmt.Sprintf("key%d", i) hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key, CurrentStickyCacheSize: int32(i)} - m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) } // Table-driven tests for predicates @@ -191,7 +193,7 @@ func TestFilterWorkersExcludesSystemWorkers(t *testing.T) { m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) defer m.Stop() - m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{ + m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "user-1", TaskQueue: "my-queue"}, {WorkerInstanceKey: "user-2", TaskQueue: "my-queue"}, {WorkerInstanceKey: "sys-1", TaskQueue: "temporal-sys-per-ns-tq"}, @@ -211,6 +213,65 @@ func TestFilterWorkersExcludesSystemWorkers(t *testing.T) { }) } +func TestIsSystemWorker(t *testing.T) { + t.Run("principal with type temporal marks as system worker", func(t *testing.T) { + m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) + defer m.Stop() + + principal := &commonpb.Principal{Type: authorization.InternalPrincipalType, Name: authorization.InternalPrincipalName} + m.upsertHeartbeats("ns", principal, []*workerpb.WorkerHeartbeat{ + {WorkerInstanceKey: "sys-worker", TaskQueue: "any-queue"}, + }) + + list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue) + require.Empty(t, list, "worker with temporal principal should be excluded") + + list = m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue) + require.Len(t, list, 1) + }) + + t.Run("principal with type temporal but non-internal name is not system worker", func(t *testing.T) { + m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) + defer m.Stop() + + principal := &commonpb.Principal{Type: authorization.InternalPrincipalType, Name: "other"} + m.upsertHeartbeats("ns", principal, []*workerpb.WorkerHeartbeat{ + {WorkerInstanceKey: "worker", TaskQueue: "any-queue"}, + }) + + list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue) + require.Len(t, list, 1, "worker with non-internal name should not be excluded") + }) + + t.Run("principal with non-temporal type is not system worker", func(t *testing.T) { + m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) + defer m.Stop() + + principal := &commonpb.Principal{Type: "user", Name: "alice"} + m.upsertHeartbeats("ns", principal, []*workerpb.WorkerHeartbeat{ + {WorkerInstanceKey: "user-worker", TaskQueue: "temporal-sys-per-ns-tq"}, + }) + + // Even though task queue has system prefix, principal says it's a user + list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue) + require.Len(t, list, 1, "worker with user principal should not be excluded") + }) + + t.Run("nil principal falls back to task queue prefix", func(t *testing.T) { + m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) + defer m.Stop() + + m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{ + {WorkerInstanceKey: "sys-worker", TaskQueue: "temporal-sys-per-ns-tq"}, + {WorkerInstanceKey: "user-worker", TaskQueue: "my-queue"}, + }) + + list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue) + require.Len(t, list, 1) + require.Equal(t, "user-worker", list[0].WorkerInstanceKey) + }) +} + func TestEvictByTTL(t *testing.T) { // Use capture handler to verify metrics captureHandler := metricstest.NewCaptureHandler() @@ -231,7 +292,7 @@ func TestEvictByTTL(t *testing.T) { defer m.Stop() hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "oldWorker"} - m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) // Manually move beyond TTL b := m.getBucket("ns") @@ -279,7 +340,7 @@ func TestEvictByCapacity(t *testing.T) { for i := 1; i <= 5; i++ { key := fmt.Sprintf("cap%d", i) hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key} - m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) } // All entries have lastSeen.Before(now) when MinEvictAge=0, so eligible @@ -337,7 +398,7 @@ func TestEvictByCapacityWithMinAgeProtection(t *testing.T) { for i := 1; i <= 3; i++ { key := fmt.Sprintf("worker%d", i) hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key} - m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) } // Verify we're over capacity @@ -389,7 +450,7 @@ func TestEvictByCapacityAfterMinAge(t *testing.T) { for i := 1; i <= 3; i++ { key := fmt.Sprintf("worker%d", i) hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key} - m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) } // Virtual time advance - instant with synctest! @@ -439,14 +500,14 @@ func TestMultipleNamespaces(t *testing.T) { {WorkerInstanceKey: "ns1-worker2", TaskQueue: "queue1"}, {WorkerInstanceKey: "ns1-worker3", TaskQueue: "queue2"}, } - m.upsertHeartbeats("namespace1", ns1Workers) + m.upsertHeartbeats("namespace1", nil /* principal */, ns1Workers) // Add 2 workers to namespace2 ns2Workers := []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "ns2-worker1", TaskQueue: "queue3"}, {WorkerInstanceKey: "ns2-worker2", TaskQueue: "queue3"}, } - m.upsertHeartbeats("namespace2", ns2Workers) + m.upsertHeartbeats("namespace2", nil /* principal */, ns2Workers) // Verify functional behavior first ns1List := m.filterWorkers("namespace1", true /* includeSystemWorkers */, alwaysTrue) @@ -504,7 +565,7 @@ func TestEvictLoopRecordsUtilizationMetric(t *testing.T) { for i := 1; i <= 3; i++ { key := fmt.Sprintf("worker%d", i) hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key} - m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats("ns", nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) } // Verify initial state @@ -548,7 +609,7 @@ func BenchmarkUpdate(b *testing.B) { hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "benchWorker"} b.ResetTimer() for i := 0; i < b.N; i++ { - m.upsertHeartbeats("benchNs", []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats("benchNs", nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) } } @@ -569,7 +630,7 @@ func BenchmarkListNamespace(b *testing.B) { for i := range 1000 { key := fmt.Sprintf("worker%d", i) hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key} - m.upsertHeartbeats("benchNs", []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats("benchNs", nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -604,7 +665,7 @@ func BenchmarkRandomUpdate(b *testing.B) { for i := range totalHeartbeats { key := fmt.Sprintf("%s-worker%d", ns, i) hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key, CurrentStickyCacheSize: int32(i)} - m.upsertHeartbeats(ns, []*workerpb.WorkerHeartbeat{hb}) + m.upsertHeartbeats(ns, nil /* principal */, []*workerpb.WorkerHeartbeat{hb}) pairs = append(pairs, pair{ns: ns, hb: hb}) } } @@ -613,7 +674,7 @@ func BenchmarkRandomUpdate(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { p := pairs[r.Intn(len(pairs))] - m.upsertHeartbeats(p.ns, []*workerpb.WorkerHeartbeat{p.hb}) + m.upsertHeartbeats(p.ns, nil /* principal */, []*workerpb.WorkerHeartbeat{p.hb}) } } @@ -662,7 +723,7 @@ func TestActivitySlotsMetric(t *testing.T) { testNamespace := tv.NamespaceID() testNamespaceName := namespace.Name(testNamespace + "_name") - m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3}) + m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, nil /* principal */, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3}) // Verify activity slots metrics snapshot := capture.Snapshot() @@ -739,7 +800,7 @@ func TestPluginMetricsExported(t *testing.T) { testNamespace := tv.NamespaceID() testNamespaceName := namespace.Name(testNamespace + "_name") - m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3}) + m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, nil /* principal */, []*workerpb.WorkerHeartbeat{worker1, worker2, worker3}) // Verify plugin metrics - should have exactly 3 recordings despite plugin-a being in both workers snapshot := capture.Snapshot() @@ -801,7 +862,7 @@ func TestPluginMetricsDisabled(t *testing.T) { // Upsert heartbeats for test namespace testNamespace := tv.NamespaceID() testNamespaceName := namespace.Name(testNamespace + "_name") - m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, []*workerpb.WorkerHeartbeat{worker1}) + m.RecordWorkerHeartbeats(testNamespace, testNamespaceName, nil /* principal */, []*workerpb.WorkerHeartbeat{worker1}) // Verify no plugin metrics are recorded when disabled snapshot := capture.Snapshot() diff --git a/service/matching/workers/registry_test.go b/service/matching/workers/registry_test.go index f24d564b1a..ba7dfba404 100644 --- a/service/matching/workers/registry_test.go +++ b/service/matching/workers/registry_test.go @@ -60,7 +60,7 @@ func TestRegistryImpl_RecordWorkerHeartbeat(t *testing.T) { { name: "record worker in existing namespace", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "existing-worker", }}) }, @@ -74,7 +74,7 @@ func TestRegistryImpl_RecordWorkerHeartbeat(t *testing.T) { { name: "update existing worker", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker1", TaskQueue: "tq1", }}) @@ -97,7 +97,7 @@ func TestRegistryImpl_RecordWorkerHeartbeat(t *testing.T) { r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) tt.setup(r) - r.RecordWorkerHeartbeats(tt.nsID, namespace.Name(tt.nsID+"_name"), []*workerpb.WorkerHeartbeat{tt.workerHeartbeat}) + r.RecordWorkerHeartbeats(tt.nsID, namespace.Name(tt.nsID+"_name"), nil /* principal */, []*workerpb.WorkerHeartbeat{tt.workerHeartbeat}) // Check if namespace exists nsBuket := r.getBucket(tt.nsID) @@ -145,7 +145,7 @@ func TestRegistryImpl_ListWorkers(t *testing.T) { { name: "list single worker", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker1", }}) }, @@ -156,13 +156,13 @@ func TestRegistryImpl_ListWorkers(t *testing.T) { { name: "list multiple workers", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker1", }}) - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker2", }}) - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker3", }}) }, @@ -174,11 +174,11 @@ func TestRegistryImpl_ListWorkers(t *testing.T) { name: "list workers from specific namespace only", setup: func(r *registryImpl) { // Setup namespace1 - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker1", }}) // Setup namespace2 - r.upsertHeartbeats("namespace2", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace2", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker2", }}) }, @@ -235,7 +235,7 @@ func TestRegistryImpl_ListWorkersWithQuery(t *testing.T) { { name: "valid query - basic filtering", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "worker1", TaskQueue: "queue1"}, {WorkerInstanceKey: "worker2", TaskQueue: "queue2"}, }) @@ -248,7 +248,7 @@ func TestRegistryImpl_ListWorkersWithQuery(t *testing.T) { { name: "valid compound query - multiple conditions", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "worker1", TaskQueue: "queue1"}, {WorkerInstanceKey: "worker2", TaskQueue: "queue2"}, }) @@ -261,7 +261,7 @@ func TestRegistryImpl_ListWorkersWithQuery(t *testing.T) { { name: "valid query - no matches", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "worker1", TaskQueue: "queue1"}, }) }, @@ -273,7 +273,7 @@ func TestRegistryImpl_ListWorkersWithQuery(t *testing.T) { { name: "invalid query - malformed SQL", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "worker1"}, }) }, @@ -295,11 +295,11 @@ func TestRegistryImpl_ListWorkersWithQuery(t *testing.T) { name: "query returns requested namespace only", setup: func(r *registryImpl) { // Add workers to namespace1 - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "worker1", TaskQueue: "queue"}, }) // Add workers to namespace2 - r.upsertHeartbeats("namespace2", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("namespace2", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "worker2", TaskQueue: "queue"}, }) }, @@ -366,7 +366,7 @@ func TestRegistryImpl_DescribeWorker(t *testing.T) { { name: "list empty worker", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker1", }}) }, @@ -377,7 +377,7 @@ func TestRegistryImpl_DescribeWorker(t *testing.T) { { name: "list single worker, doesn't exist", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker1", }}) }, @@ -388,7 +388,7 @@ func TestRegistryImpl_DescribeWorker(t *testing.T) { { name: "list single worker", setup: func(r *registryImpl) { - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker1", }}) }, @@ -399,11 +399,11 @@ func TestRegistryImpl_DescribeWorker(t *testing.T) { name: "list workers from specific namespace only", setup: func(r *registryImpl) { // Setup namespace1 - r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace1", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker1", }}) // Setup namespace2 - r.upsertHeartbeats("namespace2", []*workerpb.WorkerHeartbeat{{ + r.upsertHeartbeats("namespace2", nil /* principal */, []*workerpb.WorkerHeartbeat{{ WorkerInstanceKey: "worker2", }}) }, @@ -434,7 +434,7 @@ func TestRegistryImpl_ListWorkersPagination(t *testing.T) { r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) // Add 5 workers in non-sorted order to verify sorting works - r.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "worker-c"}, {WorkerInstanceKey: "worker-a"}, {WorkerInstanceKey: "worker-e"}, @@ -529,7 +529,7 @@ func TestRegistryImpl_ListWorkersPaginationWithDeletedCursor(t *testing.T) { func TestRegistryImpl_ListWorkersNoPagination(t *testing.T) { r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) - r.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "worker-a"}, {WorkerInstanceKey: "worker-b"}, {WorkerInstanceKey: "worker-c"}, @@ -545,7 +545,7 @@ func TestRegistryImpl_ListWorkersNoPagination(t *testing.T) { func TestRegistryImpl_ListWorkersInvalidPageToken(t *testing.T) { r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) - r.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "worker-a"}, }) @@ -558,7 +558,7 @@ func TestRegistryImpl_ListWorkersExcludesSystemWorkers(t *testing.T) { r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) // Add workers on a user task queue and a system (internal) task queue. - r.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{ + r.upsertHeartbeats("ns1", nil /* principal */, []*workerpb.WorkerHeartbeat{ {WorkerInstanceKey: "user-worker-1", TaskQueue: "my-queue"}, {WorkerInstanceKey: "user-worker-2", TaskQueue: "my-queue"}, {WorkerInstanceKey: "sys-worker-1", TaskQueue: "temporal-sys-per-ns-tq"},