From 1efa749a3a86c79840d078e81143a47932bb3a97 Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Fri, 22 May 2026 02:56:44 +0800 Subject: [PATCH] perf(pm): add resolver demand mainloop --- crates/ruborist/src/resolver/builder.rs | 1215 ++++++++++++++++++----- crates/ruborist/src/service/api.rs | 79 +- 2 files changed, 981 insertions(+), 313 deletions(-) diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs index b98f9b313..e7a1be14b 100644 --- a/crates/ruborist/src/resolver/builder.rs +++ b/crates/ruborist/src/resolver/builder.rs @@ -7,19 +7,17 @@ //! - Hoisting (placing packages as high as possible in the tree) //! - Override rules //! - Different dependency types (prod, dev, peer, optional) -//! - Parallel manifest preloading for performance +//! - Demand-driven parallel manifest jobs for performance //! -//! # Two-Phase Resolution +//! # Demand BFS Resolution //! -//! The builder uses a two-phase approach for optimal performance: -//! 1. **Preload Phase**: Parallel fetch of all manifests to warm up caches -//! 2. **Build Phase**: Sequential BFS traversal reading from cache -//! -//! This separation allows for maximum parallelism during network I/O -//! while keeping the graph building logic simple and deterministic. +//! The builder owns breadth-first traversal, per-run manifest cache, waiters, +//! and inflight de-duplication. Provider tasks only execute concrete manifest +//! jobs such as fetch, parse, extract, and persistence. +use futures::stream::{FuturesUnordered, StreamExt}; use petgraph::graph::NodeIndex; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::path::PathBuf; use std::sync::Arc; @@ -28,13 +26,19 @@ use anyhow::Context as _; use crate::model::graph::{DependencyGraph, FindResult, PackageNode}; use crate::model::manifest::NodeManifest; +use crate::model::manifest::{CoreVersionManifest, FullManifest, VersionsRef}; use crate::model::node::EdgeType; use crate::model::package_json::PackageJson; -use crate::resolver::preload::{PreloadConfig, preload_manifests}; use crate::resolver::registry::{ResolveError, resolve_registry_dep}; -use crate::service::{ManifestProvider, ProjectCacheData}; -use crate::spec::{Catalogs, PackageSpec, Protocol}; +use crate::resolver::semver::normalize_spec; +use crate::resolver::version::resolve_target_version; +use crate::service::{ + ManifestFullData, ManifestJob, ManifestJobDone, ManifestProvider, MetadataFormat, + ProjectCacheData, +}; +use crate::spec::{Catalogs, PackageSpec, Protocol, SpecStr}; use crate::traits::progress::{BuildEvent, EventReceiver, NoopReceiver}; +use crate::traits::registry::RegistryError; use crate::traits::registry::{RegistryClient, ResolvedPackage}; /// Dispatch a git/github spec to the real `gix`-backed resolver when the @@ -100,15 +104,15 @@ pub use super::edges::{ }; pub use crate::model::node::{DevDeps, PeerDeps}; +const DEFAULT_CONCURRENCY: usize = 128; + /// Configuration for dependency resolution. #[derive(Debug, Clone)] pub struct BuildDepsConfig { /// How to handle peer dependencies. pub peer_deps: PeerDeps, - /// Maximum number of concurrent manifest fetches during preload + /// Maximum number of concurrent manifest jobs. pub concurrency: usize, - /// Whether to skip preload phase (useful when cache is already warm) - pub skip_preload: bool, /// Cache directory for git clones (defaults to `~/.cache/nm`) pub cache_dir: Option, /// Shared dedup cache for concurrent git clone operations @@ -118,7 +122,7 @@ pub struct BuildDepsConfig { /// Catalog definitions for the `catalog:` dependency protocol. /// Key `""` = default catalog, other keys = named catalogs. pub catalogs: Catalogs, - /// Host-provided project cache used to seed resolver manifest state. + /// Host-provided project cache used to seed the resolver-owned manifest cache. pub warm_project_cache: Option, } @@ -126,8 +130,7 @@ impl Default for BuildDepsConfig { fn default() -> Self { Self { peer_deps: PeerDeps::Skip, - concurrency: crate::resolver::preload::DEFAULT_CONCURRENCY, - skip_preload: false, + concurrency: DEFAULT_CONCURRENCY, cache_dir: dirs::home_dir().map(|d| d.join(".cache/nm")), git_clone_cache: Arc::new(GitCloneCache::new()), http_fetch_cache: Arc::new(HttpFetchCache::new()), @@ -150,12 +153,6 @@ impl BuildDepsConfig { self } - /// Create config that skips preload phase - pub fn with_skip_preload(mut self, skip: bool) -> Self { - self.skip_preload = skip; - self - } - /// Set the cache directory for git clones pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self { self.cache_dir = Some(cache_dir); @@ -168,7 +165,6 @@ impl BuildDepsConfig { self } - /// Set the host-provided warm project cache. pub fn with_warm_project_cache(mut self, warm_project_cache: Option) -> Self { self.warm_project_cache = warm_project_cache; self @@ -185,44 +181,6 @@ struct NodeFlags { is_peer: bool, } -/// Gather all unresolved deps from root and workspace nodes for preloading. -/// -/// Only registry specs (e.g. `^4.17.0`) are collected. `catalog:` specs are -/// resolved at edge creation time, so by the time this runs they are already -/// concrete registry specs. -fn gather_preload_deps(graph: &DependencyGraph, peer_deps: PeerDeps) -> Vec<(String, String)> { - use crate::spec::SpecStr; - use std::collections::HashSet; - - let mut deps = HashSet::new(); - - let collect = |node_index: NodeIndex, deps: &mut HashSet<(String, String)>| { - for (_, edge) in graph.get_dependency_edges(node_index) { - if edge.valid { - continue; - } - if peer_deps == PeerDeps::Skip && edge.edge_type == EdgeType::Peer { - continue; - } - if edge.spec.is_registry_spec() { - deps.insert((edge.name.clone(), edge.spec.clone())); - } - } - }; - - collect(graph.root_index, &mut deps); - - for node_index in graph.graph.node_indices() { - if let Some(node) = graph.get_node(node_index) - && node.is_workspace() - { - collect(node_index, &mut deps); - } - } - - deps.into_iter().collect() -} - /// Create a new package node for a resolved dependency. /// /// # Arguments @@ -661,97 +619,6 @@ pub async fn process_dependency( } } -/// Place an already-resolved registry package into the graph. -/// -/// The demand resolver uses this after a manifest job completes; the legacy -/// resolver path also goes through it so placement semantics stay shared. -pub fn process_dependency_with_resolved( - graph: &mut DependencyGraph, - node_index: NodeIndex, - edge_info: &DependencyEdgeInfo, - resolved: &ResolvedPackage, - config: &BuildDepsConfig, -) -> ProcessResult { - match graph.find_compatible_node(node_index, &edge_info.name, &edge_info.spec) { - FindResult::Reuse(existing_index) => { - graph.mark_dependency_resolved(edge_info.edge_id, existing_index); - update_node_type_from_edge(graph, node_index, existing_index, &edge_info.edge_type); - ProcessResult::Reused(existing_index) - } - FindResult::Conflict(conflict_parent) | FindResult::New(conflict_parent) => { - let new_node = create_package_node(&edge_info.name, resolved, conflict_parent, graph); - let new_index = graph.add_node(new_node); - graph.add_physical_edge(conflict_parent, new_index); - graph.mark_dependency_resolved(edge_info.edge_id, new_index); - update_node_type_from_edge(graph, node_index, new_index, &edge_info.edge_type); - add_edges_from( - graph, - new_index, - &*resolved.manifest, - &EdgeContext::new(config.peer_deps, DevDeps::Exclude), - ); - ProcessResult::Created(new_index) - } - } -} - -fn chain_err( - graph: &DependencyGraph, - parent: NodeIndex, - edge: &DependencyEdgeInfo, - inner: ResolveError, -) -> ResolveError { - let mut chain = graph.logical_ancestry(parent); - chain.push((edge.name.clone(), edge.spec.clone())); - ResolveError::WithChain { - chain, - source: Box::new(inner), - } -} - -fn handle_processed( - graph: &DependencyGraph, - receiver: &E, - parent: NodeIndex, - edge: &DependencyEdgeInfo, - processed: &ProcessResult, - next_level: &mut Vec, -) { - match processed { - ProcessResult::Created(idx) => { - if let Some(node) = graph.get_node(*idx) { - receiver.on_event(BuildEvent::Resolved { - name: &edge.name, - version: &node.version, - }); - if let NodeManifest::Registry(ref manifest) = node.manifest { - let parent_path = graph.get_node(parent).map(|p| p.path.as_path()); - receiver.on_event(BuildEvent::PackagePlaced { - package: manifest.as_ref().into(), - path: &node.path, - parent_path, - }); - } - } - next_level.push(*idx); - } - ProcessResult::Reused(idx) => { - if let Some(node) = graph.get_node(*idx) { - receiver.on_event(BuildEvent::Reused { - name: &edge.name, - version: &node.version, - }); - } - } - ProcessResult::Skipped => { - receiver.on_event(BuildEvent::Skipped { - name: &edge.name, - spec: &edge.spec, - }); - } - } -} - /// Build the complete dependency tree using BFS traversal. /// /// This is the main entry point for dependency resolution. It starts from @@ -811,21 +678,20 @@ where /// Build the complete dependency tree with full configuration. /// -/// This is the most flexible entry point for dependency resolution. It performs: -/// 1. **Preload Phase** (unless skipped): Parallel fetch of all manifests to warm up caches -/// 2. **Build Phase**: Sequential BFS traversal reading from cache +/// This is the most flexible entry point for dependency resolution. It runs +/// demand BFS and schedules manifest jobs only when the current frontier needs +/// them. /// /// # Arguments /// * `graph` - The dependency graph (should have root node and initial edges) /// * `registry` - Registry client for fetching packages -/// * `config` - Build configuration (concurrency, peer_deps, skip_preload) +/// * `config` - Build configuration (concurrency, peer_deps, cache_dir, etc.) /// * `receiver` - Event receiver for handling build events /// /// # Example /// ```ignore /// let config = BuildDepsConfig::default() -/// .with_concurrency(50) -/// .with_skip_preload(true); // Skip preload if cache is warm +/// .with_concurrency(50); /// /// build_deps_with_config(&mut graph, ®istry, config, &receiver).await?; /// ``` @@ -835,135 +701,1008 @@ pub async fn build_deps_with_config( config: BuildDepsConfig, receiver: &E, ) -> Result<(), ResolveError> +where + R: ManifestProvider, + R::Error: Send, + E: EventReceiver, +{ + build_deps_with_config_output(graph, registry, config, receiver) + .await + .map(|_| ()) +} + +pub(crate) async fn build_deps_with_config_output( + graph: &mut DependencyGraph, + registry: &R, + config: BuildDepsConfig, + receiver: &E, +) -> Result> where R: ManifestProvider, R::Error: Send, E: EventReceiver, { tracing::debug!( - "Starting dependency tree build, peer_deps: {:?}, concurrency: {}, skip_preload: {}", + "Starting dependency tree build, peer_deps: {:?}, concurrency: {}", config.peer_deps, - config.concurrency, - config.skip_preload + config.concurrency ); - // Phase 1: Preload manifests in parallel (unless skipped) - run_preload_phase(graph, registry, &config, receiver).await; - - // Phase 2: BFS traversal to build the dependency tree - run_bfs_phase(graph, registry, &config, receiver).await?; + let manifest_cache = run_main_loop_bfs(graph, registry, &config, receiver).await?; receiver.on_event(BuildEvent::Complete { total_nodes: graph.graph.node_count(), }); - Ok(()) + Ok(manifest_cache) } -/// Run the preload phase to warm up the cache with manifests. -async fn run_preload_phase( - graph: &DependencyGraph, +type WaitingEdge = (NodeIndex, DependencyEdgeInfo); + +type VersionKey = (String, String); + +#[derive(Default)] +pub(crate) struct ResolverManifestCache { + entries: Vec<(String, String, Arc)>, +} + +impl ResolverManifestCache { + pub(crate) fn into_project_cache(self) -> ProjectCacheData { + let mut project_cache = ProjectCacheData::default(); + for (name, spec, manifest) in self.entries { + let version = manifest.version.clone(); + let pkg_cache = project_cache.cache.entry(name).or_default(); + pkg_cache.specs.insert(spec, version.clone()); + pkg_cache + .manifests + .entry(version) + .or_insert_with(|| (*manifest).clone()); + } + project_cache + } +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +enum FetchKey { + Full(String), + Version(String, String), +} + +impl ManifestJob { + fn key(&self) -> FetchKey { + match self { + Self::Full { name, .. } => FetchKey::Full(name.clone()), + Self::Version { name, spec, .. } | Self::ExtractVersion { name, spec, .. } => { + FetchKey::Version(name.clone(), spec.clone()) + } + } + } +} + +enum FetchDone { + Full { + name: String, + result: Result, + }, + Version { + name: String, + spec: String, + result: Result, String>, + }, +} + +impl FetchDone { + fn key(&self) -> FetchKey { + match self { + Self::Full { name, .. } => FetchKey::Full(name.clone()), + Self::Version { name, spec, .. } => FetchKey::Version(name.clone(), spec.clone()), + } + } +} + +type FetchFuture = tokio::task::JoinHandle; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum FetchPriority { + Demand, + Prefetch, +} + +#[derive(Default)] +struct FetchQueues { + demand: VecDeque, + prefetch: VecDeque, + queued: HashMap, + active: HashMap, +} + +impl FetchQueues { + fn enqueue(&mut self, request: ManifestJob, priority: FetchPriority) { + let key = request.key(); + if self.active.contains_key(&key) { + return; + } + + match (self.queued.get(&key).copied(), priority) { + (Some(FetchPriority::Demand), _) + | (Some(FetchPriority::Prefetch), FetchPriority::Prefetch) => {} + (Some(FetchPriority::Prefetch), FetchPriority::Demand) => { + self.queued.insert(key, FetchPriority::Demand); + self.demand.push_back(request); + } + (None, FetchPriority::Demand) => { + self.queued.insert(key, FetchPriority::Demand); + self.demand.push_back(request); + } + (None, FetchPriority::Prefetch) => { + self.queued.insert(key, FetchPriority::Prefetch); + self.prefetch.push_back(request); + } + } + } + + fn complete(&mut self, key: &FetchKey) { + self.queued.remove(key); + self.active.remove(key); + } + + fn pop_next(&mut self, prefetch_concurrency: usize) -> Option { + if let Some(request) = self.pop_priority(FetchPriority::Demand) { + return Some(request); + } + + let prefetch_concurrency = if self + .queued + .values() + .any(|priority| *priority == FetchPriority::Demand) + { + prefetch_concurrency + } else { + usize::MAX + }; + + if self.active_prefetches() >= prefetch_concurrency { + return None; + } + + self.pop_priority(FetchPriority::Prefetch) + } + + fn pop_priority(&mut self, priority: FetchPriority) -> Option { + loop { + let request = match priority { + FetchPriority::Demand => self.demand.pop_front(), + FetchPriority::Prefetch => self.prefetch.pop_front(), + }?; + let key = request.key(); + if self.queued.get(&key).copied() != Some(priority) { + continue; + } + self.queued.remove(&key); + self.active.insert(key, priority); + return Some(request); + } + } + + fn active_prefetches(&self) -> usize { + self.active + .values() + .filter(|priority| **priority == FetchPriority::Prefetch) + .count() + } +} + +fn prefetch_concurrency_limit(concurrency: usize) -> usize { + (concurrency / 4).max(1) +} + +#[derive(Default)] +struct ManifestState { + full_cache: HashMap>, + versions_cache: HashMap>, + version_cache: HashMap>, + full_waiters: HashMap>, + version_waiters: HashMap>, + full_failures: HashMap, + version_failures: HashMap, + fetch_queues: FetchQueues, +} + +impl ManifestState { + fn with_warm_project_cache(warm: Option<&ProjectCacheData>) -> Self { + let mut state = Self::default(); + let Some(warm) = warm else { + return state; + }; + for (name, pkg_cache) in &warm.cache { + for (spec, version) in &pkg_cache.specs { + let Some(manifest) = pkg_cache.manifests.get(version) else { + continue; + }; + let manifest = Arc::new(manifest.clone()); + state + .version_cache + .insert((name.clone(), spec.clone()), Arc::clone(&manifest)); + state + .version_cache + .entry((name.clone(), version.clone())) + .or_insert(manifest); + } + } + state + } + + fn into_resolver_cache(self) -> ResolverManifestCache { + ResolverManifestCache { + entries: self + .version_cache + .into_iter() + .map(|((name, spec), manifest)| (name, spec, manifest)) + .collect(), + } + } + + fn schedule_registry_fetch( + &mut self, + name: String, + spec: String, + supports_semver: bool, + priority: FetchPriority, + ) { + let (real_name, real_spec) = normalize_spec(&name, &spec); + if supports_semver { + let key = (real_name, real_spec); + if self.version_cache.contains_key(&key) || self.version_failures.contains_key(&key) { + return; + } + self.fetch_queues.enqueue( + ManifestJob::Version { + name: key.0.clone(), + spec: key.1.clone(), + fetch_spec: key.1, + format: version_metadata_format(supports_semver), + }, + priority, + ); + } else { + if self.full_cache.contains_key(&real_name) + || self.versions_cache.contains_key(&real_name) + || self.full_failures.contains_key(&real_name) + { + return; + } + self.fetch_queues.enqueue( + ManifestJob::Full { + name: real_name, + spec: Some(real_spec), + }, + priority, + ); + } + } + + fn enqueue_version_extract(&mut self, name: String, version: String, full: Arc) { + self.fetch_queues.enqueue( + ManifestJob::ExtractVersion { + name, + spec: version.clone(), + version, + full, + }, + FetchPriority::Demand, + ); + } + + fn enqueue_version_fetch(&mut self, name: String, fetch_spec: String, supports_semver: bool) { + self.fetch_queues.enqueue( + ManifestJob::Version { + name, + spec: fetch_spec.clone(), + fetch_spec, + format: version_metadata_format(supports_semver), + }, + FetchPriority::Demand, + ); + } + + fn schedule_transitive_prefetches( + &mut self, + manifest: &CoreVersionManifest, + peer_deps: PeerDeps, + supports_semver: bool, + ) { + for (name, spec) in collect_registry_prefetches(manifest, peer_deps) { + self.schedule_registry_fetch(name, spec, supports_semver, FetchPriority::Prefetch); + } + } + + fn apply_fetch_result( + &mut self, + done: FetchDone, + supports_semver: bool, + peer_deps: PeerDeps, + level_pending: &mut VecDeque, + ) { + let done_key = done.key(); + self.fetch_queues.complete(&done_key); + + match done { + FetchDone::Full { name, result } => { + match result { + Ok(ManifestFullData::Full { + manifest: full, + speculative, + }) => { + if let Some((resolved_spec, manifest)) = speculative { + self.version_cache + .insert((name.clone(), resolved_spec), Arc::clone(&manifest)); + self.version_cache + .entry((name.clone(), manifest.version.clone())) + .or_insert_with(|| Arc::clone(&manifest)); + self.schedule_transitive_prefetches( + &manifest, + peer_deps, + supports_semver, + ); + } + self.full_cache.insert(name.clone(), full); + } + Ok(ManifestFullData::Versions(versions)) => { + self.versions_cache.insert(name.clone(), versions); + } + Err(e) => { + self.full_failures.insert(name.clone(), e); + } + } + if let Some(waiters) = self.full_waiters.remove(&name) { + level_pending.extend(waiters); + } + } + FetchDone::Version { name, spec, result } => { + let key = (name, spec); + match result { + Ok(manifest) => { + self.version_cache + .insert(key.clone(), Arc::clone(&manifest)); + self.version_cache + .entry((key.0.clone(), manifest.version.clone())) + .or_insert_with(|| Arc::clone(&manifest)); + self.schedule_transitive_prefetches(&manifest, peer_deps, supports_semver); + } + Err(e) => { + self.version_failures.insert(key.clone(), e); + } + } + if let Some(waiters) = self.version_waiters.remove(&key) { + level_pending.extend(waiters); + } + } + } + } +} + +fn version_metadata_format(supports_semver: bool) -> MetadataFormat { + if supports_semver { + MetadataFormat::Abbreviated + } else { + MetadataFormat::Complete + } +} + +fn registry_error(message: impl Into) -> ResolveError +where + E: From, +{ + ResolveError::Registry(RegistryError(anyhow::anyhow!(message.into())).into()) +} + +async fn fetch_registry_manifest_inner(registry: R, request: ManifestJob) -> FetchDone +where + R: ManifestProvider, +{ + let key = request.key(); + match registry.execute_manifest_job(request).await { + Ok(done) => match done { + ManifestJobDone::Full { name, data } => FetchDone::Full { + name, + result: Ok(data), + }, + ManifestJobDone::Version { + name, + spec, + manifest, + } => FetchDone::Version { + name, + spec, + result: Ok(manifest), + }, + }, + Err(error) => match key { + FetchKey::Full(name) => FetchDone::Full { + name, + result: Err(format!("{error:#}")), + }, + FetchKey::Version(name, spec) => FetchDone::Version { + name, + spec, + result: Err(format!("{error:#}")), + }, + }, + } +} + +#[cfg(not(target_arch = "wasm32"))] +fn fetch_registry_manifest(registry: R, request: ManifestJob) -> FetchFuture +where + R: ManifestProvider, + R::Error: Send, +{ + tokio::spawn(fetch_registry_manifest_inner(registry, request)) +} + +#[cfg(target_arch = "wasm32")] +fn fetch_registry_manifest(registry: R, request: ManifestJob) -> FetchFuture +where + R: ManifestProvider, +{ + tokio::task::spawn_local(fetch_registry_manifest_inner(registry, request)) +} + +fn pump_fetches( + fetches: &mut FuturesUnordered, + fetch_queues: &mut FetchQueues, registry: &R, + concurrency: usize, +) where + R: ManifestProvider, + R::Error: Send, +{ + let prefetch_concurrency = prefetch_concurrency_limit(concurrency); + while fetches.len() < concurrency { + let Some(request) = fetch_queues.pop_next(prefetch_concurrency) else { + break; + }; + fetches.push(fetch_registry_manifest(registry.clone(), request)); + } +} + +fn try_reuse_dependency( + graph: &mut DependencyGraph, + parent: NodeIndex, + edge: &DependencyEdgeInfo, +) -> Option { + match graph.find_compatible_node(parent, &edge.name, &edge.spec) { + FindResult::Reuse(existing_index) => { + graph.mark_dependency_resolved(edge.edge_id, existing_index); + update_node_type_from_edge(graph, parent, existing_index, &edge.edge_type); + Some(ProcessResult::Reused(existing_index)) + } + FindResult::Conflict(_) | FindResult::New(_) => None, + } +} + +pub fn process_dependency_with_resolved( + graph: &mut DependencyGraph, + node_index: NodeIndex, + edge_info: &DependencyEdgeInfo, + resolved: &ResolvedPackage, config: &BuildDepsConfig, +) -> ProcessResult { + match graph.find_compatible_node(node_index, &edge_info.name, &edge_info.spec) { + FindResult::Reuse(existing_index) => { + graph.mark_dependency_resolved(edge_info.edge_id, existing_index); + update_node_type_from_edge(graph, node_index, existing_index, &edge_info.edge_type); + ProcessResult::Reused(existing_index) + } + FindResult::Conflict(conflict_parent) | FindResult::New(conflict_parent) => { + let new_node = create_package_node(&edge_info.name, resolved, conflict_parent, graph); + let new_index = graph.add_node(new_node); + graph.add_physical_edge(conflict_parent, new_index); + graph.mark_dependency_resolved(edge_info.edge_id, new_index); + update_node_type_from_edge(graph, node_index, new_index, &edge_info.edge_type); + add_edges_from( + graph, + new_index, + &*resolved.manifest, + &EdgeContext::new(config.peer_deps, DevDeps::Exclude), + ); + ProcessResult::Created(new_index) + } + } +} + +fn chain_err( + graph: &DependencyGraph, + parent: NodeIndex, + edge: &DependencyEdgeInfo, + inner: ResolveError, +) -> ResolveError { + let mut chain = graph.logical_ancestry(parent); + chain.push((edge.name.clone(), edge.spec.clone())); + ResolveError::WithChain { + chain, + source: Box::new(inner), + } +} + +fn handle_processed( + graph: &DependencyGraph, receiver: &E, + parent: NodeIndex, + edge: &DependencyEdgeInfo, + processed: &ProcessResult, + next_level: &mut Vec, ) { - if config.skip_preload { - return; + match processed { + ProcessResult::Created(idx) => { + if let Some(node) = graph.get_node(*idx) { + receiver.on_event(BuildEvent::Resolved { + name: &edge.name, + version: &node.version, + }); + if let NodeManifest::Registry(ref manifest) = node.manifest { + let parent_path = graph.get_node(parent).map(|p| p.path.as_path()); + receiver.on_event(BuildEvent::PackagePlaced { + package: manifest.as_ref().into(), + path: &node.path, + parent_path, + }); + } + } + next_level.push(*idx); + } + ProcessResult::Reused(idx) => { + if let Some(node) = graph.get_node(*idx) { + receiver.on_event(BuildEvent::Reused { + name: &edge.name, + version: &node.version, + }); + } + } + ProcessResult::Skipped => { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + } } +} - let start = tokio::time::Instant::now(); +async fn handle_resolved_registry_manifest( + graph: &mut DependencyGraph, + registry: &R, + receiver: &E, + parent: NodeIndex, + edge: &DependencyEdgeInfo, + manifest: Arc, + config: &BuildDepsConfig, +) -> Result> +where + R: RegistryClient, + E: EventReceiver, +{ + let resolved = ResolvedPackage { + name: edge.name.clone(), + version: manifest.version.clone(), + manifest, + }; - let initial_deps = gather_preload_deps(graph, config.peer_deps); - if initial_deps.is_empty() { - return; - } + let processed = if graph + .check_override(parent, &edge.name, Some(&resolved.version)) + .is_some() + { + process_dependency(graph, registry, parent, edge, config) + .await + .map_err(|inner| chain_err(graph, parent, edge, inner))? + } else { + receiver.on_event(BuildEvent::PackageResolved((&*resolved.manifest).into())); + process_dependency_with_resolved(graph, parent, edge, &resolved, config) + }; - tracing::debug!("Preload phase: {} initial dependencies", initial_deps.len()); - receiver.on_event(BuildEvent::PreloadStart { - count: initial_deps.len(), - }); + Ok(processed) +} + +fn resolve_version_from_versions( + edge: &DependencyEdgeInfo, + package_name: &str, + versions: VersionsRef<'_>, + real_spec: &str, +) -> Result, ResolveError> { + if versions.versions.is_empty() { + if edge.edge_type == EdgeType::Optional { + return Ok(None); + } + return Err(ResolveError::NoVersions(package_name.to_string())); + } - let preload_config = PreloadConfig { - peer_deps: config.peer_deps, - concurrency: config.concurrency, + let version = match resolve_target_version(versions, real_spec) { + Ok(version) => version, + Err(_) if edge.edge_type == EdgeType::Optional => return Ok(None), + Err(e) => { + return Err(ResolveError::Version(format!( + "{}@{}: {}", + edge.name, real_spec, e + ))); + } }; + Ok(Some(version)) +} - let stats = preload_manifests( - initial_deps, - registry, - preload_config, - receiver, - |_name, _manifest| { - // Registry client's resolve_package should cache the manifest - }, - ) - .await; +fn resolve_version_from_full_manifest( + edge: &DependencyEdgeInfo, + full: &FullManifest, + real_spec: &str, +) -> Result, ResolveError> { + resolve_version_from_versions(edge, &full.name, full.into(), real_spec) +} - tracing::debug!( - "Preload phase completed: {} success, {} failed", - stats.success_count, - stats.failed_count - ); - receiver.on_event(BuildEvent::PreloadComplete { - success: stats.success_count, - failed: stats.failed_count, +fn collect_registry_prefetches( + manifest: &CoreVersionManifest, + peer_deps: PeerDeps, +) -> Vec<(String, String)> { + let mut deps = Vec::new(); + manifest.for_each_dep(peer_deps, DevDeps::Exclude, |_, name, spec| { + if spec.is_registry_spec() { + deps.push((name.to_string(), spec.to_string())); + } }); - - tracing::debug!("Preload phase: {:?}", start.elapsed()); + deps } -/// Run the BFS traversal phase to build the dependency tree. -async fn run_bfs_phase( +async fn run_main_loop_bfs( graph: &mut DependencyGraph, registry: &R, config: &BuildDepsConfig, receiver: &E, -) -> Result<(), ResolveError> { - let start = tokio::time::Instant::now(); +) -> Result> +where + R: ManifestProvider, + R::Error: Send, + E: EventReceiver, +{ + let supports_semver = registry.supports_semver_resolution(); + let concurrency = config.concurrency.max(1); + + let mut state = ManifestState::with_warm_project_cache(config.warm_project_cache.as_ref()); + let mut fetches: FuturesUnordered = FuturesUnordered::new(); - let mut current_level = vec![graph.root_index]; + let root_idx = graph.root_index; + let mut current_level = vec![root_idx]; while !current_level.is_empty() { receiver.on_event(BuildEvent::LevelStart { node_count: current_level.len(), }); + let mut next_level = Vec::new(); + let mut level_pending = VecDeque::new(); - for node_index in current_level { - // Add workspace nodes to next level - for (_, dep) in graph.get_dependency_edges(node_index) { + for node_index in ¤t_level { + for (_, dep) in graph.get_dependency_edges(*node_index) { if dep.valid && let Some(to) = dep.to && let Some(n) = graph.get_node(to) && n.is_workspace() - && node_index == graph.root_index + && *node_index == root_idx { next_level.push(to); } } - // Process unresolved dependencies - let unresolved = collect_unresolved_edges(graph, node_index); + let unresolved = collect_unresolved_edges(graph, *node_index); receiver.on_event(BuildEvent::DependencyCount { count: unresolved.len(), }); + for edge in unresolved { + level_pending.push_back((*node_index, edge)); + } + } - for edge_info in unresolved { - receiver.on_event(BuildEvent::Resolving { - name: &edge_info.name, - }); - let processed = process_dependency(graph, registry, node_index, &edge_info, config) - .await - .map_err(|inner| chain_err(graph, node_index, &edge_info, inner))?; - handle_processed( - graph, - receiver, - node_index, - &edge_info, - &processed, - &mut next_level, + loop { + pump_fetches(&mut fetches, &mut state.fetch_queues, registry, concurrency); + + while let Some((parent, edge)) = level_pending.pop_front() { + receiver.on_event(BuildEvent::Resolving { name: &edge.name }); + + if !edge.spec.is_registry_spec() { + let processed = process_dependency(graph, registry, parent, &edge, config) + .await + .map_err(|inner| chain_err(graph, parent, &edge, inner))?; + handle_processed(graph, receiver, parent, &edge, &processed, &mut next_level); + continue; + } + + if let Some(processed) = try_reuse_dependency(graph, parent, &edge) { + handle_processed(graph, receiver, parent, &edge, &processed, &mut next_level); + continue; + } + + let (real_name, real_spec) = normalize_spec(&edge.name, &edge.spec); + if supports_semver { + let key = (real_name.clone(), real_spec.clone()); + if let Some(error) = state.version_failures.get(&key) { + if edge.edge_type == EdgeType::Optional { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + } + return Err(chain_err( + graph, + parent, + &edge, + registry_error(format!("{}@{}: {error}", real_name, real_spec)), + )); + } + + if let Some(manifest) = state.version_cache.get(&key).cloned() { + let processed = handle_resolved_registry_manifest( + graph, registry, receiver, parent, &edge, manifest, config, + ) + .await?; + handle_processed( + graph, + receiver, + parent, + &edge, + &processed, + &mut next_level, + ); + continue; + } + + state + .version_waiters + .entry(key.clone()) + .or_default() + .push((parent, edge)); + state.schedule_registry_fetch( + key.0, + key.1, + supports_semver, + FetchPriority::Demand, + ); + } else { + if let Some(error) = state.full_failures.get(&real_name) { + if edge.edge_type == EdgeType::Optional { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + } + return Err(chain_err( + graph, + parent, + &edge, + registry_error(format!("{}: {error}", real_name)), + )); + } + + let version_key = (real_name.clone(), real_spec.clone()); + if let Some(error) = state.version_failures.get(&version_key) { + if edge.edge_type == EdgeType::Optional { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + } + return Err(chain_err( + graph, + parent, + &edge, + registry_error(format!("{}@{}: {error}", real_name, real_spec)), + )); + } + + if let Some(manifest) = state.version_cache.get(&version_key).cloned() { + let processed = handle_resolved_registry_manifest( + graph, registry, receiver, parent, &edge, manifest, config, + ) + .await?; + handle_processed( + graph, + receiver, + parent, + &edge, + &processed, + &mut next_level, + ); + continue; + } + + if let Some(full) = state.full_cache.get(&real_name).cloned() { + let Some(resolved_version) = + resolve_version_from_full_manifest::( + &edge, &full, &real_spec, + ) + .map_err(|inner| chain_err(graph, parent, &edge, inner))? + else { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + }; + + let exact_key = (real_name.clone(), resolved_version.clone()); + if let Some(error) = state.version_failures.get(&exact_key) { + if edge.edge_type == EdgeType::Optional { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + } + return Err(chain_err( + graph, + parent, + &edge, + registry_error(format!("{}@{}: {error}", real_name, real_spec)), + )); + } + + if let Some(manifest) = state.version_cache.get(&exact_key).cloned() { + state + .version_cache + .insert(version_key, Arc::clone(&manifest)); + let processed = handle_resolved_registry_manifest( + graph, registry, receiver, parent, &edge, manifest, config, + ) + .await?; + handle_processed( + graph, + receiver, + parent, + &edge, + &processed, + &mut next_level, + ); + continue; + } + + state + .version_waiters + .entry(exact_key) + .or_default() + .push((parent, edge)); + state.enqueue_version_extract(real_name, resolved_version, full); + continue; + } + + if let Some(versions) = state.versions_cache.get(&real_name).cloned() { + let Some(resolved_version) = resolve_version_from_versions::( + &edge, + &real_name, + (&*versions).into(), + &real_spec, + ) + .map_err(|inner| chain_err(graph, parent, &edge, inner))? + else { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + }; + + let exact_key = (real_name.clone(), resolved_version.clone()); + if let Some(error) = state.version_failures.get(&exact_key) { + if edge.edge_type == EdgeType::Optional { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + } + return Err(chain_err( + graph, + parent, + &edge, + registry_error(format!("{}@{}: {error}", real_name, real_spec)), + )); + } + + if let Some(manifest) = state.version_cache.get(&exact_key).cloned() { + state + .version_cache + .insert(version_key, Arc::clone(&manifest)); + let processed = handle_resolved_registry_manifest( + graph, registry, receiver, parent, &edge, manifest, config, + ) + .await?; + handle_processed( + graph, + receiver, + parent, + &edge, + &processed, + &mut next_level, + ); + continue; + } + + state + .version_waiters + .entry(exact_key) + .or_default() + .push((parent, edge)); + state.enqueue_version_fetch(real_name, resolved_version, supports_semver); + continue; + } + + state + .full_waiters + .entry(real_name.clone()) + .or_default() + .push((parent, edge)); + state.schedule_registry_fetch( + real_name, + real_spec, + supports_semver, + FetchPriority::Demand, + ); + } + + pump_fetches(&mut fetches, &mut state.fetch_queues, registry, concurrency); + } + + loop { + let ready = std::future::poll_fn(|cx| match fetches.poll_next_unpin(cx) { + std::task::Poll::Ready(done) => std::task::Poll::Ready(done), + std::task::Poll::Pending => std::task::Poll::Ready(None), + }) + .await; + let Some(done) = ready else { + break; + }; + let done = done.map_err(|e| { + registry_error::(format!("manifest fetch task failed: {e}")) + })?; + + state.apply_fetch_result( + done, + supports_semver, + config.peer_deps, + &mut level_pending, ); } + + if !level_pending.is_empty() { + continue; + } + + if !state.full_waiters.is_empty() || !state.version_waiters.is_empty() { + pump_fetches(&mut fetches, &mut state.fetch_queues, registry, concurrency); + } + + if state.full_waiters.is_empty() && state.version_waiters.is_empty() { + break; + } + + let Some(done) = fetches.next().await else { + let mut fallback = Vec::new(); + for (_, waiters) in state.full_waiters.drain() { + fallback.extend(waiters); + } + for (_, waiters) in state.version_waiters.drain() { + fallback.extend(waiters); + } + for (parent, edge) in fallback { + let processed = process_dependency(graph, registry, parent, &edge, config) + .await + .map_err(|inner| chain_err(graph, parent, &edge, inner))?; + handle_processed(graph, receiver, parent, &edge, &processed, &mut next_level); + } + break; + }; + let done = done.map_err(|e| { + registry_error::(format!("manifest fetch task failed: {e}")) + })?; + + state.apply_fetch_result(done, supports_semver, config.peer_deps, &mut level_pending); } receiver.on_event(BuildEvent::LevelComplete { @@ -972,8 +1711,7 @@ async fn run_bfs_phase( current_level = next_level; } - tracing::debug!("Build phase: {:?}", start.elapsed()); - Ok(()) + Ok(state.into_resolver_cache()) } // ============================================================================ @@ -1347,13 +2085,6 @@ mod tests { assert_eq!(edges.get("lodash"), Some(&"^4.17.0".to_string())); assert_eq!(edges.get("react"), Some(&"^18.0.0".to_string())); assert_eq!(edges.get("tslib"), Some(&"^2.0.0".to_string())); - - // Since edges are now resolved, gather_preload_deps should find them - let deps = gather_preload_deps(&graph, PeerDeps::Skip); - let deps_map: HashMap = deps.into_iter().collect(); - assert_eq!(deps_map.get("lodash"), Some(&"^4.17.0".to_string())); - assert_eq!(deps_map.get("react"), Some(&"^18.0.0".to_string())); - assert_eq!(deps_map.get("tslib"), Some(&"^2.0.0".to_string())); } #[test] @@ -1381,9 +2112,5 @@ mod tests { .map(|(_, e)| (e.name.clone(), e.spec.clone())) .collect(); assert_eq!(edges.get("missing-pkg"), Some(&"catalog:".to_string())); - - // gather_preload_deps should NOT include it (not a registry spec) - let deps = gather_preload_deps(&graph, PeerDeps::Skip); - assert!(deps.is_empty()); } } diff --git a/crates/ruborist/src/service/api.rs b/crates/ruborist/src/service/api.rs index 7df51a303..ed4520684 100644 --- a/crates/ruborist/src/service/api.rs +++ b/crates/ruborist/src/service/api.rs @@ -19,13 +19,12 @@ //! let json = serde_json::to_string_pretty(&output.lock)?; //! ``` +use anyhow::Result; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; -use anyhow::Result; - -use super::cache::{PackageCache, ProjectCacheData}; +use super::cache::ProjectCacheData; use super::fs::Glob; use super::registry::UnifiedRegistry; use super::store::{ManifestStore, NoopStore}; @@ -33,9 +32,8 @@ use crate::model::graph::{DependencyGraph, PackageNode}; use crate::model::node::EdgeType; use crate::model::package_json::PackageJson; use crate::model::package_lock::PackageLock; -use crate::model::util::parse_package_spec; use crate::resolver::builder::{ - BuildDepsConfig, DevDeps, EdgeContext, PeerDeps, add_edges_from, build_deps_with_config, + BuildDepsConfig, DevDeps, EdgeContext, PeerDeps, add_edges_from, build_deps_with_config_output, }; use crate::resolver::runtime::install_runtime_from_map; use crate::resolver::workspace::WorkspaceDiscovery; @@ -55,7 +53,8 @@ pub struct BuildDepsOptions { /// (everything is in-memory). pub manifest_store: Arc, /// Project-level warm cache pre-loaded by the host. Pre-populates the - /// in-memory manifest cache to skip the preload phase on a warm install. + /// in-memory manifest cache so demand BFS can satisfy warm specs without + /// network requests. pub warm_project_cache: Option, /// Maximum concurrent network requests pub concurrency: usize, @@ -204,24 +203,11 @@ where add_edges_from(&mut graph, workspace_index, &ws_pkg, &edge_ctx); } - // 7. Create in-memory package cache and pre-populate from the warm - // project cache (host-supplied; `None` for cold runs). - let package_cache = Arc::new(PackageCache::default()); - let (cache_count, missing_count) = - prepopulate_warm_cache(&package_cache, warm_project_cache.as_ref()); - if missing_count > 0 { - tracing::warn!( - "Project cache has {missing_count} specs with missing manifests, will re-fetch from registry" - ); - } - if cache_count > 0 { - tracing::debug!("Loaded {cache_count} manifests from project cache"); - } - - // 8. Create registry client with shared cache and persistence backend. + // 7. Create registry client with persistence backend. The resolver loop + // owns the in-memory manifest cache; the provider handles only I/O, + // parsing, and optional persistent store lookups/writes. let mut builder = UnifiedRegistry::builder() .registry(®istry_url) - .cache(package_cache) .store(Arc::clone(&manifest_store)); if let Some(semver) = supports_semver { builder = builder.supports_semver(semver); @@ -234,44 +220,25 @@ where registry.supports_semver(), ); - let skip_preload = cache_count > 0; let mut config = BuildDepsConfig::default() .with_peer_deps(peer_deps) .with_concurrency(concurrency) - .with_skip_preload(skip_preload) .with_catalogs(catalogs) .with_warm_project_cache(warm_project_cache); if let Some(dir) = cache_dir { config = config.with_cache_dir(dir); } - if skip_preload { - tracing::debug!( - "Skipping preload phase (project cache has {} entries)", - cache_count - ); - } - // Preserve the typed error via `Error::new` + `.context(...)` so CLI // renderers (e.g. pm's format_print) can downcast and pretty-print the // dependency chain carried by `ResolveError::WithChain`. - build_deps_with_config(&mut graph, ®istry, config, &receiver) + let manifest_cache = build_deps_with_config_output(&mut graph, ®istry, config, &receiver) .await .map_err(|e| anyhow::Error::new(e).context("Dependency resolution failed"))?; let (packages, _total) = graph.serialize_to_packages(&root_path); - // Export project cache from memory cache for the host to persist. - let mut project_cache = ProjectCacheData::default(); - for (key, manifest) in registry.cache().export_version_manifests() { - // `parse_package_spec` rather than `split_once('@')` so scoped names - // (`@babel/core@^7.0.0`) parse to (`@babel/core`, `^7.0.0`). - let (name, spec) = parse_package_spec(&key); - let version = manifest.version.clone(); - let pkg_cache = project_cache.cache.entry(name.to_string()).or_default(); - pkg_cache.specs.insert(spec.to_string(), version.clone()); - pkg_cache.manifests.insert(version, (*manifest).clone()); - } + let project_cache = manifest_cache.into_project_cache(); Ok(BuildDepsOutput { lock: PackageLock::new(&pkg.name, &pkg.version, packages), @@ -279,32 +246,6 @@ where }) } -/// Pre-populate `cache` from a warm project cache. Returns -/// `(loaded, missing)` — `loaded` is the count of usable spec→manifest -/// entries; `missing` counts specs whose resolved version had no manifest -/// (corrupted cache, will be re-fetched). -fn prepopulate_warm_cache(cache: &PackageCache, warm: Option<&ProjectCacheData>) -> (usize, usize) { - let Some(warm) = warm else { - return (0, 0); - }; - let mut loaded = 0; - let mut missing = 0; - for (name, pkg_cache) in &warm.cache { - for (spec, version) in &pkg_cache.specs { - let Some(manifest) = pkg_cache.manifests.get(version) else { - tracing::debug!( - "Project cache missing manifest: {name}@{spec} (version {version})" - ); - missing += 1; - continue; - }; - cache.set_version_manifest(name.clone(), spec.clone(), Arc::new(manifest.clone())); - loaded += 1; - } - } - (loaded, missing) -} - #[cfg(test)] mod tests { use super::*;