Skip to content

Commit

Permalink
fix: unclobber issue when packages are named differently (#776)
Browse files Browse the repository at this point in the history
cc @baszalmstra - I think that was the test case I wrote for an issue
similar to the one with pyarrow now.

---------

Co-authored-by: Bas Zalmstra <zalmstra.bas@gmail.com>
Co-authored-by: Bas Zalmstra <bas@prefix.dev>
Co-authored-by: Hofer-Julian <30049909+Hofer-Julian@users.noreply.github.com>
  • Loading branch information
4 people authored Jul 15, 2024
1 parent d619dcd commit 014055b
Show file tree
Hide file tree
Showing 11 changed files with 478 additions and 197 deletions.
384 changes: 271 additions & 113 deletions crates/rattler/src/install/clobber_registry.rs

Large diffs are not rendered by default.

138 changes: 85 additions & 53 deletions crates/rattler/src/install/driver.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
use super::clobber_registry::ClobberRegistry;
use super::link_script::{PrePostLinkError, PrePostLinkResult};
use super::unlink::{recursively_remove_empty_directories, UnlinkError};
use super::Transaction;
use std::{
borrow::Borrow,
collections::{HashMap, HashSet},
path::{Path, PathBuf},
sync::{Arc, Mutex, MutexGuard},
};

use indexmap::IndexSet;
use itertools::Itertools;
use rattler_conda_types::prefix_record::PathType;
use rattler_conda_types::{PackageRecord, PrefixRecord};
use simple_spawn_blocking::tokio::run_blocking_task;
use simple_spawn_blocking::Cancelled;
use std::borrow::Borrow;
use std::collections::HashSet;
use std::path::Path;
use std::sync::MutexGuard;
use std::sync::{Arc, Mutex};
use rattler_conda_types::{prefix_record::PathType, PackageRecord, PrefixRecord};
use simple_spawn_blocking::{tokio::run_blocking_task, Cancelled};
use thiserror::Error;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};

/// Packages can mostly be installed in isolation and therefor in parallel. However, when installing
/// a large number of packages at the same time the different installation tasks start competing for
/// resources. The [`InstallDriver`] helps to assist in making sure that tasks don't starve
/// each other from resource as well as making sure that due to the large number of requests the
/// process doesn't try to acquire more resources than the system has available.
use super::{
clobber_registry::{ClobberError, ClobberRegistry, ClobberedPath},
link_script::{PrePostLinkError, PrePostLinkResult},
unlink::{recursively_remove_empty_directories, UnlinkError},
Transaction,
};
use crate::install::link_script::LinkScriptError;

/// Packages can mostly be installed in isolation and therefore in parallel.
/// However, when installing a large number of packages at the same time the
/// different installation tasks start competing for resources. The
/// [`InstallDriver`] helps make sure that tasks don't starve each other of
/// resources, and that the large number of requests doesn't cause the
/// process to try to acquire more resources than the system has available.
pub struct InstallDriver {
io_concurrency_semaphore: Option<Arc<Semaphore>>,
clobber_registry: Arc<Mutex<ClobberRegistry>>,
Expand All @@ -43,6 +50,28 @@ pub struct InstallDriverBuilder {
execute_link_scripts: bool,
}

/// The result of the post-processing step.
///
/// Returned by [`InstallDriver::post_process`]; it bundles the outcome of the
/// post link scripts together with the paths reported by the clobber registry.
#[derive(Debug)]
pub struct PostProcessResult {
/// The result of running the post link scripts. This is only present if
/// running the scripts is allowed; `None` means the scripts were not run.
///
/// A failure of the scripts is carried in the inner `Err` instead of
/// failing post-processing as a whole, so callers must inspect it
/// themselves.
pub post_link_result: Option<Result<PrePostLinkResult, LinkScriptError>>,

/// The paths that were clobbered during the installation process, as
/// returned by `ClobberRegistry::unclobber`.
// NOTE(review): presumably keyed by the clobbered path inside the prefix —
// confirm against `ClobberRegistry::unclobber`.
pub clobbered_paths: HashMap<PathBuf, ClobberedPath>,
}

/// An error that might have occurred during post-processing.
#[derive(Debug, Error)]
pub enum PostProcessingError {
/// Failed to unclobber the files that were clobbered during installation.
/// Created automatically from a [`ClobberError`] through the generated
/// `From` implementation.
#[error("failed to unclobber clobbered files")]
ClobberError(#[from] ClobberError),

/// Failed to determine the currently installed packages. The underlying
/// IO error is exposed as the error source.
#[error("failed to determine the installed packages")]
FailedToDetectInstalledPackages(#[source] std::io::Error),
}

impl InstallDriverBuilder {
/// Sets an optional IO concurrency limit. This is used to make sure
/// that the system doesn't acquire more IO resources than the system has
Expand Down Expand Up @@ -71,7 +100,7 @@ impl InstallDriverBuilder {
prefix_records: impl IntoIterator<Item = &'i PrefixRecord>,
) -> Self {
Self {
clobber_registry: Some(ClobberRegistry::from_prefix_records(prefix_records)),
clobber_registry: Some(ClobberRegistry::new(prefix_records)),
..self
}
}
Expand Down Expand Up @@ -103,22 +132,24 @@ impl InstallDriver {
InstallDriverBuilder::default()
}

/// Returns a permit that will allow the caller to perform IO operations. This is used to make
/// sure that the system doesn't try to acquire more IO resources than the system has available.
/// Returns a permit that will allow the caller to perform IO operations.
/// This is used to make sure that the system doesn't try to acquire
/// more IO resources than the system has available.
pub async fn acquire_io_permit(&self) -> Result<Option<OwnedSemaphorePermit>, AcquireError> {
    // Without a configured semaphore there is no permit to hand out.
    let limiter = match self.io_concurrency_semaphore.clone() {
        Some(limiter) => limiter,
        None => return Ok(None),
    };

    // Wait for a free slot; the owned permit releases it again when dropped.
    let permit = limiter.acquire_owned().await?;
    Ok(Some(permit))
}

/// Return a locked reference to the paths registry. This is used to make sure that the same
/// path is not installed twice.
/// Return a locked reference to the paths registry. This is used to make
/// sure that the same path is not installed twice.
pub fn clobber_registry(&self) -> MutexGuard<'_, ClobberRegistry> {
// `lock()` only fails when the mutex is poisoned (a thread panicked while
// holding it); `unwrap()` turns that into a panic here as well.
self.clobber_registry.lock().unwrap()
}

/// Call this before any packages are installed to perform any pre processing that is required.
/// Call this before any packages are installed to perform any pre
/// processing that is required.
pub fn pre_process<Old: Borrow<PrefixRecord>, New>(
&self,
transaction: &Transaction<Old, New>,
Expand All @@ -138,9 +169,10 @@ impl InstallDriver {
Ok(None)
}

/// Runs a blocking task that will execute on a seperate thread. The task is not started until
/// an IO permit is acquired. This is used to make sure that the system doesn't try to acquire
/// more IO resources than the system has available.
/// Runs a blocking task that will execute on a separate thread. The task is
/// not started until an IO permit is acquired. This is used to make
/// sure that the system doesn't try to acquire more IO resources than
/// the system has available.
pub async fn run_blocking_io_task<
T: Send + 'static,
E: Send + From<Cancelled> + 'static,
Expand All @@ -158,18 +190,19 @@ impl InstallDriver {
.await
}

/// Call this after all packages have been installed to perform any post processing that is
/// required.
/// Call this after all packages have been installed to perform any post
/// processing that is required.
///
/// This function will select a winner among multiple packages that might write to a single package
/// and will also execute any `post-link.sh/bat` scripts
/// This function will select a winner among multiple packages that might
/// write to the same path and will also execute any
/// `post-link.sh/bat` scripts.
pub fn post_process<Old: Borrow<PrefixRecord> + AsRef<New>, New: AsRef<PackageRecord>>(
&self,
transaction: &Transaction<Old, New>,
target_prefix: &Path,
) -> Result<Option<PrePostLinkResult>, PrePostLinkError> {
) -> Result<PostProcessResult, PostProcessingError> {
let prefix_records = PrefixRecord::collect_from_prefix(target_prefix)
.map_err(PrePostLinkError::FailedToDetectInstalledPackages)?;
.map_err(PostProcessingError::FailedToDetectInstalledPackages)?;

let required_packages =
PackageRecord::sort_topologically(prefix_records.iter().collect::<Vec<_>>());
Expand All @@ -179,27 +212,24 @@ impl InstallDriver {
tracing::warn!("Failed to remove empty directories: {} (ignored)", e);
});

self.clobber_registry()
.unclobber(&required_packages, target_prefix)
.unwrap_or_else(|e| {
tracing::error!("Error unclobbering packages: {:?}", e);
});
let clobbered_paths = self
.clobber_registry()
.unclobber(&required_packages, target_prefix)?;

if self.execute_link_scripts {
match self.run_post_link_scripts(transaction, &required_packages, target_prefix) {
Ok(res) => {
return Ok(Some(res));
}
Err(e) => {
tracing::error!("Error running post-link scripts: {:?}", e);
}
}
}
let post_link_result = if self.execute_link_scripts {
Some(self.run_post_link_scripts(transaction, &required_packages, target_prefix))
} else {
None
};

Ok(None)
Ok(PostProcessResult {
post_link_result,
clobbered_paths,
})
}

/// Remove all empty directories that are not part of the new prefix records.
/// Remove all empty directories that are not part of the new prefix
/// records.
pub fn remove_empty_directories<Old: Borrow<PrefixRecord>, New>(
&self,
transaction: &Transaction<Old, New>,
Expand Down Expand Up @@ -232,7 +262,8 @@ impl InstallDriver {

let is_python_noarch = record.repodata_record.package_record.noarch.is_python();

// Sort the directories by length, so that we delete the deepest directories first.
// Sort the directories by length, so that we delete the deepest directories
// first.
let mut directories: IndexSet<_> = removed_directories.into_iter().sorted().collect();

while let Some(directory) = directories.pop() {
Expand All @@ -244,8 +275,9 @@ impl InstallDriver {
&keep_directories,
)?;

// The directory is not empty which means our parent directory is also not empty,
// recursively remove the parent directory from the set as well.
// The directory is not empty which means our parent directory is also not
// empty, recursively remove the parent directory from the set
// as well.
while let Some(parent) = removed_until.parent() {
if !directories.shift_remove(parent) {
break;
Expand Down
27 changes: 23 additions & 4 deletions crates/rattler/src/install/installer/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::install::link_script::PrePostLinkError;
use crate::install::unlink::UnlinkError;
use crate::install::{InstallError, TransactionError};
use crate::package_cache::PackageCacheError;
use simple_spawn_blocking::Cancelled;

use crate::{
install::{
clobber_registry::ClobberError, driver::PostProcessingError, link_script::PrePostLinkError,
unlink::UnlinkError, InstallError, TransactionError,
},
package_cache::PackageCacheError,
};

/// An error returned by the installer
#[derive(Debug, thiserror::Error)]
pub enum InstallerError {
Expand Down Expand Up @@ -39,6 +43,10 @@ pub enum InstallerError {
#[error("post-processing failed")]
PostProcessingFailed(#[source] PrePostLinkError),

/// A clobbering error occurred
#[error("failed to unclobber clobbered files")]
ClobberError(#[from] ClobberError),

/// The operation was cancelled
#[error("the operation was cancelled")]
Cancelled,
Expand All @@ -49,3 +57,14 @@ impl From<Cancelled> for InstallerError {
InstallerError::Cancelled
}
}

impl From<PostProcessingError> for InstallerError {
fn from(value: PostProcessingError) -> Self {
match value {
PostProcessingError::ClobberError(err) => InstallerError::ClobberError(err),
PostProcessingError::FailedToDetectInstalledPackages(err) => {
InstallerError::FailedToDetectInstalledPackages(err)
}
}
}
}
30 changes: 18 additions & 12 deletions crates/rattler/src/install/installer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod error;
mod indicatif;
mod reporter;
use std::{
collections::HashMap,
future::ready,
path::{Path, PathBuf},
sync::Arc,
Expand All @@ -26,9 +27,10 @@ use simple_spawn_blocking::tokio::run_blocking_task;
use tokio::{sync::Semaphore, task::JoinError};

use super::{unlink_package, AppleCodeSignBehavior, InstallDriver, InstallOptions, Transaction};
use crate::install::link_script::LinkScriptError;
use crate::{
default_cache_dir,
install::link_script::PrePostLinkResult,
install::{clobber_registry::ClobberedPath, link_script::PrePostLinkResult},
package_cache::{CacheReporter, PackageCache},
};

Expand All @@ -55,15 +57,18 @@ pub struct InstallationResult {
/// The transaction that was applied
pub transaction: Transaction<PrefixRecord, RepoDataRecord>,

/// The result of running pre-processing steps. `None` if no
/// The result of running pre link scripts. `None` if no
/// pre-processing was performed, possibly because link scripts were
/// disabled.
pub pre_process_result: Option<PrePostLinkResult>,
pub pre_link_script_result: Option<PrePostLinkResult>,

/// The result of running post-processing steps. `None` if no
/// The result of running post link scripts. `None` if no
/// post-processing was performed, possibly because link scripts were
/// disabled.
pub post_process_result: Option<PrePostLinkResult>,
pub post_link_script_result: Option<Result<PrePostLinkResult, LinkScriptError>>,

/// The paths that were clobbered during the installation process.
pub clobbered_paths: HashMap<PathBuf, ClobberedPath>,
}

impl Installer {
Expand Down Expand Up @@ -312,8 +317,9 @@ impl Installer {
if transaction.operations.is_empty() {
return Ok(InstallationResult {
transaction,
pre_process_result: None,
post_process_result: None,
pre_link_script_result: None,
post_link_script_result: None,
clobbered_paths: HashMap::default(),
});
}

Expand Down Expand Up @@ -389,6 +395,7 @@ impl Installer {
let reporter = reporter
.as_deref()
.map(move |r| (r, r.on_unlink_start(idx, record)));
driver.clobber_registry().unregister_paths(record);
unlink_package(prefix.as_ref(), record).await.map_err(|e| {
InstallerError::UnlinkError(record.repodata_record.file_name.clone(), e)
})?;
Expand Down Expand Up @@ -432,18 +439,17 @@ impl Installer {
drop(pending_futures);

// Post process the transaction
let post_process_result = driver
.post_process(&transaction, prefix.as_ref())
.map_err(InstallerError::PostProcessingFailed)?;
let post_process_result = driver.post_process(&transaction, prefix.as_ref())?;

if let Some(reporter) = &self.reporter {
reporter.on_transaction_complete();
}

Ok(InstallationResult {
transaction,
pre_process_result,
post_process_result,
pre_link_script_result: pre_process_result,
post_link_script_result: post_process_result.post_link_result,
clobbered_paths: post_process_result.clobbered_paths,
})
}
}
Expand Down
Loading

0 comments on commit 014055b

Please sign in to comment.