Skip to content

Commit 0af4c98

Browse files
committed
perf(pm): add install scheduler wiring
1 parent 45c746d commit 0af4c98

8 files changed

Lines changed: 769 additions & 240 deletions

File tree

crates/pm/src/helper/lock.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::fs;
1717
use crate::helper::workspace::find_workspaces;
1818
use crate::util::cli_enum::{PackageAction, SaveType};
1919
use crate::util::cloner::clone_package;
20-
use crate::util::downloader::{is_git_url, resolve_cache_path};
20+
use crate::util::downloader::{download_and_extract_to_cache, is_git_url};
2121
use crate::util::git_resolver::{resolve_git_spec, resolve_github_spec};
2222
use crate::util::json::{load_package_lock_json_from_path, read_json_file};
2323

@@ -267,9 +267,9 @@ pub async fn prepare_global_package_json(npm_spec: &str, prefix: Option<&str>) -
267267
.ok_or_else(|| anyhow!("Failed to get tarball URL from manifest"))?;
268268

269269
// Download and extract package to cache.
270-
let cache_path = resolve_cache_path(&name, &resolved.version, tarball_url)
270+
let cache_path = download_and_extract_to_cache(&name, &resolved.version, tarball_url)
271271
.await
272-
.ok_or_else(|| anyhow!("Failed to download package {name}"))?;
272+
.with_context(|| format!("Failed to download package {name}"))?;
273273

274274
// If the package has install scripts, create a flag file
275275
// in linux, we can use hardlink when FICLONE is not supported

crates/pm/src/helper/ruborist_context.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use utoo_ruborist::service::{
66
BuildDepsOptions, BuildDepsOutput, Glob, ManifestStore, UnifiedRegistry,
77
};
88

9-
use crate::service::pipeline::{PipelineChannels, PipelineReceiver};
9+
use crate::service::install_scheduler::{InstallEventReceiver, InstallScheduler};
1010
use crate::util::cache::get_cache_dir;
1111
use crate::util::logger::ProgressReceiver;
1212
use crate::util::manifest_store::DiskManifestStore;
@@ -66,17 +66,13 @@ impl Context {
6666
}
6767
}
6868

69-
/// Create BuildDepsOptions with PipelineReceiver for concurrent download/clone.
70-
/// Returns (options, channels) where channels are used to start pipeline workers.
71-
pub async fn pipeline_deps_options(
69+
/// Create BuildDepsOptions that forwards package events to the install scheduler.
70+
pub async fn install_deps_options(
7271
cwd: PathBuf,
73-
) -> (
74-
BuildDepsOptions<GlobImpl, PipelineReceiver<ProgressReceiver>>,
75-
PipelineChannels,
76-
) {
77-
let (receiver, channels) = PipelineReceiver::new(ProgressReceiver);
78-
let options = Self::deps_options(cwd, receiver).await;
79-
(options, channels)
72+
scheduler: InstallScheduler,
73+
) -> BuildDepsOptions<GlobImpl, InstallEventReceiver<ProgressReceiver>> {
74+
let receiver = InstallEventReceiver::new(ProgressReceiver, scheduler, cwd.clone());
75+
Self::deps_options(cwd, receiver).await
8076
}
8177

8278
/// Resolve dependency tree with plain ProgressReceiver. Returns

crates/pm/src/service/install.rs

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::util::cli_enum::ScriptPolicy;
2-
use anyhow::Context;
3-
use anyhow::Result;
2+
use anyhow::{Context as _, Result};
3+
use futures::stream::{FuturesUnordered, StreamExt};
44
use std::collections::{HashMap, HashSet};
55
use std::path::Path;
66
use std::time::Instant;
@@ -10,8 +10,9 @@ use crate::fs;
1010
use crate::helper::global_bin::get_global_bin_dir;
1111
use crate::helper::lock::{
1212
Package, UpdatePackageJsonOptions, extract_package_name, group_by_depth, is_pkg_lock_outdated,
13-
prepare_global_package_json, update_package_json,
13+
prepare_global_package_json, save_package_lock, update_package_json,
1414
};
15+
use crate::helper::ruborist_context::{Context, spawn_save_project_cache};
1516
use crate::helper::workspace::init_project_root;
1617
use crate::model::package::PackageInfo;
1718
use crate::service::rebuild::RebuildService;
@@ -62,28 +63,27 @@ fn should_omit_package(package: &Package, omit: &HashSet<OmitType>) -> bool {
6263
false
6364
}
6465

65-
pub async fn install_packages(
66+
async fn install_packages(
6667
groups: &HashMap<usize, Vec<(String, Package)>>,
6768
cwd: &Path,
6869
omit: &HashSet<OmitType>,
70+
scheduler: &super::install_scheduler::InstallScheduler,
6971
) -> Result<()> {
70-
use crate::util::cloner::clone_package_once;
71-
7272
// Surface the clean step in the spinner — it doesn't move `pos`, so
7373
// without a message the bar looks frozen on large trees.
7474
log_progress("validating node_modules");
7575
clean_deps(groups, cwd).await?;
7676
log_progress("linking packages");
7777

7878
// Always process level-by-level to ensure parent directories exist before
79-
// children. Within each level, tasks run concurrently. The pipeline's
80-
// clone_worker may have already cloned some packages — clone_package_once
81-
// deduplicates via CLONE_CACHE so no double work occurs.
79+
// children. Within each level, tasks run concurrently. The install
80+
// scheduler owns clone/download dedupe, so package tasks only request the
81+
// concrete target they need.
8282
let mut depths: Vec<_> = groups.keys().cloned().collect();
8383
depths.sort_unstable();
8484

8585
for depth in depths.iter() {
86-
let mut clone_tasks: Vec<tokio::task::JoinHandle<Result<()>>> = Vec::new();
86+
let mut clone_tasks = FuturesUnordered::new();
8787

8888
if let Some(packages) = groups.get(depth) {
8989
for (path, package) in packages.iter() {
@@ -140,14 +140,16 @@ pub async fn install_packages(
140140
.ok_or_else(|| anyhow::anyhow!("package {name} missing version"))?;
141141
let cwd_clone = cwd.to_path_buf();
142142
let target_path = cwd_clone.join(&path);
143+
let scheduler = scheduler.clone();
143144

144145
// Check if this is an optional dependency
145146
let is_optional =
146147
package.optional == Some(true) || package.dev_optional == Some(true);
147148

148-
let task = tokio::spawn(async move {
149-
if let Err(e) =
150-
clone_package_once(&name, &version, &resolved, &target_path).await
149+
clone_tasks.push(async move {
150+
if let Err(e) = scheduler
151+
.ensure_clone(name.clone(), version, resolved, target_path.clone())
152+
.await
151153
{
152154
if is_optional {
153155
tracing::warn!(
@@ -162,21 +164,33 @@ pub async fn install_packages(
162164
log_progress(&format!("{name} resolved"));
163165
update_package_binary(&target_path, &name).await
164166
});
165-
clone_tasks.push(task);
166167
} else {
167168
PROGRESS_BAR.inc(1);
168169
}
169170
}
170171
}
171172

172-
for task in clone_tasks {
173-
task.await??;
173+
while let Some(result) = clone_tasks.next().await {
174+
result?;
174175
}
175176
}
176177

177178
Ok(())
178179
}
179180

181+
async fn resolve_package_lock_with_scheduler(
182+
root_path: &Path,
183+
scheduler: super::install_scheduler::InstallScheduler,
184+
) -> Result<utoo_ruborist::lock::PackageLock> {
185+
let options = Context::install_deps_options(root_path.to_path_buf(), scheduler).await;
186+
let output = utoo_ruborist::service::build_deps(options).await?;
187+
188+
save_package_lock(root_path, &output.lock).await?;
189+
spawn_save_project_cache(root_path.to_path_buf(), output.project_cache);
190+
191+
Ok(output.lock)
192+
}
193+
180194
pub struct InstallService;
181195

182196
impl InstallService {
@@ -244,16 +258,31 @@ impl InstallService {
244258
// itself emits a `tracing::warn` with the specific mismatch reason.
245259
let use_fresh_lock = fs::try_exists(&lock_path).await.unwrap_or(false)
246260
&& !is_pkg_lock_outdated(root_path).await.unwrap_or(true);
247-
248-
let (package_lock, pipeline_handles) = if use_fresh_lock {
249-
let lock = load_package_lock_json_from_path(root_path).await?;
250-
(lock, None)
261+
let scheduler_handle = super::install_scheduler::InstallSchedulerHandle::start();
262+
let scheduler = scheduler_handle.scheduler();
263+
264+
let (package_lock, used_scheduler_prefetch) = if use_fresh_lock {
265+
let lock = match load_package_lock_json_from_path(root_path).await {
266+
Ok(lock) => lock,
267+
Err(e) => {
268+
scheduler_handle.shutdown().await;
269+
return Err(e);
270+
}
271+
};
272+
(lock, false)
251273
} else {
252274
start_progress_bar();
253275
let resolve_start = Instant::now();
254-
let result = super::pipeline::resolve_with_pipeline(root_path).await?;
276+
let lock = match resolve_package_lock_with_scheduler(root_path, scheduler.clone()).await
277+
{
278+
Ok(lock) => lock,
279+
Err(e) => {
280+
scheduler_handle.shutdown().await;
281+
return Err(e);
282+
}
283+
};
255284
finish_progress_bar("package-lock.json resolved", Some(resolve_start.elapsed()));
256-
(result.package_lock, Some(result.handles))
285+
(lock, true)
257286
};
258287

259288
let groups = group_by_depth(&package_lock.packages);
@@ -264,15 +293,15 @@ impl InstallService {
264293
}
265294

266295
let link_start = Instant::now();
267-
install_packages(&groups, root_path, omit)
296+
let install_result = install_packages(&groups, root_path, omit, &scheduler)
268297
.await
269-
.context("Failed to install packages")?;
298+
.context("Failed to install packages");
270299

271-
// Wait for pipeline workers to complete (if any)
272-
if let Some(handles) = pipeline_handles {
273-
handles.await_completion().await;
274-
super::pipeline::print_pipeline_summary();
300+
scheduler_handle.shutdown().await;
301+
if used_scheduler_prefetch {
302+
super::install_scheduler::print_summary();
275303
}
304+
install_result?;
276305
finish_progress_bar("node_modules cloned", Some(link_start.elapsed()));
277306

278307
RebuildService::rebuild(&package_lock, root_path, scripts).await?;

0 commit comments

Comments
 (0)