From 56101d88ac167391cf2e8d20a06bb270a2d995dd Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Fri, 22 May 2026 02:25:22 +0800 Subject: [PATCH] perf(pm): add resolver pm wiring --- crates/pm/src/helper/ruborist_context.rs | 5 +- crates/pm/src/main.rs | 2 +- crates/pm/src/service/pipeline/receiver.rs | 2 +- crates/pm/src/util/json.rs | 60 --------------- crates/pm/src/util/logger.rs | 15 ---- crates/pm/src/util/manifest_store.rs | 48 +++++------- crates/pm/src/util/project_cache.rs | 4 +- crates/pm/src/util/retry.rs | 2 +- crates/pm/src/util/user_config.rs | 90 +++++++++++++++++++++- 9 files changed, 115 insertions(+), 113 deletions(-) diff --git a/crates/pm/src/helper/ruborist_context.rs b/crates/pm/src/helper/ruborist_context.rs index b47def019..49bebc738 100644 --- a/crates/pm/src/helper/ruborist_context.rs +++ b/crates/pm/src/helper/ruborist_context.rs @@ -12,7 +12,8 @@ use crate::util::logger::ProgressReceiver; use crate::util::manifest_store::DiskManifestStore; use crate::util::project_cache; use crate::util::user_config::{ - get_catalogs, get_manifests_concurrency_limit, get_peer_deps, get_registry, get_supports_semver, + get_catalogs, get_peer_deps, get_registry, get_resolver_manifests_concurrency_limit, + get_supports_semver, }; /// Tokio-based glob implementation. @@ -57,7 +58,7 @@ impl Context { cache_dir: Some(get_cache_dir()), manifest_store: Self::manifest_store(), warm_project_cache, - concurrency: get_manifests_concurrency_limit().await, + concurrency: get_resolver_manifests_concurrency_limit().await, peer_deps: get_peer_deps().await, glob: TokioGlob, receiver, diff --git a/crates/pm/src/main.rs b/crates/pm/src/main.rs index 3ec93fc1c..f9235cf33 100644 --- a/crates/pm/src/main.rs +++ b/crates/pm/src/main.rs @@ -92,7 +92,7 @@ struct Cli { #[arg(long, global = true, action = clap::ArgAction::SetTrue)] legacy_peer_deps: Option, - /// Maximum concurrent manifest fetches (default: 64) + /// Maximum concurrent manifest fetches (default: 64; npmjs/non-semver resolver: 256) #[arg(long, global = true)] manifests_concurrency_limit: Option, diff --git a/crates/pm/src/service/pipeline/receiver.rs b/crates/pm/src/service/pipeline/receiver.rs index 376a5b50c..283e6b626 100644 --- a/crates/pm/src/service/pipeline/receiver.rs +++ b/crates/pm/src/service/pipeline/receiver.rs @@ -123,7 +123,7 @@ mod tests { })); // Should not forward other events - receiver.on_event(BuildEvent::PreloadStart { count: 10 }); + receiver.on_event(BuildEvent::LevelStart { node_count: 10 }); // Only one message should be in the download channel assert!(channels.download_rx.try_recv().is_ok()); diff --git a/crates/pm/src/util/json.rs b/crates/pm/src/util/json.rs index 1c75f1d6c..9243152e2 100644 --- a/crates/pm/src/util/json.rs +++ b/crates/pm/src/util/json.rs @@ -1,9 +1,6 @@ -use std::fs::File; -use std::io::{self, BufWriter, Write}; use std::path::Path; use anyhow::{Context, Result}; -use serde::Serialize; use serde::de::DeserializeOwned; /// Read and parse a JSON file into the specified type. @@ -15,34 +12,6 @@ pub async fn read_json_file(path: &Path) -> Result { serde_json::from_slice(&bytes).with_context(|| format!("Failed to parse JSON from {path:?}")) } -/// Serialize `value` as compact JSON and stream it to `path` through a -/// [`BufWriter`], skipping the intermediate `Vec` that -/// `serde_json::to_vec` + `std::fs::write` would allocate. -/// -/// Synchronous on purpose: the caller in [`crate::util::manifest_store`] runs -/// it on a dedicated OS thread so manifest persistence never touches the -/// async runtime's worker or blocking pool. Async callers should wrap this -/// in `tokio::task::spawn_blocking` (or write the async-aware counterpart -/// when one is needed). -/// -/// The parent directory of `path` must already exist; this helper does *not* -/// `mkdir -p`. The cost-benefit of "try the write first, recover on -/// `NotFound`" is policy-level (warm-cache rewrites want to skip the extra -/// syscall every time), so the recovery loop lives at the call site, not -/// here. A missing parent surfaces as [`io::ErrorKind::NotFound`] for the -/// caller to match on. -/// -/// Serialization failures — rare for `derive(Serialize)` types, possible for -/// hand-written impls or maps with non-string keys — are folded into -/// [`io::Error`] via [`io::Error::other`] so the whole API speaks one error -/// type and callers can keep matching on [`io::ErrorKind`]. -pub fn write_compact_sync(path: &Path, value: &T) -> io::Result<()> { - let file = File::create(path)?; - let mut writer = BufWriter::new(file); - serde_json::to_writer(&mut writer, value).map_err(io::Error::other)?; - writer.flush() -} - /// Load package.json from a directory path and deserialize into the caller's /// chosen view type `T`. Use a full `PackageJson` for root projects, or a /// minimal view (e.g. `ScriptsView`) for node_modules to avoid parsing @@ -166,35 +135,6 @@ mod tests { assert_eq!(view.scripts.get("test").unwrap(), "node build/test.js"); } - #[tokio::test] - async fn write_compact_sync_round_trips_through_read_json_file() { - let dir = tempdir().unwrap(); - let path = dir.path().join("out.json"); - let value = json!({ - "name": "test", - "version": "1.0.0", - "deps": ["a", "b", "c"], - }); - - super::write_compact_sync(&path, &value).unwrap(); - - let read_back: Value = read_json_file(&path).await.unwrap(); - assert_eq!(read_back, value); - - // Compact form: no inter-token whitespace. - let raw = std::fs::read_to_string(&path).unwrap(); - assert!(!raw.contains(": ")); - assert!(!raw.contains(", ")); - } - - #[test] - fn write_compact_sync_requires_existing_parent_directory() { - let dir = tempdir().unwrap(); - let path = dir.path().join("missing").join("out.json"); - let err = super::write_compact_sync(&path, &json!({})).unwrap_err(); - assert_eq!(err.kind(), std::io::ErrorKind::NotFound); - } - #[tokio::test] async fn test_error_handling() { let non_existent_path = Path::new("non_existent.json"); diff --git a/crates/pm/src/util/logger.rs b/crates/pm/src/util/logger.rs index 6ba124bb2..864d60193 100644 --- a/crates/pm/src/util/logger.rs +++ b/crates/pm/src/util/logger.rs @@ -219,21 +219,6 @@ impl utoo_ruborist::progress::EventReceiver for ProgressReceiver { } use utoo_ruborist::progress::BuildEvent; match event { - BuildEvent::PreloadStart { count } | BuildEvent::PreloadQueued { count } => { - PROGRESS_BAR.inc_length(count as u64); - } - BuildEvent::PreloadFetching { name } => { - log_progress(&format!("fetching {}", name)); - } - BuildEvent::PreloadProgress { name, .. } => { - PROGRESS_BAR.inc(1); - log_progress(&format!("resolved {}", name)); - } - BuildEvent::PreloadComplete { success, failed } => { - PROGRESS_BAR.set_position(0); - PROGRESS_BAR.set_length(0); - log_progress(&format!("preload: {} ok, {} failed", success, failed)); - } BuildEvent::DependencyCount { count } => { PROGRESS_BAR.inc_length(count as u64); } diff --git a/crates/pm/src/util/manifest_store.rs b/crates/pm/src/util/manifest_store.rs index a2c60c65e..3fb846af9 100644 --- a/crates/pm/src/util/manifest_store.rs +++ b/crates/pm/src/util/manifest_store.rs @@ -11,11 +11,9 @@ //! Serialization and file writes run on a dedicated writer thread so manifest //! persistence does not occupy async runtime workers or Tokio's blocking pool. -use std::fs; -use std::io::ErrorKind; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::sync::mpsc::{self, SyncSender, TrySendError}; +use std::sync::mpsc::{self, Sender}; use std::thread::JoinHandle; use async_trait::async_trait; @@ -23,11 +21,7 @@ use serde::Serialize; use utoo_ruborist::model::manifest::CoreVersionManifest; use utoo_ruborist::service::{ManifestStore, VersionsInfo}; -use crate::util::json::{read_json_file, write_compact_sync}; - -/// Opportunistic writer backlog. If disk stalls beyond this, new cache writes -/// are dropped instead of letting resolver memory grow without bound. -const MANIFEST_WRITE_QUEUE_CAPACITY: usize = 1024; +use crate::util::json::read_json_file; pub struct DiskManifestStore { cache_dir: PathBuf, @@ -112,13 +106,13 @@ enum ManifestWriteJob { } struct ManifestWriter { - tx: SyncSender, + tx: Sender, handle: JoinHandle<()>, } impl ManifestWriter { fn spawn() -> Self { - let (tx, rx) = mpsc::sync_channel(MANIFEST_WRITE_QUEUE_CAPACITY); + let (tx, rx) = mpsc::channel(); let handle = std::thread::Builder::new() .name("utoo-manifest-store".to_string()) .spawn(move || { @@ -138,14 +132,8 @@ impl ManifestWriter { } fn enqueue(&self, job: ManifestWriteJob) { - match self.tx.try_send(job) { - Ok(()) => {} - Err(TrySendError::Full(_)) => { - tracing::debug!("Manifest store writer queue full; dropping cache write"); - } - Err(TrySendError::Disconnected(_)) => { - tracing::debug!("Manifest store writer stopped before accepting write"); - } + if self.tx.send(job).is_err() { + tracing::debug!("Manifest store writer stopped before accepting write"); } } @@ -157,23 +145,27 @@ impl ManifestWriter { } } -/// Apply the manifest-cache write policy on top of -/// [`crate::util::json::write_compact_sync`]: on `NotFound`, create the -/// parent directory once and retry — this is how the resolver hot path -/// avoids the up-front `mkdir` syscall on every warm-cache rewrite. All -/// errors are swallowed at the `debug` log level because the disk cache is -/// opportunistic; a dropped write only costs a future cache miss. +/// Serialize `value` and write to `path`. On `NotFound`, create the parent +/// directory and retry once — saves the mkdir syscall on every warm-cache +/// rewrite. Errors are logged at debug; disk cache is opportunistic. fn write_json_sync(path: &Path, value: &T) { - match write_compact_sync(path, value) { + let bytes = match serde_json::to_vec(value) { + Ok(b) => b, + Err(e) => { + tracing::debug!("Failed to serialize {path:?}: {e}"); + return; + } + }; + match std::fs::write(path, &bytes) { Ok(()) => {} - Err(e) if e.kind() == ErrorKind::NotFound => { + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { if let Some(parent) = path.parent() - && let Err(e) = fs::create_dir_all(parent) + && let Err(e) = std::fs::create_dir_all(parent) { tracing::debug!("Failed to create {parent:?}: {e}"); return; } - if let Err(e) = write_compact_sync(path, value) { + if let Err(e) = std::fs::write(path, &bytes) { tracing::debug!("Failed to write {path:?}: {e}"); } } diff --git a/crates/pm/src/util/project_cache.rs b/crates/pm/src/util/project_cache.rs index f4b0269ff..ccfb27de9 100644 --- a/crates/pm/src/util/project_cache.rs +++ b/crates/pm/src/util/project_cache.rs @@ -1,7 +1,7 @@ //! Disk persistence for ruborist's project-level manifest cache. //! -//! Stored at `/node_modules/.utoo-manifest.json`. Used to skip the -//! preload phase on warm installs. +//! Stored at `/node_modules/.utoo-manifest.json`. Used to warm the +//! demand resolver's in-memory manifest cache across installs. use std::path::{Path, PathBuf}; diff --git a/crates/pm/src/util/retry.rs b/crates/pm/src/util/retry.rs index 896141422..a210d9f6f 100644 --- a/crates/pm/src/util/retry.rs +++ b/crates/pm/src/util/retry.rs @@ -30,7 +30,7 @@ pub fn build_dns_cached_client() -> reqwest::Client { .read_timeout(std::time::Duration::from_secs(30)) // Timeout for individual read operations // No total timeout - large files (e.g. node binary ~100MB) need longer download time // No pool_max_idle_per_host - let reqwest manage connections freely - // Concurrency is controlled by semaphore in preload service + // Concurrency is controlled by each caller's semaphore. .build() .expect("Failed to build reqwest client") } diff --git a/crates/pm/src/util/user_config.rs b/crates/pm/src/util/user_config.rs index 34ee45a34..994033d7d 100644 --- a/crates/pm/src/util/user_config.rs +++ b/crates/pm/src/util/user_config.rs @@ -132,11 +132,27 @@ pub fn get_install_scope() -> InstallScope { INSTALL_SCOPE.get().copied().unwrap_or_default() } -// Manifest fetch concurrency configuration -static MANIFESTS_CONCURRENCY_LIMIT: LazyLock> = - LazyLock::new(|| ConfigValue::new("manifests-concurrency-limit", 64)); +// Manifest fetch concurrency configuration. +// +// Keep the user-visible/default tarball download limit at 64. Registry +// resolution can opt into a higher default for non-semver registries via +// `get_resolver_manifests_concurrency_limit`; tarball download/extract still +// uses `get_manifests_concurrency_limit_sync` so install IO is not inflated. +const DEFAULT_MANIFESTS_CONCURRENCY_LIMIT: usize = 64; +const NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT: usize = 256; + +static MANIFESTS_CONCURRENCY_LIMIT: LazyLock> = LazyLock::new(|| { + ConfigValue::new( + "manifests-concurrency-limit", + DEFAULT_MANIFESTS_CONCURRENCY_LIMIT, + ) +}); +static MANIFESTS_CONCURRENCY_CLI_SET: OnceLock<()> = OnceLock::new(); pub fn set_manifests_concurrency_limit(value: Option) { + if value.is_some() { + let _ = MANIFESTS_CONCURRENCY_CLI_SET.set(()); + } MANIFESTS_CONCURRENCY_LIMIT.set(value); } @@ -144,6 +160,30 @@ pub async fn get_manifests_concurrency_limit() -> usize { MANIFESTS_CONCURRENCY_LIMIT.get().await } +fn resolver_manifest_concurrency_limit( + configured_limit: usize, + cli_set: bool, + supports_semver: Option, +) -> usize { + if !cli_set + && configured_limit == DEFAULT_MANIFESTS_CONCURRENCY_LIMIT + && supports_semver == Some(false) + { + NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT + } else { + configured_limit + } +} + +pub async fn get_resolver_manifests_concurrency_limit() -> usize { + let limit = get_manifests_concurrency_limit().await; + resolver_manifest_concurrency_limit( + limit, + MANIFESTS_CONCURRENCY_CLI_SET.get().is_some(), + get_supports_semver(), + ) +} + pub fn get_manifests_concurrency_limit_sync() -> usize { MANIFESTS_CONCURRENCY_LIMIT.get_sync() } @@ -358,6 +398,50 @@ mod tests { assert!(!config.parse_config_value("anything")); } + #[test] + fn test_resolver_manifest_concurrency_raises_npmjs_default() { + assert_eq!( + resolver_manifest_concurrency_limit( + DEFAULT_MANIFESTS_CONCURRENCY_LIMIT, + false, + Some(false), + ), + NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT + ); + } + + #[test] + fn test_resolver_manifest_concurrency_preserves_explicit_limit() { + assert_eq!( + resolver_manifest_concurrency_limit(32, true, Some(false)), + 32 + ); + assert_eq!( + resolver_manifest_concurrency_limit( + DEFAULT_MANIFESTS_CONCURRENCY_LIMIT, + true, + Some(false), + ), + DEFAULT_MANIFESTS_CONCURRENCY_LIMIT + ); + } + + #[test] + fn test_resolver_manifest_concurrency_preserves_semver_default() { + assert_eq!( + resolver_manifest_concurrency_limit( + DEFAULT_MANIFESTS_CONCURRENCY_LIMIT, + false, + Some(true), + ), + DEFAULT_MANIFESTS_CONCURRENCY_LIMIT + ); + assert_eq!( + resolver_manifest_concurrency_limit(DEFAULT_MANIFESTS_CONCURRENCY_LIMIT, false, None), + DEFAULT_MANIFESTS_CONCURRENCY_LIMIT + ); + } + #[tokio::test] async fn test_cache_dir_from_config_file() -> Result<()> { let temp_dir = TempDir::new()?;