Commit

Support for incremental materialization with ingestion time partition tables
Kayrnt committed Apr 11, 2022
1 parent 3d69869 commit f8e8739
Showing 5 changed files with 231 additions and 32 deletions.
24 changes: 19 additions & 5 deletions dbt/adapters/bigquery/connections.py
@@ -1,11 +1,11 @@
import json
import re
from contextlib import contextmanager
from dataclasses import dataclass
from dataclasses import dataclass, field
from functools import lru_cache
import agate
from requests.exceptions import ConnectionError
from typing import Optional, Any, Dict, Tuple
from typing import Optional, Any, Dict, Tuple, List

import google.auth
import google.auth.exceptions
@@ -86,6 +86,7 @@ class BigQueryConnectionMethod(StrEnum):
@dataclass
class BigQueryAdapterResponse(AdapterResponse):
bytes_processed: Optional[int] = None
fields: List[Any] = field(default_factory=list)


@dataclass
@@ -434,6 +435,7 @@ def execute(
code = None
num_rows = None
bytes_processed = None
fields = list()

if query_job.statement_type == "CREATE_VIEW":
code = "CREATE VIEW"
@@ -448,6 +450,7 @@
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"
fields = query_table.schema

elif query_job.statement_type == "SCRIPT":
code = "SCRIPT"
@@ -473,9 +476,14 @@
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"
fields = query_table.schema

response = BigQueryAdapterResponse( # type: ignore[call-arg]
_message=message, rows_affected=num_rows, code=code, bytes_processed=bytes_processed
_message=message,
rows_affected=num_rows,
code=code,
bytes_processed=bytes_processed,
fields=fields,
)

return response, table
@@ -529,7 +537,8 @@ def copy_and_results():

self._retry_and_handle(
msg='copy table "{}" to "{}"'.format(
", ".join(source_ref.path for source_ref in source_ref_array), destination_ref.path
", ".join(source_ref.path for source_ref in source_ref_array),
destination_ref.path,
),
conn=conn,
fn=copy_and_results,
@@ -571,7 +580,12 @@ def fn():
self._retry_and_handle(msg="create dataset", conn=conn, fn=fn)

def _query_and_results(
self, client, sql, job_params, job_creation_timeout=None, job_execution_timeout=None
self,
client,
sql,
job_params,
job_creation_timeout=None,
job_execution_timeout=None,
):
"""Query the client and wait for results."""
# Cannot reuse job_config if destination is set and ddl is used
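
The new `fields` attribute on `BigQueryAdapterResponse` is what lets the incremental macros further down read a query's result schema without materializing anything. A minimal sketch of how a macro can reach it through `load_result`, assuming a hypothetical statement named `probe` and an in-scope `select_sql` (the logging is illustrative only):

  {% call statement('probe', fetch_result=True, auto_begin=False) -%}
    select * from ({{ select_sql }}) as __dbt_sbq where false limit 0
  {%- endcall %}
  {% for col in load_result('probe').response.fields %}
    {# each entry is a BigQuery SchemaField exposing name and field_type #}
    {% do log(col.name ~ ': ' ~ col.field_type) %}
  {% endfor %}
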
50 changes: 40 additions & 10 deletions dbt/adapters/bigquery/impl.py
@@ -7,7 +7,13 @@
import dbt.clients.agate_helper

from dbt import ui # type: ignore
from dbt.adapters.base import BaseAdapter, available, RelationType, SchemaSearchMap, AdapterConfig
from dbt.adapters.base import (
BaseAdapter,
available,
RelationType,
SchemaSearchMap,
AdapterConfig,
)
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
@@ -47,11 +53,15 @@ class PartitionConfig(dbtClassMixin):
data_type: str = "date"
granularity: str = "day"
range: Optional[Dict[str, Any]] = None
time_ingestion_partitioning: bool = False

def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
return [c for c in columns if not c.name.upper() == self.field.upper()]

def render(self, alias: Optional[str] = None):
column: str = self.field
column: str = self.field if not self.time_ingestion_partitioning else "_PARTITIONTIME"
if alias:
column = f"{alias}.{self.field}"
column = f"{alias}.{column}"

if self.data_type.lower() == "int64" or (
self.data_type.lower() == "date" and self.granularity.lower() == "day"
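
With `time_ingestion_partitioning` enabled, `render` now targets the `_PARTITIONTIME` pseudo column instead of the configured field. As a sketch, assuming the truncated `else` branch still renders `{data_type}_trunc(column, granularity)` as in the existing adapter, a hypothetical config of `{"field": "created_at", "data_type": "timestamp", "granularity": "day", "time_ingestion_partitioning": true}` rendered with alias `src` would yield:

  timestamp_trunc(src._PARTITIONTIME, day)

rather than `timestamp_trunc(src.created_at, day)`.
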
@@ -89,7 +99,11 @@ def render(self):

def _stub_relation(*args, **kwargs):
return BigQueryRelation.create(
database="", schema="", identifier="", quote_policy={}, type=BigQueryRelation.Table
database="",
schema="",
identifier="",
quote_policy={},
type=BigQueryRelation.Table,
)


@@ -209,7 +223,9 @@ def check_schema_exists(self, database: str, schema: str) -> bool:
def get_columns_in_relation(self, relation: BigQueryRelation) -> List[BigQueryColumn]:
try:
table = self.connections.get_bq_table(
database=relation.database, schema=relation.schema, identifier=relation.identifier
database=relation.database,
schema=relation.schema,
identifier=relation.identifier,
)
return self._get_dbt_columns_from_bq_table(table)

@@ -358,7 +374,10 @@ def _materialize_as_view(self, model: Dict[str, Any]) -> str:

logger.debug("Model SQL ({}):\n{}".format(model_alias, model_sql))
self.connections.create_view(
database=model_database, schema=model_schema, table_name=model_alias, sql=model_sql
database=model_database,
schema=model_schema,
table_name=model_alias,
sql=model_sql,
)
return "CREATE VIEW"

@@ -379,7 +398,10 @@ def _materialize_as_table(

logger.debug("Model SQL ({}):\n{}".format(table_name, model_sql))
self.connections.create_table(
database=model_database, schema=model_schema, table_name=table_name, sql=model_sql
database=model_database,
schema=model_schema,
table_name=table_name,
sql=model_sql,
)

return "CREATE TABLE"
@@ -462,7 +484,8 @@ def _partitions_match(self, table, conf_partition: Optional[PartitionConfig]) ->
if not is_partitioned and not conf_partition:
return True
elif conf_partition and table.time_partitioning is not None:
table_field = table.time_partitioning.field.lower()
partitioning_field = table.time_partitioning.field or "_PARTITIONTIME"
table_field = partitioning_field.lower()
table_granularity = table.partitioning_type.lower()
return (
table_field == conf_partition.field.lower()
@@ -508,7 +531,9 @@ def is_replaceable(

try:
table = self.connections.get_bq_table(
database=relation.database, schema=relation.schema, identifier=relation.identifier
database=relation.database,
schema=relation.schema,
identifier=relation.identifier,
)
except google.cloud.exceptions.NotFound:
return True
@@ -630,7 +655,12 @@ def load_dataframe(self, database, schema, table_name, agate_table, column_overr

@available.parse_none
def upload_file(
self, local_file_path: str, database: str, table_schema: str, table_name: str, **kwargs
self,
local_file_path: str,
database: str,
table_schema: str,
table_name: str,
**kwargs,
) -> None:
conn = self.connections.get_thread_connection()
client = conn.handle
109 changes: 103 additions & 6 deletions dbt/include/bigquery/macros/materializations/incremental.sql
@@ -27,6 +27,94 @@
{% do return(strategy) %}
{% endmacro %}

{% macro get_columns_with_types_in_query(select_sql) %}
{% call statement('get_columns_with_types_in_query', fetch_result=True, auto_begin=False) -%}
select * from (
{{ select_sql }}
) as __dbt_sbq
where false
limit 0
{% endcall %}
{%- set result = load_result('get_columns_with_types_in_query') -%}
{{ return(result.response.fields) }}
{% endmacro %}
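
Because the probe wraps the model SQL in `where false limit 0`, the job returns an empty result whose schema still describes the model's columns. A hedged usage sketch (the literal select is illustrative):

  {%- set columns = get_columns_with_types_in_query("select 1 as id, current_timestamp() as loaded_at") -%}
  {# columns now holds two schema fields: id INTEGER, loaded_at TIMESTAMP #}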

{% macro create_ingestion_time_partitioned_table_as(temporary, relation, sql) -%}
{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set raw_cluster_by = config.get('cluster_by', none) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}

{%- set columns = get_columns_with_types_in_query(sql) -%}
{%- set table_dest_columns_csv = columns_without_partition_fields_csv(partition_config, columns) -%}

{{ sql_header if sql_header is not none }}

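{#-- json round-trip copies the raw config so the update below does not mutate the model's partition_by --#}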
{% set ingestion_time_partition_config_raw = fromjson(tojson(raw_partition_by)) %}
{% do ingestion_time_partition_config_raw.update({'field':'_PARTITIONTIME'}) %}

{%- set ingestion_time_partition_config = adapter.parse_partition_by(ingestion_time_partition_config_raw) -%}

create or replace table {{ relation }} ({{table_dest_columns_csv}})
{{ partition_by(ingestion_time_partition_config) }}
{{ cluster_by(raw_cluster_by) }}
{{ bigquery_table_options(config, model, temporary) }}

{%- endmacro -%}
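
For a hypothetical model selecting `id INT64`, `field1 STRING`, and a timestamp partition field `loaded_at` at day granularity, the macro would render DDL along these lines (the partition field itself is stripped by `columns_without_partition_fields_csv`, and any `bigquery_table_options` output would follow):

  create or replace table `my_project`.`my_dataset`.`my_model` (`id` INT64, `field1` STRING)
  partition by timestamp_trunc(_PARTITIONTIME, day)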

{% macro get_quoted_with_types_csv(columns) %}
{% set quoted = [] %}
{% for col in columns -%}
{%- do quoted.append(adapter.quote(col.name) ~ " " ~ col.field_type) -%}
{%- endfor %}
{%- set dest_cols_csv = quoted | join(', ') -%}
{{ return(dest_cols_csv) }}

{% endmacro %}

{% macro columns_without_partition_fields_csv(partition_config, columns) -%}
{%- set columns_no_partition = partition_config.reject_partition_field_column(columns) -%}
{% set columns_names = get_quoted_with_types_csv(columns_no_partition) %}
{{ return(columns_names) }}

{%- endmacro -%}

{% macro bq_insert_into_ingestion_time_partitioned_table(target_relation, sql) -%}
{%- set partition_by = config.get('partition_by', none) -%}
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
{{ wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), sql) }}

{%- endmacro -%}
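
Continuing the hypothetical model above: the target table holds only `id` and `field1` (the partition field was rejected at create time), so `dest_columns` lines up with what the wrapped select produces, and the statement renders roughly as (`upstream_table` is illustrative):

  insert into `my_project`.`my_dataset`.`my_model` (_partitiontime, `id`, `field1`)
  select loaded_at as _partitiontime, * EXCEPT(loaded_at) from (
    select id, field1, loaded_at from upstream_table
  );

This works because `build_partition_time_exp` returns a bare column name for timestamp fields, which keeps the `EXCEPT` list valid.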

{% macro build_partition_time_exp(partition_by) %}
{% if partition_by.data_type == 'timestamp' %}
{{ return(partition_by.field) }}
{% else %}
{{ return('timestamp(' + partition_by.field + ')') }}
{% endif %}
{% endmacro %}
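
A field already typed `timestamp` passes through untouched, while anything else is wrapped in a conversion:

  {# data_type = 'timestamp', field = 'loaded_at' -> loaded_at #}
  {# data_type = 'date',      field = 'day_col'   -> timestamp(day_col) #}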

{% macro wrap_with_time_ingestion_partitioning(partition_time_exp, sql) %}

select {{ partition_time_exp }} as _partitiontime, * EXCEPT({{ partition_time_exp }}) from (
{{ sql }}
);

{% endmacro %}

{% macro source_sql_with_partition(partition_by, source_sql) %}

{%- if partition_by.time_ingestion_partitioning %}
{{ return(wrap_with_time_ingestion_partitioning(build_partition_time_exp(partition_by), source_sql)) }}
{% else %}
{{ return(source_sql) }}
{%- endif -%}

{% endmacro %}

{% macro bq_insert_overwrite(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -60,15 +148,15 @@
)
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;

{# have we already created the temp table to check for schema changes? #}
{% if not tmp_relation_exists %}
{{ declare_dbt_max_partition(this, partition_by, sql) }}

-- 1. create a temp table
{{ create_table_as(True, tmp_relation, sql) }}
{% set create_table_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}
{{ create_table_sql }}
{% else %}
-- 1. temp table already exists, we used it to check for schema changes
{% endif %}
@@ -94,6 +182,15 @@

{% endmacro %}

{% macro bq_create_table_as(is_time_ingestion_partitioning, temporary, relation, sql) %}
{% if is_time_ingestion_partitioning %}
{#-- Create the table before inserting data as ingestion time partitioned tables can't be created with the transformed data --#}
{% do run_query(create_ingestion_time_partitioned_table_as(temporary, relation, sql)) %}
{{ return(bq_insert_into_ingestion_time_partitioned_table(relation, sql)) }}
{% else %}
{{ return(create_table_as(temporary, relation, sql)) }}
{% endif %}
{% endmacro %}

{% macro bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
@@ -155,26 +252,26 @@
{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

{% elif existing_relation.is_view %}
{#-- There's no way to atomically replace a view with a table on BQ --#}
{{ adapter.drop_relation(existing_relation) }}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

{% elif full_refresh_mode %}
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% set build_sql = bq_create_table_as(partition_by.time_ingestion_partitioning, False, target_relation, sql) %}

{% else %}
{% set tmp_relation_exists = false %}
{% if on_schema_change != 'ignore' %} {# Check first, since otherwise we may not build a temp table #}
{% do run_query(
declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
declare_dbt_max_partition(this, partition_by, sql) + bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql)
) %}
{% set tmp_relation_exists = true %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
@@ -0,0 +1,34 @@
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='append_new_columns',
partition_by={
"field": "date",
"data_type": "timestamp",
"time_ingestion_partitioning": true
},
)
}}

{% set string_type = 'string' %}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2,
cast(field3 as {{string_type}}) as field3,
cast(field4 as {{string_type}}) as field4
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2
FROM source_data where id <= 3

{% endif %}