Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/pm/src/helper/ruborist_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use crate::util::logger::ProgressReceiver;
use crate::util::manifest_store::DiskManifestStore;
use crate::util::project_cache;
use crate::util::user_config::{
get_catalogs, get_manifests_concurrency_limit, get_peer_deps, get_registry, get_supports_semver,
get_catalogs, get_peer_deps, get_registry, get_resolver_manifests_concurrency_limit,
get_supports_semver,
};

/// Tokio-based glob implementation.
Expand Down Expand Up @@ -57,7 +58,7 @@ impl Context {
cache_dir: Some(get_cache_dir()),
manifest_store: Self::manifest_store(),
warm_project_cache,
concurrency: get_manifests_concurrency_limit().await,
concurrency: get_resolver_manifests_concurrency_limit().await,
peer_deps: get_peer_deps().await,
glob: TokioGlob,
receiver,
Expand Down
2 changes: 1 addition & 1 deletion crates/pm/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ struct Cli {
#[arg(long, global = true, action = clap::ArgAction::SetTrue)]
legacy_peer_deps: Option<bool>,

/// Maximum concurrent manifest fetches (default: 64)
/// Maximum concurrent manifest fetches (default: 64; npmjs/non-semver resolver: 256)
#[arg(long, global = true)]
manifests_concurrency_limit: Option<usize>,

Expand Down
2 changes: 1 addition & 1 deletion crates/pm/src/service/pipeline/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mod tests {
}));

// Should not forward other events
receiver.on_event(BuildEvent::PreloadStart { count: 10 });
receiver.on_event(BuildEvent::LevelStart { node_count: 10 });

// Only one message should be in the download channel
assert!(channels.download_rx.try_recv().is_ok());
Expand Down
60 changes: 0 additions & 60 deletions crates/pm/src/util/json.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::Path;

use anyhow::{Context, Result};
use serde::Serialize;
use serde::de::DeserializeOwned;

/// Read and parse a JSON file into the specified type.
Expand All @@ -15,34 +12,6 @@ pub async fn read_json_file<T: DeserializeOwned>(path: &Path) -> Result<T> {
serde_json::from_slice(&bytes).with_context(|| format!("Failed to parse JSON from {path:?}"))
}

/// Write `value` to `path` as compact JSON.
///
/// The serializer streams straight into a [`BufWriter`] wrapped around the
/// freshly created file, so no intermediate `Vec<u8>` (as produced by
/// `serde_json::to_vec` followed by `std::fs::write`) is allocated.
///
/// This function is deliberately synchronous. The caller in
/// [`crate::util::manifest_store`] invokes it from a dedicated OS thread so
/// that manifest persistence stays off the async runtime's worker and
/// blocking pools. Async callers should go through
/// `tokio::task::spawn_blocking` (or add an async-aware counterpart when one
/// is needed).
///
/// The parent directory of `path` is *not* created here. Whether to "write
/// first, `mkdir -p` only on failure" is a call-site policy decision
/// (warm-cache rewrites want to avoid the extra syscall), so the recovery
/// loop belongs to the caller.
///
/// # Errors
///
/// * [`io::ErrorKind::NotFound`] when the parent directory is missing, so
///   callers can match on it and retry after creating the directory.
/// * Any other [`io::Error`] raised while creating, writing or flushing the
///   file.
/// * Serialization failures — rare for `derive(Serialize)` types, possible
///   for hand-written impls or maps with non-string keys — wrapped with
///   [`io::Error::other`] so the API exposes a single error type.
pub fn write_compact_sync<T: Serialize>(path: &Path, value: &T) -> io::Result<()> {
    let mut out = BufWriter::new(File::create(path)?);
    if let Err(err) = serde_json::to_writer(&mut out, value) {
        return Err(io::Error::other(err));
    }
    // Flush explicitly: dropping a `BufWriter` would swallow a late write error.
    out.flush()
}

/// Load package.json from a directory path and deserialize into the caller's
/// chosen view type `T`. Use a full `PackageJson` for root projects, or a
/// minimal view (e.g. `ScriptsView`) for node_modules to avoid parsing
Expand Down Expand Up @@ -166,35 +135,6 @@ mod tests {
assert_eq!(view.scripts.get("test").unwrap(), "node build/test.js");
}

#[tokio::test]
async fn write_compact_sync_round_trips_through_read_json_file() {
    let tmp = tempdir().unwrap();
    let target = tmp.path().join("out.json");
    let expected = json!({
        "name": "test",
        "version": "1.0.0",
        "deps": ["a", "b", "c"],
    });

    super::write_compact_sync(&target, &expected).unwrap();

    // Reading the file back must reproduce exactly what was written.
    let parsed: Value = read_json_file(&target).await.unwrap();
    assert_eq!(parsed, expected);

    // Compact form: no whitespace after the `:` and `,` separators.
    let on_disk = std::fs::read_to_string(&target).unwrap();
    for separator in [": ", ", "] {
        assert!(!on_disk.contains(separator));
    }
}

#[test]
fn write_compact_sync_requires_existing_parent_directory() {
    let tmp = tempdir().unwrap();
    // `missing/` is never created, so the write has nowhere to land.
    let target = tmp.path().join("missing").join("out.json");
    let outcome = super::write_compact_sync(&target, &json!({}));
    assert_eq!(outcome.unwrap_err().kind(), std::io::ErrorKind::NotFound);
}

#[tokio::test]
async fn test_error_handling() {
let non_existent_path = Path::new("non_existent.json");
Expand Down
15 changes: 0 additions & 15 deletions crates/pm/src/util/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,6 @@ impl utoo_ruborist::progress::EventReceiver for ProgressReceiver {
}
use utoo_ruborist::progress::BuildEvent;
match event {
BuildEvent::PreloadStart { count } | BuildEvent::PreloadQueued { count } => {
PROGRESS_BAR.inc_length(count as u64);
}
BuildEvent::PreloadFetching { name } => {
log_progress(&format!("fetching {}", name));
}
BuildEvent::PreloadProgress { name, .. } => {
PROGRESS_BAR.inc(1);
log_progress(&format!("resolved {}", name));
}
BuildEvent::PreloadComplete { success, failed } => {
PROGRESS_BAR.set_position(0);
PROGRESS_BAR.set_length(0);
log_progress(&format!("preload: {} ok, {} failed", success, failed));
}
BuildEvent::DependencyCount { count } => {
PROGRESS_BAR.inc_length(count as u64);
}
Expand Down
48 changes: 20 additions & 28 deletions crates/pm/src/util/manifest_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,17 @@
//! Serialization and file writes run on a dedicated writer thread so manifest
//! persistence does not occupy async runtime workers or Tokio's blocking pool.

use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::mpsc::{self, SyncSender, TrySendError};
use std::sync::mpsc::{self, Sender};
use std::thread::JoinHandle;

use async_trait::async_trait;
use serde::Serialize;
use utoo_ruborist::model::manifest::CoreVersionManifest;
use utoo_ruborist::service::{ManifestStore, VersionsInfo};

use crate::util::json::{read_json_file, write_compact_sync};

/// Opportunistic writer backlog. If disk stalls beyond this, new cache writes
/// are dropped instead of letting resolver memory grow without bound.
const MANIFEST_WRITE_QUEUE_CAPACITY: usize = 1024;
use crate::util::json::read_json_file;

pub struct DiskManifestStore {
cache_dir: PathBuf,
Expand Down Expand Up @@ -112,13 +106,13 @@ enum ManifestWriteJob {
}

struct ManifestWriter {
tx: SyncSender<ManifestWriteJob>,
tx: Sender<ManifestWriteJob>,
handle: JoinHandle<()>,
}

impl ManifestWriter {
fn spawn() -> Self {
let (tx, rx) = mpsc::sync_channel(MANIFEST_WRITE_QUEUE_CAPACITY);
let (tx, rx) = mpsc::channel();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The switch from a bounded sync_channel to an unbounded channel removes the backpressure mechanism. The previous implementation used a capacity of 1024 and dropped writes when full, which prevented unbounded memory growth if disk I/O stalled. Since manifest persistence is described as 'opportunistic' (line 150), dropping writes is a safer strategy than risking an OOM during large-scale dependency resolution.

let handle = std::thread::Builder::new()
.name("utoo-manifest-store".to_string())
.spawn(move || {
Expand All @@ -138,14 +132,8 @@ impl ManifestWriter {
}

fn enqueue(&self, job: ManifestWriteJob) {
match self.tx.try_send(job) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
tracing::debug!("Manifest store writer queue full; dropping cache write");
}
Err(TrySendError::Disconnected(_)) => {
tracing::debug!("Manifest store writer stopped before accepting write");
}
if self.tx.send(job).is_err() {
tracing::debug!("Manifest store writer stopped before accepting write");
}
}

Expand All @@ -157,23 +145,27 @@ impl ManifestWriter {
}
}

/// Apply the manifest-cache write policy on top of
/// [`crate::util::json::write_compact_sync`]: on `NotFound`, create the
/// parent directory once and retry — this is how the resolver hot path
/// avoids the up-front `mkdir` syscall on every warm-cache rewrite. All
/// errors are swallowed at the `debug` log level because the disk cache is
/// opportunistic; a dropped write only costs a future cache miss.
/// Serialize `value` and write to `path`. On `NotFound`, create the parent
/// directory and retry once — saves the mkdir syscall on every warm-cache
/// rewrite. Errors are logged at debug; disk cache is opportunistic.
fn write_json_sync<T: Serialize>(path: &Path, value: &T) {
match write_compact_sync(path, value) {
let bytes = match serde_json::to_vec(value) {
Ok(b) => b,
Err(e) => {
tracing::debug!("Failed to serialize {path:?}: {e}");
return;
}
};
Comment on lines +152 to +158
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using serde_json::to_vec allocates a full Vec<u8> for every manifest before writing. The previous implementation used serde_json::to_writer with a BufWriter, which streamed the JSON directly to the file. For large manifests, this new approach increases memory pressure and allocation overhead, which seems contrary to the 'perf' goal of this PR.

match std::fs::write(path, &bytes) {
Ok(()) => {}
Err(e) if e.kind() == ErrorKind::NotFound => {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
if let Some(parent) = path.parent()
&& let Err(e) = fs::create_dir_all(parent)
&& let Err(e) = std::fs::create_dir_all(parent)
{
tracing::debug!("Failed to create {parent:?}: {e}");
return;
}
if let Err(e) = write_compact_sync(path, value) {
if let Err(e) = std::fs::write(path, &bytes) {
tracing::debug!("Failed to write {path:?}: {e}");
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/pm/src/util/project_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Disk persistence for ruborist's project-level manifest cache.
//!
//! Stored at `<root>/node_modules/.utoo-manifest.json`. Used to skip the
//! preload phase on warm installs.
//! Stored at `<root>/node_modules/.utoo-manifest.json`. Used to warm the
//! demand resolver's in-memory manifest cache across installs.

use std::path::{Path, PathBuf};

Expand Down
2 changes: 1 addition & 1 deletion crates/pm/src/util/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn build_dns_cached_client() -> reqwest::Client {
.read_timeout(std::time::Duration::from_secs(30)) // Timeout for individual read operations
// No total timeout - large files (e.g. node binary ~100MB) need longer download time
// No pool_max_idle_per_host - let reqwest manage connections freely
// Concurrency is controlled by semaphore in preload service
// Concurrency is controlled by each caller's semaphore.
.build()
.expect("Failed to build reqwest client")
}
Expand Down
90 changes: 87 additions & 3 deletions crates/pm/src/util/user_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,58 @@ pub fn get_install_scope() -> InstallScope {
INSTALL_SCOPE.get().copied().unwrap_or_default()
}

// Manifest fetch concurrency configuration
static MANIFESTS_CONCURRENCY_LIMIT: LazyLock<ConfigValue<usize>> =
LazyLock::new(|| ConfigValue::new("manifests-concurrency-limit", 64));
// Manifest fetch concurrency configuration.
//
// Keep the user-visible/default tarball download limit at 64. Registry
// resolution can opt into a higher default for non-semver registries via
// `get_resolver_manifests_concurrency_limit`; tarball download/extract still
// uses `get_manifests_concurrency_limit_sync` so install IO is not inflated.
const DEFAULT_MANIFESTS_CONCURRENCY_LIMIT: usize = 64;
const NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT: usize = 256;

static MANIFESTS_CONCURRENCY_LIMIT: LazyLock<ConfigValue<usize>> = LazyLock::new(|| {
ConfigValue::new(
"manifests-concurrency-limit",
DEFAULT_MANIFESTS_CONCURRENCY_LIMIT,
)
});
static MANIFESTS_CONCURRENCY_CLI_SET: OnceLock<()> = OnceLock::new();

/// Store an override for the manifest concurrency limit.
///
/// When `value` is `Some`, the `MANIFESTS_CONCURRENCY_CLI_SET` marker is
/// latched so that `get_resolver_manifests_concurrency_limit` can tell an
/// explicit override apart from the untouched default. `value` is then
/// forwarded to `MANIFESTS_CONCURRENCY_LIMIT` unchanged, `None` included.
///
/// NOTE(review): presumably called with the parsed
/// `--manifests-concurrency-limit` flag — confirm against the caller.
pub fn set_manifests_concurrency_limit(value: Option<usize>) {
    let has_override = value.is_some();
    if has_override {
        // `OnceLock::set` returns `Err` once the marker is already in place;
        // that outcome is harmless here, so the result is discarded.
        let _ = MANIFESTS_CONCURRENCY_CLI_SET.set(());
    }
    MANIFESTS_CONCURRENCY_LIMIT.set(value);
}

/// Returns the value held by `MANIFESTS_CONCURRENCY_LIMIT` (the
/// `manifests-concurrency-limit` config value), awaiting its async getter.
///
/// This is the raw configured limit; it does not apply the resolver-specific
/// adjustment made by `get_resolver_manifests_concurrency_limit`.
pub async fn get_manifests_concurrency_limit() -> usize {
MANIFESTS_CONCURRENCY_LIMIT.get().await
}

/// Decide which manifest-fetch concurrency limit the resolver should use.
///
/// The result is `NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT` only when all of the
/// following hold:
///
/// * `cli_set` is `false`,
/// * `configured_limit` equals `DEFAULT_MANIFESTS_CONCURRENCY_LIMIT`, and
/// * `supports_semver` is `Some(false)`.
///
/// In every other case — including `supports_semver == None` —
/// `configured_limit` is returned unchanged.
fn resolver_manifest_concurrency_limit(
    configured_limit: usize,
    cli_set: bool,
    supports_semver: Option<bool>,
) -> usize {
    // An explicit override or any non-default value is always respected.
    if cli_set || configured_limit != DEFAULT_MANIFESTS_CONCURRENCY_LIMIT {
        return configured_limit;
    }

    match supports_semver {
        Some(false) => NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT,
        Some(true) | None => configured_limit,
    }
}

/// Manifest-fetch concurrency limit for the resolver.
///
/// Gathers the configured limit, whether `MANIFESTS_CONCURRENCY_CLI_SET` has
/// been latched, and the result of `get_supports_semver()`, then delegates the
/// decision to `resolver_manifest_concurrency_limit`.
pub async fn get_resolver_manifests_concurrency_limit() -> usize {
    let configured = get_manifests_concurrency_limit().await;
    let cli_override = MANIFESTS_CONCURRENCY_CLI_SET.get().is_some();
    let semver_support = get_supports_semver();
    resolver_manifest_concurrency_limit(configured, cli_override, semver_support)
}

/// Synchronous counterpart of `get_manifests_concurrency_limit`: returns the
/// value from `MANIFESTS_CONCURRENCY_LIMIT.get_sync()`.
///
/// Per the configuration notes in this module, tarball download/extract uses
/// this getter so install IO keeps the plain configured limit rather than the
/// raised resolver limit.
pub fn get_manifests_concurrency_limit_sync() -> usize {
MANIFESTS_CONCURRENCY_LIMIT.get_sync()
}
Expand Down Expand Up @@ -358,6 +398,50 @@ mod tests {
assert!(!config.parse_config_value("anything"));
}

#[test]
fn test_resolver_manifest_concurrency_raises_npmjs_default() {
    // Untouched default, no explicit override, registry reported as not
    // supporting semver: the resolver gets the raised limit.
    let resolved = resolver_manifest_concurrency_limit(
        DEFAULT_MANIFESTS_CONCURRENCY_LIMIT,
        false,
        Some(false),
    );
    assert_eq!(resolved, NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT);
}

#[test]
fn test_resolver_manifest_concurrency_preserves_explicit_limit() {
    // With the explicit-override flag set, the configured value is returned
    // as-is, even when it happens to equal the default.
    for explicit in [32, DEFAULT_MANIFESTS_CONCURRENCY_LIMIT] {
        assert_eq!(
            resolver_manifest_concurrency_limit(explicit, true, Some(false)),
            explicit
        );
    }
}

#[test]
fn test_resolver_manifest_concurrency_preserves_semver_default() {
    // Neither a semver-capable registry nor an unknown one raises the default.
    for supports_semver in [Some(true), None] {
        assert_eq!(
            resolver_manifest_concurrency_limit(
                DEFAULT_MANIFESTS_CONCURRENCY_LIMIT,
                false,
                supports_semver,
            ),
            DEFAULT_MANIFESTS_CONCURRENCY_LIMIT
        );
    }
}

#[tokio::test]
async fn test_cache_dir_from_config_file() -> Result<()> {
let temp_dir = TempDir::new()?;
Expand Down
Loading