From 07b8a66a94b38deafeff7827a4bdf6c0886f1c99 Mon Sep 17 00:00:00 2001 From: Artem Burashnikov Date: Sat, 11 Nov 2023 23:14:34 +0300 Subject: [PATCH] refactor: whole project and cli --- depinspect/__main__.py | 284 ---------------------------------------- depinspect/database.py | 88 +++++++++++++ depinspect/extract.py | 27 ---- depinspect/fetch.py | 39 ------ depinspect/main.py | 187 ++++++++++++++++++++++++++ depinspect/printer.py | 86 ++++-------- depinspect/processor.py | 12 +- depinspect/sqlite_db.py | 140 -------------------- pyproject.toml | 2 +- tests/test_extract.py | 4 - 10 files changed, 311 insertions(+), 558 deletions(-) delete mode 100644 depinspect/__main__.py create mode 100644 depinspect/database.py create mode 100644 depinspect/main.py delete mode 100644 depinspect/sqlite_db.py diff --git a/depinspect/__main__.py b/depinspect/__main__.py deleted file mode 100644 index 6431136..0000000 --- a/depinspect/__main__.py +++ /dev/null @@ -1,284 +0,0 @@ -import logging -from pathlib import Path -from shutil import rmtree -from typing import Tuple - -import click - -from depinspect import sqlite_db -from depinspect.constants import DB_NAME, DISTRIBUTIONS, ROOT_DIR, SOURCES_FILE_PATH -from depinspect.extract import process_archives -from depinspect.fetch import fetch_and_save_metadata -from depinspect.helper import ( - create_temp_dir, - is_valid_architecture_name, - is_valid_distribution, - is_valid_package_name, -) -from depinspect.printer import print_result -from depinspect.processor import run_metadata_processing - -# Set up logging configuration -logging.basicConfig( - level=logging.INFO, - format="- %(levelname)s - %(asctime)s: %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", -) - - -@click.command(context_settings={"ignore_unknown_options": True}) -@click.option( - "--cmp", - nargs=4, - type=(str, str, str, str), - help="Provide a distribution, architecture and another distribution and architecture. \ - Show all pacakges that have divergent dependencies betweet given distribtuion and architectures. ", -) -@click.option( - "-p1", - "--package1", - nargs=3, - type=(str, str, str), - help="Provide the first distribution, \ - architecture and package name separated by whitespaces. \ - Order of arguments matters. Example: --package1 i386 ubuntu apt", -) -@click.option( - "-p2", - "--package2", - nargs=3, - type=(str, str, str), - help="Provide the second distribution, \ - architecture and package name separated by whitespaces. \ - Order of arguments matters. Example: --package2 ubuntu amd64 grub-common", -) -@click.option( - "-l", - "--list", - default=False, - is_flag=True, - is_eager=True, - help="List all available distributions, architectures and package names.", -) -@click.option( - "-u", - "--update", - default=False, - is_flag=True, - is_eager=True, - help="Forcefully re-initialize database. \ - This removes old database, fetches all defined metadata \ - and stores it in a new database.", -) -@click.pass_context -def main( - ctx: click.Context, - cmp: Tuple[str, str, str, str], - package1: Tuple[str, str, str], - package2: Tuple[str, str, str], - update: bool, - list: bool, -) -> None: - """ - Main function for the depinspect command-line tool. - - Parameters: - - ctx (click.Context): Click context object. - - package1 (Tuple[str, str, str]): Tuple of distribution, architecture, \ - and package name for the first package. - - package2 (Tuple[str, str, str]): Tuple of distribution, architecture, \ - and package name for the second package. - - update (bool): Flag indicating whether to forcefully re-initialize the database. - - list (bool): Flag indicating whether to list all available \ - distributions, architectures, and package names. - - Returns: - - None - """ - - def initialize_data(config_path: Path, db_name: str, output_path: Path) -> None: - """ - Initialize data by fetching archives, extracting them, and \ - processing metadata into the database. - - Parameters: - - config_path (Path): Path to the sources configuration file. - - db_name (str): Name of the SQLite database. - - output_path (Path): Output path for temporary and database files. - - Returns: - - None - """ - tmp_dir = create_temp_dir(dir_prefix=".tmp", output_path=output_path) - db_path = sqlite_db.db_new(db_name=db_name, output_path=output_path) - - try: - logging.info("Fetching archives from pre-defined URL sources.") - fetch_and_save_metadata(config_path, tmp_dir) - - logging.info("Extracting archives.") - process_archives(tmp_dir) - - logging.info("Processing metadata into database.") - for distribution in DISTRIBUTIONS: - run_metadata_processing(tmp_dir, db_path, distribution) - - except Exception: - logging.exception( - "There was an exception trying to pull data into database." - ) - logging.error("Removing database, if exists, as it might be corrupted.") - if db_path.is_file() and db_path.suffix == ".db": - sqlite_db.db_remove(db_path) - - finally: - logging.info("Cleaning up.") - rmtree(tmp_dir) - logging.info("Done.") - - def validate_cl_arguments( - cl_argument1: Tuple[str, str, str], cl_argument2: Tuple[str, str, str] - ) -> Tuple[Tuple[str, str, str], Tuple[str, str, str]]: - """ - Validate command-line arguments for packages. - - Parameters: - - cl_argument1 (Tuple[str, str, str]): Tuple of distribution, architecture, \ - and package name for the first package. - - cl_argument2 (Tuple[str, str, str]): Tuple of distribution, architecture, \ - and package name for the second package. - - Returns: - - Tuple[Tuple[str, str, str], Tuple[str, str, str]]: \ - Validated tuples for both packages. - """ - ditribution1, architecture1, package_name1 = cl_argument1 - - if not is_valid_distribution(ditribution1.lower()): - raise click.BadOptionUsage( - ditribution1, - f"List of currently supported distributions: {DISTRIBUTIONS}. \ - Your input was: {ditribution1}", - ) - - if not is_valid_architecture_name(architecture1.lower()): - raise click.BadOptionUsage( - architecture1, - f"Archicetrure1 should be one of the strings provided by a \ - '$ dpkg-architecture -L' command. Your input: {architecture1}", - ) - - if not is_valid_package_name(package_name1.lower()): - raise click.BadOptionUsage( - package_name1, - f"Name of the package1 should match correct syntax. \ - Your input: {package_name1}", - ) - - distribution2, architecture2, package_name2 = cl_argument2 - - if not is_valid_distribution(distribution2.lower()): - raise click.BadOptionUsage( - distribution2, - f"List of currently supported distributions: {DISTRIBUTIONS}. \ - Your input was: {distribution2}", - ) - - if not is_valid_architecture_name(architecture2.lower()): - raise click.BadOptionUsage( - architecture2, - f"Archicetrure2 should be one of the strings provided by a \ - '$ dpkg-architecture -L' command. Your input: {architecture2}", - ) - - if not is_valid_package_name(package_name2.lower()): - raise click.BadOptionUsage( - package_name2, - f"Name of the package2 should match correct syntax. \ - Your input: {package_name2}", - ) - - return ( - (ditribution1.lower(), architecture1.lower(), package_name1.lower()), - (distribution2.lower(), architecture2.lower(), package_name2.lower()), - ) - - def get_cl_arguments() -> Tuple[Tuple[str, str, str], Tuple[str, str, str]]: - """ - Get validated command-line arguments for packages. - - Returns: - - Tuple[Tuple[str, str, str], Tuple[str, str, str]]: \ - Validated tuples for both packages. - """ - return validate_cl_arguments(package1, package2) - - def ensure_db_exists(db_path: Path) -> None: - """ - Ensure that the database file exists. If not, initialize data. - - Parameters: - - db_path (Path): Path to the database file. - - Returns: - - None - """ - if db_path.is_file() and db_path.suffix == ".db": - return - else: - initialize_data( - config_path=SOURCES_FILE_PATH, db_name=DB_NAME, output_path=ROOT_DIR - ) - - if update and list: - logging.error("--update and --list can't be passed simultaneously.") - ctx.exit(1) - - if update: - initialize_data( - config_path=SOURCES_FILE_PATH, db_name=DB_NAME, output_path=ROOT_DIR - ) - logging.info("Update complete.") - ctx.exit(0) - - db_path = ROOT_DIR / DB_NAME - - if list: - ensure_db_exists(db_path) - sqlite_db.db_list_all(db_path) - ctx.exit(0) - - if package1 and package2: - validated_input1, validated_input2 = get_cl_arguments() - - ensure_db_exists(db_path) - - result1 = sqlite_db.db_list_dependencies( - db_path=db_path, - distribution=validated_input1[0], - package_architecture=validated_input1[1], - package_name=validated_input1[2], - ) - - result2 = sqlite_db.db_list_dependencies( - db_path=db_path, - distribution=validated_input2[0], - package_architecture=validated_input2[1], - package_name=validated_input2[2], - ) - - print_result(validated_input1, result1, validated_input2, result2) - - else: - logging.error( - "Incorrect number of arguments. \ - Make sure to specifiy --package1 and --package2." - ) - click.echo(ctx.get_help()) - ctx.exit(1) - - ctx.exit(0) - - -if __name__ == "__main__": - main() diff --git a/depinspect/database.py b/depinspect/database.py new file mode 100644 index 0000000..e542d2e --- /dev/null +++ b/depinspect/database.py @@ -0,0 +1,88 @@ +import logging +import sqlite3 +from pathlib import Path +from sys import exit +from typing import Dict, List, Tuple + +from depinspect import files + + +def remove(db_path: Path) -> None: + file_extension = db_path.suffix + if file_extension == ".db": + logging.info("Removing database.") + files.remove_file(db_path) + else: + logging.error(f"File specified at {db_path} is not an sqlite3 database.") + exit(1) + + +def new(db_name: str, output_path: Path) -> Path: + db_path = output_path / Path(db_name) + + if db_path.is_file() and db_path.suffix == ".db": + logging.warning(f"sqlite3 database already exists at: {db_path}.") + try: + remove(db_path) + except Exception: + logging.exception( + "There was an exception trying to remove existing database." + ) + + logging.info("Creating and initializing new database.") + connection = sqlite3.connect(db_path) + + connection.execute( + "CREATE TABLE IF NOT EXISTS Packages " + "(id INTEGER PRIMARY KEY AUTOINCREMENT, " + "distribution TEXT, architecture TEXT, package_name TEXT, version TEXT, " + "UNIQUE(distribution, architecture, package_name, version))" + ) + + connection.execute( + "CREATE TABLE IF NOT EXISTS Dependencies " + "(package_id INTEGER, dependency_name TEXT, FOREIGN KEY (package_id) " + "REFERENCES Packages(id))" + ) + connection.close() + logging.info("Successfully initialized new database.") + return db_path + + +def find_dependencies( + db_path: Path, distribution: str, package_architecture: str, package_name: str +) -> List[Tuple[str]]: + db = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + result = db.execute( + "SELECT dependency_name " + "FROM Dependencies " + "JOIN Packages ON Dependencies.package_id = Packages.id " + "WHERE Packages.distribution = ? " + "AND Packages.package_name = ? " + "AND Packages.architecture = ?", + (distribution, package_name, package_architecture), + ).fetchall() + db.close() + return result + + +def find_all_distinct(db_path: Path) -> Dict[str, List[str]]: + db = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + + result: Dict[str, List[str]] = { + "distributions": [], + "architectures": [], + "package_names": [], + } + + with db: + for distribution in db.execute("SELECT DISTINCT distribution FROM Packages"): + result["distributions"].append(distribution[0]) + for architecture in db.execute("SELECT DISTINCT architecture FROM Packages"): + result["architectures"].append(architecture[0]) + for package_name in db.execute("SELECT DISTINCT package_name FROM Packages"): + result["package_names"].append(package_name[0]) + + db.close() + + return result diff --git a/depinspect/extract.py b/depinspect/extract.py index e3ad251..bc5c657 100644 --- a/depinspect/extract.py +++ b/depinspect/extract.py @@ -6,20 +6,6 @@ def extract_xz_archive(archive_path: Path, output_path: Path) -> None: - """ - Extract data from an XZ compressed archive. - - Parameters: - - archive_path (Path): Path to the XZ compressed archive file. - - output_path (Path): Path to the output file where the extracted data will be saved. - - Returns: - - None - - Note: - This function reads the content of an XZ compressed archive file and extracts its data. - It then writes the extracted data to the specified output file. - """ with open(archive_path, "rb") as archive_file: with lzma.open(archive_file, "rb") as xz_file: extracted_data = xz_file.read() @@ -29,19 +15,6 @@ def extract_xz_archive(archive_path: Path, output_path: Path) -> None: def process_archives(archives_dir: Path) -> None: - """ - Process XZ compressed archive files in a directory. - - Parameters: - - archives_dir (Path): Path to the directory containing XZ compressed archive files. - - Returns: - - None - - Note: - This function iterates through XZ compressed archive files in the specified directory, - extracts their data using `extract_xz_archive`, and saves the extracted data as text files. - """ archives_files = list_files_in_directory(archives_dir) try: for archive_path in archives_files: diff --git a/depinspect/fetch.py b/depinspect/fetch.py index b115ba4..20b39a2 100644 --- a/depinspect/fetch.py +++ b/depinspect/fetch.py @@ -4,57 +4,18 @@ def read_config(config_path: Path) -> configparser.ConfigParser: - """ - Read and parse configuration settings from a file. - - Parameters: - - config_path (Path): Path to the configuration file. - - Returns: - - configparser.ConfigParser: ConfigParser object containing the parsed configuration settings. - - Note: - This function reads and parses configuration settings from the specified file using ConfigParser. - """ config = configparser.ConfigParser() config.read(config_path) return config def pull_target_from_URL(target_url: str, local_target_path: Path) -> None: - """ - Download a file from a URL and save it locally. - - Parameters: - - target_url (str): URL of the file to be downloaded. - - local_target_path (Path): Local path where the downloaded file will be saved. - - Returns: - - None - - Note: - This function downloads a file from the specified URL and saves it to the local target path. - """ with request.urlopen(request.Request(target_url), timeout=15.0) as response: if response.status == 200: request.urlretrieve(target_url, local_target_path) def fetch_and_save_metadata(config_path: Path, output_directory: Path) -> None: - """ - Fetch metadata from URLs specified in a configuration file and save locally. - - Parameters: - - config_path (Path): Path to the configuration file containing metadata URLs. - - output_directory (Path): Local directory where the downloaded metadata files will be saved. - - Returns: - - None - - Note: - This function reads metadata sources from the specified configuration file, - fetches the metadata from URLs, and saves the downloaded files to the output directory. - """ metadata_sources = read_config(config_path) for section in metadata_sources.sections(): diff --git a/depinspect/main.py b/depinspect/main.py new file mode 100644 index 0000000..9341998 --- /dev/null +++ b/depinspect/main.py @@ -0,0 +1,187 @@ +import logging +from pathlib import Path +from shutil import rmtree +from typing import Any, Tuple + +import click + +from depinspect import database, printer +from depinspect.constants import DB_NAME, DISTRIBUTIONS, ROOT_DIR, SOURCES_FILE_PATH +from depinspect.extract import process_archives +from depinspect.fetch import fetch_and_save_metadata +from depinspect.helper import ( + create_temp_dir, + is_valid_architecture_name, + is_valid_distribution, + is_valid_package_name, +) +from depinspect.processor import run_metadata_processing + +logging.basicConfig( + level=logging.INFO, + format="- %(levelname)s - %(asctime)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) + + +def run_initialization(config_path: Path, db_name: str, output_path: Path) -> None: + tmp_dir = create_temp_dir(dir_prefix=".tmp", output_path=output_path) + db_path = database.new(db_name=db_name, output_path=output_path) + + try: + logging.info("Fetching archives from pre-defined URL sources.") + fetch_and_save_metadata(config_path, tmp_dir) + + logging.info("Extracting archives.") + process_archives(tmp_dir) + + logging.info("Processing metadata into database.") + for distribution in DISTRIBUTIONS: + run_metadata_processing(tmp_dir, db_path, distribution) + + except Exception: + logging.exception("There was an exception trying to pull data into database.") + if db_path.is_file(): + logging.error("Removing database, if exists, as it might be corrupted.") + database.remove(db_path) + + finally: + logging.info("Cleaning up.") + rmtree(tmp_dir) + logging.info("Done.") + + +def validate_packages_info( + ctx: click.Context, + param: click.Parameter, + value: Tuple[Tuple[str, str, str], Tuple[str, str, str]], +) -> Tuple[Tuple[str, str, str], Tuple[str, str, str]]: + if len(value) != 2: + raise click.BadArgumentUsage( + "diff requires two packages to be provided\n" + "Incorrect number of diff arguments", + ctx=ctx, + ) + + for package_info in value: + if len(package_info) != 3: + raise click.BadArgumentUsage( + "Distribution, architecture and name are required\n", ctx=ctx + ) + else: + distribution, architecture, package_name = package_info + + if not is_valid_distribution(distribution.lower()): + raise click.BadOptionUsage( + distribution, + f"List of currently supported distributions: {DISTRIBUTIONS}. " + f"Your input was: {distribution}", + ) + + if not is_valid_architecture_name(architecture.lower()): + raise click.BadOptionUsage( + architecture, + f"Archicetrure should be one of the strings provided by a " + f"'$ dpkg-architecture -L' command. Your input: {architecture}", + ) + + if not is_valid_package_name(package_name.lower()): + raise click.BadOptionUsage( + package_name, + f"Name of the package should match correct syntax. " + f"Your input: {package_name}", + ) + + return value + + +def ensure_db_exists(db_path: Path) -> None: + if db_path.is_file() and db_path.suffix == ".db": + return + else: + run_initialization( + config_path=SOURCES_FILE_PATH, db_name=DB_NAME, output_path=ROOT_DIR + ) + + +@click.group() +def depinspect() -> None: + pass + + +@depinspect.command( + help=( + "List all available distributions, architectures and packages." + "This implicitly initializez a new database." + ) +) +@click.pass_context +def list(ctx: click.Context) -> None: + db_path = ROOT_DIR / DB_NAME + + ensure_db_exists(db_path) + + result = database.find_all_distinct(db_path) + printer.print_all(result) + + ctx.exit(0) + + +@depinspect.command( + help=( + "Forcefully re-initialize database." + "This removes old database, fetches all defined metadata" + "and stores it in a new database." + ) +) +@click.pass_context +def update(ctx: click.Context) -> None: + run_initialization( + config_path=SOURCES_FILE_PATH, db_name=DB_NAME, output_path=ROOT_DIR + ) + logging.info("Update complete.") + ctx.exit(0) + + +@depinspect.command( + help=( + "Find a difference and similarities in dependencies of two packages" + "from different distributions and architectures." + ), +) +@click.option( + "-p", + "--package", + multiple=True, + type=(str, str, str), + callback=validate_packages_info, + help=( + "Provide distribution, architecture and package name" + " separated by whitespaces." + " Order of arguments matters.\n\n" + "Example: --package ubuntu i386 apt" + ), +) +@click.pass_context +def diff(ctx: click.Context, package: Tuple[Any, ...]) -> None: + db_path = ROOT_DIR / DB_NAME + + ensure_db_exists(db_path) + + result1 = database.find_dependencies( + db_path=db_path, + distribution=package[0][0], + package_architecture=package[0][1], + package_name=package[0][2], + ) + + result2 = database.find_dependencies( + db_path=db_path, + distribution=package[1][0], + package_architecture=package[1][1], + package_name=package[1][2], + ) + + printer.print_result(package[0], result1, package[1], result2) + + ctx.exit(0) diff --git a/depinspect/printer.py b/depinspect/printer.py index 01896d9..ffa5d46 100644 --- a/depinspect/printer.py +++ b/depinspect/printer.py @@ -1,23 +1,11 @@ -from typing import List, Tuple +from typing import Dict, List, Tuple from click import echo -def print_one(package: Tuple[str, str, str], result: List[Tuple[str]]) -> None: - """ - Print information for a single package, including its dependencies. - - Parameters: - - package (Tuple[str, str, str]): A tuple containing distribution, architecture, and package name. - - result (List[Tuple[str]]): A list containing tuples with dependency information. - - Returns: - - None - - Note: - This function takes a package tuple and its dependencies and prints the information in a formatted manner. - It calculates the maximum character length for formatting and prints the header, divider, and dependencies. - """ +def print_result_for_one( + package: Tuple[str, str, str], result: List[Tuple[str]] +) -> None: distribution, architecture, package_name = package dependencies = sorted(result[0][0].split(",")) header = f"{distribution} - {architecture} - {package_name}" @@ -28,34 +16,20 @@ def print_one(package: Tuple[str, str, str], result: List[Tuple[str]]) -> None: divider = "=" * max_char_length + echo("\n", nl=False) echo(header) echo(divider) for dependency in dependencies: echo(f"{dependency}") + echo("\n", nl=False) -def print_both( +def print_result_for_both( package1: Tuple[str, str, str], result1: List[Tuple[str]], package2: Tuple[str, str, str], result2: List[Tuple[str]], ) -> None: - """ - Print information for two packages, including their dependencies and exclusions. - - Parameters: - - package1 (Tuple[str, str, str]): A tuple containing distribution, architecture, and package name for the first package. - - result1 (List[Tuple[str]]): A list containing tuples with dependency information for the first package. - - package2 (Tuple[str, str, str]): A tuple containing distribution, architecture, and package name for the second package. - - result2 (List[Tuple[str]]): A list containing tuples with dependency information for the second package. - - Returns: - - None - - Note: - This function takes information for two packages and their dependencies, and prints a comparison of their dependencies. - It shows dependencies present in both, as well as exclusions for each package. - """ distribution1, architecture1, package_name1 = package1 dependencies1 = sorted(result1[0][0].split(",")) header1 = f"{distribution1} - {architecture1} - {package_name1}" @@ -85,34 +59,32 @@ def print_both( match_max_length, diff_max_length1, diff_max_length2, - len("THESE DEPENDENCIES ARE PRESENT IN BOTH"), + len("These dependencies are present in both"), ) divider = "=" * max_length echo("\n", nl=False) - echo("THESE DEPENDENCIES ARE PRESENT IN BOTH") + echo("These dependencies are present in both:") echo(header1) echo(header2) echo(divider) for match in matches: echo(match) - # echo(divider) echo("\n", nl=False) - echo("THESE ARE ONLY EXCLUSIVE TO") + echo("These dependencies are exclusive to:") echo(header1) echo(divider) for difference in differences1: echo(difference) - # echo(divider) echo("\n", nl=False) - echo("THESE ARE ONLY EXCLUSIVE TO") + echo("These dependencies are exclusive to:") echo(header2) echo(divider) for difference in differences2: - echo(f"|_ {difference}") + echo(f"{difference}") echo("\n", nl=False) @@ -122,32 +94,26 @@ def print_result( input2: Tuple[str, str, str], result_from_input2: List[Tuple[str]], ) -> None: - """ - Print the result of comparing dependencies for two input packages. - - Parameters: - - input1 (Tuple[str, str, str]): A tuple containing distribution, architecture, and package name for the first input package. - - result_from_input1 (List[Tuple[str]]): A list containing tuples with dependency information for the first input package. - - input2 (Tuple[str, str, str]): A tuple containing distribution, architecture, and package name for the second input package. - - result_from_input2 (List[Tuple[str]]): A list containing tuples with dependency information for the second input package. - - Returns: - - None - - Note: - This function prints the result of comparing dependencies for two input packages. - It checks if there are records found for each input and calls the appropriate printing function. - """ if not result_from_input1 and not result_from_input2: echo( - f"No records were found. Printing input...\n{input1[0]} - {input1[1]} - {input1[2]}\n{input2[0]} - {input2[1]} - {input2[2]}" + f"No records were found. Printing input...\n" + f"{input1[0]} - {input1[1]} - {input1[2]}\n" + f"{input2[0]} - {input2[1]} - {input2[2]}" ) if not result_from_input1 and result_from_input2: - print_one(input2, result_from_input2) + print_result_for_one(input2, result_from_input2) if result_from_input1 and not result_from_input2: - print_one(input1, result_from_input1) + print_result_for_one(input1, result_from_input1) if result_from_input1 and result_from_input2: - print_both(input1, result_from_input1, input2, result_from_input2) + print_result_for_both(input1, result_from_input1, input2, result_from_input2) + + +def print_all(data: Dict[str, List[str]]) -> None: + for section in data.keys(): + echo(f"{section.upper()}:") + for value in sorted(data[section]): + print(value) + echo("\n", nl=False) diff --git a/depinspect/processor.py b/depinspect/processor.py index d94a399..9d2fdf8 100644 --- a/depinspect/processor.py +++ b/depinspect/processor.py @@ -28,17 +28,23 @@ def process_metadata_into_db(file_path: Path, db_path: Path, distribution: str) for package in packages: maybe_result = db_connection.execute( - "SELECT distribution, architecture, package_name FROM Packages WHERE distribution = ? AND architecture = ? AND package_name = ?", + "SELECT distribution, architecture, package_name " + "FROM Packages " + "WHERE distribution = ? " + "AND architecture = ? " + "AND package_name = ?", (package.distribution, package.architecture, package.package), ).fetchall() if not maybe_result: result = db_connection.execute( - "INSERT OR ABORT INTO Packages (distribution, architecture, package_name) VALUES (?, ?, ?)", + "INSERT OR ABORT INTO Packages " + "(distribution, architecture, package_name) VALUES (?, ?, ?)", (package.distribution, package.architecture, package.package), ) db_connection.execute( - "INSERT INTO Dependencies (package_id, dependency_name) VALUES (?, ?)", + "INSERT INTO Dependencies " + "(package_id, dependency_name) VALUES (?, ?)", (result.lastrowid, ",".join(package.depends)), ) else: diff --git a/depinspect/sqlite_db.py b/depinspect/sqlite_db.py deleted file mode 100644 index 3d694e0..0000000 --- a/depinspect/sqlite_db.py +++ /dev/null @@ -1,140 +0,0 @@ -import logging -import sqlite3 -from pathlib import Path -from sys import exit -from typing import List, Tuple - -from click import echo - -from depinspect import files - - -def db_remove(db_path: Path) -> None: - """ - Remove an SQLite3 database file. - - Parameters: - - db_path (Path): The path to the SQLite3 database file. - - Returns: - - None - - Raises: - - SystemExit: If the file specified is not an SQLite3 database (with a '.db' extension). - - Note: - This function logs information and errors using the 'logging' module. - """ - file_extension = db_path.suffix - if file_extension == ".db": - logging.info("Removing database.") - files.remove_file(db_path) - else: - logging.error(f"File specified at {db_path} is not an sqlite3 database.") - exit(1) - - -def db_new(db_name: str, output_path: Path) -> Path: - """ - Create and initialize a new SQLite3 database. - - Parameters: - - db_name (str): The name of the new database. - - output_path (Path): The directory where the new database will be created. - - Returns: - - Path: The path to the newly created database. - - Raises: - - Exception: If there is an issue removing an existing database. - """ - db_path = output_path / Path(db_name) - - if db_path.is_file() and db_path.suffix == ".db": - logging.warning(f"sqlite3 database already exists at: {db_path}.") - try: - db_remove(db_path) - except Exception: - logging.exception( - "There was an exception trying to remove existing database." - ) - - logging.info("Creating and initializing new database.") - connection = sqlite3.connect(db_path) - - connection.execute( - "CREATE TABLE IF NOT EXISTS Packages (id INTEGER PRIMARY KEY AUTOINCREMENT, distribution TEXT, architecture TEXT, package_name TEXT, version TEXT, UNIQUE(distribution, architecture, package_name, version))" - ) - - connection.execute( - "CREATE TABLE IF NOT EXISTS Dependencies (package_id INTEGER, dependency_name TEXT, FOREIGN KEY (package_id) REFERENCES Packages(id))" - ) - connection.close() - logging.info("Successfully initialized new database.") - return db_path - - -def db_list_dependencies( - db_path: Path, distribution: str, package_architecture: str, package_name: str -) -> List[Tuple[str]]: - """ - List dependencies for a specific package in an SQLite3 database. - - Parameters: - - db_path (Path): The path to the SQLite3 database. - - distribution (str): The distribution of the package. - - package_architecture (str): The architecture of the package. - - package_name (str): The name of the package. - - Returns: - - List[Tuple[str]]: A list of tuples containing dependency names. - - Note: - This function opens a read-only connection to the database, retrieves dependencies - for the specified package, and returns the result as a list of tuples. - """ - db = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) - result = db.execute( - "SELECT dependency_name \ - FROM Dependencies \ - JOIN Packages ON Dependencies.package_id = Packages.id \ - WHERE Packages.distribution = ? AND Packages.package_name = ? AND Packages.architecture = ?", - (distribution, package_name, package_architecture), - ).fetchall() - db.close() - return result - - -def db_list_all(db_path: Path) -> None: - """ - List all unique distributions, architectures, and package names in an SQLite3 database. - - Parameters: - - db_path (Path): The path to the SQLite3 database. - - Returns: - - None - - Note: - This function opens a read-only connection to the database, retrieves distinct distributions, - architectures, and package names, and prints the results. - """ - db = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) - - with db: - echo("Distributions:") - echo("========================================") - for distribution in db.execute("SELECT DISTINCT distribution FROM Packages"): - echo(distribution[0]) - echo("\n", nl=False) - echo("Architectures:") - echo("========================================") - for architecture in db.execute("SELECT DISTINCT architecture FROM Packages"): - echo(architecture[0]) - echo("\n", nl=False) - echo("Packages:") - echo("========================================") - for package_name in db.execute("SELECT DISTINCT package_name FROM Packages"): - echo(package_name[0]) - - db.close() diff --git a/pyproject.toml b/pyproject.toml index 1bf01d1..a7882e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ ruff = "^0.1.1" target-version = "py310" [tool.poetry.scripts] -depinspect = "depinspect.__main__:main" +depinspect = "depinspect.main:depinspect" [build-system] requires = ["poetry-core"] diff --git a/tests/test_extract.py b/tests/test_extract.py index 7c9d715..cca51b0 100644 --- a/tests/test_extract.py +++ b/tests/test_extract.py @@ -5,10 +5,6 @@ from depinspect.extract import extract_xz_archive -""" -pytest automatically recognizes tmp_path as a fixture and provides the necessary functionality -""" - def test_extract_xz_archive(tmp_path: Path) -> None: # Create a test .xz archive file with XZ-compressed content