Skip to content

Commit

Permalink
S3 support based on Boto3 instead of minio-py (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
mak authored Jul 22, 2022
1 parent 6df87ea commit 3b927e3
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 101 deletions.
84 changes: 46 additions & 38 deletions karton/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
import time
import warnings
from collections import defaultdict, namedtuple
from io import BytesIO
from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Set, Tuple, Union
from typing import (
Any,
BinaryIO,
Dict,
Iterable,
Iterator,
List,
Optional,
Set,
Tuple,
Union,
)

from minio import Minio
from minio.deleteobjects import DeleteError, DeleteObject
import boto3
from redis import AuthenticationError, StrictRedis
from redis.client import Pipeline
from urllib3.response import HTTPResponse
Expand Down Expand Up @@ -45,11 +54,11 @@ def __init__(self, config, identity: Optional[str] = None) -> None:
self.config = config
self.identity = identity
self.redis = self.make_redis(config, identity=identity)
self.minio = Minio(
endpoint=config["minio"]["address"],
access_key=config["minio"]["access_key"],
secret_key=config["minio"]["secret_key"],
secure=config.getboolean("minio", "secure", fallback=True),
self.s3 = boto3.client(
"s3",
endpoint_url=config["s3"]["address"],
aws_access_key_id=config["s3"]["access_key"],
aws_secret_access_key=config["s3"]["secret_key"],
)

def make_redis(self, config, identity: Optional[str] = None) -> StrictRedis:
Expand Down Expand Up @@ -85,9 +94,9 @@ def make_redis(self, config, identity: Optional[str] = None) -> StrictRedis:

@property
def default_bucket_name(self) -> str:
    """
    Name of the default bucket, taken from the ``bucket`` option of the
    ``[s3]`` configuration section.

    :return: Configured bucket name
    :raises RuntimeError: If the option is missing or empty
    """
    bucket_name = self.config.get("s3", "bucket")
    if not bucket_name:
        raise RuntimeError("S3 default bucket is not defined in configuration")
    return bucket_name

@staticmethod
Expand Down Expand Up @@ -335,7 +344,7 @@ def delete_task(self, task: Task) -> None:
"""
self.redis.delete(f"{KARTON_TASK_NAMESPACE}:{task.uid}")

def delete_tasks(self, tasks: List[Task], chunk_size: int = 1000) -> None:
def delete_tasks(self, tasks: Iterable[Task], chunk_size: int = 1000) -> None:
"""
Remove multiple tasks from Redis
Expand Down Expand Up @@ -563,20 +572,15 @@ def upload_object(
bucket: str,
object_uid: str,
content: Union[bytes, BinaryIO],
length: int = None,
) -> None:
"""
Upload resource object to underlying object storage (Minio)
Upload resource object to underlying object storage (S3)
:param bucket: Bucket name
:param object_uid: Object identifier
:param content: Object content as bytes or file-like stream
:param length: Object content length (if file-like object provided)
"""
if isinstance(content, bytes):
length = len(content)
content = BytesIO(content)
self.minio.put_object(bucket, object_uid, content, length)
self.s3.put_object(Bucket=bucket, Key=object_uid, Body=content)

def upload_object_from_file(self, bucket: str, object_uid: str, path: str) -> None:
    """
    Upload the content of a local file to object storage (S3)

    :param bucket: Bucket name
    :param object_uid: Object identifier
    :param path: Path to the object content
    """
    # The file is opened in binary mode and handed to the client as a stream;
    # the ``with`` block closes it once the upload call returns or fails.
    with open(path, "rb") as stream:
        self.s3.put_object(Bucket=bucket, Key=object_uid, Body=stream)

def get_object(self, bucket: str, object_uid: str) -> HTTPResponse:
    """
    Get a readable stream with the content of a stored resource object

    :param bucket: Bucket name
    :param object_uid: Object identifier
    :return: Response object with content
    """
    # NOTE(review): the value returned is whatever the client stores under
    # "Body" (boto3 documents it as a StreamingBody); the HTTPResponse
    # annotation is kept unchanged for interface compatibility - confirm.
    response = self.s3.get_object(Bucket=bucket, Key=object_uid)
    return response["Body"]

def download_object(self, bucket: str, object_uid: str) -> bytes:
    """
    Download the whole content of a stored resource object into memory

    :param bucket: Bucket name
    :param object_uid: Object identifier
    :return: Content bytes
    """
    body = self.s3.get_object(Bucket=bucket, Key=object_uid)["Body"]
    # The body is used as a context manager so it is exited as soon as the
    # content has been read, also when read() raises.
    with body as stream:
        return stream.read()

def download_object_to_file(self, bucket: str, object_uid: str, path: str) -> None:
    """
    Download a stored resource object straight into a local file

    :param bucket: Bucket name
    :param object_uid: Object identifier
    :param path: Target file path
    """
    self.s3.download_file(Bucket=bucket, Key=object_uid, Filename=path)

def list_objects(self, bucket: str) -> List[str]:
    """
    List identifiers of the objects stored in a bucket

    The listing is fetched page by page through the client's paginator, so
    the result is not cut off at the size of a single listing response.

    :param bucket: Bucket name
    :return: List of object identifiers
    """
    paginator = self.s3.get_paginator("list_objects")
    return [
        entry["Key"]
        for page in paginator.paginate(Bucket=bucket)
        # A page of an empty bucket carries no "Contents" key at all
        for entry in page.get("Contents", [])
    ]

def remove_object(self, bucket: str, object_uid: str) -> None:
    """
    Remove a single resource object from object storage

    :param bucket: Bucket name
    :param object_uid: Object identifier
    """
    self.s3.delete_object(Bucket=bucket, Key=object_uid)

def remove_objects(self, bucket: str, object_uids: Iterable[str]) -> None:
    """
    Bulk remove resource objects from object storage

    Keys are sent in batches, because a single S3 ``DeleteObjects`` request
    accepts at most 1000 keys. An empty iterable results in no request.

    :param bucket: Bucket name
    :param object_uids: Object identifiers
    """
    # Upper bound of keys accepted by one DeleteObjects request
    max_keys_per_request = 1000
    # Materialized up front so that one-shot iterables can be sliced
    delete_objects = [{"Key": uid} for uid in object_uids]
    for start in range(0, len(delete_objects), max_keys_per_request):
        batch = delete_objects[start : start + max_keys_per_request]
        self.s3.delete_objects(Bucket=bucket, Delete={"Objects": batch})

def check_bucket_exists(self, bucket: str, create: bool = False) -> bool:
    """
    Check whether a bucket exists and optionally create it when missing

    :param bucket: Bucket name
    :param create: Create bucket if doesn't exist
    :return: True if the bucket already existed; False if it was missing,
        also when it has just been created because ``create`` was set
    :raises ClientError: When the existence check fails with an error code
        other than "404"
    """
    try:
        self.s3.head_bucket(Bucket=bucket)
    except self.s3.exceptions.ClientError as e:
        # Only a "404" error code means "no such bucket"; anything else is
        # propagated unchanged to the caller.
        if e.response["Error"]["Code"] != "404":
            raise
        if create:
            self.s3.create_bucket(Bucket=bucket)
        return False
    return True

def log_identity_output(self, identity: str, headers: Dict[str, Any]) -> None:
Expand Down
30 changes: 27 additions & 3 deletions karton/core/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import configparser
import os
import re
import warnings
from typing import Any, Dict, List, Optional, cast, overload


Expand All @@ -22,7 +23,7 @@ class Config(object):
Environment variables have higher precedence than those loaded from files.
:param path: Path to additional configuration file
:param check_sections: Check if sections ``redis`` and ``minio`` are defined
:param check_sections: Check if sections ``redis`` and ``s3`` are defined
in the configuration
"""

Expand All @@ -46,11 +47,34 @@ def __init__(
self._load_from_env()

if check_sections:
if not self.has_section("minio"):
raise RuntimeError("Missing MinIO configuration")
if self.has_section("minio") and not self.has_section("s3"):
self._map_minio_to_s3()
if not self.has_section("s3"):
raise RuntimeError("Missing S3 configuration")
if not self.has_section("redis"):
raise RuntimeError("Missing Redis configuration")

def _map_minio_to_s3(self):
    """
    Configuration backwards compatibility. Before 5.x.x [minio] section was used.

    Emits a warning and copies the ``[minio]`` section into ``[s3]``. When the
    copied ``address`` carries no ``http://`` or ``https://`` prefix, one is
    prepended according to the ``secure`` option of ``[minio]`` (``True`` is
    passed to ``getboolean`` as its third argument).
    """
    warnings.warn(
        "[minio] section in configuration is deprecated, replace it with [s3]"
    )
    self._config["s3"] = dict(self._config["minio"])

    address = self._config["s3"]["address"]
    if address.startswith(("http://", "https://")):
        # Already a complete endpoint URL, nothing to prepend
        return

    secure = self.getboolean("minio", "secure", True)
    scheme = "https://" if secure else "http://"
    self._config["s3"]["address"] = scheme + address

def set(self, section_name: str, option_name: str, value: Any) -> None:
"""
Sets value in configuration
Expand Down
70 changes: 34 additions & 36 deletions karton/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from configparser import ConfigParser
from typing import Any, Dict, List

from minio import Minio
import boto3
from redis import StrictRedis

from .__version__ import __version__
Expand Down Expand Up @@ -35,41 +35,40 @@ def get_user_option(prompt: str, default: str) -> str:
def configuration_wizard(config_filename: str) -> None:
config = ConfigParser()

log.info("Configuring MinIO")
minio_access_key = "minioadmin"
minio_secret_key = "minioadmin"
minio_address = "localhost:9000"
minio_bucket = "karton"
minio_secure = "0"
log.info("Configuring s3")
s3_access_key = "minioadmin"
s3_secret_key = "minioadmin"
s3_address = "http://localhost:9000"
s3_bucket = "karton"
while True:
minio_access_key = get_user_option(
"Enter the MinIO access key", default=minio_access_key
s3_access_key = get_user_option(
"Enter the S3 access key", default=s3_access_key
)
minio_secret_key = get_user_option(
"Enter the MinIO secret key", default=minio_secret_key
s3_secret_key = get_user_option(
"Enter the S3 secret key", default=s3_secret_key
)
minio_address = get_user_option(
"Enter the MinIO address", default=minio_address
)
minio_bucket = get_user_option(
"Enter the MinIO bucket to use", default=minio_bucket
)
minio_secure = get_user_option('Use SSL ("0", "1")?', default=minio_secure)

log.info("Testing MinIO connection...")
minio = Minio(
endpoint=minio_address,
access_key=minio_access_key,
secret_key=minio_secret_key,
secure=bool(int(minio_secure)),
s3_address = get_user_option("Enter the S3 address", default=s3_address)
s3_bucket = get_user_option("Enter the S3 bucket to use", default=s3_bucket)

log.info("Testing S3 connection...")
s3_client = boto3.client(
"s3",
endpoint_url=s3_address,
aws_access_key_id=s3_access_key,
aws_secret_access_key=s3_secret_key,
)
bucket_exists = False
try:
bucket_exists = minio.bucket_exists(minio_bucket)
bucket_exists = bool(s3_client.head_bucket(Bucket=s3_bucket))

except s3_client.exceptions.ClientError as e:
if e.response["Error"]["Code"] != "404":
raise e

except Exception as e:
log.info("Error while connecting to MinIO: %s", e, exc_info=True)
log.info("Error while connecting to S3: %s", e, exc_info=True)
retry = get_user_option(
'Do you want to try with different MinIO settings ("yes", "no")?',
'Do you want to try with different S3 settings ("yes", "no")?',
default="yes",
)
if retry != "yes":
Expand All @@ -78,23 +77,22 @@ def configuration_wizard(config_filename: str) -> None:
else:
continue

log.info("Connected to MinIO successfully")
log.info("Connected to S3 successfully")
if not bucket_exists:
log.info(
(
"The required bucket %s does not exist. To create it automatically,"
" start karton-system with --setup-bucket flag"
),
minio_bucket,
s3_bucket,
)
break

config["minio"] = {
"access_key": minio_access_key,
"secret_key": minio_secret_key,
"address": minio_address,
"bucket": minio_bucket,
"secure": minio_secure,
config["s3"] = {
"access_key": s3_access_key,
"secret_key": s3_secret_key,
"address": s3_address,
"bucket": s3_bucket,
}

log.info("Configuring Redis")
Expand Down
Loading

0 comments on commit 3b927e3

Please sign in to comment.