From 3b927e3789a23d30622025d4bc93e46f7d743577 Mon Sep 17 00:00:00 2001 From: Maciej Kotowicz Date: Fri, 22 Jul 2022 14:46:35 +0200 Subject: [PATCH] S3 support based on Boto3 instead of minio-py (#190) --- karton/core/backend.py | 84 ++++++++++++++++++++++------------------- karton/core/config.py | 30 +++++++++++++-- karton/core/main.py | 70 +++++++++++++++++----------------- karton/core/resource.py | 20 +++++----- karton/core/test.py | 4 +- karton/system/system.py | 7 +--- requirements.txt | 2 +- setup.cfg | 4 +- tests/test_core.py | 7 ++-- 9 files changed, 127 insertions(+), 101 deletions(-) diff --git a/karton/core/backend.py b/karton/core/backend.py index 4a5ea11a..b511f664 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -3,11 +3,20 @@ import time import warnings from collections import defaultdict, namedtuple -from io import BytesIO -from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Set, Tuple, Union +from typing import ( + Any, + BinaryIO, + Dict, + Iterable, + Iterator, + List, + Optional, + Set, + Tuple, + Union, +) -from minio import Minio -from minio.deleteobjects import DeleteError, DeleteObject +import boto3 from redis import AuthenticationError, StrictRedis from redis.client import Pipeline from urllib3.response import HTTPResponse @@ -45,11 +54,11 @@ def __init__(self, config, identity: Optional[str] = None) -> None: self.config = config self.identity = identity self.redis = self.make_redis(config, identity=identity) - self.minio = Minio( - endpoint=config["minio"]["address"], - access_key=config["minio"]["access_key"], - secret_key=config["minio"]["secret_key"], - secure=config.getboolean("minio", "secure", fallback=True), + self.s3 = boto3.client( + "s3", + endpoint_url=config["s3"]["address"], + aws_access_key_id=config["s3"]["access_key"], + aws_secret_access_key=config["s3"]["secret_key"], ) def make_redis(self, config, identity: Optional[str] = None) -> StrictRedis: @@ -85,9 +94,9 @@ def make_redis(self, config, identity: Optional[str] = None) -> StrictRedis: @property def default_bucket_name(self) -> str: - bucket_name = self.config.get("minio", "bucket") + bucket_name = self.config.get("s3", "bucket") if not bucket_name: - raise RuntimeError("MinIO default bucket is not defined in configuration") + raise RuntimeError("S3 default bucket is not defined in configuration") return bucket_name @staticmethod @@ -335,7 +344,7 @@ def delete_task(self, task: Task) -> None: """ self.redis.delete(f"{KARTON_TASK_NAMESPACE}:{task.uid}") - def delete_tasks(self, tasks: List[Task], chunk_size: int = 1000) -> None: + def delete_tasks(self, tasks: Iterable[Task], chunk_size: int = 1000) -> None: """ Remove multiple tasks from Redis @@ -563,20 +572,15 @@ def upload_object( bucket: str, object_uid: str, content: Union[bytes, BinaryIO], - length: int = None, ) -> None: """ - Upload resource object to underlying object storage (Minio) + Upload resource object to underlying object storage (S3) :param bucket: Bucket name :param object_uid: Object identifier :param content: Object content as bytes or file-like stream - :param length: Object content length (if file-like object provided) """ - if isinstance(content, bytes): - length = len(content) - content = BytesIO(content) - self.minio.put_object(bucket, object_uid, content, length) + self.s3.put_object(Bucket=bucket, Key=object_uid, Body=content) def upload_object_from_file(self, bucket: str, object_uid: str, path: str) -> None: """ @@ -586,7 +590,8 @@ def upload_object_from_file(self, bucket: str, object_uid: str, path: str) -> No :param object_uid: Object identifier :param path: Path to the object content """ - self.minio.fput_object(bucket, object_uid, path) + with open(path, "rb") as f: + self.s3.put_object(Bucket=bucket, Key=object_uid, Body=f) def get_object(self, bucket: str, object_uid: str) -> HTTPResponse: """ @@ -600,7 +605,7 @@ def get_object(self, bucket: str, object_uid: str) -> HTTPResponse: :param object_uid: Object identifier :return: Response object with content """ - return self.minio.get_object(bucket, object_uid) + return self.s3.get_object(Bucket=bucket, Key=object_uid)["Body"] def download_object(self, bucket: str, object_uid: str) -> bytes: """ @@ -610,12 +615,9 @@ def download_object(self, bucket: str, object_uid: str) -> bytes: :param object_uid: Object identifier :return: Content bytes """ - reader = self.minio.get_object(bucket, object_uid) - try: - return reader.read() - finally: - reader.release_conn() - reader.close() + with self.s3.get_object(Bucket=bucket, Key=object_uid)["Body"] as f: + ret = f.read() + return ret def download_object_to_file(self, bucket: str, object_uid: str, path: str) -> None: """ @@ -625,7 +627,7 @@ def download_object_to_file(self, bucket: str, object_uid: str, path: str) -> No :param object_uid: Object identifier :param path: Target file path """ - self.minio.fget_object(bucket, object_uid, path) + self.s3.download_file(Bucket=bucket, Key=object_uid, Filename=path) def list_objects(self, bucket: str) -> List[str]: """ @@ -634,7 +636,10 @@ def list_objects(self, bucket: str) -> List[str]: :param bucket: Bucket name :return: List of object identifiers """ - return [object.object_name for object in self.minio.list_objects(bucket)] + return [ + object["Key"] + for object in self.s3.list_objects(Bucket=bucket).get("Contents", []) + ] def remove_object(self, bucket: str, object_uid: str) -> None: """ @@ -643,19 +648,17 @@ def remove_object(self, bucket: str, object_uid: str) -> None: :param bucket: Bucket name :param object_uid: Object identifier """ - self.minio.remove_object(bucket, object_uid) + self.s3.delete_object(Bucket=bucket, Key=object_uid) - def remove_objects( - self, bucket: str, object_uids: List[str] - ) -> Iterator[DeleteError]: + def remove_objects(self, bucket: str, object_uids: Iterable[str]) -> None: """ Bulk remove resource objects from object storage :param bucket: Bucket name :param object_uids: Object identifiers """ - delete_objects = [DeleteObject(uid) for uid in object_uids] - yield from self.minio.remove_objects(bucket, delete_objects) + delete_objects = [{"Key": uid} for uid in object_uids] + self.s3.delete_objects(Bucket=bucket, Delete={"Objects": delete_objects}) def check_bucket_exists(self, bucket: str, create: bool = False) -> bool: """ @@ -665,10 +668,15 @@ def check_bucket_exists(self, bucket: str, create: bool = False) -> bool: :param create: Create bucket if doesn't exist :return: True if bucket exists yet """ - if self.minio.bucket_exists(bucket): + try: + self.s3.head_bucket(Bucket=bucket) return True - if create: - self.minio.make_bucket(bucket) + except self.s3.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "404": + if create: + self.s3.create_bucket(Bucket=bucket) + else: + raise e return False def log_identity_output(self, identity: str, headers: Dict[str, Any]) -> None: diff --git a/karton/core/config.py b/karton/core/config.py index 237151d3..84cd31f0 100644 --- a/karton/core/config.py +++ b/karton/core/config.py @@ -1,6 +1,7 @@ import configparser import os import re +import warnings from typing import Any, Dict, List, Optional, cast, overload @@ -22,7 +23,7 @@ class Config(object): Environment variables have higher precedence than those loaded from files. :param path: Path to additional configuration file - :param check_sections: Check if sections ``redis`` and ``minio`` are defined + :param check_sections: Check if sections ``redis`` and ``s3`` are defined in the configuration """ @@ -46,11 +47,34 @@ def __init__( self._load_from_env() if check_sections: - if not self.has_section("minio"): - raise RuntimeError("Missing MinIO configuration") + if self.has_section("minio") and not self.has_section("s3"): + self._map_minio_to_s3() + if not self.has_section("s3"): + raise RuntimeError("Missing S3 configuration") if not self.has_section("redis"): raise RuntimeError("Missing Redis configuration") + def _map_minio_to_s3(self): + """ + Configuration backwards compatibility. Before 5.x.x [minio] section was used. + """ + warnings.warn( + "[minio] section in configuration is deprecated, replace it with [s3]" + ) + self._config["s3"] = dict(self._config["minio"]) + if not ( + self._config["s3"]["address"].startswith("http://") + or self._config["s3"]["address"].startswith("https://") + ): + if self.getboolean("minio", "secure", True): + self._config["s3"]["address"] = ( + "https://" + self._config["s3"]["address"] + ) + else: + self._config["s3"]["address"] = ( + "http://" + self._config["s3"]["address"] + ) + def set(self, section_name: str, option_name: str, value: Any) -> None: """ Sets value in configuration diff --git a/karton/core/main.py b/karton/core/main.py index 3ac6e17b..bfa69519 100644 --- a/karton/core/main.py +++ b/karton/core/main.py @@ -4,7 +4,7 @@ from configparser import ConfigParser from typing import Any, Dict, List -from minio import Minio +import boto3 from redis import StrictRedis from .__version__ import __version__ @@ -35,41 +35,40 @@ def get_user_option(prompt: str, default: str) -> str: def configuration_wizard(config_filename: str) -> None: config = ConfigParser() - log.info("Configuring MinIO") - minio_access_key = "minioadmin" - minio_secret_key = "minioadmin" - minio_address = "localhost:9000" - minio_bucket = "karton" - minio_secure = "0" + log.info("Configuring s3") + s3_access_key = "minioadmin" + s3_secret_key = "minioadmin" + s3_address = "http://localhost:9000" + s3_bucket = "karton" while True: - minio_access_key = get_user_option( - "Enter the MinIO access key", default=minio_access_key + s3_access_key = get_user_option( + "Enter the S3 access key", default=s3_access_key ) - minio_secret_key = get_user_option( - "Enter the MinIO secret key", default=minio_secret_key + s3_secret_key = get_user_option( + "Enter the S3 secret key", default=s3_secret_key ) - minio_address = get_user_option( - "Enter the MinIO address", default=minio_address - ) - minio_bucket = get_user_option( - "Enter the MinIO bucket to use", default=minio_bucket - ) - minio_secure = get_user_option('Use SSL ("0", "1")?', default=minio_secure) - - log.info("Testing MinIO connection...") - minio = Minio( - endpoint=minio_address, - access_key=minio_access_key, - secret_key=minio_secret_key, - secure=bool(int(minio_secure)), + s3_address = get_user_option("Enter the S3 address", default=s3_address) + s3_bucket = get_user_option("Enter the S3 bucket to use", default=s3_bucket) + + log.info("Testing S3 connection...") + s3_client = boto3.client( + "s3", + endpoint_url=s3_address, + aws_access_key_id=s3_access_key, + aws_secret_access_key=s3_secret_key, ) bucket_exists = False try: - bucket_exists = minio.bucket_exists(minio_bucket) + bucket_exists = bool(s3_client.head_bucket(Bucket=s3_bucket)) + + except s3_client.exceptions.ClientError as e: + if e.response["Error"]["Code"] != "404": + raise e + except Exception as e: - log.info("Error while connecting to MinIO: %s", e, exc_info=True) + log.info("Error while connecting to S3: %s", e, exc_info=True) retry = get_user_option( - 'Do you want to try with different MinIO settings ("yes", "no")?', + 'Do you want to try with different S3 settings ("yes", "no")?', default="yes", ) if retry != "yes": @@ -78,23 +77,22 @@ def configuration_wizard(config_filename: str) -> None: else: continue - log.info("Connected to MinIO successfully") + log.info("Connected to S3 successfully") if not bucket_exists: log.info( ( "The required bucket %s does not exist. To create it automatically," " start karton-system with --setup-bucket flag" ), - minio_bucket, + s3_bucket, ) break - config["minio"] = { - "access_key": minio_access_key, - "secret_key": minio_secret_key, - "address": minio_address, - "bucket": minio_bucket, - "secure": minio_secure, + config["s3"] = { + "access_key": s3_access_key, + "secret_key": s3_secret_key, + "address": s3_address, + "bucket": s3_bucket, } log.info("Configuring Redis") diff --git a/karton/core/resource.py b/karton/core/resource.py index e2a6b529..171319b6 100644 --- a/karton/core/resource.py +++ b/karton/core/resource.py @@ -32,10 +32,10 @@ class ResourceBase(object): :param name: Name of the resource (e.g. name of file) :param content: Resource content :param path: Path of file with resource content - :param bucket: Alternative MinIO bucket for resource + :param bucket: Alternative S3 bucket for resource :param metadata: Resource metadata :param sha256: Resource sha256 hash - :param _uid: Alternative MinIO resource id + :param _uid: Alternative S3 resource id :param _fd: File descriptor :param _flag: Resource flags """ @@ -154,7 +154,7 @@ class LocalResource(ResourceBase): """ Represents local resource with arbitrary binary data e.g. file contents. - Local resources will be uploaded to object hub (MinIO) during + Local resources will be uploaded to object hub (S3) during task dispatching. .. code-block:: python @@ -169,9 +169,9 @@ class LocalResource(ResourceBase): :param name: Name of the resource (e.g. name of file) :param content: Resource content :param path: Path of file with resource content - :param bucket: Alternative MinIO bucket for resource + :param bucket: Alternative S3 bucket for resource :param metadata: Resource metadata - :param uid: Alternative MinIO resource id + :param uid: Alternative S3 resource id :param sha256: Resource sha256 hash :param _fd: File descriptor :param _flag: Resource flags @@ -244,9 +244,9 @@ def from_directory( :param compression: Compression level (default is zipfile.ZIP_DEFLATED) :param in_memory: Don't create temporary file and make in-memory zip file \ (default: False) - :param bucket: Alternative MinIO bucket for resource + :param bucket: Alternative S3 bucket for resource :param metadata: Resource metadata - :param uid: Alternative MinIO resource id + :param uid: Alternative S3 resource id :return: :class:`LocalResource` instance with zipped contents """ out_stream = BytesIO() if in_memory else tempfile.NamedTemporaryFile() @@ -329,15 +329,15 @@ def upload(self, backend: "KartonBackend") -> None: class RemoteResource(ResourceBase): """ Keeps reference to remote resource object shared between subsystems - via object storage (MinIO) + via object storage (S3) Should never be instantiated directly by subsystem, but can be directly passed to outgoing payload. :param name: Name of the resource (e.g. name of file) - :param bucket: Alternative MinIO bucket for resource + :param bucket: Alternative S3 bucket for resource :param metadata: Resource metadata - :param uid: Alternative MinIO resource id + :param uid: Alternative S3 resource id :param size: Resource size :param backend: :py:meth:`KartonBackend` to bind to this resource :param sha256: Resource sha256 hash diff --git a/karton/core/test.py b/karton/core/test.py index e0801cee..fd435fde 100644 --- a/karton/core/test.py +++ b/karton/core/test.py @@ -21,13 +21,13 @@ class ConfigMock(Config): def __init__(self): - self._config = {"redis": {}, "minio": {}} + self._config = {"redis": {}, "s3": {}} class BackendMock: def __init__(self) -> None: self.produced_tasks: List[Task] = [] - # A custom MinIO system mock + # A custom S3 system mock self.buckets: Dict[str, Dict[str, bytes]] = defaultdict(dict) @property diff --git a/karton/system/system.py b/karton/system/system.py index 9cbe206e..23a68c66 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -64,10 +64,7 @@ def gc_collect_resources(self) -> None: resources_to_remove.remove(resource.uid) # Remove unreferenced resources if resources_to_remove: - for err in self.backend.remove_objects( - karton_bucket, list(resources_to_remove) - ): - self.log.error(err) + self.backend.remove_objects(karton_bucket, resources_to_remove) def gc_collect_abandoned_queues(self): online_consumers = self.backend.get_online_consumers() @@ -266,7 +263,7 @@ def loop(self) -> None: def args_parser(cls) -> argparse.ArgumentParser: parser = super().args_parser() parser.add_argument( - "--setup-bucket", action="store_true", help="Create missing bucket in MinIO" + "--setup-bucket", action="store_true", help="Create missing bucket in S3" ) # store_false defaults to True, we intentionally want None there parser.add_argument( diff --git a/requirements.txt b/requirements.txt index 1a9357d4..d9cda45c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ redis -minio>=7.0.0 +boto3 diff --git a/setup.cfg b/setup.cfg index c9091f8b..5b23ec51 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,10 +3,10 @@ max-line-length = 88 extend-ignore = E203, W503 [mypy] explicit_package_bases = True -[mypy-minio.*] -ignore_missing_imports = True [mypy-urllib3.*] ignore_missing_imports = True [mypy-redis.*] ignore_missing_imports = True +[mypy-boto3.*] +ignore_missing_imports = True diff --git a/tests/test_core.py b/tests/test_core.py index 58b4c7c6..c560e290 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -4,7 +4,7 @@ from karton.core import Config, Task MOCK_CONFIG = """ -[minio] +[s3] access_key=xd [redis] @@ -15,8 +15,7 @@ @patch('karton.core.Config', autospec=True) class TestConfig(unittest.TestCase): MOCK_ENV = { - "KARTON_MINIO_ACCESS_KEY": "xd", - "KARTON_MINIO_SECURE": "1", + "KARTON_S3_ACCESS_KEY": "xd", "KARTON_REDIS_HOST": "testhost", "KARTON_REDIS_PORT": "2137", @@ -30,7 +29,7 @@ def test_env_override(self, mock_parser): cfg = Config() assert cfg["redis"]["host"] == self.MOCK_ENV["KARTON_REDIS_HOST"] assert cfg["redis"]["port"] == self.MOCK_ENV["KARTON_REDIS_PORT"] - assert cfg["minio"]["access_key"] == self.MOCK_ENV["KARTON_MINIO_ACCESS_KEY"] + assert cfg["s3"]["access_key"] == self.MOCK_ENV["KARTON_S3_ACCESS_KEY"] assert cfg["foo"]["bar"] == self.MOCK_ENV["KARTON_FOO_BAR"] assert cfg["foo"]["baz"] == self.MOCK_ENV["KARTON_FOO_BAZ"]