diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs index 86bec04f9a..f2653e5908 100644 --- a/ballista/client/tests/context_checks.rs +++ b/ballista/client/tests/context_checks.rs @@ -1112,7 +1112,7 @@ mod supported { "| | |", "| | =========UnResolvedStage[stage_id=2.0, children=1]========= |", "| | Inputs{1: StageOutput { partition_locations: {}, complete: false }} |", - "| | ShuffleWriterExec: partitioning: None |", + "| | SortShuffleWriterExec: partitioning=None |", "| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id] |", "| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))] |", "| | UnresolvedShuffleExec: partitioning: Hash([id@0], 16) |", @@ -1195,7 +1195,7 @@ mod supported { "| | PlaceholderRowExec, metrics=[...] |", "| | |", "| | =========SuccessfulStage[stage_id=2, partitions=16]========= |", - "| | ShuffleWriterExec: partitioning: None, metrics=[output_rows=..., input_rows=..., repart_time=..., write_time=...] |", + "| | SortShuffleWriterExec: partitioning=None, metrics=[output_rows=..., input_rows=..., spill_bytes=..., spill_count=..., repart_time=..., spill_time=..., write_time=...] |", "| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id], metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., output_batches=..., expr_0_eval_time=..., expr_1_eval_time=...] |", "| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., output_batches=..., spill_count=..., spilled_bytes=..., spilled_rows=..., peak_mem_used=..., aggregate_arguments_time=..., aggregation_time=..., emitting_time=..., time_calculating_group_ids=...] |", "| | ShuffleReaderExec: partitioning: Hash([id@0], 16), metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., output_batches=...] |", diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index ee75917c54..b7bea312cd 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -19,7 +19,7 @@ use crate::client::BallistaClient; use crate::client_pool::BallistaClientPool; use crate::error::BallistaError; use crate::execution_plans::sort_shuffle::{ - get_index_path, is_sort_shuffle_output, stream_sort_shuffle_partition, + ShuffleIndex, get_index_path, is_sort_shuffle_output, stream_sort_shuffle_partition, }; use crate::extension::{BallistaConfigGrpcEndpoint, SessionConfigExt}; use crate::serde::scheduler::{PartitionLocation, PartitionStats}; @@ -718,19 +718,32 @@ fn fetch_partition_local( partition_id.partition_id, data_path ); let index_path = get_index_path(data_path); - return stream_sort_shuffle_partition( - data_path, - &index_path, - partition_id.partition_id, - ) - .map_err(|e| { + // None-partitioning sort-shuffle files have a single logical bucket; + // map any request on a single-bucket file to bucket 0 (the + // scheduler-level partition_id is the input partition number, not the + // file-level partition). + let index = ShuffleIndex::read_from_file(&index_path).map_err(|e| { BallistaError::FetchFailed( metadata.id.clone(), partition_id.stage_id, partition_id.partition_id, e.to_string(), ) - }); + })?; + let file_partition_id = if index.partition_count() == 1 { + 0 + } else { + partition_id.partition_id + }; + return stream_sort_shuffle_partition(data_path, &index_path, file_partition_id) + .map_err(|e| { + BallistaError::FetchFailed( + metadata.id.clone(), + partition_id.stage_id, + partition_id.partition_id, + e.to_string(), + ) + }); } debug!("fetch local partition file: {data_path:?} "); // Standard hash-based shuffle - read the file directly diff --git a/ballista/core/src/execution_plans/sort_shuffle/writer.rs b/ballista/core/src/execution_plans/sort_shuffle/writer.rs index c490efd983..a68d31192a 100644 --- a/ballista/core/src/execution_plans/sort_shuffle/writer.rs +++ b/ballista/core/src/execution_plans/sort_shuffle/writer.rs @@ -82,8 +82,9 @@ pub struct SortShuffleWriterExec { plan: Arc, /// Path to write output streams to work_dir: String, - /// Shuffle output partitioning (must be Hash partitioning) - shuffle_output_partitioning: Partitioning, + /// Shuffle output partitioning. `None` means the input is passed through to + /// a single output partition without re-partitioning. + shuffle_output_partitioning: Option, /// Sort shuffle configuration config: SortShuffleConfig, /// Execution metrics @@ -135,22 +136,25 @@ impl SortShuffleWriterExec { stage_id: usize, plan: Arc, work_dir: String, - shuffle_output_partitioning: Partitioning, + shuffle_output_partitioning: Option, config: SortShuffleConfig, ) -> Result { - // Sort shuffle only supports hash partitioning - match &shuffle_output_partitioning { - Partitioning::Hash(_, _) => {} - other => { - return Err(DataFusionError::Plan(format!( - "SortShuffleWriterExec only supports Hash partitioning, got: {other:?}" - ))); - } + if let Some(p) = &shuffle_output_partitioning + && !matches!(p, Partitioning::Hash(_, _)) + { + return Err(DataFusionError::Plan(format!( + "SortShuffleWriterExec only supports Hash or None partitioning, got: {p:?}" + ))); } + // When no shuffle partitioning is requested, mirror ShuffleWriterExec and + // pass the input plan's partitioning through unchanged. + let plan_partitioning = shuffle_output_partitioning + .clone() + .unwrap_or_else(|| plan.properties().output_partitioning().clone()); let properties = Arc::new(PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(plan.schema()), - shuffle_output_partitioning.clone(), + plan_partitioning, datafusion::physical_plan::execution_plan::EmissionType::Incremental, datafusion::physical_plan::execution_plan::Boundedness::Bounded, )); @@ -177,9 +181,10 @@ impl SortShuffleWriterExec { self.stage_id } - /// Get the shuffle output partitioning - pub fn shuffle_output_partitioning(&self) -> &Partitioning { - &self.shuffle_output_partitioning + /// Get the shuffle output partitioning, or `None` if the writer passes + /// the input through to a single output partition. + pub fn shuffle_output_partitioning(&self) -> Option<&Partitioning> { + self.shuffle_output_partitioning.as_ref() } /// Get the sort shuffle configuration @@ -214,10 +219,15 @@ impl SortShuffleWriterExec { let mut stream = plan.execute(input_partition, context.clone())?; let schema = stream.schema(); - let Partitioning::Hash(exprs, num_output_partitions) = partitioning else { - return Err(DataFusionError::Internal( - "Expected hash partitioning".to_string(), - )); + // None => single output bucket (pass-through); Hash(_, n) => n buckets. + let num_output_partitions = match &partitioning { + Some(Partitioning::Hash(_, n)) => *n, + None => 1, + Some(other) => { + return Err(DataFusionError::Internal(format!( + "Unexpected partitioning in SortShuffleWriterExec: {other:?}" + ))); + } }; let mut spill_manager = SpillManager::new( @@ -230,6 +240,19 @@ impl SortShuffleWriterExec { ) .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + // For Hash partitioning, capture the expressions so we can compute + // per-row partition assignments below. `None` means pass-through: + // every row goes to bucket 0. + let hash_exprs: Option>> = match &partitioning { + Some(Partitioning::Hash(exprs, _)) => Some(exprs.clone()), + None => None, + Some(other) => { + return Err(DataFusionError::Internal(format!( + "Unexpected partitioning in SortShuffleWriterExec: {other:?}" + ))); + } + }; + let mut buffered = BufferedBatches::new(num_output_partitions, schema.clone()); @@ -250,14 +273,20 @@ impl SortShuffleWriterExec { let input_batch = result?; metrics.input_rows.add(input_batch.num_rows()); - // Compute partition assignment for every row. + // Compute partition assignment for every row. With no shuffle + // partitioning, all rows route to bucket 0. let timer = metrics.repart_time.timer(); - let per_partition_rows = compute_partition_indices( - &input_batch, - &exprs, - num_output_partitions, - &mut hash_buffer, - )?; + let per_partition_rows = match &hash_exprs { + Some(exprs) => compute_partition_indices( + &input_batch, + exprs, + num_output_partitions, + &mut hash_buffer, + )?, + None => { + vec![(0..input_batch.num_rows() as u32).collect()] + } + }; timer.done(); // Estimate memory growth: input batch + index Vec growth. @@ -354,11 +383,23 @@ impl SortShuffleWriterExec { total_bytes_spilled ); + // For None partitioning we mirror legacy `ShuffleWriterExec` + // semantics: each input partition becomes its own output partition + // tagged with `partition_id = input_partition`. Otherwise the + // scheduler keys every task's result under partition 0 and the + // downstream `SortPreservingMergeExec` collapses 16 hash buckets + // into a single concatenated stream, which is not globally sorted. + let none_partitioning = partitioning.is_none(); let mut results = Vec::new(); for (part_id, num_batches, num_rows, num_bytes) in partition_stats { if num_rows > 0 { + let scheduler_partition_id = if none_partitioning { + input_partition as u64 + } else { + part_id as u64 + }; results.push(ShuffleWritePartition { - partition_id: part_id as u64, + partition_id: scheduler_partition_id, num_batches, num_rows, num_bytes, @@ -518,16 +559,17 @@ impl DisplayAs for SortShuffleWriterExec { t: DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { + let partitioning = self + .shuffle_output_partitioning + .as_ref() + .map(|p| p.to_string()) + .unwrap_or_else(|| "None".to_string()); match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "SortShuffleWriterExec: partitioning={}", - self.shuffle_output_partitioning - ) + write!(f, "SortShuffleWriterExec: partitioning={partitioning}") } DisplayFormatType::TreeRender => { - write!(f, "partitioning={}", self.shuffle_output_partitioning) + write!(f, "partitioning={partitioning}") } } } @@ -677,7 +719,7 @@ impl ShuffleWriter for SortShuffleWriterExec { } fn shuffle_output_partitioning(&self) -> Option<&Partitioning> { - Some(&self.shuffle_output_partitioning) + self.shuffle_output_partitioning.as_ref() } fn input_partition_count(&self) -> usize { @@ -843,7 +885,7 @@ mod tests { 1, input_plan, work_dir.path().to_str().unwrap().to_string(), - Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2), + Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)), config, )?; @@ -940,7 +982,10 @@ mod tests { 1, input, work_dir.path().to_str().unwrap().to_string(), - Partitioning::Hash(vec![Arc::new(Column::new("k", 0))], num_partitions), + Some(Partitioning::Hash( + vec![Arc::new(Column::new("k", 0))], + num_partitions, + )), SortShuffleConfig::default() .with_memory_limit_per_task_bytes(sort_shuffle_memory_limit_bytes), )?; @@ -1087,7 +1132,10 @@ mod tests { 1, input, work_dir.path().to_str().unwrap().to_string(), - Partitioning::Hash(vec![Arc::new(Column::new("k", 0))], num_partitions), + Some(Partitioning::Hash( + vec![Arc::new(Column::new("k", 0))], + num_partitions, + )), SortShuffleConfig::default(), )?; @@ -1196,4 +1244,67 @@ mod tests { ); } } + #[tokio::test] + async fn test_sort_shuffle_with_no_partitioning() -> Result<()> { + use super::super::reader::stream_sort_shuffle_partition; + + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + let work_dir = TempDir::new()?; + let work_dir_str = work_dir.path().to_str().unwrap().to_string(); + + let input = create_test_input()?; + // create_test_input(): 2 input partitions × 2 batches × 2 rows = 8 rows total + let num_input_partitions = + input.properties().output_partitioning().partition_count(); + + let writer = SortShuffleWriterExec::try_new( + "job-none".to_string(), + 1, + input, + work_dir_str.clone(), + None, + SortShuffleConfig::default(), + )?; + + let mut total_rows = 0u64; + for input_partition in 0..num_input_partitions { + let results = writer + .clone() + .execute_shuffle_write(input_partition, task_ctx.clone()) + .await?; + + // Exactly one output partition per input partition. With None + // partitioning the writer mirrors legacy `ShuffleWriterExec` + // semantics: each input partition becomes its own scheduler-level + // output partition tagged with `partition_id = input_partition`. + assert_eq!(results.len(), 1, "expected 1 output partition"); + assert_eq!(results[0].partition_id, input_partition as u64); + assert!(results[0].is_sort_shuffle); + total_rows += results[0].num_rows; + + // Verify the data file is readable via the sort-shuffle reader + let data_path = work_dir + .path() + .join("job-none") + .join("1") + .join(format!("{input_partition}")) + .join("data.arrow"); + let index_path = data_path.with_extension("arrow.index"); + assert!(data_path.exists(), "data file missing"); + assert!(index_path.exists(), "index file missing"); + + let mut s = stream_sort_shuffle_partition(&data_path, &index_path, 0) + .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + let mut read_rows = 0u64; + while let Some(batch_result) = s.next().await { + let batch = batch_result?; + read_rows += batch.num_rows() as u64; + } + assert_eq!(read_rows, results[0].num_rows); + } + assert_eq!(total_rows, 8); // 2 partitions × 2 batches × 2 rows + Ok(()) + } } diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs index d243982010..ef5cb62f66 100644 --- a/ballista/core/src/serde/mod.rs +++ b/ballista/core/src/serde/mod.rs @@ -370,12 +370,6 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { &converter, )?; - let partitioning = shuffle_output_partitioning.ok_or_else(|| { - DataFusionError::Internal( - "SortShuffleWriterExec requires hash partitioning".to_string(), - ) - })?; - let batch_size = if sort_shuffle_writer.batch_size > 0 { sort_shuffle_writer.batch_size as usize } else { @@ -392,7 +386,7 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { sort_shuffle_writer.stage_id as usize, input, "".to_string(), // executor will fill this in - partitioning, + shuffle_output_partitioning, config, )?)) } @@ -524,7 +518,7 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { Ok(()) } else if let Some(exec) = node.as_any().downcast_ref::() { let output_partitioning = match exec.shuffle_output_partitioning() { - Partitioning::Hash(exprs, partition_count) => { + Some(Partitioning::Hash(exprs, partition_count)) => { Some(datafusion_proto::protobuf::PhysicalHashRepartition { hash_expr: exprs .iter() @@ -538,9 +532,10 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec { partition_count: *partition_count as u64, }) } - other => { + None => None, + Some(other) => { return Err(DataFusionError::Internal(format!( - "SortShuffleWriterExec requires Hash partitioning, got: {other:?}" + "SortShuffleWriterExec requires Hash or None partitioning, got: {other:?}" ))); } }; diff --git a/ballista/executor/src/execution_engine.rs b/ballista/executor/src/execution_engine.rs index c57d104227..c13482adfb 100644 --- a/ballista/executor/src/execution_engine.rs +++ b/ballista/executor/src/execution_engine.rs @@ -154,7 +154,7 @@ impl ExecutionEngine for DefaultExecutionEngine { stage_id, plan.children()[0].clone(), work_dir.to_string(), - sort_shuffle_writer.shuffle_output_partitioning().clone(), + sort_shuffle_writer.shuffle_output_partitioning().cloned(), sort_shuffle_writer.config().clone(), )?; Ok(Arc::new(DefaultQueryStageExec::new( diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs index 32a0db4047..7721c53022 100644 --- a/ballista/executor/src/flight_service.rs +++ b/ballista/executor/src/flight_service.rs @@ -120,9 +120,23 @@ impl FlightService for BallistaFlightService { if is_sort_shuffle_output(&path) { debug!("Detected sort-based shuffle format for {path:?}"); let index_path = get_index_path(path.as_path()); - let stream = - stream_sort_shuffle_partition(&path, &index_path, *partition_id) - .map_err(|e| from_ballista_err(&e))?; + // None-partitioning sort-shuffle files have a single logical + // bucket; the scheduler-level `partition_id` is the input + // partition number, so map any request on a single-bucket + // file to bucket 0. + let index = ShuffleIndex::read_from_file(&index_path) + .map_err(|e| from_ballista_err(&e))?; + let file_partition_id = if index.partition_count() == 1 { + 0 + } else { + *partition_id + }; + let stream = stream_sort_shuffle_partition( + &path, + &index_path, + file_partition_id, + ) + .map_err(|e| from_ballista_err(&e))?; let schema = stream.schema(); // Map DataFusionError to FlightError @@ -363,6 +377,14 @@ async fn stream_sort_shuffle_block( let index = ShuffleIndex::read_from_file(&index_path).map_err(|e| from_ballista_err(&e))?; + // None-partitioning sort-shuffle files have a single logical bucket; map + // any scheduler-level partition_id on a single-bucket file to bucket 0. + let partition_id = if index.partition_count() == 1 { + 0 + } else { + partition_id + }; + if partition_id >= index.partition_count() { return Err(Status::out_of_range(format!( "partition_id {partition_id} not found in index (max: {})", diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs index dc6c8b6907..076f11c333 100644 --- a/ballista/scheduler/src/planner.rs +++ b/ballista/scheduler/src/planner.rs @@ -555,27 +555,26 @@ pub(crate) fn create_shuffle_writer_with_config( .cloned() .unwrap_or_default(); - if ballista_config.shuffle_sort_based_enabled() { - // Sort shuffle requires hash partitioning - if let Some(Partitioning::Hash(exprs, partition_count)) = partitioning { - let sort_config = SortShuffleConfig::new( - true, - datafusion::arrow::ipc::CompressionType::LZ4_FRAME, - ballista_config.shuffle_sort_based_batch_size(), - ) - .with_memory_limit_per_task_bytes( - ballista_config.shuffle_sort_based_memory_limit_per_task_bytes(), - ); + if ballista_config.shuffle_sort_based_enabled() + && matches!(partitioning, None | Some(Partitioning::Hash(_, _))) + { + let sort_config = SortShuffleConfig::new( + true, + datafusion::arrow::ipc::CompressionType::LZ4_FRAME, + ballista_config.shuffle_sort_based_batch_size(), + ) + .with_memory_limit_per_task_bytes( + ballista_config.shuffle_sort_based_memory_limit_per_task_bytes(), + ); - return Ok(Arc::new(SortShuffleWriterExec::try_new( - job_id.to_owned(), - stage_id, - plan, - "".to_owned(), - Partitioning::Hash(exprs, partition_count), - sort_config, - )?)); - } + return Ok(Arc::new(SortShuffleWriterExec::try_new( + job_id.to_owned(), + stage_id, + plan, + "".to_owned(), + partitioning, + sort_config, + )?)); } // Fall back to standard shuffle writer @@ -1183,7 +1182,9 @@ order by // stage0 let stage0 = stages[0].clone(); let shuffle_write = downcast_exec!(stage0, SortShuffleWriterExec); - let partitioning = shuffle_write.shuffle_output_partitioning(); + let partitioning = shuffle_write + .shuffle_output_partitioning() + .expect("expected hash partitioning"); assert_eq!(2, partitioning.partition_count()); let partition_col = match partitioning { Partitioning::Hash(exprs, 2) => match exprs.as_slice() { @@ -1280,6 +1281,56 @@ order by Ok(()) } + #[tokio::test] + async fn sort_shuffle_used_for_none_partitioning() -> Result<(), BallistaError> { + use ballista_core::config::BallistaConfig; + use datafusion::config::{ConfigOptions, ExtensionOptions}; + + // With sort-based shuffle enabled, every stage in the distributed plan + // should use SortShuffleWriterExec - including the final stage(s) + // whose shuffle partitioning is None. + let ctx = datafusion_test_context("testdata").await?; + let session_state = ctx.state(); + + let df = ctx + .sql( + "select l_returnflag, sum(l_extendedprice) as total + from lineitem + group by l_returnflag + order by l_returnflag", + ) + .await?; + let plan = df.into_optimized_plan()?; + let plan = session_state.optimize(&plan)?; + let plan = session_state.create_physical_plan(&plan).await?; + + let mut ballista_config = BallistaConfig::default(); + ballista_config + .set("shuffle.sort_based.enabled", "true") + .expect("set sort-based-enabled"); + let mut config_options = ConfigOptions::new(); + config_options.extensions.insert(ballista_config); + + let mut planner = DefaultDistributedPlanner::new(); + let stages = planner.plan_query_stages("job-none", plan, &config_options)?; + + // The query produces multiple stages: a Hash-partitioned partial + // aggregate, a None-partitioned final-aggregate, and a None-partitioned + // sort-preserving merge. All should be SortShuffleWriterExec. + assert!(stages.len() >= 2, "expected at least 2 stages"); + for (i, stage) in stages.iter().enumerate() { + assert!( + stage + .as_any() + .downcast_ref::() + .is_some(), + "stage {i} expected SortShuffleWriterExec, got {}", + stage.name() + ); + } + Ok(()) + } + fn make_broadcast_test_ctx( threshold_bytes: usize, small_partitions: usize, diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index ce23a459a5..3402dd3745 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -1717,7 +1717,8 @@ impl TaskDescription { { return shuffle_writer .shuffle_output_partitioning() - .partition_count(); + .map(|partitioning| partitioning.partition_count()) + .unwrap_or(1); } // Default fallback 1 diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs b/ballista/scheduler/src/state/execution_graph_dot.rs index e1a24c4da5..434eee23e1 100644 --- a/ballista/scheduler/src/state/execution_graph_dot.rs +++ b/ballista/scheduler/src/state/execution_graph_dot.rs @@ -459,7 +459,7 @@ filter_expr="] } subgraph cluster4 { label = "Stage 5 [Unresolved]"; - stage_5_0 [shape=box, label="ShuffleWriter [48 partitions]"] + stage_5_0 [shape=box, label="SortShuffleWriter [48 partitions]"] stage_5_0_0 [shape=box, label="HashJoin join_expr=b@3 = b@1 filter_expr="] @@ -528,7 +528,7 @@ filter_expr="] } subgraph cluster3 { label = "Stage 4 [Unresolved]"; - stage_4_0 [shape=box, label="ShuffleWriter [48 partitions]"] + stage_4_0 [shape=box, label="SortShuffleWriter [48 partitions]"] stage_4_0_0 [shape=box, label="HashJoin join_expr=a@1 = a@0 filter_expr="] @@ -560,7 +560,7 @@ filter_expr="] .map_err(|e| BallistaError::Internal(format!("{e:?}")))?; let expected = r#"digraph G { - stage_4_0 [shape=box, label="ShuffleWriter [48 partitions]"] + stage_4_0 [shape=box, label="SortShuffleWriter [48 partitions]"] stage_4_0_0 [shape=box, label="HashJoin join_expr=a@1 = a@0 filter_expr="] diff --git a/benchmarks/benches/sort_shuffle.rs b/benchmarks/benches/sort_shuffle.rs index ddfcf29a0e..af06cf0f96 100644 --- a/benchmarks/benches/sort_shuffle.rs +++ b/benchmarks/benches/sort_shuffle.rs @@ -243,7 +243,10 @@ fn run_sort_shuffle( 1, input, work_dir.to_string(), - Partitioning::Hash(vec![Arc::new(Column::new("c0", 0))], NUM_OUTPUT_PARTITIONS), + Some(Partitioning::Hash( + vec![Arc::new(Column::new("c0", 0))], + NUM_OUTPUT_PARTITIONS, + )), config, ) .unwrap(); diff --git a/benchmarks/src/bin/shuffle_bench.rs b/benchmarks/src/bin/shuffle_bench.rs index d4f248b461..fc910a16ea 100644 --- a/benchmarks/src/bin/shuffle_bench.rs +++ b/benchmarks/src/bin/shuffle_bench.rs @@ -293,7 +293,7 @@ async fn execute_shuffle_write( 1, input, work_dir_str, - partitioning, + Some(partitioning), cfg, )?; let task_ctx = ctx.task_ctx();