feat(aqe): early-stop on global LIMIT #1727
Draft
wirybeaver wants to merge 1 commit into
Pure-logic atomic counter that aggregates per-stage row counts and fires a one-shot CancelRemaining decision when the running sum crosses limit * safety_factor.

Foundation for the AQE early-stop feature (apache#1359): subsequent commits add the plan-time eligibility analyzer, scheduler-side wiring into update_task_statuses, and an EarlyStopCancel event that finalizes the job as Successful with partial output.

Trigger uses AtomicU64 fetch_add (Relaxed) for row accumulation and AtomicBool swap (SeqCst) on the triggered flag so that, across racing observers, exactly one CancelRemaining is returned. Threshold is computed in fixed-point arithmetic to avoid f64 imprecision at large limits.

The observe() path carries an asymmetric correctness invariant: we must fire no earlier than sum >= limit (firing early would cause the downstream LimitExec to under-report). Firing late is always safe; it only wastes I/O.

Includes 9 unit tests covering threshold rounding, below/at/above trigger transitions, single-fire guarantee, untagged-stage handling, multi-stage aggregation, 32-thread concurrent observer race, and constructor preconditions.
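The trigger described in this commit message can be sketched in a few lines of standard-library Rust. This is a minimal illustration, not the code in this PR: the constructor signature, the `Decision` enum, the `threshold()` accessor, and the encoding of the safety factor in thousandths (`1500` meaning 1.5x) are all assumptions made for the example, and per-stage tagging is left out.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Outcome of a single observation (hypothetical name and shape).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
    Continue,
    CancelRemaining,
}

/// Illustrative tracker; field and method names are assumptions.
pub struct JobLimitTracker {
    threshold: u64,
    rows: AtomicU64,
    triggered: AtomicBool,
}

impl JobLimitTracker {
    /// `safety_factor_milli` is the safety factor in thousandths (1500 == 1.5x).
    pub fn new(limit: u64, safety_factor_milli: u64) -> Self {
        assert!(limit > 0, "limit must be positive");
        assert!(safety_factor_milli >= 1000, "safety factor must be >= 1.0");
        // Fixed-point ceil(limit * factor) in u128, so no f64 rounding and no overflow.
        // Because factor >= 1.0 and we round up, threshold >= limit always holds.
        let scaled = (limit as u128 * safety_factor_milli as u128 + 999) / 1000;
        let threshold = scaled.min(u64::MAX as u128) as u64;
        Self {
            threshold,
            rows: AtomicU64::new(0),
            triggered: AtomicBool::new(false),
        }
    }

    pub fn threshold(&self) -> u64 {
        self.threshold
    }

    /// Adds `num_rows` to the running sum and returns `CancelRemaining` at most once.
    pub fn observe(&self, num_rows: u64) -> Decision {
        // Relaxed is enough for the counter: only the sum matters, not ordering
        // with other memory. Note that fetch_add wraps on u64 overflow.
        let prev = self.rows.fetch_add(num_rows, Ordering::Relaxed);
        let total = prev.saturating_add(num_rows);
        // swap returns the previous flag value, so only the first observer
        // that sees `false` gets to report the cancel decision.
        if total >= self.threshold && !self.triggered.swap(true, Ordering::SeqCst) {
            Decision::CancelRemaining
        } else {
            Decision::Continue
        }
    }
}
```

With `JobLimitTracker::new(100, 1500)` the threshold is 150: observing 149 rows returns `Continue`, one more row returns `CancelRemaining`, and every later call returns `Continue` again. Rounding the threshold up rather than down keeps the sketch on the safe side of the invariant, since it can only fire late, never early.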
Summary
Adds scheduler-side early-stop for `SELECT ... LIMIT N` queries under AQE. When enough rows have been shuffled to satisfy the LIMIT, the scheduler cancels remaining tasks and finalizes the job as Successful; the downstream `LimitExec` slices to the exact `fetch`.

- `JobLimitTracker`: atomic row-count tracker that fires exactly once when `SUM(num_rows) >= limit × safety_factor` (default 1.5). Lock-free: `fetch_add(Relaxed)` + `swap(SeqCst)` guarantees one `CancelRemaining` across racing observers.
- `LimitEarlyStopAnalyzer`: walks the post-stage-resolution physical plan top-down, identifies eligible `GlobalLimitExec` nodes (bare LIMIT, no OFFSET, no sorted subtree), and traces through allowlisted operators to producer `ExchangeExec` stage IDs.
- `TaskManager`, observation hook in `update_task_statuses`, new `EarlyStopCancel` event; `early_stop_job` synthesizes successful completion for producer stages and reports running tasks for cancellation.
- `ballista.aqe.limit_early_stop.enabled` (default `true`), gated behind the existing AQE flag.

Known limitation (v2.6)

DataFusion 53.x's `LimitPushdown` physical optimizer aggressively rewrites `GlobalLimitExec` into `LocalLimitExec` + fetch hints on `CoalescePartitionsExec`/`DataSourceExec`. In practice, the analyzer rarely finds a `GlobalLimitExec` in real plans. Extending recognition to `LocalLimitExec` and fetch-bearing operators is tracked in a companion spec file.

Test plan

- `JobLimitTracker` (threshold transitions, exactly-once fire, 32-thread concurrency, preconditions)
- `LimitEarlyStopAnalyzer` (eligible/ineligible operator mixes, multi-producer UNION, nested LIMITs)
- `AdaptiveExecutionGraph` + `TaskManager` wiring (analyzer invocation, disabled flag, stage synthesis, `update_task_statuses` → `EarlyStopCancel` emission, `early_stop_job` finalization, tracker cleanup)
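The eligibility rules named in the Summary (bare LIMIT, no OFFSET, no sort on the path, allowlisted operators, producer exchange stage IDs) can be illustrated on a toy plan tree. Everything below is hypothetical: the `Plan` enum stands in for DataFusion's physical plan and is not its API, the allowlist (`Projection`, `CoalescePartitions`, `Union`) is a guess for the example, the sketch inspects only the root node instead of walking the whole plan, and it treats a nested LIMIT as ineligible, which may differ from what the PR does.

```rust
/// Toy physical-plan tree used only for this illustration.
#[derive(Debug)]
pub enum Plan {
    GlobalLimit { skip: usize, fetch: Option<usize>, input: Box<Plan> },
    Sort(Box<Plan>),
    Projection(Box<Plan>),
    CoalescePartitions(Box<Plan>),
    Union(Vec<Plan>),
    Exchange { stage_id: usize },
    Scan,
}

/// Returns `(fetch, producer stage ids)` when the root is an eligible LIMIT.
pub fn analyze(plan: &Plan) -> Option<(usize, Vec<usize>)> {
    match plan {
        // Bare LIMIT only: a concrete fetch and no OFFSET (skip == 0).
        Plan::GlobalLimit { skip: 0, fetch: Some(n), input } => {
            let mut stages = Vec::new();
            if trace(input, &mut stages) && !stages.is_empty() {
                Some((*n, stages))
            } else {
                None
            }
        }
        _ => None,
    }
}

/// Walks down through allowlisted operators, collecting exchange stage ids.
/// Returns false as soon as a non-allowlisted operator is found.
fn trace(plan: &Plan, stages: &mut Vec<usize>) -> bool {
    match plan {
        Plan::Exchange { stage_id } => {
            stages.push(*stage_id);
            true
        }
        Plan::Projection(input) | Plan::CoalescePartitions(input) => trace(input, stages),
        // Every branch of a UNION must lead to a producer stage.
        Plan::Union(inputs) => inputs.iter().all(|p| trace(p, stages)),
        // A sort needs all of its input, so stopping producers early is unsafe.
        Plan::Sort(_) | Plan::GlobalLimit { .. } | Plan::Scan => false,
    }
}
```

In this sketch a `LIMIT 10` over a UNION of two exchanges yields both stage IDs, while adding an OFFSET or placing a sort between the limit and the exchange makes `analyze` return `None`.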