diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index b4d78c207a..5e5a3ce4d8 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -35,6 +35,7 @@ import org.apache.ratis.server.util.ServerStringUtils; import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; +import org.apache.ratis.thirdparty.io.grpc.stub.ClientCallStreamObserver; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; @@ -60,7 +61,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -341,20 +345,36 @@ private boolean haveTooManyPendingRequests() { } static class StreamObservers { - private final CallStreamObserver appendLog; - private final CallStreamObserver heartbeat; + private final ClientCallStreamObserver appendLog; + private final ClientCallStreamObserver heartbeat; private final TimeDuration waitForReady; + private final TimeDuration completeGracePeriod; private volatile boolean running = true; + private final ScheduledExecutorService closer = + Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "grpc-log-appender-stream-closer"); + t.setDaemon(true); + return t; + }); + + private final AtomicBoolean completed = new AtomicBoolean(false); + private final AtomicBoolean cancelled = 
new AtomicBoolean(false); + StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat, - TimeDuration waitTimeMin) { - this.appendLog = client.appendEntries(handler, false); - this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): null; + TimeDuration waitTimeMin, TimeDuration completeGracePeriod) { + this.appendLog = (ClientCallStreamObserver) client.appendEntries(handler, false); + this.heartbeat = separateHeartbeat? + (ClientCallStreamObserver) client.appendEntries(handler, true): null; this.waitForReady = waitTimeMin.isPositive()? waitTimeMin: TimeDuration.ONE_MILLISECOND; + this.completeGracePeriod = completeGracePeriod.isPositive()? completeGracePeriod : TimeDuration.ONE_SECOND; } void onNext(AppendEntriesRequestProto proto) throws InterruptedIOException { + if (!running) { + throw new InterruptedIOException("StreamObservers is stopping/closing"); + } CallStreamObserver stream; boolean isHeartBeat = heartbeat != null && proto.getEntriesCount() == 0; if (isHeartBeat) { @@ -366,7 +386,14 @@ void onNext(AppendEntriesRequestProto proto) while (!stream.isReady() && running) { sleep(waitForReady, isHeartBeat); } - stream.onNext(proto); + try { + stream.onNext(proto); + } catch (Exception e) { + InterruptedIOException ioe = + new InterruptedIOException("Failed to send request via stream"); + ioe.initCause(e); + throw ioe; + } } void stop() { @@ -374,8 +401,58 @@ void stop() { } void onCompleted() { - appendLog.onCompleted(); - Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted); + if (!cancelled.get() && completed.compareAndSet(false, true)) { /* once only, never after cancel */ + completeStreamGracefully(appendLog, "appendLog"); + Optional.ofNullable(heartbeat) + .ifPresent(s -> completeStreamGracefully(s, "heartbeat")); + final long delayMs = Math.max(1L, completeGracePeriod.toLong(TimeUnit.MILLISECONDS)); + closer.schedule(this::cancelIfStillNeeded, delayMs, TimeUnit.MILLISECONDS); + } + } + + void cancelNow(String reason, Throwable 
cause) { + if (cancelled.compareAndSet(false, true)) { + running = false; + cancelStream(appendLog, "appendLog", reason, cause); + Optional.ofNullable(heartbeat) + .ifPresent(s -> cancelStream(s, "heartbeat", reason, cause)); + shutdownCloser(); + } + } + + private void cancelIfStillNeeded() { + if (cancelled.compareAndSet(false, true)) { + cancelStream(appendLog, "appendLog", "Stream completion timeout", null); + Optional.ofNullable(heartbeat) + .ifPresent(s -> cancelStream(s, "heartbeat", "Stream completion timeout", null)); + } + shutdownCloser(); + } + + private void completeStreamGracefully( + ClientCallStreamObserver stream, + String name) { + try { + stream.onCompleted(); + } catch (Exception e) { + LOG.warn("Failed to call onCompleted on {}", name, e); + } + } + + private void cancelStream( + ClientCallStreamObserver stream, + String name, + String reason, + Throwable cause) { + try { + stream.cancel(reason, cause); + } catch (Exception e) { + LOG.warn("Failed to cancel {}", name, e); + } + } + + private void shutdownCloser() { + closer.shutdown(); } } @@ -404,7 +481,8 @@ private void appendLog(boolean heartbeat) throws IOException { increaseNextIndex(pending); if (appendLogRequestObserver == null) { appendLogRequestObserver = new StreamObservers( - getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin()); + getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin(), + getCompleteGracePeriod()); } } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index ef16f67f67..c931d69a7c 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -686,6 +686,16 @@ static void setWaitTimeMin(RaftProperties properties, TimeDuration minDuration) 
setTimeDuration(properties::setTimeDuration, WAIT_TIME_MIN_KEY, minDuration); } + String COMPLETE_GRACE_PERIOD_KEY = PREFIX + ".complete.grace.period"; + TimeDuration COMPLETE_GRACE_PERIOD_DEFAULT = TimeDuration.ONE_SECOND; + static TimeDuration completeGracePeriod(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(COMPLETE_GRACE_PERIOD_DEFAULT.getUnit()), + COMPLETE_GRACE_PERIOD_KEY, COMPLETE_GRACE_PERIOD_DEFAULT, getDefaultLog()); + } + static void setCompleteGracePeriod(RaftProperties properties, TimeDuration duration) { + setTimeDuration(properties::setTimeDuration, COMPLETE_GRACE_PERIOD_KEY, duration); + } + String RETRY_POLICY_KEY = PREFIX + ".retry.policy"; /** * The min wait time as 1ms (0 is not allowed) for first 10, diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index 5a27cda510..c366967ad2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -61,6 +61,7 @@ public abstract class LogAppenderBase implements LogAppender { private final AtomicBoolean heartbeatTrigger = new AtomicBoolean(); private final TimeDuration waitTimeMin; + private final TimeDuration completeGracePeriod; protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) { this.follower = f; @@ -78,6 +79,7 @@ protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, F this.eventAwaitForSignal = new AwaitForSignal(name); this.waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties); + this.completeGracePeriod = RaftServerConfigKeys.Log.Appender.completeGracePeriod(properties); } @Override @@ -144,6 +146,10 @@ protected TimeDuration getWaitTimeMin() { return waitTimeMin; } + protected TimeDuration getCompleteGracePeriod() { + return 
completeGracePeriod; + } + protected TimeDuration getRemainingWaitTime() { return waitTimeMin.add(getFollower().getLastRpcSendTime().elapsedTime().negate()); }