From 2431a8626c41a4e4a82b20b4f23174f0f0150dca Mon Sep 17 00:00:00 2001 From: John Sirois Date: Sun, 6 Oct 2024 09:22:23 -0700 Subject: [PATCH] Add 2-phase deletion support. Stub out 2-phase prune support. --- pex/cache/data.py | 252 +++++++++++++++++++++------------------------- pex/cache/dirs.py | 37 +++++-- pex/layout.py | 2 +- 3 files changed, 144 insertions(+), 147 deletions(-) diff --git a/pex/cache/data.py b/pex/cache/data.py index 8e1fe35ea..b3c1f6631 100644 --- a/pex/cache/data.py +++ b/pex/cache/data.py @@ -23,7 +23,18 @@ from pex.typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Union + from typing import ( + Callable, + Dict, + Iterable, + Iterator, + List, + Optional, + Sequence, + Tuple, + TypeVar, + Union, + ) from pex.pex_info import PexInfo @@ -95,18 +106,13 @@ def _inserted_wheels(pex_info): wheels = [] # type: List[Dict[str, Optional[str]]] for wheel_name, install_hash in pex_info.distributions.items(): - wheel_hash = None # type: Optional[str] installed_wheel_dir = InstalledWheelDir.create(wheel_name, install_hash) - if os.path.islink(installed_wheel_dir): - wheel_hash_dir, _ = os.path.split(os.path.realpath(installed_wheel_dir)) - wheel_hash = os.path.basename(wheel_hash_dir) - pnav = ProjectNameAndVersion.from_filename(wheel_name) wheels.append( dict( name=wheel_name, install_hash=install_hash, - wheel_hash=wheel_hash, + wheel_hash=installed_wheel_dir.wheel_hash, project_name=str(pnav.canonicalized_project_name), version=str(pnav.canonicalized_version), ) @@ -196,169 +202,135 @@ def record_venv(coon_or_cursor): record_venv(conn).close() -def _iter_wheel_dependents( - dependent_pex, # type: Union[UnzipDir, VenvDirs] - conn, # type: sqlite3.Connection - wheel_install_hashes, # type: Sequence[str] -): - # type: (...) -> Iterator[AtomicCacheDir] - - # N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit - # to an even lower chunk size to be safe: https://www.sqlite.org/limits.html - chunk_size = 250 - for index in range(0, len(wheel_install_hashes), chunk_size): - chunk = wheel_install_hashes[index : index + chunk_size] - chunk_placeholders = ", ".join(itertools.repeat("?", len(chunk))) - with closing( - conn.execute( - """ - SELECT DISTINCT pex_hash FROM zipapp_deps WHERE wheel_install_hash IN ({chunk}) - """.format( - chunk=chunk_placeholders - ), - chunk, - ) - ) as cursor: - for (pex_hash,) in cursor: - zipapp = UnzipDir.create(pex_hash) - if zipapp != dependent_pex: - yield zipapp - - with closing( - conn.execute( - """ - SELECT DISTINCT venvs.short_hash, venvs.pex_hash, venvs.contents_hash FROM venvs - JOIN venv_deps ON venv_deps.venv_hash = venvs.short_hash - WHERE venv_deps.wheel_install_hash IN ({chunk}) - """.format( - chunk=chunk_placeholders - ), - chunk, - ) - ) as cursor: - for short_hash, pex_hash, contents_hash in cursor: - venv = VenvDirs.create( - short_hash=short_hash, pex_hash=pex_hash, contents_hash=contents_hash - ) - if venv != dependent_pex: - yield venv +if TYPE_CHECKING: + _I = TypeVar("_I") + _K = TypeVar("_K") -def _iter_wheel_deps( - dependent_pex, # type: Union[UnzipDir, VenvDirs] - conn, # type: sqlite3.Connection - wheels, # type: Iterator[Tuple[str, str, str]] +def _iter_key_chunks( + items, # type: Sequence[_I] + extract_key, # type: Callable[[_I], _K] ): - # type: (...) -> Iterator[Union[str, AtomicCacheDir]] + # type: (...) -> Iterator[Tuple[str, Sequence[_K]]] - wheel_install_hashes = [] # type: List[str] - for wheel_name, wheel_install_hash, wheel_hash in wheels: - wheel_install_hashes.append(wheel_install_hash) - installed_wheel_dir = InstalledWheelDir.create( - wheel_name=wheel_name, wheel_hash=wheel_install_hash - ) - if wheel_hash: - yield InstalledWheelDir.create(wheel_name=wheel_name, wheel_hash=wheel_hash) - yield installed_wheel_dir.path - else: - yield installed_wheel_dir - - for dependent_dir in _iter_wheel_dependents( - dependent_pex=dependent_pex, conn=conn, wheel_install_hashes=wheel_install_hashes - ): - yield dependent_dir + # N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit + # to an even lower chunk size to be safe: https://www.sqlite.org/limits.html + chunk_size = 100 + for index in range(0, len(items), chunk_size): + keys = tuple(map(extract_key, items[index : index + chunk_size])) + placeholders = ", ".join(itertools.repeat("?", len(keys))) + yield placeholders, keys -def zipapp_deps(unzip_dir): - # type: (UnzipDir) -> Iterator[Union[str, AtomicCacheDir]] +def _zipapp_deps(unzip_dirs): + # type: (Sequence[UnzipDir]) -> Iterator[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]] with _db_connection() as conn: - with closing( - conn.execute( - "SELECT bootstrap_hash, code_hash FROM zipapps WHERE pex_hash = ?", - [unzip_dir.pex_hash], - ) - ) as cursor: - bootstrap_hash, code_hash = cursor.fetchone() - yield BootstrapDir.create(bootstrap_hash) - yield UserCodeDir.create(code_hash) + for placeholders, keys in _iter_key_chunks(unzip_dirs, extract_key=lambda u: u.pex_hash): + with closing( + conn.execute( + """ + SELECT bootstrap_hash, code_hash FROM zipapps WHERE pex_hash IN ({keys}) + """.format( + keys=placeholders + ), + keys, + ) + ) as cursor: + bootstrap_hash, code_hash = cursor.fetchone() + yield BootstrapDir.create(bootstrap_hash) + yield UserCodeDir.create(code_hash) + + with closing( + conn.execute( + """ + SELECT name, install_hash, wheel_hash FROM wheels + JOIN zipapp_deps ON zipapp_deps.wheel_install_hash = wheels.install_hash + JOIN zipapps ON zipapps.pex_hash = zipapp_deps.pex_hash + WHERE zipapps.pex_hash in ({keys}) + """.format( + keys=placeholders + ), + keys, + ) + ) as cursor: + for name, install_hash, wheel_hash in cursor: + yield InstalledWheelDir.create( + wheel_name=name, install_hash=install_hash, wheel_hash=wheel_hash + ) - with closing( - conn.execute( - """ - SELECT name, install_hash, wheel_hash FROM wheels - JOIN zipapp_deps ON zipapp_deps.wheel_install_hash = wheels.install_hash - JOIN zipapps ON zipapps.pex_hash = zipapp_deps.pex_hash - WHERE zipapps.pex_hash = ? - """, - [unzip_dir.pex_hash], - ) - ) as cursor: - for dep in _iter_wheel_deps(dependent_pex=unzip_dir, conn=conn, wheels=cursor): - yield dep +def _venv_deps(venv_dirs): + # type: (Sequence[VenvDirs]) -> Iterator[InstalledWheelDir] -def venv_deps(venv_dirs): - # type: (VenvDirs) -> Iterator[Union[str, AtomicCacheDir]] with _db_connection() as conn: - with closing( - conn.execute( - """ - SELECT name, install_hash, wheel_hash FROM wheels - JOIN venv_deps ON venv_deps.wheel_install_hash = wheels.install_hash - JOIN venvs ON venvs.short_hash = venv_deps.venv_hash - WHERE venvs.short_hash = ? - """, - [venv_dirs.short_hash], - ) - ) as cursor: - for dep in _iter_wheel_deps(dependent_pex=venv_dirs, conn=conn, wheels=cursor): - yield dep - - -def dir_dependencies(pex_dir): - # type: (Union[UnzipDir, VenvDirs]) -> Iterator[Union[str, AtomicCacheDir]] - return zipapp_deps(pex_dir) if isinstance(pex_dir, UnzipDir) else venv_deps(pex_dir) + for placeholders, keys in _iter_key_chunks(venv_dirs, extract_key=lambda v: v.short_hash): + with closing( + conn.execute( + """ + SELECT name, install_hash, wheel_hash FROM wheels + JOIN venv_deps ON venv_deps.wheel_install_hash = wheels.install_hash + JOIN venvs ON venvs.short_hash = venv_deps.venv_hash + WHERE venvs.short_hash IN ({chunk}) + """.format( + chunk=placeholders + ), + keys, + ) + ) as cursor: + for name, install_hash, wheel_hash in cursor: + yield InstalledWheelDir.create( + wheel_name=name, install_hash=install_hash, wheel_hash=wheel_hash + ) + + +def dir_dependencies(pex_dirs): + # type: (Iterable[Union[UnzipDir, VenvDirs]]) -> Iterator[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]] + seen = set() + for dep in _zipapp_deps([pex_dir for pex_dir in pex_dirs if isinstance(pex_dir, UnzipDir)]): + if dep not in seen: + seen.add(dep) + yield dep + for dep in _venv_deps([venv_dirs for venv_dirs in pex_dirs if isinstance(venv_dirs, VenvDirs)]): + if dep not in seen: + seen.add(dep) + yield dep def _delete_zipapps(unzip_dirs): # type: (Sequence[UnzipDir]) -> None - # N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit - # to an even lower chunk size to be safe: https://www.sqlite.org/limits.html - chunk_size = 100 - for index in range(0, len(unzip_dirs), chunk_size): - chunk = tuple(unzip_dir.pex_hash for unzip_dir in unzip_dirs[index : index + chunk_size]) - chunk_placeholders = ", ".join(itertools.repeat("?", len(chunk))) - with _db_connection() as conn: + with _db_connection() as conn: + for placeholders, keys in _iter_key_chunks(unzip_dirs, extract_key=lambda u: u.pex_hash): conn.execute( - "DELETE FROM zipapps WHERE pex_hash IN ({chunk})".format(chunk=chunk_placeholders), - chunk, + "DELETE FROM zipapps WHERE pex_hash IN ({keys})".format(keys=placeholders), keys ).close() def _delete_venvs(venv_dirs): # type: (Sequence[VenvDirs]) -> None - # N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit - # to an even lower chunk size to be safe: https://www.sqlite.org/limits.html - chunk_size = 100 - for index in range(0, len(venv_dirs), chunk_size): - chunk = tuple(venv_dir.short_hash for venv_dir in venv_dirs[index : index + chunk_size]) - chunk_placeholders = ", ".join(itertools.repeat("?", len(chunk))) - with _db_connection() as conn: + with _db_connection() as conn: + for placeholders, keys in _iter_key_chunks(venv_dirs, extract_key=lambda v: v.short_hash): conn.execute( - "DELETE FROM venvs WHERE short_hash IN ({chunk})".format(chunk=chunk_placeholders), - chunk, + "DELETE FROM venvs WHERE short_hash IN ({keys})".format(keys=placeholders), keys ).close() +@contextmanager def delete(pex_dirs): - # type: (Iterable[Union[UnzipDir, VenvDirs]]) -> None - - # TODO(John Sirois): XXX: Combine dir_dependencies to both delete and return deps to check for - # pruning. + # type: (Iterable[Union[UnzipDir, VenvDirs]]) -> Iterator[Iterator[AtomicCacheDir]] + yield dir_dependencies(pex_dirs) _delete_zipapps(unzip_dirs=[pex_dir for pex_dir in pex_dirs if isinstance(pex_dir, UnzipDir)]) _delete_venvs( venv_dirs=[venv_dirs for venv_dirs in pex_dirs if isinstance(venv_dirs, VenvDirs)] ) + + +@contextmanager +def prune(deps): + # type: (Iterable[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]) -> Iterator[Iterator[AtomicCacheDir]] + + # TODO(John Sirois): XXX: yield an iterator over just the subset of items that are safe to + # prune (have no dependents) and then delete that subset from the db. + pass diff --git a/pex/cache/dirs.py b/pex/cache/dirs.py index 993ca335c..8e28fa1b6 100644 --- a/pex/cache/dirs.py +++ b/pex/cache/dirs.py @@ -5,6 +5,7 @@ import glob import os +from typing import Optional from pex.enum import Enum from pex.typing import TYPE_CHECKING, cast @@ -330,12 +331,17 @@ def __init__( self.short_hash = short_hash self.pex_hash = venv_dir.pex_hash self.contents_hash = venv_dir.contents_hash - self.pex_root = venv_dir.pex_root + self._pex_root = venv_dir.pex_root @property def short_dir(self): # type: () -> str - return CacheDir.VENVS.path("s", self.short_hash, pex_root=self.pex_root) + return CacheDir.VENVS.path("s", self.short_hash, pex_root=self._pex_root) + + @property + def symlink(self): + # type: () -> str + return os.path.join(self.short_dir, self.SHORT_SYMLINK_NAME) class InstalledWheelDir(AtomicCacheDir): @@ -343,23 +349,42 @@ class InstalledWheelDir(AtomicCacheDir): def create( cls, wheel_name, # type: str - wheel_hash, # type: str + install_hash, # type: str + wheel_hash=None, # type: Optional[str] pex_root=ENV, # type: Union[str, Variables] ): # type: (...) -> InstalledWheelDir - wheel_dir = CacheDir.INSTALLED_WHEELS.path(wheel_hash, wheel_name, pex_root=pex_root) - return cls(path=wheel_dir, wheel_name=wheel_name, wheel_hash=wheel_hash) + + wheel_dir = CacheDir.INSTALLED_WHEELS.path(install_hash, wheel_name, pex_root=pex_root) + symlink = None # type: Optional[str] + if os.path.islink(wheel_dir): + symlink = wheel_dir + wheel_dir = os.path.realpath(wheel_dir) + wheel_hash_dir, _ = os.path.split(wheel_dir) + wheel_hash = os.path.basename(wheel_hash_dir) + + return cls( + path=wheel_dir, + wheel_name=wheel_name, + install_hash=install_hash, + wheel_hash=wheel_hash, + symlink=symlink, + ) def __init__( self, path, # type: str wheel_name, # type: str - wheel_hash, # type: str + install_hash, # type: str + wheel_hash=None, # type: Optional[str] + symlink=None, # type: Optional[str] ): # type: (...) -> None super(InstalledWheelDir, self).__init__(path) self.wheel_name = wheel_name + self.install_hash = install_hash self.wheel_hash = wheel_hash + self.symlink = symlink class BootstrapDir(AtomicCacheDir): diff --git a/pex/layout.py b/pex/layout.py index 40de89a28..ee44f7ba1 100644 --- a/pex/layout.py +++ b/pex/layout.py @@ -164,7 +164,7 @@ def _install_distribution( location, sha = distribution_info is_wheel_file = pex_info.deps_are_wheel_files spread_dest = InstalledWheelDir.create( - wheel_name=location, wheel_hash=sha, pex_root=pex_info.pex_root + wheel_name=location, install_hash=sha, pex_root=pex_info.pex_root ) dist_relpath = os.path.join(DEPS_DIR, location) source = None if is_wheel_file else layout.dist_strip_prefix(location)