Add support to infer schemas on Snowflake (dbt-labs#211)
* Add support to infer schemas on Snowflake
* Add tests and example for Snowflake infer schema
b-per authored and jarno-r committed Jan 2, 2024
1 parent 26cdfc2 commit ad58a7e
Showing 5 changed files with 101 additions and 22 deletions.
8 changes: 6 additions & 2 deletions integration_tests/macros/plugins/snowflake/prep_external.sql
@@ -1,18 +1,22 @@
 {% macro snowflake__prep_external() %}

     {% set external_stage = target.schema ~ '.dbt_external_tables_testing' %}
+    {% set parquet_file_format = target.schema ~ '.dbt_external_tables_testing_parquet' %}

-    {% set create_external_stage %}
+    {% set create_external_stage_and_file_format %}

+        begin;
         create or replace stage
             {{ external_stage }}
             url = 's3://dbt-external-tables-testing';

+        create or replace file format {{ parquet_file_format }} type = parquet;
+        commit;
+
     {% endset %}

     {% do log('Creating external stage ' ~ external_stage, info = true) %}
-    {% do run_query(create_external_stage) %}
+    {% do log('Creating parquet file format ' ~ parquet_file_format, info = true) %}
+    {% do run_query(create_external_stage_and_file_format) %}

 {% endmacro %}
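
Rendered, the new statement block runs roughly the following SQL (a sketch, assuming a hypothetical target.schema of test_schema):

    begin;
    create or replace stage
        test_schema.dbt_external_tables_testing
        url = 's3://dbt-external-tables-testing';
    create or replace file format test_schema.dbt_external_tables_testing_parquet type = parquet;
    commit;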
37 changes: 37 additions & 0 deletions integration_tests/models/plugins/snowflake/snowflake_external.yml
@@ -115,3 +115,40 @@ sources:
             data_type: varchar
             expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)"
         tests: *same-rowcount
+
+      - name: people_parquet_column_list_unpartitioned
+        external: &parquet-people
+          location: '@{{ target.schema }}.dbt_external_tables_testing/parquet'
+          file_format: '{{ target.schema }}.dbt_external_tables_testing_parquet'
+        columns: *cols-of-the-people
+        tests: *equal-to-the-people
+
+      - name: people_parquet_column_list_partitioned
+        external:
+          <<: *parquet-people
+          partitions: *parts-of-the-people
+        columns: *cols-of-the-people
+        tests: *equal-to-the-people
+
+      - name: people_parquet_infer_schema_unpartitioned
+        external:
+          <<: *parquet-people
+          infer_schema: true
+        tests: *equal-to-the-people
+
+      - name: people_parquet_infer_schema_partitioned
+        external:
+          <<: *parquet-people
+          partitions: *parts-of-the-people
+          infer_schema: true
+        tests: *equal-to-the-people
+
+      - name: people_parquet_infer_schema_partitioned_and_column_desc
+        external:
+          <<: *parquet-people
+          partitions: *parts-of-the-people
+          infer_schema: true
+        tests: *equal-to-the-people
+        columns:
+          - name: id
+            description: "the unique ID for people"
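
Any one of these new test tables can be staged on its own with the package's selector syntax, e.g. (the table name comes from the diff above; the source name snowflake_external is an assumption):

    dbt run-operation stage_external_sources --args "select: snowflake_external.people_parquet_infer_schema_unpartitioned"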
45 changes: 33 additions & 12 deletions macros/plugins/snowflake/create_external_table.sql
@@ -3,26 +3,47 @@
 {%- set columns = source_node.columns.values() -%}
 {%- set external = source_node.external -%}
 {%- set partitions = external.partitions -%}
+{%- set infer_schema = external.infer_schema -%}
+
+{% if infer_schema %}
+    {% set query_infer_schema %}
+        select * from table( infer_schema( location=>'{{external.location}}', file_format=>'{{external.file_format}}') )
+    {% endset %}
+    {% if execute %}
+        {% set columns_infer = run_query(query_infer_schema) %}
+    {% endif %}
+{% endif %}

 {%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%}

 {# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #}
 {# This assumes you have already created an external stage #}
 create or replace external table {{source(source_node.source_name, source_node.name)}}
-    {%- if columns or partitions -%}
+    {%- if columns or partitions or infer_schema -%}
     (
         {%- if partitions -%}{%- for partition in partitions %}
-            {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 -}}
+            {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}}
         {%- endfor -%}{%- endif -%}
-        {%- for column in columns %}
-            {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %}
-            {%- set col_expression -%}
-                {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_quoted -%}
-                (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
-            {%- endset %}
-            {{column_quoted}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}})
-            {{- ',' if not loop.last -}}
-        {% endfor %}
+        {%- if not infer_schema -%}
+            {%- for column in columns %}
+                {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %}
+                {%- set col_expression -%}
+                    {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_quoted -%}
+                    (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
+                {%- endset %}
+                {{column_quoted}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}})
+                {{- ',' if not loop.last -}}
+            {% endfor %}
+        {% else %}
+            {%- for column in columns_infer %}
+                {%- set col_expression -%}
+                    {%- set col_id = 'value:' ~ column[0] -%}
+                    (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
+                {%- endset %}
+                {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}})
+                {{- ',' if not loop.last -}}
+            {% endfor %}
+        {%- endif -%}
     )
     {%- endif -%}
     {% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %}
@@ -34,4 +55,4 @@
     {% if external.integration -%} integration = '{{external.integration}}' {%- endif %}
     file_format = {{external.file_format}}
     {% if external.table_format -%} table_format = '{{external.table_format}}' {%- endif %}
-{% endmacro %}
+{% endmacro %}
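
For context, Snowflake's INFER_SCHEMA table function returns one row per detected column, which is why the macro can read column[0] as the column name and column[1] as its type. A minimal sketch of the query, with hypothetical stage and file format names:

    select *
    from table(
        infer_schema(
            location => '@my_schema.my_stage/parquet',
            file_format => 'my_schema.my_parquet_format'
        )
    );
    -- one row per column: COLUMN_NAME, TYPE, NULLABLE, EXPRESSION, FILENAMES, ORDER_ID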
4 changes: 2 additions & 2 deletions run_test.sh
@@ -36,6 +36,6 @@ set -eo pipefail
 dbt deps --target $1
 dbt seed --full-refresh --target $1
 dbt run-operation prep_external --target $1
-dbt run-operation stage_external_sources --vars 'ext_full_refresh: true' --target $1
-dbt run-operation stage_external_sources --target $1
+dbt run-operation dbt_external_tables.stage_external_sources --vars 'ext_full_refresh: true' --target $1
+dbt run-operation dbt_external_tables.stage_external_sources --target $1
 dbt test --target $1
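
Invocation is unchanged: the script still takes the dbt target name as its only argument, e.g. (assuming a snowflake target is defined in profiles.yml):

    ./run_test.sh snowflake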
29 changes: 23 additions & 6 deletions sample_sources/snowflake.yml
@@ -62,9 +62,26 @@ sources:
       # include `value`, the JSON blob of all file contents.

       - name: delta_tbl
-          description: "External table using Delta files"
-          external:
-            location: "@stage" # reference an existing external stage
-            file_format: "( type = parquet )" # fully specified here, or reference an existing file format
-            table_format: delta # specify the table format
-            auto_refresh: false # requires configuring an event notification from Amazon S3 or Azure
+        description: "External table using Delta files"
+        external:
+          location: "@stage" # reference an existing external stage
+          file_format: "( type = parquet )" # fully specified here, or reference an existing file format
+          table_format: delta # specify the table format
+          auto_refresh: false # requires configuring an event notification from Amazon S3 or Azure
+
+
+      - name: parquet_with_inferred_schema
+        description: "External table using Parquet and inferring the schema"
+        external:
+          location: "@stage" # reference an existing external stage
+          file_format: "my_file_format" # we need a named file format for infer to work
+          infer_schema: true # parameter to tell Snowflake we want to infer the table schema
+          partitions:
+            - name: section # we can define partitions on top of the schema columns
+              data_type: varchar(64)
+              expression: "substr(split_part(metadata$filename, 'section=', 2), 1, 1)"
+        columns: # columns can still be listed for documentation/testing purpose
+          - name: id
+            description: this is an id
+          - name: name
+            description: and this is a name
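
With infer_schema enabled, the DDL the package generates for this source should look roughly like the following (a sketch: the ID/NAME columns and their types stand in for whatever INFER_SCHEMA actually detects, and the database/schema qualifiers are hypothetical):

    create or replace external table my_db.my_schema.parquet_with_inferred_schema
    (
        section varchar(64) as substr(split_part(metadata$filename, 'section=', 2), 1, 1),
        ID NUMBER as ((case when is_null_value(value:ID) or lower(value:ID) = 'null' then null else value:ID end)::NUMBER),
        NAME TEXT as ((case when is_null_value(value:NAME) or lower(value:NAME) = 'null' then null else value:NAME end)::TEXT)
    )
    partition by (section)
    location = @stage
    auto_refresh = false
    file_format = my_file_format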
