From 49623d7309bb64600d2ddb3f545ed6a9d8a0ddaf Mon Sep 17 00:00:00 2001 From: Mila Page <67295367+VersusFacit@users.noreply.github.com> Date: Thu, 12 Sep 2024 13:26:43 -0700 Subject: [PATCH 1/6] Add support for Iceberg Table Materialization (#1170) * Add materializations of table and dynamic table. * Add the method to tell something is iceberg format and pipe that through to relation object * Finish create macro and fix alters. * Finish todo items and begin cleaning code. * revert dynamic table changes. * Fix the drop by fixing snowflake__show_iceberg_relations * Transient needs sophisticated handling based on what user specifies for transient manually. * Try to figure out what the right None semantics are. * Revert to original statement. * Fix the transient behavior by passing table_type again. * Rename object_format config param to table_format * Migrate Jinja macros to Python. * All classes are frozen * Clean up the metadata queries that power is_iceberg column generation * Fix Python models generation argument * Add changelog. * Try to fix duplication of join record issues. * Use the RelationConfig protocol for type checking. * Fix transient semantics. * Add functional tests. * Fix test. * Fix test. * Fix test and remove strip calls * Add view test case. * Code review comments. * I'm using too new a version of mypy for Self. * Add a behavior flag for iceberg table materialization. * Flip order of flag. * Adjust test. --------- Co-authored-by: Mila Page --- .../unreleased/Features-20240911-001806.yaml | 6 + dbt/adapters/snowflake/impl.py | 41 +++++- dbt/adapters/snowflake/relation.py | 132 +++++++++++++++++- .../snowflake/relation_configs/__init__.py | 1 + .../snowflake/relation_configs/formats.py | 14 ++ dbt/include/snowflake/macros/adapters.sql | 27 ++-- .../macros/materializations/table.sql | 15 +- .../macros/relations/table/create.sql | 33 +++-- tests/functional/iceberg/test_table_basic.py | 106 ++++++++++++++ 9 files changed, 339 insertions(+), 36 deletions(-) create mode 100644 .changes/unreleased/Features-20240911-001806.yaml create mode 100644 dbt/adapters/snowflake/relation_configs/formats.py create mode 100644 tests/functional/iceberg/test_table_basic.py diff --git a/.changes/unreleased/Features-20240911-001806.yaml b/.changes/unreleased/Features-20240911-001806.yaml new file mode 100644 index 000000000..024480b96 --- /dev/null +++ b/.changes/unreleased/Features-20240911-001806.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add support for Iceberg table materializations. 
+time: 2024-09-11T00:18:06.780586-07:00 +custom: + Author: versusfacit + Issue: "321" diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 6854b199d..7e8ec9cf2 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -9,6 +9,7 @@ LIST_SCHEMAS_MACRO_NAME, LIST_RELATIONS_MACRO_NAME, ) +from dbt_common.behavior_flags import BehaviorFlag from dbt_common.contracts.constraints import ConstraintType from dbt_common.contracts.metadata import ( TableMetadata, @@ -20,7 +21,10 @@ from dbt_common.exceptions import CompilationError, DbtDatabaseError, DbtRuntimeError from dbt_common.utils import filter_null_values -from dbt.adapters.snowflake.relation_configs import SnowflakeRelationType +from dbt.adapters.snowflake.relation_configs import ( + SnowflakeRelationType, + TableFormat, +) from dbt.adapters.snowflake import SnowflakeColumn from dbt.adapters.snowflake import SnowflakeConnectionManager from dbt.adapters.snowflake import SnowflakeRelation @@ -44,6 +48,11 @@ class SnowflakeConfig(AdapterConfig): merge_update_columns: Optional[str] = None target_lag: Optional[str] = None + # extended formats + table_format: Optional[str] = None + external_volume: Optional[str] = None + base_location_subpath: Optional[str] = None + class SnowflakeAdapter(SQLAdapter): Relation = SnowflakeRelation @@ -69,6 +78,10 @@ class SnowflakeAdapter(SQLAdapter): } ) + @property + def _behavior_flags(self) -> List[BehaviorFlag]: + return [{"name": "enable_iceberg_materializations", "default": False}] + @classmethod def date_function(cls): return "CURRENT_TIMESTAMP()" @@ -223,8 +236,9 @@ def list_relations_without_caching( self, schema_relation: SnowflakeRelation ) -> List[SnowflakeRelation]: kwargs = {"schema_relation": schema_relation} + try: - results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) + schema_objects = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) except DbtDatabaseError as exc: # if the schema doesn't exist, we just want to return. 
# Alternatively, we could query the list of schemas before we start @@ -235,18 +249,26 @@ def list_relations_without_caching( # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory columns = ["database_name", "schema_name", "name", "kind"] - if "is_dynamic" in results.column_names: + if "is_dynamic" in schema_objects.column_names: columns.append("is_dynamic") + if "is_iceberg" in schema_objects.column_names: + columns.append("is_iceberg") - return [self._parse_list_relations_result(result) for result in results.select(columns)] + return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)] def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation: # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory + # this can be reduced to always including `is_iceberg` once Snowflake adds it to show objects try: - database, schema, identifier, relation_type, is_dynamic = result + if self.behavior.enable_iceberg_materializations.no_warn: + database, schema, identifier, relation_type, is_dynamic, is_iceberg = result + else: + database, schema, identifier, relation_type, is_dynamic = result except ValueError: database, schema, identifier, relation_type = result is_dynamic = "N" + if self.behavior.enable_iceberg_materializations.no_warn: + is_iceberg = "N" try: relation_type = self.Relation.get_relation_type(relation_type.lower()) @@ -256,12 +278,21 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation if relation_type == self.Relation.Table and is_dynamic == "Y": relation_type = self.Relation.DynamicTable + # This line is the main gate on supporting Iceberg materializations. Pass forward a default + # table format, and no downstream table macros can build iceberg relations. 
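+        # Concretely: a SHOW OBJECTS row with is_iceberg in ("Y", "YES") maps to
+        # TableFormat.ICEBERG only while the behavior flag is enabled; any other
+        # value, or the flag being off, yields TableFormat.DEFAULT.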
+        table_format: str = (
+            TableFormat.ICEBERG
+            if self.behavior.enable_iceberg_materializations.no_warn and is_iceberg in ("Y", "YES")
+            else TableFormat.DEFAULT
+        )
         quote_policy = {"database": True, "schema": True, "identifier": True}
+
         return self.Relation.create(
             database=database,
             schema=schema,
             identifier=identifier,
             type=relation_type,
+            table_format=table_format,
             quote_policy=quote_policy,
         )
diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py
index ace85695b..224b2b75e 100644
--- a/dbt/adapters/snowflake/relation.py
+++ b/dbt/adapters/snowflake/relation.py
@@ -1,8 +1,12 @@
+import textwrap
+
 from dataclasses import dataclass, field
-from typing import FrozenSet, Optional, Type
+from typing import FrozenSet, Optional, Type, Iterator, Tuple
+
 from dbt.adapters.base.relation import BaseRelation
 from dbt.adapters.contracts.relation import ComponentName, RelationConfig
+from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug
 from dbt.adapters.relation_configs import (
     RelationConfigBase,
     RelationConfigChangeAction,
@@ -10,6 +14,7 @@
 )
 from dbt.adapters.utils import classproperty
 from dbt_common.exceptions import DbtRuntimeError
+from dbt_common.events.functions import fire_event, warn_or_error

 from dbt.adapters.snowflake.relation_configs import (
     SnowflakeDynamicTableConfig,
@@ -17,6 +22,7 @@
     SnowflakeDynamicTableRefreshModeConfigChange,
     SnowflakeDynamicTableTargetLagConfigChange,
     SnowflakeDynamicTableWarehouseConfigChange,
+    TableFormat,
     SnowflakeQuotePolicy,
     SnowflakeRelationType,
 )
@@ -25,6 +31,7 @@
 @dataclass(frozen=True, eq=False, repr=False)
 class SnowflakeRelation(BaseRelation):
     type: Optional[SnowflakeRelationType] = None
+    table_format: str = TableFormat.DEFAULT
     quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy())
     require_alias: bool = False
     relation_configs = {
@@ -53,6 +60,10 @@ class SnowflakeRelation(BaseRelation):
     def is_dynamic_table(self) -> bool:
         return self.type == SnowflakeRelationType.DynamicTable

+    @property
+    def is_iceberg_format(self) -> bool:
+        return self.table_format == TableFormat.ICEBERG
+
     @classproperty
     def DynamicTable(cls) -> str:
         return str(SnowflakeRelationType.DynamicTable)
@@ -120,3 +131,122 @@ def as_case_sensitive(self) -> "SnowflakeRelation":
                 path_part_map[path] = part.upper()

         return self.replace_path(**path_part_map)
+
+    def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str:
+        """
+        This method renders the appropriate DDL prefix during the create_table_as
+        macro. It decides based on mutually exclusive table configuration options:
+
+        - TEMPORARY: Indicates a table that exists only for the duration of the session.
+        - ICEBERG: A specific storage format that requires a distinct DDL layout.
+        - TRANSIENT: A table similar to a permanent table but without fail-safe.
+
+        Additional caveats for Iceberg models:
+        - transient=true throws a warning because Iceberg does not support transient tables.
+        - A temporary relation is never an Iceberg relation because Iceberg does not
+          support temporary relations.
+        """
+
+        transient_explicitly_set_true: bool = config.get("transient", False)
+
+        # Temporary tables are a Snowflake feature that does not exist in the
+        # Iceberg framework. We ignore the Iceberg status of the model.
+        if temporary:
+            return "temporary"
+        elif self.is_iceberg_format:
+            # Log a warning that transient=true on an Iceberg relation is ignored.
+            if transient_explicitly_set_true:
+                warn_or_error(
+                    AdapterEventWarning(
+                        base_msg=(
+                            "Iceberg format relations cannot be transient. Please "
+                            "remove either the transient or iceberg config options "
+                            f"from {self.path.database}.{self.path.schema}."
+                            f"{self.path.identifier}. If left unmodified, dbt will "
+                            "ignore 'transient'."
+                        )
+                    )
+                )
+
+            return "iceberg"
+
+        # Always supply transient on table create DDL unless the user explicitly sets
+        # transient to false; an unset value still defaults to a transient table.
+        elif transient_explicitly_set_true or config.get("transient", True):
+            return "transient"
+        else:
+            return ""
+
+    def get_ddl_prefix_for_alter(self) -> str:
+        """All ALTER statements on Iceberg tables require an ICEBERG prefix."""
+        if self.is_iceberg_format:
+            return "iceberg"
+        else:
+            return ""
+
+    def get_iceberg_ddl_options(self, config: RelationConfig) -> str:
+        base_location: str = f"_dbt/{self.schema}/{self.name}"
+
+        if subpath := config.get("base_location_subpath"):
+            base_location += f"/{subpath}"
+
+        iceberg_ddl_predicates: str = f"""
+        external_volume = '{config.get('external_volume')}'
+        catalog = 'snowflake'
+        base_location = '{base_location}'
+        """
+        return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)
+
+    def __drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]:
+        drop_view_message: str = (
+            f"Dropping relation {old_relation} because it is a view and target relation {self} "
+            f"is of type {self.type}."
+        )
+
+        drop_table_for_iceberg_message: str = (
+            f"Dropping relation {old_relation} because it is a default format table "
+            f"and target relation {self} is an Iceberg format table."
+        )
+
+        drop_iceberg_for_table_message: str = (
+            f"Dropping relation {old_relation} because it is an Iceberg format table "
+            f"and target relation {self} is a default format table."
+        )
+
+        # An existing view must be dropped for the model to build into a table.
+        yield (not old_relation.is_table, drop_view_message)
+        # An existing default format table must be dropped for the model to build into an Iceberg table.
+        yield (
+            old_relation.is_table
+            and not old_relation.is_iceberg_format
+            and self.is_iceberg_format,
+            drop_table_for_iceberg_message,
+        )
+        # An existing Iceberg table must be dropped for the model to build into a default format table.
+        yield (
+            old_relation.is_table
+            and old_relation.is_iceberg_format
+            and not self.is_iceberg_format,
+            drop_iceberg_for_table_message,
+        )
+
+    def needs_to_drop(self, old_relation: Optional["SnowflakeRelation"]) -> bool:
+        """
+        To convert between Iceberg and non-Iceberg relations, a preemptive drop is
+        required.
+
+        Drops cause latency, but they should be a relatively infrequent occurrence.
+
+        Some Boolean expressions below are logically redundant, but they are kept for
+        readability.
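+
+        Concretely, the drop conditions above resolve as follows:
+
+            old relation            target relation (self)    drop first?
+            view                    table                     yes
+            default format table    Iceberg format table      yes
+            Iceberg format table    default format table      yes
+            table, same format      table, same format        no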
+ """ + + if old_relation is None: + return False + + for condition, message in self.__drop_conditions(old_relation): + if condition: + fire_event(AdapterEventDebug(base_msg=message)) + return True + + return False diff --git a/dbt/adapters/snowflake/relation_configs/__init__.py b/dbt/adapters/snowflake/relation_configs/__init__.py index 62f95faff..61941ab50 100644 --- a/dbt/adapters/snowflake/relation_configs/__init__.py +++ b/dbt/adapters/snowflake/relation_configs/__init__.py @@ -10,3 +10,4 @@ SnowflakeQuotePolicy, SnowflakeRelationType, ) +from dbt.adapters.snowflake.relation_configs.formats import TableFormat diff --git a/dbt/adapters/snowflake/relation_configs/formats.py b/dbt/adapters/snowflake/relation_configs/formats.py new file mode 100644 index 000000000..460241d9d --- /dev/null +++ b/dbt/adapters/snowflake/relation_configs/formats.py @@ -0,0 +1,14 @@ +from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11 + + +class TableFormat(StrEnum): + """ + Snowflake docs refers to this an 'Object Format.' + Data practitioners and interfaces refer to this as 'Table Format's, hence the term's use here. + """ + + DEFAULT = "default" + ICEBERG = "iceberg" + + def __str__(self): + return self.value diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 4cb4bcffa..aa8895819 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -137,15 +137,24 @@ {% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %} {%- set max_total_results = max_results_per_iter * max_iter -%} - {% if schema_relation is string %} - {%- set sql -%} - show objects in {{ schema_relation }} limit {{ max_results_per_iter }} - {%- endset -%} - {% else %} - {%- set sql -%} - show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }} - {%- endset -%} - {% endif -%} + {%- set sql -%} + {% if schema_relation is string %} + show objects in {{ schema_relation }} limit {{ max_results_per_iter }}; + {% else %} + show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }}; + {% endif -%} + + {# -- Gated for performance reason. If you don't want Iceberg, you shouldn't pay the + -- latency penalty. 
#} + {% if adapter.behavior.enable_iceberg_materializations.no_warn %} + select all_objects.*, is_iceberg as "is_iceberg" + from table(result_scan(last_query_id(-1))) all_objects + left join INFORMATION_SCHEMA.tables as all_tables + on all_tables.table_name = all_objects."name" + and all_tables.table_schema = all_objects."schema_name" + and all_tables.table_catalog = all_objects."database_name" + {% endif -%} + {%- endset -%} {%- set result = run_query(sql) -%} diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index ef201c705..cbc6d9ce6 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -8,16 +8,17 @@ {% set grant_config = config.get('grants') %} {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set target_relation = api.Relation.create(identifier=identifier, - schema=schema, - database=database, type='table') -%} + {%- set target_relation = api.Relation.create( + identifier=identifier, + schema=schema, + database=database, + type='table', + table_format=config.get('table_format', 'default') + ) -%} {{ run_hooks(pre_hooks) }} - {#-- Drop the relation if it was a view to "convert" it in a table. This may lead to - -- downtime, but it should be a relatively infrequent occurrence #} - {% if old_relation is not none and not old_relation.is_table %} - {{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }} + {% if target_relation.needs_to_drop(old_relation) %} {{ drop_relation_if_exists(old_relation) }} {% endif %} diff --git a/dbt/include/snowflake/macros/relations/table/create.sql b/dbt/include/snowflake/macros/relations/table/create.sql index c6bc8f775..355150e28 100644 --- a/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt/include/snowflake/macros/relations/table/create.sql @@ -1,14 +1,8 @@ {% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%} - {%- set transient = config.get('transient', default=true) -%} - - {% if temporary -%} - {%- set table_type = "temporary" -%} - {%- elif transient -%} - {%- set table_type = "transient" -%} - {%- else -%} - {%- set table_type = "" -%} - {%- endif %} + {%- set materialization_prefix = relation.get_ddl_prefix_for_create(config.model.config, temporary) -%} + {%- set alter_prefix = relation.get_ddl_prefix_for_alter() -%} + {# Generate DDL/DML #} {%- if language == 'sql' -%} {%- set cluster_by_keys = config.get('cluster_by', default=none) -%} {%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%} @@ -26,7 +20,15 @@ {{ sql_header if sql_header is not none }} - create or replace {{ table_type }} table {{ relation }} + create or replace {{ materialization_prefix }} table {{ relation }} + {%- if relation.is_iceberg_format %} + {# + Valid DDL in CTAS statements. Plain create statements have a different order. 
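+        For illustration (the external_volume value is borrowed from this PR's
+        functional tests; yours will differ), get_iceberg_ddl_options renders
+        roughly:
+            external_volume = 's3_iceberg_snow'
+            catalog = 'snowflake'
+            base_location = '_dbt/<schema>/<identifier>/<base_location_subpath>'
+        Reference: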
+ https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table + #} + {{ relation.get_iceberg_ddl_options(config.model.config) }} + {%- endif -%} + {%- set contract_config = config.get('contract') -%} {%- if contract_config.enforced -%} {{ get_assert_columns_equivalent(sql) }} @@ -44,14 +46,17 @@ {%- endif %} ); {% if cluster_by_string is not none and not temporary -%} - alter table {{relation}} cluster by ({{cluster_by_string}}); + alter {{ alter_prefix }} table {{relation}} cluster by ({{cluster_by_string}}); {%- endif -%} - {% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%} - alter table {{relation}} resume recluster; + {% if enable_automatic_clustering and cluster_by_string is not none and not temporary %} + alter {{ alter_prefix }} table {{relation}} resume recluster; {%- endif -%} {%- elif language == 'python' -%} - {{ py_write_table(compiled_code=compiled_code, target_relation=relation, table_type=table_type) }} + {%- if relation.is_iceberg_format %} + {% do exceptions.raise_compiler_error('Iceberg is incompatible with Python models. Please use a SQL model for the iceberg format.') %} + {%- endif %} + {{ py_write_table(compiled_code=compiled_code, target_relation=relation, table_type=relation.get_ddl_prefix_for_create(config.model.config, temporary)) }} {%- else -%} {% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language, it got %s" % language) %} {%- endif -%} diff --git a/tests/functional/iceberg/test_table_basic.py b/tests/functional/iceberg/test_table_basic.py new file mode 100644 index 000000000..0bfdf59f1 --- /dev/null +++ b/tests/functional/iceberg/test_table_basic.py @@ -0,0 +1,106 @@ +import pytest + +from pathlib import Path + +from dbt.tests.util import run_dbt, rm_file, write_file + +_MODEL_BASIC_TABLE_MODEL = """ +{{ + config( + materialized = "table", + cluster_by=['id'], + ) +}} +select 1 as id +""" + +_MODEL_BASIC_ICEBERG_MODEL = """ +{{ + config( + transient = "true", + materialized = "table", + cluster_by=['id'], + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", + ) +}} + +select * from {{ ref('first_table') }} +""" + +_MODEL_BUILT_ON_ICEBERG_TABLE = """ +{{ + config( + materialized = "table", + ) +}} +select * from {{ ref('iceberg_table') }} +""" + +_MODEL_TABLE_BEFORE_SWAP = """ +{{ + config( + materialized = "table", + ) +}} +select 1 as id +""" + +_MODEL_VIEW_BEFORE_SWAP = """ +select 1 as id +""" + +_MODEL_TABLE_FOR_SWAP_ICEBERG = """ +{{ + config( + materialized = "table", + table_format="iceberg", + external_volume="s3_iceberg_snow", + base_location_subpath="subpath", + ) +}} +select 1 as id +""" + + +class TestIcebergTableBuilds: + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} + + @pytest.fixture(scope="class") + def models(self): + return { + "first_table.sql": _MODEL_BASIC_TABLE_MODEL, + "iceberg_table.sql": _MODEL_BASIC_ICEBERG_MODEL, + "table_built_on_iceberg_table.sql": _MODEL_BUILT_ON_ICEBERG_TABLE, + } + + def test_iceberg_tables_build_and_can_be_referred(self, project): + run_results = run_dbt() + assert len(run_results) == 3 + + +class TestIcebergTableTypeBuildsOnExistingTable: + @pytest.fixture(scope="class") + def project_config_update(self): + return {"flags": {"enable_iceberg_materializations": True}} + + @pytest.mark.parametrize("start_model", [_MODEL_TABLE_BEFORE_SWAP, _MODEL_VIEW_BEFORE_SWAP]) + def 
test_changing_model_types(self, project, start_model): + model_file = project.project_root / Path("models") / Path("my_model.sql") + + write_file(start_model, model_file) + run_results = run_dbt() + assert len(run_results) == 1 + + rm_file(model_file) + write_file(_MODEL_TABLE_FOR_SWAP_ICEBERG, model_file) + run_results = run_dbt() + assert len(run_results) == 1 + + rm_file(model_file) + write_file(start_model, model_file) + run_results = run_dbt() + assert len(run_results) == 1 From 3cbe12f4543d0357ec0ed4b7f6a17345c6198bae Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 18 Sep 2024 13:29:42 -0400 Subject: [PATCH 2/6] Microbatch strategy (#1179) * first pass: add incremental_predicates * safely add incremental_predicates + testing * remove requirement for unique_id --------- Co-authored-by: Quigley Malcolm --- .../unreleased/Features-20240913-215416.yaml | 6 ++++ dbt/adapters/snowflake/impl.py | 2 +- .../macros/materializations/merge.sql | 32 +++++++++++++++++++ .../adapter/test_incremental_microbatch.py | 24 ++++++++++++++ 4 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 .changes/unreleased/Features-20240913-215416.yaml create mode 100644 tests/functional/adapter/test_incremental_microbatch.py diff --git a/.changes/unreleased/Features-20240913-215416.yaml b/.changes/unreleased/Features-20240913-215416.yaml new file mode 100644 index 000000000..b2a6e556e --- /dev/null +++ b/.changes/unreleased/Features-20240913-215416.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Microbatch incremental strategy +time: 2024-09-13T21:54:16.492885-04:00 +custom: + Author: michelleark + Issue: "1182" diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 7e8ec9cf2..a6297887d 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -416,7 +416,7 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str): return response def valid_incremental_strategies(self): - return ["append", "merge", "delete+insert"] + return ["append", "merge", "delete+insert", "microbatch"] def debug_query(self): """Override for DebugTask method""" diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql index e93b29155..57c58afdd 100644 --- a/dbt/include/snowflake/macros/materializations/merge.sql +++ b/dbt/include/snowflake/macros/materializations/merge.sql @@ -48,3 +48,35 @@ {% set dml = default__get_incremental_append_sql(get_incremental_append_sql) %} {% do return(snowflake_dml_explicit_transaction(dml)) %} {% endmacro %} + + +{% macro snowflake__get_incremental_microbatch_sql(arg_dict) %} + {%- set target = arg_dict["target_relation"] -%} + {%- set source = arg_dict["temp_relation"] -%} + {%- set dest_columns = arg_dict["dest_columns"] -%} + {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%} + + {#-- Add additional incremental_predicates to filter for batch --#} + {% if model.config.get("__dbt_internal_microbatch_event_time_start") -%} + {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %} + {% endif %} + {% if model.config.__dbt_internal_microbatch_event_time_end -%} + {% do incremental_predicates.append("DBT_INTERNAL_TARGET." 
~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
+  {% endif %}
+  {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}
+
+  delete from {{ target }} DBT_INTERNAL_TARGET
+  using {{ source }}
+  where (
+  {% for predicate in incremental_predicates %}
+      {%- if not loop.first %}and {% endif -%} {{ predicate }}
+  {% endfor %}
+  );
+
+  {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+  insert into {{ target }} ({{ dest_cols_csv }})
+  (
+      select {{ dest_cols_csv }}
+      from {{ source }}
+  )
+{% endmacro %}
diff --git a/tests/functional/adapter/test_incremental_microbatch.py b/tests/functional/adapter/test_incremental_microbatch.py
new file mode 100644
index 000000000..bbb57f96c
--- /dev/null
+++ b/tests/functional/adapter/test_incremental_microbatch.py
@@ -0,0 +1,24 @@
+import pytest
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+)
+
+
+# No requirement for a unique_id for snowflake microbatch!
+_microbatch_model_no_unique_id_sql = """
+{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }}
+select * from {{ ref('input_model') }}
+"""
+
+
+class TestSnowflakeMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def microbatch_model_sql(self) -> str:
+        return _microbatch_model_no_unique_id_sql
+
+    @pytest.fixture(scope="class")
+    def insert_two_rows_sql(self, project) -> str:
+        test_schema_relation = project.adapter.Relation.create(
+            database=project.database, schema=project.test_schema
+        )
+        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"

From 0bfef5b4abf27e1179867d039c45b33bd12be626 Mon Sep 17 00:00:00 2001
From: Mila Page <67295367+VersusFacit@users.noreply.github.com>
Date: Wed, 18 Sep 2024 13:11:07 -0700
Subject: [PATCH 3/6] Throw error when building Iceberg tables without behavior flag set (#1184)

* pipe the behavior through to relation in a temporary way to help with warnings in cli
* Adjust flag to new dbt-common setup.
* add changelog.
* Throw a compiler warning for iceberg model build without flag.

---------

Co-authored-by: VersusFacit
Co-authored-by: Colin Rogers <111200756+colin-rogers-dbt@users.noreply.github.com>
---
 .../unreleased/Under the Hood-20240917-181147.yaml |  6 ++++++
 dbt/adapters/snowflake/impl.py                     | 13 ++++++++++++-
 .../snowflake/macros/relations/table/create.sql    |  5 +++++
 3 files changed, 23 insertions(+), 1 deletion(-)
 create mode 100644 .changes/unreleased/Under the Hood-20240917-181147.yaml

diff --git a/.changes/unreleased/Under the Hood-20240917-181147.yaml b/.changes/unreleased/Under the Hood-20240917-181147.yaml
new file mode 100644
index 000000000..2f52174dd
--- /dev/null
+++ b/.changes/unreleased/Under the Hood-20240917-181147.yaml
@@ -0,0 +1,6 @@
+kind: Under the Hood
+body: Change behavior flag semantics to log Iceberg flag warnings.
+time: 2024-09-17T18:11:47.525026-07:00 +custom: + Author: versusfacit + Issue: "321" diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index a6297887d..69da11802 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -80,7 +80,18 @@ class SnowflakeAdapter(SQLAdapter): @property def _behavior_flags(self) -> List[BehaviorFlag]: - return [{"name": "enable_iceberg_materializations", "default": False}] + return [ + { + "name": "enable_iceberg_materializations", + "default": False, + "description": ( + "Enabling Iceberg materializations introduces latency to metadata queries, " + "specifically within the list_relations_without_caching macro. Since Iceberg " + "benefits only those actively using it, we've made this behavior opt-in to " + "prevent unnecessary latency for other users." + ), + } + ] @classmethod def date_function(cls): diff --git a/dbt/include/snowflake/macros/relations/table/create.sql b/dbt/include/snowflake/macros/relations/table/create.sql index 355150e28..e60b93039 100644 --- a/dbt/include/snowflake/macros/relations/table/create.sql +++ b/dbt/include/snowflake/macros/relations/table/create.sql @@ -1,4 +1,9 @@ {% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%} + + {%- if relation.is_iceberg_format and not adapter.behavior.enable_iceberg_materializations.no_warn %} + {% do exceptions.raise_compiler_error('Was unable to create model as Iceberg Table Format. Please set the `enable_iceberg_materializations` behavior flag to True in your dbt_project.yml. For more information, go to .') %} + {%- endif %} + {%- set materialization_prefix = relation.get_ddl_prefix_for_create(config.model.config, temporary) -%} {%- set alter_prefix = relation.get_ddl_prefix_for_alter() -%} From 084674f68f52216a48e5c70a202666d3c2de1af4 Mon Sep 17 00:00:00 2001 From: Mike Alfare <13974384+mikealfare@users.noreply.github.com> Date: Thu, 19 Sep 2024 10:54:19 -0400 Subject: [PATCH 4/6] Remove a macro that is entirely a docstring (#1185) * remove a macro that only contains a docstring --- .../snowflake/macros/materializations/table.sql | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/dbt/include/snowflake/macros/materializations/table.sql b/dbt/include/snowflake/macros/materializations/table.sql index cbc6d9ce6..9ee8a0b12 100644 --- a/dbt/include/snowflake/macros/materializations/table.sql +++ b/dbt/include/snowflake/macros/materializations/table.sql @@ -72,17 +72,3 @@ def main(session): materialize(session, df, dbt.this) return "OK" {% endmacro %} - -{% macro py_script_comment()%} -# To run this in snowsight, you need to select entry point to be main -# And you may have to modify the return type to text to get the result back -# def main(session): -# dbt = dbtObj(session.table) -# df = model(dbt, session) -# return df.collect() - -# to run this in local notebook, you need to create a session following examples https://github.com/Snowflake-Labs/sfguide-getting-started-snowpark-python -# then you can do the following to run model -# dbt = dbtObj(session.table) -# df = model(dbt, session) -{%endmacro%} From 34c44422845f8cc720821b8fa481733891a450bc Mon Sep 17 00:00:00 2001 From: Mike Alfare <13974384+mikealfare@users.noreply.github.com> Date: Fri, 20 Sep 2024 18:18:59 -0400 Subject: [PATCH 5/6] Dynamic Table testing improvements (#1187) * move dynamic table tests down into the relation tests folder * make utils more generic, move out of dynamic table tests * add init files 
for namespacing in test discovery * remove 2024_03 bundle items --- dbt/adapters/snowflake/impl.py | 35 +- .../test_dynamic_tables_basic.py | 186 ----------- .../test_dynamic_tables_changes.py | 307 ------------------ .../adapter/dynamic_table_tests/utils.py | 53 --- tests/functional/relation_tests/__init__.py | 0 .../dynamic_table_tests/__init__.py | 0 .../dynamic_table_tests/models.py | 50 +++ .../dynamic_table_tests/test_basic.py | 30 ++ .../test_configuration_changes.py | 103 ++++++ .../files.py => relation_tests/models.py} | 10 +- .../test_relation_type_change.py | 64 ++++ tests/functional/utils.py | 78 +++++ 12 files changed, 341 insertions(+), 575 deletions(-) delete mode 100644 tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py delete mode 100644 tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py delete mode 100644 tests/functional/adapter/dynamic_table_tests/utils.py create mode 100644 tests/functional/relation_tests/__init__.py create mode 100644 tests/functional/relation_tests/dynamic_table_tests/__init__.py create mode 100644 tests/functional/relation_tests/dynamic_table_tests/models.py create mode 100644 tests/functional/relation_tests/dynamic_table_tests/test_basic.py create mode 100644 tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py rename tests/functional/{adapter/dynamic_table_tests/files.py => relation_tests/models.py} (78%) create mode 100644 tests/functional/relation_tests/test_relation_type_change.py create mode 100644 tests/functional/utils.py diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 69da11802..5b5881eed 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -258,28 +258,20 @@ def list_relations_without_caching( return [] raise - # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory - columns = ["database_name", "schema_name", "name", "kind"] - if "is_dynamic" in schema_objects.column_names: - columns.append("is_dynamic") - if "is_iceberg" in schema_objects.column_names: + # this can be collapsed once Snowflake adds is_iceberg to show objects + columns = ["database_name", "schema_name", "name", "kind", "is_dynamic"] + if self.behavior.enable_iceberg_materializations.no_warn: columns.append("is_iceberg") return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)] def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation: - # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory - # this can be reduced to always including `is_iceberg` once Snowflake adds it to show objects - try: - if self.behavior.enable_iceberg_materializations.no_warn: - database, schema, identifier, relation_type, is_dynamic, is_iceberg = result - else: - database, schema, identifier, relation_type, is_dynamic = result - except ValueError: - database, schema, identifier, relation_type = result - is_dynamic = "N" - if self.behavior.enable_iceberg_materializations.no_warn: - is_iceberg = "N" + # this can be collapsed once Snowflake adds is_iceberg to show objects + if self.behavior.enable_iceberg_materializations.no_warn: + database, schema, identifier, relation_type, is_dynamic, is_iceberg = result + else: + database, schema, identifier, relation_type, is_dynamic = result + is_iceberg = "N" try: relation_type = self.Relation.get_relation_type(relation_type.lower()) @@ -289,13 +281,8 @@ def _parse_list_relations_result(self, 
result: "agate.Row") -> SnowflakeRelation if relation_type == self.Relation.Table and is_dynamic == "Y": relation_type = self.Relation.DynamicTable - # This line is the main gate on supporting Iceberg materializations. Pass forward a default - # table format, and no downstream table macros can build iceberg relations. - table_format: str = ( - TableFormat.ICEBERG - if self.behavior.enable_iceberg_materializations.no_warn and is_iceberg in ("Y", "YES") - else TableFormat.DEFAULT - ) + table_format = TableFormat.ICEBERG if is_iceberg in ("Y", "YES") else TableFormat.DEFAULT + quote_policy = {"database": True, "schema": True, "identifier": True} return self.Relation.create( diff --git a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py deleted file mode 100644 index a17f5d267..000000000 --- a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_basic.py +++ /dev/null @@ -1,186 +0,0 @@ -from typing import Optional, Tuple - -import pytest - -from dbt.tests.util import ( - get_model_file, - run_dbt, - run_dbt_and_capture, - set_model_file, -) - -from dbt.adapters.snowflake.relation import SnowflakeRelation, SnowflakeRelationType -from tests.functional.adapter.dynamic_table_tests.files import ( - MY_DYNAMIC_TABLE, - MY_SEED, - MY_TABLE, - MY_VIEW, -) -from tests.functional.adapter.dynamic_table_tests.utils import query_relation_type - - -class TestSnowflakeDynamicTableBasic: - @staticmethod - def insert_record(project, table: SnowflakeRelation, record: Tuple[int, int]): - my_id, value = record - project.run_sql(f"insert into {table} (id, value) values ({my_id}, {value})") - - @staticmethod - def refresh_dynamic_table(project, dynamic_table: SnowflakeRelation): - sql = f"alter dynamic table {dynamic_table} refresh" - project.run_sql(sql) - - @staticmethod - def query_row_count(project, relation: SnowflakeRelation) -> int: - sql = f"select count(*) from {relation}" - return project.run_sql(sql, fetch="one")[0] - - @staticmethod - def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: - return query_relation_type(project, relation) - - @pytest.fixture(scope="class", autouse=True) - def seeds(self): - return {"my_seed.csv": MY_SEED} - - @pytest.fixture(scope="class", autouse=True) - def models(self): - yield { - "my_table.sql": MY_TABLE, - "my_view.sql": MY_VIEW, - "my_dynamic_table.sql": MY_DYNAMIC_TABLE, - } - - @pytest.fixture(scope="class") - def my_dynamic_table(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_dynamic_table", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.DynamicTable, - ) - - @pytest.fixture(scope="class") - def my_view(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_view", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.View, - ) - - @pytest.fixture(scope="class") - def my_table(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_table", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.Table, - ) - - @pytest.fixture(scope="class") - def my_seed(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_seed", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.Table, - ) - - @staticmethod - def load_model(project, 
current_model, new_model): - model_to_load = get_model_file(project, new_model) - set_model_file(project, current_model, model_to_load) - - @pytest.fixture(scope="function", autouse=True) - def setup(self, project, my_dynamic_table, my_view, my_table): - run_dbt(["seed"]) - run_dbt(["run", "--models", my_dynamic_table.identifier, "--full-refresh"]) - - # the tests touch these files, store their contents in memory - my_dynamic_table_config = get_model_file(project, my_dynamic_table) - my_view_config = get_model_file(project, my_view) - my_table_config = get_model_file(project, my_table) - - yield - - # and then reset them after the test runs - set_model_file(project, my_dynamic_table, my_dynamic_table_config) - set_model_file(project, my_view, my_view_config) - set_model_file(project, my_table, my_table_config) - project.run_sql(f"drop schema if exists {project.test_schema} cascade") - - def test_dynamic_table_create(self, project, my_dynamic_table): - # setup creates it; verify it's there - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - def test_dynamic_table_create_idempotent(self, project, my_dynamic_table): - # setup creates it once; verify it's there and run once - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - def test_dynamic_table_full_refresh(self, project, my_dynamic_table): - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.identifier, "--full-refresh"] - ) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - def test_dynamic_table_replaces_table(self, project, my_table, my_dynamic_table): - run_dbt(["run", "--models", my_table.identifier]) - assert self.query_relation_type(project, my_table) == "table" - - self.load_model(project, my_table, my_dynamic_table) - - run_dbt(["run", "--models", my_table.identifier]) - assert self.query_relation_type(project, my_table) == "dynamic_table" - - def test_dynamic_table_replaces_view(self, project, my_view, my_dynamic_table): - run_dbt(["run", "--models", my_view.identifier]) - assert self.query_relation_type(project, my_view) == "view" - - self.load_model(project, my_view, my_dynamic_table) - - run_dbt(["run", "--models", my_view.identifier]) - assert self.query_relation_type(project, my_view) == "dynamic_table" - - def test_table_replaces_dynamic_table(self, project, my_dynamic_table, my_table): - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - self.load_model(project, my_dynamic_table, my_table) - - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "table" - - def test_view_replaces_dynamic_table(self, project, my_dynamic_table, my_view): - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table" - - self.load_model(project, my_dynamic_table, my_view) - - run_dbt(["run", "--models", my_dynamic_table.identifier]) - assert self.query_relation_type(project, my_dynamic_table) == "view" - - def test_dynamic_table_only_updates_after_refresh(self, project, my_dynamic_table, my_seed): - # poll database - table_start = self.query_row_count(project, my_seed) - view_start = self.query_row_count(project, my_dynamic_table) - - # insert new 
record in table - self.insert_record(project, my_seed, (4, 400)) - - # poll database - table_mid = self.query_row_count(project, my_seed) - view_mid = self.query_row_count(project, my_dynamic_table) - - # refresh the materialized view - self.refresh_dynamic_table(project, my_dynamic_table) - - # poll database - table_end = self.query_row_count(project, my_seed) - view_end = self.query_row_count(project, my_dynamic_table) - - # new records were inserted in the table but didn't show up in the view until it was refreshed - assert table_start < table_mid == table_end - assert view_start == view_mid < view_end diff --git a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py deleted file mode 100644 index a58b76f29..000000000 --- a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py +++ /dev/null @@ -1,307 +0,0 @@ -from typing import Optional - -import pytest - -from dbt_common.contracts.config.materialization import OnConfigurationChangeOption -from dbt.tests.util import ( - assert_message_in_logs, - get_model_file, - run_dbt, - run_dbt_and_capture, - set_model_file, -) - -from dbt.adapters.snowflake.relation import SnowflakeRelation, SnowflakeRelationType -from tests.functional.adapter.dynamic_table_tests.files import ( - MY_DYNAMIC_TABLE, - MY_SEED, -) -from tests.functional.adapter.dynamic_table_tests.utils import ( - query_refresh_mode, - query_relation_type, - query_target_lag, - query_warehouse, -) - - -class SnowflakeDynamicTableChanges: - @staticmethod - def check_start_state(project, dynamic_table): - assert query_target_lag(project, dynamic_table) == "2 minutes" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" - assert query_refresh_mode(project, dynamic_table) == "INCREMENTAL" - - @staticmethod - def change_config_via_alter(project, dynamic_table): - initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace( - "target_lag='2 minutes'", "target_lag='5 minutes'" - ) - set_model_file(project, dynamic_table, new_model) - - @staticmethod - def change_config_via_alter_downstream(project, dynamic_table): - initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace( - "target_lag='2 minutes'", "target_lag='DOWNSTREAM'" - ) - set_model_file(project, dynamic_table, new_model) - - @staticmethod - def check_state_alter_change_is_applied(project, dynamic_table): - assert query_target_lag(project, dynamic_table) == "5 minutes" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" - - @staticmethod - def check_state_alter_change_is_applied_downstream(project, dynamic_table): - assert query_target_lag(project, dynamic_table) == "DOWNSTREAM" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" - - @staticmethod - def change_config_via_replace(project, dynamic_table): - initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace("refresh_mode='INCREMENTAL'", "refresh_mode='FULL'") - set_model_file(project, dynamic_table, new_model) - - @staticmethod - def check_state_replace_change_is_applied(project, dynamic_table): - assert query_refresh_mode(project, dynamic_table) == "FULL" - - @staticmethod - def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: - return query_relation_type(project, relation) - - @pytest.fixture(scope="class", autouse=True) - def seeds(self): - yield {"my_seed.csv": MY_SEED} - - 
@pytest.fixture(scope="class", autouse=True) - def models(self): - yield {"my_dynamic_table.sql": MY_DYNAMIC_TABLE} - - @pytest.fixture(scope="class") - def my_dynamic_table(self, project) -> SnowflakeRelation: - return project.adapter.Relation.create( - identifier="my_dynamic_table", - schema=project.test_schema, - database=project.database, - type=SnowflakeRelationType.DynamicTable, - ) - - @pytest.fixture(scope="function", autouse=True) - def setup(self, project, my_dynamic_table): - # make sure the model in the data reflects the files each time - run_dbt(["seed"]) - run_dbt(["run", "--models", my_dynamic_table.identifier, "--full-refresh"]) - - # the tests touch these files, store their contents in memory - initial_model = get_model_file(project, my_dynamic_table) - - # verify the initial settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - yield - - # and then reset them after the test runs - set_model_file(project, my_dynamic_table, initial_model) - - # ensure clean slate each method - project.run_sql(f"drop schema if exists {project.test_schema} cascade") - - def test_full_refresh_occurs_with_changes(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.identifier, "--full-refresh"] - ) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied(project, my_dynamic_table) - self.check_state_replace_change_is_applied(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - - -class TestSnowflakeDynamicTableChangesApply(SnowflakeDynamicTableChanges): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"on_configuration_change": OnConfigurationChangeOption.Apply.value}} - - def test_change_is_applied_via_alter(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - def test_change_is_applied_via_alter_downstream(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter_downstream(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied_downstream(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - 
logs.replace('"', ""), - False, - ) - - @pytest.mark.skip( - "dbt-snowflake does not currently monitor any changes the trigger a full refresh" - ) - def test_change_is_applied_via_replace(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_state_alter_change_is_applied(project, my_dynamic_table) - self.check_state_replace_change_is_applied(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", logs.replace('"', "") - ) - - -class TestSnowflakeDynamicTableChangesContinue(SnowflakeDynamicTableChanges): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"on_configuration_change": OnConfigurationChangeOption.Continue.value}} - - def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `continue` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - def test_change_is_not_applied_via_replace(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `continue` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - -class TestSnowflakeDynamicTableChangesFailMixin(SnowflakeDynamicTableChanges): - @pytest.fixture(scope="class") - def project_config_update(self): - return {"models": {"on_configuration_change": OnConfigurationChangeOption.Fail.value}} - - def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False - ) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes 
were identified and `on_configuration_change` was set" - f" to `fail` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) - - def test_change_is_not_applied_via_replace(self, project, my_dynamic_table): - - # update the settings - self.change_config_via_alter(project, my_dynamic_table) - self.change_config_via_replace(project, my_dynamic_table) - _, logs = run_dbt_and_capture( - ["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False - ) - - # verify the updated settings are correct in Snowflake - self.check_start_state(project, my_dynamic_table) - - # verify the settings were changed with the correct method - assert_message_in_logs( - f"Configuration changes were identified and `on_configuration_change` was set" - f" to `fail` for `{my_dynamic_table}`", - logs, - ) - assert_message_in_logs( - f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False - ) - assert_message_in_logs( - f"Applying REPLACE to: {my_dynamic_table.render().upper()}", - logs.replace('"', ""), - False, - ) diff --git a/tests/functional/adapter/dynamic_table_tests/utils.py b/tests/functional/adapter/dynamic_table_tests/utils.py deleted file mode 100644 index d72b231c9..000000000 --- a/tests/functional/adapter/dynamic_table_tests/utils.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Optional - -import agate -from dbt.tests.util import get_connection - -from dbt.adapters.snowflake.relation import SnowflakeRelation - - -def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: - sql = f""" - select - case - when table_type = 'BASE TABLE' and is_dynamic = 'YES' then 'dynamic_table' - when table_type = 'BASE TABLE' then 'table' - when table_type = 'VIEW' then 'view' - when table_type = 'EXTERNAL TABLE' then 'external_table' - end as relation_type - from information_schema.tables - where table_name like '{relation.identifier.upper()}' - and table_schema like '{relation.schema.upper()}' - and table_catalog like '{relation.database.upper()}' - """ - results = project.run_sql(sql, fetch="one") - if results is None or len(results) == 0: - return None - elif len(results) > 1: - raise ValueError(f"More than one instance of {relation.name} found!") - else: - return results[0].lower() - - -def query_target_lag(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - config = describe_dynamic_table(project, dynamic_table) - return config.get("target_lag") - - -def query_warehouse(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - config = describe_dynamic_table(project, dynamic_table) - return config.get("warehouse") - - -def query_refresh_mode(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - config = describe_dynamic_table(project, dynamic_table) - return config.get("refresh_mode") - - -def describe_dynamic_table(project, dynamic_table: SnowflakeRelation) -> agate.Row: - with get_connection(project.adapter): - macro_results = project.adapter.execute_macro( - "snowflake__describe_dynamic_table", kwargs={"relation": dynamic_table} - ) - config = macro_results["dynamic_table"] - return config.rows[0] diff --git a/tests/functional/relation_tests/__init__.py b/tests/functional/relation_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git 
a/tests/functional/relation_tests/dynamic_table_tests/__init__.py b/tests/functional/relation_tests/dynamic_table_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/functional/relation_tests/dynamic_table_tests/models.py b/tests/functional/relation_tests/dynamic_table_tests/models.py new file mode 100644 index 000000000..5e46bed53 --- /dev/null +++ b/tests/functional/relation_tests/dynamic_table_tests/models.py @@ -0,0 +1,50 @@ +SEED = """ +id,value +1,100 +2,200 +3,300 +""".strip() + + +DYNAMIC_TABLE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + refresh_mode='INCREMENTAL', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_DOWNSTREAM = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='DOWNSTREAM', + refresh_mode='INCREMENTAL', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_ALTER = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='5 minutes', + refresh_mode='INCREMENTAL', +) }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE_REPLACE = """ +{{ config( + materialized='dynamic_table', + snowflake_warehouse='DBT_TESTING', + target_lag='2 minutes', + refresh_mode='FULL', +) }} +select * from {{ ref('my_seed') }} +""" diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_basic.py b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py new file mode 100644 index 000000000..2406e1c14 --- /dev/null +++ b/tests/functional/relation_tests/dynamic_table_tests/test_basic.py @@ -0,0 +1,30 @@ +import pytest + +from dbt.tests.util import run_dbt + +from tests.functional.relation_tests.dynamic_table_tests import models +from tests.functional.utils import query_relation_type + + +class TestBasic: + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + return {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "my_dynamic_table.sql": models.DYNAMIC_TABLE, + "my_dynamic_table_downstream.sql": models.DYNAMIC_TABLE_DOWNSTREAM, + } + + @pytest.fixture(scope="class", autouse=True) + def setup(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + + def test_dynamic_table_full_refresh(self, project): + run_dbt(["run", "--full-refresh"]) + assert query_relation_type(project, "my_dynamic_table") == "dynamic_table" + assert query_relation_type(project, "my_dynamic_table_downstream") == "dynamic_table" diff --git a/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py new file mode 100644 index 000000000..3c4f65a87 --- /dev/null +++ b/tests/functional/relation_tests/dynamic_table_tests/test_configuration_changes.py @@ -0,0 +1,103 @@ +import pytest + +from dbt.tests.util import run_dbt + +from tests.functional.relation_tests.dynamic_table_tests import models +from tests.functional.utils import describe_dynamic_table, update_model + + +class Changes: + + @pytest.fixture(scope="class", autouse=True) + def seeds(self): + yield {"my_seed.csv": models.SEED} + + @pytest.fixture(scope="class", autouse=True) + def models(self): + yield { + "dynamic_table_alter.sql": models.DYNAMIC_TABLE, + "dynamic_table_replace.sql": models.DYNAMIC_TABLE, + } + + @pytest.fixture(scope="function", autouse=True) + def setup_class(self, project): + run_dbt(["seed"]) + yield + 
project.run_sql(f"drop schema if exists {project.test_schema} cascade")
+
+    @pytest.fixture(scope="function", autouse=True)
+    def setup_method(self, project, setup_class):
+        # make sure the model in the database reflects the files each time
+        run_dbt(["run", "--full-refresh"])
+        self.assert_changes_are_not_applied(project)
+
+        update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE_ALTER)
+        update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE_REPLACE)
+
+        yield
+
+        update_model(project, "dynamic_table_alter", models.DYNAMIC_TABLE)
+        update_model(project, "dynamic_table_replace", models.DYNAMIC_TABLE)
+
+    @staticmethod
+    def assert_changes_are_applied(project):
+        altered = describe_dynamic_table(project, "dynamic_table_alter")
+        assert altered.snowflake_warehouse == "DBT_TESTING"
+        assert altered.target_lag == "5 minutes"  # this updated
+        assert altered.refresh_mode == "INCREMENTAL"
+
+        replaced = describe_dynamic_table(project, "dynamic_table_replace")
+        assert replaced.snowflake_warehouse == "DBT_TESTING"
+        assert replaced.target_lag == "2 minutes"
+        assert replaced.refresh_mode == "FULL"  # this updated
+
+    @staticmethod
+    def assert_changes_are_not_applied(project):
+        altered = describe_dynamic_table(project, "dynamic_table_alter")
+        assert altered.snowflake_warehouse == "DBT_TESTING"
+        assert altered.target_lag == "2 minutes"  # this would have updated, but didn't
+        assert altered.refresh_mode == "INCREMENTAL"
+
+        replaced = describe_dynamic_table(project, "dynamic_table_replace")
+        assert replaced.snowflake_warehouse == "DBT_TESTING"
+        assert replaced.target_lag == "2 minutes"
+        assert replaced.refresh_mode == "INCREMENTAL"  # this would have updated, but didn't
+
+    def test_full_refresh_is_always_successful(self, project):
+        # this always passes and always changes the configuration, regardless of on_configuration_change
+        # and regardless of whether the changes require a replace versus an alter
+        run_dbt(["run", "--full-refresh"])
+        self.assert_changes_are_applied(project)
+
+
+class TestChangesApply(Changes):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"models": {"on_configuration_change": "apply"}}
+
+    def test_changes_are_applied(self, project):
+        # this passes and changes the configuration
+        run_dbt(["run"])
+        self.assert_changes_are_applied(project)
+
+
+class TestChangesContinue(Changes):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"models": {"on_configuration_change": "continue"}}
+
+    def test_changes_are_not_applied(self, project):
+        # this passes but does not change the configuration
+        run_dbt(["run"])
+        self.assert_changes_are_not_applied(project)
+
+
+class TestChangesFail(Changes):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"models": {"on_configuration_change": "fail"}}
+
+    def test_changes_are_not_applied(self, project):
+        # this fails and does not change the configuration
+        run_dbt(["run"], expect_pass=False)
+        self.assert_changes_are_not_applied(project)
diff --git a/tests/functional/adapter/dynamic_table_tests/files.py b/tests/functional/relation_tests/models.py
similarity index 78%
rename from tests/functional/adapter/dynamic_table_tests/files.py
rename to tests/functional/relation_tests/models.py
index ef8d2bf1f..6fe066313 100644
--- a/tests/functional/adapter/dynamic_table_tests/files.py
+++ b/tests/functional/relation_tests/models.py
@@ -1,4 +1,4 @@
-MY_SEED = """
+SEED = """
 id,value
 1,100
 2,200
@@ -6,7 +6,7 @@
 """.strip()
 
 
-MY_TABLE = """
+TABLE = """
 {{ config(
     materialized='table',
 ) }}
@@ -14,7 +14,7 @@
 """
 
 
-MY_VIEW = """
+VIEW = """
 {{ config(
     materialized='view',
 ) }}
@@ -22,11 +22,11 @@
 """
 
 
-MY_DYNAMIC_TABLE = """
+DYNAMIC_TABLE = """
 {{ config(
     materialized='dynamic_table',
     snowflake_warehouse='DBT_TESTING',
-    target_lag='2 minutes',
+    target_lag='1 minute',
     refresh_mode='INCREMENTAL',
 ) }}
 select * from {{ ref('my_seed') }}
diff --git a/tests/functional/relation_tests/test_relation_type_change.py b/tests/functional/relation_tests/test_relation_type_change.py
new file mode 100644
index 000000000..1246b0791
--- /dev/null
+++ b/tests/functional/relation_tests/test_relation_type_change.py
@@ -0,0 +1,64 @@
+from dataclasses import dataclass
+from itertools import product
+
+from dbt.tests.util import run_dbt
+import pytest
+
+from tests.functional.relation_tests import models
+from tests.functional.utils import query_relation_type, update_model
+
+
+@dataclass
+class Model:
+    model: str
+    relation_type: str
+
+    @property
+    def name(self) -> str:
+        return self.relation_type
+
+
+@dataclass
+class Scenario:
+    initial: Model
+    final: Model
+
+    @property
+    def name(self) -> str:
+        return f"REPLACE_{self.initial.name}__WITH_{self.final.name}"
+
+    @property
+    def error_message(self) -> str:
+        return f"Failed when migrating from: {self.initial.name} to: {self.final.name}"
+
+
+relations = [
+    Model(models.VIEW, "view"),
+    Model(models.TABLE, "table"),
+    Model(models.DYNAMIC_TABLE, "dynamic_table"),
+]
+scenarios = [Scenario(*scenario) for scenario in product(relations, relations)]
+
+
+class TestRelationTypeChange:
+
+    @pytest.fixture(scope="class", autouse=True)
+    def seeds(self):
+        return {"my_seed.csv": models.SEED}
+
+    @pytest.fixture(scope="class", autouse=True)
+    def models(self):
+        yield {f"{scenario.name}.sql": scenario.initial.model for scenario in scenarios}
+
+    @pytest.fixture(scope="class", autouse=True)
+    def setup(self, project):
+        run_dbt(["seed"])
+        run_dbt(["run"])
+        for scenario in scenarios:
+            update_model(project, scenario.name, scenario.final.model)
+        run_dbt(["run"])
+
+    @pytest.mark.parametrize("scenario", scenarios, ids=[scenario.name for scenario in scenarios])
+    def test_replace(self, project, scenario):
+        relation_type = query_relation_type(project, scenario.name)
+        assert relation_type == scenario.final.relation_type, scenario.error_message
diff --git a/tests/functional/utils.py b/tests/functional/utils.py
new file mode 100644
index 000000000..d185e8d2b
--- /dev/null
+++ b/tests/functional/utils.py
@@ -0,0 +1,78 @@
+from typing import Any, Dict, Optional
+
+from dbt.tests.util import (
+    get_connection,
+    get_model_file,
+    relation_from_name,
+    set_model_file,
+)
+
+from dbt.adapters.snowflake.relation_configs import SnowflakeDynamicTableConfig
+
+
+def query_relation_type(project, name: str) -> Optional[str]:
+    relation = relation_from_name(project.adapter, name)
+    sql = f"""
+    select
+        case table_type
+            when 'BASE TABLE' then iff(is_dynamic = 'YES', 'dynamic_table', 'table')
+            when 'VIEW' then 'view'
+            when 'EXTERNAL TABLE' then 'external_table'
+        end as relation_type
+    from information_schema.tables
+    where table_name like '{relation.identifier.upper()}'
+    and table_schema like '{relation.schema.upper()}'
+    and table_catalog like '{relation.database.upper()}'
+    """
+    results = project.run_sql(sql, fetch="all")
+
+    assert len(results) > 0, f"Relation {relation} not found"
+    assert len(results) == 1, f"Multiple relations found for {relation}"
+
+    return results[0][0].lower()
+
+
+def query_row_count(project, 
name: str) -> int: + relation = relation_from_name(project.adapter, name) + sql = f"select count(*) from {relation}" + return project.run_sql(sql, fetch="one")[0] + + +def insert_record(project, name: str, record: Dict[str, Any]): + relation = relation_from_name(project.adapter, name) + column_names = ", ".join(record.keys()) + values = ", ".join( + [f"'{value}'" if isinstance(value, str) else f"{value}" for value in record.values()] + ) + sql = f"insert into {relation} ({column_names}) values ({values})" + project.run_sql(sql) + + +def update_model(project, name: str, model: str) -> str: + relation = relation_from_name(project.adapter, name) + original_model = get_model_file(project, relation) + set_model_file(project, relation, model) + return original_model + + +def describe_dynamic_table(project, name: str) -> Optional[SnowflakeDynamicTableConfig]: + macro = "snowflake__describe_dynamic_table" + dynamic_table = relation_from_name(project.adapter, name) + kwargs = {"relation": dynamic_table} + with get_connection(project.adapter): + results = project.adapter.execute_macro(macro, kwargs=kwargs) + + assert len(results["dynamic_table"].rows) > 0, f"Dynamic table {dynamic_table} not found" + found = len(results["dynamic_table"].rows) + names = ", ".join([table.get("name") for table in results["dynamic_table"].rows]) + assert found == 1, f"Multiple dynamic tables found: {names}" + + return SnowflakeDynamicTableConfig.from_relation_results(results) + + +def refresh_dynamic_table(project, name: str) -> None: + macro = "snowflake__refresh_dynamic_table" + dynamic_table = relation_from_name(project.adapter, name) + kwargs = {"relation": dynamic_table} + with get_connection(project.adapter): + project.adapter.execute_macro(macro, kwargs=kwargs) From 583ec5eb2686720b5715b0121c4c7d79a3e319d3 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 23 Sep 2024 22:57:17 +0100 Subject: [PATCH 6/6] Add required 'begin' config for testing microbatch models (#1189) --- tests/functional/adapter/test_incremental_microbatch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/adapter/test_incremental_microbatch.py b/tests/functional/adapter/test_incremental_microbatch.py index bbb57f96c..f228c370c 100644 --- a/tests/functional/adapter/test_incremental_microbatch.py +++ b/tests/functional/adapter/test_incremental_microbatch.py @@ -6,7 +6,7 @@ # No requirement for a unique_id for snowflake microbatch! _microbatch_model_no_unique_id_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('input_model') }} """
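
Reviewer note: the helpers introduced in tests/functional/utils.py above are meant to be composed in future functional tests. Below is a minimal sketch of that pattern, assuming the standard dbt functional-test fixtures used throughout this patch; the test class and the model name "my_model" are illustrative, not part of the change:

import pytest

from dbt.tests.util import run_dbt

from tests.functional.relation_tests import models
from tests.functional.utils import query_relation_type, update_model


class TestViewToTableSketch:
    # Hypothetical example only: migrate a view to a table and verify the
    # swap using the new helpers.

    @pytest.fixture(scope="class", autouse=True)
    def seeds(self):
        return {"my_seed.csv": models.SEED}

    @pytest.fixture(scope="class", autouse=True)
    def models(self):
        return {"my_model.sql": models.VIEW}

    def test_migration(self, project):
        run_dbt(["seed"])
        run_dbt(["run"])
        assert query_relation_type(project, "my_model") == "view"

        # rewrite the model file on disk, then rebuild it in place
        update_model(project, "my_model", models.TABLE)
        run_dbt(["run"])
        assert query_relation_type(project, "my_model") == "table"

This is the same pattern that test_relation_type_change.py applies over the full product of relation types; likewise, the 'begin' config added to the microbatch test above supplies the required start point for the model's first batch.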