diff --git a/service/history/queues/dlq_writer.go b/service/history/queues/dlq_writer.go index 00210b67df6..40434bd5b8c 100644 --- a/service/history/queues/dlq_writer.go +++ b/service/history/queues/dlq_writer.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" + "go.temporal.io/server/chasm" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -21,6 +22,7 @@ type ( metricsHandler metrics.Handler logger log.SnTaggedLogger namespaceRegistry namespace.Registry + chasmRegistry *chasm.Registry enqueueMutex sync.Map // map[persistence.QueueKey]*sync.Mutex for per-queue locking } // QueueWriter is a subset of persistence.HistoryTaskQueueManager. @@ -47,12 +49,14 @@ func NewDLQWriter( h metrics.Handler, l log.SnTaggedLogger, r namespace.Registry, + cr *chasm.Registry, ) *DLQWriter { return &DLQWriter{ dlqWriter: w, metricsHandler: h, logger: l, namespaceRegistry: r, + chasmRegistry: cr, } } @@ -97,34 +101,43 @@ func (q *DLQWriter) WriteTaskToDLQ( if err != nil { return fmt.Errorf("%w: %v", ErrSendTaskToDLQ, err) } + + nsMetricTag := metrics.NamespaceUnknownTag() + var nsLogTag tag.Tag + ns, err := q.namespaceRegistry.GetNamespaceByID(namespace.ID(task.GetNamespaceID())) + if err != nil { + q.logger.Warn("Failed to get namespace name while trying to write a task to DLQ", + tag.WorkflowNamespace(task.GetNamespaceID()), + tag.Error(err), + ) + nsLogTag = tag.WorkflowNamespaceID(task.GetNamespaceID()) + } else { + nsMetricTag = metrics.NamespaceTag(ns.Name().String()) + nsLogTag = tag.WorkflowNamespace(ns.Name().String()) + } + // "passive" means the namespace is in standby mode and only replicates data namespaceState := metrics.PassiveNamespaceStateTagValue if isNamespaceActive { namespaceState = metrics.ActiveNamespaceStateTagValue } + taskType := GetTaskTypeTagValue(task, isNamespaceActive, q.chasmRegistry) metrics.DLQWrites.With(q.metricsHandler).Record( 1, metrics.TaskCategoryTag(task.GetCategory().Name()), metrics.NamespaceStateTag(namespaceState), + nsMetricTag, + metrics.TaskTypeTag(taskType), + metrics.OperationTag(taskType), + getArchetypeTag(task, q.chasmRegistry), ) - ns, err := q.namespaceRegistry.GetNamespaceByID(namespace.ID(task.GetNamespaceID())) - var namespaceTag tag.Tag - if err != nil { - q.logger.Warn("Failed to get namespace name while trying to write a task to DLQ", - tag.WorkflowNamespace(task.GetNamespaceID()), - tag.Error(err), - ) - namespaceTag = tag.WorkflowNamespaceID(task.GetNamespaceID()) - } else { - namespaceTag = tag.WorkflowNamespace(string(ns.Name())) - } q.logger.Warn("Task enqueued to DLQ", tag.DLQMessageID(resp.Metadata.ID), tag.SourceCluster(sourceCluster), tag.TargetCluster(targetCluster), tag.TaskType(task.GetType()), tag.String("task-category", task.GetCategory().Name()), - namespaceTag, + nsLogTag, ) return nil } diff --git a/service/history/queues/dlq_writer_test.go b/service/history/queues/dlq_writer_test.go index 07ca5e60752..06fcc59a889 100644 --- a/service/history/queues/dlq_writer_test.go +++ b/service/history/queues/dlq_writer_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.temporal.io/server/chasm" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -54,7 +55,7 @@ func TestDLQWriter_ErrGetNamespaceName(t *testing.T) { logger := &logRecorder{SnTaggedLogger: log.NewTestLogger()} metricsHandler := metricstest.NewCaptureHandler() capture := metricsHandler.StartCapture() - writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry) + writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry, chasm.NewRegistry(log.NewTestLogger())) task := &tasks.WorkflowTask{ WorkflowKey: definition.WorkflowKey{ NamespaceID: string(tests.NamespaceID), @@ -85,10 +86,15 @@ func TestDLQWriter_ErrGetNamespaceName(t *testing.T) { counter, ok := recordings[0].Value.(int64) assert.True(t, ok) assert.Equal(t, int64(1), counter) - assert.Len(t, recordings[0].Tags, 2) + require.Len(t, recordings[0].Tags, 6) assert.Equal(t, "transfer", recordings[0].Tags[metrics.TaskCategoryTagName]) namespaceStateTag := metrics.NamespaceStateTag(metrics.ActiveNamespaceStateTagValue) assert.Equal(t, metrics.ActiveNamespaceStateTagValue, recordings[0].Tags[namespaceStateTag.Key]) + require.Equal(t, metrics.NamespaceUnknownTag().Value, recordings[0].Tags[metrics.NamespaceUnknownTag().Key]) + expectedTaskType := queues.GetTaskTypeTagValue(task, true, chasm.NewRegistry(log.NewTestLogger())) + require.Equal(t, expectedTaskType, recordings[0].Tags[metrics.TaskTypeTagName]) + require.Equal(t, expectedTaskType, recordings[0].Tags[metrics.OperationTagName]) + require.Equal(t, chasm.WorkflowComponentName, recordings[0].Tags[metrics.ArchetypeTagName]) } func TestDLQWriter_Ok(t *testing.T) { @@ -101,7 +107,7 @@ func TestDLQWriter_Ok(t *testing.T) { logger := &logRecorder{SnTaggedLogger: log.NewTestLogger()} metricsHandler := metricstest.NewCaptureHandler() capture := metricsHandler.StartCapture() - writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry) + writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry, chasm.NewRegistry(log.NewTestLogger())) task := &tasks.WorkflowTask{ WorkflowKey: definition.WorkflowKey{ NamespaceID: string(tests.NamespaceID), @@ -131,10 +137,16 @@ func TestDLQWriter_Ok(t *testing.T) { counter, ok := recordings[0].Value.(int64) assert.True(t, ok) assert.Equal(t, int64(1), counter) - assert.Len(t, recordings[0].Tags, 2) + require.Len(t, recordings[0].Tags, 6) assert.Equal(t, "transfer", recordings[0].Tags[metrics.TaskCategoryTagName]) namespaceStateTag := metrics.NamespaceStateTag("active") assert.Equal(t, "active", recordings[0].Tags[namespaceStateTag.Key]) + require.Equal(t, metrics.NamespaceUnknownTag().Value, recordings[0].Tags[metrics.NamespaceUnknownTag().Key]) + chasmReg := chasm.NewRegistry(log.NewTestLogger()) + expectedTaskType := queues.GetTaskTypeTagValue(task, true, chasmReg) + require.Equal(t, expectedTaskType, recordings[0].Tags[metrics.TaskTypeTagName]) + require.Equal(t, expectedTaskType, recordings[0].Tags[metrics.OperationTagName]) + require.Equal(t, chasm.WorkflowComponentName, recordings[0].Tags[metrics.ArchetypeTagName]) } func TestDLQWriter_ConcurrentWrites(t *testing.T) { @@ -148,7 +160,7 @@ func TestDLQWriter_ConcurrentWrites(t *testing.T) { namespaceRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(&namespace.Namespace{}, nil).AnyTimes() logger := &logRecorder{SnTaggedLogger: log.NewTestLogger()} metricsHandler := metricstest.NewCaptureHandler() - writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry) + writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry, chasm.NewRegistry(log.NewTestLogger())) const numConcurrentWrites = 50 var g errgroup.Group @@ -227,7 +239,7 @@ func TestDLQWriter_ConcurrentWritesDifferentQueues(t *testing.T) { namespaceRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(&namespace.Namespace{}, nil).AnyTimes() logger := &logRecorder{SnTaggedLogger: log.NewTestLogger()} metricsHandler := metricstest.NewCaptureHandler() - writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry) + writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry, chasm.NewRegistry(log.NewTestLogger())) const numConcurrentWrites = 50 const numQueues = 5 diff --git a/service/history/queues/executable_test.go b/service/history/queues/executable_test.go index b8bee618e76..f1cb19ce282 100644 --- a/service/history/queues/executable_test.go +++ b/service/history/queues/executable_test.go @@ -489,7 +489,7 @@ func (s *executableSuite) TestExecuteHandleErr_ResetAttempt() { func (s *executableSuite) TestExecuteHandleErr_Corrupted() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return false } @@ -509,7 +509,7 @@ func (s *executableSuite) TestExecuteHandleErr_Corrupted() { func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttempts() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -538,7 +538,7 @@ func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttempts() { func (s *executableSuite) TestExecute_DontSendToDLQAfterMaxAttemptsDLQDisabled() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return false } @@ -566,7 +566,7 @@ func (s *executableSuite) TestExecute_DontSendToDLQAfterMaxAttemptsDLQDisabled() func (s *executableSuite) TestExecute_DontSendToDLQAfterMaxAttemptsExpectedError() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -599,7 +599,7 @@ func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttemptsThenDisableDropCo queueWriter := &queuestest.FakeQueueWriter{} dlqEnabled := true executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return dlqEnabled } @@ -629,7 +629,7 @@ func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttemptsThenDisable() { queueWriter := &queuestest.FakeQueueWriter{} dlqEnabled := true executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return dlqEnabled } @@ -665,7 +665,7 @@ func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttemptsThenDisable() { func (s *executableSuite) TestExecute_SendsInternalErrorsToDLQ_WhenEnabled() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -689,7 +689,7 @@ func (s *executableSuite) TestExecute_SendsInternalErrorsToDLQ_WhenEnabled() { func (s *executableSuite) TestExecute_DoesntSendInternalErrorsToDLQ_WhenDisabled() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -719,7 +719,7 @@ func (s *executableSuite) TestExecute_SendInternalErrorsToDLQ_ThenDisable() { queueWriter := &queuestest.FakeQueueWriter{} dlqEnabled := true executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return dlqEnabled } @@ -746,7 +746,7 @@ func (s *executableSuite) TestExecute_SendInternalErrorsToDLQ_ThenDisable() { func (s *executableSuite) TestExecute_DLQ() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -768,7 +768,7 @@ func (s *executableSuite) TestExecute_DLQThenDisable() { queueWriter := &queuestest.FakeQueueWriter{} dlqEnabled := true executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return dlqEnabled } @@ -790,7 +790,7 @@ func (s *executableSuite) TestExecute_DLQThenDisable() { func (s *executableSuite) TestExecute_DLQFailThenRetry() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -1028,7 +1028,7 @@ func (s *executableSuite) TestTaskCancellation() { func (s *executableSuite) TestExecute_SendToDLQErrPatternDoesNotMatch() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -1059,7 +1059,7 @@ func (s *executableSuite) TestExecute_SendToDLQErrPatternDoesNotMatch() { func (s *executableSuite) TestExecute_SendToDLQErrPatternEmptyString() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -1090,7 +1090,7 @@ func (s *executableSuite) TestExecute_SendToDLQErrPatternEmptyString() { func (s *executableSuite) TestExecute_SendToDLQErrPatternMatchesMultiple() { queueWriter := &queuestest.FakeQueueWriter{} executable1 := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -1106,7 +1106,7 @@ func (s *executableSuite) TestExecute_SendToDLQErrPatternMatchesMultiple() { }).Times(1) executable2 := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return true } @@ -1143,7 +1143,7 @@ func (s *executableSuite) TestExecute_SendToDLQErrPatternMatchesMultiple() { func (s *executableSuite) TestExecute_ErrPatternIfDLQDisabled() { queueWriter := &queuestest.FakeQueueWriter{} executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return false } @@ -1175,7 +1175,7 @@ func (s *executableSuite) TestExecute_ErrorErrPatternThenDisableDLQ() { queueWriter := &queuestest.FakeQueueWriter{} dlqEnabled := true executable := s.newTestExecutable(func(p *params) { - p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry) + p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry) p.dlqEnabled = func() bool { return dlqEnabled } diff --git a/service/history/replication/dlq_writer_test.go b/service/history/replication/dlq_writer_test.go index 8e1f31b659e..4a585644b19 100644 --- a/service/history/replication/dlq_writer_test.go +++ b/service/history/replication/dlq_writer_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/chasm" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/metrics/metricstest" @@ -78,7 +79,7 @@ func TestNewDLQWriterAdapter(t *testing.T) { metricsHandler := metricstest.NewCaptureHandler() capture := metricsHandler.StartCapture() writer := replication.NewDLQWriterAdapter( - queues.NewDLQWriter(queueWriter, metricsHandler, log.NewTestLogger(), namespaceRegistry), + queues.NewDLQWriter(queueWriter, metricsHandler, log.NewTestLogger(), namespaceRegistry, chasm.NewRegistry(log.NewTestLogger())), taskSerializer, "test-current-cluster", ) @@ -112,10 +113,14 @@ func TestNewDLQWriterAdapter(t *testing.T) { snapshot := capture.Snapshot() recordings := snapshot[metrics.DLQWrites.Name()] assert.Len(t, recordings, 1) - assert.Len(t, recordings[0].Tags, 2) + require.Len(t, recordings[0].Tags, 6) assert.Equal(t, "replication", recordings[0].Tags[metrics.TaskCategoryTagName]) namespaceStateTag := metrics.NamespaceStateTag(metrics.PassiveNamespaceStateTagValue) assert.Equal(t, metrics.PassiveNamespaceStateTagValue, recordings[0].Tags[namespaceStateTag.Key]) + require.Equal(t, metrics.NamespaceUnknownTag().Value, recordings[0].Tags[metrics.NamespaceUnknownTag().Key]) + require.NotEmpty(t, recordings[0].Tags[metrics.TaskTypeTagName]) + require.NotEmpty(t, recordings[0].Tags[metrics.OperationTagName]) + require.Equal(t, chasm.WorkflowComponentName, recordings[0].Tags[metrics.ArchetypeTagName]) } }) }