Skip to content

Commit

Permalink
Add 2-phase deletion support.
Browse files Browse the repository at this point in the history
Stub out 2-phase prune support.
  • Loading branch information
jsirois committed Oct 6, 2024
1 parent bc39cee commit 2431a86
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 147 deletions.
252 changes: 112 additions & 140 deletions pex/cache/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,18 @@
from pex.typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Union
from typing import (
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)

from pex.pex_info import PexInfo

Expand Down Expand Up @@ -95,18 +106,13 @@ def _inserted_wheels(pex_info):

wheels = [] # type: List[Dict[str, Optional[str]]]
for wheel_name, install_hash in pex_info.distributions.items():
wheel_hash = None # type: Optional[str]
installed_wheel_dir = InstalledWheelDir.create(wheel_name, install_hash)
if os.path.islink(installed_wheel_dir):
wheel_hash_dir, _ = os.path.split(os.path.realpath(installed_wheel_dir))
wheel_hash = os.path.basename(wheel_hash_dir)

pnav = ProjectNameAndVersion.from_filename(wheel_name)
wheels.append(
dict(
name=wheel_name,
install_hash=install_hash,
wheel_hash=wheel_hash,
wheel_hash=installed_wheel_dir.wheel_hash,
project_name=str(pnav.canonicalized_project_name),
version=str(pnav.canonicalized_version),
)
Expand Down Expand Up @@ -196,169 +202,135 @@ def record_venv(coon_or_cursor):
record_venv(conn).close()


def _iter_wheel_dependents(
dependent_pex, # type: Union[UnzipDir, VenvDirs]
conn, # type: sqlite3.Connection
wheel_install_hashes, # type: Sequence[str]
):
# type: (...) -> Iterator[AtomicCacheDir]

# N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit
# to an even lower chunk size to be safe: https://www.sqlite.org/limits.html
chunk_size = 250
for index in range(0, len(wheel_install_hashes), chunk_size):
chunk = wheel_install_hashes[index : index + chunk_size]
chunk_placeholders = ", ".join(itertools.repeat("?", len(chunk)))
with closing(
conn.execute(
"""
SELECT DISTINCT pex_hash FROM zipapp_deps WHERE wheel_install_hash IN ({chunk})
""".format(
chunk=chunk_placeholders
),
chunk,
)
) as cursor:
for (pex_hash,) in cursor:
zipapp = UnzipDir.create(pex_hash)
if zipapp != dependent_pex:
yield zipapp

with closing(
conn.execute(
"""
SELECT DISTINCT venvs.short_hash, venvs.pex_hash, venvs.contents_hash FROM venvs
JOIN venv_deps ON venv_deps.venv_hash = venvs.short_hash
WHERE venv_deps.wheel_install_hash IN ({chunk})
""".format(
chunk=chunk_placeholders
),
chunk,
)
) as cursor:
for short_hash, pex_hash, contents_hash in cursor:
venv = VenvDirs.create(
short_hash=short_hash, pex_hash=pex_hash, contents_hash=contents_hash
)
if venv != dependent_pex:
yield venv
if TYPE_CHECKING:
_I = TypeVar("_I")
_K = TypeVar("_K")


def _iter_wheel_deps(
dependent_pex, # type: Union[UnzipDir, VenvDirs]
conn, # type: sqlite3.Connection
wheels, # type: Iterator[Tuple[str, str, str]]
def _iter_key_chunks(
items, # type: Sequence[_I]
extract_key, # type: Callable[[_I], _K]
):
# type: (...) -> Iterator[Union[str, AtomicCacheDir]]
# type: (...) -> Iterator[Tuple[str, Sequence[_K]]]

wheel_install_hashes = [] # type: List[str]
for wheel_name, wheel_install_hash, wheel_hash in wheels:
wheel_install_hashes.append(wheel_install_hash)
installed_wheel_dir = InstalledWheelDir.create(
wheel_name=wheel_name, wheel_hash=wheel_install_hash
)
if wheel_hash:
yield InstalledWheelDir.create(wheel_name=wheel_name, wheel_hash=wheel_hash)
yield installed_wheel_dir.path
else:
yield installed_wheel_dir

for dependent_dir in _iter_wheel_dependents(
dependent_pex=dependent_pex, conn=conn, wheel_install_hashes=wheel_install_hashes
):
yield dependent_dir
# N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit
# to an even lower chunk size to be safe: https://www.sqlite.org/limits.html
chunk_size = 100
for index in range(0, len(items), chunk_size):
keys = tuple(map(extract_key, items[index : index + chunk_size]))
placeholders = ", ".join(itertools.repeat("?", len(keys)))
yield placeholders, keys


def zipapp_deps(unzip_dir):
# type: (UnzipDir) -> Iterator[Union[str, AtomicCacheDir]]
def _zipapp_deps(unzip_dirs):
# type: (Sequence[UnzipDir]) -> Iterator[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]
with _db_connection() as conn:
with closing(
conn.execute(
"SELECT bootstrap_hash, code_hash FROM zipapps WHERE pex_hash = ?",
[unzip_dir.pex_hash],
)
) as cursor:
bootstrap_hash, code_hash = cursor.fetchone()
yield BootstrapDir.create(bootstrap_hash)
yield UserCodeDir.create(code_hash)
for placeholders, keys in _iter_key_chunks(unzip_dirs, extract_key=lambda u: u.pex_hash):
with closing(
conn.execute(
"""
SELECT bootstrap_hash, code_hash FROM zipapps WHERE pex_hash IN ({keys})
""".format(
keys=placeholders
),
keys,
)
) as cursor:
bootstrap_hash, code_hash = cursor.fetchone()
yield BootstrapDir.create(bootstrap_hash)
yield UserCodeDir.create(code_hash)

with closing(
conn.execute(
"""
SELECT name, install_hash, wheel_hash FROM wheels
JOIN zipapp_deps ON zipapp_deps.wheel_install_hash = wheels.install_hash
JOIN zipapps ON zipapps.pex_hash = zipapp_deps.pex_hash
WHERE zipapps.pex_hash in ({keys})
""".format(
keys=placeholders
),
keys,
)
) as cursor:
for name, install_hash, wheel_hash in cursor:
yield InstalledWheelDir.create(
wheel_name=name, install_hash=install_hash, wheel_hash=wheel_hash
)

with closing(
conn.execute(
"""
SELECT name, install_hash, wheel_hash FROM wheels
JOIN zipapp_deps ON zipapp_deps.wheel_install_hash = wheels.install_hash
JOIN zipapps ON zipapps.pex_hash = zipapp_deps.pex_hash
WHERE zipapps.pex_hash = ?
""",
[unzip_dir.pex_hash],
)
) as cursor:
for dep in _iter_wheel_deps(dependent_pex=unzip_dir, conn=conn, wheels=cursor):
yield dep

def _venv_deps(venv_dirs):
# type: (Sequence[VenvDirs]) -> Iterator[InstalledWheelDir]

def venv_deps(venv_dirs):
# type: (VenvDirs) -> Iterator[Union[str, AtomicCacheDir]]
with _db_connection() as conn:
with closing(
conn.execute(
"""
SELECT name, install_hash, wheel_hash FROM wheels
JOIN venv_deps ON venv_deps.wheel_install_hash = wheels.install_hash
JOIN venvs ON venvs.short_hash = venv_deps.venv_hash
WHERE venvs.short_hash = ?
""",
[venv_dirs.short_hash],
)
) as cursor:
for dep in _iter_wheel_deps(dependent_pex=venv_dirs, conn=conn, wheels=cursor):
yield dep


def dir_dependencies(pex_dir):
# type: (Union[UnzipDir, VenvDirs]) -> Iterator[Union[str, AtomicCacheDir]]
return zipapp_deps(pex_dir) if isinstance(pex_dir, UnzipDir) else venv_deps(pex_dir)
for placeholders, keys in _iter_key_chunks(venv_dirs, extract_key=lambda v: v.short_hash):
with closing(
conn.execute(
"""
SELECT name, install_hash, wheel_hash FROM wheels
JOIN venv_deps ON venv_deps.wheel_install_hash = wheels.install_hash
JOIN venvs ON venvs.short_hash = venv_deps.venv_hash
WHERE venvs.short_hash IN ({chunk})
""".format(
chunk=placeholders
),
keys,
)
) as cursor:
for name, install_hash, wheel_hash in cursor:
yield InstalledWheelDir.create(
wheel_name=name, install_hash=install_hash, wheel_hash=wheel_hash
)


def dir_dependencies(pex_dirs):
    # type: (Iterable[Union[UnzipDir, VenvDirs]]) -> Iterator[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]
    """Iterate over the distinct cache dirs that the given PEX cache dirs depend on.

    Dependencies of the `UnzipDir` entries are yielded first, followed by dependencies of the
    `VenvDirs` entries. A dependency reachable from more than one of the given dirs is only
    yielded the first time it is encountered.

    :param pex_dirs: The unzipped zipapp dirs and venv dirs to find dependencies of. Any iterable
                     is accepted, including single-pass iterators.
    :return: A lazy iterator over the de-duplicated dependency dirs.
    """

    # N.B.: `pex_dirs` is only promised to be an Iterable but it must be partitioned twice below.
    # Materialize it once so that a single-pass iterator is not already exhausted by the time the
    # venv partition is computed (which would silently drop all venv dependencies).
    pex_dirs = tuple(pex_dirs)
    unzip_dirs = [pex_dir for pex_dir in pex_dirs if isinstance(pex_dir, UnzipDir)]
    venv_dirs = [pex_dir for pex_dir in pex_dirs if isinstance(pex_dir, VenvDirs)]

    # NOTE(review): de-duplication relies on the dependency dir objects being hashable and
    # comparing equal when they refer to the same cache dir - confirm against AtomicCacheDir.
    seen = set()
    for dep in itertools.chain(_zipapp_deps(unzip_dirs), _venv_deps(venv_dirs)):
        if dep not in seen:
            seen.add(dep)
            yield dep


def _delete_zipapps(unzip_dirs):
# type: (Sequence[UnzipDir]) -> None

# N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit
# to an even lower chunk size to be safe: https://www.sqlite.org/limits.html
chunk_size = 100
for index in range(0, len(unzip_dirs), chunk_size):
chunk = tuple(unzip_dir.pex_hash for unzip_dir in unzip_dirs[index : index + chunk_size])
chunk_placeholders = ", ".join(itertools.repeat("?", len(chunk)))
with _db_connection() as conn:
with _db_connection() as conn:
for placeholders, keys in _iter_key_chunks(unzip_dirs, extract_key=lambda u: u.pex_hash):
conn.execute(
"DELETE FROM zipapps WHERE pex_hash IN ({chunk})".format(chunk=chunk_placeholders),
chunk,
"DELETE FROM zipapps WHERE pex_hash IN ({keys})".format(keys=placeholders), keys
).close()


def _delete_venvs(venv_dirs):
# type: (Sequence[VenvDirs]) -> None

# N.B.: Maximum parameter count is 999 in pre-2020 versions of SQLite 3; so we limit
# to an even lower chunk size to be safe: https://www.sqlite.org/limits.html
chunk_size = 100
for index in range(0, len(venv_dirs), chunk_size):
chunk = tuple(venv_dir.short_hash for venv_dir in venv_dirs[index : index + chunk_size])
chunk_placeholders = ", ".join(itertools.repeat("?", len(chunk)))
with _db_connection() as conn:
with _db_connection() as conn:
for placeholders, keys in _iter_key_chunks(venv_dirs, extract_key=lambda v: v.short_hash):
conn.execute(
"DELETE FROM venvs WHERE short_hash IN ({chunk})".format(chunk=chunk_placeholders),
chunk,
"DELETE FROM venvs WHERE short_hash IN ({keys})".format(keys=placeholders), keys
).close()


@contextmanager
def delete(pex_dirs):
    # type: (Iterable[Union[UnzipDir, VenvDirs]]) -> Iterator[Iterator[AtomicCacheDir]]
    """Delete the records of the given PEX cache dirs in two phases.

    On entry, yields an iterator over the dependencies of `pex_dirs` (see `dir_dependencies`).
    On normal exit of the `with` block, the zipapp and venv records for `pex_dirs` are deleted.
    If the `with` block raises, the exception propagates from the `yield` and no records are
    deleted.

    The yielded iterator is lazy; it should be consumed inside the `with` block since the records
    it is derived from are deleted when the block exits.

    :param pex_dirs: The unzipped zipapp dirs and venv dirs to delete. Any iterable is accepted,
                     including single-pass iterators.
    """

    # N.B.: `pex_dirs` is consumed up to three times below (lazily by the dependency iterator and
    # then once per record type); so materialize it once up front. Otherwise a single-pass
    # iterator would be exhausted early and the deletes would silently do nothing.
    pex_dirs = tuple(pex_dirs)

    yield dir_dependencies(pex_dirs)
    _delete_zipapps(unzip_dirs=[pex_dir for pex_dir in pex_dirs if isinstance(pex_dir, UnzipDir)])
    _delete_venvs(
        venv_dirs=[venv_dirs for venv_dirs in pex_dirs if isinstance(venv_dirs, VenvDirs)]
    )


@contextmanager
def prune(deps):
    # type: (Iterable[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]) -> Iterator[Iterator[AtomicCacheDir]]
    """Stub for 2-phase pruning of dependency dirs that no longer have dependents.

    Currently nothing is considered safe to prune: an empty iterator is yielded and no records
    are deleted.

    :param deps: The candidate dependency dirs to prune (currently unused).
    """

    # TODO(John Sirois): XXX: yield an iterator over just the subset of items that are safe to
    #  prune (have no dependents) and then delete that subset from the db.

    # N.B.: A `@contextmanager` function must be a generator that yields exactly once; a bare
    # `pass` body makes `with prune(...)` blow up with a TypeError on entry. Yield an empty
    # iterator so the stub is usable and conservatively prunes nothing.
    yield iter(())
37 changes: 31 additions & 6 deletions pex/cache/dirs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import glob
import os
from typing import Optional

from pex.enum import Enum
from pex.typing import TYPE_CHECKING, cast
Expand Down Expand Up @@ -330,36 +331,60 @@ def __init__(
self.short_hash = short_hash
self.pex_hash = venv_dir.pex_hash
self.contents_hash = venv_dir.contents_hash
self.pex_root = venv_dir.pex_root
self._pex_root = venv_dir.pex_root

@property
def short_dir(self):
# type: () -> str
return CacheDir.VENVS.path("s", self.short_hash, pex_root=self.pex_root)
return CacheDir.VENVS.path("s", self.short_hash, pex_root=self._pex_root)

@property
def symlink(self):
# type: () -> str
return os.path.join(self.short_dir, self.SHORT_SYMLINK_NAME)


class InstalledWheelDir(AtomicCacheDir):
@classmethod
def create(
cls,
wheel_name, # type: str
wheel_hash, # type: str
install_hash, # type: str
wheel_hash=None, # type: Optional[str]
pex_root=ENV, # type: Union[str, Variables]
):
# type: (...) -> InstalledWheelDir
wheel_dir = CacheDir.INSTALLED_WHEELS.path(wheel_hash, wheel_name, pex_root=pex_root)
return cls(path=wheel_dir, wheel_name=wheel_name, wheel_hash=wheel_hash)

wheel_dir = CacheDir.INSTALLED_WHEELS.path(install_hash, wheel_name, pex_root=pex_root)
symlink = None # type: Optional[str]
if os.path.islink(wheel_dir):
symlink = wheel_dir
wheel_dir = os.path.realpath(wheel_dir)
wheel_hash_dir, _ = os.path.split(wheel_dir)
wheel_hash = os.path.basename(wheel_hash_dir)

return cls(
path=wheel_dir,
wheel_name=wheel_name,
install_hash=install_hash,
wheel_hash=wheel_hash,
symlink=symlink,
)

def __init__(
self,
path, # type: str
wheel_name, # type: str
wheel_hash, # type: str
install_hash, # type: str
wheel_hash=None, # type: Optional[str]
symlink=None, # type: Optional[str]
):
# type: (...) -> None
super(InstalledWheelDir, self).__init__(path)
self.wheel_name = wheel_name
self.install_hash = install_hash
self.wheel_hash = wheel_hash
self.symlink = symlink


class BootstrapDir(AtomicCacheDir):
Expand Down
2 changes: 1 addition & 1 deletion pex/layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def _install_distribution(
location, sha = distribution_info
is_wheel_file = pex_info.deps_are_wheel_files
spread_dest = InstalledWheelDir.create(
wheel_name=location, wheel_hash=sha, pex_root=pex_info.pex_root
wheel_name=location, install_hash=sha, pex_root=pex_info.pex_root
)
dist_relpath = os.path.join(DEPS_DIR, location)
source = None if is_wheel_file else layout.dist_strip_prefix(location)
Expand Down

0 comments on commit 2431a86

Please sign in to comment.