Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub const BALLISTA_CLIENT_IO_RETRY_WAIT_TIME_MS: &str =
"ballista.client.io_retry_wait_time_ms";
/// Enables adaptive query planning
pub const BALLISTA_ADAPTIVE_PLANNER_ENABLED: &str = "ballista.planner.adaptive.enabled";

/// Setting key for [`BallistaConfig::aqe_limit_early_stop_enabled`].
pub const BALLISTA_AQE_LIMIT_EARLY_STOP_ENABLED: &str =
"ballista.aqe.limit_early_stop.enabled";
/// Configuration key for enabling sort-based shuffle.
pub const BALLISTA_SHUFFLE_SORT_BASED_ENABLED: &str =
"ballista.shuffle.sort_based.enabled";
Expand Down Expand Up @@ -138,6 +142,13 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
"Enables Adaptive Query Planning (EXPERIMENTAL)".to_string(),
DataType::Boolean,
Some(false.to_string())),
ConfigEntry::new(BALLISTA_AQE_LIMIT_EARLY_STOP_ENABLED.to_string(),
"When AQE is enabled, cancels remaining tasks of stages feeding an \
eligible bare GlobalLimitExec once the rows already written exceed \
the limit. Eligibility excludes OFFSET, sorted inputs, and limits \
inside a non-pass-through subtree.".to_string(),
DataType::Boolean,
Some(true.to_string())),
ConfigEntry::new(BALLISTA_SHUFFLE_SORT_BASED_ENABLED.to_string(),
"Enable sort-based shuffle which writes consolidated files with index".to_string(),
DataType::Boolean,
Expand Down Expand Up @@ -352,6 +363,16 @@ impl BallistaConfig {
self.get_bool_setting(BALLISTA_ADAPTIVE_PLANNER_ENABLED)
}

/// Is AQE early-stop on global LIMIT enabled.
///
/// Only takes effect when [`Self::adaptive_query_planner_enabled`] is
/// also true. When on, the scheduler tracks rows produced by stages
/// that feed an eligible `GlobalLimitExec` and cancels remaining
/// tasks once the limit is satisfied.
pub fn aqe_limit_early_stop_enabled(&self) -> bool {
self.get_bool_setting(BALLISTA_AQE_LIMIT_EARLY_STOP_ENABLED)
}

/// Returns whether sort-based shuffle is enabled.
///
/// When enabled, shuffle writes produce a single consolidated file per input
Expand Down
14 changes: 14 additions & 0 deletions ballista/core/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ pub trait SessionConfigExt {
/// Is adaptive query planner enabled
fn ballista_adaptive_query_planner_enabled(&self) -> bool;

/// Is AQE early-stop on global LIMIT enabled. Only takes effect when
/// [`Self::ballista_adaptive_query_planner_enabled`] is also true.
fn ballista_aqe_limit_early_stop_enabled(&self) -> bool;

/// Set user defined metadata keys in Ballista gRPC requests
fn with_ballista_grpc_metadata(self, metadata: HashMap<String, String>) -> Self;

Expand Down Expand Up @@ -537,6 +541,16 @@ impl SessionConfigExt for SessionConfig {
.unwrap_or_else(|| BallistaConfig::default().adaptive_query_planner_enabled())
}

fn ballista_aqe_limit_early_stop_enabled(&self) -> bool {
self.options()
.extensions
.get::<BallistaConfig>()
.map(|c| c.aqe_limit_early_stop_enabled())
.unwrap_or_else(|| {
BallistaConfig::default().aqe_limit_early_stop_enabled()
})
}

fn with_ballista_grpc_metadata(self, metadata: HashMap<String, String>) -> Self {
let extension = BallistaGrpcMetadataInterceptor::new(metadata);
self.with_extension(Arc::new(extension))
Expand Down
12 changes: 12 additions & 0 deletions ballista/scheduler/src/scheduler_server/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ pub enum QueryStageSchedulerEvent {
ExecutorLost(String, Option<String>),
/// Request to cancel specific running tasks.
CancelTasks(Vec<RunningTaskInfo>),
/// AQE early-stop trigger fired: a tracked job's accumulated row
/// count has crossed its LIMIT threshold. The handler short-stops the
/// tagged producer stages (synthesizing successful completion for any
/// remaining tasks so the consumer stage can run with the partial
/// shuffle output) and cancels the now-irrelevant in-flight tasks.
EarlyStopCancel {
/// Unique job identifier.
job_id: String,
},
}

impl Debug for QueryStageSchedulerEvent {
Expand Down Expand Up @@ -165,6 +174,9 @@ impl Debug for QueryStageSchedulerEvent {
QueryStageSchedulerEvent::CancelTasks(status) => {
write!(f, "CancelTasks : status:[{status:?}].")
}
QueryStageSchedulerEvent::EarlyStopCancel { job_id } => {
write!(f, "EarlyStopCancel : job_id={job_id}.")
}
}
}
}
33 changes: 33 additions & 0 deletions ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,39 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
warn!("Fail to cancel running tasks due to {e:?}");
}
}
QueryStageSchedulerEvent::EarlyStopCancel { job_id } => {
info!("AQE early-stop firing for job {job_id}");
match self.state.task_manager.early_stop_job(&job_id).await {
Ok((tasks_to_cancel, follow_up_events)) => {
if !tasks_to_cancel.is_empty() {
event_sender
.post_event(
QueryStageSchedulerEvent::CancelTasks(
tasks_to_cancel,
),
)
.await?;
}
for ev in follow_up_events {
event_sender.post_event(ev).await?;
}
// Producer stages are now Successful so the
// consumer (LIMIT) stage can be scheduled;
// revive offers to pick it up.
if self.state.config.is_push_staged_scheduling() {
event_sender
.post_event(QueryStageSchedulerEvent::ReviveOffers)
.await?;
}
}
Err(e) => {
error!(
"Failed to short-stop AQE producer stages for \
job {job_id}: {e:?}"
);
}
}
}
QueryStageSchedulerEvent::JobDataClean(job_id) => {
self.state.executor_manager.clean_up_job_data(job_id);
}
Expand Down
Loading