Skip to content

feat(aqe): SplitPartitionsRule — file-list sharding for skewed shuffle partitions (v1)#1718

Draft
wirybeaver wants to merge 1 commit into
apache:mainfrom
wirybeaver:splitpartition
Draft

feat(aqe): SplitPartitionsRule — file-list sharding for skewed shuffle partitions (v1)#1718
wirybeaver wants to merge 1 commit into
apache:mainfrom
wirybeaver:splitpartition

Conversation

@wirybeaver
Copy link
Copy Markdown

@wirybeaver wirybeaver commented May 18, 2026

Adds SplitPartitionsRule — the inverse of #1684's CoalescePartitionsRule. When upstream stats show one shuffle partition is far larger than the median, the rule fans that partition out across multiple reader tasks via round-robin assignment over its file list, instead of folding small partitions together. Same per-stage invocation, same alignment-group leaf walk, same carrier-slot-on-ExchangeExec pattern as #1684 — strict architectural mirror.

Stacks on #1684. Until that lands, this PR's diff shows the 5 coalesce commits + 1 split commit. Once #1684 merges I'll rebase and the diff will reduce to the single split commit.

Part of the AQE epic #1359. Motivating bug: #1643 (TPC-H Q2 SF1000, one partition 8670× larger than the median). v1 does NOT close #1643 — see "v1 scope" below.

Mechanism — file-list sharding (v1)

The shuffle reader side already lists multiple PartitionLocations per output partition (one per upstream map task). Splitting just means handing those locations to several reader tasks via round-robin assignment by file index. No row-range reads, no protobuf changes, no executor data-path changes — pure scheduler/adapter work.

Tradeoff: a partition backed by only one file can't be split this way. The rule bails on that idx (factor stays at 1). Row-range reads to lift that restriction live in a v2 task doc (linked at the bottom).

v1 scope — and what it does NOT cover

File-list sharding produces UnknownPartitioning(K') output. Rows that used to land in the same hash bucket on the M-side end up scattered across multiple K'-side partitions (the round-robin assignment is file-keyed, not row-keyed). That breaks any downstream operator with a hash or single-partition input requirement:

  • HashJoinExec(Partitioned) and SortMergeJoinExec — both legs must agree on hash buckets.
  • AggregateExec(FinalPartitioned) — assumes each downstream partition holds a closed set of group keys.
  • Any future operator whose required_input_distribution() returns HashPartitioned(_) or SinglePartition.

Rather than enumerate operator types, the rule walks the stage subtree and inspects required_input_distribution() directly. If any node above the leaves demands hash or single-partition input, the rule bails the whole stage. Strictly correct, future-proof against new DataFusion operators.

TPC-H Q2's skew sits behind a FinalPartitioned aggregate, so v1 cannot help it. v1 helps the narrower set of stages where the consumer is distribution-agnostic (FilterExec, ProjectionExec, LocalLimitExec, single-input scans into UnknownPartitioning sinks). The infrastructure is the win; v2 (row-range reads + aggregate-aware plan rewriting) is where Q2 lands.

Surface

Opt-in via a single boolean. ballista.planner.split.enabled=false is the default — the rule short-circuits, the plan flows through untouched. Users who want the skew-handling turn it on.

key default meaning
ballista.planner.split.enabled false master gate
ballista.planner.split.skew_factor 5.0 Spark OptimizeSkewedJoin default — partition is "skewed" if bytes > skew_factor × median
ballista.planner.split.min_split_bytes 64 MiB absolute floor — don't fan-out trivially small partitions
ballista.planner.split.max_split_factor 8 per-partition fan-out cap to limit executor pressure

Algorithm

For each per-stage call (mirrors the coalesce rule line-for-line):

  1. Bail if disabled.
  2. Single subtree walk that does two things: bail if any operator demands HashPartitioned/SinglePartition input, AND collect every leaf ExchangeExec (Jump after each hit, same convention as feat(aqe): CoalescePartitionsRule — shuffle-partition coalescing on resolved stats #1684).
  3. Conflict guard: bail if any leaf already has coalesce() or split() set.
  4. Alignment-group invariant: every leaf shares M (Q22 guard — same one feat(aqe): CoalescePartitionsRule — shuffle-partition coalescing on resolved stats #1684 added in fix(coalesce): bail on heterogeneous M).
  5. Sum byte sizes element-wise across the alignment group; capture leaf-0's file-count vector.
  6. decide_split_factors(summed_bytes, file_counts, skew, min_bytes, max_factor) — three guards (single-file → factor 1; below min-bytes → factor 1; below skew ratio → factor 1) then ceil(bytes / median) capped at max_factor, floored at 2.
  7. If every factor is 1, bail.
  8. Build one SplitPlan with shards = factors_to_shards(&factors) and attach uniformly to every leaf via set_split — sharing the plan (not just K') keeps per-idx fan-out identical across leaves, which matters for non-join multi-leaf shapes (UNION).

Sharing the plan across the alignment group is the same invariant #1684 enforces for coalesce — preserves uniform K' downstream.

Components

Area Change
Rule SplitPartitionsRule in state/aqe/optimizer_rule/split_partitions.rs. Unit struct. Invoked per-stage in AdaptivePlanner::actionable_stages() right after CoalescePartitionsRule.
Carrier ExchangeExec gains an Arc<Mutex<Option<Arc<SplitPlan>>>> slot next to the existing coalesce slot, with set_split / split accessors. split=K' of M annotation appears in DisplayAs output only when attached. Mutually exclusive with coalesce — rule's conflict guard enforces it.
Adapter BallistaAdapter::transform_children checks exchange.split(); on Some(sp) it pre-shards the M-shape upstream Vec<Vec<PartitionLocation>> into K'-shape via SplitShard::owns_file(file_idx) and builds the reader via ShuffleReaderExec::try_new_split with UnknownPartitioning(K').
Reader ShuffleReaderExec::try_new_split constructor + split: Option<SplitPlan> field. Threaded through with_work_dir, with_client_pool, with_new_children, partition_statistics, DisplayAs. execute() needs no changes — self.partition[idx] already returns the per-output-partition Vec<PartitionLocation>, which the adapter has pre-sharded. The reader is oblivious to whether sharding happened.
Algorithm Pure-CPU helpers in state/aqe/split/algorithm.rsdecide_split_factors (median-based skew detection mirroring Spark's OptimizeSkewedJoin) + factors_to_shards (per-idx factor → flat shard list).
Config Four new keys (table above).

SplitPlan is NOT round-tripped through proto in v1 — the rule attaches it on the scheduler side, the adapter consumes it inline, and the resulting ShuffleReaderExec ships already-sharded Vec<Vec<PartitionLocation>>. A protobuf round-trip would only be needed if the rule outcome had to survive serialization to the executor, which it doesn't.

Test plan

  • cargo test --workspace --no-fail-fast — workspace tests pass, 0 failures (includes the 7 coalesce regressions from feat(aqe): CoalescePartitionsRule — shuffle-partition coalescing on resolved stats #1684 plus the 8 new split tests).
  • cargo clippy --workspace --all-targets --tests — 0 warnings.
  • cargo fmt --all — clean.
  • 9 unit tests in state/aqe/split/algorithm.rs covering the decision function
  • 8 functional tests in state::aqe::test::split_rule:
  • TPC-H SF=100 sanity sweep, 22 queries × 2 join variants, split.enabled=true: deferred to PR validation env. Expected outcome is mostly non-regression — most queries hit a hash or FinalPartitioned consumer and the rule bails. If any query does fire the rule (a stage ending in Filter / Projection / LocalLimit over a hash exchange), I'll record the K → K' increase in a comment.

v2 follow-up

The honest scope limitation in v1 (file-list sharding requires ≥2 files per partition; bails on hash/single consumers) is lifted by v2 — row-range reads (PartitionLocation::row_range: Option<(u64, u64)>, batch-count IPC reader on the executor) AND aggregate-aware splitting (rewrite AggregateExec(FinalPartitioned) into per-shard partial + reshuffle + final). That's the path #1643 will actually land on. Task doc written and cross-linked.

@milenkovicm
Copy link
Copy Markdown
Contributor

Please rebase as downstream PR has been merged

@wirybeaver wirybeaver marked this pull request as draft May 21, 2026 07:10
Inverse of apache#1684's CoalescePartitionsRule: when one upstream partition is
much larger than the median, fan it out across multiple downstream reader
tasks instead of folding small partitions together. v1 ships file-list
sharding only — round-robin assignment of an upstream's PartitionLocation
list to K' shards. No protobuf or executor changes.

Architecture mirrors coalesce: pure-CPU algorithm in state/aqe/split/,
SplitPartitionsRule in state/aqe/optimizer_rule/, SplitPlan/SplitShard
carrier on ExchangeExec (mutually exclusive with the coalesce slot),
adapter consumes the slot when building ShuffleReaderExec::try_new_split.
Rule chains after CoalescePartitionsRule in actionable_stages() so it
sees post-coalesce stats.

Honest scope: file-list sharding produces UnknownPartitioning(K'),
scattering same-hash-bucket rows. The rule walks the stage subtree and
bails when any node's required_input_distribution() is HashPartitioned
or SinglePartition — catches HashJoinExec(Partitioned), SortMergeJoinExec,
AggregateExec(FinalPartitioned), etc. TPC-H Q2 won't benefit from v1
because its skew sits behind a hash consumer; row-range reads and
aggregate-aware rewrite belong in v2 (see ~/mydocs/datafusion/aqe-tasks/
10-adaptive-skew-handling-v2.md).

Default off via ballista.planner.split.enabled=false. Defaults:
skew_factor=5.0, min_split_bytes=64 MiB, max_split_factor=8.

Tests: 9 algorithm unit tests + 8 integration tests (SQL bail cases for
join / FinalPartitioned aggregate + synthetic happy-path tests for the
attach / idempotence / single-file-guard paths). Coalesce regression
tests unchanged and passing.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

TPC-H Q2 parallelism bottleneck: hash partitioning on low-cardinality keys causes severe data skew

2 participants