diff --git a/common/primitives/task_queues.go b/common/primitives/task_queues.go index 4a2910d4dcb..9b88f9733de 100644 --- a/common/primitives/task_queues.go +++ b/common/primitives/task_queues.go @@ -2,6 +2,7 @@ package primitives import ( "fmt" + "strings" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -33,6 +34,13 @@ func IsInternalTaskQueueKind(kind enumspb.TaskQueueKind) bool { return false } +const internalTaskQueuePrefix = "temporal-sys-" + +// IsInternalTaskQueue returns true if the task queue name belongs to an internal system task queue. +func IsInternalTaskQueue(taskQueue string) bool { + return strings.HasPrefix(taskQueue, internalTaskQueuePrefix) +} + func IsInternalPerNsTaskQueue(taskQueue string) bool { return taskQueue == PerNSWorkerTaskQueue } diff --git a/service/matching/handler.go b/service/matching/handler.go index 53575447db2..8afa8ac8b7f 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -606,9 +606,10 @@ func (h *Handler) ListWorkers( nsID := namespace.ID(request.GetNamespaceId()) listRequest := request.GetListRequest() resp, err := h.workersRegistry.ListWorkers(nsID, workers.ListWorkersParams{ - Query: listRequest.GetQuery(), - PageSize: int(listRequest.GetPageSize()), - NextPageToken: listRequest.GetNextPageToken(), + Query: listRequest.GetQuery(), + PageSize: int(listRequest.GetPageSize()), + NextPageToken: listRequest.GetNextPageToken(), + IncludeSystemWorkers: listRequest.GetIncludeSystemWorkers(), }) if err != nil { return nil, err diff --git a/service/matching/workers/registry.go b/service/matching/workers/registry.go index 8caeee08579..1887ffc469b 100644 --- a/service/matching/workers/registry.go +++ b/service/matching/workers/registry.go @@ -8,9 +8,10 @@ import ( type ( // ListWorkersParams contains parameters for listing workers. ListWorkersParams struct { - Query string - PageSize int - NextPageToken []byte // Opaque token from a previous response to resume pagination. + Query string + PageSize int + NextPageToken []byte // Opaque token from a previous response to resume pagination. + IncludeSystemWorkers bool } // ListWorkersResponse contains the result of listing workers. diff --git a/service/matching/workers/registry_impl.go b/service/matching/workers/registry_impl.go index 8ad8c175be3..593e5d8aa56 100644 --- a/service/matching/workers/registry_impl.go +++ b/service/matching/workers/registry_impl.go @@ -16,6 +16,7 @@ import ( "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/primitives" "go.uber.org/fx" ) @@ -29,10 +30,11 @@ type listWorkersPageToken struct { type ( // entry wraps a WorkerHeartbeat along with its namespace and eviction metadata. entry struct { - nsID namespace.ID - hb *workerpb.WorkerHeartbeat - lastSeen time.Time - elem *list.Element + nsID namespace.ID + hb *workerpb.WorkerHeartbeat + lastSeen time.Time + elem *list.Element + isSystemWorker bool } // bucket holds part of the keyspace: a map from namespace → (map of instanceKey → entry), // plus a recency list for eviction. @@ -105,16 +107,20 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work continue } + isSystemWorker := primitives.IsInternalTaskQueue(hb.GetTaskQueue()) + // Normal upsert if e, exists := mp[key]; exists { e.hb = hb e.lastSeen = now + e.isSystemWorker = isSystemWorker b.order.MoveToBack(e.elem) } else { e = &entry{ - nsID: nsID, - hb: hb, - lastSeen: now, + nsID: nsID, + hb: hb, + lastSeen: now, + isSystemWorker: isSystemWorker, } e.elem = b.order.PushBack(e) mp[key] = e @@ -126,9 +132,11 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work } // filterWorkers returns all WorkerHeartbeats in a namespace -// for which predicate(hb) returns true. +// for which predicate(hb) returns true. System workers are excluded +// unless includeSystemWorkers is true. func (b *bucket) filterWorkers( nsID namespace.ID, + includeSystemWorkers bool, predicate func(*workerpb.WorkerHeartbeat) bool, ) []*workerpb.WorkerHeartbeat { b.mu.Lock() @@ -140,6 +148,9 @@ func (b *bucket) filterWorkers( } out := make([]*workerpb.WorkerHeartbeat, 0, len(mp)) for _, e := range mp { + if !includeSystemWorkers && e.isSystemWorker { + continue + } if predicate(e.hb) { out = append(out, e.hb) } @@ -280,9 +291,11 @@ func (m *registryImpl) recordEvictionMetric() { } // filterWorkers returns all WorkerHeartbeats in a namespace -// for which predicate(hb) returns true. +// for which predicate(hb) returns true. System workers are excluded +// unless includeSystemWorkers is true. func (m *registryImpl) filterWorkers( nsID namespace.ID, + includeSystemWorkers bool, predicate func(*workerpb.WorkerHeartbeat) bool, ) []*workerpb.WorkerHeartbeat { b := m.getBucket(nsID) @@ -290,7 +303,7 @@ func (m *registryImpl) filterWorkers( if b == nil { return nil } - return b.filterWorkers(nsID, predicate) + return b.filterWorkers(nsID, includeSystemWorkers, predicate) } // evictLoop periodically triggers TTL and capacity-based eviction. @@ -384,7 +397,7 @@ func (m *registryImpl) ListWorkers(nsID namespace.ID, params ListWorkersParams) } // Get all matching workers and paginate - workers := m.filterWorkers(nsID, predicate) + workers := m.filterWorkers(nsID, params.IncludeSystemWorkers, predicate) return paginateWorkers(workers, params.PageSize, params.NextPageToken) } diff --git a/service/matching/workers/registry_impl_test.go b/service/matching/workers/registry_impl_test.go index 0829d6722df..22c81acc9aa 100644 --- a/service/matching/workers/registry_impl_test.go +++ b/service/matching/workers/registry_impl_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" enumspb "go.temporal.io/api/enums/v1" workerpb "go.temporal.io/api/worker/v1" "go.temporal.io/server/common/dynamicconfig" @@ -40,7 +41,7 @@ func TestUpdateAndListNamespace(t *testing.T) { defer m.Stop() // No entries initially - list := m.filterWorkers("ns1", alwaysTrue) + list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue) assert.Empty(t, list, "expected empty list before updates") // Add some heartbeats @@ -48,7 +49,7 @@ func TestUpdateAndListNamespace(t *testing.T) { hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerB", Status: enumspb.WORKER_STATUS_RUNNING} m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2}) - list = m.filterWorkers("ns1", alwaysTrue) + list = m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue) // Order is not guaranteed; check contents by keys keys := []string{list[0].WorkerInstanceKey, list[1].WorkerInstanceKey} assert.Contains(t, keys, "workerA") @@ -94,7 +95,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) { m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2}) // Verify both workers are registered - list := m.filterWorkers("ns1", alwaysTrue) + list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue) assert.Len(t, list, 2, "both workers should be registered") assert.Equal(t, int64(2), m.total.Load(), "total should be 2") @@ -103,7 +104,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) { m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hbShutdown}) // Verify only worker1 is removed, worker2 remains - list = m.filterWorkers("ns1", alwaysTrue) + list = m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue) assert.Len(t, list, 1, "only one worker should remain") assert.Equal(t, "worker2", list[0].WorkerInstanceKey, "worker2 should remain") assert.Equal(t, int64(1), m.total.Load(), "total should be 1 after shutdown") @@ -141,7 +142,7 @@ func TestShutdownStatusForNonExistentWorker(t *testing.T) { m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb}) // Verify nothing happened - list := m.filterWorkers("ns1", alwaysTrue) + list := m.filterWorkers("ns1", true /* includeSystemWorkers */, alwaysTrue) assert.Empty(t, list, "no workers should exist") assert.Zero(t, m.total.Load(), "total should remain 0") } @@ -180,12 +181,36 @@ func TestListNamespacePredicate(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - list := m.filterWorkers("ns", tc.pred) + list := m.filterWorkers("ns", true /* includeSystemWorkers */, tc.pred) assert.Len(t, list, tc.wantCount) }) } } +func TestFilterWorkersExcludesSystemWorkers(t *testing.T) { + m := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) + defer m.Stop() + + m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{ + {WorkerInstanceKey: "user-1", TaskQueue: "my-queue"}, + {WorkerInstanceKey: "user-2", TaskQueue: "my-queue"}, + {WorkerInstanceKey: "sys-1", TaskQueue: "temporal-sys-per-ns-tq"}, + }) + + t.Run("includeSystemWorkers=true returns all", func(t *testing.T) { + list := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue) + require.Len(t, list, 3) + }) + + t.Run("includeSystemWorkers=false excludes system workers", func(t *testing.T) { + list := m.filterWorkers("ns", false /* includeSystemWorkers */, alwaysTrue) + require.Len(t, list, 2) + for _, w := range list { + require.NotEqual(t, "sys-1", w.WorkerInstanceKey) + } + }) +} + func TestEvictByTTL(t *testing.T) { // Use capture handler to verify metrics captureHandler := metricstest.NewCaptureHandler() @@ -216,7 +241,7 @@ func TestEvictByTTL(t *testing.T) { // Perform eviction m.evictByTTL() - list := m.filterWorkers("ns", alwaysTrue) + list := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue) assert.Empty(t, list, "entry should be evicted by TTL") assert.Zero(t, m.total.Load(), "total counter should be decremented") @@ -261,7 +286,7 @@ func TestEvictByCapacity(t *testing.T) { m.evictByCapacity() // Ensure we evicted down to maxItems - remaining := m.filterWorkers("ns", alwaysTrue) + remaining := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue) assert.Len(t, remaining, int(maxItems), "should evict down to maxItems") assert.LessOrEqual(t, m.total.Load(), int64(maxItems), "total counter should not exceed maxItems") @@ -323,7 +348,7 @@ func TestEvictByCapacityWithMinAgeProtection(t *testing.T) { m.evictByCapacity() // All entries should still be there (protected by minEvictAge) - workers := m.filterWorkers("ns", alwaysTrue) + workers := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue) assert.Len(t, workers, 3, "all entries should be protected by minEvictAge") assert.Equal(t, int64(3), m.total.Load(), "should still exceed maxItems due to protection") @@ -374,7 +399,7 @@ func TestEvictByCapacityAfterMinAge(t *testing.T) { m.evictByCapacity() // Should have evicted down to maxItems - workers := m.filterWorkers("ns", alwaysTrue) + workers := m.filterWorkers("ns", true /* includeSystemWorkers */, alwaysTrue) assert.LessOrEqual(t, len(workers), int(maxItems), "should evict down to maxItems") assert.LessOrEqual(t, m.total.Load(), int64(maxItems), "total should be within limits") @@ -424,10 +449,10 @@ func TestMultipleNamespaces(t *testing.T) { m.upsertHeartbeats("namespace2", ns2Workers) // Verify functional behavior first - ns1List := m.filterWorkers("namespace1", alwaysTrue) + ns1List := m.filterWorkers("namespace1", true /* includeSystemWorkers */, alwaysTrue) assert.Len(t, ns1List, 3, "namespace1 should have 3 workers") - ns2List := m.filterWorkers("namespace2", alwaysTrue) + ns2List := m.filterWorkers("namespace2", true /* includeSystemWorkers */, alwaysTrue) assert.Len(t, ns2List, 2, "namespace2 should have 2 workers") assert.Equal(t, int64(5), m.total.Load(), "total should be 5 workers across namespaces") @@ -548,7 +573,7 @@ func BenchmarkListNamespace(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - _ = m.filterWorkers("benchNs", alwaysTrue) + _ = m.filterWorkers("benchNs", true /* includeSystemWorkers */, alwaysTrue) } } diff --git a/service/matching/workers/registry_test.go b/service/matching/workers/registry_test.go index 93b874d4b7f..f24d564b1a6 100644 --- a/service/matching/workers/registry_test.go +++ b/service/matching/workers/registry_test.go @@ -554,6 +554,57 @@ func TestRegistryImpl_ListWorkersInvalidPageToken(t *testing.T) { assert.Contains(t, err.Error(), "invalid next_page_token") } +func TestRegistryImpl_ListWorkersExcludesSystemWorkers(t *testing.T) { + r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler)) + + // Add workers on a user task queue and a system (internal) task queue. + r.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{ + {WorkerInstanceKey: "user-worker-1", TaskQueue: "my-queue"}, + {WorkerInstanceKey: "user-worker-2", TaskQueue: "my-queue"}, + {WorkerInstanceKey: "sys-worker-1", TaskQueue: "temporal-sys-per-ns-tq"}, + }) + + t.Run("excludes system workers by default", func(t *testing.T) { + resp, err := r.ListWorkers("ns1", ListWorkersParams{}) + require.NoError(t, err) + require.Len(t, resp.Workers, 2, "should only return user workers") + + workerKeys := make([]string, len(resp.Workers)) + for i, w := range resp.Workers { + workerKeys[i] = w.WorkerInstanceKey + } + require.ElementsMatch(t, []string{"user-worker-1", "user-worker-2"}, workerKeys) + }) + + t.Run("includes system workers when requested", func(t *testing.T) { + resp, err := r.ListWorkers("ns1", ListWorkersParams{IncludeSystemWorkers: true}) + require.NoError(t, err) + require.Len(t, resp.Workers, 3, "should return all workers including system") + + workerKeys := make([]string, len(resp.Workers)) + for i, w := range resp.Workers { + workerKeys[i] = w.WorkerInstanceKey + } + require.ElementsMatch(t, []string{"user-worker-1", "user-worker-2", "sys-worker-1"}, workerKeys) + }) + + t.Run("pagination excludes system workers from page counts", func(t *testing.T) { + // Page 1 (sorted: "user-worker-1" comes first) + resp1, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 1}) + require.NoError(t, err) + require.Len(t, resp1.Workers, 1) + require.Equal(t, "user-worker-1", resp1.Workers[0].WorkerInstanceKey) + require.NotNil(t, resp1.NextPageToken) + + // Page 2 + resp2, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 1, NextPageToken: resp1.NextPageToken}) + require.NoError(t, err) + require.Len(t, resp2.Workers, 1) + require.Equal(t, "user-worker-2", resp2.Workers[0].WorkerInstanceKey) + require.Nil(t, resp2.NextPageToken) + }) +} + func TestRegistryImpl_RecordStorageDriverMetric(t *testing.T) { t.Run("disabled when ExternalPayloadsEnabled is false", func(t *testing.T) { captureHandler := metricstest.NewCaptureHandler()