From 740368f11bbb9048c6a6a9b35173b4321b48877b Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Fri, 17 Apr 2026 19:30:40 +0100 Subject: [PATCH 1/5] Cleanly stop inbound streams during shutdown. --- .../replication/stream_receiver_monitor.go | 4 +++ .../stream_receiver_monitor_test.go | 25 +++++++++++++++++++ service/history/service.go | 13 +++++----- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/service/history/replication/stream_receiver_monitor.go b/service/history/replication/stream_receiver_monitor.go index 5fe4699ca94..bcd177a85f6 100644 --- a/service/history/replication/stream_receiver_monitor.go +++ b/service/history/replication/stream_receiver_monitor.go @@ -106,6 +106,10 @@ func (m *StreamReceiverMonitorImpl) Stop() { stream.Stop() delete(m.outboundStreams, serverKey) } + for clientKey, stream := range m.inboundStreams { + stream.Stop() + delete(m.inboundStreams, clientKey) + } m.Logger.Info("StreamReceiverMonitor stopped.") } diff --git a/service/history/replication/stream_receiver_monitor_test.go b/service/history/replication/stream_receiver_monitor_test.go index 702e637a69e..3ba02696809 100644 --- a/service/history/replication/stream_receiver_monitor_test.go +++ b/service/history/replication/stream_receiver_monitor_test.go @@ -14,6 +14,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" replicationspb "go.temporal.io/server/api/replication/v1" "go.temporal.io/server/client" + "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" @@ -358,6 +359,30 @@ func (s *streamReceiverMonitorSuite) TestDoReconcileInboundStreams_Reactivate() s.Equal(streamSenderValid, stream) } +func (s *streamReceiverMonitorSuite) TestStop_StopsInboundStreams() { + clientKey := NewClusterShardKey(int32(cluster.TestAlternativeClusterInitialFailoverVersion), rand.Int31()) + serverKey := NewClusterShardKey(int32(cluster.TestCurrentClusterInitialFailoverVersion), rand.Int31()) + streamSender := NewMockStreamSender(s.controller) + streamSender.EXPECT().Key().Return(ClusterShardKeyPair{ + Client: clientKey, + Server: serverKey, + }).AnyTimes() + streamSender.EXPECT().Stop() + s.streamReceiverMonitor.RegisterInboundStream(streamSender) + + s.streamReceiverMonitor.Lock() + s.Equal(1, len(s.streamReceiverMonitor.inboundStreams)) + s.streamReceiverMonitor.Unlock() + + // Transition to started state so Stop() proceeds past the CAS check. + s.streamReceiverMonitor.status = common.DaemonStatusStarted + s.streamReceiverMonitor.Stop() + + s.streamReceiverMonitor.Lock() + defer s.streamReceiverMonitor.Unlock() + s.Equal(0, len(s.streamReceiverMonitor.inboundStreams)) +} + func (s *streamReceiverMonitorSuite) TestDoReconcileOutboundStreams_Add() { s.clusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() s.clusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, gomock.Any()).Return("some cluster name").AnyTimes() diff --git a/service/history/service.go b/service/history/service.go index bf28d1d2a50..dbcd9dcf86f 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -145,10 +145,13 @@ func (s *Service) Stop() { time.Sleep(s.config.ShutdownDrainDuration()) } - // Stop shard controller. We should have waited long enough for all shards to realize they - // lost ownership and close, but if not, this will definitely close them. - s.logger.Info("ShutdownHandler: Initiating shardController shutdown") - s.handler.controller.Stop() + // Stop handler components (replication stream monitor, shard controller, etc.) so that + // inbound replication stream senders are signaled to stop before we wait for gRPC handlers + // to return. Without this, stream sender goroutines block indefinitely on Wait() and the + // gRPC server falls back to forceful Stop() after the drain timeout, causing unclean H2 + // connection teardowns on the peer. + s.logger.Info("ShutdownHandler: Initiating handler shutdown") + s.handler.Stop() // All grpc handlers should be cancelled now. Give them a little time to return. t := time.AfterFunc(2*time.Second, func() { @@ -157,8 +160,6 @@ func (s *Service) Stop() { }) s.server.GracefulStop() t.Stop() - - s.handler.Stop() s.visibilityManager.Close() s.logger.Info("history stopped") From 60b7d4d38d912ce63ec25362f56835ed2625845e Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Fri, 17 Apr 2026 19:45:41 +0100 Subject: [PATCH 2/5] Lint. --- service/history/replication/stream_receiver_monitor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/history/replication/stream_receiver_monitor_test.go b/service/history/replication/stream_receiver_monitor_test.go index 3ba02696809..a1c75e27a88 100644 --- a/service/history/replication/stream_receiver_monitor_test.go +++ b/service/history/replication/stream_receiver_monitor_test.go @@ -371,7 +371,7 @@ func (s *streamReceiverMonitorSuite) TestStop_StopsInboundStreams() { s.streamReceiverMonitor.RegisterInboundStream(streamSender) s.streamReceiverMonitor.Lock() - s.Equal(1, len(s.streamReceiverMonitor.inboundStreams)) + s.Len(s.streamReceiverMonitor.inboundStreams, 1) s.streamReceiverMonitor.Unlock() // Transition to started state so Stop() proceeds past the CAS check. @@ -380,7 +380,7 @@ func (s *streamReceiverMonitorSuite) TestStop_StopsInboundStreams() { s.streamReceiverMonitor.Lock() defer s.streamReceiverMonitor.Unlock() - s.Equal(0, len(s.streamReceiverMonitor.inboundStreams)) + s.Empty(s.streamReceiverMonitor.inboundStreams) } func (s *streamReceiverMonitorSuite) TestDoReconcileOutboundStreams_Add() { From 06c6b18cbb5d56d3078c77342440f2889ba6f996 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Wed, 29 Apr 2026 13:27:54 +0100 Subject: [PATCH 3/5] Gate the change behind feature flag, but default to on. --- common/dynamicconfig/constants.go | 5 ++ service/history/configs/config.go | 10 ++-- .../replication/stream_receiver_monitor.go | 8 +-- .../stream_receiver_monitor_test.go | 51 +++++++++++++++++++ service/history/service.go | 23 ++++++--- 5 files changed, 83 insertions(+), 14 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 4e596234782..c3d9bcbae1f 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1525,6 +1525,11 @@ Don't change this on a live cluster without using the gradual change mechanism. true, `EnableReplicationStream turn on replication stream`, ) + EnableCloseInboundReplicationStreamOnShutdown = NewGlobalBoolSetting( + "history.enableCloseInboundReplicationStreamOnShutdown", + true, + `EnableCloseInboundReplicationStreamOnShutdown closes inbound replication streams on shutdown, signaling the remote sender to stop. Disable if this causes unexpected issues during rolling restarts.`, + ) EnableSeparateReplicationEnableFlag = NewGlobalBoolSetting( "history.enableSeparateReplicationEnableFlag", false, diff --git a/service/history/configs/config.go b/service/history/configs/config.go index c7d911df239..2cdd1716bde 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -12,8 +12,9 @@ import ( type Config struct { NumberOfShards int32 - EnableReplicationStream dynamicconfig.BoolPropertyFn - EnableSeparateReplicationEnableFlag dynamicconfig.BoolPropertyFn + EnableReplicationStream dynamicconfig.BoolPropertyFn + EnableCloseInboundReplicationStreamOnShutdown dynamicconfig.BoolPropertyFn + EnableSeparateReplicationEnableFlag dynamicconfig.BoolPropertyFn HistoryReplicationDLQV2 dynamicconfig.BoolPropertyFn RPS dynamicconfig.IntPropertyFn @@ -436,8 +437,9 @@ func NewConfig( cfg := &Config{ NumberOfShards: numberOfShards, - EnableReplicationStream: dynamicconfig.EnableReplicationStream.Get(dc), - EnableSeparateReplicationEnableFlag: dynamicconfig.EnableSeparateReplicationEnableFlag.Get(dc), + EnableReplicationStream: dynamicconfig.EnableReplicationStream.Get(dc), + EnableCloseInboundReplicationStreamOnShutdown: dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Get(dc), + EnableSeparateReplicationEnableFlag: dynamicconfig.EnableSeparateReplicationEnableFlag.Get(dc), HistoryReplicationDLQV2: dynamicconfig.EnableHistoryReplicationDLQV2.Get(dc), RPS: dynamicconfig.HistoryRPS.Get(dc), diff --git a/service/history/replication/stream_receiver_monitor.go b/service/history/replication/stream_receiver_monitor.go index bcd177a85f6..d9e6db5b759 100644 --- a/service/history/replication/stream_receiver_monitor.go +++ b/service/history/replication/stream_receiver_monitor.go @@ -106,9 +106,11 @@ func (m *StreamReceiverMonitorImpl) Stop() { stream.Stop() delete(m.outboundStreams, serverKey) } - for clientKey, stream := range m.inboundStreams { - stream.Stop() - delete(m.inboundStreams, clientKey) + if m.Config.EnableCloseInboundReplicationStreamOnShutdown() { + for clientKey, stream := range m.inboundStreams { + stream.Stop() + delete(m.inboundStreams, clientKey) + } } m.Logger.Info("StreamReceiverMonitor stopped.") } diff --git a/service/history/replication/stream_receiver_monitor_test.go b/service/history/replication/stream_receiver_monitor_test.go index a1c75e27a88..1b032a0801b 100644 --- a/service/history/replication/stream_receiver_monitor_test.go +++ b/service/history/replication/stream_receiver_monitor_test.go @@ -383,6 +383,57 @@ func (s *streamReceiverMonitorSuite) TestStop_StopsInboundStreams() { s.Empty(s.streamReceiverMonitor.inboundStreams) } +func (s *streamReceiverMonitorSuite) TestStop_SkipsInboundStreamsWhenFlagDisabled() { + col := dynamicconfig.NewCollection( + dynamicconfig.StaticClient(map[dynamicconfig.Key]any{ + dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Key(): false, + }), + log.NewNoopLogger(), + ) + monitor := NewStreamReceiverMonitor( + ProcessToolBox{ + Config: configs.NewConfig(col, 1), + ClusterMetadata: s.clusterMetadata, + ClientBean: s.clientBean, + ShardController: s.shardController, + MetricsHandler: metrics.NoopMetricsHandler, + Logger: log.NewNoopLogger(), + DLQWriter: NoopDLQWriter{}, + }, + NewExecutableTaskConverter(ProcessToolBox{ + Config: configs.NewConfig(col, 1), + ClusterMetadata: s.clusterMetadata, + ClientBean: s.clientBean, + ShardController: s.shardController, + MetricsHandler: metrics.NoopMetricsHandler, + Logger: log.NewNoopLogger(), + DLQWriter: NoopDLQWriter{}, + }), + true, + ) + + clientKey := NewClusterShardKey(int32(cluster.TestAlternativeClusterInitialFailoverVersion), rand.Int31()) + serverKey := NewClusterShardKey(int32(cluster.TestCurrentClusterInitialFailoverVersion), rand.Int31()) + streamSender := NewMockStreamSender(s.controller) + streamSender.EXPECT().Key().Return(ClusterShardKeyPair{ + Client: clientKey, + Server: serverKey, + }).AnyTimes() + // Stop should NOT be called on inbound streams when the flag is disabled. + monitor.RegisterInboundStream(streamSender) + + monitor.Lock() + s.Len(monitor.inboundStreams, 1) + monitor.Unlock() + + monitor.status = common.DaemonStatusStarted + monitor.Stop() + + monitor.Lock() + defer monitor.Unlock() + s.Len(monitor.inboundStreams, 1, "inbound streams should not be closed when flag is disabled") +} + func (s *streamReceiverMonitorSuite) TestDoReconcileOutboundStreams_Add() { s.clusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() s.clusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, gomock.Any()).Return("some cluster name").AnyTimes() diff --git a/service/history/service.go b/service/history/service.go index dbcd9dcf86f..5e2c21d5d80 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -145,13 +145,19 @@ func (s *Service) Stop() { time.Sleep(s.config.ShutdownDrainDuration()) } - // Stop handler components (replication stream monitor, shard controller, etc.) so that - // inbound replication stream senders are signaled to stop before we wait for gRPC handlers - // to return. Without this, stream sender goroutines block indefinitely on Wait() and the - // gRPC server falls back to forceful Stop() after the drain timeout, causing unclean H2 - // connection teardowns on the peer. - s.logger.Info("ShutdownHandler: Initiating handler shutdown") - s.handler.Stop() + // When enabled, stop handler components (including the replication stream monitor) before + // waiting for gRPC handlers to return. This signals inbound stream senders on the peer to + // stop, allowing their handler goroutines to unblock and return cleanly before GracefulStop. + // Without this, those goroutines block indefinitely and the gRPC server falls back to a + // forceful Stop(), causing unclean H2 teardowns on the peer. + // Guarded by feature flag so the ordering change can be reverted if needed. + if s.config.EnableCloseInboundReplicationStreamOnShutdown() { + s.logger.Info("ShutdownHandler: Initiating handler shutdown") + s.handler.Stop() + } else { + s.logger.Info("ShutdownHandler: Initiating shardController shutdown") + s.handler.controller.Stop() + } // All grpc handlers should be cancelled now. Give them a little time to return. t := time.AfterFunc(2*time.Second, func() { @@ -160,6 +166,9 @@ func (s *Service) Stop() { }) s.server.GracefulStop() t.Stop() + if !s.config.EnableCloseInboundReplicationStreamOnShutdown() { + s.handler.Stop() + } s.visibilityManager.Close() s.logger.Info("history stopped") From 38e2e09788b543900fb29b827efc8299abde5600 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Wed, 29 Apr 2026 13:41:00 +0100 Subject: [PATCH 4/5] Use a local variable just in case there happens to be a toggle during shutdown. --- service/history/service.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/service/history/service.go b/service/history/service.go index 5e2c21d5d80..e9e9c995f55 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -145,13 +145,14 @@ func (s *Service) Stop() { time.Sleep(s.config.ShutdownDrainDuration()) } + enableCloseInboundReplicationStreamOnShutdown := s.config.EnableCloseInboundReplicationStreamOnShutdown() // When enabled, stop handler components (including the replication stream monitor) before // waiting for gRPC handlers to return. This signals inbound stream senders on the peer to // stop, allowing their handler goroutines to unblock and return cleanly before GracefulStop. // Without this, those goroutines block indefinitely and the gRPC server falls back to a // forceful Stop(), causing unclean H2 teardowns on the peer. // Guarded by feature flag so the ordering change can be reverted if needed. - if s.config.EnableCloseInboundReplicationStreamOnShutdown() { + if enableCloseInboundReplicationStreamOnShutdown { s.logger.Info("ShutdownHandler: Initiating handler shutdown") s.handler.Stop() } else { @@ -166,7 +167,7 @@ func (s *Service) Stop() { }) s.server.GracefulStop() t.Stop() - if !s.config.EnableCloseInboundReplicationStreamOnShutdown() { + if !enableCloseInboundReplicationStreamOnShutdown { s.handler.Stop() } s.visibilityManager.Close() From 8aab9eeee66662e26214de773b1693d40d5d4f54 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Wed, 29 Apr 2026 15:52:51 +0100 Subject: [PATCH 5/5] Lint. --- service/history/configs/config.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 7608df19df7..4c34ee594f7 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -12,10 +12,10 @@ import ( type Config struct { NumberOfShards int32 - EnableReplicationStream dynamicconfig.BoolPropertyFn - EnableCloseInboundReplicationStreamOnShutdown dynamicconfig.BoolPropertyFn - EnableSeparateReplicationEnableFlag dynamicconfig.BoolPropertyFn - HistoryReplicationDLQV2 dynamicconfig.BoolPropertyFn + EnableReplicationStream dynamicconfig.BoolPropertyFn + EnableCloseInboundReplicationStreamOnShutdown dynamicconfig.BoolPropertyFn + EnableSeparateReplicationEnableFlag dynamicconfig.BoolPropertyFn + HistoryReplicationDLQV2 dynamicconfig.BoolPropertyFn RPS dynamicconfig.IntPropertyFn NamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -440,7 +440,7 @@ func NewConfig( EnableReplicationStream: dynamicconfig.EnableReplicationStream.Get(dc), EnableCloseInboundReplicationStreamOnShutdown: dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Get(dc), EnableSeparateReplicationEnableFlag: dynamicconfig.EnableSeparateReplicationEnableFlag.Get(dc), - HistoryReplicationDLQV2: dynamicconfig.EnableHistoryReplicationDLQV2.Get(dc), + HistoryReplicationDLQV2: dynamicconfig.EnableHistoryReplicationDLQV2.Get(dc), RPS: dynamicconfig.HistoryRPS.Get(dc), NamespaceRPS: dynamicconfig.HistoryNamespaceRPS.Get(dc),