Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
bb96ab1
Exclude system workers from ListWorkers by default
rkannan82 Apr 29, 2026
ded6b71
Add unit test for system worker filtering in ListWorkers
rkannan82 Apr 29, 2026
4cce906
Also exclude /temporal-sys/ path-style task queues (worker-commands)
rkannan82 Apr 30, 2026
7feb2df
Revert "Also exclude /temporal-sys/ path-style task queues (worker-co…
rkannan82 Apr 30, 2026
d360bfb
Add pagination test for system worker filtering
rkannan82 Apr 30, 2026
093da36
Simplify pagination test to explicit per-page assertions
rkannan82 Apr 30, 2026
769af51
Merge branch 'main' into kannan/list-workers-exclude-system
rkannan82 Apr 30, 2026
5cce93d
Store isSystemWorker at upsert time instead of checking task queue na…
rkannan82 May 1, 2026
35df1fb
Rename isSystem to isSystemWorker and add filterWorkers test coverage
rkannan82 May 1, 2026
d7bfe94
Merge branch 'main' into kannan/list-workers-exclude-system
rkannan82 May 1, 2026
6342296
Bump go.temporal.io/api to pick up include_system_workers field
rkannan82 May 1, 2026
8ef1141
Fix isSystemWorkerWorker typo in entry struct
rkannan82 May 1, 2026
1ae9010
Use principal to classify system workers with task queue prefix fallback
rkannan82 May 1, 2026
30a8a36
Check both principal type and name for system worker classification
rkannan82 May 1, 2026
63af273
Fix import ordering and whitespace formatting
rkannan82 May 1, 2026
95bf65c
Merge branch 'main' into kannan/list-workers-exclude-system
rkannan82 May 1, 2026
3473528
Merge remote-tracking branch 'origin/main' into kannan/list-workers-e…
rkannan82 May 2, 2026
0450481
Fix comment spacing in test file
rkannan82 May 2, 2026
cbb8611
Merge remote-tracking branch 'origin/kannan/list-workers-exclude-syst…
rkannan82 May 2, 2026
7ff6ef0
Merge remote-tracking branch 'origin/main' into kannan/system-worker-…
rkannan82 May 2, 2026
8c0ce17
Fix comment spacing in registry_test.go
rkannan82 May 2, 2026
98df0cb
Fix comment spacing and extra blank line in tests
rkannan82 May 2, 2026
5dcd1f5
Rename isSystemPrincipal to isSystemWorker
rkannan82 May 2, 2026
382b850
Rename TestIsSystemPrincipal to TestIsSystemWorker
rkannan82 May 2, 2026
ad52b6a
Fix import grouping (gci)
rkannan82 May 2, 2026
2531b86
Fix import grouping (gci)
rkannan82 May 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/primitives/task_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package primitives

import (
"fmt"
"strings"

"go.temporal.io/api/serviceerror"
)
Expand All @@ -17,6 +18,13 @@ const (
DLQActivityTQ = "temporal-sys-dlq-activity-tq"
)

// internalTaskQueuePrefix is the name prefix that marks a task queue as an
// internal system task queue.
const internalTaskQueuePrefix = "temporal-sys-"

// IsInternalTaskQueue reports whether taskQueue names an internal system task
// queue. The decision is based solely on the name starting with
// internalTaskQueuePrefix; the match is case-sensitive and anchored at the
// start of the name.
func IsInternalTaskQueue(taskQueue string) bool {
	_, hasInternalPrefix := strings.CutPrefix(taskQueue, internalTaskQueuePrefix)
	return hasInternalPrefix
}

// IsInternalPerNsTaskQueue reports whether taskQueue is the per-namespace
// worker task queue, i.e. whether it equals PerNSWorkerTaskQueue. Only an
// exact match counts; no prefix matching is performed.
func IsInternalPerNsTaskQueue(taskQueue string) bool {
	isPerNsQueue := taskQueue == PerNSWorkerTaskQueue
	return isPerNsQueue
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ require (
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.temporal.io/api v1.62.12-0.20260428190948-a7b1d495e2e4
go.temporal.io/api v1.62.12-0.20260430203359-15c391664683
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2
go.temporal.io/sdk v1.41.1
go.uber.org/fx v1.24.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0 h1:R
go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0/go.mod h1:I89cynRj8y+383o7tEQVg2SVA6SRgDVIouWPUVXjx0U=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0 h1:CQvJSldHRUN6Z8jsUeYv8J0lXRvygALXIzsmAeCcZE0=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0/go.mod h1:xSQ+mEfJe/GjK1LXEyVOoSI1N9JV9ZI923X5kup43W4=
go.temporal.io/api v1.62.12-0.20260428190948-a7b1d495e2e4 h1:AwdQ+0+voxlyZZ54q88ezX0Zzxz2s9H+oSmCf4byy5k=
go.temporal.io/api v1.62.12-0.20260428190948-a7b1d495e2e4/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.62.12-0.20260430203359-15c391664683 h1:GtwQjX9hN0pRjuneBpl/xvcu9Xl9llAt4GjKrlpP0sg=
go.temporal.io/api v1.62.12-0.20260430203359-15c391664683/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50=
go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA=
Expand Down
12 changes: 7 additions & 5 deletions service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,13 @@ func (h *Handler) ListNexusEndpoints(ctx context.Context, request *matchingservi

// RecordWorkerHeartbeat receives a heartbeat request from the worker.
func (h *Handler) RecordWorkerHeartbeat(
_ context.Context, request *matchingservice.RecordWorkerHeartbeatRequest,
ctx context.Context, request *matchingservice.RecordWorkerHeartbeatRequest,
) (*matchingservice.RecordWorkerHeartbeatResponse, error) {
nsID := namespace.ID(request.GetNamespaceId())
nsName := h.namespaceName(nsID)
principal := headers.GetPrincipal(ctx)

h.workersRegistry.RecordWorkerHeartbeats(nsID, nsName, request.GetHeartbeartRequest().GetWorkerHeartbeat())
h.workersRegistry.RecordWorkerHeartbeats(nsID, nsName, principal, request.GetHeartbeartRequest().GetWorkerHeartbeat())
return &matchingservice.RecordWorkerHeartbeatResponse{}, nil
}

Expand All @@ -612,9 +613,10 @@ func (h *Handler) ListWorkers(
nsID := namespace.ID(request.GetNamespaceId())
listRequest := request.GetListRequest()
resp, err := h.workersRegistry.ListWorkers(nsID, workers.ListWorkersParams{
Query: listRequest.GetQuery(),
PageSize: int(listRequest.GetPageSize()),
NextPageToken: listRequest.GetNextPageToken(),
Query: listRequest.GetQuery(),
PageSize: int(listRequest.GetPageSize()),
NextPageToken: listRequest.GetNextPageToken(),
IncludeSystemWorkers: listRequest.GetIncludeSystemWorkers(),
})
if err != nil {
return nil, err
Expand Down
10 changes: 6 additions & 4 deletions service/matching/workers/registry.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package workers

import (
commonpb "go.temporal.io/api/common/v1"
workerpb "go.temporal.io/api/worker/v1"
"go.temporal.io/server/common/namespace"
)

type (
// ListWorkersParams contains parameters for listing workers.
ListWorkersParams struct {
Query string
PageSize int
NextPageToken []byte // Opaque token from a previous response to resume pagination.
Query string
PageSize int
NextPageToken []byte // Opaque token from a previous response to resume pagination.
IncludeSystemWorkers bool
}

// ListWorkersResponse contains the result of listing workers.
Expand All @@ -20,7 +22,7 @@ type (
}

Registry interface {
RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, workerHeartbeat []*workerpb.WorkerHeartbeat)
RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, principal *commonpb.Principal, workerHeartbeat []*workerpb.WorkerHeartbeat)
ListWorkers(nsID namespace.ID, params ListWorkersParams) (ListWorkersResponse, error)
DescribeWorker(nsID namespace.ID, workerInstanceKey string) (*workerpb.WorkerHeartbeat, error)
}
Expand Down
57 changes: 41 additions & 16 deletions service/matching/workers/registry_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"sync/atomic"
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
workerpb "go.temporal.io/api/worker/v1"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand All @@ -29,10 +31,11 @@ type listWorkersPageToken struct {
type (
// entry wraps a WorkerHeartbeat along with its namespace and eviction metadata.
entry struct {
nsID namespace.ID
hb *workerpb.WorkerHeartbeat
lastSeen time.Time
elem *list.Element
nsID namespace.ID
hb *workerpb.WorkerHeartbeat
lastSeen time.Time
elem *list.Element
isSystemWorker bool
}
// bucket holds part of the keyspace: a map from namespace → (map of instanceKey → entry),
// plus a recency list for eviction.
Expand Down Expand Up @@ -80,7 +83,7 @@ func newBucket() *bucket {
// upsertHeartbeats inserts or refreshes a WorkerHeartbeat under the given namespace.
// Returns the count of added and removed entries separately.
// Workers with WORKER_STATUS_SHUTDOWN are immediately removed from the registry.
func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.WorkerHeartbeat) (added int64, removed int64) {
func (b *bucket) upsertHeartbeats(nsID namespace.ID, principal *commonpb.Principal, heartbeats []*workerpb.WorkerHeartbeat) (added int64, removed int64) {
now := time.Now()

b.mu.Lock()
Expand All @@ -105,16 +108,20 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
continue
}

isSystemWorker := isSystemPrincipal(principal, hb.GetTaskQueue())

// Normal upsert
if e, exists := mp[key]; exists {
e.hb = hb
e.lastSeen = now
e.isSystemWorker = isSystemWorker
b.order.MoveToBack(e.elem)
} else {
e = &entry{
nsID: nsID,
hb: hb,
lastSeen: now,
nsID: nsID,
hb: hb,
lastSeen: now,
isSystemWorker: isSystemWorker,
}
e.elem = b.order.PushBack(e)
mp[key] = e
Expand All @@ -126,9 +133,11 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
}

// filterWorkers returns all WorkerHeartbeats in a namespace
// for which predicate(hb) returns true.
// for which predicate(hb) returns true. System workers are excluded
// unless includeSystemWorkers is true.
func (b *bucket) filterWorkers(
nsID namespace.ID,
includeSystemWorkers bool,
predicate func(*workerpb.WorkerHeartbeat) bool,
) []*workerpb.WorkerHeartbeat {
b.mu.Lock()
Expand All @@ -140,6 +149,9 @@ func (b *bucket) filterWorkers(
}
out := make([]*workerpb.WorkerHeartbeat, 0, len(mp))
for _, e := range mp {
if !includeSystemWorkers && e.isSystemWorker {
continue
}
if predicate(e.hb) {
out = append(out, e.hb)
}
Expand Down Expand Up @@ -246,9 +258,9 @@ func (m *registryImpl) getBucket(nsID namespace.ID) *bucket {

// upsertHeartbeats records or refreshes WorkerHeartbeats under the given namespace.
// New entries increment the global counter.
func (m *registryImpl) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.WorkerHeartbeat) {
func (m *registryImpl) upsertHeartbeats(nsID namespace.ID, principal *commonpb.Principal, heartbeats []*workerpb.WorkerHeartbeat) {
b := m.getBucket(nsID)
added, removed := b.upsertHeartbeats(nsID, heartbeats)
added, removed := b.upsertHeartbeats(nsID, principal, heartbeats)
m.total.Add(added - removed)
if added > 0 {
metrics.WorkerRegistryWorkersAdded.With(m.metricsHandler).Record(added)
Expand Down Expand Up @@ -280,17 +292,19 @@ func (m *registryImpl) recordEvictionMetric() {
}

// filterWorkers returns all WorkerHeartbeats in a namespace
// for which predicate(hb) returns true.
// for which predicate(hb) returns true. System workers are excluded
// unless includeSystemWorkers is true.
func (m *registryImpl) filterWorkers(
nsID namespace.ID,
includeSystemWorkers bool,
predicate func(*workerpb.WorkerHeartbeat) bool,
) []*workerpb.WorkerHeartbeat {
b := m.getBucket(nsID)

if b == nil {
return nil
}
return b.filterWorkers(nsID, predicate)
return b.filterWorkers(nsID, includeSystemWorkers, predicate)
}

// evictLoop periodically triggers TTL and capacity-based eviction.
Expand Down Expand Up @@ -362,8 +376,8 @@ func (m *registryImpl) Stop() {
close(m.quit)
}

func (m *registryImpl) RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, workerHeartbeat []*workerpb.WorkerHeartbeat) {
m.upsertHeartbeats(nsID, workerHeartbeat)
func (m *registryImpl) RecordWorkerHeartbeats(nsID namespace.ID, nsName namespace.Name, principal *commonpb.Principal, workerHeartbeat []*workerpb.WorkerHeartbeat) {
m.upsertHeartbeats(nsID, principal, workerHeartbeat)
m.metricsEmitter.emit(nsID, nsName, workerHeartbeat)
}

Expand All @@ -384,7 +398,7 @@ func (m *registryImpl) ListWorkers(nsID namespace.ID, params ListWorkersParams)
}

// Get all matching workers and paginate
workers := m.filterWorkers(nsID, predicate)
workers := m.filterWorkers(nsID, params.IncludeSystemWorkers, predicate)
return paginateWorkers(workers, params.PageSize, params.NextPageToken)
}

Expand Down Expand Up @@ -464,3 +478,14 @@ func (m *registryImpl) DescribeWorker(nsID namespace.ID, workerInstanceKey strin
}
return b.getWorkerHeartbeat(nsID, workerInstanceKey)
}

// isSystemPrincipal determines if a worker is a system worker.
// If a principal is available, it checks whether the principal identifies
// the Temporal server itself (type="temporal", name="internal"). Otherwise,
// it falls back to checking the task queue name prefix.
func isSystemPrincipal(principal *commonpb.Principal, taskQueue string) bool {
if principal != nil {
return principal.GetType() == "temporal" && principal.GetName() == "internal"
Comment thread
rkannan82 marked this conversation as resolved.
}
return primitives.IsInternalTaskQueue(taskQueue)
}
Loading
Loading