Skip to content

Commit 7b33aa8

Browse files
Qian1900 and claude authored
Distribute lagging replicas across replica groups to reduce head-of-line blocking (#3259)
* Spread laggers across chunks in continuous replication to reduce head-of-line blocking

  Adds an opt-in chunking mode for continuous replication that sorts replicas within each per-remote-DataNode pass by remote lag (descending) and round-robin distributes them into chunks of at most replicationMaxPartitionCountPerRequest, instead of slicing sequentially via Utils.partitionList.

  Today's sequential slicing can co-locate multiple ambient laggers in one chunk, where the chunk's iteration time is gated by whichever member has the most data to fetch — a head-of-line blocking effect. The spread mode guarantees that when laggers <= chunk count, at most one lagger lands in any one chunk.

  Gated by replication.spread.laggers.across.chunks, default false. When false, behavior is byte-identical to today (delegates to Utils.partitionList). When true, sorts via a snapshot-then-stride helper. Snapshot is required because RemoteReplicaInfo.getRemoteLagFromLocalInBytes() reads non-volatile state written by other threads; re-reading inside the sort comparator can trip TimSort's contract check and kill the replication cycle.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Pass ReplicationConfig to DataNodeTracker; drop two primitive config params

  Address review feedback: DataNodeTracker now takes ReplicationConfig directly (matching ReplicaThread / ReplicationManager / ReplicationEngine / BackupCheckerThread convention) instead of receiving primitive config-derived values.

  Drops two primitive ctor params: maxActiveGroupSize and spreadLaggersAcrossChunks. DataNodeTracker reads them from replicationConfig.replicationMaxPartitionCountPerRequest and replicationConfig.replicationSpreadLaggersAcrossChunks internally.

  replicaThrottleDurationMs stays as a primitive: it's lane-specific (inter vs intra DC), and ReplicaThread already resolves it at construction. Passing the resolved value keeps lane logic where it belongs (in ReplicaThread) rather than duplicating the lane branch inside DataNodeTracker.

  Net: 8-param ctor → 7-param. Behavior unchanged; same fields read.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f6989d9 commit 7b33aa8

4 files changed

Lines changed: 371 additions & 9 deletions

File tree

ambry-api/src/main/java/com/github/ambry/config/ReplicationConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public class ReplicationConfig {
2929
public static final String REPLICATION_MODEL_ACROSS_DATACENTERS = "replication.model.across.datacenters";
3030
public static final String REPLICATION_ENABLE_CONTINUOUS_REPLICATION = "replication.enable.continuous.replication";
3131
public static final String REPLICATION_GROUP_ITERATION_LIMIT = "replication.group.iteration.limit";
32+
public static final String REPLICATION_SPREAD_LAGGERS_ACROSS_CHUNKS = "replication.spread.laggers.across.chunks";
3233
public static final String REPLICATION_STANDBY_WAIT_TIMEOUT_TO_TRIGGER_CROSS_COLO_FETCH_SECONDS =
3334
"replication.standby.wait.timeout.to.trigger.cross.colo.fetch.seconds";
3435

@@ -70,6 +71,16 @@ public class ReplicationConfig {
7071
@Default("1")
7172
public final int replicationGroupIterationLimit;
7273

74+
/**
75+
* Continuous replication only — no-op when {@code replication.enable.continuous.replication=false}.
76+
* When true, replicas within each per-remote-DataNode chunking pass are sorted by remote lag in bytes
77+
* (descending), then round-robin distributed into chunks of at most
78+
* {@code replicationMaxPartitionCountPerRequest}. When false, replicas are sliced sequentially.
79+
*/
80+
@Config(REPLICATION_SPREAD_LAGGERS_ACROSS_CHUNKS)
81+
@Default("false")
82+
public final boolean replicationSpreadLaggersAcrossChunks;
83+
7384
/**
7485
* The number of replica threads on each server that runs the replication protocol for intra dc replication
7586
*/
@@ -407,6 +418,8 @@ public ReplicationConfig(VerifiableProperties verifiableProperties) {
407418
verifiableProperties.getBoolean(REPLICATION_ENABLE_CONTINUOUS_REPLICATION, false);
408419
replicationGroupIterationLimit =
409420
verifiableProperties.getIntInRange(REPLICATION_GROUP_ITERATION_LIMIT, 1, 1, 10000000);
421+
replicationSpreadLaggersAcrossChunks =
422+
verifiableProperties.getBoolean(REPLICATION_SPREAD_LAGGERS_ACROSS_CHUNKS, false);
410423
replicationNumOfIntraDCReplicaThreads =
411424
verifiableProperties.getInt("replication.no.of.intra.dc.replica.threads", 1);
412425
replicationNumOfInterDCReplicaThreads =

ambry-replication/src/main/java/com/github/ambry/replication/ReplicaThread.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2318,8 +2318,8 @@ private void fillDataNodeTrackers() {
23182318
List<RemoteReplicaInfo> remoteReplicasPerNode = entry.getValue();
23192319

23202320
DataNodeTracker dataNodeTracker =
2321-
new DataNodeTracker(remoteHost, remoteReplicasPerNode, maxReplicaCountPerRequest, currentStartGroupId, time,
2322-
threadThrottleDurationMs);
2321+
new DataNodeTracker(remoteHost, remoteReplicasPerNode, currentStartGroupId, time, threadThrottleDurationMs,
2322+
replicationConfig, RemoteReplicaInfo::getRemoteLagFromLocalInBytes);
23232323
logger.trace("Thread name: {} for datanode {} create datanode tracker {}", threadName, remoteHost,
23242324
dataNodeTracker);
23252325

ambry-replication/src/main/java/com/github/ambry/replication/continuous/DataNodeTracker.java

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,18 @@
1414
package com.github.ambry.replication.continuous;
1515

1616
import com.github.ambry.clustermap.DataNodeId;
17+
import com.github.ambry.config.ReplicationConfig;
1718
import com.github.ambry.replication.RemoteReplicaInfo;
1819
import com.github.ambry.replication.ReplicaThread;
1920
import com.github.ambry.utils.Time;
2021
import com.github.ambry.utils.Utils;
2122
import java.util.ArrayList;
2223
import java.util.Collections;
2324
import java.util.Comparator;
25+
import java.util.IdentityHashMap;
2426
import java.util.List;
27+
import java.util.Map;
28+
import java.util.function.ToLongFunction;
2529
import java.util.stream.Collectors;
2630

2731

@@ -42,22 +46,33 @@ public class DataNodeTracker {
4246
* All group trackers have consecutive group id.
4347
* @param dataNodeId remote host for which to create datanode tracker
4448
* @param remoteReplicas remote replicas for this data node
45-
* @param maxActiveGroupSize maximum count of replicas in active groups
4649
* @param startGroupId group id from which we can start and increment and generate unique group id for each group
4750
* @param time Ambry time
48-
* @param replicaThrottleDurationMs throttle duration for replicas
51+
* @param replicaThrottleDurationMs throttle duration for replicas. Lane-specific
52+
* (inter vs intra DC); resolved once by the owning {@link ReplicaThread}
53+
* and passed in as a primitive so lane logic doesn't leak into this class.
54+
* @param replicationConfig replication config. Reads
55+
* {@link ReplicationConfig#replicationMaxPartitionCountPerRequest} for the
56+
* per-chunk replica cap and
57+
* {@link ReplicationConfig#replicationSpreadLaggersAcrossChunks} for whether
58+
* the chunking pass sorts and round-robins instead of slicing sequentially.
59+
* @param lagExtractor function that returns a replica's remote-lag-in-bytes; used as the sort key when
60+
* {@link ReplicationConfig#replicationSpreadLaggersAcrossChunks} is true. Supplied by the
61+
* caller so the chunking helper does not need access to package-private state on
62+
* {@link RemoteReplicaInfo}.
4963
*/
50-
public DataNodeTracker(DataNodeId dataNodeId, List<RemoteReplicaInfo> remoteReplicas, int maxActiveGroupSize,
51-
int startGroupId, Time time, long replicaThrottleDurationMs) {
64+
public DataNodeTracker(DataNodeId dataNodeId, List<RemoteReplicaInfo> remoteReplicas, int startGroupId, Time time,
65+
long replicaThrottleDurationMs, ReplicationConfig replicationConfig,
66+
ToLongFunction<RemoteReplicaInfo> lagExtractor) {
5267
this.dataNodeId = dataNodeId;
5368
this.activeGroupTrackers = new ArrayList<>();
5469

5570
int currentGroupId = startGroupId;
5671

57-
// for this data node break a larger array of remote replicas to smaller multiple arrays of maxActiveGroupSize
72+
// for this data node break a larger array of remote replicas to smaller multiple arrays
5873
List<List<RemoteReplicaInfo>> remoteReplicaSegregatedList =
59-
maxActiveGroupSize > 0 ? Utils.partitionList(remoteReplicas, maxActiveGroupSize)
60-
: Collections.singletonList(remoteReplicas);
74+
chunkReplicas(remoteReplicas, replicationConfig.replicationMaxPartitionCountPerRequest,
75+
replicationConfig.replicationSpreadLaggersAcrossChunks, lagExtractor);
6176

6277
// for each of smaller array of remote replicas create active group trackers with consecutive group ids
6378
for (List<RemoteReplicaInfo> remoteReplicaList : remoteReplicaSegregatedList) {
@@ -147,4 +162,42 @@ public String toString() {
147162
return "DataNodeTracker :[" + dataNodeId.toString() + " " + activeGroupTrackers.toString() + " "
148163
+ standByGroupTracker.toString() + "]";
149164
}
165+
166+
/**
167+
* Splits {@code remoteReplicas} into chunks of at most {@code maxActiveGroupSize}. When {@code spread}
168+
* is false, replicas are sliced sequentially via {@link Utils#partitionList}. When true, replicas are
169+
* first sorted by {@code lagExtractor} descending, then round-robin distributed across chunks so that
170+
* the top laggers land in different chunks. When {@code maxActiveGroupSize <= 0}, returns a single
171+
* chunk containing all replicas.
172+
*
173+
* Package-private for test access; do not call from outside {@link DataNodeTracker}.
174+
*/
175+
static List<List<RemoteReplicaInfo>> chunkReplicas(List<RemoteReplicaInfo> remoteReplicas, int maxActiveGroupSize,
176+
boolean spread, ToLongFunction<RemoteReplicaInfo> lagExtractor) {
177+
if (maxActiveGroupSize <= 0) {
178+
return Collections.singletonList(remoteReplicas);
179+
}
180+
if (!spread || remoteReplicas.isEmpty()) {
181+
return Utils.partitionList(remoteReplicas, maxActiveGroupSize);
182+
}
183+
List<RemoteReplicaInfo> sorted = new ArrayList<>(remoteReplicas);
184+
// Snapshot lag once per replica before sorting. Re-reading inside the comparator is unsafe:
185+
// the underlying field is mutated by other threads and can trip TimSort's contract check.
186+
// IdentityHashMap because RemoteReplicaInfo overrides equals() without hashCode().
187+
Map<RemoteReplicaInfo, Long> lagSnapshot = new IdentityHashMap<>(sorted.size());
188+
for (RemoteReplicaInfo r : sorted) {
189+
lagSnapshot.put(r, lagExtractor.applyAsLong(r));
190+
}
191+
sorted.sort((a, b) -> Long.compare(lagSnapshot.get(b), lagSnapshot.get(a)));
192+
int chunkCount = (sorted.size() + maxActiveGroupSize - 1) / maxActiveGroupSize;
193+
List<List<RemoteReplicaInfo>> chunks = new ArrayList<>(chunkCount);
194+
for (int chunk = 0; chunk < chunkCount; chunk++) {
195+
List<RemoteReplicaInfo> bucket = new ArrayList<>(maxActiveGroupSize);
196+
for (int j = chunk; j < sorted.size(); j += chunkCount) {
197+
bucket.add(sorted.get(j));
198+
}
199+
chunks.add(bucket);
200+
}
201+
return chunks;
202+
}
150203
}

0 commit comments

Comments
 (0)