Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista/client/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Extension {
.map(|s| s.config().clone())
.unwrap_or_else(default_config_producer);

let scheduler_url = format!("http://localhost:{}", addr.port());
let scheduler_url = format!("http://127.0.0.1:{}", addr.port());

let scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Expand Down
133 changes: 110 additions & 23 deletions ballista/client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::error::Error;
use std::path::PathBuf;

use ballista::prelude::{SessionConfigExt, SessionContextExt};
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::{
BallistaCodec, protobuf::scheduler_grpc_client::SchedulerGrpcClient,
};
Expand Down Expand Up @@ -99,25 +100,36 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn
}
}

/// starts a ballista cluster for integration tests
/// starts a ballista cluster for integration tests (pull-staged scheduling).
#[allow(dead_code)]
pub async fn setup_test_cluster() -> (String, u16) {
setup_test_cluster_with_scheduling(TaskSchedulingPolicy::PullStaged).await
}

/// starts a ballista cluster using the given [`TaskSchedulingPolicy`].
#[allow(dead_code)]
pub async fn setup_test_cluster_with_scheduling(
scheduling_policy: TaskSchedulingPolicy,
) -> (String, u16) {
let config = SessionConfig::new_with_ballista();
let default_codec = BallistaCodec::default();

let addr = ballista_scheduler::standalone::new_standalone_scheduler()
.await
.expect("scheduler to be created");
let addr = ballista_scheduler::standalone::new_standalone_scheduler_with_scheduling(
scheduling_policy,
)
.await
.expect("scheduler to be created");

let host = "localhost".to_string();
let host = "127.0.0.1".to_string();

let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor(
ballista_executor::new_standalone_executor_with_scheduling_policy(
scheduler,
config.ballista_standalone_parallelism(),
default_codec,
scheduling_policy,
)
.await
.expect("executor to be created");
Expand All @@ -127,26 +139,48 @@ pub async fn setup_test_cluster() -> (String, u16) {
(host, addr.port())
}

/// starts a ballista cluster for integration tests
/// starts a ballista cluster using push-staged scheduling (default executor policy).
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (String, u16) {
let config = SessionConfig::new_with_ballista();
pub async fn setup_test_cluster_push_scheduling() -> (String, u16) {
setup_test_cluster_with_scheduling(TaskSchedulingPolicy::PushStaged).await
}

let addr = ballista_scheduler::standalone::new_standalone_scheduler_from_state(
&session_state,
/// starts a cluster with [`SessionState`] (pull scheduling).
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (String, u16) {
setup_test_cluster_with_state_and_scheduling(
session_state,
TaskSchedulingPolicy::PullStaged,
)
.await
.expect("scheduler to be created");
}

let host = "localhost".to_string();
/// starts a ballista cluster with selectable [`TaskSchedulingPolicy`].
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state_and_scheduling(
session_state: SessionState,
scheduling_policy: TaskSchedulingPolicy,
) -> (String, u16) {
let config = SessionConfig::new_with_ballista();

let addr =
ballista_scheduler::standalone::new_standalone_scheduler_from_state_with_scheduling_policy(
&session_state,
scheduling_policy,
)
.await
.expect("scheduler to be created");

let host = "127.0.0.1".to_string();

let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor_from_state(
ballista_executor::new_standalone_executor_from_state_with_scheduling_policy(
scheduler,
config.ballista_standalone_parallelism(),
&session_state,
scheduling_policy,
)
.await
.expect("executor to be created");
Expand All @@ -156,11 +190,39 @@ pub async fn setup_test_cluster_with_state(session_state: SessionState) -> (Stri
(host, addr.port())
}

/// starts a cluster with push-staged scheduling and a custom session state.
#[allow(dead_code)]
pub async fn setup_test_cluster_with_state_push_scheduling(
session_state: SessionState,
) -> (String, u16) {
setup_test_cluster_with_state_and_scheduling(
session_state,
TaskSchedulingPolicy::PushStaged,
)
.await
}

#[allow(dead_code)]
pub async fn setup_test_cluster_with_builders(
config_producer: ConfigProducer,
runtime_producer: RuntimeProducer,
session_builder: SessionBuilder,
) -> (String, u16) {
setup_test_cluster_with_builders_and_scheduling(
config_producer,
runtime_producer,
session_builder,
TaskSchedulingPolicy::PullStaged,
)
.await
}

#[allow(dead_code)]
pub async fn setup_test_cluster_with_builders_and_scheduling(
config_producer: ConfigProducer,
runtime_producer: RuntimeProducer,
session_builder: SessionBuilder,
scheduling_policy: TaskSchedulingPolicy,
) -> (String, u16) {
let config = config_producer();

Expand All @@ -171,26 +233,29 @@ pub async fn setup_test_cluster_with_builders(
datafusion_proto::protobuf::PhysicalPlanNode,
> = BallistaCodec::new(logical, physical);

let addr = ballista_scheduler::standalone::new_standalone_scheduler_with_builder(
session_builder,
config_producer.clone(),
codec.clone(),
)
.await
.expect("scheduler to be created");
let addr =
ballista_scheduler::standalone::new_standalone_scheduler_with_builder_and_policy(
session_builder,
config_producer.clone(),
codec.clone(),
scheduling_policy,
)
.await
.expect("scheduler to be created");

let host = "localhost".to_string();
let host = "127.0.0.1".to_string();

let scheduler =
connect_to_scheduler(format!("http://{}:{}", host, addr.port())).await;

ballista_executor::new_standalone_executor_from_builder(
ballista_executor::new_standalone_executor_from_builder_with_scheduling_policy(
scheduler,
config.ballista_standalone_parallelism(),
config_producer,
runtime_producer,
codec,
Default::default(),
scheduling_policy,
)
.await
.expect("executor to be created");
Expand Down Expand Up @@ -234,6 +299,15 @@ pub async fn remote_context() -> SessionContext {
.unwrap()
}

/// Remote [`SessionContext`] against a throwaway cluster using push-staged scheduling.
#[allow(dead_code)]
pub async fn remote_context_push_scheduling() -> SessionContext {
let (host, port) = setup_test_cluster_push_scheduling().await;
SessionContext::remote(&format!("df://{host}:{port}"))
.await
.unwrap()
}

#[allow(dead_code)]
pub async fn standalone_context_with_state() -> SessionContext {
let config = SessionConfig::new_with_ballista();
Expand All @@ -257,6 +331,19 @@ pub async fn remote_context_with_state() -> SessionContext {
.unwrap()
}

#[allow(dead_code)]
pub async fn remote_context_with_state_push_scheduling() -> SessionContext {
let config = SessionConfig::new_with_ballista();
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();
let (host, port) = setup_test_cluster_with_state_push_scheduling(state.clone()).await;
SessionContext::remote_with_state(&format!("df://{host}:{port}"), state)
.await
.unwrap()
}

#[ctor::ctor(unsafe)]
fn init() {
// Enable RUST_LOG logging configuration for test
Expand Down
Loading
Loading