diff --git a/setup.cfg b/setup.cfg index 31dd91245..9f7d8bbac 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,7 +40,7 @@ install_requires = aiohttp==3.8.1 aioipfs@git+https://github.com/aleph-im/aioipfs.git@76d5624661e879a13b70f3ea87dc9c9604c7eda7 aleph-client==0.4.6 - aleph-message==0.2.1 + aleph-message==0.2.2 aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@97fe92ffa6e21ef5ec17ef4fa16c86022b30044c coincurve==15.0.1 configmanager==1.35.1 diff --git a/src/aleph/ccn_cli/commands/repair.py b/src/aleph/ccn_cli/commands/repair.py index fa244a488..bfd515f10 100644 --- a/src/aleph/ccn_cli/commands/repair.py +++ b/src/aleph/ccn_cli/commands/repair.py @@ -3,52 +3,133 @@ """ import asyncio import itertools -from typing import Dict, FrozenSet +from typing import Dict, FrozenSet, Set, Tuple from typing import cast import typer +from aleph_message.models import ItemHash from configmanager import Config import aleph.model +import aleph.services.p2p.singleton as singleton from aleph.ccn_cli.cli_config import CliConfig from aleph.config import get_defaults +from aleph.exceptions import ContentCurrentlyUnavailable from aleph.model import init_db_globals +from aleph.storage import get_hash_content from .toolkit.local_storage import list_expected_local_files +from ...services.p2p import http repair_ns = typer.Typer() - -def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None: - typer.echo("The following files will be preserved:") - for file_type, files in files_to_preserve.items(): - typer.echo(f"* {len(files)} {file_type}") +async def init_api_servers(): + peers = [peer async for peer in aleph.model.db["peers"].find({"type": "HTTP"})] + singleton.api_servers = [peer["address"] for peer in peers] async def list_missing_files() -> FrozenSet[str]: - # Get a set of all the files currently in GridFS - gridfs_files_dict = { - file["filename"]: file - async for file in aleph.model.db["fs.files"].find( - projection={"_id": 0, "filename": 1, "length": 1, "uploadDate": 1}, - batch_size=1000, - ) - } + if aleph.model.db is None: # for mypy + raise ValueError("DB not initialized as expected.") - gridfs_files = frozenset(gridfs_files_dict.keys()) - typer.echo(f"Found {len(gridfs_files_dict)} files in local storage.") + # Get a set of all the files currently in GridFS + gridfs_files = frozenset( + [ + file["filename"] + async for file in aleph.model.db["fs.files"].find( + projection={"_id": 0, "filename": 1}, + batch_size=1000, + ) + ] + ) + + typer.echo(f"Found {len(gridfs_files)} files in local storage.") expected_local_files_dict = await list_expected_local_files() - expected_local_files = frozenset(itertools.chain.from_iterable(expected_local_files_dict.values())) + expected_local_files = frozenset( + itertools.chain.from_iterable(expected_local_files_dict.values()) + ) missing_files = expected_local_files - gridfs_files return missing_files +async def fetch_and_store_file(filename: str): + item_hash = ItemHash(filename) + _ = await get_hash_content( + content_hash=filename, + engine=item_hash.item_type, + use_network=True, + use_ipfs=True, + store_value=True, + timeout=15, + ) + + +def process_results( + finished_tasks: Set[asyncio.Task], task_dict: Dict[asyncio.Task, str] +) -> Tuple[Set[str], Set[str]]: + fetched_files = set() + failed_files = set() + + for task in finished_tasks: + filename = task_dict.pop(task) + exception = task.exception() + + if exception is None: + fetched_files.add(filename) + + else: + failed_files.add(filename) + if isinstance(exception, ContentCurrentlyUnavailable): + typer.echo( + f"WARNING: Could not fetch {filename}: currently unavailable." + ) + else: + typer.echo( + f"ERROR: Could not fetch {filename}: unexpected error: {exception}" + ) + + return fetched_files, failed_files + + +async def fetch_files(missing_files: FrozenSet[str], batch_size: int): + tasks = set() + task_dict = {} + + fetched_files = set() + failed_files = set() + + for i, filename in enumerate(missing_files, start=1): + typer.echo(f"Fetching {filename} ({i}/{len(missing_files)})...") + fetch_task = asyncio.create_task(fetch_and_store_file(filename)) + tasks.add(fetch_task) + task_dict[fetch_task] = filename + + if len(tasks) == batch_size: + done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + fetched, failed = process_results(done, task_dict) + fetched_files |= fetched + failed_files |= failed + + # Finish + if tasks: + done, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) + fetched, failed = process_results(done, task_dict) + fetched_files |= fetched + failed_files |= failed + + typer.echo(f"Successfully fetched {len(fetched_files)} files.") + if failed_files: + typer.echo(f"WARNING: Failed to fetch {len(failed_files)} files.") + + async def fetch_missing_files(): missing_files = await list_missing_files() typer.echo(f"Found {len(missing_files)} missing files.") + await fetch_files(missing_files, 2000) + async def run(ctx: typer.Context): config = Config(schema=get_defaults()) @@ -56,11 +137,17 @@ async def run(ctx: typer.Context): config.yaml.load(str(cli_config.config_file_path)) init_db_globals(config=config) + # To be able to fetch data from the network + await init_api_servers() if aleph.model.db is None: # for mypy raise ValueError("DB not initialized as expected.") await fetch_missing_files() + # Clean up aiohttp client sessions to avoid a warning + for client_session in http.SESSIONS.values(): + await client_session.close() + typer.echo("Done.") diff --git a/src/aleph/ccn_cli/commands/toolkit/local_storage.py b/src/aleph/ccn_cli/commands/toolkit/local_storage.py index 6de4c33da..d0f5cab1b 100644 --- a/src/aleph/ccn_cli/commands/toolkit/local_storage.py +++ b/src/aleph/ccn_cli/commands/toolkit/local_storage.py @@ -1,8 +1,10 @@ from typing import Any, Dict, FrozenSet, List, Optional -from aleph.model.messages import Message -from aleph.model.filepin import PermanentPin + from aleph_message.models import MessageType +from aleph.model.filepin import PermanentPin +from aleph.model.messages import Message + async def get_hashes( item_type_field: str, item_hash_field: str, msg_type: Optional[MessageType] = None diff --git a/src/aleph/ccn_cli/main.py b/src/aleph/ccn_cli/main.py index fb12c0f2c..308c25773 100644 --- a/src/aleph/ccn_cli/main.py +++ b/src/aleph/ccn_cli/main.py @@ -7,6 +7,7 @@ from .commands.garbage_collector import gc_ns from .commands.keys import keys_ns from .commands.migrations import migrations_ns +from .commands.repair import repair_ns app = typer.Typer() @@ -68,6 +69,11 @@ def main( app.add_typer(gc_ns, name="gc", help="Invoke the garbage collector.") app.add_typer(keys_ns, name="keys", help="Operations on private keys.") app.add_typer(migrations_ns, name="migrations", help="Run DB migrations.") +app.add_typer( + repair_ns, + name="repair", + help="Performs checks on the local install and fixes issues like missing files.", +) if __name__ == "__main__":