common/dynamicconfig/constants.go (5 additions, 0 deletions)
@@ -1539,6 +1539,11 @@ Don't change this on a live cluster without using the gradual change mechanism.
true,
`EnableReplicationStream turn on replication stream`,
)
EnableCloseInboundReplicationStreamOnShutdown = NewGlobalBoolSetting(
"history.enableCloseInboundReplicationStreamOnShutdown",
true,
`EnableCloseInboundReplicationStreamOnShutdown closes inbound replication streams on shutdown, signaling the remote end of the stream to stop. Disable if this causes unexpected issues during rolling restarts.`,
)
EnableSeparateReplicationEnableFlag = NewGlobalBoolSetting(
"history.enableSeparateReplicationEnableFlag",
false,
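For illustration, here is a minimal sketch of how the new setting could be read with a local override, reusing the StaticClient and NewCollection helpers that appear in the test file later in this PR; the package and function names below are placeholders, not code from the PR.

    // Illustrative only (not part of this PR): overriding the new setting through
    // a dynamic config collection, mirroring the StaticClient helper used in the
    // tests below. Omitting the override leaves the default (true) in effect.
    package example

    import (
        "go.temporal.io/server/common/dynamicconfig"
        "go.temporal.io/server/common/log"
    )

    func closeInboundStreamsOnShutdown() bool {
        client := dynamicconfig.StaticClient(map[dynamicconfig.Key]any{
            dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Key(): false,
        })
        dc := dynamicconfig.NewCollection(client, log.NewNoopLogger())
        // Each call re-evaluates the dynamic config, so the value can change at runtime.
        return dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Get(dc)()
    }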
service/history/configs/config.go (8 additions, 6 deletions)
@@ -12,9 +12,10 @@ import (
type Config struct {
NumberOfShards int32

EnableReplicationStream dynamicconfig.BoolPropertyFn
EnableSeparateReplicationEnableFlag dynamicconfig.BoolPropertyFn
HistoryReplicationDLQV2 dynamicconfig.BoolPropertyFn
EnableReplicationStream dynamicconfig.BoolPropertyFn
EnableCloseInboundReplicationStreamOnShutdown dynamicconfig.BoolPropertyFn
EnableSeparateReplicationEnableFlag dynamicconfig.BoolPropertyFn
HistoryReplicationDLQV2 dynamicconfig.BoolPropertyFn

RPS dynamicconfig.IntPropertyFn
NamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
@@ -436,9 +437,10 @@ func NewConfig(
cfg := &Config{
NumberOfShards: numberOfShards,

EnableReplicationStream: dynamicconfig.EnableReplicationStream.Get(dc),
EnableSeparateReplicationEnableFlag: dynamicconfig.EnableSeparateReplicationEnableFlag.Get(dc),
HistoryReplicationDLQV2: dynamicconfig.EnableHistoryReplicationDLQV2.Get(dc),
EnableReplicationStream: dynamicconfig.EnableReplicationStream.Get(dc),
EnableCloseInboundReplicationStreamOnShutdown: dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Get(dc),
EnableSeparateReplicationEnableFlag: dynamicconfig.EnableSeparateReplicationEnableFlag.Get(dc),
HistoryReplicationDLQV2: dynamicconfig.EnableHistoryReplicationDLQV2.Get(dc),

RPS: dynamicconfig.HistoryRPS.Get(dc),
NamespaceRPS: dynamicconfig.HistoryNamespaceRPS.Get(dc),
service/history/replication/stream_receiver_monitor.go (6 additions, 0 deletions)
@@ -106,6 +106,12 @@ func (m *StreamReceiverMonitorImpl) Stop() {
stream.Stop()
delete(m.outboundStreams, serverKey)
}
if m.Config.EnableCloseInboundReplicationStreamOnShutdown() {
for clientKey, stream := range m.inboundStreams {
stream.Stop()
delete(m.inboundStreams, clientKey)
}
}
m.Logger.Info("StreamReceiverMonitor stopped.")
}

service/history/replication/stream_receiver_monitor_test.go (76 additions, 0 deletions)
@@ -14,6 +14,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
@@ -358,6 +359,81 @@ func (s *streamReceiverMonitorSuite) TestDoReconcileInboundStreams_Reactivate()
s.Equal(streamSenderValid, stream)
}

func (s *streamReceiverMonitorSuite) TestStop_StopsInboundStreams() {
clientKey := NewClusterShardKey(int32(cluster.TestAlternativeClusterInitialFailoverVersion), rand.Int31())
serverKey := NewClusterShardKey(int32(cluster.TestCurrentClusterInitialFailoverVersion), rand.Int31())
streamSender := NewMockStreamSender(s.controller)
streamSender.EXPECT().Key().Return(ClusterShardKeyPair{
Client: clientKey,
Server: serverKey,
}).AnyTimes()
streamSender.EXPECT().Stop()
s.streamReceiverMonitor.RegisterInboundStream(streamSender)

s.streamReceiverMonitor.Lock()
s.Len(s.streamReceiverMonitor.inboundStreams, 1)
s.streamReceiverMonitor.Unlock()

// Transition to started state so Stop() proceeds past the CAS check.
s.streamReceiverMonitor.status = common.DaemonStatusStarted
s.streamReceiverMonitor.Stop()

s.streamReceiverMonitor.Lock()
defer s.streamReceiverMonitor.Unlock()
s.Empty(s.streamReceiverMonitor.inboundStreams)
}

func (s *streamReceiverMonitorSuite) TestStop_SkipsInboundStreamsWhenFlagDisabled() {
col := dynamicconfig.NewCollection(
dynamicconfig.StaticClient(map[dynamicconfig.Key]any{
dynamicconfig.EnableCloseInboundReplicationStreamOnShutdown.Key(): false,
}),
log.NewNoopLogger(),
)
monitor := NewStreamReceiverMonitor(
ProcessToolBox{
Config: configs.NewConfig(col, 1),
ClusterMetadata: s.clusterMetadata,
ClientBean: s.clientBean,
ShardController: s.shardController,
MetricsHandler: metrics.NoopMetricsHandler,
Logger: log.NewNoopLogger(),
DLQWriter: NoopDLQWriter{},
},
NewExecutableTaskConverter(ProcessToolBox{
Config: configs.NewConfig(col, 1),
ClusterMetadata: s.clusterMetadata,
ClientBean: s.clientBean,
ShardController: s.shardController,
MetricsHandler: metrics.NoopMetricsHandler,
Logger: log.NewNoopLogger(),
DLQWriter: NoopDLQWriter{},
}),
true,
)

clientKey := NewClusterShardKey(int32(cluster.TestAlternativeClusterInitialFailoverVersion), rand.Int31())
serverKey := NewClusterShardKey(int32(cluster.TestCurrentClusterInitialFailoverVersion), rand.Int31())
streamSender := NewMockStreamSender(s.controller)
streamSender.EXPECT().Key().Return(ClusterShardKeyPair{
Client: clientKey,
Server: serverKey,
}).AnyTimes()
// Stop should NOT be called on inbound streams when the flag is disabled.
monitor.RegisterInboundStream(streamSender)

monitor.Lock()
s.Len(monitor.inboundStreams, 1)
monitor.Unlock()

monitor.status = common.DaemonStatusStarted
monitor.Stop()

monitor.Lock()
defer monitor.Unlock()
s.Len(monitor.inboundStreams, 1, "inbound streams should not be closed when the flag is disabled")
}

func (s *streamReceiverMonitorSuite) TestDoReconcileOutboundStreams_Add() {
s.clusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
s.clusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, gomock.Any()).Return("some cluster name").AnyTimes()
service/history/service.go (17 additions, 6 deletions)
@@ -145,10 +145,20 @@ func (s *Service) Stop() {
time.Sleep(s.config.ShutdownDrainDuration())
}

// Stop shard controller. We should have waited long enough for all shards to realize they
// lost ownership and close, but if not, this will definitely close them.
s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.handler.controller.Stop()
enableCloseInboundReplicationStreamOnShutdown := s.config.EnableCloseInboundReplicationStreamOnShutdown()
// When enabled, stop handler components (including the replication stream monitor) before
// waiting for gRPC handlers to return. This closes the inbound replication stream senders,
// allowing their streaming handler goroutines to unblock and return cleanly before GracefulStop.
// Without this, those goroutines block indefinitely and the gRPC server falls back to a
// forceful Stop(), causing unclean H2 teardowns on the peer.
// Guarded by a feature flag so the ordering change can be reverted if needed.
if enableCloseInboundReplicationStreamOnShutdown {
s.logger.Info("ShutdownHandler: Initiating handler shutdown")
s.handler.Stop()
} else {
s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.handler.controller.Stop()
}

// All grpc handlers should be cancelled now. Give them a little time to return.
t := time.AfterFunc(2*time.Second, func() {
@@ -157,8 +167,9 @@
})
s.server.GracefulStop()
t.Stop()

s.handler.Stop()
if !enableCloseInboundReplicationStreamOnShutdown {
s.handler.Stop()
}
s.visibilityManager.Close()

s.logger.Info("history stopped")
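To make the shutdown ordering concrete, here is a small self-contained sketch of the graceful-stop-with-deadline pattern used above, written with plain grpc-go and no Temporal types; the listener address and log messages are placeholders. GracefulStop only returns once every in-flight handler has finished, so long-lived streaming handlers have to be told to stop beforehand; otherwise the timer fires and the server is torn down forcefully, which is the unclean teardown the comment above describes.

    // Sketch only: server setup and the handler-stop step are stand-ins, not Temporal code.
    package main

    import (
        "log"
        "net"
        "time"

        "google.golang.org/grpc"
    )

    func main() {
        lis, err := net.Listen("tcp", "127.0.0.1:0")
        if err != nil {
            log.Fatal(err)
        }
        server := grpc.NewServer()
        go func() { _ = server.Serve(lis) }()

        // 1. Signal long-lived (e.g. streaming) handlers to finish first,
        //    analogous to s.handler.Stop() above.
        // 2. Then drain with a hard deadline: if GracefulStop has not returned
        //    within 2s, fall back to a forceful Stop so shutdown cannot hang.
        t := time.AfterFunc(2*time.Second, func() {
            log.Println("GracefulStop timed out, forcing Stop")
            server.Stop()
        })
        server.GracefulStop()
        t.Stop()
    }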