From 75d90db142bf3e3057d5375669b2957a7e316418 Mon Sep 17 00:00:00 2001
From: Christophe Oudar
Date: Sun, 24 Apr 2022 03:09:21 +0200
Subject: [PATCH] Support copy partitions option for insert overwrite mode

---
 dbt/adapters/bigquery/impl.py                 |  12 +-
 .../macros/materializations/incremental.sql   |  14 +-
 .../incremental_strategy/insert_overwrite.sql | 194 +++++++++++-------
 tests/unit/test_bigquery_adapter.py           |  39 ++--
 4 files changed, 163 insertions(+), 96 deletions(-)

diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index 4a0e34d06..ee38d0608 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -48,6 +48,7 @@ class PartitionConfig(dbtClassMixin):
     granularity: str = "day"
     range: Optional[Dict[str, Any]] = None
     time_ingestion_partitioning: bool = False
+    copy_partitions: bool = False
 
     def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
         return [c for c in columns if not c.name.upper() == self.field.upper()]
@@ -492,11 +493,16 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) ->
         if not is_partitioned and not conf_partition:
             return True
         elif conf_partition and table.time_partitioning is not None:
-            partioning_field = table.time_partitioning.field or "_PARTITIONTIME"
-            table_field = partioning_field.lower()
+            partitioning_field = table.time_partitioning.field or "_PARTITIONTIME"
+            table_field = partitioning_field.lower()
             table_granularity = table.partitioning_type.lower()
+            conf_table_field = (
+                conf_partition.field
+                if not conf_partition.time_ingestion_partitioning
+                else "_PARTITIONTIME"
+            )
             return (
-                table_field == conf_partition.field.lower()
+                table_field == conf_table_field.lower()
                 and table_granularity == conf_partition.granularity.lower()
             )
         elif conf_partition and table.range_partitioning is not None:
diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql
index def718636..81ae2517f 100644
--- a/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -24,13 +24,13 @@
 {% endmacro %}
 
 {% macro bq_generate_incremental_build_sql(
-    strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+    strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
 ) %}
   {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
   {% if strategy == 'insert_overwrite' %}
 
     {% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
-        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
+        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
     ) %}
 
   {% else %} {# strategy == 'merge' #}
@@ -66,7 +66,13 @@
 
   {{ run_hooks(pre_hooks) }}
 
-  {% if existing_relation is none %}
+  {% if partition_by.copy_partitions is true and strategy != 'insert_overwrite' %} {#-- We can't copy partitions with merge strategy --#}
+    {% set wrong_strategy_msg -%}
+      The 'copy_partitions' option requires the 'incremental_strategy' option to be set to 'insert_overwrite'.
+    {%- endset %}
+    {% do exceptions.raise_compiler_error(wrong_strategy_msg) %}
+
+  {% elif existing_relation is none %}
       {% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}
 
   {% elif existing_relation.is_view %}
@@ -99,7 +105,7 @@
       {% set dest_columns = adapter.add_time_ingestion_partition_column(dest_columns) %}
     {% endif %}
     {% set build_sql = bq_generate_incremental_build_sql(
-        strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+        strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions
     ) %}
 
   {% endif %}
diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
index 3d89a9b22..8a00754e3 100644
--- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
+++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql
@@ -1,91 +1,133 @@
 {% macro bq_generate_incremental_insert_overwrite_build_sql(
-    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
 ) %}
-  {% if partition_by is none %}
-    {% set missing_partition_msg -%}
-      The 'insert_overwrite' strategy requires the `partition_by` config.
-    {%- endset %}
-    {% do exceptions.raise_compiler_error(missing_partition_msg) %}
-  {% endif %}
+    {% if partition_by is none %}
+      {% set missing_partition_msg -%}
+        The 'insert_overwrite' strategy requires the `partition_by` config.
+      {%- endset %}
+      {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+    {% endif %}
 
-  {% set build_sql = bq_insert_overwrite(
-      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change
-  ) %}
+    {% set build_sql = bq_insert_overwrite(
+        tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, on_schema_change, copy_partitions
+    ) %}
 
-  {{ return(build_sql) }}
+    {{ return(build_sql) }}
 
 {% endmacro %}
 
-{% macro bq_insert_overwrite(
-    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
-) %}
+{% macro bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
+  {% for partition in partitions %}
+    {% if partition_by.granularity == 'hour' %}
+      {% set partition = partition.strftime("%Y%m%d%H") %}
+    {% elif partition_by.granularity == 'day' %}
+      {% set partition = partition.strftime("%Y%m%d") %}
+    {% elif partition_by.granularity == 'month' %}
+      {% set partition = partition.strftime("%Y%m") %}
+    {% elif partition_by.granularity == 'year' %}
+      {% set partition = partition.strftime("%Y") %}
+    {% endif %}
+    {% set tmp_relation_partitioned = api.Relation.create(database=tmp_relation.database, schema=tmp_relation.schema, identifier=tmp_relation.table ~ '$' ~ partition, type=tmp_relation.type) %}
+    {% set target_relation_partitioned = api.Relation.create(database=target_relation.database, schema=target_relation.schema, identifier=target_relation.table ~ '$' ~ partition, type=target_relation.type) %}
+    {% do adapter.copy_table(tmp_relation_partitioned, target_relation_partitioned, "table") %}
+  {% endfor %}
+{% endmacro %}
 
-  {% if partitions is not none and partitions != [] %} {# static #}
+{% macro bq_static_insert_overwrite(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, copy_partitions) %}
+    {% set predicate -%}
+        {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
+            {{ partitions | join (', ') }}
+        )
+    {%- endset %}
+
+    {%- set source_sql -%}
+      (
+        {%- if partition_by.time_ingestion_partitioning -%}
+        {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql, True) }}
+        {%- else -%}
+        {{sql}}
+        {%- endif -%}
+      )
+    {%- endset -%}
+
+    {% if copy_partitions %}
+      {% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
+    {% else %}
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
+    {% endif %}
+{% endmacro %}
 
-  {% set predicate -%}
-      {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in (
-          {{ partitions | join (', ') }}
-      )
-  {%- endset %}
+{% macro bq_dynamic_copy_partitions_insert_overwrite(
+    tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions
+  ) %}
+  {# We run the temp table creation in a separate script so we can then copy its partitions #}
+  {%- do run_query(bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)) -%}
+  {%- set partitions_sql -%}
+    select distinct {{ partition_by.render_wrapped() }}
+    from {{ tmp_relation }}
+  {%- endset -%}
+  {%- set partitions = run_query(partitions_sql).columns[0].values() -%}
+  {# We copy the partitions to the target table #}
+  {%- do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) -%}
+  -- Clean up the temp table
+  drop table if exists {{ tmp_relation }}
+{% endmacro %}
 
-  {%- set source_sql -%}
-  (
-    {%- if partition_by.time_ingestion_partitioning -%}
-    {{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql, True) }}
-    {%- else -%}
-    {{sql}}
-    {%- endif -%}
-  )
-  {%- endset -%}
+{% macro bq_dynamic_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
+  {%- if copy_partitions is true %}
+    {{ bq_dynamic_copy_partitions_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
+  {% else -%}
+    {% set predicate -%}
+        {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
+    {%- endset %}
+
+    {%- set source_sql -%}
+    (
+      select
+      {% if partition_by.time_ingestion_partitioning -%}
+      _PARTITIONTIME,
+      {%- endif -%}
+      * from {{ tmp_relation }}
+    )
+    {%- endset -%}
+
+    declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
+
+    {# have we already created the temp table to check for schema changes? #}
+    {% if not tmp_relation_exists %}
+      {{ declare_dbt_max_partition(this, partition_by, sql) }}
+
+      -- 1. create a temp table with model data
+      {{ bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql) }}
+    {% else %}
+      -- 1. temp table already exists, we used it to check for schema changes
+    {% endif %}
 
-  {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}
+    -- 2. define partitions to update
+    set (dbt_partitions_for_replacement) = (
+        select as struct
+            array_agg(distinct {{ partition_by.render_wrapped() }})
+        from {{ tmp_relation }}
+    );
+
+    {#
+      TODO: include_sql_header is a hack; consider a better approach that includes
+            the sql_header at the materialization-level instead
+    #}
+    -- 3. run the merge statement
+    {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};
+
+    -- 4. clean up the temp table
+    drop table if exists {{ tmp_relation }}
+  {% endif %}
+{% endmacro %}
+
+{% macro bq_insert_overwrite(
+    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
+) %}
+  {% if partitions is not none and partitions != [] %} {# static #}
+      {{ bq_static_insert_overwrite(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, copy_partitions) }}
   {% else %} {# dynamic #}
-
-  {% set predicate -%}
-      {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
-  {%- endset %}
-
-  {%- set source_sql -%}
-  (
-    select
-    {% if partition_by.time_ingestion_partitioning -%}
-    _PARTITIONTIME,
-    {%- endif -%}
-    * from {{ tmp_relation }}
-  )
-  {%- endset -%}
-
-  declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
-
-  {# have we already created the temp table to check for schema changes? #}
-  {% if not tmp_relation_exists %}
-    {{ declare_dbt_max_partition(this, partition_by, sql) }}
-
-    -- 1. create a temp table
-    {% set create_table_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql) %}
-    {{ create_table_sql }}
-  {% else %}
-    -- 1. temp table already exists, we used it to check for schema changes
-  {% endif %}
-
-  -- 2. define partitions to update
-  set (dbt_partitions_for_replacement) = (
-      select as struct
-          array_agg(distinct {{ partition_by.render_wrapped() }})
-      from {{ tmp_relation }}
-  );
-
-  {#
-    TODO: include_sql_header is a hack; consider a better approach that includes
-          the sql_header at the materialization-level instead
-  #}
-  -- 3. run the merge statement
-  {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};
-
-  -- 4. clean up the temp table
-  drop table if exists {{ tmp_relation }}
-
+      {{ bq_dynamic_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
   {% endif %}
-
 {% endmacro %}
diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py
index d84cb6c53..f027605c0 100644
--- a/tests/unit/test_bigquery_adapter.py
+++ b/tests/unit/test_bigquery_adapter.py
@@ -692,7 +692,8 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "date",
                 "granularity": "day",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
@@ -704,7 +705,8 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "date",
                 "granularity": "day",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
@@ -718,7 +720,8 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "date",
                 "granularity": "MONTH",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
@@ -732,7 +735,8 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "date",
                 "granularity": "YEAR",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
@@ -746,7 +750,8 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "timestamp",
                 "granularity": "HOUR",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
@@ -761,7 +766,8 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "timestamp",
                 "granularity": "MONTH",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
@@ -775,7 +781,8 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "timestamp",
                 "granularity": "YEAR",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
@@ -789,7 +796,8 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "datetime",
                 "granularity": "HOUR",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
@@ -803,7 +811,8 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "datetime",
                 "granularity": "MONTH",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
@@ -817,20 +826,23 @@ def test_parse_partition_by(self):
                 "field": "ts",
                 "data_type": "datetime",
                 "granularity": "YEAR",
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )
 
         self.assertEqual(
             adapter.parse_partition_by({
                 "field": "ts",
-                "time_ingestion_partitioning": True
+                "time_ingestion_partitioning": True,
+                "copy_partitions": True
            }).to_dict(omit_none=True), {
                 "field": "ts",
                 "data_type": "date",
                 "granularity": "day",
-                "time_ingestion_partitioning": True
+                "time_ingestion_partitioning": True,
+                "copy_partitions": True
             }
         )
@@ -858,7 +870,8 @@ def test_parse_partition_by(self):
                     "end": 100,
                     "interval": 20
                 },
-                "time_ingestion_partitioning": False
+                "time_ingestion_partitioning": False,
+                "copy_partitions": False
             }
         )