diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java index e7bb2b1693..df076875bf 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java @@ -97,7 +97,7 @@ static Throwable unwrapThrowable(Throwable t) { return unwrapped; } } - return t; + return JavaUtils.unwrapCompletionException(t); } static IOException unwrapException(StatusRuntimeException se) { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 69421e9f0f..053cc5c0f4 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -63,6 +63,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.ratis.server.raftlog.LogProtoUtils.toLogEntryTermIndexString; + /** * A new log appender implementation using grpc bi-directional stream API. */ @@ -301,8 +303,8 @@ private void mayWait() { getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException ie) { - LOG.warn(this + ": Wait interrupted by " + ie); Thread.currentThread().interrupt(); + LOG.warn("{} is interrupted: {}", this, ie.toString()); } } @@ -616,11 +618,11 @@ void removePending(InstallSnapshotReplyProto reply) { if (isNotificationOnly) { Preconditions.assertSame(InstallSnapshotReplyBodyCase.SNAPSHOTINDEX, reply.getInstallSnapshotReplyBodyCase(), "reply case"); - Preconditions.assertSame(INSTALL_SNAPSHOT_NOTIFICATION_INDEX, (int) index, "poll index"); + Preconditions.assertSame(INSTALL_SNAPSHOT_NOTIFICATION_INDEX, index, "poll index"); } else { Preconditions.assertSame(InstallSnapshotReplyBodyCase.REQUESTINDEX, reply.getInstallSnapshotReplyBodyCase(), "reply case"); - Preconditions.assertSame(reply.getRequestIndex(), (int) index, "poll index"); + Preconditions.assertSame(reply.getRequestIndex(), index, "poll index"); } } } @@ -889,13 +891,9 @@ boolean isHeartbeat() { @Override public String toString() { - final String entries = entriesCount == 0? "" - : entriesCount == 1? ",entry=" + firstEntry - : ",entries=" + firstEntry + "..." + lastEntry; return JavaUtils.getClassSimpleName(getClass()) + ":cid=" + callId - + ",entriesCount=" + entriesCount - + entries; + + ":" + toLogEntryTermIndexString(entriesCount, firstEntry, lastEntry); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index b01abcddc0..7a8414ca2d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -158,8 +158,8 @@ private void runImpl() { } synchronized (server) { if (roleChangeChecking(electionTimeout)) { - LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}", - this, lastRpcTime.elapsedTime(), electionTimeout); + LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}ms, electionTimeout:{}", + this, lastRpcTime.elapsedTimeMs(), electionTimeout); server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters. // election timeout, should become a candidate server.changeToCandidate(false); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c0e93338a6..1c9cd3f658 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -150,6 +150,7 @@ import static org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexRequestProto; import static org.apache.ratis.server.impl.ServerProtoUtils.toRequestVoteReplyProto; import static org.apache.ratis.server.impl.ServerProtoUtils.toStartLeaderElectionReplyProto; +import static org.apache.ratis.server.raftlog.LogProtoUtils.toLogEntryTermIndexString; import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString; import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString; import static org.apache.ratis.server.util.ServerStringUtils.toRequestVoteReplyString; @@ -239,18 +240,16 @@ public long[] getFollowerMatchIndices() { private final RetryCacheImpl retryCache; private final CommitInfoCache commitInfoCache = new CommitInfoCache(); private final WriteIndexCache writeIndexCache; + private final NavigableIndices appendLogTermIndices; private final RaftServerJmxAdapter jmxAdapter = new RaftServerJmxAdapter(this); private final LeaderElectionMetrics leaderElectionMetrics; private final RaftServerMetricsImpl raftServerMetrics; - private final CountDownLatch closeFinishedLatch = new CountDownLatch(1); - // To avoid append entry before complete start() method - // For example, if thread1 start(), but before thread1 startAsFollower(), thread2 receive append entry - // request, and change state to RUNNING by lifeCycle.compareAndTransition(STARTING, RUNNING), - // then thread1 execute lifeCycle.transition(RUNNING) in startAsFollower(), - // So happens IllegalStateException: ILLEGAL TRANSITION: RUNNING -> RUNNING, - private final AtomicBoolean startComplete; + // Disallow appendEntries before start() complete; otherwise, it could fail with illegal lifeCycle transition + private final AtomicBoolean startComplete = new AtomicBoolean(false); + private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); + private final CountDownLatch closeFinishedLatch = new CountDownLatch(1); private final TransferLeadership transferLeadership; private final SnapshotManagementRequestHandler snapshotRequestHandler; @@ -258,12 +257,8 @@ public long[] getFollowerMatchIndices() { private final ExecutorService serverExecutor; private final ExecutorService clientExecutor; - - private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); private final ThreadGroup threadGroup; - private final NavigableIndices appendLogTermIndices; - RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) throws IOException { final RaftPeerId id = proxy.getId(); @@ -292,9 +287,6 @@ public long[] getFollowerMatchIndices() { this.raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics( getMemberId(), this::getCommitIndex, retryCache::getStatistics); - this.startComplete = new AtomicBoolean(false); - this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString()); - this.transferLeadership = new TransferLeadership(this, properties); this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this); this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties); @@ -309,6 +301,7 @@ public long[] getFollowerMatchIndices() { RaftServerConfigKeys.ThreadPool.clientCached(properties), RaftServerConfigKeys.ThreadPool.clientSize(properties), id + "-client"); + this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString()); } private long getCommitIndex(RaftPeerId id) { @@ -1703,6 +1696,11 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size()); final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex(); return appendFuture.whenCompleteAsync((r, t) -> { + if (t != null) { + LOG.warn("{}: appendEntries* failed: {}", getMemberId(), toLogEntryTermIndexString(entries), t); + } else if (LOG.isDebugEnabled()) { + LOG.debug("{}: appendEntries* succeeded: {}", getMemberId(), toLogEntryTermIndexString(entries)); + } followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); timer.stop(); }, getServerExecutor()).thenApply(v -> { @@ -1753,7 +1751,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List entries) { public static String toLogEntriesShortString(List entries, Function stateMachineToString) { - return entries == null ? null - : entries.isEmpty()? "" - : "size=" + entries.size() + ", first=" + toLogEntryString(entries.get(0), stateMachineToString); + if (entries == null) { + return null; + } + return toLogEntryTermIndexString(entries) + + (entries.isEmpty() ? "" : ", first=" + toLogEntryString(entries.get(0), stateMachineToString)); + } + + public static String toLogEntryTermIndexString(List entries) { + final int n = entries.size(); + return n == 0 ? toLogEntryTermIndexString(n, null, null) + : toLogEntryTermIndexString(n, TermIndex.valueOf(entries.get(0)), TermIndex.valueOf(entries.get(n - 1))); + } + + public static String toLogEntryTermIndexString(int n, TermIndex first, TermIndex last) { + return n == 0 ? "HEARTBEAT" + : n == 1 ? "entry=" + first + : n + " entries=" + first + "..." + last; } public static LogEntryProto toLogEntryProto(RaftConfiguration conf, Long term, long index) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java index 3a5db62859..50b0918b83 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java @@ -21,18 +21,17 @@ import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto; import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.ProtoUtils; -import java.util.List; import java.util.function.Function; +import static org.apache.ratis.server.raftlog.LogProtoUtils.toLogEntriesShortString; + /** * This class provides convenient utilities for converting Protocol Buffers messages to strings. * The output strings are for information purpose only. @@ -50,14 +49,12 @@ public static String toAppendEntriesRequestString(AppendEntriesRequestProto requ if (request == null) { return null; } - final List entries = request.getEntriesList(); return ProtoUtils.toString(request.getServerRequest()) + "-t" + request.getLeaderTerm() + ",previous=" + TermIndex.valueOf(request.getPreviousLog()) + ",leaderCommit=" + request.getLeaderCommit() + ",initializing? " + request.getInitializing() - + "," + (entries.isEmpty()? "HEARTBEAT" : "entries: " + - LogProtoUtils.toLogEntriesShortString(entries, stateMachineToString)); + + "," + toLogEntriesShortString(request.getEntriesList(), stateMachineToString); } public static String toAppendEntriesReplyString(AppendEntriesReplyProto reply) { @@ -87,7 +84,7 @@ public static String toInstallSnapshotRequestString(InstallSnapshotRequestProto s = "notify:" + TermIndex.valueOf(notification.getFirstAvailableTermIndex()); break; default: - throw new IllegalStateException("Unexpected body case in " + request); + throw new IllegalStateException("Unexpected InstallSnapshotRequestBodyCase in " + request); } return ProtoUtils.toString(request.getServerRequest()) + "-t" + request.getLeaderTerm() @@ -122,11 +119,7 @@ public static String toRequestVoteReplyString(RequestVoteReplyProto proto) { + "-last:" + TermIndex.valueOf(proto.getLastEntry()); } - /** - * Used to generate the necessary unified name in the submodules under - * {@link org.apache.ratis.server.impl.RaftServerImpl}, which consists - * of {@link org.apache.ratis.server.impl.ServerState#memberId} and the specific class. - */ + /** Generate the unified name for the given member and class. */ public static String generateUnifiedName(RaftGroupMemberId memberId, Class clazz) { return memberId + "-" + JavaUtils.getClassSimpleName(clazz); }