Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions service/history/queues/dlq_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync"

"go.temporal.io/server/chasm"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand All @@ -21,6 +22,7 @@ type (
metricsHandler metrics.Handler
logger log.SnTaggedLogger
namespaceRegistry namespace.Registry
chasmRegistry *chasm.Registry
enqueueMutex sync.Map // map[persistence.QueueKey]*sync.Mutex for per-queue locking
}
// QueueWriter is a subset of persistence.HistoryTaskQueueManager.
Expand All @@ -47,12 +49,14 @@ func NewDLQWriter(
h metrics.Handler,
l log.SnTaggedLogger,
r namespace.Registry,
cr *chasm.Registry,
) *DLQWriter {
return &DLQWriter{
dlqWriter: w,
metricsHandler: h,
logger: l,
namespaceRegistry: r,
chasmRegistry: cr,
}
}

Expand Down Expand Up @@ -97,34 +101,43 @@ func (q *DLQWriter) WriteTaskToDLQ(
if err != nil {
return fmt.Errorf("%w: %v", ErrSendTaskToDLQ, err)
}

nsMetricTag := metrics.NamespaceUnknownTag()
var nsLogTag tag.Tag
ns, err := q.namespaceRegistry.GetNamespaceByID(namespace.ID(task.GetNamespaceID()))
if err != nil {
q.logger.Warn("Failed to get namespace name while trying to write a task to DLQ",
tag.WorkflowNamespace(task.GetNamespaceID()),
tag.Error(err),
)
nsLogTag = tag.WorkflowNamespaceID(task.GetNamespaceID())
} else {
nsMetricTag = metrics.NamespaceTag(ns.Name().String())
nsLogTag = tag.WorkflowNamespace(ns.Name().String())
}

// "passive" means the namespace is in standby mode and only replicates data
namespaceState := metrics.PassiveNamespaceStateTagValue
if isNamespaceActive {
namespaceState = metrics.ActiveNamespaceStateTagValue
}
taskType := GetTaskTypeTagValue(task, isNamespaceActive, q.chasmRegistry)
metrics.DLQWrites.With(q.metricsHandler).Record(
1,
metrics.TaskCategoryTag(task.GetCategory().Name()),
metrics.NamespaceStateTag(namespaceState),
Comment on lines 127 to 128
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove those two in next release after this change is rolledout and alert definition get updated. TaskTypeTag gives both the queue and the active/standby information.

nsMetricTag,
metrics.TaskTypeTag(taskType),
metrics.OperationTag(taskType),
getArchetypeTag(task, q.chasmRegistry),
)
ns, err := q.namespaceRegistry.GetNamespaceByID(namespace.ID(task.GetNamespaceID()))
var namespaceTag tag.Tag
if err != nil {
q.logger.Warn("Failed to get namespace name while trying to write a task to DLQ",
tag.WorkflowNamespace(task.GetNamespaceID()),
tag.Error(err),
)
namespaceTag = tag.WorkflowNamespaceID(task.GetNamespaceID())
} else {
namespaceTag = tag.WorkflowNamespace(string(ns.Name()))
}
q.logger.Warn("Task enqueued to DLQ",
tag.DLQMessageID(resp.Metadata.ID),
tag.SourceCluster(sourceCluster),
tag.TargetCluster(targetCluster),
tag.TaskType(task.GetType()),
tag.String("task-category", task.GetCategory().Name()),
namespaceTag,
nsLogTag,
)
return nil
}
Expand Down
24 changes: 18 additions & 6 deletions service/history/queues/dlq_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestDLQWriter_ErrGetNamespaceName(t *testing.T) {
logger := &logRecorder{SnTaggedLogger: log.NewTestLogger()}
metricsHandler := metricstest.NewCaptureHandler()
capture := metricsHandler.StartCapture()
writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry)
writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry, chasm.NewRegistry(log.NewTestLogger()))
task := &tasks.WorkflowTask{
WorkflowKey: definition.WorkflowKey{
NamespaceID: string(tests.NamespaceID),
Expand Down Expand Up @@ -85,10 +86,15 @@ func TestDLQWriter_ErrGetNamespaceName(t *testing.T) {
counter, ok := recordings[0].Value.(int64)
assert.True(t, ok)
assert.Equal(t, int64(1), counter)
assert.Len(t, recordings[0].Tags, 2)
require.Len(t, recordings[0].Tags, 6)
assert.Equal(t, "transfer", recordings[0].Tags[metrics.TaskCategoryTagName])
namespaceStateTag := metrics.NamespaceStateTag(metrics.ActiveNamespaceStateTagValue)
assert.Equal(t, metrics.ActiveNamespaceStateTagValue, recordings[0].Tags[namespaceStateTag.Key])
require.Equal(t, metrics.NamespaceUnknownTag().Value, recordings[0].Tags[metrics.NamespaceUnknownTag().Key])
expectedTaskType := queues.GetTaskTypeTagValue(task, true, chasm.NewRegistry(log.NewTestLogger()))
require.Equal(t, expectedTaskType, recordings[0].Tags[metrics.TaskTypeTagName])
require.Equal(t, expectedTaskType, recordings[0].Tags[metrics.OperationTagName])
require.Equal(t, chasm.WorkflowComponentName, recordings[0].Tags[metrics.ArchetypeTagName])
}

func TestDLQWriter_Ok(t *testing.T) {
Expand All @@ -101,7 +107,7 @@ func TestDLQWriter_Ok(t *testing.T) {
logger := &logRecorder{SnTaggedLogger: log.NewTestLogger()}
metricsHandler := metricstest.NewCaptureHandler()
capture := metricsHandler.StartCapture()
writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry)
writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry, chasm.NewRegistry(log.NewTestLogger()))
task := &tasks.WorkflowTask{
WorkflowKey: definition.WorkflowKey{
NamespaceID: string(tests.NamespaceID),
Expand Down Expand Up @@ -131,10 +137,16 @@ func TestDLQWriter_Ok(t *testing.T) {
counter, ok := recordings[0].Value.(int64)
assert.True(t, ok)
assert.Equal(t, int64(1), counter)
assert.Len(t, recordings[0].Tags, 2)
require.Len(t, recordings[0].Tags, 6)
assert.Equal(t, "transfer", recordings[0].Tags[metrics.TaskCategoryTagName])
namespaceStateTag := metrics.NamespaceStateTag("active")
assert.Equal(t, "active", recordings[0].Tags[namespaceStateTag.Key])
require.Equal(t, metrics.NamespaceUnknownTag().Value, recordings[0].Tags[metrics.NamespaceUnknownTag().Key])
chasmReg := chasm.NewRegistry(log.NewTestLogger())
expectedTaskType := queues.GetTaskTypeTagValue(task, true, chasmReg)
require.Equal(t, expectedTaskType, recordings[0].Tags[metrics.TaskTypeTagName])
require.Equal(t, expectedTaskType, recordings[0].Tags[metrics.OperationTagName])
require.Equal(t, chasm.WorkflowComponentName, recordings[0].Tags[metrics.ArchetypeTagName])
}

func TestDLQWriter_ConcurrentWrites(t *testing.T) {
Expand All @@ -148,7 +160,7 @@ func TestDLQWriter_ConcurrentWrites(t *testing.T) {
namespaceRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(&namespace.Namespace{}, nil).AnyTimes()
logger := &logRecorder{SnTaggedLogger: log.NewTestLogger()}
metricsHandler := metricstest.NewCaptureHandler()
writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry)
writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry, chasm.NewRegistry(log.NewTestLogger()))

const numConcurrentWrites = 50
var g errgroup.Group
Expand Down Expand Up @@ -227,7 +239,7 @@ func TestDLQWriter_ConcurrentWritesDifferentQueues(t *testing.T) {
namespaceRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(&namespace.Namespace{}, nil).AnyTimes()
logger := &logRecorder{SnTaggedLogger: log.NewTestLogger()}
metricsHandler := metricstest.NewCaptureHandler()
writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry)
writer := queues.NewDLQWriter(queueWriter, metricsHandler, logger, namespaceRegistry, chasm.NewRegistry(log.NewTestLogger()))

const numConcurrentWrites = 50
const numQueues = 5
Expand Down
36 changes: 18 additions & 18 deletions service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (s *executableSuite) TestExecuteHandleErr_ResetAttempt() {
func (s *executableSuite) TestExecuteHandleErr_Corrupted() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return false
}
Expand All @@ -509,7 +509,7 @@ func (s *executableSuite) TestExecuteHandleErr_Corrupted() {
func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttempts() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand Down Expand Up @@ -538,7 +538,7 @@ func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttempts() {
func (s *executableSuite) TestExecute_DontSendToDLQAfterMaxAttemptsDLQDisabled() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return false
}
Expand Down Expand Up @@ -566,7 +566,7 @@ func (s *executableSuite) TestExecute_DontSendToDLQAfterMaxAttemptsDLQDisabled()
func (s *executableSuite) TestExecute_DontSendToDLQAfterMaxAttemptsExpectedError() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand Down Expand Up @@ -599,7 +599,7 @@ func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttemptsThenDisableDropCo
queueWriter := &queuestest.FakeQueueWriter{}
dlqEnabled := true
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return dlqEnabled
}
Expand Down Expand Up @@ -629,7 +629,7 @@ func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttemptsThenDisable() {
queueWriter := &queuestest.FakeQueueWriter{}
dlqEnabled := true
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return dlqEnabled
}
Expand Down Expand Up @@ -665,7 +665,7 @@ func (s *executableSuite) TestExecute_SendToDLQAfterMaxAttemptsThenDisable() {
func (s *executableSuite) TestExecute_SendsInternalErrorsToDLQ_WhenEnabled() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand All @@ -689,7 +689,7 @@ func (s *executableSuite) TestExecute_SendsInternalErrorsToDLQ_WhenEnabled() {
func (s *executableSuite) TestExecute_DoesntSendInternalErrorsToDLQ_WhenDisabled() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand Down Expand Up @@ -719,7 +719,7 @@ func (s *executableSuite) TestExecute_SendInternalErrorsToDLQ_ThenDisable() {
queueWriter := &queuestest.FakeQueueWriter{}
dlqEnabled := true
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return dlqEnabled
}
Expand All @@ -746,7 +746,7 @@ func (s *executableSuite) TestExecute_SendInternalErrorsToDLQ_ThenDisable() {
func (s *executableSuite) TestExecute_DLQ() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand All @@ -768,7 +768,7 @@ func (s *executableSuite) TestExecute_DLQThenDisable() {
queueWriter := &queuestest.FakeQueueWriter{}
dlqEnabled := true
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return dlqEnabled
}
Expand All @@ -790,7 +790,7 @@ func (s *executableSuite) TestExecute_DLQThenDisable() {
func (s *executableSuite) TestExecute_DLQFailThenRetry() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func (s *executableSuite) TestTaskCancellation() {
func (s *executableSuite) TestExecute_SendToDLQErrPatternDoesNotMatch() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand Down Expand Up @@ -1059,7 +1059,7 @@ func (s *executableSuite) TestExecute_SendToDLQErrPatternDoesNotMatch() {
func (s *executableSuite) TestExecute_SendToDLQErrPatternEmptyString() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand Down Expand Up @@ -1090,7 +1090,7 @@ func (s *executableSuite) TestExecute_SendToDLQErrPatternEmptyString() {
func (s *executableSuite) TestExecute_SendToDLQErrPatternMatchesMultiple() {
queueWriter := &queuestest.FakeQueueWriter{}
executable1 := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand All @@ -1106,7 +1106,7 @@ func (s *executableSuite) TestExecute_SendToDLQErrPatternMatchesMultiple() {
}).Times(1)

executable2 := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return true
}
Expand Down Expand Up @@ -1143,7 +1143,7 @@ func (s *executableSuite) TestExecute_SendToDLQErrPatternMatchesMultiple() {
func (s *executableSuite) TestExecute_ErrPatternIfDLQDisabled() {
queueWriter := &queuestest.FakeQueueWriter{}
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return false
}
Expand Down Expand Up @@ -1175,7 +1175,7 @@ func (s *executableSuite) TestExecute_ErrorErrPatternThenDisableDLQ() {
queueWriter := &queuestest.FakeQueueWriter{}
dlqEnabled := true
executable := s.newTestExecutable(func(p *params) {
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry)
p.dlqWriter = queues.NewDLQWriter(queueWriter, metrics.NoopMetricsHandler, log.NewTestLogger(), s.mockNamespaceRegistry, s.chasmRegistry)
p.dlqEnabled = func() bool {
return dlqEnabled
}
Expand Down
9 changes: 7 additions & 2 deletions service/history/replication/dlq_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/metrics/metricstest"
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestNewDLQWriterAdapter(t *testing.T) {
metricsHandler := metricstest.NewCaptureHandler()
capture := metricsHandler.StartCapture()
writer := replication.NewDLQWriterAdapter(
queues.NewDLQWriter(queueWriter, metricsHandler, log.NewTestLogger(), namespaceRegistry),
queues.NewDLQWriter(queueWriter, metricsHandler, log.NewTestLogger(), namespaceRegistry, chasm.NewRegistry(log.NewTestLogger())),
taskSerializer,
"test-current-cluster",
)
Expand Down Expand Up @@ -112,10 +113,14 @@ func TestNewDLQWriterAdapter(t *testing.T) {
snapshot := capture.Snapshot()
recordings := snapshot[metrics.DLQWrites.Name()]
assert.Len(t, recordings, 1)
assert.Len(t, recordings[0].Tags, 2)
require.Len(t, recordings[0].Tags, 6)
assert.Equal(t, "replication", recordings[0].Tags[metrics.TaskCategoryTagName])
namespaceStateTag := metrics.NamespaceStateTag(metrics.PassiveNamespaceStateTagValue)
assert.Equal(t, metrics.PassiveNamespaceStateTagValue, recordings[0].Tags[namespaceStateTag.Key])
require.Equal(t, metrics.NamespaceUnknownTag().Value, recordings[0].Tags[metrics.NamespaceUnknownTag().Key])
require.NotEmpty(t, recordings[0].Tags[metrics.TaskTypeTagName])
require.NotEmpty(t, recordings[0].Tags[metrics.OperationTagName])
require.Equal(t, chasm.WorkflowComponentName, recordings[0].Tags[metrics.ArchetypeTagName])
}
})
}
Expand Down
Loading