From f8e8739505c8e29dfd9abee02bf403e811a4a404 Mon Sep 17 00:00:00 2001
From: Christophe Oudar
Date: Sun, 27 Mar 2022 22:06:43 +0200
Subject: [PATCH] Support for incremental materialization with ingestion time partition tables

---
 dbt/adapters/bigquery/connections.py          | 24 +++-
 dbt/adapters/bigquery/impl.py                 | 50 ++++++--
 .../macros/materializations/incremental.sql   | 109 +++++++++++++++++-
 ...ncremental_time_ingestion_partitioning.sql | 34 ++++++
 tests/unit/test_bigquery_adapter.py           | 46 ++++++--
 5 files changed, 231 insertions(+), 32 deletions(-)
 create mode 100644 tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning.sql

diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index 05f236a55..50a880a20 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -1,11 +1,11 @@
 import json
 import re
 from contextlib import contextmanager
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from functools import lru_cache
 import agate
 from requests.exceptions import ConnectionError
-from typing import Optional, Any, Dict, Tuple
+from typing import Optional, Any, Dict, Tuple, List

 import google.auth
 import google.auth.exceptions
@@ -86,6 +86,7 @@ class BigQueryConnectionMethod(StrEnum):
 @dataclass
 class BigQueryAdapterResponse(AdapterResponse):
     bytes_processed: Optional[int] = None
+    fields: List[Any] = field(default_factory=list)


 @dataclass
@@ -434,6 +435,7 @@ def execute(
         code = None
         num_rows = None
         bytes_processed = None
+        fields = list()

         if query_job.statement_type == "CREATE_VIEW":
             code = "CREATE VIEW"
@@ -448,6 +450,7 @@ def execute(
             bytes_processed = query_job.total_bytes_processed
             processed_bytes = self.format_bytes(bytes_processed)
             message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"
+            fields = query_table.schema

         elif query_job.statement_type == "SCRIPT":
             code = "SCRIPT"
@@ -473,9 +476,14 @@ def execute(
             bytes_processed = query_job.total_bytes_processed
             processed_bytes = self.format_bytes(bytes_processed)
             message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"
+            fields = query_table.schema

         response = BigQueryAdapterResponse(  # type: ignore[call-arg]
-            _message=message, rows_affected=num_rows, code=code, bytes_processed=bytes_processed
+            _message=message,
+            rows_affected=num_rows,
+            code=code,
+            bytes_processed=bytes_processed,
+            fields=fields,
         )

         return response, table
@@ -529,7 +537,8 @@ def copy_and_results():

         self._retry_and_handle(
             msg='copy table "{}" to "{}"'.format(
-                ", ".join(source_ref.path for source_ref in source_ref_array), destination_ref.path
+                ", ".join(source_ref.path for source_ref in source_ref_array),
+                destination_ref.path,
             ),
             conn=conn,
             fn=copy_and_results,
@@ -571,7 +580,12 @@ def fn():
         self._retry_and_handle(msg="create dataset", conn=conn, fn=fn)

     def _query_and_results(
-        self, client, sql, job_params, job_creation_timeout=None, job_execution_timeout=None
+        self,
+        client,
+        sql,
+        job_params,
+        job_creation_timeout=None,
+        job_execution_timeout=None,
     ):
         """Query the client and wait for results."""
         # Cannot reuse job_config if destination is set and ddl is used
diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index 0fc5fc1cc..225732e6c 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -7,7 +7,13 @@
 import dbt.clients.agate_helper

 from dbt import ui  # type: ignore
-from dbt.adapters.base import BaseAdapter, available, RelationType, SchemaSearchMap, AdapterConfig
+from dbt.adapters.base import (
+    BaseAdapter,
+    available,
+    RelationType,
+    SchemaSearchMap,
+    AdapterConfig,
+)
 from dbt.adapters.bigquery.relation import BigQueryRelation
 from dbt.adapters.bigquery import BigQueryColumn
 from dbt.adapters.bigquery import BigQueryConnectionManager
@@ -47,11 +53,15 @@ class PartitionConfig(dbtClassMixin):
     data_type: str = "date"
     granularity: str = "day"
     range: Optional[Dict[str, Any]] = None
+    time_ingestion_partitioning: bool = False
+
+    def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
+        return [c for c in columns if not c.name.upper() == self.field.upper()]

     def render(self, alias: Optional[str] = None):
-        column: str = self.field
+        column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME"
         if alias:
-            column = f"{alias}.{self.field}"
+            column = f"{alias}.{column}"

         if self.data_type.lower() == "int64" or (
             self.data_type.lower() == "date" and self.granularity.lower() == "day"
@@ -89,7 +99,11 @@ def render(self):


 def _stub_relation(*args, **kwargs):
     return BigQueryRelation.create(
-        database="", schema="", identifier="", quote_policy={}, type=BigQueryRelation.Table
+        database="",
+        schema="",
+        identifier="",
+        quote_policy={},
+        type=BigQueryRelation.Table,
     )
@@ -209,7 +223,9 @@ def check_schema_exists(self, database: str, schema: str) -> bool:

     def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
         try:
             table = self.connections.get_bq_table(
-                database=relation.database, schema=relation.schema, identifier=relation.identifier
+                database=relation.database,
+                schema=relation.schema,
+                identifier=relation.identifier,
             )
             return self._get_dbt_columns_from_bq_table(table)
@@ -358,7 +374,10 @@ def _materialize_as_view(self, model: Dict[str, Any]) -> str:
         logger.debug("Model SQL ({}):\n{}".format(model_alias, model_sql))

         self.connections.create_view(
-            database=model_database, schema=model_schema, table_name=model_alias, sql=model_sql
+            database=model_database,
+            schema=model_schema,
+            table_name=model_alias,
+            sql=model_sql,
         )
         return "CREATE VIEW"
@@ -379,7 +398,10 @@ def _materialize_as_table(
         logger.debug("Model SQL ({}):\n{}".format(table_name, model_sql))

         self.connections.create_table(
-            database=model_database, schema=model_schema, table_name=table_name, sql=model_sql
+            database=model_database,
+            schema=model_schema,
+            table_name=table_name,
+            sql=model_sql,
         )
         return "CREATE TABLE"
@@ -462,7 +484,8 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) ->
         if not is_partitioned and not conf_partition:
             return True
         elif conf_partition and table.time_partitioning is not None:
-            table_field = table.time_partitioning.field.lower()
+            partitioning_field = table.time_partitioning.field or "_PARTITIONTIME"
+            table_field = partitioning_field.lower()
             table_granularity = table.partitioning_type.lower()
             return (
                 table_field == conf_partition.field.lower()
@@ -508,7 +531,9 @@ def is_replaceable(

         try:
             table = self.connections.get_bq_table(
-                database=relation.database, schema=relation.schema, identifier=relation.identifier
+                database=relation.database,
+                schema=relation.schema,
+                identifier=relation.identifier,
             )
         except google.cloud.exceptions.NotFound:
             return True
@@ -630,7 +655,12 @@ def load_dataframe(self, database, schema, table_name, agate_table, column_overr

     @available.parse_none
     def upload_file(
-        self, local_file_path: str, database: str, table_schema: str, table_name: str, **kwargs
+        self,
+        local_file_path: str,
+        database: str,
+        table_schema: str,
+        table_name: str,
+        **kwargs,
     ) -> None:
         conn = self.connections.get_thread_connection()
         client = conn.handle
diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql
index f9d36aead..ef8fd18ae 100644
--- a/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -27,6 +27,94 @@
   {% do return(strategy) %}
 {% endmacro %}

+{% macro get_columns_with_types_in_query(select_sql) %}
+  {% call statement('get_columns_with_types_in_query', fetch_result=True, auto_begin=False) -%}
+    select * from (
+      {{ select_sql }}
+    ) as __dbt_sbq
+    where false
+    limit 0
+  {% endcall %}
+  {%- set result = load_result('get_columns_with_types_in_query') -%}
+  {{ return(load_result('get_columns_with_types_in_query').response.fields) }}
+{% endmacro %}
+
+{% macro create_ingestion_time_partitioned_table_as(temporary, relation, sql) -%}
+  {%- set raw_partition_by = config.get('partition_by', none) -%}
+  {%- set raw_cluster_by = config.get('cluster_by', none) -%}
+  {%- set sql_header = config.get('sql_header', none) -%}
+
+  {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}
+
+  {%- set columns = get_columns_with_types_in_query(sql) -%}
+  {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}
+
+  {{ sql_header if sql_header is not none }}
+
+  {% set ingestion_time_partition_config_raw = fromjson(tojson(raw_partition_by)) %}
+  {% do ingestion_time_partition_config_raw.update({'field':'_PARTITIONTIME'}) %}
+
+  {%- set ingestion_time_partition_config = adapter.parse_partition_by(ingestion_time_partition_config_raw) -%}
+
+  create or replace table {{ relation }} ({{table_dest_columns_csv}})
+  {{ partition_by(ingestion_time_partition_config) }}
+  {{ cluster_by(raw_cluster_by) }}
+  {{ bigquery_table_options(config, model, temporary) }}
+
+{%- endmacro -%}
+
+{% macro get_quoted_with_types_csv(columns) %}
+  {% set quoted = [] %}
+  {% for col in columns -%}
+    {%- do quoted.append(adapter.quote(col.name) ~ " " ~ col.field_type) -%}
+  {%- endfor %}
+  {%- set dest_cols_csv = quoted | join(', ') -%}
+  {{ return(dest_cols_csv) }}
+
+{% endmacro %}
+
+{% macro columns_without_partition_fields_csv(partition_config, columns) -%}
+  {%- set columns_no_partition = partition_config.reject_partition_field_column(columns) -%}
+  {% set columns_names = get_quoted_with_types_csv(columns_no_partition) %}
+  {{ return(columns_names) }}
+
+{%- endmacro -%}
+
+{% macro bq_insert_into_ingestion_time_partitioned_table(target_relation, sql) -%}
+  {%- set partition_by = config.get('partition_by', none) -%}
+  {% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
+  {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+
+  insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
+    {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql) }}
+
+{%- endmacro -%}
+
+{% macro build_partition_time_exp(partition_by) %}
+  {% if partition_by.data_type == 'timestamp' %}
+    {{ return(partition_by.field) }}
+  {% else %}
+    {{ return('timestamp(' + partition_by.field + ')') }}
+  {% endif %}
+{% endmacro %}
+
+{% macro wrap_with_time_ingestion_partitioning(partition_time_exp, sql) %}
+
+  select {{ partition_time_exp }} as _partitiontime, * EXCEPT({{ partition_time_exp }}) from (
+    {{ sql }}
+  );
+
+{% endmacro %}
+
+{% macro source_sql_with_partition(partition_by, source_sql) %}
+
+  {%- if partition_by.time_ingestion_partitioning %}
+    {{ return(wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), source_sql)) }}
+  {% else %}
+    {{ return(source_sql) }}
+  {%- endif -%}
+
+{% endmacro %}
 {% macro bq_insert_overwrite(
     tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
 ) %}
@@ -60,7 +148,6 @@
       )
       {%- endset -%}

-      -- generated script to merge partitions into {{ target_relation }}
       declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;

       {# have we already created the temp table to check for schema changes? #}
@@ -68,7 +155,8 @@
        {{ declare_dbt_max_partition(this, partition_by, sql) }}

        -- 1. create a temp table
-       {{ create_table_as(True, tmp_relation, sql) }}
+       {% set create_table_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql) %}
+       {{ create_table_sql }}
      {% else %}
        -- 1. temp table already exists, we used it to check for schema changes
      {% endif %}
@@ -94,6 +182,15 @@
 {% endmacro %}

+{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, sql) %}
+  {% if is_time_ingestion_partitioning %}
+    {#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
+    {% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, sql)) %}
+    {{ return(bq_insert_into_ingestion_time_partitioned_table(relation, sql)) }}
+  {% else %}
+    {{ return(create_table_as(temporary, relation, sql)) }}
+  {% endif %}
+{% endmacro %}

 {% macro bq_generate_incremental_build_sql(
     strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -155,12 +252,12 @@
   {{ run_hooks(pre_hooks) }}

   {% if existing_relation is none %}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

   {% elif existing_relation.is_view %}
      {#-- There's no way to atomically replace a view with a table on BQ --#}
      {{ adapter.drop_relation(existing_relation) }}
-     {% set build_sql = create_table_as(False, target_relation, sql) %}
+     {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

   {% elif full_refresh_mode %}
      {#-- If the partition/cluster config has changed, then we must drop and recreate --#}
@@ -168,13 +265,13 @@
        {% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
        {{ adapter.drop_relation(existing_relation) }}
      {% endif %}
-     {% set build_sql = create_table_as(False, target_relation, sql) %}
+     {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

   {% else %}
     {% set tmp_relation_exists = false %}
     {% if on_schema_change != 'ignore' %} {# Check first, since otherwise we may not build a temp table #}
       {% do run_query(
-        declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
+        declare_dbt_max_partition(this, partition_by, sql) + bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)
       ) %}
       {% set tmp_relation_exists = true %}
       {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
diff --git a/tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning.sql b/tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning.sql
new file mode 100644
index 000000000..4e041f8ce
--- /dev/null
+++ b/tests/integration/incremental_schema_tests/models/incremental_time_ingestion_partitioning.sql
@@ -0,0 +1,34 @@
+{{
+    config(
+        materialized='incremental',
+        unique_key='id',
+        on_schema_change='append_new_columns',
+        partition_by={
+            "field": "date",
+            "data_type": "timestamp",
+            "time_ingestion_partitioning": true
+        },
+    )
+}}
+
+{% set string_type = 'string' %}
+
+WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
+
+{% if is_incremental() %}
+
+SELECT id,
+       cast(field1 as {{string_type}}) as field1,
+       cast(field2 as {{string_type}}) as field2,
+       cast(field3 as {{string_type}}) as field3,
+       cast(field4 as {{string_type}}) as field4
+FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
+
+{% else %}
+
+SELECT id,
+       cast(field1 as {{string_type}}) as field1,
+       cast(field2 as {{string_type}}) as field2
+FROM source_data where id <= 3
+
+{% endif %}
\ No newline at end of file
diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py
index eee92f1ab..6bfacbbf0 100644
--- a/tests/unit/test_bigquery_adapter.py
+++ b/tests/unit/test_bigquery_adapter.py
@@ -691,7 +691,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "date",
-                "granularity": "day"
+                "granularity": "day",
+                "time_ingestion_partitioning": False
             }
         )

@@ -702,7 +703,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "date",
-                "granularity": "day"
+                "granularity": "day",
+                "time_ingestion_partitioning": False
             }
         )

@@ -715,7 +717,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "date",
-                "granularity": "MONTH"
+                "granularity": "MONTH",
+                "time_ingestion_partitioning": False
             }
         )

@@ -728,7 +731,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "date",
-                "granularity": "YEAR"
+                "granularity": "YEAR",
+                "time_ingestion_partitioning": False
             }
         )

@@ -741,7 +745,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "timestamp",
-                "granularity": "HOUR"
+                "granularity": "HOUR",
+                "time_ingestion_partitioning": False
             }
         )

@@ -755,7 +760,8 @@ def test_parse_partition_by(self):
             ), {
                 "field": "ts",
                 "data_type": "timestamp",
-                "granularity": "MONTH"
+                "granularity": "MONTH",
+                "time_ingestion_partitioning": False
             }
         )

@@ -768,7 +774,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "timestamp",
-                "granularity": "YEAR"
+                "granularity": "YEAR",
+                "time_ingestion_partitioning": False
             }
         )

@@ -781,7 +788,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "datetime",
-                "granularity": "HOUR"
+                "granularity": "HOUR",
+                "time_ingestion_partitioning": False
             }
         )

@@ -794,7 +802,8 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "datetime",
-                "granularity": "MONTH"
+                "granularity": "MONTH",
+                "time_ingestion_partitioning": False
             }
         )

@@ -807,7 +816,21 @@ def test_parse_partition_by(self):
             }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "datetime",
-                "granularity": "YEAR"
+                "granularity": "YEAR",
+                "time_ingestion_partitioning": False
+            }
+        )
+
+        self.assertEqual(
+            adapter.parse_partition_by({
+                "field": "ts",
+                "time_ingestion_partitioning": True
+
+            }).to_dict(omit_none=True), {
+                "field": "ts",
+                "data_type": "date",
+                "granularity": "day",
+                "time_ingestion_partitioning": True
             }
         )

@@ -834,7 +857,8 @@ def test_parse_partition_by(self):
                     "start": 1,
                     "end": 100,
                     "interval": 20
-                }
+                },
+                "time_ingestion_partitioning": False
             }
         )
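
For illustration, the statements that bq_create_table_as resolves to when time_ingestion_partitioning is enabled look roughly like the sketch below. It is based on the macros above and on the integration-test model; the project and dataset names are placeholders, the column list is illustrative, and a real run would compile the model SQL into the inner select.

    -- create_ingestion_time_partitioned_table_as: the configured partition field is dropped
    -- from the column list and the table is partitioned on the _PARTITIONTIME pseudo-column.
    create or replace table `my-project`.`my_dataset`.`incremental_time_ingestion_partitioning` (
      `id` int64,
      `field1` string,
      `field2` string
    )
    partition by timestamp_trunc(_PARTITIONTIME, day);

    -- bq_insert_into_ingestion_time_partitioned_table: rows are then loaded with an explicit
    -- _partitiontime value. Because the configured data_type is 'timestamp', the field is used
    -- as-is; any other data_type would be wrapped in timestamp(...) by build_partition_time_exp.
    insert into `my-project`.`my_dataset`.`incremental_time_ingestion_partitioning`
      (_partitiontime, `id`, `field1`, `field2`)
    select `date` as _partitiontime, * except(`date`) from (
      select ...  -- compiled model SQL, including the `date` column
    );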