Skip to content

Commit

Permalink
Support copy partitions option for insert overwrite mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt committed Oct 24, 2022
1 parent ed2b051 commit 5bcff7d
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 32 deletions.
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20221020-223914.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Features
body: Optimize insert_overwrite incremental strategy with WRITE_TRUNCATE / Partition
copy
time: 2022-10-20T22:39:14.091878+02:00
custom:
Author: Kayrnt
Issue: "77"
PR: "167"
12 changes: 9 additions & 3 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class PartitionConfig(dbtClassMixin):
granularity: str = "day"
range: Optional[Dict[str, Any]] = None
time_ingestion_partitioning: bool = False
copy_partitions: bool = False

def reject_partition_field_column(self, columns: List[Any]) -> List[Any]:
    """Return ``columns`` without the column that is the partition field.

    The comparison between each column's ``name`` and ``self.field`` is
    case-insensitive. The returned list holds the same column objects that
    were passed in (not their names), in their original order.

    Args:
        columns: Objects exposing a string ``name`` attribute.

    Returns:
        A new list with every column whose name matches ``self.field``
        removed.
    """
    return [
        column
        for column in columns
        if column.name.upper() != self.field.upper()
    ]
Expand Down Expand Up @@ -525,11 +526,16 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) ->
if not is_partitioned and not conf_partition:
return True
elif conf_partition and table.time_partitioning is not None:
partioning_field = table.time_partitioning.field or "_PARTITIONTIME"
table_field = partioning_field.lower()
partitioning_field = table.time_partitioning.field or "_PARTITIONTIME"
table_field = partitioning_field.lower()
table_granularity = table.partitioning_type.lower()
conf_table_field = (
conf_partition.field
if not conf_partition.time_ingestion_partitioning
else "_PARTITIONTIME"
)
return (
table_field == conf_partition.field.lower()
table_field == conf_table_field.lower()
and table_granularity == conf_partition.granularity.lower()
)
elif conf_partition and table.range_partitioning is not None:
Expand Down
14 changes: 10 additions & 4 deletions dbt/include/bigquery/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
{% endmacro %}

{% macro bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}
{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if strategy == 'insert_overwrite' %}

{% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}

{% else %} {# strategy == 'merge' #}
Expand Down Expand Up @@ -79,7 +79,13 @@

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% if partition_by.copy_partitions is true and strategy != 'insert_overwrite' %} {#-- We can't copy partitions with merge strategy --#}
{% set wrong_strategy_msg -%}
The 'copy_partitions' option requires the 'incremental_strategy' option to be set to 'insert_overwrite'.
{%- endset %}
{% do exceptions.raise_compiler_error(wrong_strategy_msg) %}

{% elif existing_relation is none %}
{%- call statement('main', language=language) -%}
{{ bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, compiled_code, language) }}
{%- endcall -%}
Expand Down Expand Up @@ -131,7 +137,7 @@
{% set dest_columns = adapter.add_time_ingestion_partition_column(dest_columns) %}
{% endif %}
{% set build_sql = bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions
) %}

{%- call statement('main') -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
{{ return({'value': partition_value, 'field': partition_by.field}) }}
{% endmacro %}

{% macro declare_dbt_max_partition(relation, partition_by, complied_code, language='sql') %}
{% macro declare_dbt_max_partition(relation, partition_by, compiled_code, language='sql') %}

{#-- TODO: revisit partitioning with python models --#}
{%- if '_dbt_max_partition' in complied_code and language == 'sql' -%}
{%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}

declare _dbt_max_partition {{ partition_by.data_type }} default (
select max({{ partition_by.field }}) from {{ this }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
) %}
{% if partition_by is none %}
{% set missing_partition_msg -%}
Expand All @@ -9,18 +9,45 @@
{% endif %}

{% set build_sql = bq_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
) %}

{{ return(build_sql) }}

{% endmacro %}

{% macro bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
  {#-
    Copy every partition in `partitions` from `tmp_relation` into
    `target_relation` by calling adapter.copy_table once per partition.

    Each side of the copy is a relation whose identifier is the table name
    followed by '$' and a partition suffix. For hour/day/month/year
    granularities the suffix is the partition value formatted with strftime;
    for any other granularity the partition value is used unchanged.
  -#}
  {% set suffix_formats = {
      'hour': "%Y%m%d%H",
      'day': "%Y%m%d",
      'month': "%Y%m",
      'year': "%Y"
  } %}

  {% for partition in partitions %}
    {#- Looked up inside the loop so nothing is evaluated when `partitions` is empty. -#}
    {% set suffix_format = suffix_formats.get(partition_by.granularity) %}
    {% set suffix = partition.strftime(suffix_format) if suffix_format else partition %}

    {% set source_partition = api.Relation.create(
        database=tmp_relation.database,
        schema=tmp_relation.schema,
        identifier=tmp_relation.table ~ '$' ~ suffix,
        type=tmp_relation.type
    ) %}
    {% set destination_partition = api.Relation.create(
        database=target_relation.database,
        schema=target_relation.schema,
        identifier=target_relation.table ~ '$' ~ suffix,
        type=target_relation.type
    ) %}

    {% do adapter.copy_table(source_partition, destination_partition, "table") %}
  {% endfor %}
{% endmacro %}

{% macro bq_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}

{% if partitions is not none and partitions != [] %} {# static #}
{{ bq_static_insert_overwrite(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, copy_partitions) }}
{% else %} {# dynamic #}
{{ bq_dynamic_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
{% endif %}
{% endmacro %}

{% macro bq_static_insert_overwrite(
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, copy_partitions
) %}

{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
Expand All @@ -38,6 +65,10 @@
)
{%- endset -%}

{% if copy_partitions %}
{% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
{% else %}

{#-- Because we're putting the model SQL _directly_ into the MERGE statement,
we need to prepend the MERGE statement with the user-configured sql_header,
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
Expand All @@ -46,8 +77,29 @@
#}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}

{% else %} {# dynamic #}
{% endif %}
{% endmacro %}

{% macro bq_dynamic_copy_partitions_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions
) %}
{#
  Dynamic insert_overwrite implemented with partition copies.

  Steps, in order:
    1. Build the create-table statement for `tmp_relation` from `sql` with
       bq_create_table_as and run it through run_query, i.e. as its own query
       rather than as part of this macro's rendered output.
       (The literal `True` argument is presumably the "temporary" flag of
       bq_create_table_as; confirm against that macro.)
    2. Query `tmp_relation` for the distinct values of
       partition_by.render_wrapped() and take the first result column.
    3. Hand those values to bq_copy_partitions together with the temp and
       target relations.
    4. Render a `drop table if exists` statement for `tmp_relation`; this is
       the only SQL text the macro itself emits (presumably executed by the
       caller; verify at the call site).

  `unique_key`, `dest_columns`, `tmp_relation_exists` and `copy_partitions`
  are accepted but not referenced in this body.
#}
{%- do run_query(bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)) -%}
{%- set partitions_sql -%}
select distinct {{ partition_by.render_wrapped() }}
from {{ tmp_relation }}
{%- endset -%}
{%- set partitions = run_query(partitions_sql).columns[0].values() -%}
{# Copy each partition found in the temp table over the matching target partition #}
{%- do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) -%}
-- Clean up the temp table
drop table if exists {{ tmp_relation }}
{% endmacro %}

{% macro bq_dynamic_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
{%- if copy_partitions is true %}
{{ bq_dynamic_copy_partitions_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
{% else -%}
{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}
Expand All @@ -69,7 +121,7 @@
{% if not tmp_relation_exists %}
{{ declare_dbt_max_partition(this, partition_by, sql) }}

-- 1. create a temp table
-- 1. create a temp table with model data
{{ bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, compiled_code) }}
{% else %}
-- 1. temp table already exists, we used it to check for schema changes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{#
  Test model for the insert_overwrite incremental strategy with the
  `copy_partitions` option enabled in `partition_by`.

  The table is partitioned on the datetime column `date_time` and clustered
  by `id`. The first (non-incremental) run produces four rows dated
  2020-01-01. Incremental runs produce two rows for 2020-01-01 and two rows
  for 2020-01-02, filtered to `date_time >= _dbt_max_partition`.
#}
{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partition_by={
"field": "date_time",
"data_type": "datetime",
"copy_partitions": true
}
)
}}


with data as (

{% if not is_incremental() %}

select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time

{% else %}

-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as datetime) as date_time union all
select 20 as id, cast('2020-01-01' as datetime) as date_time union all
select 30 as id, cast('2020-01-02' as datetime) as date_time union all
select 40 as id, cast('2020-01-02' as datetime) as date_time

{% endif %}

)

select * from data

{% if is_incremental() %}
where date_time >= _dbt_max_partition
{% endif %}
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ def test__bigquery_assert_incrementals(self):
self.assertTablesEqual('incremental_overwrite_partitions', 'incremental_overwrite_date_expected')
self.assertTablesEqual('incremental_overwrite_day', 'incremental_overwrite_day_expected')
self.assertTablesEqual('incremental_overwrite_range', 'incremental_overwrite_range_expected')
self.assertTablesEqual('incremental_overwrite_day_with_copy_partitions', 'incremental_overwrite_day_expected')

Loading

0 comments on commit 5bcff7d

Please sign in to comment.