perf(pm): source install scheduler architecture #3029
Code Review
This pull request introduces a new InstallScheduler to manage concurrent package downloads and extractions, replacing the previous pipeline-based implementation. It also increases the manifest fetch concurrency limit for non-semver registries to 256. Feedback focuses on improving memory efficiency and robustness: specifically, tightening backpressure in the download queue to prevent excessive buffering, removing inappropriate panic recovery logic in clone workers, and restoring bounded channels and streaming serialization in the manifest store to prevent memory growth and unnecessary allocations.
fn pump_downloads(&mut self) {
    while self.download_active.len() < self.download_limit
        && self.extract_backlog() < self.download_limit
The backpressure check self.extract_backlog() < self.download_limit may allow an excessively large number of downloaded tarballs to be buffered in memory if download_limit is high (e.g., 256 for non-semver registries). Since extraction is limited by extract_limit (max 8), the backlog should be constrained by a smaller value relative to the extraction capacity to prevent high memory usage.
-         && self.extract_backlog() < self.download_limit
+         && self.extract_backlog() < self.extract_limit * 2
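To make the effect of the suggested bound concrete, here is a minimal sketch of a download pump gated on extraction capacity. `SchedulerSketch`, its constructor, and `finish_one_download` are hypothetical stand-ins, and `extract_backlog()` is assumed to count only tarballs that have finished downloading and are waiting for extraction; the real scheduler may define it differently.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified scheduler state. Field names echo the snippet
/// under review; everything else is assumed for illustration.
struct SchedulerSketch {
    pending: VecDeque<u32>,     // package ids not yet started
    download_active: Vec<u32>,  // downloads currently in flight
    downloaded: VecDeque<u32>,  // tarballs buffered, waiting for extraction
    download_limit: usize,
    extract_limit: usize,
}

impl SchedulerSketch {
    fn new(pending: usize, download_limit: usize, extract_limit: usize) -> Self {
        Self {
            pending: (0..pending as u32).collect(),
            download_active: Vec::new(),
            downloaded: VecDeque::new(),
            download_limit,
            extract_limit,
        }
    }

    /// Assumption: the backlog is the number of buffered tarballs.
    fn extract_backlog(&self) -> usize {
        self.downloaded.len()
    }

    /// Starts downloads while both gates are open; returns how many started.
    fn pump_downloads(&mut self) -> usize {
        let mut started = 0;
        while self.download_active.len() < self.download_limit
            && self.extract_backlog() < self.extract_limit * 2
        {
            let Some(id) = self.pending.pop_front() else { break };
            self.download_active.push(id);
            started += 1;
        }
        started
    }

    /// Moves one in-flight download into the extraction backlog.
    fn finish_one_download(&mut self) -> bool {
        match self.download_active.pop() {
            Some(id) => {
                self.downloaded.push_back(id);
                true
            }
            None => false,
        }
    }
}
```

With `download_limit = 256` and `extract_limit = 8`, the pump stops starting new downloads once 16 tarballs are buffered, instead of 256. In this sketch, in-flight downloads are not part of the backlog, so a full burst can still start while the backlog is empty; whether the real `extract_backlog()` accounts for them is not visible in the excerpt.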
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
    clone_package_from_cache_sync(
        &job.spec.package.name,
        &job.spec.package.version,
        &job.spec.package.tarball_url,
        &job.cache_path,
        &job.spec.target,
    )
    .map_err(|e| format!("{e:#}"))
}))
.unwrap_or_else(|_| Err("install clone worker panicked".to_string()));
The use of std::panic::catch_unwind here violates the general rule regarding panic handling. Panics should be treated as unrecoverable bugs rather than being caught and converted into errors. If a clone worker panics, it should be allowed to propagate or cause the process to terminate.
let result = clone_package_from_cache_sync(
    &job.spec.package.name,
    &job.spec.package.version,
    &job.spec.package.tarball_url,
    &job.cache_path,
    &job.spec.target,
)
.map_err(|e| format!("{e:#}"));

References
- Do not implement recovery logic for panics. Panics should be treated as unrecoverable bugs that need to be fixed, not as transient, recoverable errors.
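As a rough illustration of the rule above, the sketch below runs a job on a worker thread, returns ordinary failures as `Err`, and re-raises a worker panic on the calling thread instead of converting it into an error. `clone_job` and `run_clone_worker` are hypothetical names, and the use of a plain `std::thread` worker is an assumption; the PR's actual worker model is not shown in the excerpt.

```rust
use std::thread;

/// Hypothetical stand-in for a clone job: ordinary failures are `Err` values.
fn clone_job(name: &str) -> Result<String, String> {
    if name.is_empty() {
        return Err("empty package name".to_string());
    }
    Ok(format!("cloned {name}"))
}

/// Runs the job on a worker thread. Ordinary errors are returned to the
/// caller; a panic in the worker is re-raised here rather than swallowed.
fn run_clone_worker(name: String) -> Result<String, String> {
    let handle = thread::spawn(move || clone_job(&name));
    match handle.join() {
        Ok(result) => result,
        // `join` yields the panic payload; resume unwinding with it so the
        // bug surfaces instead of being reported as a recoverable error.
        Err(payload) => std::panic::resume_unwind(payload),
    }
}
```

The point of the design is that `Result` carries expected failures, while a panic keeps its original payload and backtrace and still terminates the caller.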
  use std::path::{Path, PathBuf};
  use std::sync::Arc;
- use std::sync::mpsc::{self, SyncSender, TrySendError};
+ use std::sync::mpsc::{self, Sender};

  struct ManifestWriter {
-     tx: SyncSender<ManifestWriteJob>,
+     tx: Sender<ManifestWriteJob>,

  impl ManifestWriter {
      fn spawn() -> Self {
-         let (tx, rx) = mpsc::sync_channel(MANIFEST_WRITE_QUEUE_CAPACITY);
+         let (tx, rx) = mpsc::channel();
Changing the manifest writer channel to an unbounded channel removes protection against memory growth if disk I/O stalls. Restoring a bounded sync_channel allows the store to maintain its opportunistic write policy and drop writes when the backlog is too large.
-         let (tx, rx) = mpsc::channel();
+         let (tx, rx) = mpsc::sync_channel(1024);
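The bounded-queue behavior described in this comment can be sketched with the standard library alone. `WriterHandle`, `Enqueue`, and the `String` job type are hypothetical stand-ins for the PR's `ManifestWriter` and `ManifestWriteJob`; only `mpsc::sync_channel`, `SyncSender::try_send`, and `TrySendError` are real `std` APIs.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};

/// Outcome of an opportunistic enqueue (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Enqueue {
    Accepted,
    DroppedFull,
    WriterGone,
}

/// Hypothetical stand-in for the manifest writer's sending half.
struct WriterHandle {
    tx: SyncSender<String>,
}

impl WriterHandle {
    /// Creates a bounded queue; the receiver would be owned by the writer thread.
    fn with_capacity(capacity: usize) -> (Self, Receiver<String>) {
        let (tx, rx) = mpsc::sync_channel(capacity);
        (Self { tx }, rx)
    }

    /// Never blocks: a full queue drops the job, so memory use stays bounded
    /// by `capacity` even if the consumer stalls.
    fn enqueue(&self, job: String) -> Enqueue {
        match self.tx.try_send(job) {
            Ok(()) => Enqueue::Accepted,
            Err(TrySendError::Full(_)) => Enqueue::DroppedFull,
            Err(TrySendError::Disconnected(_)) => Enqueue::WriterGone,
        }
    }
}
```

Dropping on `Full` is only appropriate because the writes are described as opportunistic cache writes; a queue carrying required work would need to block or report the failure instead.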
if self.tx.send(job).is_err() {
    tracing::debug!("Manifest store writer stopped before accepting write");
}
The manifest writer should use try_send to maintain its opportunistic nature, dropping writes if the queue is full rather than blocking the resolver or allowing unbounded memory growth.
match self.tx.try_send(job) {
    Ok(()) => {}
    Err(TrySendError::Full(_)) => {
        tracing::debug!("Manifest store writer queue full; dropping cache write");
    }
    Err(TrySendError::Disconnected(_)) => {
        tracing::debug!("Manifest store writer stopped before accepting write");
    }
}

  let bytes = match serde_json::to_vec(value) {
      Ok(b) => b,
      Err(e) => {
          tracing::debug!("Failed to serialize {path:?}: {e}");
          return;
      }
  };
  match std::fs::write(path, &bytes) {
      Ok(()) => {}
-     Err(e) if e.kind() == ErrorKind::NotFound => {
+     Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
          if let Some(parent) = path.parent()
-             && let Err(e) = fs::create_dir_all(parent)
+             && let Err(e) = std::fs::create_dir_all(parent)
          {
              tracing::debug!("Failed to create {parent:?}: {e}");
              return;
          }
-         if let Err(e) = write_compact_sync(path, value) {
+         if let Err(e) = std::fs::write(path, &bytes) {
              tracing::debug!("Failed to write {path:?}: {e}");
          }
      }
Using serde_json::to_vec followed by std::fs::write allocates a full Vec<u8> for the serialized manifest, which can be large. Restoring the use of BufWriter and serde_json::to_writer is more memory-efficient as it streams the data directly to the file.
// Note: `writer.flush()` requires the `std::io::Write` trait to be in scope.
let write_to_file = |p: &std::path::Path| -> std::io::Result<()> {
    let file = std::fs::File::create(p)?;
    let mut writer = std::io::BufWriter::new(file);
    // Stream the JSON straight into the buffered file instead of building a Vec<u8>.
    serde_json::to_writer(&mut writer, value)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
    writer.flush()
};
match write_to_file(path) {
    Ok(()) => {}
    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
        // The parent directory is missing: create it, then retry once.
        if let Some(parent) = path.parent()
            && let Err(e) = std::fs::create_dir_all(parent)
        {
            tracing::debug!("Failed to create {parent:?}: {e}");
            return;
        }
        if let Err(e) = write_to_file(path) {
            tracing::debug!("Failed to write {path:?}: {e}");
        }
    }
    Err(e) => tracing::debug!("Failed to write {path:?}: {e}"),
}
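The streaming write with a single create-parent-and-retry step, as suggested in the review above, can be exercised without `serde_json` by passing the serialization step in as a closure. This is a sketch under that assumption: `write_streaming` is a hypothetical helper, and in the real code the closure body would be the `serde_json::to_writer` call from the suggestion.

```rust
use std::fs::{self, File};
use std::io::{self, BufWriter, ErrorKind, Write};
use std::path::Path;

/// Streams output into `path` through a `BufWriter`. If the first attempt
/// fails with `NotFound` (missing parent directory), the parent is created
/// and the write is retried once. `emit` stands in for the serializer.
fn write_streaming<F>(path: &Path, mut emit: F) -> io::Result<()>
where
    F: FnMut(&mut dyn Write) -> io::Result<()>,
{
    let mut attempt = |p: &Path| -> io::Result<()> {
        let file = File::create(p)?;
        let mut writer = BufWriter::new(file);
        emit(&mut writer)?;
        // Flush explicitly so write errors are reported rather than lost on drop.
        writer.flush()
    };
    match attempt(path) {
        Err(e) if e.kind() == ErrorKind::NotFound => {
            if let Some(parent) = path.parent() {
                fs::create_dir_all(parent)?;
            }
            attempt(path)
        }
        other => other,
    }
}
```

Trying the write first and creating the directory only on `NotFound` keeps the common case, where the directory already exists, to a single file creation. Unlike the reviewed code, this sketch returns errors to the caller instead of logging them.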
📊 pm-bench-phases

| PM | wall | ±σ | user | sys | RSS | pgMinor |
|---|---|---|---|---|---|---|
| bun | 9.29s | 0.56s | 10.42s | 10.34s | 770M | 336.7K |
| utoo-next | 8.03s | 0.11s | 10.46s | 12.24s | 970M | 125.5K |
| utoo-npm | 9.69s | 1.19s | 10.90s | 13.00s | 979M | 119.6K |
| utoo | 8.66s | 1.81s | 10.57s | 12.41s | 998M | 116.8K |

| PM | vCtx | iCtx | netRX | netTX | cache | node_mod | lock |
|---|---|---|---|---|---|---|---|
| bun | 13.7K | 17.9K | 1.19G | 6M | 1.86G | 1.75G | 1M |
| utoo-next | 108.3K | 80.4K | 1.16G | 5M | 1.71G | 1.70G | 2M |
| utoo-npm | 148.6K | 104.1K | 1.16G | 5M | 1.71G | 1.70G | 2M |
| utoo | 122.7K | 77.3K | 1.16G | 5M | 1.71G | 1.70G | 2M |

p1_resolve

| PM | wall | ±σ | user | sys | RSS | pgMinor |
|---|---|---|---|---|---|---|
| bun | 2.04s | 0.14s | 4.07s | 1.08s | 513M | 171.0K |
| utoo-next | 2.92s | 0.01s | 5.30s | 1.67s | 611M | 80.3K |
| utoo-npm | 3.23s | 0.36s | 5.51s | 2.05s | 614M | 79.1K |
| utoo | 2.92s | 0.10s | 5.28s | 1.75s | 609M | 87.6K |

| PM | vCtx | iCtx | netRX | netTX | cache | node_mod | lock |
|---|---|---|---|---|---|---|---|
| bun | 8.1K | 4.6K | 204M | 3M | 108M | - | 1M |
| utoo-next | 45.2K | 67.7K | 200M | 2M | 7M | 3M | 2M |
| utoo-npm | 70.4K | 87.6K | 200M | 2M | 7M | 3M | 2M |
| utoo | 46.3K | 67.8K | 200M | 2M | 7M | 3M | 2M |

p3_cold_install

| PM | wall | ±σ | user | sys | RSS | pgMinor |
|---|---|---|---|---|---|---|
| bun | 7.12s | 0.10s | 6.31s | 10.11s | 625M | 199.9K |
| utoo-next | 7.37s | 1.72s | 5.04s | 11.32s | 502M | 64.3K |
| utoo-npm | 9.63s | 3.21s | 5.25s | 11.67s | 549M | 71.0K |
| utoo | 5.74s | 0.20s | 4.93s | 10.87s | 456M | 59.9K |

| PM | vCtx | iCtx | netRX | netTX | cache | node_mod | lock |
|---|---|---|---|---|---|---|---|
| bun | 4.5K | 6.8K | 1019M | 3M | 1.76G | 1.76G | 1M |
| utoo-next | 113.9K | 50.6K | 989M | 3M | 1.70G | 1.70G | 2M |
| utoo-npm | 136.6K | 49.3K | 990M | 4M | 1.70G | 1.70G | 2M |
| utoo | 85.6K | 55.0K | 989M | 3M | 1.70G | 1.70G | 2M |

p4_warm_link

| PM | wall | ±σ | user | sys | RSS | pgMinor |
|---|---|---|---|---|---|---|
| bun | 3.53s | 0.18s | 0.20s | 2.38s | 135M | 32.6K |
| utoo-next | 2.21s | 0.10s | 0.49s | 3.79s | 78M | 17.8K |
| utoo-npm | 2.28s | 0.01s | 0.47s | 3.79s | 79M | 18.4K |
| utoo | 2.11s | 0.15s | 0.41s | 3.48s | 52M | 11.6K |

| PM | vCtx | iCtx | netRX | netTX | cache | node_mod | lock |
|---|---|---|---|---|---|---|---|
| bun | 277 | 21 | 5M | 18K | 1.91G | 1.75G | 1M |
| utoo-next | 42.2K | 19.7K | 5K | 5K | 1.70G | 1.70G | 2M |
| utoo-npm | 41.0K | 18.6K | 5K | 5K | 1.70G | 1.70G | 2M |
| utoo | 20.8K | 13.4K | 6K | 11K | 1.71G | 1.70G | 2M |

npmmirror.com: no output captured.
Summary
Draft source PR for the installer half of #2948. This is not the final review unit; it is the source branch used to verify the full installer stack in one place.
This source branch now matches the current installer split-stack top (perf/pm-split-installer-scheduler-core), so the split PRs should compose back to this diff exactly.
Covers From #2948
Actual Split Stack
- perf(pm): split downloader cache primitives (+91 / -38)
- perf(pm): add sync cloner primitive (+225 / -79)
- perf(pm): schedule fresh lock installs (+490 / -5)
- test(pm): cover install scheduler core (+94)
- perf(pm): route stale installs through scheduler (+220 / -270)
- perf(pm): remove obsolete install pipeline (-258)
Notes
The scheduler core is split so each PR has a clear review focus: primitives, fresh-lock scheduling, tests, stale-lock event wiring, and final deletion of the old pipeline module. The production PRs avoid dead-code staging and do not use clippy allow escapes.
Validation
Validated at installer split-stack top:
- cargo fmt
- cargo check -p utoo-pm
- cargo test -p utoo-pm service::install_scheduler -- --nocapture (4 passed)
- cargo clippy --all-targets -- -D warnings --no-deps
pack-napi warns locally because next.js is a symlink in this worktree; clippy exits successfully.