-
Notifications
You must be signed in to change notification settings - Fork 439
RATIS-2508. appendEntries log messages improvement. #1440
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -150,6 +150,7 @@ | |
| import static org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexRequestProto; | ||
| import static org.apache.ratis.server.impl.ServerProtoUtils.toRequestVoteReplyProto; | ||
| import static org.apache.ratis.server.impl.ServerProtoUtils.toStartLeaderElectionReplyProto; | ||
| import static org.apache.ratis.server.raftlog.LogProtoUtils.toLogEntryTermIndexString; | ||
| import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString; | ||
| import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString; | ||
| import static org.apache.ratis.server.util.ServerStringUtils.toRequestVoteReplyString; | ||
|
|
@@ -239,31 +240,25 @@ public long[] getFollowerMatchIndices() { | |
| private final RetryCacheImpl retryCache; | ||
| private final CommitInfoCache commitInfoCache = new CommitInfoCache(); | ||
| private final WriteIndexCache writeIndexCache; | ||
| private final NavigableIndices appendLogTermIndices; | ||
|
|
||
| private final RaftServerJmxAdapter jmxAdapter = new RaftServerJmxAdapter(this); | ||
| private final LeaderElectionMetrics leaderElectionMetrics; | ||
| private final RaftServerMetricsImpl raftServerMetrics; | ||
| private final CountDownLatch closeFinishedLatch = new CountDownLatch(1); | ||
|
|
||
| // To avoid append entry before complete start() method | ||
| // For example, if thread1 start(), but before thread1 startAsFollower(), thread2 receive append entry | ||
| // request, and change state to RUNNING by lifeCycle.compareAndTransition(STARTING, RUNNING), | ||
| // then thread1 execute lifeCycle.transition(RUNNING) in startAsFollower(), | ||
| // So happens IllegalStateException: ILLEGAL TRANSITION: RUNNING -> RUNNING, | ||
| private final AtomicBoolean startComplete; | ||
| // Disallow appendEntries before start() complete; otherwise, it could fail with illegal lifeCycle transition | ||
| private final AtomicBoolean startComplete = new AtomicBoolean(false); | ||
| private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); | ||
| private final CountDownLatch closeFinishedLatch = new CountDownLatch(1); | ||
|
|
||
| private final TransferLeadership transferLeadership; | ||
| private final SnapshotManagementRequestHandler snapshotRequestHandler; | ||
| private final SnapshotInstallationHandler snapshotInstallationHandler; | ||
|
|
||
| private final ExecutorService serverExecutor; | ||
| private final ExecutorService clientExecutor; | ||
|
|
||
| private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); | ||
| private final ThreadGroup threadGroup; | ||
|
|
||
| private final NavigableIndices appendLogTermIndices; | ||
|
|
||
| RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) | ||
| throws IOException { | ||
| final RaftPeerId id = proxy.getId(); | ||
|
|
@@ -292,9 +287,6 @@ public long[] getFollowerMatchIndices() { | |
| this.raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics( | ||
| getMemberId(), this::getCommitIndex, retryCache::getStatistics); | ||
|
|
||
| this.startComplete = new AtomicBoolean(false); | ||
| this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString()); | ||
|
|
||
| this.transferLeadership = new TransferLeadership(this, properties); | ||
| this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this); | ||
| this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties); | ||
|
|
@@ -309,6 +301,7 @@ public long[] getFollowerMatchIndices() { | |
| RaftServerConfigKeys.ThreadPool.clientCached(properties), | ||
| RaftServerConfigKeys.ThreadPool.clientSize(properties), | ||
| id + "-client"); | ||
| this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString()); | ||
| } | ||
|
|
||
| private long getCommitIndex(RaftPeerId id) { | ||
|
|
@@ -1703,6 +1696,11 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde | |
| final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size()); | ||
| final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex(); | ||
| return appendFuture.whenCompleteAsync((r, t) -> { | ||
| if (t != null) { | ||
| LOG.warn("{}: appendEntries* failed: {}", getMemberId(), toLogEntryTermIndexString(entries), t); | ||
| } else if (LOG.isDebugEnabled()) { | ||
| LOG.debug("{}: appendEntries* succeeded {}", getMemberId(), toLogEntryTermIndexString(entries)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
| } | ||
| followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); | ||
| timer.stop(); | ||
| }, getServerExecutor()).thenApply(v -> { | ||
|
|
@@ -1753,7 +1751,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryPro | |
| && !(appendLogTermIndices != null && appendLogTermIndices.contains(previous)) | ||
| && !state.containsTermIndex(previous)) { | ||
| final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex()); | ||
| LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous); | ||
| LOG.info("{}: Failed appendEntries, previous log entry {} not found", getMemberId(), previous); | ||
| return replyNextIndex; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -87,9 +87,23 @@ public static String toLogEntriesString(List<LogEntryProto> entries) { | |||||||||||||||
|
|
||||||||||||||||
| public static String toLogEntriesShortString(List<LogEntryProto> entries, | ||||||||||||||||
| Function<StateMachineLogEntryProto, String> stateMachineToString) { | ||||||||||||||||
| return entries == null ? null | ||||||||||||||||
| : entries.isEmpty()? "<empty>" | ||||||||||||||||
| : "size=" + entries.size() + ", first=" + toLogEntryString(entries.get(0), stateMachineToString); | ||||||||||||||||
| if (entries == null) { | ||||||||||||||||
| return null; | ||||||||||||||||
| } | ||||||||||||||||
| return toLogEntryTermIndexString(entries) | ||||||||||||||||
| + (entries.isEmpty() ? "" : ", first=" + toLogEntryString(entries.get(0), stateMachineToString)); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| public static String toLogEntryTermIndexString(List<LogEntryProto> entries) { | ||||||||||||||||
| final int n = entries.size(); | ||||||||||||||||
| return n == 0 ? toLogEntryTermIndexString(n, null, null) | ||||||||||||||||
| : toLogEntryTermIndexString(n, TermIndex.valueOf(entries.get(0)), TermIndex.valueOf(entries.get(n - 1))); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| public static String toLogEntryTermIndexString(int n, TermIndex first, TermIndex last) { | ||||||||||||||||
| return n == 0 ? "HEARTBEAT" | ||||||||||||||||
| : n == 1 ? "entry" + first | ||||||||||||||||
| : n + " entries:" + first + "..." + last; | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing
Suggested change
Also, would ratis/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java Lines 894 to 896 in 3e927cc
|
||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| public static LogEntryProto toLogEntryProto(RaftConfiguration conf, Long term, long index) { | ||||||||||||||||
|
|
||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we add ms for lastRpcElapsedTime but not electionTimeout