Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 91 additions & 38 deletions crates/pm/src/util/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,40 @@ pub async fn http_tarball_cache_lookup(name: &str, tarball_url: &str) -> Option<
slot_cache_lookup(name, http_cache_slot(tarball_url)).await
}

/// Resolve cache slots that may have been seeded during dependency resolution
/// without falling through to registry download. `Ok(None)` means this is a
/// registry-style HTTP tarball that should be downloaded into `<name>/<version>`.
pub async fn resolve_seeded_cache_path(
name: &str,
version: &str,
tarball_url: &str,
) -> Result<Option<PathBuf>> {
match tarball_url.parse::<Protocol>() {
Ok(Protocol::Git) => git_cache_lookup(name, version, tarball_url)
.await
.map(Some)
.ok_or_else(|| anyhow::anyhow!("git cache not found for {name}@{version}")),
Ok(Protocol::File) => file_cache_lookup(name, tarball_url)
.await
.map(Some)
.ok_or_else(|| anyhow::anyhow!("file tarball cache not found for {name}@{version}")),
_ => Ok(http_tarball_cache_lookup(name, tarball_url).await),
}
}

/// Resolve the local cache path for a package, downloading if necessary.
///
/// The cloner calls this with a fully-qualified URL (never a relative
/// path) — the install loop is responsible for any lockfile-format
/// rewriting before we get here.
pub async fn resolve_cache_path(name: &str, version: &str, tarball_url: &str) -> Option<PathBuf> {
match tarball_url.parse::<Protocol>() {
Ok(Protocol::Git) => git_cache_lookup(name, version, tarball_url).await,
Ok(Protocol::File) => file_cache_lookup(name, tarball_url).await,
// Otherwise try the URL-hashed http slot BFS may have seeded,
// then fall through to the registry `<name>/<version>/` path.
_ => match http_tarball_cache_lookup(name, tarball_url).await {
Some(p) => Some(p),
None => download_to_cache(name, version, tarball_url).await,
},
match resolve_seeded_cache_path(name, version, tarball_url).await {
Ok(Some(path)) => Some(path),
Ok(None) => download_to_cache(name, version, tarball_url).await,
Err(e) => {
tracing::warn!("{e:#}");
None
}
}
}

Expand All @@ -155,21 +174,13 @@ pub async fn download_to_cache(name: &str, version: &str, tarball_url: &str) ->
return Some((*cache_path).clone());
}

let cache_dir = get_cache_dir();
let name = name.to_string();
let version = version.to_string();
let tarball_url = tarball_url.to_string();

DOWNLOAD_CACHE
.get_or_init(key, || async move {
let cache_path = cache_dir.join(&name).join(&version);

// Fast path: already extracted in cache
if crate::fs::try_exists(&cache_path.join("_resolved"))
.await
.unwrap_or(false)
{
REUSE_COUNT.fetch_add(1, Ordering::Relaxed);
if let Ok(Some(cache_path)) = registry_cache_lookup(&name, &version).await {
return Some(cache_path);
}

Expand All @@ -179,41 +190,83 @@ pub async fn download_to_cache(name: &str, version: &str, tarball_url: &str) ->
let semaphore = DOWNLOAD_SEMAPHORE
.get_or_init(|| Semaphore::new(get_manifests_concurrency_limit_sync()));
let _permit = semaphore.acquire().await.ok()?;
let bytes = download_bytes(&tarball_url)
download_and_extract_to_cache(&name, &version, &tarball_url)
.await
.inspect_err(|e| {
tracing::warn!(
"Download {}@{} from {}: {:#}",
"Download/extract {}@{} from {} into {}: {:#}",
name,
version,
tarball_url,
registry_cache_path(&name, &version).display(),
e
)
})
.ok()?;

// Extract
extract_and_write(bytes, &cache_path)
.await
.inspect_err(|e| {
tracing::warn!(
"Extract {}@{} into {}: {:#}",
name,
version,
cache_path.display(),
e
)
})
.ok()?;

DOWNLOAD_COUNT.fetch_add(1, Ordering::Relaxed);
Some(cache_path)
.ok()
})
.await
.as_deref()
.cloned()
}

/// Download and extract a registry tarball without global single-flight or
/// semaphore state. Callers that already own scheduling/deduplication should use
/// this primitive directly.
pub async fn download_and_extract_to_cache(
name: &str,
version: &str,
tarball_url: &str,
) -> Result<PathBuf> {
if let Some(cache_path) = registry_cache_lookup(name, version).await? {
return Ok(cache_path);
}

let bytes = download_bytes(tarball_url)
.await
.with_context(|| format!("Download {name}@{version} from {tarball_url}"))?;

extract_to_cache(name, version, bytes).await
}

/// Return the registry cache path for a package version.
pub fn registry_cache_path(name: &str, version: &str) -> PathBuf {
get_cache_dir().join(name).join(version)
}

/// Look up an already extracted registry package cache.
pub async fn registry_cache_lookup(name: &str, version: &str) -> Result<Option<PathBuf>> {
let cache_path = registry_cache_path(name, version);
if crate::fs::try_exists(&cache_path.join("_resolved"))
.await
.unwrap_or(false)
{
Comment on lines +239 to +242
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The use of unwrap_or(false) on try_exists masks potential IO errors (e.g., permission issues). Since this function returns a Result, it should propagate the error using ? to allow callers to handle it appropriately.

    if crate::fs::try_exists(&cache_path.join("_resolved")).await? {

REUSE_COUNT.fetch_add(1, Ordering::Relaxed);
Ok(Some(cache_path))
} else {
Ok(None)
}
}

/// Extract already downloaded registry tarball bytes into the package cache.
pub async fn extract_to_cache(name: &str, version: &str, bytes: Bytes) -> Result<PathBuf> {
let cache_path = registry_cache_path(name, version);

if crate::fs::try_exists(&cache_path.join("_resolved"))
.await
.unwrap_or(false)
{
REUSE_COUNT.fetch_add(1, Ordering::Relaxed);
return Ok(cache_path);
}
Comment on lines +252 to +260
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block duplicates the logic already implemented in registry_cache_lookup. Reusing the helper function improves maintainability and reduces redundant filesystem checks, which aligns with the performance goals of this PR.

Suggested change
let cache_path = registry_cache_path(name, version);
if crate::fs::try_exists(&cache_path.join("_resolved"))
.await
.unwrap_or(false)
{
REUSE_COUNT.fetch_add(1, Ordering::Relaxed);
return Ok(cache_path);
}
if let Some(cache_path) = registry_cache_lookup(name, version).await? {
return Ok(cache_path);
}
let cache_path = registry_cache_path(name, version);


extract_and_write(bytes, &cache_path)
.await
.with_context(|| format!("Extract {name}@{version} into {}", cache_path.display()))?;

DOWNLOAD_COUNT.fetch_add(1, Ordering::Relaxed);
Ok(cache_path)
}

/// Download tarball bytes with retries (network phase only).
pub async fn download_bytes(url: &str) -> Result<Bytes> {
let retry_count = AtomicU32::new(0);
Expand Down
Loading