Use 'strtobool' instead of comparing with a string. (#988)
* Use 'strtobool' instead of comparing with a string.

* Move the PropertyUtil methods to the properties module as functions

* fixup! Use 'strtobool' instead of comparing with a string.

* fixup! Use 'strtobool' instead of comparing with a string.
ndrluis committed Aug 2, 2024
1 parent dafcf22 commit 6c0d307
Showing 12 changed files with 242 additions and 98 deletions.
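The helpers that replace `PropertyUtil` now live in `pyiceberg/utils/properties.py`. The sketch below is reconstructed from the call sites in this diff, not copied from the PR: the function names match the new imports (`get_first_property_value`, `property_as_bool`, `property_as_int`, `property_as_float`), but the bodies and exact signatures are assumptions, and `property_as_bool` is assumed to delegate to the `strtobool` imported from `pyiceberg.types` elsewhere in this commit.

```python
# Approximate shape of pyiceberg/utils/properties.py, inferred from the call
# sites in this commit -- a sketch, not the exact code merged in #988.
from typing import Any, Optional

from pyiceberg.typedef import Properties
from pyiceberg.types import strtobool  # added alongside this change


def get_first_property_value(properties: Properties, *property_names: str) -> Optional[Any]:
    # Return the value of the first property name that is present; later names
    # act as deprecated fallbacks.
    for name in property_names:
        if (value := properties.get(name)) is not None:
            return value
    return None


def property_as_int(properties: Properties, property_name: str, default: Optional[int] = None) -> Optional[int]:
    value = properties.get(property_name)
    return int(value) if value is not None else default


def property_as_float(properties: Properties, property_name: str, default: Optional[float] = None) -> Optional[float]:
    value = properties.get(property_name)
    return float(value) if value is not None else default


def property_as_bool(properties: Properties, property_name: str, default: bool) -> bool:
    # The point of the commit: parse booleans with strtobool instead of
    # comparing the raw string against "true".
    value = properties.get(property_name)
    return strtobool(value) if value is not None else default
```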
13 changes: 6 additions & 7 deletions pyiceberg/catalog/dynamodb.py
@@ -61,6 +61,7 @@
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+from pyiceberg.utils.properties import get_first_property_value

if TYPE_CHECKING:
    import pyarrow as pa
@@ -95,19 +96,17 @@ class DynamoDbCatalog(MetastoreCatalog):
    def __init__(self, name: str, **properties: str):
        super().__init__(name, **properties)

-        from pyiceberg.table import PropertyUtil
-
        session = boto3.Session(
-            profile_name=PropertyUtil.get_first_property_value(properties, DYNAMODB_PROFILE_NAME, DEPRECATED_PROFILE_NAME),
-            region_name=PropertyUtil.get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION, DEPRECATED_REGION),
+            profile_name=get_first_property_value(properties, DYNAMODB_PROFILE_NAME, DEPRECATED_PROFILE_NAME),
+            region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION, DEPRECATED_REGION),
            botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
-            aws_access_key_id=PropertyUtil.get_first_property_value(
+            aws_access_key_id=get_first_property_value(
                properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, DEPRECATED_ACCESS_KEY_ID
            ),
-            aws_secret_access_key=PropertyUtil.get_first_property_value(
+            aws_secret_access_key=get_first_property_value(
                properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, DEPRECATED_SECRET_ACCESS_KEY
            ),
-            aws_session_token=PropertyUtil.get_first_property_value(
+            aws_session_token=get_first_property_value(
                properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN, DEPRECATED_SESSION_TOKEN
            ),
        )
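For the catalogs the move is mechanical: the same precedence over current and deprecated property keys applies, just without the `PropertyUtil` indirection. A small illustration of the lookup order; the key strings below are assumptions, not copied from `pyiceberg.catalog.dynamodb`.

```python
# Hypothetical property keys; the first name that is present wins, so the
# catalog-scoped key shadows the deprecated fallback.
from pyiceberg.utils.properties import get_first_property_value

properties = {
    "dynamodb.profile-name": "analytics",  # preferred, catalog-scoped key (assumed name)
    "profile-name": "legacy-default",      # deprecated key, used only as a fallback (assumed name)
}

assert get_first_property_value(properties, "dynamodb.profile-name", "profile-name") == "analytics"
assert get_first_property_value({}, "dynamodb.profile-name", "profile-name") is None
```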
16 changes: 7 additions & 9 deletions pyiceberg/catalog/glue.py
@@ -71,7 +71,6 @@
from pyiceberg.table import (
    CommitTableRequest,
    CommitTableResponse,
-    PropertyUtil,
    Table,
)
from pyiceberg.table.metadata import TableMetadata
@@ -98,6 +97,7 @@
    TimeType,
    UUIDType,
)
+from pyiceberg.utils.properties import get_first_property_value, property_as_bool

if TYPE_CHECKING:
    import pyarrow as pa
@@ -298,19 +298,17 @@ class GlueCatalog(MetastoreCatalog):
    def __init__(self, name: str, **properties: Any):
        super().__init__(name, **properties)

-        from pyiceberg.table import PropertyUtil
-
        session = boto3.Session(
-            profile_name=PropertyUtil.get_first_property_value(properties, GLUE_PROFILE_NAME, DEPRECATED_PROFILE_NAME),
-            region_name=PropertyUtil.get_first_property_value(properties, GLUE_REGION, AWS_REGION, DEPRECATED_REGION),
+            profile_name=get_first_property_value(properties, GLUE_PROFILE_NAME, DEPRECATED_PROFILE_NAME),
+            region_name=get_first_property_value(properties, GLUE_REGION, AWS_REGION, DEPRECATED_REGION),
            botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
-            aws_access_key_id=PropertyUtil.get_first_property_value(
+            aws_access_key_id=get_first_property_value(
                properties, GLUE_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, DEPRECATED_ACCESS_KEY_ID
            ),
-            aws_secret_access_key=PropertyUtil.get_first_property_value(
+            aws_secret_access_key=get_first_property_value(
                properties, GLUE_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, DEPRECATED_SECRET_ACCESS_KEY
            ),
-            aws_session_token=PropertyUtil.get_first_property_value(
+            aws_session_token=get_first_property_value(
                properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN, DEPRECATED_SESSION_TOKEN
            ),
        )
@@ -368,7 +366,7 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T
            self.glue.update_table(
                DatabaseName=database_name,
                TableInput=table_input,
-                SkipArchive=PropertyUtil.property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
+                SkipArchive=property_as_bool(self.properties, GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
                VersionId=version_id,
            )
        except self.glue.exceptions.EntityNotFoundException as e:
14 changes: 5 additions & 9 deletions pyiceberg/catalog/hive.py
@@ -81,7 +81,6 @@
from pyiceberg.table import (
    CommitTableRequest,
    CommitTableResponse,
-    PropertyUtil,
    StagedTable,
    Table,
    TableProperties,
@@ -109,6 +108,7 @@
    TimeType,
    UUIDType,
)
+from pyiceberg.utils.properties import property_as_bool, property_as_float

if TYPE_CHECKING:
    import pyarrow as pa
@@ -259,13 +259,9 @@ def __init__(self, name: str, **properties: str):
        super().__init__(name, **properties)
        self._client = _HiveClient(properties["uri"], properties.get("ugi"))

-        self._lock_check_min_wait_time = PropertyUtil.property_as_float(
-            properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME
-        )
-        self._lock_check_max_wait_time = PropertyUtil.property_as_float(
-            properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME
-        )
-        self._lock_check_retries = PropertyUtil.property_as_float(
+        self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME)
+        self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME)
+        self._lock_check_retries = property_as_float(
            properties,
            LOCK_CHECK_RETRIES,
            DEFAULT_LOCK_CHECK_RETRIES,
@@ -314,7 +310,7 @@ def _convert_iceberg_into_hive(self, table: Table) -> HiveTable:
            sd=_construct_hive_storage_descriptor(
                table.schema(),
                table.location(),
-                PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
+                property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT),
            ),
            tableType=EXTERNAL_TABLE,
            parameters=_construct_parameters(table.metadata_location),
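The Hive lock-check settings go through `property_as_float`, so string-typed catalog properties are coerced to floats before the retry logic sees them, with the defaults used when a key is absent. A short, assumed example (the key name is hypothetical):

```python
# Assumed behaviour of property_as_float: string config values are parsed,
# and the default applies when the key is missing.
from pyiceberg.utils.properties import property_as_float

properties = {"lock-check-min-wait-time": "0.05"}  # hypothetical key/value

min_wait = property_as_float(properties, "lock-check-min-wait-time", 0.1)   # -> 0.05
max_wait = property_as_float(properties, "lock-check-max-wait-time", 60.0)  # -> 60.0 (default)
```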
3 changes: 2 additions & 1 deletion pyiceberg/catalog/rest.py
@@ -71,6 +71,7 @@
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
from pyiceberg.types import transform_dict_value_to_str
+from pyiceberg.utils.properties import property_as_bool

if TYPE_CHECKING:
    import pyarrow as pa
@@ -257,7 +258,7 @@ def _create_session(self) -> Session:
        self._config_headers(session)

        # Configure SigV4 Request Signing
-        if str(self.properties.get(SIGV4, False)).lower() == "true":
+        if property_as_bool(self.properties, SIGV4, False):
            self._init_sigv4(session)

        return session
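The SigV4 toggle is where behaviour actually changes in this file: before, only the literal string "true" (case-insensitively) enabled signing and everything else was silently ignored; with `property_as_bool`, other truthy spellings presumably work and unrecognised strings raise instead of quietly disabling signing. A hedged sketch — the property name and the exact set of accepted spellings are assumptions about `pyiceberg.types.strtobool`:

```python
# Hedged sketch of the new toggle behaviour; "rest.sigv4-enabled" is assumed
# to be the value of the SIGV4 constant.
from pyiceberg.utils.properties import property_as_bool

assert property_as_bool({"rest.sigv4-enabled": "true"}, "rest.sigv4-enabled", False)
assert property_as_bool({"rest.sigv4-enabled": "1"}, "rest.sigv4-enabled", False)
assert not property_as_bool({}, "rest.sigv4-enabled", False)

try:
    property_as_bool({"rest.sigv4-enabled": "enabled"}, "rest.sigv4-enabled", False)
except ValueError:
    # Unrecognised spellings now presumably fail loudly instead of silently
    # being treated as false.
    pass
```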
3 changes: 2 additions & 1 deletion pyiceberg/conversions.py
@@ -57,6 +57,7 @@
    TimestamptzType,
    TimeType,
    UUIDType,
+    strtobool,
)
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros
from pyiceberg.utils.decimal import decimal_to_bytes, unscaled_to_decimal
@@ -99,7 +100,7 @@ def partition_to_py(primitive_type: PrimitiveType, value_str: str) -> Union[int,
@partition_to_py.register(BooleanType)
@handle_none
def _(primitive_type: BooleanType, value_str: str) -> Union[int, float, str, uuid.UUID]:
-    return value_str.lower() == "true"
+    return strtobool(value_str)


@partition_to_py.register(IntegerType)
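`strtobool` itself is imported from `pyiceberg.types`; its implementation is not part of this diff, but it presumably mirrors the retired `distutils.util.strtobool`. A sketch of that assumed behaviour, and its effect on boolean partition values:

```python
# Sketch of the assumed strtobool semantics (modelled on the retired
# distutils.util.strtobool); the real function lives in pyiceberg.types.
def strtobool(value: str) -> bool:
    normalized = value.strip().lower()
    if normalized in ("y", "yes", "t", "true", "on", "1"):
        return True
    if normalized in ("n", "no", "f", "false", "off", "0"):
        return False
    raise ValueError(f"Invalid truth value: {value!r}")


# Effect on boolean partition values: the old code returned
# `value_str.lower() == "true"`, so "1" parsed as False and "maybe" was
# silently treated as False; now "1" is True and "maybe" raises.
assert strtobool("TRUE") and strtobool("1")
assert not strtobool("false")
```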
3 changes: 2 additions & 1 deletion pyiceberg/expressions/parser.py
@@ -63,6 +63,7 @@
    StringLiteral,
)
from pyiceberg.typedef import L
+from pyiceberg.types import strtobool

ParserElement.enablePackrat()
@@ -96,7 +97,7 @@ def _(result: ParseResults) -> Reference:

@boolean.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
-    if "true" == result.boolean.lower():
+    if strtobool(result.boolean):
        return AlwaysTrue()
    else:
        return AlwaysFalse()
15 changes: 7 additions & 8 deletions pyiceberg/io/fsspec.py
@@ -76,6 +76,7 @@
    OutputStream,
)
from pyiceberg.typedef import Properties
+from pyiceberg.utils.properties import get_first_property_value, property_as_bool

logger = logging.getLogger(__name__)
@@ -118,14 +119,12 @@ def _file(_: Properties) -> LocalFileSystem:
def _s3(properties: Properties) -> AbstractFileSystem:
    from s3fs import S3FileSystem

-    from pyiceberg.table import PropertyUtil
-
    client_kwargs = {
        "endpoint_url": properties.get(S3_ENDPOINT),
-        "aws_access_key_id": PropertyUtil.get_first_property_value(properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-        "aws_secret_access_key": PropertyUtil.get_first_property_value(properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-        "aws_session_token": PropertyUtil.get_first_property_value(properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-        "region_name": PropertyUtil.get_first_property_value(properties, S3_REGION, AWS_REGION),
+        "aws_access_key_id": get_first_property_value(properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+        "aws_secret_access_key": get_first_property_value(properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+        "aws_session_token": get_first_property_value(properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+        "region_name": get_first_property_value(properties, S3_REGION, AWS_REGION),
    }
    config_kwargs = {}
    register_events: Dict[str, Callable[[Properties], None]] = {}
@@ -165,11 +164,11 @@ def _gs(properties: Properties) -> AbstractFileSystem:
        token=properties.get(GCS_TOKEN),
        consistency=properties.get(GCS_CONSISTENCY, "none"),
        cache_timeout=properties.get(GCS_CACHE_TIMEOUT),
-        requester_pays=properties.get(GCS_REQUESTER_PAYS, False),
+        requester_pays=property_as_bool(properties, GCS_REQUESTER_PAYS, False),
        session_kwargs=json.loads(properties.get(GCS_SESSION_KWARGS, "{}")),
        endpoint_url=properties.get(GCS_ENDPOINT),
        default_location=properties.get(GCS_DEFAULT_LOCATION),
-        version_aware=properties.get(GCS_VERSION_AWARE, "false").lower() == "true",
+        version_aware=property_as_bool(properties, GCS_VERSION_AWARE, False),
    )
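In the GCS path the `requester_pays` change is more than cosmetic: the old code handed the raw property string straight to `GCSFileSystem`, and any non-empty string (including "false") is truthy in Python, so depending on how gcsfs consumes the value it could enable requester-pays by accident. Routing it through `property_as_bool` yields an actual boolean. A sketch with hypothetical property names:

```python
# Hypothetical key names for GCS_REQUESTER_PAYS / GCS_VERSION_AWARE; the point
# is that property_as_bool returns a real bool instead of a truthy string.
from pyiceberg.utils.properties import property_as_bool

properties = {"gcs.requester-pays": "false", "gcs.version-aware": "true"}

assert bool("false")  # the old pitfall: the string "false" is truthy
assert not property_as_bool(properties, "gcs.requester-pays", False)
assert property_as_bool(properties, "gcs.version-aware", False)
```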
29 changes: 14 additions & 15 deletions pyiceberg/io/pyarrow.py
@@ -158,6 +158,7 @@
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import millis_to_datetime
from pyiceberg.utils.deprecated import deprecated
+from pyiceberg.utils.properties import get_first_property_value, property_as_int
from pyiceberg.utils.singleton import Singleton
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
@@ -345,14 +346,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
        if scheme in {"s3", "s3a", "s3n"}:
            from pyarrow.fs import S3FileSystem

-            from pyiceberg.table import PropertyUtil
-
            client_kwargs: Dict[str, Any] = {
                "endpoint_override": self.properties.get(S3_ENDPOINT),
-                "access_key": PropertyUtil.get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-                "secret_key": PropertyUtil.get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-                "session_token": PropertyUtil.get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-                "region": PropertyUtil.get_first_property_value(self.properties, S3_REGION, AWS_REGION),
+                "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+                "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+                "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+                "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
            }

            if proxy_uri := self.properties.get(S3_PROXY_URI):
@@ -2132,10 +2131,10 @@ def data_file_statistics_from_parquet_metadata(


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, PropertyUtil, TableProperties
+    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties

    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
-    row_group_size = PropertyUtil.property_as_int(
+    row_group_size = property_as_int(
        properties=table_metadata.properties,
        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
@@ -2278,7 +2277,7 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_


def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
-    from pyiceberg.table import PropertyUtil, TableProperties
+    from pyiceberg.table import TableProperties

    for key_pattern in [
        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
@@ -2290,7 +2289,7 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
        raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")

    compression_codec = table_properties.get(TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT)
-    compression_level = PropertyUtil.property_as_int(
+    compression_level = property_as_int(
        properties=table_properties,
        property_name=TableProperties.PARQUET_COMPRESSION_LEVEL,
        default=TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
@@ -2301,17 +2300,17 @@
    return {
        "compression": compression_codec,
        "compression_level": compression_level,
-        "data_page_size": PropertyUtil.property_as_int(
+        "data_page_size": property_as_int(
            properties=table_properties,
            property_name=TableProperties.PARQUET_PAGE_SIZE_BYTES,
            default=TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT,
        ),
-        "dictionary_pagesize_limit": PropertyUtil.property_as_int(
+        "dictionary_pagesize_limit": property_as_int(
            properties=table_properties,
            property_name=TableProperties.PARQUET_DICT_SIZE_BYTES,
            default=TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT,
        ),
-        "write_batch_size": PropertyUtil.property_as_int(
+        "write_batch_size": property_as_int(
            properties=table_properties,
            property_name=TableProperties.PARQUET_PAGE_ROW_LIMIT,
            default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT,
@@ -2331,11 +2330,11 @@ def _dataframe_to_data_files(
    Returns:
        An iterable that supplies datafiles that represent the table.
    """
-    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, PropertyUtil, TableProperties, WriteTask
+    from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties, WriteTask

    counter = counter or itertools.count(0)
    write_uuid = write_uuid or uuid.uuid4()
-    target_file_size: int = PropertyUtil.property_as_int(  # type: ignore # The property is set with non-None value.
+    target_file_size: int = property_as_int(  # type: ignore # The property is set with non-None value.
        properties=table_metadata.properties,
        property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
        default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
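The Parquet writer options are integer-valued table properties resolved through `property_as_int`, which returns `None` only when both the property and the default are missing — hence the `# type: ignore` above, where a non-None default is always supplied. A short, assumed usage example; the property name matches the Iceberg table property, while the 512 MB default shown here is an assumption of this sketch:

```python
# Assumed behaviour of property_as_int with the write.target-file-size-bytes
# table property: string values are parsed as ints, defaults fill the gaps.
from pyiceberg.utils.properties import property_as_int

table_properties = {"write.target-file-size-bytes": "134217728"}  # 128 MB

target = property_as_int(
    properties=table_properties,
    property_name="write.target-file-size-bytes",
    default=536870912,  # assumed 512 MB default
)
assert target == 134217728
```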