Skip to content

Commit

Permalink
Feature: CLI command to repair local storage
Browse files Browse the repository at this point in the history
Problem: the local storage of a CCN may become inconsistent or corrupted
because of issues like downtime or maintenance gone wrong.

Solution: a new CLI command, 'repair'. This command checks the DB
to determine the files that should be stored on the node and
fetches them from the network.
  • Loading branch information
odesenfans committed Oct 4, 2022
1 parent dd702df commit 7cf0893
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 20 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ install_requires =
aiohttp==3.8.1
aioipfs@git+https://github.com/aleph-im/aioipfs.git@76d5624661e879a13b70f3ea87dc9c9604c7eda7
aleph-client==0.4.6
aleph-message==0.2.1
aleph-message==0.2.2
aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@97fe92ffa6e21ef5ec17ef4fa16c86022b30044c
coincurve==15.0.1
configmanager==1.35.1
Expand Down
121 changes: 104 additions & 17 deletions src/aleph/ccn_cli/commands/repair.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,151 @@
"""
import asyncio
import itertools
from typing import Dict, FrozenSet
from typing import Dict, FrozenSet, Set, Tuple
from typing import cast

import typer
from aleph_message.models import ItemHash
from configmanager import Config

import aleph.model
import aleph.services.p2p.singleton as singleton
from aleph.ccn_cli.cli_config import CliConfig
from aleph.config import get_defaults
from aleph.exceptions import ContentCurrentlyUnavailable
from aleph.model import init_db_globals
from aleph.storage import get_hash_content
from .toolkit.local_storage import list_expected_local_files
from ...services.p2p import http

# Typer application object for this module's commands.
repair_ns = typer.Typer()



def print_files_to_preserve(files_to_preserve: Dict[str, FrozenSet[str]]) -> None:
    """
    Print, for each file type, how many files will be preserved.

    :param files_to_preserve: Mapping of file type to the set of file names
                              of that type.
    """
    typer.echo("The following files will be preserved:")
    for file_type in files_to_preserve:
        files = files_to_preserve[file_type]
        typer.echo(f"* {len(files)} {file_type}")
async def init_api_servers():
    """
    Set `singleton.api_servers` to the addresses of the peers of type "HTTP"
    found in the "peers" collection of the DB.
    """
    http_peers = aleph.model.db["peers"].find({"type": "HTTP"})
    singleton.api_servers = [peer["address"] async for peer in http_peers]


async def list_missing_files() -> FrozenSet[str]:
    """
    List the files that should be in local storage but are not.

    The names of the files present in GridFS ("fs.files" collection) are
    compared to the names returned by `list_expected_local_files()`.

    :return: The expected file names that are absent from GridFS.
    :raises ValueError: If the DB global is not initialized.
    """
    # Guard before the first DB access, not after it.
    if aleph.model.db is None:  # for mypy
        raise ValueError("DB not initialized as expected.")

    # Get a set of all the files currently in GridFS. Only the file name is
    # needed for the comparison, so nothing else is projected.
    gridfs_files = frozenset(
        {
            file["filename"]
            async for file in aleph.model.db["fs.files"].find(
                projection={"_id": 0, "filename": 1},
                batch_size=1000,
            )
        }
    )

    typer.echo(f"Found {len(gridfs_files)} files in local storage.")

    # The expected files come grouped in a dict; flatten all groups into one set.
    expected_local_files_dict = await list_expected_local_files()
    expected_local_files = frozenset(
        itertools.chain.from_iterable(expected_local_files_dict.values())
    )

    return expected_local_files - gridfs_files


async def fetch_and_store_file(filename: str):
    """
    Request the content of a file through `get_hash_content`, with network
    and IPFS lookups enabled and `store_value=True`.

    The returned content is discarded; this function is only called for the
    effects of `get_hash_content`.

    :param filename: Name of the file; it is also parsed as an `ItemHash` to
                     determine the engine to use.
    """
    storage_engine = ItemHash(filename).item_type
    await get_hash_content(
        content_hash=filename,
        engine=storage_engine,
        use_network=True,
        use_ipfs=True,
        store_value=True,
        timeout=15,
    )


def process_results(
    finished_tasks: Set[asyncio.Task], task_dict: Dict[asyncio.Task, str]
) -> Tuple[Set[str], Set[str]]:
    """
    Sort finished fetch tasks into successes and failures.

    Each finished task is removed from `task_dict`. A message is printed for
    every task that ended with an exception.

    :param finished_tasks: Tasks that are done.
    :param task_dict: Mapping of task to the file name it was fetching.
                      Modified in place.
    :return: A tuple (fetched file names, failed file names).
    """
    succeeded = set()
    failed = set()

    for finished_task in finished_tasks:
        filename = task_dict.pop(finished_task)
        exception = finished_task.exception()

        if exception is None:
            succeeded.add(filename)
            continue

        failed.add(filename)
        if isinstance(exception, ContentCurrentlyUnavailable):
            message = f"WARNING: Could not fetch {filename}: currently unavailable."
        else:
            message = (
                f"ERROR: Could not fetch {filename}: unexpected error: {exception}"
            )
        typer.echo(message)

    return succeeded, failed


async def fetch_files(missing_files: FrozenSet[str], batch_size: int):
    """
    Fetch the given files concurrently and print a summary of the outcome.

    A fetch task is created for each file. Whenever the number of pending
    tasks reaches `batch_size`, the loop waits for at least one of them to
    complete before creating more.

    :param missing_files: Names of the files to fetch.
    :param batch_size: Number of pending tasks at which the loop waits.
    """
    pending = set()
    task_dict = {}

    fetched_files = set()
    failed_files = set()

    def record(finished):
        # process_results() also removes the finished tasks from task_dict.
        fetched, failed = process_results(finished, task_dict)
        fetched_files.update(fetched)
        failed_files.update(failed)

    for i, filename in enumerate(missing_files, start=1):
        typer.echo(f"Fetching {filename} ({i}/{len(missing_files)})...")
        fetch_task = asyncio.create_task(fetch_and_store_file(filename))
        pending.add(fetch_task)
        task_dict[fetch_task] = filename

        if len(pending) != batch_size:
            continue

        finished, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        record(finished)

    # Wait for whatever is still in flight once every task has been created.
    if pending:
        finished, _ = await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)
        record(finished)

    typer.echo(f"Successfully fetched {len(fetched_files)} files.")
    if failed_files:
        typer.echo(f"WARNING: Failed to fetch {len(failed_files)} files.")


async def fetch_missing_files(batch_size: int = 2000):
    """
    Determine which files are missing from local storage and fetch them.

    :param batch_size: Passed to `fetch_files()` as its `batch_size` argument.
                       Defaults to 2000, the value previously hard-coded here.
    """
    missing_files = await list_missing_files()
    typer.echo(f"Found {len(missing_files)} missing files.")

    await fetch_files(missing_files, batch_size)


async def run(ctx: typer.Context):
    """
    Entry point of the repair command: fetch the files missing from local
    storage.

    Loads the node configuration from the path held by the CLI config in
    `ctx.obj`, initializes the DB globals and the list of API servers, then
    fetches the missing files.

    :param ctx: Typer context; `ctx.obj` is used as a `CliConfig`.
    :raises ValueError: If the DB global is still None after initialization.
    """
    config = Config(schema=get_defaults())
    cli_config = cast(CliConfig, ctx.obj)
    config.yaml.load(str(cli_config.config_file_path))

    init_db_globals(config=config)
    # Check before any DB access: init_api_servers() reads from the DB.
    if aleph.model.db is None:  # for mypy
        raise ValueError("DB not initialized as expected.")

    try:
        # To be able to fetch data from the network
        await init_api_servers()
        await fetch_missing_files()
    finally:
        # Clean up aiohttp client sessions to avoid a warning, including when
        # the steps above raise.
        for client_session in http.SESSIONS.values():
            await client_session.close()

    typer.echo("Done.")


Expand Down
6 changes: 4 additions & 2 deletions src/aleph/ccn_cli/commands/toolkit/local_storage.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Any, Dict, FrozenSet, List, Optional
from aleph.model.messages import Message
from aleph.model.filepin import PermanentPin

from aleph_message.models import MessageType

from aleph.model.filepin import PermanentPin
from aleph.model.messages import Message


async def get_hashes(
item_type_field: str, item_hash_field: str, msg_type: Optional[MessageType] = None
Expand Down
6 changes: 6 additions & 0 deletions src/aleph/ccn_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .commands.garbage_collector import gc_ns
from .commands.keys import keys_ns
from .commands.migrations import migrations_ns
from .commands.repair import repair_ns

app = typer.Typer()

Expand Down Expand Up @@ -68,6 +69,11 @@ def main(
# Attach each command group to the root Typer app under its own name.
app.add_typer(gc_ns, name="gc", help="Invoke the garbage collector.")
app.add_typer(keys_ns, name="keys", help="Operations on private keys.")
app.add_typer(migrations_ns, name="migrations", help="Run DB migrations.")
app.add_typer(
    repair_ns,
    name="repair",
    help="Performs checks on the local install and fixes issues like missing files.",
)


if __name__ == "__main__":
Expand Down

0 comments on commit 7cf0893

Please sign in to comment.