Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ballista/client/tests/context_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ mod supported {
"| | |",
"| | =========UnResolvedStage[stage_id=2.0, children=1]========= |",
"| | Inputs{1: StageOutput { partition_locations: {}, complete: false }} |",
"| | ShuffleWriterExec: partitioning: None |",
"| | SortShuffleWriterExec: partitioning=None |",
"| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id] |",
"| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))] |",
"| | UnresolvedShuffleExec: partitioning: Hash([id@0], 16) |",
Expand Down Expand Up @@ -1195,7 +1195,7 @@ mod supported {
"| | PlaceholderRowExec, metrics=[...] |",
"| | |",
"| | =========SuccessfulStage[stage_id=2, partitions=16]========= |",
"| | ShuffleWriterExec: partitioning: None, metrics=[output_rows=..., input_rows=..., repart_time=..., write_time=...] |",
"| | SortShuffleWriterExec: partitioning=None, metrics=[output_rows=..., input_rows=..., spill_bytes=..., spill_count=..., repart_time=..., spill_time=..., write_time=...] |",
"| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id], metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., output_batches=..., expr_0_eval_time=..., expr_1_eval_time=...] |",
"| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., output_batches=..., spill_count=..., spilled_bytes=..., spilled_rows=..., peak_mem_used=..., aggregate_arguments_time=..., aggregation_time=..., emitting_time=..., time_calculating_group_ids=...] |",
"| | ShuffleReaderExec: partitioning: Hash([id@0], 16), metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., output_batches=...] |",
Expand Down
29 changes: 21 additions & 8 deletions ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::client::BallistaClient;
use crate::client_pool::BallistaClientPool;
use crate::error::BallistaError;
use crate::execution_plans::sort_shuffle::{
get_index_path, is_sort_shuffle_output, stream_sort_shuffle_partition,
ShuffleIndex, get_index_path, is_sort_shuffle_output, stream_sort_shuffle_partition,
};
use crate::extension::{BallistaConfigGrpcEndpoint, SessionConfigExt};
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
Expand Down Expand Up @@ -718,19 +718,32 @@ fn fetch_partition_local(
partition_id.partition_id, data_path
);
let index_path = get_index_path(data_path);
return stream_sort_shuffle_partition(
data_path,
&index_path,
partition_id.partition_id,
)
.map_err(|e| {
// None-partitioning sort-shuffle files have a single logical bucket;
// map any request on a single-bucket file to bucket 0 (the
// scheduler-level partition_id is the input partition number, not the
// file-level partition).
let index = ShuffleIndex::read_from_file(&index_path).map_err(|e| {
BallistaError::FetchFailed(
metadata.id.clone(),
partition_id.stage_id,
partition_id.partition_id,
e.to_string(),
)
});
})?;
let file_partition_id = if index.partition_count() == 1 {
0
} else {
partition_id.partition_id
};
return stream_sort_shuffle_partition(data_path, &index_path, file_partition_id)
.map_err(|e| {
BallistaError::FetchFailed(
metadata.id.clone(),
partition_id.stage_id,
partition_id.partition_id,
e.to_string(),
)
});
}
debug!("fetch local partition file: {data_path:?} ");
// Standard hash-based shuffle - read the file directly
Expand Down
185 changes: 148 additions & 37 deletions ballista/core/src/execution_plans/sort_shuffle/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ pub struct SortShuffleWriterExec {
plan: Arc<dyn ExecutionPlan>,
/// Path to write output streams to
work_dir: String,
/// Shuffle output partitioning (must be Hash partitioning)
shuffle_output_partitioning: Partitioning,
/// Shuffle output partitioning. `None` means the input is passed through to
/// a single output partition without re-partitioning.
shuffle_output_partitioning: Option<Partitioning>,
/// Sort shuffle configuration
config: SortShuffleConfig,
/// Execution metrics
Expand Down Expand Up @@ -135,22 +136,25 @@ impl SortShuffleWriterExec {
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
work_dir: String,
shuffle_output_partitioning: Partitioning,
shuffle_output_partitioning: Option<Partitioning>,
config: SortShuffleConfig,
) -> Result<Self> {
// Sort shuffle only supports hash partitioning
match &shuffle_output_partitioning {
Partitioning::Hash(_, _) => {}
other => {
return Err(DataFusionError::Plan(format!(
"SortShuffleWriterExec only supports Hash partitioning, got: {other:?}"
)));
}
if let Some(p) = &shuffle_output_partitioning
&& !matches!(p, Partitioning::Hash(_, _))
{
return Err(DataFusionError::Plan(format!(
"SortShuffleWriterExec only supports Hash or None partitioning, got: {p:?}"
)));
}

// When no shuffle partitioning is requested, mirror ShuffleWriterExec and
// pass the input plan's partitioning through unchanged.
let plan_partitioning = shuffle_output_partitioning
.clone()
.unwrap_or_else(|| plan.properties().output_partitioning().clone());
let properties = Arc::new(PlanProperties::new(
datafusion::physical_expr::EquivalenceProperties::new(plan.schema()),
shuffle_output_partitioning.clone(),
plan_partitioning,
datafusion::physical_plan::execution_plan::EmissionType::Incremental,
datafusion::physical_plan::execution_plan::Boundedness::Bounded,
));
Expand All @@ -177,9 +181,10 @@ impl SortShuffleWriterExec {
self.stage_id
}

/// Get the shuffle output partitioning
pub fn shuffle_output_partitioning(&self) -> &Partitioning {
&self.shuffle_output_partitioning
/// Get the shuffle output partitioning, or `None` if the writer passes
/// the input through to a single output partition.
pub fn shuffle_output_partitioning(&self) -> Option<&Partitioning> {
self.shuffle_output_partitioning.as_ref()
}

/// Get the sort shuffle configuration
Expand Down Expand Up @@ -214,10 +219,15 @@ impl SortShuffleWriterExec {
let mut stream = plan.execute(input_partition, context.clone())?;
let schema = stream.schema();

let Partitioning::Hash(exprs, num_output_partitions) = partitioning else {
return Err(DataFusionError::Internal(
"Expected hash partitioning".to_string(),
));
// None => single output bucket (pass-through); Hash(_, n) => n buckets.
let num_output_partitions = match &partitioning {
Some(Partitioning::Hash(_, n)) => *n,
None => 1,
Some(other) => {
return Err(DataFusionError::Internal(format!(
"Unexpected partitioning in SortShuffleWriterExec: {other:?}"
)));
}
};

let mut spill_manager = SpillManager::new(
Expand All @@ -230,6 +240,19 @@ impl SortShuffleWriterExec {
)
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

// For Hash partitioning, capture the expressions so we can compute
// per-row partition assignments below. `None` means pass-through:
// every row goes to bucket 0.
let hash_exprs: Option<Vec<Arc<dyn PhysicalExpr>>> = match &partitioning {
Some(Partitioning::Hash(exprs, _)) => Some(exprs.clone()),
None => None,
Some(other) => {
return Err(DataFusionError::Internal(format!(
"Unexpected partitioning in SortShuffleWriterExec: {other:?}"
)));
}
};

let mut buffered =
BufferedBatches::new(num_output_partitions, schema.clone());

Expand All @@ -250,14 +273,20 @@ impl SortShuffleWriterExec {
let input_batch = result?;
metrics.input_rows.add(input_batch.num_rows());

// Compute partition assignment for every row.
// Compute partition assignment for every row. With no shuffle
// partitioning, all rows route to bucket 0.
let timer = metrics.repart_time.timer();
let per_partition_rows = compute_partition_indices(
&input_batch,
&exprs,
num_output_partitions,
&mut hash_buffer,
)?;
let per_partition_rows = match &hash_exprs {
Some(exprs) => compute_partition_indices(
&input_batch,
exprs,
num_output_partitions,
&mut hash_buffer,
)?,
None => {
vec![(0..input_batch.num_rows() as u32).collect()]
}
};
timer.done();

// Estimate memory growth: input batch + index Vec growth.
Expand Down Expand Up @@ -354,11 +383,23 @@ impl SortShuffleWriterExec {
total_bytes_spilled
);

// For None partitioning we mirror legacy `ShuffleWriterExec`
// semantics: each input partition becomes its own output partition
// tagged with `partition_id = input_partition`. Otherwise the
// scheduler keys every task's result under partition 0 and the
// downstream `SortPreservingMergeExec` collapses 16 hash buckets
// into a single concatenated stream, which is not globally sorted.
let none_partitioning = partitioning.is_none();
let mut results = Vec::new();
for (part_id, num_batches, num_rows, num_bytes) in partition_stats {
if num_rows > 0 {
let scheduler_partition_id = if none_partitioning {
input_partition as u64
} else {
part_id as u64
};
results.push(ShuffleWritePartition {
partition_id: part_id as u64,
partition_id: scheduler_partition_id,
num_batches,
num_rows,
num_bytes,
Expand Down Expand Up @@ -518,16 +559,17 @@ impl DisplayAs for SortShuffleWriterExec {
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
let partitioning = self
.shuffle_output_partitioning
.as_ref()
.map(|p| p.to_string())
.unwrap_or_else(|| "None".to_string());
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"SortShuffleWriterExec: partitioning={}",
self.shuffle_output_partitioning
)
write!(f, "SortShuffleWriterExec: partitioning={partitioning}")
}
DisplayFormatType::TreeRender => {
write!(f, "partitioning={}", self.shuffle_output_partitioning)
write!(f, "partitioning={partitioning}")
}
}
}
Expand Down Expand Up @@ -677,7 +719,7 @@ impl ShuffleWriter for SortShuffleWriterExec {
}

fn shuffle_output_partitioning(&self) -> Option<&Partitioning> {
Some(&self.shuffle_output_partitioning)
self.shuffle_output_partitioning.as_ref()
}

fn input_partition_count(&self) -> usize {
Expand Down Expand Up @@ -843,7 +885,7 @@ mod tests {
1,
input_plan,
work_dir.path().to_str().unwrap().to_string(),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2),
Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
config,
)?;

Expand Down Expand Up @@ -940,7 +982,10 @@ mod tests {
1,
input,
work_dir.path().to_str().unwrap().to_string(),
Partitioning::Hash(vec![Arc::new(Column::new("k", 0))], num_partitions),
Some(Partitioning::Hash(
vec![Arc::new(Column::new("k", 0))],
num_partitions,
)),
SortShuffleConfig::default()
.with_memory_limit_per_task_bytes(sort_shuffle_memory_limit_bytes),
)?;
Expand Down Expand Up @@ -1087,7 +1132,10 @@ mod tests {
1,
input,
work_dir.path().to_str().unwrap().to_string(),
Partitioning::Hash(vec![Arc::new(Column::new("k", 0))], num_partitions),
Some(Partitioning::Hash(
vec![Arc::new(Column::new("k", 0))],
num_partitions,
)),
SortShuffleConfig::default(),
)?;

Expand Down Expand Up @@ -1196,4 +1244,67 @@ mod tests {
);
}
}
#[tokio::test]
async fn test_sort_shuffle_with_no_partitioning() -> Result<()> {
use super::super::reader::stream_sort_shuffle_partition;

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();

let work_dir = TempDir::new()?;
let work_dir_str = work_dir.path().to_str().unwrap().to_string();

let input = create_test_input()?;
// create_test_input(): 2 input partitions × 2 batches × 2 rows = 8 rows total
let num_input_partitions =
input.properties().output_partitioning().partition_count();

let writer = SortShuffleWriterExec::try_new(
"job-none".to_string(),
1,
input,
work_dir_str.clone(),
None,
SortShuffleConfig::default(),
)?;

let mut total_rows = 0u64;
for input_partition in 0..num_input_partitions {
let results = writer
.clone()
.execute_shuffle_write(input_partition, task_ctx.clone())
.await?;

// Exactly one output partition per input partition. With None
// partitioning the writer mirrors legacy `ShuffleWriterExec`
// semantics: each input partition becomes its own scheduler-level
// output partition tagged with `partition_id = input_partition`.
assert_eq!(results.len(), 1, "expected 1 output partition");
assert_eq!(results[0].partition_id, input_partition as u64);
assert!(results[0].is_sort_shuffle);
total_rows += results[0].num_rows;

// Verify the data file is readable via the sort-shuffle reader
let data_path = work_dir
.path()
.join("job-none")
.join("1")
.join(format!("{input_partition}"))
.join("data.arrow");
let index_path = data_path.with_extension("arrow.index");
assert!(data_path.exists(), "data file missing");
assert!(index_path.exists(), "index file missing");

let mut s = stream_sort_shuffle_partition(&data_path, &index_path, 0)
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
let mut read_rows = 0u64;
while let Some(batch_result) = s.next().await {
let batch = batch_result?;
read_rows += batch.num_rows() as u64;
}
assert_eq!(read_rows, results[0].num_rows);
}
assert_eq!(total_rows, 8); // 2 partitions × 2 batches × 2 rows
Ok(())
}
}
Loading
Loading