5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
@@ -1539,6 +1539,11 @@ Don't change this on a live cluster without using the gradual change mechanism.
true,
`EnableReplicationStream turn on replication stream`,
)
EnableCloseInboundReplicationStreamOnShutdown = NewGlobalBoolSetting(
"history.enableCloseInboundReplicationStreamOnShutdown",
true,
`EnableCloseInboundReplicationStreamOnShutdown closes inbound replication streams on shutdown, signaling the remote sender to stop. Disable if this causes unexpected issues during rolling restarts.`,
)
EnableSeparateReplicationEnableFlag = NewGlobalBoolSetting(
"history.enableSeparateReplicationEnableFlag",
false,
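The new setting defaults to true, so clusters pick up the new shutdown behavior without any configuration change. If closing inbound streams on shutdown causes trouble during a rolling restart, it can be switched off through dynamic config; a minimal sketch of a file-based override, assuming the usual Temporal dynamic config YAML layout:

```yaml
# Hypothetical override in the cluster's dynamic config file: disables the new
# behavior so inbound replication streams are not closed by the monitor on shutdown.
history.enableCloseInboundReplicationStreamOnShutdown:
  - value: false
    constraints: {}
```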
10 changes: 6 additions & 4 deletions service/history/configs/config.go
@@ -12,8 +12,9 @@ import (
type Config struct {
NumberOfShards int32

EnableReplicationStream dynamicconfig.BoolPropertyFn
EnableSeparateReplicationEnableFlag dynamicconfig.BoolPropertyFn
EnableReplicationStream dynamicconfig.BoolPropertyFn
EnableCloseInboundReplicationStreamOnShutdown dynamicconfig.BoolPropertyFn
EnableSeparateReplicationEnableFlag dynamicconfig.BoolPropertyFn
HistoryReplicationDLQV2 dynamicconfig.BoolPropertyFn

RPS dynamicconfig.IntPropertyFn
@@ -436,8 +437,9 @@ func NewConfig(
cfg := &Config{
NumberOfShards: numberOfShards,

EnableReplicationStream: dynamicconfig.EnableReplicationStream.Get(dc),
EnableSeparateReplicationEnableFlag: dynamicconfig.EnableSeparateReplicationEnableFlag.Get(dc),
EnableReplicationStream: dynamicconfig.EnableReplicationStream.Get(dc),
EnableCloseInboundReplicationStreamOnShutdown: dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Get(dc),
EnableSeparateReplicationEnableFlag: dynamicconfig.EnableSeparateReplicationEnableFlag.Get(dc),
HistoryReplicationDLQV2: dynamicconfig.EnableHistoryReplicationDLQV2.Get(dc),

RPS: dynamicconfig.HistoryRPS.Get(dc),
6 changes: 6 additions & 0 deletions service/history/replication/stream_receiver_monitor.go
@@ -106,6 +106,12 @@ func (m *StreamReceiverMonitorImpl) Stop() {
stream.Stop()
delete(m.outboundStreams, serverKey)
}
if m.Config.EnableCloseInboundReplicationStreamOnShutdown() {
for clientKey, stream := range m.inboundStreams {
stream.Stop()
delete(m.inboundStreams, clientKey)
}
}
m.Logger.Info("StreamReceiverMonitor stopped.")
}

76 changes: 76 additions & 0 deletions service/history/replication/stream_receiver_monitor_test.go
@@ -14,6 +14,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
@@ -358,6 +359,81 @@ func (s *streamReceiverMonitorSuite) TestDoReconcileInboundStreams_Reactivate()
s.Equal(streamSenderValid, stream)
}

func (s *streamReceiverMonitorSuite) TestStop_StopsInboundStreams() {
clientKey := NewClusterShardKey(int32(cluster.TestAlternativeClusterInitialFailoverVersion), rand.Int31())
serverKey := NewClusterShardKey(int32(cluster.TestCurrentClusterInitialFailoverVersion), rand.Int31())
streamSender := NewMockStreamSender(s.controller)
streamSender.EXPECT().Key().Return(ClusterShardKeyPair{
Client: clientKey,
Server: serverKey,
}).AnyTimes()
streamSender.EXPECT().Stop()
s.streamReceiverMonitor.RegisterInboundStream(streamSender)

s.streamReceiverMonitor.Lock()
s.Len(s.streamReceiverMonitor.inboundStreams, 1)
s.streamReceiverMonitor.Unlock()

// Transition to started state so Stop() proceeds past the CAS check.
s.streamReceiverMonitor.status = common.DaemonStatusStarted
s.streamReceiverMonitor.Stop()

s.streamReceiverMonitor.Lock()
defer s.streamReceiverMonitor.Unlock()
s.Empty(s.streamReceiverMonitor.inboundStreams)
}

func (s *streamReceiverMonitorSuite) TestStop_SkipsInboundStreamsWhenFlagDisabled() {
col := dynamicconfig.NewCollection(
dynamicconfig.StaticClient(map[dynamicconfig.Key]any{
dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Key(): false,
}),
log.NewNoopLogger(),
)
monitor := NewStreamReceiverMonitor(
ProcessToolBox{
Config: configs.NewConfig(col, 1),
ClusterMetadata: s.clusterMetadata,
ClientBean: s.clientBean,
ShardController: s.shardController,
MetricsHandler: metrics.NoopMetricsHandler,
Logger: log.NewNoopLogger(),
DLQWriter: NoopDLQWriter{},
},
NewExecutableTaskConverter(ProcessToolBox{
Config: configs.NewConfig(col, 1),
ClusterMetadata: s.clusterMetadata,
ClientBean: s.clientBean,
ShardController: s.shardController,
MetricsHandler: metrics.NoopMetricsHandler,
Logger: log.NewNoopLogger(),
DLQWriter: NoopDLQWriter{},
}),
true,
)

clientKey := NewClusterShardKey(int32(cluster.TestAlternativeClusterInitialFailoverVersion), rand.Int31())
serverKey := NewClusterShardKey(int32(cluster.TestCurrentClusterInitialFailoverVersion), rand.Int31())
streamSender := NewMockStreamSender(s.controller)
streamSender.EXPECT().Key().Return(ClusterShardKeyPair{
Client: clientKey,
Server: serverKey,
}).AnyTimes()
// Stop should NOT be called on inbound streams when the flag is disabled.
monitor.RegisterInboundStream(streamSender)

monitor.Lock()
s.Len(monitor.inboundStreams, 1)
monitor.Unlock()

monitor.status = common.DaemonStatusStarted
monitor.Stop()

monitor.Lock()
defer monitor.Unlock()
s.Len(monitor.inboundStreams, 1, "inbound streams should not be closed when flag is disabled")
}

func (s *streamReceiverMonitorSuite) TestDoReconcileOutboundStreams_Add() {
s.clusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
s.clusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, gomock.Any()).Return("some cluster name").AnyTimes()
23 changes: 17 additions & 6 deletions service/history/service.go
@@ -145,10 +145,20 @@ func (s *Service) Stop() {
time.Sleep(s.config.ShutdownDrainDuration())
}

// Stop shard controller. We should have waited long enough for all shards to realize they
// lost ownership and close, but if not, this will definitely close them.
s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.handler.controller.Stop()
enableCloseInboundReplicationStreamOnShutdown := s.config.EnableCloseInboundReplicationStreamOnShutdown()
// When enabled, stop handler components (including the replication stream monitor) before
// waiting for gRPC handlers to return. This signals inbound stream senders on the peer to
// stop, allowing their handler goroutines to unblock and return cleanly before GracefulStop.
// Without this, those goroutines block indefinitely and the gRPC server falls back to a
// forceful Stop(), causing unclean H2 teardowns on the peer.
// Guarded by a feature flag so the ordering change can be reverted if needed.
if enableCloseInboundReplicationStreamOnShutdown {
s.logger.Info("ShutdownHandler: Initiating handler shutdown")
s.handler.Stop()
} else {
s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.handler.controller.Stop()
}

// All grpc handlers should be cancelled now. Give them a little time to return.
t := time.AfterFunc(2*time.Second, func() {
@@ -157,8 +167,9 @@
})
s.server.GracefulStop()
t.Stop()

s.handler.Stop()
if !enableCloseInboundReplicationStreamOnShutdown {
s.handler.Stop()
}
s.visibilityManager.Close()

s.logger.Info("history stopped")
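For context on the ordering comment above: grpc-go's GracefulStop blocks until every in-flight RPC, including long-lived server streams, has returned, so whatever keeps those streams open has to be shut down first. A minimal standalone sketch of that pattern (the names shutdown and stopStreams are illustrative, not Temporal's APIs):

```go
// Standalone sketch (not Temporal code) of the ordering this PR moves to:
// signal long-lived stream handlers to return *before* GracefulStop, and keep
// a timer that falls back to a forceful Stop if handlers still hang.
package main

import (
	"time"

	"google.golang.org/grpc"
)

// shutdown assumes stopStreams makes any server-streaming handlers return,
// e.g. by stopping the component that keeps inbound replication streams open.
func shutdown(server *grpc.Server, stopStreams func(), drain time.Duration) {
	// Close the streams first; otherwise GracefulStop can block on them indefinitely.
	stopStreams()

	// Fallback: if handlers still have not returned after the drain period,
	// force-close connections rather than hang the shutdown.
	force := time.AfterFunc(drain, server.Stop)
	defer force.Stop()

	// Blocks until all in-flight RPCs, including streams, have completed.
	server.GracefulStop()
}
```

The PR applies the same idea by stopping s.handler (which owns the replication stream monitor) before s.server.GracefulStop() when the flag is enabled, instead of only after GracefulStop returns.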