Support for incremental materialization with ingestion time partition tables

Kayrnt committed Mar 29, 2022
1 parent 072050e commit 9e1c480
Showing 2 changed files with 103 additions and 5 deletions.
12 changes: 11 additions & 1 deletion dbt/adapters/bigquery/impl.py
@@ -51,6 +51,13 @@ class PartitionConfig(dbtClassMixin):
    data_type: str = 'date'
    granularity: str = 'day'
    range: Optional[Dict[str, Any]] = None
    time_ingestion_partitioning: bool = False

    def reject_partition_field_column(
            self,
            columns: List[Any]) -> List[str]:
        logger.debug("reject_partition_field_column: {}".format(columns))
        return [c for c in columns if not c.name.upper() == self.field.upper()]

    def render(self, alias: Optional[str] = None):
        column: str = self.field
@@ -507,7 +514,10 @@ def _partitions_match(
        if not is_partitioned and not conf_partition:
            return True
        elif conf_partition and table.time_partitioning is not None:
-           table_field = table.time_partitioning.field.lower()
+           logger.debug('table.time_partitioning ({})'.format(
+               table.time_partitioning))
+           partitioning_field = table.time_partitioning.field or '_PARTITIONTIME'
+           table_field = partitioning_field.lower()
            table_granularity = table.partitioning_type.lower()
            return table_field == conf_partition.field.lower() \
                and table_granularity == conf_partition.granularity.lower()
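The new `time_ingestion_partitioning` flag is an extra key on the model's `partition_by` config, which `adapter.parse_partition_by` maps onto `PartitionConfig`. A minimal sketch of a model that would opt in, assuming a `created_at` timestamp column (model and column names are illustrative, not part of this commit):

    {{
        config(
            materialized = 'incremental',
            partition_by = {
                'field': 'created_at',
                'data_type': 'timestamp',
                'granularity': 'day',
                'time_ingestion_partitioning': true
            }
        )
    }}

    select id, created_at from {{ ref('raw_events') }}
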
96 changes: 92 additions & 4 deletions dbt/include/bigquery/macros/materializations/incremental.sql
@@ -27,6 +27,84 @@
{% do return(strategy) %}
{% endmacro %}

{% macro get_columns_with_types_in_query(select_sql) %}
  {% call statement('get_columns_with_types_in_query', fetch_result=True, auto_begin=False) -%}
    select * from (
      {{ select_sql }}
    ) as __dbt_sbq
    where false
    limit 0
  {% endcall %}
  {%- set result = load_result('get_columns_with_types_in_query') -%}
  {{ log('result ' ~ result) }}
  {%- set table = result.table -%}
  {{ log('table ' ~ table) }}
  {%- set columns = result.columns -%}
  {{ log('columns ' ~ columns) }}
  {{ return(result.table.columns | list) }}
{% endmacro %}
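For illustration, assuming the model compiles to `select id, created_at from raw_events` (hypothetical names), the statement issued by this macro would render roughly as the zero-row probe below; it lets BigQuery report the column names and types of the model query without scanning any data:

    select * from (
      select id, created_at from raw_events
    ) as __dbt_sbq
    where false
    limit 0
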

{% macro create_ingestion_time_partitioned_table_as(temporary, relation, sql) -%}
  {%- set raw_partition_by = config.get('partition_by', none) -%}
  {%- set raw_cluster_by = config.get('cluster_by', none) -%}
  {%- set sql_header = config.get('sql_header', none) -%}

  {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}

  {%- set columns = get_columns_with_types_in_query(sql) -%}
  {%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}

  {{ sql_header if sql_header is not none }}

  {#-- round-trip through JSON to copy the raw config so the model's partition_by is not mutated --#}
  {% set ingestion_time_partition_config_raw = fromjson(tojson(raw_partition_by)) %}
  {{ log('ingestion_time_partition_config_raw ' ~ ingestion_time_partition_config_raw) }}
  {#-- partition on the ingestion time pseudo column instead of the model column --#}
  {% do ingestion_time_partition_config_raw.update({'field':'_PARTITIONTIME'}) %}

  {%- set ingestion_time_partition_config = adapter.parse_partition_by(ingestion_time_partition_config_raw) -%}

  create or replace table {{ relation }} ({{ table_dest_columns_csv }})
  {{ partition_by(ingestion_time_partition_config) }}
  {{ cluster_by(raw_cluster_by) }}
  {{ bigquery_table_options(config, model, temporary) }}

{%- endmacro -%}
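Assuming the same hypothetical model with the `partition_by` config shown earlier, the DDL rendered here would be roughly of the following shape: `created_at` is rejected from the explicit column list and the table is partitioned on the `_PARTITIONTIME` pseudo column instead (relation name and column type are illustrative; cluster and options clauses are omitted):

    create or replace table `my-project`.`my_dataset`.`events` (`id` INT64)
    partition by timestamp_trunc(_PARTITIONTIME, day)
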

{% macro get_quoted_with_types_csv(columns) %}
  {% set quoted = [] %}
  {% for col in columns -%}
    {{ log('col > ' ~ col) }}
    {%- do quoted.append(adapter.quote(col.name) ~ " " ~ col.data_type) -%}
  {%- endfor %}
  {%- set dest_cols_csv = quoted | join(', ') -%}
  {{ return(dest_cols_csv) }}

{% endmacro %}

{% macro columns_without_partition_fields_csv(partition_config, columns) -%}
  {%- set columns_no_partition = partition_config.reject_partition_field_column(columns) -%}
  {{ log('columns_no_partition ' ~ columns_no_partition) }}
  {% set columns_names = get_quoted_with_types_csv(columns_no_partition) %}
  {{ return(columns_names) }}

{%- endmacro -%}

{% macro bq_insert_into_ingestion_time_partitioned_table(target_relation, sql) -%}
  {%- set partition_by = config.get('partition_by', none) -%}
  {% if partition_by.data_type == 'timestamp' %}
    {% set partition_time_exp = partition_by.field %}
  {% else %}
    {% set partition_time_exp = 'timestamp(' + partition_by.field + ')' %}
  {% endif %}
  {% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
  {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

  insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
  select {{ partition_time_exp }} as _partitiontime, {{ dest_columns_csv }} from (
    {{ sql }}
  )

{%- endmacro -%}
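Continuing the example, the insert rendered for the hypothetical `events` relation would look roughly like this: the partition expression (here simply `created_at`, since its `data_type` is `timestamp`) is written to the `_partitiontime` pseudo column, and the remaining destination columns are selected by name from the model query:

    insert into `my-project`.`my_dataset`.`events` (_partitiontime, `id`)
    select created_at as _partitiontime, `id` from (
      select id, created_at from raw_events
    )
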

{% macro bq_insert_overwrite(
    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -94,6 +172,15 @@

{% endmacro %}

{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, sql) %}
  {% if is_time_ingestion_partitioning %}
    {#-- Create the table first, because an ingestion time partitioned table can't be created directly from a select over the transformed data --#}
    {% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, sql)) %}
    {{ return(bq_insert_into_ingestion_time_partitioned_table(relation, sql)) }}
  {% else %}
    {{ return(create_table_as(temporary, relation, sql)) }}
  {% endif %}
{% endmacro %}

{% macro bq_generate_incremental_build_sql(
    strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -147,6 +234,7 @@

  {%- set raw_partition_by = config.get('partition_by', none) -%}
  {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
  {% do log("partition_by: " ~ partition_by) %}
  {%- set partitions = config.get('partitions', none) -%}
  {%- set cluster_by = config.get('cluster_by', none) -%}

@@ -155,26 +243,26 @@
  {{ run_hooks(pre_hooks) }}

  {% if existing_relation is none %}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

  {% elif existing_relation.is_view %}
      {#-- There's no way to atomically replace a view with a table on BQ --#}
      {{ adapter.drop_relation(existing_relation) }}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

  {% elif full_refresh_mode %}
      {#-- If the partition/cluster config has changed, then we must drop and recreate --#}
      {% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
          {% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
          {{ adapter.drop_relation(existing_relation) }}
      {% endif %}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

  {% else %}
      {% set tmp_relation_exists = false %}
      {% if on_schema_change != 'ignore' %} {# Check first, since otherwise we may not build a temp table #}
          {% do run_query(
-              declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
+              declare_dbt_max_partition(this, partition_by, sql) + bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)
          ) %}
          {% set tmp_relation_exists = true %}
          {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
