Support copy partitions option for insert overwrite mode

Kayrnt committed Apr 24, 2022
1 parent ed18955 commit 75d90db
Showing 4 changed files with 163 additions and 96 deletions.
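
This commit adds a 'copy_partitions' flag to BigQuery's partition_by config: with the insert_overwrite incremental strategy, dbt can then replace changed partitions using BigQuery copy jobs instead of a merge statement, avoiding the cost of scanning the target table. A minimal sketch of how a model might opt in (the model body and the 'session_start' column are hypothetical, not part of this commit):

    -- hypothetical model file, e.g. models/my_sessions.sql
    {{ config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = {
          'field': 'session_start',
          'data_type': 'timestamp',
          'granularity': 'day',
          'copy_partitions': true
        }
    ) }}

    select * from {{ ref('stg_sessions') }}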
12 changes: 9 additions & 3 deletions dbt/adapters/bigquery/impl.py
@@ -48,6 +48,7 @@ class PartitionConfig(dbtClassMixin):
     granularity: str = "day"
     range: Optional[Dict[str, Any]] = None
     time_ingestion_partitioning: bool = False
+    copy_partitions: bool = False

     def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
         return [c for c in columns if not c.name.upper() == self.field.upper()]
@@ -492,11 +493,16 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) ->
         if not is_partitioned and not conf_partition:
             return True
         elif conf_partition and table.time_partitioning is not None:
-            partioning_field = table.time_partitioning.field or "_PARTITIONTIME"
-            table_field = partioning_field.lower()
+            partitioning_field = table.time_partitioning.field or "_PARTITIONTIME"
+            table_field = partitioning_field.lower()
             table_granularity = table.partitioning_type.lower()
+            conf_table_field = (
+                conf_partition.field
+                if not conf_partition.time_ingestion_partitioning
+                else "_PARTITIONTIME"
+            )
             return (
-                table_field == conf_partition.field.lower()
+                table_field == conf_table_field.lower()
                 and table_granularity == conf_partition.granularity.lower()
             )
         elif conf_partition and table.range_partitioning is not None:
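For context on the _partitions_match change above: a table that uses time-ingestion partitioning exposes the _PARTITIONTIME pseudo-column rather than a regular partitioning column, so the comparison now targets _PARTITIONTIME instead of the configured field when time_ingestion_partitioning is set. A sketch of a config that exercises this path (the 'loaded_at' field name is hypothetical):

    {{ config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = {
          'field': 'loaded_at',
          'data_type': 'timestamp',
          'granularity': 'day',
          'time_ingestion_partitioning': true
        }
    ) }}
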
14 changes: 10 additions & 4 deletions dbt/include/bigquery/macros/materializations/incremental.sql
@@ -24,13 +24,13 @@
 {% endmacro %}

 {% macro bq_generate_incremental_build_sql(
-    strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+    strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
 ) %}
   {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
   {% if strategy == 'insert_overwrite' %}

     {% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
-      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
+      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
     ) %}

   {% else %} {# strategy == 'merge' #}
@@ -66,7 +66,13 @@

   {{ run_hooks(pre_hooks) }}

-  {% if existing_relation is none %}
+  {% if partition_by.copy_partitions is true and strategy != 'insert_overwrite' %} {#-- We can't copy partitions with the merge strategy --#}
+    {% set wrong_strategy_msg -%}
+      The 'copy_partitions' option requires the 'incremental_strategy' option to be set to 'insert_overwrite'.
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(wrong_strategy_msg) %}
+
+  {% elif existing_relation is none %}
     {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

   {% elif existing_relation.is_view %}
@@ -99,7 +105,7 @@
     {% set dest_columns = adapter.add_time_ingestion_partition_column(dest_columns) %}
   {% endif %}
   {% set build_sql = bq_generate_incremental_build_sql(
-    strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+    strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions
   ) %}

   {% endif %}
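The guard added above makes the incompatibility explicit: copying partitions only makes sense when whole partitions are replaced, which the merge strategy does not guarantee. As a sketch, a config like the following (column name hypothetical) should now fail at compile time with the error defined in wrong_strategy_msg instead of running a merge:

    -- rejected: copy_partitions without insert_overwrite (sketch)
    {{ config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        partition_by = {
          'field': 'created_at',
          'data_type': 'timestamp',
          'copy_partitions': true
        }
    ) }}
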
@@ -1,91 +1,133 @@
 {% macro bq_generate_incremental_insert_overwrite_build_sql(
-    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
 ) %}
-  {% if partition_by is none %}
-    {% set missing_partition_msg -%}
-    The 'insert_overwrite' strategy requires the `partition_by` config.
-    {%- endset %}
-    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
-  {% endif %}
+    {% if partition_by is none %}
+      {% set missing_partition_msg -%}
+      The 'insert_overwrite' strategy requires the `partition_by` config.
+      {%- endset %}
+      {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+    {% endif %}

-  {% set build_sql = bq_insert_overwrite(
-    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
-  ) %}
+    {% set build_sql = bq_insert_overwrite(
+      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
+    ) %}

-  {{ return(build_sql) }}
+    {{ return(build_sql) }}

 {% endmacro %}

-{% macro bq_insert_overwrite(
-    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
-) %}
+{% macro bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
+  {% for partition in partitions %}
+    {% if partition_by.granularity == 'hour' %}
+      {% set partition = partition.strftime("%Y%m%d%H") %}
+    {% elif partition_by.granularity == 'day' %}
+      {% set partition = partition.strftime("%Y%m%d") %}
+    {% elif partition_by.granularity == 'month' %}
+      {% set partition = partition.strftime("%Y%m") %}
+    {% elif partition_by.granularity == 'year' %}
+      {% set partition = partition.strftime("%Y") %}
+    {% endif %}
+    {% set tmp_relation_partitioned = api.Relation.create(database=tmp_relation.database, schema=tmp_relation.schema, identifier=tmp_relation.table ~ '$' ~ partition, type=tmp_relation.type) %}
+    {% set target_relation_partitioned = api.Relation.create(database=target_relation.database, schema=target_relation.schema, identifier=target_relation.table ~ '$' ~ partition, type=target_relation.type) %}
+    {% do adapter.copy_table(tmp_relation_partitioned, target_relation_partitioned, "table") %}
+  {% endfor %}
+{% endmacro %}

-  {% if partitions is not none and partitions != [] %} {# static #}
+{% macro bq_static_insert_overwrite(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, copy_partitions) %}
+      {% set predicate -%}
+          {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
+              {{ partitions | join (', ') }}
+          )
+      {%- endset %}
+
+      {%- set source_sql -%}
+        (
+          {%- if partition_by.time_ingestion_partitioning -%}
+          {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql, True) }}
+          {%- else -%}
+          {{sql}}
+          {%- endif -%}
+        )
+      {%- endset -%}
+
+      {% if copy_partitions %}
+          {% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
+      {% else %}
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
+      {% endif %}
+{% endmacro %}

-      {% set predicate -%}
-          {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
-              {{ partitions | join (', ') }}
-          )
-      {%- endset %}
+{% macro bq_dynamic_copy_partitions_insert_overwrite(
+  tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions
+  ) %}
+  {# We run temp table creation in a separate script, then move on to copying the partitions #}
+  {%- do run_query(bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)) -%}
+  {%- set partitions_sql -%}
+    select distinct {{ partition_by.render_wrapped() }}
+    from {{ tmp_relation }}
+  {%- endset -%}
+  {%- set partitions = run_query(partitions_sql).columns[0].values() -%}
+  {# We copy the partitions to the target table #}
+  {%- do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) -%}
+  -- Clean up the temp table
+  drop table if exists {{ tmp_relation }}
+{% endmacro %}

-      {%- set source_sql -%}
-        (
-          {%- if partition_by.time_ingestion_partitioning -%}
-          {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql, True) }}
-          {%- else -%}
-          {{sql}}
-          {%- endif -%}
-        )
-      {%- endset -%}
+{% macro bq_dynamic_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
+  {%- if copy_partitions is true %}
+     {{ bq_dynamic_copy_partitions_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
+  {% else -%}
+      {% set predicate -%}
+          {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
+      {%- endset %}
+
+      {%- set source_sql -%}
+      (
+        select
+        {% if partition_by.time_ingestion_partitioning -%}
+        _PARTITIONTIME,
+        {%- endif -%}
+        * from {{ tmp_relation }}
+      )
+      {%- endset -%}
+
+      declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
+
+      {# have we already created the temp table to check for schema changes? #}
+      {% if not tmp_relation_exists %}
+        {{ declare_dbt_max_partition(this, partition_by, sql) }}
+
+        -- 1. create a temp table with model data
+        {{ bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql) }}
+      {% else %}
+        -- 1. temp table already exists, we used it to check for schema changes
+      {% endif %}

-      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
+      -- 2. define partitions to update
+      set (dbt_partitions_for_replacement) = (
+          select as struct
+              array_agg(distinct {{ partition_by.render_wrapped() }})
+          from {{ tmp_relation }}
+      );
+
+      {#
+        TODO: include_sql_header is a hack; consider a better approach that includes
+        the sql_header at the materialization-level instead
+      #}
+      -- 3. run the merge statement
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};
+
+      -- 4. clean up the temp table
+      drop table if exists {{ tmp_relation }}
+  {% endif %}
+{% endmacro %}

+{% macro bq_insert_overwrite(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+) %}
+  {% if partitions is not none and partitions != [] %} {# static #}
+     {{ bq_static_insert_overwrite(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, copy_partitions) }}
   {% else %} {# dynamic #}

-      {% set predicate -%}
-          {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
-      {%- endset %}
-
-      {%- set source_sql -%}
-      (
-        select
-        {% if partition_by.time_ingestion_partitioning -%}
-        _PARTITIONTIME,
-        {%- endif -%}
-        * from {{ tmp_relation }}
-      )
-      {%- endset -%}
-
-      declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
-
-      {# have we already created the temp table to check for schema changes? #}
-      {% if not tmp_relation_exists %}
-        {{ declare_dbt_max_partition(this, partition_by, sql) }}
-
-        -- 1. create a temp table
-        {% set create_table_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql) %}
-        {{ create_table_sql }}
-      {% else %}
-        -- 1. temp table already exists, we used it to check for schema changes
-      {% endif %}
-
-      -- 2. define partitions to update
-      set (dbt_partitions_for_replacement) = (
-          select as struct
-              array_agg(distinct {{ partition_by.render_wrapped() }})
-          from {{ tmp_relation }}
-      );
-
-      {#
-        TODO: include_sql_header is a hack; consider a better approach that includes
-        the sql_header at the materialization-level instead
-      #}
-      -- 3. run the merge statement
-      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};
-
-      -- 4. clean up the temp table
-      drop table if exists {{ tmp_relation }}
-
+     {{ bq_dynamic_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
   {% endif %}

 {% endmacro %}
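
A note on the mechanics of bq_copy_partitions: BigQuery addresses a single partition by suffixing the table name with a '$' decorator, and the macro derives that suffix from the configured granularity (hour '%Y%m%d%H', day '%Y%m%d', month '%Y%m', year '%Y'). The sketch below shows a direct call for one day partition; the relation and partition_by variables are assumed to already be in scope, and dbt's Jinja context exposes Python's datetime module as modules.datetime. As I read the adapter, the "table" mode passed to adapter.copy_table performs a truncating copy, so each target partition is replaced rather than appended to.

    {# Sketch: copy the 2022-04-24 partition from the temp relation to the target. #}
    {# With granularity = 'day' this resolves to identifiers like                  #}
    {#   my_dataset.my_model__dbt_tmp$20220424 -> my_dataset.my_model$20220424     #}
    {% set partitions = [modules.datetime.datetime(2022, 4, 24)] %}
    {% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}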