diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index abc56bd5b..2bbf152a0 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -51,6 +51,14 @@ class PartitionConfig(dbtClassMixin):
     data_type: str = 'date'
     granularity: str = 'day'
     range: Optional[Dict[str, Any]] = None
+    time_ingestion_partitioning: bool = False
+
+    def reject_partition_field_column(
+            self,
+            columns: List[str]) -> List[str]:
+        """Return `columns` without the partition field (case-insensitive match)."""
+        partition_field = self.field.upper()
+        return [c for c in columns if c.upper() != partition_field]
 
     def render(self, alias: Optional[str] = None):
         column: str = self.field
diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql
index 56811234f..2efad6e91 100644
--- a/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -27,6 +27,56 @@
   {% do return(strategy) %}
 {% endmacro %}
 
+{% macro create_ingestion_time_partitioned_table_as(temporary, relation, sql) -%}
+  {#-- Emits DDL that (re)creates `relation` with the columns of `sql` minus the partition field; no rows are loaded here --#}
+  {#-- NOTE(review): the column list below carries names only; confirm whether BigQuery requires a data type per column --#}
+  {%- set raw_partition_by = config.get('partition_by', none) -%}
+  {%- set raw_cluster_by = config.get('cluster_by', none) -%}
+  {%- set sql_header = config.get('sql_header', none) -%}
+
+  {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}
+
+  {%- set columns = get_columns_in_query(sql) -%}
+  {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}
+
+  {{ sql_header if sql_header is not none }}
+
+  create or replace table {{ relation }} ({{table_dest_columns_csv}})
+  {{ partition_by(partition_config) }}
+  {{ cluster_by(raw_cluster_by) }}
+  {{ bigquery_table_options(config, model, temporary) }}
+
+{%- endmacro -%}
+
+
+{% macro columns_without_partition_fields_csv(partition_config, columns) -%}
+  {#-- Drops the partition field from `columns` and returns the rest via get_quoted_csv --#}
+  {%- set columns_no_partition = partition_config.reject_partition_field_column(columns) -%}
+  {% do log("columns_no_partition " ~ columns_no_partition) %}
+  {% set columns_names = get_quoted_csv(columns_no_partition) %}
+  {{ return(columns_names) }}
+
+{%- endmacro -%}
+
+{% macro bq_insert_into_ingestion_time_partitioned_table(target_relation, sql) -%}
+  {#-- Emits an insert that loads `sql` into `target_relation`, writing the partition field into _partitiontime --#}
+
+  {%- set partition_by = adapter.parse_partition_by(config.get('partition_by', none)) -%}
+  {% if partition_by.data_type == 'timestamp' %}
+    {% set partition_time_exp = partition_by.field %}
+  {% else %}
+    {% set partition_time_exp = 'timestamp(' + partition_by.field + ')' %}
+  {% endif %}
+  {% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
+  {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+
+  {#-- The select list must supply one value per column named in the insert list, in the same order --#}
+  insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
+  select {{ partition_time_exp }} as _partitiontime, {{ dest_columns_csv }} from (
+    {{ sql }}
+  )
+
+{%- endmacro -%}
 
 {% macro bq_insert_overwrite(
     tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -94,6 +144,16 @@
 
 {% endmacro %}
 
+{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, sql) %}
+  {#-- Returns the SQL that builds `relation` from `sql`; the ingestion-time path also runs a create statement as a side effect --#}
+  {% if is_time_ingestion_partitioning == True %}
+    {#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
+    {% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, sql)) %}
+    {{ return(bq_insert_into_ingestion_time_partitioned_table(relation, sql)) }}
+  {% else %}
+    {{ return(create_table_as(temporary, relation, sql)) }}
+  {% endif %}
+{% endmacro %}
 
 {% macro bq_generate_incremental_build_sql(
     strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -155,12 +215,12 @@
   {{ run_hooks(pre_hooks) }}
 
   {% if existing_relation is none %}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}
 
   {% elif existing_relation.is_view %}
       {#-- There's no way to atomically replace a view with a table on BQ --#}
       {{ adapter.drop_relation(existing_relation) }}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}
 
   {% elif full_refresh_mode %}
       {#-- If the partition/cluster config has changed, then we must drop and recreate --#}
@@ -168,13 +228,13 @@
           {% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
           {{ adapter.drop_relation(existing_relation) }}
       {% endif %}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}
 
   {% else %}
     {% set tmp_relation_exists = false %}
     {% if on_schema_change != 'ignore' %} {# Check first, since otherwise we may not build a temp table #}
       {% do run_query(
-        declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
+        declare_dbt_max_partition(this, partition_by, sql) + bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)
       ) %}
       {% set tmp_relation_exists = true %}
       {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}