Skip to content

Commit 94579a5

Browse files
committed
perf(pm): add resolver pm wiring
1 parent e68b1b8 commit 94579a5

9 files changed

Lines changed: 115 additions & 113 deletions

File tree

crates/pm/src/helper/ruborist_context.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ use crate::util::logger::ProgressReceiver;
1212
use crate::util::manifest_store::DiskManifestStore;
1313
use crate::util::project_cache;
1414
use crate::util::user_config::{
15-
get_catalogs, get_manifests_concurrency_limit, get_peer_deps, get_registry, get_supports_semver,
15+
get_catalogs, get_peer_deps, get_registry, get_resolver_manifests_concurrency_limit,
16+
get_supports_semver,
1617
};
1718

1819
/// Tokio-based glob implementation.
@@ -57,7 +58,7 @@ impl Context {
5758
cache_dir: Some(get_cache_dir()),
5859
manifest_store: Self::manifest_store(),
5960
warm_project_cache,
60-
concurrency: get_manifests_concurrency_limit().await,
61+
concurrency: get_resolver_manifests_concurrency_limit().await,
6162
peer_deps: get_peer_deps().await,
6263
glob: TokioGlob,
6364
receiver,

crates/pm/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ struct Cli {
9292
#[arg(long, global = true, action = clap::ArgAction::SetTrue)]
9393
legacy_peer_deps: Option<bool>,
9494

95-
/// Maximum concurrent manifest fetches (default: 64)
95+
/// Maximum concurrent manifest fetches (default: 64; npmjs/non-semver resolver: 256)
9696
#[arg(long, global = true)]
9797
manifests_concurrency_limit: Option<usize>,
9898

crates/pm/src/service/pipeline/receiver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ mod tests {
123123
}));
124124

125125
// Should not forward other events
126-
receiver.on_event(BuildEvent::PreloadStart { count: 10 });
126+
receiver.on_event(BuildEvent::LevelStart { node_count: 10 });
127127

128128
// Only one message should be in the download channel
129129
assert!(channels.download_rx.try_recv().is_ok());

crates/pm/src/util/json.rs

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
use std::fs::File;
2-
use std::io::{self, BufWriter, Write};
31
use std::path::Path;
42

53
use anyhow::{Context, Result};
6-
use serde::Serialize;
74
use serde::de::DeserializeOwned;
85

96
/// Read and parse a JSON file into the specified type.
@@ -15,34 +12,6 @@ pub async fn read_json_file<T: DeserializeOwned>(path: &Path) -> Result<T> {
1512
serde_json::from_slice(&bytes).with_context(|| format!("Failed to parse JSON from {path:?}"))
1613
}
1714

18-
/// Serialize `value` as compact JSON and stream it to `path` through a
19-
/// [`BufWriter`], skipping the intermediate `Vec<u8>` that
20-
/// `serde_json::to_vec` + `std::fs::write` would allocate.
21-
///
22-
/// Synchronous on purpose: the caller in [`crate::util::manifest_store`] runs
23-
/// it on a dedicated OS thread so manifest persistence never touches the
24-
/// async runtime's worker or blocking pool. Async callers should wrap this
25-
/// in `tokio::task::spawn_blocking` (or write the async-aware counterpart
26-
/// when one is needed).
27-
///
28-
/// The parent directory of `path` must already exist; this helper does *not*
29-
/// `mkdir -p`. The cost-benefit of "try the write first, recover on
30-
/// `NotFound`" is policy-level (warm-cache rewrites want to skip the extra
31-
/// syscall every time), so the recovery loop lives at the call site, not
32-
/// here. A missing parent surfaces as [`io::ErrorKind::NotFound`] for the
33-
/// caller to match on.
34-
///
35-
/// Serialization failures — rare for `derive(Serialize)` types, possible for
36-
/// hand-written impls or maps with non-string keys — are folded into
37-
/// [`io::Error`] via [`io::Error::other`] so the whole API speaks one error
38-
/// type and callers can keep matching on [`io::ErrorKind`].
39-
pub fn write_compact_sync<T: Serialize>(path: &Path, value: &T) -> io::Result<()> {
40-
let file = File::create(path)?;
41-
let mut writer = BufWriter::new(file);
42-
serde_json::to_writer(&mut writer, value).map_err(io::Error::other)?;
43-
writer.flush()
44-
}
45-
4615
/// Load package.json from a directory path and deserialize into the caller's
4716
/// chosen view type `T`. Use a full `PackageJson` for root projects, or a
4817
/// minimal view (e.g. `ScriptsView`) for node_modules to avoid parsing
@@ -166,35 +135,6 @@ mod tests {
166135
assert_eq!(view.scripts.get("test").unwrap(), "node build/test.js");
167136
}
168137

169-
#[tokio::test]
170-
async fn write_compact_sync_round_trips_through_read_json_file() {
171-
let dir = tempdir().unwrap();
172-
let path = dir.path().join("out.json");
173-
let value = json!({
174-
"name": "test",
175-
"version": "1.0.0",
176-
"deps": ["a", "b", "c"],
177-
});
178-
179-
super::write_compact_sync(&path, &value).unwrap();
180-
181-
let read_back: Value = read_json_file(&path).await.unwrap();
182-
assert_eq!(read_back, value);
183-
184-
// Compact form: no inter-token whitespace.
185-
let raw = std::fs::read_to_string(&path).unwrap();
186-
assert!(!raw.contains(": "));
187-
assert!(!raw.contains(", "));
188-
}
189-
190-
#[test]
191-
fn write_compact_sync_requires_existing_parent_directory() {
192-
let dir = tempdir().unwrap();
193-
let path = dir.path().join("missing").join("out.json");
194-
let err = super::write_compact_sync(&path, &json!({})).unwrap_err();
195-
assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
196-
}
197-
198138
#[tokio::test]
199139
async fn test_error_handling() {
200140
let non_existent_path = Path::new("non_existent.json");

crates/pm/src/util/logger.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -219,21 +219,6 @@ impl utoo_ruborist::progress::EventReceiver for ProgressReceiver {
219219
}
220220
use utoo_ruborist::progress::BuildEvent;
221221
match event {
222-
BuildEvent::PreloadStart { count } | BuildEvent::PreloadQueued { count } => {
223-
PROGRESS_BAR.inc_length(count as u64);
224-
}
225-
BuildEvent::PreloadFetching { name } => {
226-
log_progress(&format!("fetching {}", name));
227-
}
228-
BuildEvent::PreloadProgress { name, .. } => {
229-
PROGRESS_BAR.inc(1);
230-
log_progress(&format!("resolved {}", name));
231-
}
232-
BuildEvent::PreloadComplete { success, failed } => {
233-
PROGRESS_BAR.set_position(0);
234-
PROGRESS_BAR.set_length(0);
235-
log_progress(&format!("preload: {} ok, {} failed", success, failed));
236-
}
237222
BuildEvent::DependencyCount { count } => {
238223
PROGRESS_BAR.inc_length(count as u64);
239224
}

crates/pm/src/util/manifest_store.rs

Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,17 @@
1111
//! Serialization and file writes run on a dedicated writer thread so manifest
1212
//! persistence does not occupy async runtime workers or Tokio's blocking pool.
1313
14-
use std::fs;
15-
use std::io::ErrorKind;
1614
use std::path::{Path, PathBuf};
1715
use std::sync::Arc;
18-
use std::sync::mpsc::{self, SyncSender, TrySendError};
16+
use std::sync::mpsc::{self, Sender};
1917
use std::thread::JoinHandle;
2018

2119
use async_trait::async_trait;
2220
use serde::Serialize;
2321
use utoo_ruborist::model::manifest::CoreVersionManifest;
2422
use utoo_ruborist::service::{ManifestStore, VersionsInfo};
2523

26-
use crate::util::json::{read_json_file, write_compact_sync};
27-
28-
/// Opportunistic writer backlog. If disk stalls beyond this, new cache writes
29-
/// are dropped instead of letting resolver memory grow without bound.
30-
const MANIFEST_WRITE_QUEUE_CAPACITY: usize = 1024;
24+
use crate::util::json::read_json_file;
3125

3226
pub struct DiskManifestStore {
3327
cache_dir: PathBuf,
@@ -112,13 +106,13 @@ enum ManifestWriteJob {
112106
}
113107

114108
struct ManifestWriter {
115-
tx: SyncSender<ManifestWriteJob>,
109+
tx: Sender<ManifestWriteJob>,
116110
handle: JoinHandle<()>,
117111
}
118112

119113
impl ManifestWriter {
120114
fn spawn() -> Self {
121-
let (tx, rx) = mpsc::sync_channel(MANIFEST_WRITE_QUEUE_CAPACITY);
115+
let (tx, rx) = mpsc::channel();
122116
let handle = std::thread::Builder::new()
123117
.name("utoo-manifest-store".to_string())
124118
.spawn(move || {
@@ -138,14 +132,8 @@ impl ManifestWriter {
138132
}
139133

140134
fn enqueue(&self, job: ManifestWriteJob) {
141-
match self.tx.try_send(job) {
142-
Ok(()) => {}
143-
Err(TrySendError::Full(_)) => {
144-
tracing::debug!("Manifest store writer queue full; dropping cache write");
145-
}
146-
Err(TrySendError::Disconnected(_)) => {
147-
tracing::debug!("Manifest store writer stopped before accepting write");
148-
}
135+
if self.tx.send(job).is_err() {
136+
tracing::debug!("Manifest store writer stopped before accepting write");
149137
}
150138
}
151139

@@ -157,23 +145,27 @@ impl ManifestWriter {
157145
}
158146
}
159147

160-
/// Apply the manifest-cache write policy on top of
161-
/// [`crate::util::json::write_compact_sync`]: on `NotFound`, create the
162-
/// parent directory once and retry — this is how the resolver hot path
163-
/// avoids the up-front `mkdir` syscall on every warm-cache rewrite. All
164-
/// errors are swallowed at the `debug` log level because the disk cache is
165-
/// opportunistic; a dropped write only costs a future cache miss.
148+
/// Serialize `value` and write to `path`. On `NotFound`, create the parent
149+
/// directory and retry once — saves the mkdir syscall on every warm-cache
150+
/// rewrite. Errors are logged at debug; disk cache is opportunistic.
166151
fn write_json_sync<T: Serialize>(path: &Path, value: &T) {
167-
match write_compact_sync(path, value) {
152+
let bytes = match serde_json::to_vec(value) {
153+
Ok(b) => b,
154+
Err(e) => {
155+
tracing::debug!("Failed to serialize {path:?}: {e}");
156+
return;
157+
}
158+
};
159+
match std::fs::write(path, &bytes) {
168160
Ok(()) => {}
169-
Err(e) if e.kind() == ErrorKind::NotFound => {
161+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
170162
if let Some(parent) = path.parent()
171-
&& let Err(e) = fs::create_dir_all(parent)
163+
&& let Err(e) = std::fs::create_dir_all(parent)
172164
{
173165
tracing::debug!("Failed to create {parent:?}: {e}");
174166
return;
175167
}
176-
if let Err(e) = write_compact_sync(path, value) {
168+
if let Err(e) = std::fs::write(path, &bytes) {
177169
tracing::debug!("Failed to write {path:?}: {e}");
178170
}
179171
}

crates/pm/src/util/project_cache.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Disk persistence for ruborist's project-level manifest cache.
22
//!
3-
//! Stored at `<root>/node_modules/.utoo-manifest.json`. Used to skip the
4-
//! preload phase on warm installs.
3+
//! Stored at `<root>/node_modules/.utoo-manifest.json`. Used to warm the
4+
//! demand resolver's in-memory manifest cache across installs.
55
66
use std::path::{Path, PathBuf};
77

crates/pm/src/util/retry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub fn build_dns_cached_client() -> reqwest::Client {
3030
.read_timeout(std::time::Duration::from_secs(30)) // Timeout for individual read operations
3131
// No total timeout - large files (e.g. node binary ~100MB) need longer download time
3232
// No pool_max_idle_per_host - let reqwest manage connections freely
33-
// Concurrency is controlled by semaphore in preload service
33+
// Concurrency is controlled by each caller's semaphore.
3434
.build()
3535
.expect("Failed to build reqwest client")
3636
}

crates/pm/src/util/user_config.rs

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,18 +132,58 @@ pub fn get_install_scope() -> InstallScope {
132132
INSTALL_SCOPE.get().copied().unwrap_or_default()
133133
}
134134

135-
// Manifest fetch concurrency configuration
136-
static MANIFESTS_CONCURRENCY_LIMIT: LazyLock<ConfigValue<usize>> =
137-
LazyLock::new(|| ConfigValue::new("manifests-concurrency-limit", 64));
135+
// Manifest fetch concurrency configuration.
136+
//
137+
// Keep the user-visible/default tarball download limit at 64. Registry
138+
// resolution can opt into a higher default for non-semver registries via
139+
// `get_resolver_manifests_concurrency_limit`; tarball download/extract still
140+
// uses `get_manifests_concurrency_limit_sync` so install IO is not inflated.
141+
const DEFAULT_MANIFESTS_CONCURRENCY_LIMIT: usize = 64;
142+
const NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT: usize = 256;
143+
144+
static MANIFESTS_CONCURRENCY_LIMIT: LazyLock<ConfigValue<usize>> = LazyLock::new(|| {
145+
ConfigValue::new(
146+
"manifests-concurrency-limit",
147+
DEFAULT_MANIFESTS_CONCURRENCY_LIMIT,
148+
)
149+
});
150+
static MANIFESTS_CONCURRENCY_CLI_SET: OnceLock<()> = OnceLock::new();
138151

139152
pub fn set_manifests_concurrency_limit(value: Option<usize>) {
153+
if value.is_some() {
154+
let _ = MANIFESTS_CONCURRENCY_CLI_SET.set(());
155+
}
140156
MANIFESTS_CONCURRENCY_LIMIT.set(value);
141157
}
142158

143159
pub async fn get_manifests_concurrency_limit() -> usize {
144160
MANIFESTS_CONCURRENCY_LIMIT.get().await
145161
}
146162

163+
fn resolver_manifest_concurrency_limit(
164+
configured_limit: usize,
165+
cli_set: bool,
166+
supports_semver: Option<bool>,
167+
) -> usize {
168+
if !cli_set
169+
&& configured_limit == DEFAULT_MANIFESTS_CONCURRENCY_LIMIT
170+
&& supports_semver == Some(false)
171+
{
172+
NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT
173+
} else {
174+
configured_limit
175+
}
176+
}
177+
178+
pub async fn get_resolver_manifests_concurrency_limit() -> usize {
179+
let limit = get_manifests_concurrency_limit().await;
180+
resolver_manifest_concurrency_limit(
181+
limit,
182+
MANIFESTS_CONCURRENCY_CLI_SET.get().is_some(),
183+
get_supports_semver(),
184+
)
185+
}
186+
147187
pub fn get_manifests_concurrency_limit_sync() -> usize {
148188
MANIFESTS_CONCURRENCY_LIMIT.get_sync()
149189
}
@@ -358,6 +398,50 @@ mod tests {
358398
assert!(!config.parse_config_value("anything"));
359399
}
360400

401+
#[test]
402+
fn test_resolver_manifest_concurrency_raises_npmjs_default() {
403+
assert_eq!(
404+
resolver_manifest_concurrency_limit(
405+
DEFAULT_MANIFESTS_CONCURRENCY_LIMIT,
406+
false,
407+
Some(false),
408+
),
409+
NON_SEMVER_RESOLVER_CONCURRENCY_LIMIT
410+
);
411+
}
412+
413+
#[test]
414+
fn test_resolver_manifest_concurrency_preserves_explicit_limit() {
415+
assert_eq!(
416+
resolver_manifest_concurrency_limit(32, true, Some(false)),
417+
32
418+
);
419+
assert_eq!(
420+
resolver_manifest_concurrency_limit(
421+
DEFAULT_MANIFESTS_CONCURRENCY_LIMIT,
422+
true,
423+
Some(false),
424+
),
425+
DEFAULT_MANIFESTS_CONCURRENCY_LIMIT
426+
);
427+
}
428+
429+
#[test]
430+
fn test_resolver_manifest_concurrency_preserves_semver_default() {
431+
assert_eq!(
432+
resolver_manifest_concurrency_limit(
433+
DEFAULT_MANIFESTS_CONCURRENCY_LIMIT,
434+
false,
435+
Some(true),
436+
),
437+
DEFAULT_MANIFESTS_CONCURRENCY_LIMIT
438+
);
439+
assert_eq!(
440+
resolver_manifest_concurrency_limit(DEFAULT_MANIFESTS_CONCURRENCY_LIMIT, false, None),
441+
DEFAULT_MANIFESTS_CONCURRENCY_LIMIT
442+
);
443+
}
444+
361445
#[tokio::test]
362446
async fn test_cache_dir_from_config_file() -> Result<()> {
363447
let temp_dir = TempDir::new()?;

0 commit comments

Comments
 (0)