diff --git a/src/java/org/apache/cassandra/schema/CompactionParams.java b/src/java/org/apache/cassandra/schema/CompactionParams.java index df189d5a6e8b..9b4b83fbacc1 100644 --- a/src/java/org/apache/cassandra/schema/CompactionParams.java +++ b/src/java/org/apache/cassandra/schema/CompactionParams.java @@ -24,7 +24,7 @@ import java.util.Objects; import java.util.Optional; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java index a1011e33be87..8b385abd9408 100644 --- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java @@ -41,6 +41,7 @@ import org.apache.cassandra.sensors.RequestSensors; import org.apache.cassandra.sensors.RequestTracker; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.concurrent.SimpleCondition; import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics; import org.apache.cassandra.net.MessagingService; @@ -65,6 +66,7 @@ public class ReadCallback, P extends ReplicaPlan.ForRead< private final Map failureReasonByEndpoint; private final boolean couldSpeculate; private final RequestSensors requestSensors; + private final MonotonicClock clock; public ReadCallback(ResponseResolver resolver, ReadCommand command, ReplicaPlan.Shared replicaPlan, long queryStartNanoTime) { @@ -86,6 +88,7 @@ public ReadCallback(ResponseResolver resolver, ReadCommand command, Replic if (logger.isTraceEnabled()) logger.trace("Blockfor is {}; setting up requests to {}", blockFor, this.replicaPlan); this.requestSensors = RequestTracker.instance.get(); + this.clock = MonotonicClock.preciseTime; } protected P replicaPlan() @@ -138,7 +141,29 @@ public void awaitResults() throws ReadFailureException, ReadTimeoutException if (Tracing.isTracing()) { String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; - Tracing.trace("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData); + Tracing.trace("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, + gotData); + long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(clock.now() - queryStartNanoTime); + long timeoutMillis = TimeUnit.MILLISECONDS.convert( + command.getTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + if (!failureReasonByEndpoint.isEmpty()) + { + for (Map.Entry entry : failureReasonByEndpoint.entrySet()) + { + logger.trace( + "Failure: replica={}, reason={}, received={}/{}, timeout={}ms, elapsed={}ms, available={}ms", + entry.getKey().getHostAddress(true), entry.getValue(), + received, blockFor, timeoutMillis, elapsedMillis, + timeoutMillis - elapsedMillis); + } + } + else + { + logger.trace("Timeout: received={}/{}, timeout={}ms, elapsed={}ms, available={}ms, waiting for replicas={}", + received, blockFor, timeoutMillis, elapsedMillis, + timeoutMillis - elapsedMillis, + replicaPlan().contacts()); + } } else if (logger.isDebugEnabled()) { diff --git a/test/unit/org/apache/cassandra/service/reads/ReadCallbackTest.java b/test/unit/org/apache/cassandra/service/reads/ReadCallbackTest.java index 1d3731a45250..5976e4e2961d 100644 --- a/test/unit/org/apache/cassandra/service/reads/ReadCallbackTest.java +++ b/test/unit/org/apache/cassandra/service/reads/ReadCallbackTest.java @@ -24,34 +24,38 @@ import org.junit.Before; import org.junit.Test; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.EmptyIterators; import org.apache.cassandra.db.MultiRangeReadCommand; -import org.apache.cassandra.db.MultiRangeReadResponse; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.PartitionRangeReadCommand; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.SimpleBuilders; import org.apache.cassandra.db.SinglePartitionReadCommand; -import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics; import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; -import org.apache.cassandra.service.QueryInfoTracker; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MonotonicClock; import static org.apache.cassandra.locator.ReplicaUtils.full; import static org.junit.Assert.*; @@ -227,7 +231,167 @@ public void testReadResponseMetricsWithDigestResponses() throws Throwable assertTrue("Digest responses should also be tracked in metrics", ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount() > initialReadBytes); } - + + @Test + public void testTimeoutLoggingWithMonotonicClockElapsedTime() throws Throwable { + Logger readCallbackLogger = (Logger) LoggerFactory.getLogger(ReadCallback.class); + ListAppender listAppender = new ListAppender<>(); + listAppender.start(); + readCallbackLogger.addAppender(listAppender); + Level originalLevel = readCallbackLogger.getLevel(); + readCallbackLogger.setLevel(Level.TRACE); + + try + { + long startTime = MonotonicClock.preciseTime.now() - TimeUnit.SECONDS.toNanos(10); + ReplicaPlan.SharedForTokenRead plan = plan(ConsistencyLevel.QUORUM, targetReplicas); + DigestResolver resolver = + new DigestResolver<>(command, plan, startTime, noopReadTracker()); + ReadCallback callback = + new ReadCallback<>(resolver, command, plan, startTime); + Tracing.instance.newSession(ClientState.forInternalCalls(), Tracing.TraceType.QUERY); + try + { + callback.awaitResults(); + fail("Expected ReadTimeoutException"); + } + catch (ReadTimeoutException e) + { + boolean foundLogMessage = listAppender.list.stream() + .anyMatch(event -> + event.getFormattedMessage() + .contains("elapsed=") + ); + assertTrue("Should log elapsed time from MonotonicClock", foundLogMessage); + } + finally + { + Tracing.instance.stopSession(); + } + } finally + { + readCallbackLogger.detachAppender(listAppender); + readCallbackLogger.setLevel(originalLevel); + } + } + + @Test + public void testFailureLoggingWithMultipleReplicaFailures() throws Throwable + { + Logger readCallbackLogger = (Logger) LoggerFactory.getLogger(ReadCallback.class); + ListAppender listAppender = new ListAppender<>(); + listAppender.start(); + readCallbackLogger.addAppender(listAppender); + Level originalLevel = readCallbackLogger.getLevel(); + readCallbackLogger.setLevel(Level.TRACE); + + try + { + long startTime = MonotonicClock.preciseTime.now() - TimeUnit.SECONDS.toNanos(10); + ReplicaPlan.SharedForTokenRead plan = plan(ConsistencyLevel.QUORUM, targetReplicas); + DigestResolver resolver = + new DigestResolver<>(command, plan, startTime, noopReadTracker()); + ReadCallback callback = + new ReadCallback<>(resolver, command, plan, startTime); + + Tracing.instance.newSession(ClientState.forInternalCalls(), Tracing.TraceType.QUERY); + try + { + callback.onFailure(EP1, org.apache.cassandra.exceptions.RequestFailureReason.TIMEOUT); + callback.onFailure(EP2, org.apache.cassandra.exceptions.RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + + callback.awaitResults(); + fail("Expected ReadFailureException"); + } + catch (org.apache.cassandra.exceptions.ReadFailureException e) + { + long failureLogCount = listAppender.list.stream() + .filter(event -> event.getFormattedMessage().contains("Failure: replica=")) + .count(); + assertEquals("Should log failure for each failed replica", 2, failureLogCount); + + boolean foundEP1 = listAppender.list.stream() + .anyMatch(event -> event.getFormattedMessage().contains(EP1.getHostAddress(true))); + assertTrue("Should log EP1 address", foundEP1); + + boolean foundEP2 = listAppender.list.stream() + .anyMatch(event -> event.getFormattedMessage().contains(EP2.getHostAddress(true))); + assertTrue("Should log EP2 address", foundEP2); + + boolean foundTimeout = listAppender.list.stream() + .anyMatch(event -> event.getFormattedMessage().contains("TIMEOUT")); + boolean foundTombstones = listAppender.list.stream() + .anyMatch(event -> event.getFormattedMessage().contains("READ_TOO_MANY_TOMBSTONES")); + assertTrue("Should log TIMEOUT reason", foundTimeout); + assertTrue("Should log READ_TOO_MANY_TOMBSTONES reason", foundTombstones); + } + finally + { + Tracing.instance.stopSession(); + } + } + finally + { + readCallbackLogger.detachAppender(listAppender); + readCallbackLogger.setLevel(originalLevel); + } + } + + @Test + public void testElapsedTimeCalculationAccuracy() throws Throwable + { + Logger readCallbackLogger = (Logger) LoggerFactory.getLogger(ReadCallback.class); + ListAppender listAppender = new ListAppender<>(); + listAppender.start(); + readCallbackLogger.addAppender(listAppender); + Level originalLevel = readCallbackLogger.getLevel(); + readCallbackLogger.setLevel(Level.TRACE); + + try + { + long startTime = MonotonicClock.preciseTime.now() - TimeUnit.SECONDS.toNanos(10); + ReplicaPlan.SharedForTokenRead plan = plan(ConsistencyLevel.QUORUM, targetReplicas); + DigestResolver resolver = + new DigestResolver<>(command, plan, startTime, noopReadTracker()); + ReadCallback callback = + new ReadCallback<>(resolver, command, plan, startTime); + + Tracing.instance.newSession(ClientState.forInternalCalls(), Tracing.TraceType.QUERY); + try + { + callback.awaitResults(); + fail("Expected ReadTimeoutException"); + } + catch (ReadTimeoutException e) + { + boolean foundElapsedLog = listAppender.list.stream() + .anyMatch(event -> { + String msg = event.getFormattedMessage(); + return msg.contains("elapsed=") && msg.contains("ms"); + }); + assertTrue("Should log elapsed time from MonotonicClock", foundElapsedLog); + + boolean foundAllMetrics = listAppender.list.stream() + .anyMatch(event -> { + String msg = event.getFormattedMessage(); + return msg.contains("timeout=") && + msg.contains("elapsed=") && + msg.contains("available="); + }); + assertTrue("Should log timeout, elapsed, and available time metrics", foundAllMetrics); + } + finally + { + Tracing.instance.stopSession(); + } + } + finally + { + readCallbackLogger.detachAppender(listAppender); + readCallbackLogger.setLevel(originalLevel); + } + } + // Helper methods private Message createReadResponseMessage(ReadResponse response, InetAddressAndPort from)