
Commit

Merge branch 'main' into generic-tests-config-tests
colin-rogers-dbt committed Sep 24, 2024
2 parents dcd5e61 + 583ec5e commit b5ac991
Showing 24 changed files with 746 additions and 607 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240911-001806.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Add support for Iceberg table materializations.
time: 2024-09-11T00:18:06.780586-07:00
custom:
Author: versusfacit
Issue: "321"
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240913-215416.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Microbatch incremental strategy
time: 2024-09-13T21:54:16.492885-04:00
custom:
Author: michelleark
Issue: "1182"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240917-181147.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Change behavior flag semantics to log Iceberg flag warnings.
time: 2024-09-17T18:11:47.525026-07:00
custom:
Author: versusfacit
Issue: "321"
55 changes: 42 additions & 13 deletions dbt/adapters/snowflake/impl.py
@@ -9,6 +9,7 @@
LIST_SCHEMAS_MACRO_NAME,
LIST_RELATIONS_MACRO_NAME,
)
from dbt_common.behavior_flags import BehaviorFlag
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.contracts.metadata import (
TableMetadata,
@@ -20,7 +21,10 @@
from dbt_common.exceptions import CompilationError, DbtDatabaseError, DbtRuntimeError
from dbt_common.utils import filter_null_values

from dbt.adapters.snowflake.relation_configs import SnowflakeRelationType
from dbt.adapters.snowflake.relation_configs import (
SnowflakeRelationType,
TableFormat,
)
from dbt.adapters.snowflake import SnowflakeColumn
from dbt.adapters.snowflake import SnowflakeConnectionManager
from dbt.adapters.snowflake import SnowflakeRelation
@@ -44,6 +48,11 @@ class SnowflakeConfig(AdapterConfig):
merge_update_columns: Optional[str] = None
target_lag: Optional[str] = None

# extended formats
table_format: Optional[str] = None
external_volume: Optional[str] = None
base_location_subpath: Optional[str] = None


class SnowflakeAdapter(SQLAdapter):
Relation = SnowflakeRelation
@@ -69,6 +78,21 @@ class SnowflakeAdapter(SQLAdapter):
}
)

@property
def _behavior_flags(self) -> List[BehaviorFlag]:
return [
{
"name": "enable_iceberg_materializations",
"default": False,
"description": (
"Enabling Iceberg materializations introduces latency to metadata queries, "
"specifically within the list_relations_without_caching macro. Since Iceberg "
"benefits only those actively using it, we've made this behavior opt-in to "
"prevent unnecessary latency for other users."
),
}
]
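
For orientation, here is a minimal, self-contained sketch of the opt-in semantics described above. The _FlagSketch class is a stand-in for dbt_common's rendered behavior flag, not the real class; it assumes that truth-testing a disabled flag emits the warning while .no_warn reads the setting silently, which is why the metadata hot path further down uses .no_warn.

class _FlagSketch:
    """Illustrative stand-in for a rendered behavior flag (not the dbt_common class)."""

    def __init__(self, setting: bool):
        self._setting = setting

    def __bool__(self) -> bool:
        # Assumed semantics: evaluating the flag directly warns when it is off.
        if not self._setting:
            print("warning: Iceberg materializations are disabled (legacy behavior)")
        return self._setting

    @property
    def no_warn(self) -> bool:
        # Silent check, suitable for hot paths like list_relations_without_caching.
        return self._setting


enable_iceberg_materializations = _FlagSketch(setting=False)
if enable_iceberg_materializations.no_warn:
    print("include is_iceberg in metadata queries")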

@classmethod
def date_function(cls):
return "CURRENT_TIMESTAMP()"
@@ -223,8 +247,9 @@ def list_relations_without_caching(
self, schema_relation: SnowflakeRelation
) -> List[SnowflakeRelation]:
kwargs = {"schema_relation": schema_relation}

try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
schema_objects = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
except DbtDatabaseError as exc:
# if the schema doesn't exist, we just want to return.
# Alternatively, we could query the list of schemas before we start
@@ -233,20 +258,20 @@ def list_relations_without_caching(
return []
raise

# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
columns = ["database_name", "schema_name", "name", "kind"]
if "is_dynamic" in results.column_names:
columns.append("is_dynamic")
# this can be collapsed once Snowflake adds is_iceberg to show objects
columns = ["database_name", "schema_name", "name", "kind", "is_dynamic"]
if self.behavior.enable_iceberg_materializations.no_warn:
columns.append("is_iceberg")

return [self._parse_list_relations_result(result) for result in results.select(columns)]
return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)]

def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
try:
# this can be collapsed once Snowflake adds is_iceberg to show objects
if self.behavior.enable_iceberg_materializations.no_warn:
database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
else:
database, schema, identifier, relation_type, is_dynamic = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"
is_iceberg = "N"

try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
@@ -256,12 +281,16 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation
if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

table_format = TableFormat.ICEBERG if is_iceberg in ("Y", "YES") else TableFormat.DEFAULT

quote_policy = {"database": True, "schema": True, "identifier": True}

return self.Relation.create(
database=database,
schema=schema,
identifier=identifier,
type=relation_type,
table_format=table_format,
quote_policy=quote_policy,
)

@@ -385,7 +414,7 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str):
return response

def valid_incremental_strategies(self):
return ["append", "merge", "delete+insert"]
return ["append", "merge", "delete+insert", "microbatch"]

def debug_query(self):
"""Override for DebugTask method"""
132 changes: 131 additions & 1 deletion dbt/adapters/snowflake/relation.py
@@ -1,22 +1,28 @@
import textwrap

from dataclasses import dataclass, field
from typing import FrozenSet, Optional, Type
from typing import FrozenSet, Optional, Type, Iterator, Tuple


from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.contracts.relation import ComponentName, RelationConfig
from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.events.functions import fire_event, warn_or_error

from dbt.adapters.snowflake.relation_configs import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
TableFormat,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
@@ -25,6 +31,7 @@
@dataclass(frozen=True, eq=False, repr=False)
class SnowflakeRelation(BaseRelation):
type: Optional[SnowflakeRelationType] = None
table_format: str = TableFormat.DEFAULT
quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy())
require_alias: bool = False
relation_configs = {
@@ -53,6 +60,10 @@ class SnowflakeRelation(BaseRelation):
def is_dynamic_table(self) -> bool:
return self.type == SnowflakeRelationType.DynamicTable

@property
def is_iceberg_format(self) -> bool:
return self.table_format == TableFormat.ICEBERG

@classproperty
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)
@@ -120,3 +131,122 @@ def as_case_sensitive(self) -> "SnowflakeRelation":
path_part_map[path] = part.upper()

return self.replace_path(**path_part_map)

def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str:
"""
This method renders the appropriate DDL prefix during the create_table_as
macro. It decides based on mutually exclusive table configuration options:
- TEMPORARY: Indicates a table that exists only for the duration of the session.
- ICEBERG: A specific storage format that requires a distinct DDL layout.
- TRANSIENT: A table similar to a permanent table but without fail-safe.
Additional Caveats for Iceberg models:
- transient=true throws a warning because Iceberg does not support transient tables
- A temporary relation is never an Iceberg relation because Iceberg does not
support temporary relations.
"""

transient_explicitly_set_true: bool = config.get("transient", False)

# Temporary tables are a Snowflake feature that do not exist in the
# Iceberg framework. We ignore the Iceberg status of the model.
if temporary:
return "temporary"
elif self.is_iceberg_format:
# Log a warning that transient=true on an Iceberg relation is ignored.
if transient_explicitly_set_true:
warn_or_error(
AdapterEventWarning(
base_msg=(
"Iceberg format relations cannot be transient. Please "
"remove either the transient or iceberg config options "
f"from {self.path.database}.{self.path.schema}."
f"{self.path.identifier}. If left unmodified, dbt will "
"ignore 'transient'."
)
)
)

return "iceberg"

# Always supply transient on table create DDL unless the user explicitly sets
# transient to false.
elif transient_explicitly_set_true or config.get("transient", True):
return "transient"
else:
return ""

def get_ddl_prefix_for_alter(self) -> str:
"""All ALTER statements on Iceberg tables require an ICEBERG prefix"""
if self.is_iceberg_format:
return "iceberg"
else:
return ""

def get_iceberg_ddl_options(self, config: RelationConfig) -> str:
base_location: str = f"_dbt/{self.schema}/{self.name}"

if subpath := config.get("base_location_subpath"):
base_location += f"/{subpath}"

iceberg_ddl_predicates: str = f"""
external_volume = '{config.get('external_volume')}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)
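
For reference, a self-contained sketch of the rendering above; the external volume name and subpath used here are made-up placeholders, not defaults:

import textwrap

def iceberg_ddl_options_sketch(schema: str, name: str, external_volume: str, subpath: str = "") -> str:
    # Same shape as get_iceberg_ddl_options: a fixed "_dbt/<schema>/<name>" base location
    # plus an optional subpath, rendered as indented DDL predicates.
    base_location = f"_dbt/{schema}/{name}"
    if subpath:
        base_location += f"/{subpath}"
    predicates = f"""
        external_volume = '{external_volume}'
        catalog = 'snowflake'
        base_location = '{base_location}'
    """
    return textwrap.indent(textwrap.dedent(predicates), " " * 10)

# Renders predicates with base_location '_dbt/analytics/orders/daily'.
print(iceberg_ddl_options_sketch("analytics", "orders", "my_external_volume", "daily"))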

def __drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]:
drop_view_message: str = (
f"Dropping relation {old_relation} because it is a view and target relation {self} "
f"is of type {self.type}."
)

drop_table_for_iceberg_message: str = (
f"Dropping relation {old_relation} because it is a default format table "
f"and target relation {self} is an Iceberg format table."
)

drop_iceberg_for_table_message: str = (
f"Dropping relation {old_relation} because it is an Iceberg format table "
f"and target relation {self} is a default format table."
)

# An existing view must be dropped for the model to build into a table.
yield (not old_relation.is_table, drop_view_message)
# An existing table must be dropped for the model to build into an Iceberg table.
yield (
old_relation.is_table
and not old_relation.is_iceberg_format
and self.is_iceberg_format,
drop_table_for_iceberg_message,
)
# An existing Iceberg table must be dropped for the model to build into a default format table.
yield (
old_relation.is_table
and old_relation.is_iceberg_format
and not self.is_iceberg_format,
drop_iceberg_for_table_message,
)

def needs_to_drop(self, old_relation: Optional["SnowflakeRelation"]) -> bool:
"""
To convert between Iceberg and non-Iceberg relations, a preemptive drop is
required.
Drops cause latency, but they should be a relatively infrequent occurrence.
Some Boolean expressions below are logically redundant, but they are written this
way for easier readability.
"""

if old_relation is None:
return False

for condition, message in self.__drop_conditions(old_relation):
if condition:
fire_event(AdapterEventDebug(base_msg=message))
return True

return False
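
A compact, self-contained sketch of the drop decision (using a stand-in dataclass rather than the real SnowflakeRelation): a preemptive drop is needed when the existing relation is a view, or when the table format changes in either direction.

from dataclasses import dataclass
from typing import Optional


@dataclass
class RelSketch:
    is_table: bool
    is_iceberg_format: bool


def needs_to_drop_sketch(old: Optional[RelSketch], new: RelSketch) -> bool:
    if old is None:
        return False
    if not old.is_table:
        return True  # an existing view must be dropped to build a table
    if old.is_iceberg_format != new.is_iceberg_format:
        return True  # default <-> Iceberg conversion requires a drop
    return False


assert needs_to_drop_sketch(RelSketch(True, False), RelSketch(True, True))      # default -> Iceberg
assert not needs_to_drop_sketch(RelSketch(True, True), RelSketch(True, True))   # no format change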
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/relation_configs/__init__.py
@@ -10,3 +10,4 @@
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
14 changes: 14 additions & 0 deletions dbt/adapters/snowflake/relation_configs/formats.py
@@ -0,0 +1,14 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11


class TableFormat(StrEnum):
"""
Snowflake's documentation refers to this as an 'Object Format.'
Data practitioners and interfaces refer to it as a 'Table Format', hence the term's use here.
"""

DEFAULT = "default"
ICEBERG = "iceberg"

def __str__(self):
return self.value
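
A brief usage sketch (assuming the package path added to __init__.py above is importable): because TableFormat is a string-backed enum, members round-trip from raw strings and compare equal to them.

from dbt.adapters.snowflake.relation_configs.formats import TableFormat

assert TableFormat("iceberg") is TableFormat.ICEBERG
assert TableFormat.ICEBERG == "iceberg"
assert str(TableFormat.DEFAULT) == "default"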
27 changes: 18 additions & 9 deletions dbt/include/snowflake/macros/adapters.sql
@@ -137,15 +137,24 @@
{% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %}

{%- set max_total_results = max_results_per_iter * max_iter -%}
{% if schema_relation is string %}
{%- set sql -%}
show objects in {{ schema_relation }} limit {{ max_results_per_iter }}
{%- endset -%}
{% else %}
{%- set sql -%}
show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }}
{%- endset -%}
{% endif -%}
{%- set sql -%}
{% if schema_relation is string %}
show objects in {{ schema_relation }} limit {{ max_results_per_iter }};
{% else %}
show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }};
{% endif -%}

{# -- Gated for performance reasons. If you don't want Iceberg, you shouldn't pay the
-- latency penalty. #}
{% if adapter.behavior.enable_iceberg_materializations.no_warn %}
select all_objects.*, is_iceberg as "is_iceberg"
from table(result_scan(last_query_id(-1))) all_objects
left join INFORMATION_SCHEMA.tables as all_tables
on all_tables.table_name = all_objects."name"
and all_tables.table_schema = all_objects."schema_name"
and all_tables.table_catalog = all_objects."database_name"
{% endif -%}
{%- endset -%}

{%- set result = run_query(sql) -%}
