Skip to content

Admin API for partition replication priority#3261

Open
Qian1900 wants to merge 6 commits into
linkedin:masterfrom
Qian1900:qianwang/ACTIONITEM-19683-priority-api
Open

Admin API for partition replication priority#3261
Qian1900 wants to merge 6 commits into
linkedin:masterfrom
Qian1900:qianwang/ACTIONITEM-19683-priority-api

Conversation

@Qian1900
Copy link
Copy Markdown
Contributor

@Qian1900 Qian1900 commented May 15, 2026

Summary

Design doc: linkedin-multiproduct/AmbryLi#3485

Adds an admin API for boosting a partition's per-cycle replication fetch budget, so operators can accelerate catch-up on selected partitions during recovery from a lag incident (e.g. ACTIONITEM-19683 / incident-10927). Server-side implementation only; the AmbryLI frontend REST endpoint + fan-out is a separate PR.

What's in this PR

Wire protocol (additive, append-only enums)

  • AdminRequestOrResponseType.UpdateReplicationPriority, .ListReplicationPriority
  • UpdateReplicationPriorityAdminRequest — partition list + boost + Action (SET / UNSET / UNSET_ALL). Action has explicit wire values (SET=0, UNSET=1, UNSET_ALL=2) and fromWireValue(short) throws IOException on unknown inputs, so a reorder of the enum can't silently corrupt the wire format.
  • ListReplicationPriorityAdminRequest / ListReplicationPriorityAdminResponse — response carries (partitionId, boost, isInterColo) per priority entry per ReplicaThread. PriorityEntry lives in ambry-api so all four priority methods sit on the ReplicationAPI interface as default-throws (no instanceof ReplicationEngine casts at call sites).
  • ReplicationEngine.listAllPriorityPartitions() aggregates priorities across every thread, tagging each entry with its colo.

Replication thread

  • priorityPartitions: ConcurrentHashMap<PartitionId, Integer> per thread, written by admin handlers and read once per cycle in fillDataNodeTrackers (snapshotted to avoid CME).
  • fillDataNodeTrackers auto-prunes entries whose replicas in this thread are all ACTIVE and below the per-iteration lag threshold (shouldAutoPrunePriority predicate).
  • Priority replicas split into singleton ActiveGroupTrackers with weight = boost; the per-cycle baseFetchSize is computed as totalBudget / Σ(replicaCount × weight) so the legacy per-thread budget is preserved.
  • Existing intra-colo bootstrap branch in createReplicaMetadataRequest is preserved unchanged; the weight multiplier composes on top.

Server

  • AmbryServerRequests admin handlers for the two new RPC types.
  • API gate: server.handle.replication.priority.request.enabled (default false, mirrors the existing server.handle.{undelete,force.delete}.request.enabled gates). Both handlers reject with BadRequest + warn log when closed. Lets us ship the OSS binary to every fabric before the cfg2 flip enables the API per fabric, and remains as a per-fabric kill switch post-rollout. Rollout sequence: ship OSS → bump AmbryLI → flip cfg2 per fabric.
  • Validation at the handler boundary (rejected with BadRequest before reaching the engine):
    • Partition list size cap: UpdateReplicationPriorityAdminRequest.MAX_PARTITIONS_PER_REQUEST = 256, enforced both at wire-deserialization and in the handler.
    • Defensive boost cap: MAX_PRIORITY_BOOST = 1024 on the server (the precise wire-layer cap derived from HTTP/2 max-content-length is the frontend's job).
    • Action-aware partition-list shape: SET/UNSET require non-empty list, UNSET_ALL requires empty.
    • SET only: every partition must be hosted on this server (ReplicationEngine.hostsPartition(), O(1) via partitionToPartitionInfo); unknown partitions reject the whole request.
  • Dedicated metric fields in ServerMetrics for both new types; AmbryRequests dispatch updated.

Risk

  • Read-path only. Touches replica-grouping shape and metadata-request fetchSize math. No write path, no schema, no atomicity boundary, no resource lifecycle, no callback semantics.
  • Default-off API gate. Until cfg2 flips on a fabric, every request is rejected at the handler boundary — no engine-level state change is possible.
  • Wire protocol additions are enum-append. Older servers reply with UnknownError per existing dispatcher behavior. Action.fromWireValue rejects unknown values on the wire.
  • priorityPartitions defaults empty ⇒ chunking shape and fetchSize math are byte-identical to today on every thread until an operator sets a priority.
  • Concurrency: priorityPartitions is a ConcurrentHashMap; the replication thread snapshots it once per cycle and works off the snapshot.

Testing Done

  • RequestResponseTest (extended): wire round-trip for SET / UNSET / UNSET_ALL, list-marker, list-response with mixed isInterColo values; new updateReplicationPriorityActionWireValueRejectsUnknown pins IOException for bad Action wire values (-1, 3, Short.MAX_VALUE, Short.MIN_VALUE).
  • DataNodeTrackerTest (extended): 6 ctor cases for the priority chunking path — null/empty snapshot, single & multi priority, conservation invariant (every replica appears exactly once), group-id sequencing, orphan partitions.
  • ReplicaThreadPriorityAutoPruneTest (new, 8 cases): ACTIVE + lag < threshold ⇒ prune; threshold boundary; one-lagging-replica blocks prune; OFFLINE with stale-low-lag must NOT prune (operator's bias persists across transient outage); any non-ACTIVE blocks; empty list returns false (self-defending predicate).
  • RemoteReplicaGroupPollerTest (new):
    • baseFetchSizeBudgetConservation (3 cases) — pure-normal vs one-priority vs zero-replica; total cycle bytes ≤ no-priority budget.
    • createReplicaMetadataRequestFetchSizeMath (4 cases) — weight × baseFetchSize math, zero-base fallback, bootstrap intra-colo path composes with weight.
  • AmbryServerRequestsTest (extended):
    • updateReplicationPriorityRejectsBoostBelowOne / updateReplicationPriorityRejectsBoostAboveCap — handler-layer boost validation.
    • updateReplicationPriorityUnsetIgnoresBoostUNSET path skips boost validation.
    • updateReplicationPriorityRejects*PartitionListShapeSET/UNSET empty-list and UNSET_ALL non-empty-list rejections.
    • updateReplicationPriorityRejectsPartitionsNotHostedOnThisServerSET on a partition not on this server rejects the whole request.
    • updateReplicationPriorityRejectsOversizedPartitionList — list-size cap defense.
    • updateReplicationPriorityAcceptsUnsetAll — wipe-all dispatches to engine.
    • prioritizePartitionsBulkPersistsAndListAggregates — bulk set + listAllPriorityPartitions walks every thread; pins per-partition cardinality + both isInterColo branches.
    • Test setup flips the new config gate on so the priority tests still exercise the handler path.
  • Build: JAVA_HOME=$(/usr/libexec/java_home -v 11) ./gradlew :ambry-replication:test :ambry-protocol:test :ambry-server:test --tests 'com.github.ambry.server.AmbryServerRequestsTest' — all green.

Observability / Ops

  • Dedicated metrics: UpdateReplicationPriorityRequestRate, UpdateReplicationPriorityRequestProcessingTimeInMs, … (5 histograms + 1 rate + 1 dropped-rate per RPC type). Mirrors the pattern of every other admin type.
  • Audit log: each successful UpdateReplicationPriority / ListReplicationPriority invocation logs clientId (+ action / boost / partitions for Update) at INFO so per-host operator actions are traceable. Rejections at the handler boundary log at WARN.

AI Usage

Implementation and tests authored with Claude Code (Opus 4.7).

Adds a per-partition replication priority knob that biases the per-cycle
fetch budget on a storage node toward operator-selected partitions. Server-
side implementation of the design in linkedin-multiproduct/AmbryLi#3485
(internal). Frontend REST endpoint + fan-out is a separate AmbryLI PR.

How it works (design §4.1):
- ReplicaThread holds a ConcurrentHashMap<PartitionId, Integer> priorityPartitions.
  prioritizePartition(id, boost) / clearPriorityPartitions(list) /
  listPriorityPartitions() are admin entry points.
- Each cycle, fillDataNodeTrackers auto-prunes priorities whose replicas in
  this thread are all ACTIVE with lag below the per-iteration budget (caught
  up). OFFLINE / STANDBY entries persist; operator clears them explicitly.
- After auto-prune, priority replicas become singleton ActiveGroupTrackers
  with weight = boost, isPriority = true; the rest go through normal
  chunking (composes with the spread-chunking helper from linkedin#3259).
- baseFetchSize = totalBudget / Σ(replicaCount × weight) is computed once
  per cycle and cached on each tracker. The metadata request's fetchSize is
  weight × baseFetchSize (intra-colo bootstrap baseline substitutes before
  weighting per design §4.1 Change 2).

Wire protocol (additive, append-only enums):
- AdminRequestOrResponseType: SetReplicationPriority, ListReplicationPriority.
- SetReplicationPriorityAdminRequest: partitionIds + boost + clear flag.
  clear=true with empty partition list clears all on this host.
- ListReplicationPriorityAdminRequest / Response: response carries
  (partitionId, boost, lane) entries; lane is INTRA_DC / INTER_DC, derived
  from ReplicaThread.isReplicatingFromRemoteColo().

Server dispatch:
- ReplicationEngine: prioritizePartition / clearPriorityPartitions fan out
  to every ReplicaThread; getReplicaThreadPoolByDc exposes the pool for
  list aggregation.
- AmbryServerRequests: two new handlers wired to the admin switch. boost < 1
  is rejected at the handler boundary with BadRequest; clear=true bypasses
  boost validation. Audit log records clientId per request.

Tests:
- DataNodeTrackerTest: 6 ctor cases for the priority path (null/empty
  snapshot, single & multi priority, conservation invariant, group-id
  sequencing, orphan partitions with no replicas in this slice).
- ReplicaThreadPriorityAutoPruneTest: 8 cases for the auto-prune predicate
  (ACTIVE + lag<threshold ⇒ prune; threshold boundary; one-lagging-replica
  blocks prune; OFFLINE with stale-low-lag must NOT prune — operator bias
  persists across outage; any non-ACTIVE status blocks; empty-list vacuous
  truth contract).
- RequestResponseTest: protocol round-trip — set, clear-targeted, clear-all,
  list marker, list response with INTRA/INTER_DC mixed lanes.
- AmbryServerRequestsTest: server-side boost validation — boost=0 +
  clear=false ⇒ BadRequest; boost=0 + clear=true ⇒ NoError.

Risk:
- Read-path-only behavioral change (chunk shape + fetchSize math). No write
  path, no schema, no atomicity boundary, no resource lifecycle.
- Wire protocol additions are enum-append; older servers respond with
  UnknownError per existing dispatcher behavior.
- priorityPartitions defaults empty ⇒ chunking shape and fetchSize math are
  byte-identical to today.
- Composes with linkedin#3259's spread chunking: priority replicas are split out
  BEFORE the spread sort; non-priority remainder goes through the existing
  chunkReplicas helper.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Qian1900 Qian1900 marked this pull request as ready for review May 15, 2026 00:21
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented May 15, 2026

Codecov Report

❌ Patch coverage is 12.66491% with 331 lines in your changes missing coverage. Please review.
✅ Project coverage is 51.04%. Comparing base (52ba813) to head (6f73b22).
⚠️ Report is 393 commits behind head on master.

Files with missing lines Patch % Lines
...a/com/github/ambry/server/AmbryServerRequests.java 0.00% 86 Missing ⚠️
...va/com/github/ambry/replication/ReplicaThread.java 12.16% 64 Missing and 1 partial ⚠️
...rotocol/UpdateReplicationPriorityAdminRequest.java 13.20% 46 Missing ⚠️
...protocol/ListReplicationPriorityAdminResponse.java 0.00% 37 Missing ⚠️
...om/github/ambry/replication/ReplicationEngine.java 0.00% 26 Missing ⚠️
...in/java/com/github/ambry/server/AmbryRequests.java 0.00% 18 Missing ⚠️
.../ambry/replication/continuous/DataNodeTracker.java 0.00% 15 Missing ⚠️
.../protocol/ListReplicationPriorityAdminRequest.java 0.00% 13 Missing ⚠️
...va/com/github/ambry/replication/PriorityEntry.java 0.00% 9 Missing ⚠️
...a/com/github/ambry/replication/ReplicationAPI.java 0.00% 8 Missing ⚠️
... and 1 more
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3261       +/-   ##
=============================================
- Coverage     64.24%   51.04%   -13.21%     
+ Complexity    10398     8674     -1724     
=============================================
  Files           840      936       +96     
  Lines         71755    79913     +8158     
  Branches       8611     9577      +966     
=============================================
- Hits          46099    40790     -5309     
- Misses        23004    35738    +12734     
- Partials       2652     3385      +733     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@snalli
Copy link
Copy Markdown
Contributor

snalli commented May 15, 2026

I suggest waiting for #3259 to go into deployment and gather evidence from this to improve this tool

Copy link
Copy Markdown
Contributor

@crliao crliao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code review pass — risk model is sound (read-path, additive wire, default-off, concurrent-safe). Inline comments below mark spots that need updating. Biggest is L1 (boost math vs. spec mismatch). — review by Claude (Sonnet 4.6), invoked by @crliao_LinkedIn

Comment thread ambry-replication/src/main/java/com/github/ambry/replication/ReplicaThread.java Outdated
Comment thread ambry-replication/src/main/java/com/github/ambry/replication/ReplicaThread.java Outdated
*/
public void clearPriorityPartitions(List<PartitionId> partitions) {
if (partitions.isEmpty()) {
int cleared = priorityPartitions.size();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

B3 — clear=true + empty partition list wipes everything; server trusts the frontend.

The javadoc says "Frontend-layer guards ensure the 'empty list' case is only reachable when the operator explicitly sent clear=true with no partition list". But the server is the durability boundary — an admin tool talking directly to the server (ambry-admin curl ...) or any path bypassing the LI frontend can trigger this accidentally and wipe ALL priorities across ALL threads on the host.

Two mitigations:

Wire-level (preferred): add a separate clear_all: bool to SetReplicationPriorityAdminRequest instead of overloading "clear + empty list". Makes the dangerous case explicit, and clear=true + non-empty list only ever means "clear these N".

Minimal: keep the wire format but log at WARN (not INFO) when this branch fires, so ops dashboards surface every clear-all immediately.

N3 (cosmetic) — priorityPartitions.size() here is approximate under concurrent mutation. The cleared value in the log isn't authoritative; fine as-is, just don't treat it as exact.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a different shape — added an explicit Action enum {SET, UNSET, UNSET_ALL} on the wire. Invalid combos (e.g., clear=true + empty list) are now unrepresentable at the type level. Handler validates partition-list shape against the action: SET/UNSET require non-empty, UNSET_ALL requires empty. Wipe-all is preserved as a feature but only reachable via explicit Action.UNSET_ALL.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Action enum still uses ordinal-based wire encoding (action.ordinal() on write, Action.values()[stream.readShort()] on read with no bounds check). Adding a 4th action later will either silently corrupt old servers (wrong ordinal maps to wrong action if reordered) or throw ArrayIndexOutOfBoundsException (if the ordinal is out of range). Need to pin each entry to an explicit wire integer:

public enum Action {
    SET((short) 0), UNSET((short) 1), UNSET_ALL((short) 2);
    private final short wireValue;
    Action(short wireValue) { this.wireValue = wireValue; }
    public short getWireValue() { return wireValue; }
    static Action fromWireValue(short v) throws IOException {
        for (Action a : values()) { if (a.wireValue == v) return a; }
        throw new IOException("Unknown Action wire value: " + v);
    }
}

Then serialize with action.getWireValue() and deserialize with Action.fromWireValue(stream.readShort()). Old servers receiving an unknown value return BadRequest cleanly instead of crashing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original public enum Action { SET, UNSET, UNSET_ALL } (ordinal-based on the wire) was a deliberate choice to match the existing codebase convention for enums-on-wire — BlobType, ServerErrorCode, RequestOrResponseType all serialize via (short) value.ordinal() and decode via values()[stream.readShort()] (see PutRequest:247, Response:38). Stayed within convention to avoid a bespoke pattern here.

That said, your safety argument is fair — ordinal-based is fragile to reorder and crashes (not throws) on bad inputs, and a one-off pattern doesn't really cost anything for what we get. Switched to explicit wire values per your suggestion (SET=0, UNSET=1, UNSET_ALL=2), with fromWireValue(short) throwing IOException on unknown values.

Added negative-path test in RequestResponseTest.updateReplicationPriorityActionWireValueRejectsUnknown covering -1, 3, Short.MAX_VALUE, Short.MIN_VALUE — all throw IOException.

Qian1900 and others added 2 commits May 19, 2026 18:13
13 inline comments from @crliao addressed (plus a round of independent
review).

## Correctness (L-series)

L1 — Pushed back: math is correct. fetchSize is applied per-replica at the
server (AmbryRequests.java:910), not per-chunk. Renamed local fetchSize to
perReplicaFetchSize in createReplicaMetadataRequest to make the per-replica
semantic self-documenting at the math site.

L2 — Added gate validation at the UpdateReplicationPriority boundary.
Reject the whole request with BadRequest if any listed partition is not
hosted on this server (engine.hostsPartition reads partitionToPartitionInfo).
All-or-nothing. Closes the typo-creates-orphan path that direct-CLI callers
could exploit.

L3 — Dropped the misleading "snapshot once per cycle" comment in
fillDataNodeTrackers; the actual flow is prune-then-snapshot, visible from
the API names.

L4 — Moved all 4 priority methods to ReplicationAPI as default-throw
methods that include getClass().getName() in the message. Both instanceof
ReplicationEngine casts in AmbryServerRequests removed. To support this,
extracted PriorityEntry from the nested class in
ListReplicationPriorityAdminResponse into
ambry-api/.../replication/PriorityEntry.java (new in this PR, no external
callers).

L5 — listAllPriorityPartitions now sorts entries by
(partitionPath, isInterColo) before return. Operators diffing consecutive
list outputs no longer see false-positive reorderings.

## Hardening (B-series)

B1 — Defensive cap MAX_PRIORITY_BOOST = 1024 on the boost field, rejected
at the handler boundary on the SET path. Catches absurd values
(Integer.MAX_VALUE-class bugs) from direct-CLI or buggy callers. The
precise wire-layer limit is enforced on the frontend.

B2 — Defensive cap MAX_PRIORITY_PARTITIONS_PER_REQUEST = 256 on
partition-list size at the handler boundary on all paths. Realistic
priority lists are dozens, not hundreds.

B3 — Added explicit Action enum {SET, UNSET, UNSET_ALL} on the wire to
make the action unambiguous. The handler rejects any partition-list shape
that does not match the action: SET and UNSET require a non-empty list;
UNSET_ALL requires an empty list. Closes the review concern that
'clear=true with empty list' was reachable from direct-CLI callers as an
implicit wipe-all.

## Nits (N-series)

N1 + N2 — Cached byte[][] partitionIdBytes in
ListReplicationPriorityAdminResponse constructor. Both sizeInBytes() and
prepareBuffer() index into the cache — halves PartitionId.getBytes()
allocations on serialization.

N3 — No code change; @crliao said "fine as-is."

N5 — ActiveGroupTracker.toString() adds priority= and weight= suffixes;
flagged in PR description under Observability/Ops.

N7 — Typo "intro colo" -> "intra colo" in createReplicaMetadataRequest.

## Naming cleanups

Renamed for clarity now that the wire handles SET, UNSET, and UNSET_ALL
via a single Action enum:

- SetReplicationPriorityAdminRequest -> UpdateReplicationPriorityAdminRequest
- AdminRequestOrResponseType.SetReplicationPriority -> .UpdateReplicationPriority
- handleSetReplicationPriorityRequest -> handleUpdateReplicationPriorityRequest
- All setReplicationPriority* test methods -> updateReplicationPriority*
- ServerMetrics histogram and meter field names + metric registry name strings

Pairs symmetrically with the read-side ListReplicationPriority. No wire
impact — AdminRequestOrResponseType is encoded by ordinal so renaming the
Java enum value name does not shift the wire byte.

## Round-1 reviewer fixes

- UNSET_ALL log level javadoc claim aligned to INFO (matches the code).
- Wire-level numPartitions/numEntries sanity bound in readFrom (defense in
  depth before the handler cap fires). Prevents OOM via Integer.MAX_VALUE
  in array preallocation.
- Null partition id from clusterMap.getPartitionIdFromStream now triggers
  a clean IOException at readFrom rather than incidental NPE/BadRequest.
- "HTTP/2-derived" wording softened to "wire-layer" — the frontend's check
  is wire-layer-aware regardless of transport.
- engine.hostsPartition simplified from a per-thread linear scan to a
  containsKey lookup on the existing partitionToPartitionInfo map. Deletes
  ReplicaThread.hasPartition (no longer needed). O(threads*replicas) ->
  O(1). Uses the engine-level invariant that already tracks "is this
  partition assigned to this host."

## New tests

- updateReplicationPriorityRejectsUnknownPartition
- updateReplicationPriorityUnsetAcceptsUnknownPartition
- listAllPriorityPartitionsResultsAreOrderedDeterministically
- updateReplicationPriorityRejectsBoostAboveCap
- updateReplicationPriorityRejectsOversizedPartitionList
- updateReplicationPriorityRejectsSetWithEmptyList
- updateReplicationPriorityRejectsUnsetWithEmptyList
- updateReplicationPriorityRejectsUnsetAllWithNonEmptyList
- updateReplicationPriorityAcceptsUnsetAll
- RequestResponseTest extended for the three Action wire round-trips

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…yAdminRequest

Was two caps (wire-layer MAX_PARTITIONS_ON_WIRE=10000 + handler-layer
MAX_PRIORITY_PARTITIONS_PER_REQUEST=256). Different values made the
defense-in-depth story confused. Replaced with a single public constant
UpdateReplicationPriorityAdminRequest.MAX_PARTITIONS_PER_REQUEST=256
referenced by both layers — wire-layer prevents OOM at the same threshold
as the operator-facing limit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
// baseline when the thread has no replicas.
long totalBudget = replicasInThread * replicationConfig.replicationFetchSizeInBytes;
currentCycleBaseFetchSize = totalWeightedReplicas > 0
? totalBudget / totalWeightedReplicas
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test for the budget math here. The conservation property — totalBudget = replicasInThread × config and each replica getting its proportional share — is the core invariant of this feature. A test should verify:

  • With N normal replicas and 1 priority replica (boost=B): baseFetchSize = N×config / (N-1+B)
  • Priority replica gets B × baseFetchSize, normal replica gets 1 × baseFetchSize
  • Total bytes across all replicas equals N × config (budget-neutral)
  • Zero-replica edge case falls back to replicationFetchSizeInBytes

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added RemoteReplicaGroupPollerTest.baseFetchSizeBudgetConservation. Three cases: pure-normal (baseFetchSize = replicationFetchSizeInBytes, total = N×config), one-priority-with-boost (baseFetchSize = N×config / (1×B + (N-1)×1), total bytes-in-flight bounded by no-priority budget, slack < one floor unit), and zero-replica fallback.

* The chosen baseline is then multiplied by {@code weight}.
*/
ReplicaMetadataRequest createReplicaMetadataRequest(List<RemoteReplicaInfo> replicasToReplicatePerNode,
DataNodeId remoteNode, int weight, long baseFetchSize) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No test for this overload. This is where the boost actually takes effect — if weight isn't applied correctly to baseline, priority partitions silently get no boost at all and the feature does nothing. Should test at minimum:

  • weight=1, baseFetchSize=0 → falls back to replicationFetchSizeInBytes
  • weight=1, baseFetchSize=X → fetchSize = X
  • weight=N, baseFetchSize=X → fetchSize = N×X (the boost path)
  • Bootstrap intra-colo path still overrides baseFetchSize regardless of weight

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added RemoteReplicaGroupPollerTest.createReplicaMetadataRequestFetchSizeMath. Four cases: (weight=1, baseFetchSize=0) → fetchConfig, (weight=1, baseFetchSize=X) → X, (weight=B, baseFetchSize=X) → B×X, bootstrap-intra-colo override still multiplies weight through.

Qian1900 and others added 2 commits May 20, 2026 14:45
- Action enum: switched from ordinal-based to explicit-wire-value encoding.
  Reordering constants in code no longer silently breaks the wire format,
  and unknown values throw a clean IOException instead of
  ArrayIndexOutOfBoundsException.

- Added test pinning the per-cycle budget conservation: baseFetchSize is
  totalBudget / totalWeightedReplicas; priority replicas get
  weight x baseFetchSize; total bytes-in-flight stays bounded by the
  no-priority budget.

- Added test pinning the perReplicaFetchSize math in
  createReplicaMetadataRequest: weight applies multiplicatively to the
  baseline (normal / bootstrap-intra-colo), with a fallback to
  replicationFetchSizeInBytes when no priority redistribution is active.
- Added negative-path test for Action.fromWireValue: verifies -1, 3,
  Short.MAX_VALUE, Short.MIN_VALUE all throw IOException. Pins the
  explicit-wire-value safety net against a future refactor reverting to
  Action.values()[v].

- Dropped dead finally block in baseFetchSizeBudgetConservation case 3:
  @before rebuilds the storageManager + replicaThread between runs, so
  the per-test re-add was redundant. Comment notes why no restore is
  needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@crliao crliao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving — all prior review threads addressed, verified at HEAD (68df301).

Resolution map (all comments from my 2026-05-19 review):

# Topic How resolved
L1 boost math vs spec Author clarified: fetchSize is per-replica at the server (AmbryRequests.java:910), not per-chunk — so per-replica ratio is , matching the contract. Renamed local var for clarity.
L2 orphan priority entries Boundary validation rejects unknown partitions with BadRequest. In-memory state wipes on host restart. Reassignment-after-set accepted as rare edge case.
L3 snapshot semantics docs Misleading comment deleted.
L4 instanceof ReplicationEngine Methods moved to ReplicationAPI as default-throw; casts removed.
L5 non-deterministic list order Sort applied + listAllPriorityPartitionsResultsAreOrderedDeterministically test added.
B1 no upper bound on boost MAX_PRIORITY_BOOST = 1024 added with mirrored test pattern.
B2 no cap on partition list size MAX_PARTITIONS_PER_REQUEST = 256 enforced at wire-deserialization + handler.
B3 clear=true + empty list = wipe all Replaced with explicit Action {SET, UNSET, UNSET_ALL} enum. Per follow-up: now uses explicit wire ints (SET=0, UNSET=1, UNSET_ALL=2) with fromWireValue throwing IOException on unknown + negative-path test.
N1/N2 duplicate byte[] allocation Cached byte[][] partitionIdBytes in constructor; reused in size + serialize.
N3 priorityPartitions.size() approximate Cosmetic note acknowledged.
N5 toString() format change Flagged in PR description for downstream log-parsing rule maintainers.
N7 "intro colo" typo Fixed.
Follow-up: budget math test RemoteReplicaGroupPollerTest.baseFetchSizeBudgetConservation covers pure-normal / one-priority-with-boost / zero-replica fallback.
Follow-up: fetchSize overload test RemoteReplicaGroupPollerTest.createReplicaMetadataRequestFetchSizeMath covers all four weight × baseFetchSize combinations including bootstrap intra-colo.

The risk model still holds: read-path only, additive wire (now with explicit ints, so future enum additions stay safe), default-off, concurrent-safe.

Also noting @snalli's suggestion to wait for #3259 evidence — that's an orthogonal sequencing decision and doesn't affect approval readiness on this PR's content.

— review by Claude (Opus 4.7), invoked by @crliao_LinkedIn

@crliao
Copy link
Copy Markdown
Contributor

crliao commented May 22, 2026

Follow-up ask — could we add a config-level gate (e.g., replication.priority.api.enabled in ReplicationConfig, default false) before merge?

Rationale: the feature is behaviorally default-off today (empty priorityPartitions map → legacy code path), but the new branches still live in the hot replication path. A config flag would let us:

  1. Keep the feature fully dormant in production until ops actually needs it.
  2. Enable per-fabric via cfg2 rather than via binary rollout.
  3. Have a kill-switch if the new code path ever misbehaves, without needing a binary revert.

Suggested wiring:

  • AmbryServerRequests.handleSetReplicationPriorityRequest / handleListReplicationPriorityRequest → early-return ServiceUnavailable (or BadRequest) when disabled, with the existing audit log so attempted use is still visible.
  • ReplicaThread.fillDataNodeTrackers → guard the snapshot/prune/singletonize block behind the same config, so the legacy path runs even if priorityPartitions somehow gets populated.

Usage pattern would then be: enable cfg in the affected fabric → invoke the admin API → disable cfg once recovery is done.

Not blocking the approval — happy to land this as a follow-up PR if you prefer. Flagging now since the design suggests this code will sit in the binary for a while before first real use, and a default-off cfg gate is cheap insurance for that gap.

Add server.handle.replication.priority.request.enabled (default false),
mirroring the existing server.handle.{undelete,force.delete}.request.enabled
gates. UpdateReplicationPriority and ListReplicationPriority handlers reject
with BadRequest and a warn-level log when the gate is closed, so the server
binary can ship to all fabrics before the cfg2 flip enables the API per
fabric.

Test setup flips the gate on so existing priority tests still exercise the
handler path.
@Qian1900
Copy link
Copy Markdown
Contributor Author

@crliao FYI — I pushed a follow-up commit (6f73b22) adding a server.handle.replication.priority.request.enabled config gate, defaulting to false. Both UpdateReplicationPriority and ListReplicationPriority handlers reject with BadRequest + a warn log when the gate is closed.

Pattern mirrors the existing server.handle.{undelete,force.delete}.request.enabled gates. Rollout sequence: ship OSS → bump AmbryLI → flip cfg2 per fabric. Keeping the gate post-rollout as a kill switch so we can disable per-fabric without a code revert if the API ever misbehaves.

@crliao
Copy link
Copy Markdown
Contributor

crliao commented May 23, 2026

Thanks @Qian1900 — gate verified at 6f73b224:

  • ServerConfig.serverHandleReplicationPriorityRequestEnabled (default false) ✅
  • Both Update and List handlers reject with BadRequest + warn log when disabled ✅
  • Empty ListReplicationPriorityAdminResponse returned on the list path so older clients deserialize cleanly ✅

ServerConfig is the right placement for these — better than my original ReplicationConfig suggestion since the gate guards the server-side admin RPC handlers.

Also confirmed the fillDataNodeTrackers block doesn't need its own gate: when the handler is closed, priorityPartitions can never be populated, so the legacy code path in fillDataNodeTrackers is naturally what runs. Defense-in-depth would be nice but not required.

Approval stands. 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants