diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index cd2e73b5049..de1c14809c1 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -1539,6 +1539,11 @@ Don't change this on a live cluster without using the gradual change mechanism.
 		true,
 		`EnableReplicationStream turn on replication stream`,
 	)
+	EnableCloseInboundReplicationStreamOnShutdown = NewGlobalBoolSetting(
+		"history.enableCloseInboundReplicationStreamOnShutdown",
+		true,
+		`EnableCloseInboundReplicationStreamOnShutdown closes inbound replication streams on shutdown, signaling the remote sender to stop. Disable if this causes unexpected issues during rolling restarts.`,
+	)
 	EnableSeparateReplicationEnableFlag = NewGlobalBoolSetting(
 		"history.enableSeparateReplicationEnableFlag",
 		false,
diff --git a/service/history/configs/config.go b/service/history/configs/config.go
index 3ba258df0d3..4c34ee594f7 100644
--- a/service/history/configs/config.go
+++ b/service/history/configs/config.go
@@ -12,9 +12,10 @@ import (
 type Config struct {
 	NumberOfShards int32
 
-	EnableReplicationStream             dynamicconfig.BoolPropertyFn
-	EnableSeparateReplicationEnableFlag dynamicconfig.BoolPropertyFn
-	HistoryReplicationDLQV2             dynamicconfig.BoolPropertyFn
+	EnableReplicationStream                       dynamicconfig.BoolPropertyFn
+	EnableCloseInboundReplicationStreamOnShutdown dynamicconfig.BoolPropertyFn
+	EnableSeparateReplicationEnableFlag           dynamicconfig.BoolPropertyFn
+	HistoryReplicationDLQV2                       dynamicconfig.BoolPropertyFn
 
 	RPS          dynamicconfig.IntPropertyFn
 	NamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
@@ -436,9 +437,10 @@ func NewConfig(
 	cfg := &Config{
 		NumberOfShards: numberOfShards,
 
-		EnableReplicationStream:             dynamicconfig.EnableReplicationStream.Get(dc),
-		EnableSeparateReplicationEnableFlag: dynamicconfig.EnableSeparateReplicationEnableFlag.Get(dc),
-		HistoryReplicationDLQV2:             dynamicconfig.EnableHistoryReplicationDLQV2.Get(dc),
+		EnableReplicationStream:                       dynamicconfig.EnableReplicationStream.Get(dc),
+		EnableCloseInboundReplicationStreamOnShutdown: dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Get(dc),
+		EnableSeparateReplicationEnableFlag:           dynamicconfig.EnableSeparateReplicationEnableFlag.Get(dc),
+		HistoryReplicationDLQV2:                       dynamicconfig.EnableHistoryReplicationDLQV2.Get(dc),
 
 		RPS:          dynamicconfig.HistoryRPS.Get(dc),
 		NamespaceRPS: dynamicconfig.HistoryNamespaceRPS.Get(dc),
diff --git a/service/history/replication/stream_receiver_monitor.go b/service/history/replication/stream_receiver_monitor.go
index 5fe4699ca94..d9e6db5b759 100644
--- a/service/history/replication/stream_receiver_monitor.go
+++ b/service/history/replication/stream_receiver_monitor.go
@@ -106,6 +106,12 @@ func (m *StreamReceiverMonitorImpl) Stop() {
 		stream.Stop()
 		delete(m.outboundStreams, serverKey)
 	}
+	if m.Config.EnableCloseInboundReplicationStreamOnShutdown() {
+		for clientKey, stream := range m.inboundStreams {
+			stream.Stop()
+			delete(m.inboundStreams, clientKey)
+		}
+	}
 	m.Logger.Info("StreamReceiverMonitor stopped.")
 }
 
diff --git a/service/history/replication/stream_receiver_monitor_test.go b/service/history/replication/stream_receiver_monitor_test.go
index 702e637a69e..1b032a0801b 100644
--- a/service/history/replication/stream_receiver_monitor_test.go
+++ b/service/history/replication/stream_receiver_monitor_test.go
@@ -14,6 +14,7 @@ import (
 	persistencespb "go.temporal.io/server/api/persistence/v1"
 	replicationspb "go.temporal.io/server/api/replication/v1"
 	"go.temporal.io/server/client"
+	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/cluster"
 	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/common/log"
@@ -358,6 +359,81 @@ func (s *streamReceiverMonitorSuite) TestDoReconcileInboundStreams_Reactivate()
 	s.Equal(streamSenderValid, stream)
 }
 
+func (s *streamReceiverMonitorSuite) TestStop_StopsInboundStreams() {
+	clientKey := NewClusterShardKey(int32(cluster.TestAlternativeClusterInitialFailoverVersion), rand.Int31())
+	serverKey := NewClusterShardKey(int32(cluster.TestCurrentClusterInitialFailoverVersion), rand.Int31())
+	streamSender := NewMockStreamSender(s.controller)
+	streamSender.EXPECT().Key().Return(ClusterShardKeyPair{
+		Client: clientKey,
+		Server: serverKey,
+	}).AnyTimes()
+	streamSender.EXPECT().Stop()
+	s.streamReceiverMonitor.RegisterInboundStream(streamSender)
+
+	s.streamReceiverMonitor.Lock()
+	s.Len(s.streamReceiverMonitor.inboundStreams, 1)
+	s.streamReceiverMonitor.Unlock()
+
+	// Transition to started state so Stop() proceeds past the CAS check.
+	s.streamReceiverMonitor.status = common.DaemonStatusStarted
+	s.streamReceiverMonitor.Stop()
+
+	s.streamReceiverMonitor.Lock()
+	defer s.streamReceiverMonitor.Unlock()
+	s.Empty(s.streamReceiverMonitor.inboundStreams)
+}
+
+func (s *streamReceiverMonitorSuite) TestStop_SkipsInboundStreamsWhenFlagDisabled() {
+	col := dynamicconfig.NewCollection(
+		dynamicconfig.StaticClient(map[dynamicconfig.Key]any{
+			dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Key(): false,
+		}),
+		log.NewNoopLogger(),
+	)
+	monitor := NewStreamReceiverMonitor(
+		ProcessToolBox{
+			Config:          configs.NewConfig(col, 1),
+			ClusterMetadata: s.clusterMetadata,
+			ClientBean:      s.clientBean,
+			ShardController: s.shardController,
+			MetricsHandler:  metrics.NoopMetricsHandler,
+			Logger:          log.NewNoopLogger(),
+			DLQWriter:       NoopDLQWriter{},
+		},
+		NewExecutableTaskConverter(ProcessToolBox{
+			Config:          configs.NewConfig(col, 1),
+			ClusterMetadata: s.clusterMetadata,
+			ClientBean:      s.clientBean,
+			ShardController: s.shardController,
+			MetricsHandler:  metrics.NoopMetricsHandler,
+			Logger:          log.NewNoopLogger(),
+			DLQWriter:       NoopDLQWriter{},
+		}),
+		true,
+	)
+
+	clientKey := NewClusterShardKey(int32(cluster.TestAlternativeClusterInitialFailoverVersion), rand.Int31())
+	serverKey := NewClusterShardKey(int32(cluster.TestCurrentClusterInitialFailoverVersion), rand.Int31())
+	streamSender := NewMockStreamSender(s.controller)
+	streamSender.EXPECT().Key().Return(ClusterShardKeyPair{
+		Client: clientKey,
+		Server: serverKey,
+	}).AnyTimes()
+	// Stop should NOT be called on inbound streams when the flag is disabled.
+	monitor.RegisterInboundStream(streamSender)
+
+	monitor.Lock()
+	s.Len(monitor.inboundStreams, 1)
+	monitor.Unlock()
+
+	monitor.status = common.DaemonStatusStarted
+	monitor.Stop()
+
+	monitor.Lock()
+	defer monitor.Unlock()
+	s.Len(monitor.inboundStreams, 1, "inbound streams should not be closed when flag is disabled")
+}
+
 func (s *streamReceiverMonitorSuite) TestDoReconcileOutboundStreams_Add() {
 	s.clusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
 	s.clusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, gomock.Any()).Return("some cluster name").AnyTimes()
diff --git a/service/history/service.go b/service/history/service.go
index bf28d1d2a50..e9e9c995f55 100644
--- a/service/history/service.go
+++ b/service/history/service.go
@@ -145,10 +145,20 @@ func (s *Service) Stop() {
 		time.Sleep(s.config.ShutdownDrainDuration())
 	}
 
-	// Stop shard controller. We should have waited long enough for all shards to realize they
-	// lost ownership and close, but if not, this will definitely close them.
-	s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
-	s.handler.controller.Stop()
+	enableCloseInboundReplicationStreamOnShutdown := s.config.EnableCloseInboundReplicationStreamOnShutdown()
+	// When enabled, stop handler components (including the replication stream monitor) before
+	// waiting for gRPC handlers to return. This signals inbound stream senders on the peer to
+	// stop, allowing their handler goroutines to unblock and return cleanly before GracefulStop.
+	// Without this, those goroutines block indefinitely and the gRPC server falls back to a
+	// forceful Stop(), causing unclean H2 teardowns on the peer.
+	// Guarded by feature flag so the ordering change can be reverted if needed.
+	if enableCloseInboundReplicationStreamOnShutdown {
+		s.logger.Info("ShutdownHandler: Initiating handler shutdown")
+		s.handler.Stop()
+	} else {
+		s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
+		s.handler.controller.Stop()
+	}
 
 	// All grpc handlers should be cancelled now. Give them a little time to return.
 	t := time.AfterFunc(2*time.Second, func() {
@@ -157,8 +167,9 @@
 	})
 	s.server.GracefulStop()
 	t.Stop()
-
-	s.handler.Stop()
+	if !enableCloseInboundReplicationStreamOnShutdown {
+		s.handler.Stop()
+	}
 	s.visibilityManager.Close()
 
 	s.logger.Info("history stopped")
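
Not part of the patch, but a reviewer-oriented sketch of how the new knob is plumbed: it forces the flag off through a static dynamic-config client, mirroring the wiring in TestStop_SkipsInboundStreamsWhenFlagDisabled above. The standalone main package, the shard count of 1, and the configs import path are illustrative assumptions, not part of the change.

// Illustrative only; assumes the configs package import path inferred from
// service/history/configs/config.go in this patch.
package main

import (
	"fmt"

	"go.temporal.io/server/common/dynamicconfig"
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/service/history/configs"
)

func main() {
	// Override the flag to false via a static dynamic-config client; the
	// default registered in constants.go above is true.
	col := dynamicconfig.NewCollection(
		dynamicconfig.StaticClient(map[dynamicconfig.Key]any{
			dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Key(): false,
		}),
		log.NewNoopLogger(),
	)
	cfg := configs.NewConfig(col, 1)

	// StreamReceiverMonitorImpl.Stop and Service.Stop read the value through
	// this property fn at shutdown time.
	fmt.Println(cfg.EnableCloseInboundReplicationStreamOnShutdown()) // false
}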