Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/schema/CompactionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Objects;
import java.util.Optional;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
Expand Down
27 changes: 26 additions & 1 deletion src/java/org/apache/cassandra/service/reads/ReadCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
import org.apache.cassandra.net.MessagingService;
Expand All @@ -65,6 +66,7 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
private final boolean couldSpeculate;
private final RequestSensors requestSensors;
private final MonotonicClock clock;

public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
Expand All @@ -86,6 +88,7 @@ public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, Replic
if (logger.isTraceEnabled())
logger.trace("Blockfor is {}; setting up requests to {}", blockFor, this.replicaPlan);
this.requestSensors = RequestTracker.instance.get();
this.clock = MonotonicClock.preciseTime;
}

protected P replicaPlan()
Expand Down Expand Up @@ -138,7 +141,29 @@ public void awaitResults() throws ReadFailureException, ReadTimeoutException
if (Tracing.isTracing())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being inside this block requires that tracing is enabled for the session, which is different from TRACE-level logging (which would also need to be enabled). Is that intended?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is intentional. The production code logs these details inside Tracing.isTracing() conditional blocks, so the logs only appear when tracing is actively enabled for a session, not just when TRACE log level is configured. This keeps log volume minimal in production while still allowing targeted debugging through tracing when needed.

{
String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
Tracing.trace("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
Tracing.trace("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor,
gotData);
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(clock.now() - queryStartNanoTime);
long timeoutMillis = TimeUnit.MILLISECONDS.convert(
command.getTimeout(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
if (!failureReasonByEndpoint.isEmpty())
{
for (Map.Entry<InetAddressAndPort, RequestFailureReason> entry : failureReasonByEndpoint.entrySet())
{
logger.trace(
"Failure: replica={}, reason={}, received={}/{}, timeout={}ms, elapsed={}ms, available={}ms",
entry.getKey().getHostAddress(true), entry.getValue(),
received, blockFor, timeoutMillis, elapsedMillis,
timeoutMillis - elapsedMillis);
}
}
else
{
logger.trace("Timeout: received={}/{}, timeout={}ms, elapsed={}ms, available={}ms, waiting for replicas={}",
received, blockFor, timeoutMillis, elapsedMillis,
timeoutMillis - elapsedMillis,
replicaPlan().contacts());
}
}
else if (logger.isDebugEnabled())
{
Expand Down
178 changes: 171 additions & 7 deletions test/unit/org/apache/cassandra/service/reads/ReadCallbackTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,38 @@
import org.junit.Before;
import org.junit.Test;

import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.EmptyIterators;
import org.apache.cassandra.db.MultiRangeReadCommand;
import org.apache.cassandra.db.MultiRangeReadResponse;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.SimpleBuilders;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReplicaResponseSizeMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.QueryInfoTracker;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MonotonicClock;

import static org.apache.cassandra.locator.ReplicaUtils.full;
import static org.junit.Assert.*;
Expand Down Expand Up @@ -227,7 +231,167 @@ public void testReadResponseMetricsWithDigestResponses() throws Throwable
assertTrue("Digest responses should also be tracked in metrics",
ReplicaResponseSizeMetrics.readResponseBytesReceived.getCount() > initialReadBytes);
}


@Test
public void testTimeoutLoggingWithMonotonicClockElapsedTime() throws Throwable
{
    // Capture ReadCallback's log output in memory so we can assert on the
    // timeout diagnostics emitted by awaitResults().
    Logger readCallbackLogger = (Logger) LoggerFactory.getLogger(ReadCallback.class);
    ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
    listAppender.start();
    readCallbackLogger.addAppender(listAppender);
    Level originalLevel = readCallbackLogger.getLevel();
    readCallbackLogger.setLevel(Level.TRACE);

    try
    {
        // Back-date the query start by 10 seconds so awaitResults() times out
        // immediately and the elapsed time derived from MonotonicClock is non-zero.
        long startTime = MonotonicClock.preciseTime.now() - TimeUnit.SECONDS.toNanos(10);
        ReplicaPlan.SharedForTokenRead plan = plan(ConsistencyLevel.QUORUM, targetReplicas);
        DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> resolver =
            new DigestResolver<>(command, plan, startTime, noopReadTracker());
        ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> callback =
            new ReadCallback<>(resolver, command, plan, startTime);
        // The diagnostics are only logged inside a Tracing.isTracing() block,
        // so an active tracing session is required for the log lines to appear.
        Tracing.instance.newSession(ClientState.forInternalCalls(), Tracing.TraceType.QUERY);
        try
        {
            callback.awaitResults();
            fail("Expected ReadTimeoutException");
        }
        catch (ReadTimeoutException e)
        {
            boolean foundLogMessage = listAppender.list.stream()
                .anyMatch(event -> event.getFormattedMessage().contains("elapsed="));
            assertTrue("Should log elapsed time from MonotonicClock", foundLogMessage);
        }
        finally
        {
            Tracing.instance.stopSession();
        }
    }
    finally
    {
        // Always restore the logger's original state so other tests are unaffected.
        readCallbackLogger.detachAppender(listAppender);
        readCallbackLogger.setLevel(originalLevel);
    }
}

@Test
public void testFailureLoggingWithMultipleReplicaFailures() throws Throwable
{
    // Route ReadCallback's log output into an in-memory appender at TRACE level.
    Logger callbackLogger = (Logger) LoggerFactory.getLogger(ReadCallback.class);
    ListAppender<ILoggingEvent> appender = new ListAppender<>();
    appender.start();
    callbackLogger.addAppender(appender);
    Level previousLevel = callbackLogger.getLevel();
    callbackLogger.setLevel(Level.TRACE);

    try
    {
        // Back-date the query start so awaitResults() fails without waiting.
        long queryStart = MonotonicClock.preciseTime.now() - TimeUnit.SECONDS.toNanos(10);
        ReplicaPlan.SharedForTokenRead sharedPlan = plan(ConsistencyLevel.QUORUM, targetReplicas);
        DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> digestResolver =
            new DigestResolver<>(command, sharedPlan, queryStart, noopReadTracker());
        ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> readCallback =
            new ReadCallback<>(digestResolver, command, sharedPlan, queryStart);

        // Per-replica failure diagnostics are only logged inside a
        // Tracing.isTracing() block, so start a tracing session first.
        Tracing.instance.newSession(ClientState.forInternalCalls(), Tracing.TraceType.QUERY);
        try
        {
            // Report two distinct failure reasons from two different replicas.
            readCallback.onFailure(EP1, org.apache.cassandra.exceptions.RequestFailureReason.TIMEOUT);
            readCallback.onFailure(EP2, org.apache.cassandra.exceptions.RequestFailureReason.READ_TOO_MANY_TOMBSTONES);

            readCallback.awaitResults();
            fail("Expected ReadFailureException");
        }
        catch (org.apache.cassandra.exceptions.ReadFailureException e)
        {
            // Reusable check: did any captured log line contain the given text?
            java.util.function.Predicate<String> logged = needle ->
                appender.list.stream().anyMatch(event -> event.getFormattedMessage().contains(needle));

            long perReplicaFailureLines = appender.list.stream()
                .map(ILoggingEvent::getFormattedMessage)
                .filter(message -> message.contains("Failure: replica="))
                .count();
            assertEquals("Should log failure for each failed replica", 2, perReplicaFailureLines);

            assertTrue("Should log EP1 address", logged.test(EP1.getHostAddress(true)));
            assertTrue("Should log EP2 address", logged.test(EP2.getHostAddress(true)));
            assertTrue("Should log TIMEOUT reason", logged.test("TIMEOUT"));
            assertTrue("Should log READ_TOO_MANY_TOMBSTONES reason", logged.test("READ_TOO_MANY_TOMBSTONES"));
        }
        finally
        {
            Tracing.instance.stopSession();
        }
    }
    finally
    {
        // Restore the logger so other tests see its original configuration.
        callbackLogger.detachAppender(appender);
        callbackLogger.setLevel(previousLevel);
    }
}

@Test
public void testElapsedTimeCalculationAccuracy() throws Throwable
{
    // Capture ReadCallback's TRACE output with an in-memory list appender.
    Logger callbackLogger = (Logger) LoggerFactory.getLogger(ReadCallback.class);
    ListAppender<ILoggingEvent> appender = new ListAppender<>();
    appender.start();
    callbackLogger.addAppender(appender);
    Level previousLevel = callbackLogger.getLevel();
    callbackLogger.setLevel(Level.TRACE);

    try
    {
        // A query start 10 seconds in the past guarantees an immediate timeout.
        long queryStart = MonotonicClock.preciseTime.now() - TimeUnit.SECONDS.toNanos(10);
        ReplicaPlan.SharedForTokenRead sharedPlan = plan(ConsistencyLevel.QUORUM, targetReplicas);
        DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> digestResolver =
            new DigestResolver<>(command, sharedPlan, queryStart, noopReadTracker());
        ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> readCallback =
            new ReadCallback<>(digestResolver, command, sharedPlan, queryStart);

        // The timing diagnostics are only emitted while a tracing session is active.
        Tracing.instance.newSession(ClientState.forInternalCalls(), Tracing.TraceType.QUERY);
        try
        {
            readCallback.awaitResults();
            fail("Expected ReadTimeoutException");
        }
        catch (ReadTimeoutException e)
        {
            boolean hasElapsed = appender.list.stream()
                .map(ILoggingEvent::getFormattedMessage)
                .anyMatch(message -> message.contains("elapsed=") && message.contains("ms"));
            assertTrue("Should log elapsed time from MonotonicClock", hasElapsed);

            boolean hasAllMetrics = appender.list.stream()
                .map(ILoggingEvent::getFormattedMessage)
                .anyMatch(message -> message.contains("timeout=")
                                     && message.contains("elapsed=")
                                     && message.contains("available="));
            assertTrue("Should log timeout, elapsed, and available time metrics", hasAllMetrics);
        }
        finally
        {
            Tracing.instance.stopSession();
        }
    }
    finally
    {
        // Restore the logger so later tests run with the original configuration.
        callbackLogger.detachAppender(appender);
        callbackLogger.setLevel(previousLevel);
    }
}

// Helper methods

private Message<ReadResponse> createReadResponseMessage(ReadResponse response, InetAddressAndPort from)
Expand Down