Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,19 @@ public class PriorityEntry {
private final PartitionId partitionId;
private final int boost;
private final boolean isInterColo;
private final String threadName;

public PriorityEntry(PartitionId partitionId, int boost, boolean isInterColo) {
/**
* @param partitionId the prioritized partition.
* @param boost the fetchSize weight applied to the partition.
* @param isInterColo whether the entry came from the inter-colo (cross-datacenter) thread pool.
* @param threadName the name of the {@link ReplicaThread} holding this entry; may be {@code null} if unknown.
*/
public PriorityEntry(PartitionId partitionId, int boost, boolean isInterColo, String threadName) {
this.partitionId = partitionId;
this.boost = boost;
this.isInterColo = isInterColo;
this.threadName = threadName;
}

public PartitionId getPartitionId() {
Expand All @@ -52,8 +60,18 @@ public boolean isInterColo() {
return isInterColo;
}

/**
 * Returns the name of the {@link ReplicaThread} holding this priority entry.
 * A partition may be reported by more than one thread on the same node (one entry per thread), so the
 * name tells apart rows whose {@code (partition, boost, isInterColo)} values are otherwise identical.
 *
 * @return the holding thread's name, or {@code null} if unknown.
 */
public String getThreadName() {
  return this.threadName;
}

@Override
public String toString() {
return "PriorityEntry[" + partitionId + ", boost=" + boost + ", isInterColo=" + isInterColo + "]";
return "PriorityEntry[" + partitionId + ", boost=" + boost + ", isInterColo=" + isInterColo + ", threadName="
+ threadName + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.replication.PriorityEntry;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -28,10 +30,10 @@
* priority entries held by every {@link com.github.ambry.replication.ReplicaThread} on the
* target storage node at handler-invocation time.
*
* Each {@link PriorityEntry} carries an {@code isInterColo} flag indicating whether the entry
* came from the inter-colo (cross-datacenter) replication thread pool or the intra-colo pool.
* The current design applies priorities to both pools by default, so the same partition
* typically appears twice in the response — once per pool.
* Each {@link PriorityEntry} carries an {@code isInterColo} flag (inter-colo vs intra-colo thread pool)
* and the name of the holding {@link com.github.ambry.replication.ReplicaThread}. A partition can be
* replicated by more than one thread on a node, so it may appear once per thread; the thread name
* disambiguates otherwise-identical {@code (partition, boost, isInterColo)} rows.
*/
public class ListReplicationPriorityAdminResponse extends AdminResponse {
private static final short VERSION_V1 = 1;
Expand All @@ -40,6 +42,7 @@ public class ListReplicationPriorityAdminResponse extends AdminResponse {

private final List<PriorityEntry> entries;
private final byte[][] partitionIdBytes;
private final byte[][] threadNameBytes;
private final long sizeInBytes;

public static ListReplicationPriorityAdminResponse readFrom(DataInputStream stream, ClusterMap clusterMap)
Expand All @@ -58,7 +61,8 @@ public static ListReplicationPriorityAdminResponse readFrom(DataInputStream stre
PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream);
int boost = stream.readInt();
boolean isInterColo = stream.readByte() != 0;
entries.add(new PriorityEntry(partitionId, boost, isInterColo));
String threadName = Utils.readIntString(stream, StandardCharsets.UTF_8);
entries.add(new PriorityEntry(partitionId, boost, isInterColo, threadName));
}
return new ListReplicationPriorityAdminResponse(entries, adminResponse);
}
Expand All @@ -67,8 +71,12 @@ public ListReplicationPriorityAdminResponse(List<PriorityEntry> entries, AdminRe
super(adminResponse.getCorrelationId(), adminResponse.getClientId(), adminResponse.getError());
this.entries = Collections.unmodifiableList(new ArrayList<>(entries));
this.partitionIdBytes = new byte[this.entries.size()][];
this.threadNameBytes = new byte[this.entries.size()][];
for (int i = 0; i < this.entries.size(); i++) {
this.partitionIdBytes[i] = this.entries.get(i).getPartitionId().getBytes();
PriorityEntry entry = this.entries.get(i);
this.partitionIdBytes[i] = entry.getPartitionId().getBytes();
String threadName = entry.getThreadName() == null ? "" : entry.getThreadName();
this.threadNameBytes[i] = threadName.getBytes(StandardCharsets.UTF_8);
}
this.sizeInBytes = computeSizeInBytes();
}
Expand Down Expand Up @@ -98,15 +106,18 @@ protected void prepareBuffer() {
bufferToSend.writeBytes(partitionIdBytes[i]);
bufferToSend.writeInt(entry.getBoost());
bufferToSend.writeByte(entry.isInterColo() ? (byte) 1 : (byte) 0);
// length-prefixed UTF-8 thread name (readable by Utils.readIntString).
bufferToSend.writeInt(threadNameBytes[i].length);
bufferToSend.writeBytes(threadNameBytes[i]);
}
}

private long computeSizeInBytes() {
// parent + version + num-entries
long size = super.sizeInBytes() + Short.BYTES + Integer.BYTES;
for (int i = 0; i < entries.size(); i++) {
// partition id + boost + isInterColo flag
size += partitionIdBytes[i].length + Integer.BYTES + Byte.BYTES;
// partition id + boost + isInterColo flag + (thread-name length prefix + bytes)
size += partitionIdBytes[i].length + Integer.BYTES + Byte.BYTES + Integer.BYTES + threadNameBytes[i].length;
}
return size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1560,15 +1560,36 @@ public void listReplicationPriorityAdminResponseTest() throws IOException {
doListReplicationPriorityAdminResponseTest(clusterMap, Collections.emptyList());
// Single entry
doListReplicationPriorityAdminResponseTest(clusterMap, Collections.singletonList(
new PriorityEntry(partitions.get(0), 4, false)));
// Mixed intra/inter-colo, multiple entries
new PriorityEntry(partitions.get(0), 4, false, "ReplicaThread-Intra-0-dc1")));
// Mixed intra/inter-colo, multiple entries, distinct thread names
List<PriorityEntry> entries = new ArrayList<>();
entries.add(new PriorityEntry(partitions.get(0), 8, false));
entries.add(new PriorityEntry(partitions.get(1), 16, true));
entries.add(new PriorityEntry(partitions.get(2), 1, false));
entries.add(new PriorityEntry(partitions.get(0), 8, false, "ReplicaThread-Intra-0-dc1"));
entries.add(new PriorityEntry(partitions.get(1), 16, true, "ReplicaThread-Inter-1-dc2"));
entries.add(new PriorityEntry(partitions.get(2), 1, false, "ReplicaThread-Intra-1-dc1"));
doListReplicationPriorityAdminResponseTest(clusterMap, entries);
}

/**
 * Pins the wire contract for entries created without a thread name: a {@code null} name is written as an
 * empty string, so after a serialize/deserialize round trip it comes back as {@code ""} rather than null.
 */
@Test
public void listReplicationPriorityAdminResponseNullThreadNameTest() throws IOException {
  MockClusterMap mockClusterMap = new MockClusterMap();
  List<PartitionId> writablePartitions =
      mockClusterMap.getWritablePartitionIds(MockClusterMap.DEFAULT_PARTITION_CLASS);
  Assume.assumeTrue("Need at least 1 partition for this test", !writablePartitions.isEmpty());

  // Build a response whose only entry carries no thread name.
  AdminResponse header = new AdminResponse(7, "null-threadname-client", ServerErrorCode.NoError);
  PriorityEntry namelessEntry = new PriorityEntry(writablePartitions.get(0), 3, false, null);
  ListReplicationPriorityAdminResponse original =
      new ListReplicationPriorityAdminResponse(Collections.singletonList(namelessEntry), header);

  // Round-trip it through the wire format.
  DataInputStream wireStream = serAndPrepForRead(original, -1, false);
  ListReplicationPriorityAdminResponse roundTripped =
      ListReplicationPriorityAdminResponse.readFrom(wireStream, mockClusterMap);

  Assert.assertEquals("entry count", 1, roundTripped.getEntries().size());
  String threadNameAfterRoundTrip = roundTripped.getEntries().get(0).getThreadName();
  Assert.assertEquals("null thread name should read back as empty string", "", threadNameAfterRoundTrip);
  original.release();
}

private void doUpdateReplicationPriorityAdminRequestTest(MockClusterMap clusterMap, List<PartitionId> partitions,
int boost, UpdateReplicationPriorityAdminRequest.Action action) throws IOException {
int correlationId = 1234;
Expand Down Expand Up @@ -1610,6 +1631,7 @@ private void doListReplicationPriorityAdminResponseTest(MockClusterMap clusterMa
Assert.assertEquals("entry[" + i + "].partition", expected.getPartitionId(), actual.getPartitionId());
Assert.assertEquals("entry[" + i + "].boost", expected.getBoost(), actual.getBoost());
Assert.assertEquals("entry[" + i + "].isInterColo", expected.isInterColo(), actual.isInterColo());
Assert.assertEquals("entry[" + i + "].threadName", expected.getThreadName(), actual.getThreadName());
}
response.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,48 @@ public void prioritizePartitions(List<PartitionId> partitions, int boost) {
if (partitions.isEmpty()) {
return;
}
for (PartitionId id : partitions) {
priorityPartitions.put(id, boost);
int applied = 0;
lock.lock();
try {
for (PartitionId id : partitions) {
// Only a thread that actually replicates this partition can act on a boost for it. Setting it on a
// thread that holds no replica of the partition is a no-op that also never auto-prunes (see the
// empty-infos case in shouldAutoPrunePriority), leaving a stale "ghost" entry. Skip those here.
if (hostsPartition(id)) {
priorityPartitions.put(id, boost);
applied++;
}
}
} finally {
lock.unlock();
}
if (applied > 0) {
terminateCycleAndWake();
logger.info("Set replication priority for {} of {} requested partitions on thread {} to boost {}", applied,
partitions.size(), getName(), boost);
} else {
// Expected in the fan-out case: the engine offers every thread the full partition list and most threads
// host none of them. Log at debug so we don't emit N noisy INFO lines per operator request.
logger.debug("No hosted partitions among the {} requested for replication priority on thread {}",
partitions.size(), getName());
}
terminateCycleAndWake();
logger.info("Set replication priority for {} partitions on thread {} to boost {}",
partitions.size(), getName(), boost);
}

/**
 * Tells whether any {@link RemoteReplicaInfo} currently assigned to this thread belongs to {@code partitionId}.
 * This reads {@link #replicasToReplicateGroupedByNode}, so the caller MUST already hold {@link #lock}.
 * @param partitionId the partition to look for.
 * @return {@code true} as soon as one hosted remote replica of {@code partitionId} is found,
 *         {@code false} if none of the hosted replicas belongs to it.
 */
private boolean hostsPartition(PartitionId partitionId) {
  // Flatten the per-node replica sets and stop at the first replica of the requested partition.
  return replicasToReplicateGroupedByNode.values()
      .stream()
      .flatMap(Set::stream)
      .anyMatch(info -> info.getReplicaId().getPartitionId().equals(partitionId));
}

/**
Expand Down Expand Up @@ -473,6 +509,14 @@ public void removeRemoteReplicaInfo(RemoteReplicaInfo remoteReplicaInfo) {
} else {
logger.info("RemoteReplicaInfo {} is removed from ReplicaThread {}.", remoteReplicaInfo, threadName);
decreaseAssignedRemoteReplicaInfosMetric();
// Drop any replication priority held for this partition once its last replica leaves this thread;
// otherwise the entry never auto-prunes (empty-infos case) and lingers as a stale "ghost".
PartitionId removedPartition = remoteReplicaInfo.getReplicaId().getPartitionId();
if (priorityPartitions.containsKey(removedPartition) && !hostsPartition(removedPartition)) {
priorityPartitions.remove(removedPartition);
logger.info("Removed replication priority for partition {} on thread {} after its last replica was removed",
removedPartition, threadName);
}
}
} else {
replicationMetrics.remoteReplicaInfoRemoveError.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,14 @@ public List<PriorityEntry> listAllPriorityPartitions() {
for (ReplicaThread thread : pool) {
boolean isInterColo = thread.isReplicatingFromRemoteColo();
for (Map.Entry<PartitionId, Integer> e : thread.listPriorityPartitions().entrySet()) {
entries.add(new PriorityEntry(e.getKey(), e.getValue(), isInterColo));
entries.add(new PriorityEntry(e.getKey(), e.getValue(), isInterColo, thread.getName()));
}
}
}
entries.sort(Comparator
.comparing((PriorityEntry e) -> e.getPartitionId().toPathString())
.thenComparing(PriorityEntry::isInterColo));
.thenComparing(PriorityEntry::isInterColo)
.thenComparing(PriorityEntry::getThreadName));
return entries;
}

Expand Down
Loading
Loading