Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/primitives/task_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package primitives

import (
"fmt"
"strings"

"go.temporal.io/api/serviceerror"
)
Expand All @@ -17,6 +18,13 @@ const (
DLQActivityTQ = "temporal-sys-dlq-activity-tq"
)

// internalTaskQueuePrefix is the reserved name prefix shared by every
// server-internal system task queue (e.g. the DLQ task queues above).
const internalTaskQueuePrefix = "temporal-sys-"

// IsInternalTaskQueue reports whether taskQueue names an internal system
// task queue, i.e. one carrying the reserved "temporal-sys-" prefix.
func IsInternalTaskQueue(taskQueue string) bool {
	_, hasSysPrefix := strings.CutPrefix(taskQueue, internalTaskQueuePrefix)
	return hasSysPrefix
}

// IsInternalPerNsTaskQueue reports whether taskQueue is the internal
// per-namespace worker task queue.
func IsInternalPerNsTaskQueue(taskQueue string) bool {
	switch taskQueue {
	case PerNSWorkerTaskQueue:
		return true
	default:
		return false
	}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ require (
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.temporal.io/api v1.62.12-0.20260428190948-a7b1d495e2e4
go.temporal.io/api v1.62.12-0.20260430203359-15c391664683
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2
go.temporal.io/sdk v1.41.1
go.uber.org/fx v1.24.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0 h1:R
go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.3.0/go.mod h1:I89cynRj8y+383o7tEQVg2SVA6SRgDVIouWPUVXjx0U=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0 h1:CQvJSldHRUN6Z8jsUeYv8J0lXRvygALXIzsmAeCcZE0=
go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.3.0/go.mod h1:xSQ+mEfJe/GjK1LXEyVOoSI1N9JV9ZI923X5kup43W4=
go.temporal.io/api v1.62.12-0.20260428190948-a7b1d495e2e4 h1:AwdQ+0+voxlyZZ54q88ezX0Zzxz2s9H+oSmCf4byy5k=
go.temporal.io/api v1.62.12-0.20260428190948-a7b1d495e2e4/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.62.12-0.20260430203359-15c391664683 h1:GtwQjX9hN0pRjuneBpl/xvcu9Xl9llAt4GjKrlpP0sg=
go.temporal.io/api v1.62.12-0.20260430203359-15c391664683/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ=
go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50=
go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA=
Expand Down
7 changes: 4 additions & 3 deletions service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,9 +612,10 @@ func (h *Handler) ListWorkers(
nsID := namespace.ID(request.GetNamespaceId())
listRequest := request.GetListRequest()
resp, err := h.workersRegistry.ListWorkers(nsID, workers.ListWorkersParams{
Query: listRequest.GetQuery(),
PageSize: int(listRequest.GetPageSize()),
NextPageToken: listRequest.GetNextPageToken(),
Query: listRequest.GetQuery(),
PageSize: int(listRequest.GetPageSize()),
NextPageToken: listRequest.GetNextPageToken(),
IncludeSystemWorkers: listRequest.GetIncludeSystemWorkers(),
})
if err != nil {
return nil, err
Expand Down
7 changes: 4 additions & 3 deletions service/matching/workers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
type (
// ListWorkersParams contains parameters for listing workers.
ListWorkersParams struct {
Query string
PageSize int
NextPageToken []byte // Opaque token from a previous response to resume pagination.
Query string
PageSize int
NextPageToken []byte // Opaque token from a previous response to resume pagination.
IncludeSystemWorkers bool
}

// ListWorkersResponse contains the result of listing workers.
Expand Down
35 changes: 24 additions & 11 deletions service/matching/workers/registry_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives"
"go.uber.org/fx"
)

Expand All @@ -29,10 +30,11 @@ type listWorkersPageToken struct {
type (
// entry wraps a WorkerHeartbeat along with its namespace and eviction metadata.
entry struct {
nsID namespace.ID
hb *workerpb.WorkerHeartbeat
lastSeen time.Time
elem *list.Element
nsID namespace.ID
hb *workerpb.WorkerHeartbeat
lastSeen time.Time
elem *list.Element
isSystemWorker bool
}
// bucket holds part of the keyspace: a map from namespace → (map of instanceKey → entry),
// plus a recency list for eviction.
Expand Down Expand Up @@ -105,16 +107,20 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
continue
}

isSystemWorker := primitives.IsInternalTaskQueue(hb.GetTaskQueue())

// Normal upsert
if e, exists := mp[key]; exists {
e.hb = hb
e.lastSeen = now
e.isSystemWorker = isSystemWorker
b.order.MoveToBack(e.elem)
} else {
e = &entry{
nsID: nsID,
hb: hb,
lastSeen: now,
nsID: nsID,
hb: hb,
lastSeen: now,
isSystemWorker: isSystemWorker,
}
e.elem = b.order.PushBack(e)
mp[key] = e
Expand All @@ -126,9 +132,11 @@ func (b *bucket) upsertHeartbeats(nsID namespace.ID, heartbeats []*workerpb.Work
}

// filterWorkers returns all WorkerHeartbeats in a namespace
// for which predicate(hb) returns true.
// for which predicate(hb) returns true. System workers are excluded
// unless includeSystemWorkers is true.
func (b *bucket) filterWorkers(
nsID namespace.ID,
includeSystemWorkers bool,
predicate func(*workerpb.WorkerHeartbeat) bool,
) []*workerpb.WorkerHeartbeat {
b.mu.Lock()
Expand All @@ -140,6 +148,9 @@ func (b *bucket) filterWorkers(
}
out := make([]*workerpb.WorkerHeartbeat, 0, len(mp))
for _, e := range mp {
if !includeSystemWorkers && e.isSystemWorker {
continue
}
if predicate(e.hb) {
out = append(out, e.hb)
}
Expand Down Expand Up @@ -280,17 +291,19 @@ func (m *registryImpl) recordEvictionMetric() {
}

// filterWorkers returns all WorkerHeartbeats in a namespace
// for which predicate(hb) returns true.
// for which predicate(hb) returns true. System workers are excluded
// unless includeSystemWorkers is true.
func (m *registryImpl) filterWorkers(
nsID namespace.ID,
includeSystemWorkers bool,
predicate func(*workerpb.WorkerHeartbeat) bool,
) []*workerpb.WorkerHeartbeat {
b := m.getBucket(nsID)

if b == nil {
return nil
}
return b.filterWorkers(nsID, predicate)
return b.filterWorkers(nsID, includeSystemWorkers, predicate)
}

// evictLoop periodically triggers TTL and capacity-based eviction.
Expand Down Expand Up @@ -384,7 +397,7 @@ func (m *registryImpl) ListWorkers(nsID namespace.ID, params ListWorkersParams)
}

// Get all matching workers and paginate
workers := m.filterWorkers(nsID, predicate)
workers := m.filterWorkers(nsID, params.IncludeSystemWorkers, predicate)
return paginateWorkers(workers, params.PageSize, params.NextPageToken)
}

Expand Down
51 changes: 38 additions & 13 deletions service/matching/workers/registry_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
enumspb "go.temporal.io/api/enums/v1"
workerpb "go.temporal.io/api/worker/v1"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -40,15 +41,15 @@ func TestUpdateAndListNamespace(t *testing.T) {
defer m.Stop()

// No entries initially
list := m.filterWorkers("ns1", alwaysTrue)
list := m.filterWorkers("ns1", true /*includeSystemWorkers*/, alwaysTrue)
assert.Empty(t, list, "expected empty list before updates")

// Add some heartbeats
hb1 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerA", Status: enumspb.WORKER_STATUS_RUNNING}
hb2 := &workerpb.WorkerHeartbeat{WorkerInstanceKey: "workerB", Status: enumspb.WORKER_STATUS_RUNNING}
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})

list = m.filterWorkers("ns1", alwaysTrue)
list = m.filterWorkers("ns1", true /*includeSystemWorkers*/, alwaysTrue)
// Order is not guaranteed; check contents by keys
keys := []string{list[0].WorkerInstanceKey, list[1].WorkerInstanceKey}
assert.Contains(t, keys, "workerA")
Expand Down Expand Up @@ -94,7 +95,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb1, hb2})

// Verify both workers are registered
list := m.filterWorkers("ns1", alwaysTrue)
list := m.filterWorkers("ns1", true /*includeSystemWorkers*/, alwaysTrue)
assert.Len(t, list, 2, "both workers should be registered")
assert.Equal(t, int64(2), m.total.Load(), "total should be 2")

Expand All @@ -103,7 +104,7 @@ func TestShutdownStatusRemovesWorker(t *testing.T) {
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hbShutdown})

// Verify only worker1 is removed, worker2 remains
list = m.filterWorkers("ns1", alwaysTrue)
list = m.filterWorkers("ns1", true /*includeSystemWorkers*/, alwaysTrue)
assert.Len(t, list, 1, "only one worker should remain")
assert.Equal(t, "worker2", list[0].WorkerInstanceKey, "worker2 should remain")
assert.Equal(t, int64(1), m.total.Load(), "total should be 1 after shutdown")
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestShutdownStatusForNonExistentWorker(t *testing.T) {
m.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{hb})

// Verify nothing happened
list := m.filterWorkers("ns1", alwaysTrue)
list := m.filterWorkers("ns1", true /*includeSystemWorkers*/, alwaysTrue)
assert.Empty(t, list, "no workers should exist")
assert.Zero(t, m.total.Load(), "total should remain 0")
}
Expand Down Expand Up @@ -180,12 +181,36 @@ func TestListNamespacePredicate(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
list := m.filterWorkers("ns", tc.pred)
list := m.filterWorkers("ns", true /*includeSystemWorkers*/, tc.pred)
assert.Len(t, list, tc.wantCount)
})
}
}

// TestFilterWorkersExcludesSystemWorkers verifies that filterWorkers hides
// workers polling internal ("temporal-sys-" prefixed) task queues unless the
// caller explicitly opts in via includeSystemWorkers.
func TestFilterWorkersExcludesSystemWorkers(t *testing.T) {
	reg := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
	defer reg.Stop()

	// Two user workers plus one worker on an internal (system) task queue.
	heartbeats := []*workerpb.WorkerHeartbeat{
		{WorkerInstanceKey: "user-1", TaskQueue: "my-queue"},
		{WorkerInstanceKey: "user-2", TaskQueue: "my-queue"},
		{WorkerInstanceKey: "sys-1", TaskQueue: "temporal-sys-per-ns-tq"},
	}
	reg.upsertHeartbeats("ns", heartbeats)

	t.Run("includeSystemWorkers=true returns all", func(t *testing.T) {
		got := reg.filterWorkers("ns", true /*includeSystemWorkers*/, alwaysTrue)
		require.Len(t, got, 3)
	})

	t.Run("includeSystemWorkers=false excludes system workers", func(t *testing.T) {
		got := reg.filterWorkers("ns", false /*includeSystemWorkers*/, alwaysTrue)
		require.Len(t, got, 2)
		for _, hb := range got {
			require.NotEqual(t, "sys-1", hb.WorkerInstanceKey)
		}
	})
}

func TestEvictByTTL(t *testing.T) {
// Use capture handler to verify metrics
captureHandler := metricstest.NewCaptureHandler()
Expand Down Expand Up @@ -216,7 +241,7 @@ func TestEvictByTTL(t *testing.T) {
// Perform eviction
m.evictByTTL()

list := m.filterWorkers("ns", alwaysTrue)
list := m.filterWorkers("ns", true /*includeSystemWorkers*/, alwaysTrue)
assert.Empty(t, list, "entry should be evicted by TTL")
assert.Zero(t, m.total.Load(), "total counter should be decremented")

Expand Down Expand Up @@ -261,7 +286,7 @@ func TestEvictByCapacity(t *testing.T) {
m.evictByCapacity()

// Ensure we evicted down to maxItems
remaining := m.filterWorkers("ns", alwaysTrue)
remaining := m.filterWorkers("ns", true /*includeSystemWorkers*/, alwaysTrue)
assert.Len(t, remaining, int(maxItems), "should evict down to maxItems")
assert.LessOrEqual(t, m.total.Load(), int64(maxItems), "total counter should not exceed maxItems")

Expand Down Expand Up @@ -323,7 +348,7 @@ func TestEvictByCapacityWithMinAgeProtection(t *testing.T) {
m.evictByCapacity()

// All entries should still be there (protected by minEvictAge)
workers := m.filterWorkers("ns", alwaysTrue)
workers := m.filterWorkers("ns", true /*includeSystemWorkers*/, alwaysTrue)
assert.Len(t, workers, 3, "all entries should be protected by minEvictAge")
assert.Equal(t, int64(3), m.total.Load(), "should still exceed maxItems due to protection")

Expand Down Expand Up @@ -374,7 +399,7 @@ func TestEvictByCapacityAfterMinAge(t *testing.T) {
m.evictByCapacity()

// Should have evicted down to maxItems
workers := m.filterWorkers("ns", alwaysTrue)
workers := m.filterWorkers("ns", true /*includeSystemWorkers*/, alwaysTrue)
assert.LessOrEqual(t, len(workers), int(maxItems), "should evict down to maxItems")
assert.LessOrEqual(t, m.total.Load(), int64(maxItems), "total should be within limits")

Expand Down Expand Up @@ -424,10 +449,10 @@ func TestMultipleNamespaces(t *testing.T) {
m.upsertHeartbeats("namespace2", ns2Workers)

// Verify functional behavior first
ns1List := m.filterWorkers("namespace1", alwaysTrue)
ns1List := m.filterWorkers("namespace1", true /*includeSystemWorkers*/, alwaysTrue)
assert.Len(t, ns1List, 3, "namespace1 should have 3 workers")

ns2List := m.filterWorkers("namespace2", alwaysTrue)
ns2List := m.filterWorkers("namespace2", true /*includeSystemWorkers*/, alwaysTrue)
assert.Len(t, ns2List, 2, "namespace2 should have 2 workers")

assert.Equal(t, int64(5), m.total.Load(), "total should be 5 workers across namespaces")
Expand Down Expand Up @@ -548,7 +573,7 @@ func BenchmarkListNamespace(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = m.filterWorkers("benchNs", alwaysTrue)
_ = m.filterWorkers("benchNs", true /*includeSystemWorkers*/, alwaysTrue)
}
}

Expand Down
51 changes: 51 additions & 0 deletions service/matching/workers/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,57 @@ func TestRegistryImpl_ListWorkersInvalidPageToken(t *testing.T) {
assert.Contains(t, err.Error(), "invalid next_page_token")
}

// TestRegistryImpl_ListWorkersExcludesSystemWorkers verifies that ListWorkers
// hides workers on internal ("temporal-sys-" prefixed) task queues by default,
// returns them when IncludeSystemWorkers is set, and excludes them from
// pagination page counts.
func TestRegistryImpl_ListWorkersExcludesSystemWorkers(t *testing.T) {
	r := newRegistryImpl(testDefaultRegistryParams(metrics.NoopMetricsHandler))
	// Stop the registry's background eviction loop when the test finishes,
	// matching the sibling registry tests and avoiding a goroutine leak.
	defer r.Stop()

	// Add workers on a user task queue and a system (internal) task queue.
	r.upsertHeartbeats("ns1", []*workerpb.WorkerHeartbeat{
		{WorkerInstanceKey: "user-worker-1", TaskQueue: "my-queue"},
		{WorkerInstanceKey: "user-worker-2", TaskQueue: "my-queue"},
		{WorkerInstanceKey: "sys-worker-1", TaskQueue: "temporal-sys-per-ns-tq"},
	})

	t.Run("excludes system workers by default", func(t *testing.T) {
		// Zero-valued params: IncludeSystemWorkers defaults to false.
		resp, err := r.ListWorkers("ns1", ListWorkersParams{})
		require.NoError(t, err)
		require.Len(t, resp.Workers, 2, "should only return user workers")

		workerKeys := make([]string, len(resp.Workers))
		for i, w := range resp.Workers {
			workerKeys[i] = w.WorkerInstanceKey
		}
		require.ElementsMatch(t, []string{"user-worker-1", "user-worker-2"}, workerKeys)
	})

	t.Run("includes system workers when requested", func(t *testing.T) {
		resp, err := r.ListWorkers("ns1", ListWorkersParams{IncludeSystemWorkers: true})
		require.NoError(t, err)
		require.Len(t, resp.Workers, 3, "should return all workers including system")

		workerKeys := make([]string, len(resp.Workers))
		for i, w := range resp.Workers {
			workerKeys[i] = w.WorkerInstanceKey
		}
		require.ElementsMatch(t, []string{"user-worker-1", "user-worker-2", "sys-worker-1"}, workerKeys)
	})

	t.Run("pagination excludes system workers from page counts", func(t *testing.T) {
		// Page 1 (sorted: "user-worker-1" comes first)
		resp1, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 1})
		require.NoError(t, err)
		require.Len(t, resp1.Workers, 1)
		require.Equal(t, "user-worker-1", resp1.Workers[0].WorkerInstanceKey)
		require.NotNil(t, resp1.NextPageToken)

		// Page 2
		resp2, err := r.ListWorkers("ns1", ListWorkersParams{PageSize: 1, NextPageToken: resp1.NextPageToken})
		require.NoError(t, err)
		require.Len(t, resp2.Workers, 1)
		require.Equal(t, "user-worker-2", resp2.Workers[0].WorkerInstanceKey)
		require.Nil(t, resp2.NextPageToken)
	})
}

func TestRegistryImpl_RecordStorageDriverMetric(t *testing.T) {
t.Run("disabled when ExternalPayloadsEnabled is false", func(t *testing.T) {
captureHandler := metricstest.NewCaptureHandler()
Expand Down
Loading