Skip to content

Commit

Permalink
Pruning likely mostly working.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsirois committed Oct 7, 2024
1 parent 26cf72b commit 7838aaa
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 68 deletions.
29 changes: 13 additions & 16 deletions pex/cache/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
UserCodeDir,
VenvDirs,
)
from pex.common import CopyMode
from pex.dist_metadata import ProjectNameAndVersion
from pex.typing import TYPE_CHECKING, overload

Expand Down Expand Up @@ -162,26 +161,24 @@ def record_zipapp_install(pex_info):


def record_venv_install(
copy_mode, # type: CopyMode.Value
pex_info, # type: PexInfo
venv_dirs, # type: VenvDirs
):
# type: (...) -> None

if copy_mode is CopyMode.SYMLINK:
with _db_connection() as conn:
conn.executemany(
"""
INSERT OR IGNORE INTO venv_deps (
venv_hash,
wheel_install_hash
) VALUES (?, ?)
""",
tuple(
(venv_dirs.short_hash, wheel_install_hash)
for wheel_install_hash in pex_info.distributions.values()
),
).close()
with _inserted_wheels(pex_info) as cursor:
cursor.executemany(
"""
INSERT OR IGNORE INTO venv_deps (
venv_hash,
wheel_install_hash
) VALUES (?, ?)
""",
tuple(
(venv_dirs.short_hash, wheel_install_hash)
for wheel_install_hash in pex_info.distributions.values()
),
).close()


if TYPE_CHECKING:
Expand Down
182 changes: 140 additions & 42 deletions pex/cli/commands/cache/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,7 @@

from pex.cache import access as cache_access
from pex.cache import data as cache_data
from pex.cache.dirs import (
BootstrapDir,
CacheDir,
InstalledWheelDir,
UnzipDir,
UserCodeDir,
VenvDirs,
)
from pex.cache.dirs import AtomicCacheDir, CacheDir
from pex.cli.command import BuildTimeCommand
from pex.cli.commands.cache.bytes import ByteAmount, ByteUnits
from pex.cli.commands.cache.du import DiskUsage
Expand All @@ -33,6 +26,10 @@
if TYPE_CHECKING:
from typing import IO, Dict, Iterable, List, Optional, Tuple, Union

import attr # vendor:skip
else:
from pex.third_party import attr


class HandleAmountAction(Action):
def __init__(self, *args, **kwargs):
Expand All @@ -54,17 +51,25 @@ def __call__(self, parser, namespace, value, option_str=None):
setattr(namespace, self.dest, amount_func)


def parse_older_than(spec):
# type: (str) -> datetime
match = re.match(
r"(?P<amount>\d+)\s+(?P<unit>second|minute|hour|day|week)s?(\s+ago)?",
spec.strip(),
re.IGNORECASE,
)
if match:
return datetime.now() - timedelta(**{match.group("unit") + "s": int(match.group("amount"))})
else:
return datetime.strptime(spec.strip(), "%d/%m/%Y")
@attr.s(frozen=True)
class Cutoff(object):
@classmethod
def parse(cls, spec):
# type: (str) -> Cutoff
match = re.match(
r"(?P<amount>\d+)\s+(?P<unit>second|minute|hour|day|week)s?(\s+ago)?",
spec.strip(),
re.IGNORECASE,
)
if match:
args = {match.group("unit") + "s": int(match.group("amount"))}
cutoff = datetime.now() - timedelta(**args)
else:
cutoff = datetime.strptime(spec.strip(), "%d/%m/%Y")
return cls(spec=spec, cutoff=cutoff)

spec = attr.ib() # type: str
cutoff = attr.ib() # type: datetime


class Cache(OutputMixin, BuildTimeCommand):
Expand Down Expand Up @@ -155,7 +160,7 @@ def _add_prune_arguments(cls, parser):
parser.add_argument(
"--older-than",
dest="cutoff",
type=parse_older_than,
type=Cutoff.parse,
default=datetime.now() - timedelta(weeks=2),
help=(
"Prune zipapp and venv caches last accessed before the specified time. If the "
Expand Down Expand Up @@ -417,7 +422,7 @@ def _purge(self):

disk_usages = [] # type: List[DiskUsage]
for cache_dir, du in iter_map_parallel(
cache_dirs, self._purge_cache_dir, noun="entries", verb="purge", verb_past="purged"
cache_dirs, self._purge_cache_dir, noun="entry", verb="purge", verb_past="purged"
):
print(
"{purged} cache {name} from {rel_path}".format(
Expand All @@ -436,31 +441,124 @@ def _purge(self):

return Ok()

def _prune_atomic_cache_dir(self, atomic_cache_dir):
# type: (AtomicCacheDir) -> DiskUsage
du = DiskUsage.collect(atomic_cache_dir.path)
if not self.options.dry_run:
safe_rmtree(atomic_cache_dir.path)
return du

def _prune(self):
# type: () -> Result

def pex_dirs_and_deps():
# type: () -> Tuple[List[Union[UnzipDir, VenvDirs]], List[Union[BootstrapDir, UserCodeDir, InstalledWheelDir]]]
aged_pex_dirs = list(cache_access.last_access_before(self.options.cutoff))
with cache_data.delete(aged_pex_dirs, self.options.dry_run) as deps_iter:
return aged_pex_dirs, list(deps_iter)

with self.output(self.options) as fp:
if self.options.dry_run:
pex_dirs, deps = pex_dirs_and_deps()
for pex_dir in pex_dirs:
print("Would delete: {path}".format(path=pex_dir.path), file=fp)
for dep in deps:
print("Might delete: {path}".format(path=dep.path), file=fp)
else:
with cache_access.await_delete_lock() as lock_file:
self._log_delete_start(lock_file, out=fp)
if not self.options.dry_run:
try:
with cache_access.await_delete_lock() as lock_file:
self._log_delete_start(lock_file, out=fp)
print(
"Attempting to acquire cache write lock (press CTRL-C to abort) ...",
file=fp,
)
except KeyboardInterrupt:
return Error("No cache entries purged.")
finally:
print(file=fp)

pex_dirs, deps = pex_dirs_and_deps()
for pex_dir in pex_dirs:
print("Should delete: {path}".format(path=pex_dir.path), file=fp)
with cache_data.prune(deps) as prunable_deps:
for dep in prunable_deps:
print("Should delete: {path}".format(path=dep.path), file=fp)
cutoff = self.options.cutoff
pex_dirs = list(cache_access.last_access_before(cutoff.cutoff))
if not pex_dirs:
print(
"There are no cached PEX zipapps or venvs last accessed prior to {cutoff}".format(
cutoff=(
cutoff.spec
if cutoff.spec.endswith("ago") or cutoff.spec[-1].isdigit()
else "{cutoff} ago".format(cutoff=cutoff.spec)
),
),
file=fp,
)
return Ok()

with cache_data.delete(pex_dirs, self.options.dry_run) as deps_iter:
deps = list(deps_iter)

print(
"{pruned} {count} {cached_pex}".format(
pruned="Would have pruned" if self.options.dry_run else "Pruned",
count=len(pex_dirs),
cached_pex=pluralize(pex_dirs, "cached PEX"),
),
file=fp,
)
print(
self._render_usage(
list(
iter_map_parallel(
pex_dirs,
self._prune_atomic_cache_dir,
noun="cached PEX",
verb="prune",
verb_past="pruned",
)
)
),
file=fp,
)
print(file=fp)

if self.options.dry_run:
print(
"Might have pruned up to {count} {cached_pex_dependency}".format(
count=len(deps), cached_pex_dependency=pluralize(deps, "cached PEX dependency")
),
file=fp,
)
print(
self._render_usage(
list(
iter_map_parallel(
deps,
self._prune_atomic_cache_dir,
noun="cached PEX dependency",
verb="prune",
verb_past="pruned",
)
)
)
)
print(file=fp)
else:
with cache_data.prune(deps) as prunable_deps_iter:
disk_usages = list(
iter_map_parallel(
prunable_deps_iter,
self._prune_atomic_cache_dir,
noun="cached PEX dependency",
verb="prune",
verb_past="pruned",
)
)
if not disk_usages:
print(
"No cached PEX dependencies were able to be pruned; all have un-pruned "
"cached PEX dependents.",
file=fp,
)
elif len(deps) == 1:
print("Pruned the 1 cached PEX dependency.", file=fp)
elif len(deps) == len(disk_usages):
print(
"Pruned all {count} cached PEX dependencies.".format(count=len(deps)),
file=fp,
)
else:
print(
"Pruned {count} of {total} cached PEX dependencies.".format(
count=len(disk_usages), total=len(deps)
),
file=fp,
)
print(self._render_usage(disk_usages))
print(file=fp)
return Ok()
7 changes: 5 additions & 2 deletions pex/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,8 +754,11 @@ def iter_map_parallel(

slots = defaultdict(list) # type: DefaultDict[int, List[float]]
with TRACER.timed(
"Using {pool_size} parallel jobs to {verb} {count} items".format(
pool_size=pool_size, verb=verb, count=len(input_items)
"Using {pool_size} parallel jobs to {verb} {count} {inputs}".format(
pool_size=pool_size,
verb=verb,
count=len(input_items),
inputs=pluralize(input_items, noun),
)
):
with _mp_pool(size=pool_size) as pool:
Expand Down
15 changes: 7 additions & 8 deletions pex/pex_bootstrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,14 +599,13 @@ def ensure_venv(
hermetic_scripts=pex_info.venv_hermetic_scripts,
)

with TRACER.timed(
"Recording venv install of {pex} {hash}".format(
pex=pex.path(), hash=pex_info.pex_hash
)
):
record_venv_install(
copy_mode=copy_mode, pex_info=pex_info, venv_dirs=venv_dirs
)
if copy_mode is CopyMode.SYMLINK:
with TRACER.timed(
"Recording venv install of {pex} {hash}".format(
pex=pex.path(), hash=pex_info.pex_hash
)
):
record_venv_install(pex_info=pex_info, venv_dirs=venv_dirs)

# There are popular Linux distributions with shebang length limits
# (BINPRM_BUF_SIZE in /usr/include/linux/binfmts.h) set at 128 characters, so
Expand Down

0 comments on commit 7838aaa

Please sign in to comment.