Skip to content

Commit

Permalink
Support for incremental materialization with ingestion time partition tables
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt committed Mar 13, 2022
1 parent d5d94f9 commit 7e6d317
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
6 changes: 6 additions & 0 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class PartitionConfig(dbtClassMixin):
data_type: str = 'date'
granularity: str = 'day'
range: Optional[Dict[str, Any]] = None
time_ingestion_partitioning: bool = False

def reject_partition_field_column(
        self,
        columns: List[str]) -> List[str]:
    """Return *columns* with the partition field filtered out.

    The comparison is case-insensitive because BigQuery column names are
    case-insensitive; ingestion-time partitioned tables must not declare
    the partition field as a regular column (it is exposed via
    _PARTITIONTIME instead).
    """
    # `!=` reads more directly than `not ... == ...`.
    return [c for c in columns if c.upper() != self.field.upper()]

def render(self, alias: Optional[str] = None):
column: str = self.field
Expand Down
63 changes: 59 additions & 4 deletions dbt/include/bigquery/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,52 @@
{% do return(strategy) %}
{% endmacro %}

{% macro create_ingestion_time_partitioned_table_as(temporary, relation, sql) -%}
  {#-- Build DDL creating `relation` as an ingestion-time partitioned table whose
       columns mirror the query `sql`, minus the partition field (which BigQuery
       exposes implicitly as _PARTITIONTIME). Data is inserted separately, since
       ingestion-time partitioned tables cannot be created with `as select`. --#}
  {%- set raw_partition_by = config.get('partition_by', none) -%}
  {%- set raw_cluster_by = config.get('cluster_by', none) -%}
  {%- set sql_header = config.get('sql_header', none) -%}

  {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}

  {#-- Derive the explicit column list from the model query; the partition field
       must be excluded from the table definition. --#}
  {%- set columns = get_columns_in_query(sql) -%}
  {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}

  {{ sql_header if sql_header is not none }}

  create or replace table {{ relation }} ({{ table_dest_columns_csv }})
  {{ partition_by(partition_config) }}
  {{ cluster_by(raw_cluster_by) }}
  {{ bigquery_table_options(config, model, temporary) }}

{%- endmacro -%}


{% macro columns_without_partition_fields_csv(partition_config, columns) -%}
  {#-- Render a quoted, comma-separated column list with the partition field
       removed; ingestion-time partitioned tables supply that field implicitly
       via _PARTITIONTIME and must not declare it as a regular column. --#}
  {%- set columns_no_partition = partition_config.reject_partition_field_column(columns) -%}
  {%- set columns_names = get_quoted_csv(columns_no_partition) -%}
  {{ return(columns_names) }}
{%- endmacro -%}

{% macro bq_insert_into_ingestion_time_partitioned_table(target_relation, sql) -%}
  {#-- Insert the transformed rows into an ingestion-time partitioned table,
       deriving _PARTITIONTIME from the configured partition field. A timestamp
       field is used as-is; any other type is wrapped in timestamp(). --#}
  {%- set partition_by = config.get('partition_by', none) -%}
  {%- set partition_time_exp = partition_by.field
        if partition_by.data_type == 'timestamp'
        else 'timestamp(' ~ partition_by.field ~ ')' -%}
  {#-- Column list comes from the live target relation so the insert matches
       the table definition exactly. --#}
  {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
  {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

  insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
  select {{ partition_time_exp }} as _partitiontime from (
    {{ sql }}
  )

{%- endmacro -%}

{% macro bq_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
Expand Down Expand Up @@ -94,6 +140,15 @@

{% endmacro %}

{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, sql) %}
  {#-- Dispatch table creation: ingestion-time partitioned tables must be created
       empty first (they cannot be created with `as select`), then populated by a
       separate insert; otherwise fall back to the standard create_table_as. --#}
  {% if is_time_ingestion_partitioning %}
    {#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
    {% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, sql)) %}
    {{ return(bq_insert_into_ingestion_time_partitioned_table(relation, sql)) }}
  {% else %}
    {{ return(create_table_as(temporary, relation, sql)) }}
  {% endif %}
{% endmacro %}

{% macro bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
Expand Down Expand Up @@ -155,26 +210,26 @@
{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

{% elif existing_relation.is_view %}
{#-- There's no way to atomically replace a view with a table on BQ --#}
{{ adapter.drop_relation(existing_relation) }}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

{% elif full_refresh_mode %}
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

{% else %}
{% set tmp_relation_exists = false %}
{% if on_schema_change != 'ignore' %} {# Check first, since otherwise we may not build a temp table #}
{% do run_query(
declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
declare_dbt_max_partition(this, partition_by, sql) + bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)
) %}
{% set tmp_relation_exists = true %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
Expand Down

0 comments on commit 7e6d317

Please sign in to comment.