From 2550dc6e71cc388840f15623bed3655770265155 Mon Sep 17 00:00:00 2001 From: Santiago Bricio Rojas Date: Sun, 26 Apr 2026 17:31:09 +0200 Subject: [PATCH] fix: release versionedQueuesLock on early return in describe() When buildIds contains an empty string and defaultQueue() returns nil, describe() returned errDefaultQueueNotInit without first releasing the RLock acquired at the start of the function, leaving the mutex permanently locked and blocking any concurrent writers. --- .../matching/task_queue_partition_manager.go | 1 + .../task_queue_partition_manager_test.go | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index b17065c9165..e67ceaba204 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -1035,6 +1035,7 @@ func (pm *taskQueuePartitionManagerImpl) describe( if b == "" { dbq := pm.defaultQueue() if dbq == nil { + pm.versionedQueuesLock.RUnlock() return nil, errDefaultQueueNotInit } versions[dbq.QueueKey().Version()] = true diff --git a/service/matching/task_queue_partition_manager_test.go b/service/matching/task_queue_partition_manager_test.go index 780f50fbc7d..c1306e4e60d 100644 --- a/service/matching/task_queue_partition_manager_test.go +++ b/service/matching/task_queue_partition_manager_test.go @@ -22,6 +22,8 @@ import ( taskqueuespb "go.temporal.io/server/api/taskqueue/v1" hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/future" + "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/metrics/metricstest" "go.temporal.io/server/common/namespace" @@ -71,6 +73,34 @@ func TestTaskQueuePartitionManager_Fair_Suite(t *testing.T) { suite.Run(t, &PartitionManagerTestSuite{newMatcher: true, fairness: true}) } +func TestDescribeReleasesVersionedQueuesLockWhenDefaultQueueNotInitialized(t *testing.T) { + t.Parallel() + + pm := &taskQueuePartitionManagerImpl{ + versionedQueues: make(map[PhysicalTaskQueueVersion]physicalTaskQueueManager), + defaultQueueFuture: future.NewFuture[physicalTaskQueueManager](), + logger: log.NewNoopLogger(), + } + + _, err := pm.describe(context.Background(), map[string]bool{"": true}, false, false, false, false, false) + if !errors.Is(err, errDefaultQueueNotInit) { + t.Fatalf("describe() error = %v, want %v", err, errDefaultQueueNotInit) + } + + lockAcquired := make(chan struct{}) + go func() { + pm.versionedQueuesLock.Lock() + defer pm.versionedQueuesLock.Unlock() + close(lockAcquired) + }() + + select { + case <-lockAcquired: + case <-time.After(time.Second): + t.Fatal("versionedQueuesLock remained locked after describe returned") + } +} + func (s *PartitionManagerTestSuite) SetupTest() { s.ProtoAssertions = protorequire.New(s.T()) s.controller = gomock.NewController(s.T())