Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 225 additions & 79 deletions crates/pm/src/util/cloner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};

use anyhow::{Context, Result};
use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use tokio_retry::Retry;
use utoo_ruborist::manifest::IdentityView;
use utoo_ruborist::util::OnceMap;
Expand Down Expand Up @@ -33,6 +34,24 @@ pub fn clone_count() -> usize {
CLONE_COUNT.load(Ordering::Relaxed)
}

/// Synchronously clone one package out of an already-resolved cache entry.
///
/// This bypasses the global clone/download single-flight maps, so the caller
/// (typically a scheduler running its own worker pool) is responsible for
/// deduplicating concurrent clones of the same target.
///
/// * `name` / `version` - identity passed to `clone_package_sync` for
///   validating an existing target.
/// * `tarball_url` - only inspected via `is_git_url` to choose the layout.
/// * `cache_path` - source directory in the cache.
/// * `target_path` - destination directory.
///
/// `CLONE_COUNT` is bumped only when `clone_package_sync` reports that a
/// fresh clone was performed.
pub fn clone_package_from_cache_sync(
    name: &str,
    version: &str,
    tarball_url: &str,
    cache_path: &Path,
    target_path: &Path,
) -> Result<()> {
    // Git sources are used as-is; everything else goes through the
    // "find the real source directory" step.
    let find_real = !is_git_url(tarball_url);
    if clone_package_sync(cache_path, target_path, name, version, find_real)? {
        CLONE_COUNT.fetch_add(1, Ordering::Relaxed);
    }
    Ok(())
}

/// Normalize a target path into the canonical key used by `CLONE_CACHE`.
#[cfg(windows)]
fn cache_key(target_path: &Path) -> PathBuf {
Expand Down Expand Up @@ -78,15 +97,17 @@ pub async fn clone_package_once(
let tarball_url = tarball_url.to_string();
let target_path = target_path.to_path_buf();

// Git packages are extracted flat (no `package/` wrapper directory),
// so skip `find_real_src` which would incorrectly pick a subdirectory.
let is_git = is_git_url(&tarball_url);

CLONE_CACHE
.get_or_init(key, || async move {
let cache_path = resolve_cache_path(&name, &version, &tarball_url).await?;
let fresh = clone_package(&cache_path, &target_path, &name, &version, !is_git)
.await
tokio::task::spawn_blocking(move || {
clone_package_from_cache_sync(
&name,
&version,
&tarball_url,
&cache_path,
&target_path,
)
.inspect_err(|e| {
tracing::warn!(
"Clone failed: {}@{} to {}: {:#}",
Expand All @@ -96,12 +117,10 @@ pub async fn clone_package_once(
e
)
})
.ok()?;

if fresh {
CLONE_COUNT.fetch_add(1, Ordering::Relaxed);
}
Some(())
.ok()
})
.await
.ok()?
})
.await
.map(|_| ())
Expand Down Expand Up @@ -172,83 +191,165 @@ mod hardlink_clone {
Ok(())
}

/// Clone directory using spawn_blocking for sync I/O.
/// Uses hardlink when possible, falls back to copy.
pub async fn clone_dir(src: &Path, dst: &Path) -> Result<()> {
/// Clone directory using sync I/O. Uses hardlink when possible, falls back
/// to copy.
pub fn clone_dir_sync(src: &Path, dst: &Path) -> Result<()> {
let err_msg = format!("Failed to clone {} to {}", src.display(), dst.display());

if !fs::metadata(src)?.is_dir() {
return Err(io::Error::new(
io::ErrorKind::NotADirectory,
"Source is not a directory",
))
.with_context(|| err_msg);
}

let mut force_copy = has_install_script_sync(src);

let mut files = Vec::new();
let mut dirs = Vec::new();
collect_entries(src, dst, &mut files, &mut dirs)?;

let mut created_dirs = HashSet::new();
for dir in &dirs {
if created_dirs.insert(dir.clone())
&& let Err(e) = fs::create_dir_all(dir)
&& e.kind() != io::ErrorKind::AlreadyExists
{
return Err(e).with_context(|| err_msg.clone());
}
}

let mut warned_per_file = false;
for entry in &files {
if force_copy {
copy_file_sync(&entry.src, &entry.dst)?;
} else if let Err(e) = fs::hard_link(&entry.src, &entry.dst) {
if e.kind() == io::ErrorKind::CrossesDevices {
tracing::warn!(
"cross-device hardlink {} -> {}: {}; falling back to copy for remaining files",
src.display(),
dst.display(),
e
);
force_copy = true;
} else if !warned_per_file {
tracing::warn!(
"hardlink failed for {} -> {}: {}; falling back to copy (further per-file failures suppressed)",
entry.src.display(),
entry.dst.display(),
e
);
warned_per_file = true;
}
copy_file_sync(&entry.src, &entry.dst)?;
}
}
Ok(())
}

/// Async adapter around [`clone_dir_sync`] for callers that still run on the
/// async runtime: the blocking clone is executed via `spawn_blocking`.
///
/// A failure to join the blocking task is converted into the returned error;
/// otherwise the result of `clone_dir_sync` is passed through unchanged.
pub async fn clone_dir(src: &Path, dst: &Path) -> Result<()> {
    // The blocking closure must own its paths.
    let (from, to) = (src.to_path_buf(), dst.to_path_buf());
    let worker = tokio::task::spawn_blocking(move || clone_dir_sync(&from, &to));
    match worker.await {
        Ok(outcome) => outcome,
        Err(join_err) => Err(join_err.into()),
    }
}
}

tokio::task::spawn_blocking(move || {
if !fs::metadata(&src)?.is_dir() {
return Err(io::Error::new(
io::ErrorKind::NotADirectory,
"Source is not a directory",
));
fn load_package_json_sync<T: DeserializeOwned>(path: &Path) -> Result<T> {
let pkg_path = path.join("package.json");
let content = std::fs::read_to_string(&pkg_path)
.with_context(|| format!("Failed to read file {pkg_path:?}"))?;

match serde_json::from_str(&content) {
Ok(v) => Ok(v),
Err(original_err) => match serde_json::from_str::<serde_json::Value>(&content) {
Ok(value) => serde_json::from_value(value)
.with_context(|| format!("Failed to deserialize {pkg_path:?}")),
Err(_) => {
Err(original_err).with_context(|| format!("Failed to parse JSON from {pkg_path:?}"))
}
},
}
Comment on lines +264 to +273
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

load_package_json_sync is missing the warning log when falling back to the lenient JSON parser (used to handle duplicate keys in package.json). This makes the synchronous path's logging inconsistent with the asynchronous load_package_json function in json.rs. Adding the warning ensures that non-standard package structures are surfaced to the user regardless of the execution path.

Suggested change
match serde_json::from_str(&content) {
Ok(v) => Ok(v),
Err(original_err) => match serde_json::from_str::<serde_json::Value>(&content) {
Ok(value) => serde_json::from_value(value)
.with_context(|| format!("Failed to deserialize {pkg_path:?}")),
Err(_) => {
Err(original_err).with_context(|| format!("Failed to parse JSON from {pkg_path:?}"))
}
},
}
match serde_json::from_str(&content) {
Ok(v) => Ok(v),
Err(original_err) => match serde_json::from_str::<serde_json::Value>(&content) {
Ok(value) => {
tracing::warn!(
"package.json has non-standard structure (e.g. duplicate keys), \
retrying with lenient parser: {pkg_path:?}"
);
serde_json::from_value(value)
.with_context(|| format!("Failed to deserialize {pkg_path:?}"))
}
Err(_) => {
Err(original_err).with_context(|| format!("Failed to parse JSON from {pkg_path:?}"))
}
},
}

}

let mut force_copy = has_install_script_sync(&src);

// Phase 1: Collect all files and directories
let mut files = Vec::new();
let mut dirs = Vec::new();
collect_entries(&src, &dst, &mut files, &mut dirs)?;

// Phase 2: Create all directories
let mut created_dirs = HashSet::new();
for dir in &dirs {
if created_dirs.insert(dir.clone())
&& let Err(e) = fs::create_dir_all(dir)
&& e.kind() != io::ErrorKind::AlreadyExists
{
return Err(e);
}
/// Report whether the `package.json` under `dst` declares exactly the given
/// `name` and `version`.
///
/// Any failure to load or parse the manifest counts as "not valid" and
/// yields `false`.
fn validate_name_version_sync(dst: &Path, name: &str, version: &str) -> bool {
    match load_package_json_sync::<IdentityView>(dst) {
        Ok(manifest) => manifest.name == name && manifest.version == version,
        Err(_) => false,
    }
}

/// Locate the directory inside `src` that holds the actual package contents.
///
/// Returns the first sub-directory yielded by `read_dir` whose name is not
/// `.utoo_built`. Returns `None` when `src` cannot be read, when any entry
/// (or its file type) cannot be read while scanning, or when no such
/// sub-directory exists.
fn find_real_src_sync(src: &Path) -> Option<PathBuf> {
    let mut entries = std::fs::read_dir(src).ok()?;
    loop {
        // Exhausting the iterator or hitting an unreadable entry both end
        // the search with `None`.
        let entry = entries.next()?.ok()?;
        if !entry.file_type().ok()?.is_dir() {
            continue;
        }

        let candidate = entry.path();
        let is_payload_dir = candidate
            .file_name()
            .is_some_and(|dir_name| dir_name != ".utoo_built");
        if is_payload_dir {
            return Some(candidate);
        }
    }
}

// Phase 3: Clone files (hardlink, fall back to copy on error).
//
// EXDEV (src cache and dst on different filesystems, e.g. a
// global install where ~/.cache/nm lives on a different volume
// than /usr/local) is a property of the src/dst pair — every
// remaining file would fail the same way, so latch `force_copy`
// and skip hardlink for the rest of this clone.
//
// Any other hardlink error (EMLINK on a single inode whose link
// count is exhausted, EPERM on a specific file, etc.) is
// per-file: copy this one and keep trying hardlink on the next.
// We warn only on the first such failure per package to avoid
// spamming hundreds of identical warnings when an entire package
// can't be hardlinked.
let mut warned_per_file = false;
for entry in &files {
if force_copy {
copy_file_sync(&entry.src, &entry.dst)?;
} else if let Err(e) = fs::hard_link(&entry.src, &entry.dst) {
if e.kind() == io::ErrorKind::CrossesDevices {
tracing::warn!(
"cross-device hardlink {} -> {}: {}; falling back to copy for remaining files",
src.display(),
dst.display(),
e
);
force_copy = true;
} else if !warned_per_file {
tracing::warn!(
"hardlink failed for {} -> {}: {}; falling back to copy (further per-file failures suppressed)",
entry.src.display(),
entry.dst.display(),
e
);
warned_per_file = true;
}
copy_file_sync(&entry.src, &entry.dst)?;
}
#[cfg(target_os = "macos")]
fn clone_dir_native_sync(real_src: &Path, dst: &Path) -> Result<()> {
let src_c = CString::new(real_src.as_os_str().as_bytes())?;
let dst_c = CString::new(dst.as_os_str().as_bytes())?;
let mut last_error = None;

for delay in std::iter::once(std::time::Duration::ZERO).chain(create_retry_strategy()) {
if !delay.is_zero() {
std::thread::sleep(delay);
}

match unsafe { clonefile(src_c.as_ptr(), dst_c.as_ptr(), 0) } {
0 => return Ok(()),
_ => {
let err = std::io::Error::last_os_error();
let _ = std::fs::remove_dir_all(dst);
last_error = Some(err);
}
Ok(())
})
.await?
.with_context(|| err_msg)
}
}

Err(anyhow::anyhow!(
"clonefile {} -> {}: {}",
real_src.display(),
dst.display(),
last_error
.map(|e| e.to_string())
.unwrap_or_else(|| "unknown error".to_string())
))
}

/// Non-macOS implementation of the native directory clone: delegates to
/// `hardlink_clone::clone_dir_sync` and labels any failure with the
/// `clone_dir <src> -> <dst>` context.
#[cfg(not(target_os = "macos"))]
fn clone_dir_native_sync(real_src: &Path, dst: &Path) -> Result<()> {
    match hardlink_clone::clone_dir_sync(real_src, dst) {
        Ok(()) => Ok(()),
        Err(cause) => Err(cause.context(format!(
            "clone_dir {} -> {}",
            real_src.display(),
            dst.display()
        ))),
    }
}
Comment on lines +328 to +331
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The synchronous implementation of clone_dir_native_sync for non-macOS platforms is missing the retry logic that exists in its asynchronous counterpart. This makes the synchronous path (used by the new scheduler) less resilient to transient filesystem errors, such as temporary file locks or concurrent access issues. To maintain parity with the async implementation, a retry loop with a backoff strategy should be added.

fn clone_dir_native_sync(real_src: &Path, dst: &Path) -> Result<()> {
    let mut last_error = None;

    for delay in std::iter::once(std::time::Duration::ZERO).chain(create_retry_strategy()) {
        if !delay.is_zero() {
            std::thread::sleep(delay);
        }

        match hardlink_clone::clone_dir_sync(real_src, dst) {
            Ok(_) => return Ok(()),
            Err(e) => {
                let _ = std::fs::remove_dir_all(dst);
                last_error = Some(e);
            }
        }
    }

    let err = last_error.unwrap_or_else(|| anyhow::anyhow!("unknown error"));
    Err(err).with_context(|| format!("clone_dir {} -> {}", real_src.display(), dst.display()))
}


fn clone_sync(src: &Path, dst: &Path, find_real: bool) -> Result<()> {
let real_src = if find_real {
find_real_src_sync(src)
.ok_or_else(|| anyhow::anyhow!("Cannot find valid source directory in {src:?}"))?
} else {
src.to_path_buf()
};

if dst.try_exists()?
&& let Err(e) = std::fs::remove_dir_all(dst)
{
tracing::warn!("Failed to clean target directory {}: {}", dst.display(), e);
}

if let Some(parent) = dst.parent() {
std::fs::create_dir_all(parent)?;
}

clone_dir_native_sync(&real_src, dst)?;
Ok(())
}

async fn validate_directory(src: &Path, dst: &Path) -> Result<bool> {
Expand Down Expand Up @@ -445,6 +546,26 @@ pub async fn clone_package(
Ok(true)
}

/// Sync clone path with the same name/version validation as [`clone_package`].
fn clone_package_sync(
src: &Path,
dst: &Path,
name: &str,
version: &str,
find_real: bool,
) -> Result<bool> {
if dst.try_exists()? {
if validate_name_version_sync(dst, name, version) {
return Ok(false);
}
if let Err(e) = std::fs::remove_dir_all(dst) {
tracing::warn!("Failed to clean target directory {}: {}", dst.display(), e);
}
Comment on lines +561 to +563
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The call to std::fs::remove_dir_all(dst) here is redundant because clone_sync (called immediately after on line 565) also performs the same existence check and removal logic. This results in an unnecessary extra syscall and potentially duplicate warning logs if the directory removal fails. It is cleaner to let the lower-level clone_sync primitive handle the destination cleanup.

}
clone_sync(src, dst, find_real)?;
Ok(true)
}

#[cfg(test)]
mod tests {
use tempfile::TempDir;
Expand Down Expand Up @@ -804,6 +925,31 @@ mod tests {
Ok(())
}

/// A fresh target directory receives the package contents from a
/// registry-style cache entry (`<cache>/package/package.json`).
#[test]
fn test_clone_package_from_cache_sync_fresh_install() -> Result<()> {
    let sandbox = TempDir::new()?;
    let root = sandbox.path();
    let cache_dir = root.join("cache/lodash/4.17.21");
    let dst_dir = root.join("node_modules/lodash");

    // Populate the cache entry with a minimal manifest.
    let src_dir = cache_dir.join("package");
    std::fs::create_dir_all(&src_dir)?;
    let manifest = create_package_json("lodash", "4.17.21");
    std::fs::write(src_dir.join("package.json"), &manifest)?;

    clone_package_from_cache_sync(
        "lodash",
        "4.17.21",
        "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
        &cache_dir,
        &dst_dir,
    )?;

    // The manifest must have been materialised in the target directory.
    let cloned_manifest = dst_dir.join("package.json");
    assert!(cloned_manifest.exists());
    assert!(std::fs::read_to_string(&cloned_manifest)?.contains("lodash"));
    Ok(())
}

#[tokio::test]
async fn test_clone_package_git_flat_layout() -> Result<()> {
let temp = TempDir::new()?;
Expand Down
Loading