Microbatch Config Validation (#10752)
QMalcolm authored Sep 24, 2024
1 parent a8d4ba2 commit bbdb98f
Showing 4 changed files with 261 additions and 4 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20240924-154639.yaml
@@ -0,0 +1,7 @@
kind: Features
body: 'Parse-time validation of microbatch configs: require event_time, begin and
  batch_size; validate optional lookback and input event_time'
time: 2024-09-24T15:46:39.83112+01:00
custom:
Author: michelleark
Issue: "10709"
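
For reference, the shape of a config that passes the new parse-time checks matches the fixtures updated in the tests below: 'event_time' (string), 'begin' (datetime), and a recognized 'batch_size' are required, while 'lookback' stays optional. A minimal sketch in the style of the functional test fixtures (the column name and begin timestamp are illustrative only):

minimal_valid_microbatch_model_sql = """
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_time',
    batch_size='day',
    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
) }}
select * from {{ ref('input_model') }}
"""
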
65 changes: 65 additions & 0 deletions core/dbt/parser/manifest.py
@@ -24,6 +24,7 @@
register_adapter,
)
from dbt.artifacts.resources import FileHash, NodeRelation, NodeVersion
from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.schemas.base import Writable
from dbt.clients.jinja import MacroStack, get_rendered
from dbt.clients.jinja_static import statically_extract_macro_calls
@@ -468,6 +469,7 @@ def load(self) -> Manifest:
self.check_valid_group_config()
self.check_valid_access_property()
self.check_valid_snapshot_config()
self.check_valid_microbatch_config()

semantic_manifest = SemanticManifest(self.manifest)
if not semantic_manifest.validate():
@@ -1355,6 +1357,69 @@ def check_valid_snapshot_config(self):
continue
node.config.final_validate()

def check_valid_microbatch_config(self):
if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"):
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
and node.config.incremental_strategy == "microbatch"
):
# Required configs: event_time, batch_size, begin
event_time = node.config.event_time
if event_time is None:
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide an 'event_time' (string) config that indicates the name of the event time column."
)
if not isinstance(event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide an 'event_time' config of type string, but got: {type(event_time)}."
)

begin = node.config.begin
if begin is None:
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide a 'begin' (datetime) config that indicates the earliest timestamp the microbatch model should be built from."
)

# Try to cast begin to a datetime using same format as mashumaro for consistency with other yaml-provided datetimes
# Mashumaro default: https://github.com/Fatal1ty/mashumaro/blob/4ac16fd060a6c651053475597b58b48f958e8c5c/README.md?plain=1#L1186
if isinstance(begin, str):
try:
begin = datetime.datetime.fromisoformat(begin)
node.config.begin = begin
except Exception:
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide a 'begin' config of valid datetime (ISO format), but got: {begin}."
)

if not isinstance(begin, datetime.datetime):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide a 'begin' config of type datetime, but got: {type(begin)}."
)

batch_size = node.config.batch_size
valid_batch_sizes = [size.value for size in BatchSize]
if batch_size not in valid_batch_sizes:
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide a 'batch_size' config that is one of {valid_batch_sizes}, but got: {batch_size}."
)

# Optional config: lookback (int)
lookback = node.config.lookback
if not isinstance(lookback, int) and lookback is not None:
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide the optional 'lookback' config as type int, but got: {type(lookback)}."
)

# Validate upstream node event_time (if configured)
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time and not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
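
As a quick illustration of the 'begin' coercion above: the parser mirrors mashumaro's default handling by calling datetime.datetime.fromisoformat, so a YAML value such as 2020-01-01 is coerced to a datetime, while a free-form value such as "2020-01-01 11 PM" (the invalid fixture in the new test file) raises ValueError and is surfaced as a ParsingError. A standard-library sketch:

import datetime

# Accepted: an ISO-8601 date string is coerced to midnight of that day.
assert datetime.datetime.fromisoformat("2020-01-01") == datetime.datetime(2020, 1, 1, 0, 0, 0)

# Rejected: not ISO-8601, so fromisoformat raises ValueError, which
# check_valid_microbatch_config converts into a ParsingError at parse time.
try:
    datetime.datetime.fromisoformat("2020-01-01 11 PM")
except ValueError:
    pass
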
8 changes: 4 additions & 4 deletions tests/functional/microbatch/test_microbatch.py
@@ -39,7 +39,7 @@


microbatch_model_ref_render_sql = """
-{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
+{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model').render() }}
"""

@@ -369,7 +369,7 @@ def test_run_with_event_time(self, project):


microbatch_model_context_vars = """
-{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
+{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}}
{{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}}
select * from {{ ref('input_model') }}
@@ -400,7 +400,7 @@ def test_run_with_event_time_logs(self, project):


microbatch_model_failing_incremental_partition_sql = """
-{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
+{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{% if '2020-01-02' in (model.config.__dbt_internal_microbatch_event_time_start | string) %}
invalid_sql
{% endif %}
@@ -425,7 +425,7 @@ def test_run_with_event_time(self, project):


microbatch_model_first_partition_failing_sql = """
-{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
+{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{% if '2020-01-01' in (model.config.__dbt_internal_microbatch_event_time_start | string) %}
invalid_sql
{% endif %}
185 changes: 185 additions & 0 deletions tests/functional/microbatch/test_microbatch_config_validation.py
@@ -0,0 +1,185 @@
import os
from unittest import mock

import pytest

from dbt.exceptions import ParsingError
from dbt.tests.util import run_dbt

valid_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""

valid_microbatch_model_no_config_sql = """
select * from {{ ref('input_model') }}
"""

valid_microbatch_model_config_yml = """
models:
- name: microbatch
config:
materialized: incremental
incremental_strategy: microbatch
batch_size: day
event_time: event_time
begin: 2020-01-01
"""

invalid_microbatch_model_config_yml = """
models:
- name: microbatch
config:
materialized: incremental
incremental_strategy: microbatch
batch_size: day
event_time: event_time
begin: 2020-01-01 11 PM
"""

missing_event_time_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day') }}
select * from {{ ref('input_model') }}
"""

invalid_event_time_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time=2) }}
select * from {{ ref('input_model') }}
"""

missing_begin_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time') }}
select * from {{ ref('input_model') }}
"""

invalid_begin_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time', begin=2) }}
select * from {{ ref('input_model') }}
"""


missing_batch_size_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time') }}
select * from {{ ref('input_model') }}
"""

invalid_batch_size_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='invalid', event_time='event_time') }}
select * from {{ ref('input_model') }}
"""

invalid_event_time_input_model_sql = """
{{ config(materialized='table', event_time=1) }}
select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
"""

valid_input_model_sql = """
{{ config(materialized='table') }}
select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
"""


class BaseMicrobatchTestParseError:
@pytest.fixture(scope="class")
def models(self):
return {}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_parsing_error_raised(self, project):
with pytest.raises(ParsingError):
run_dbt(["parse"])


class BaseMicrobatchTestNoError:
@pytest.fixture(scope="class")
def models(self):
return {}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_parsing_error_not_raised(self, project):
run_dbt(["parse"])


class TestMissingEventTimeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": missing_event_time_microbatch_model_sql,
}


class TestInvalidEventTimeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": invalid_event_time_microbatch_model_sql,
}


class TestMissingBeginMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": missing_begin_microbatch_model_sql,
}


class TestInvalidBeginTypeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": invalid_begin_microbatch_model_sql,
}


class TestInvalidBeginFormatMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": valid_microbatch_model_no_config_sql,
"microbatch.yml": invalid_microbatch_model_config_yml,
}


class TestMissingBatchSizeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": missing_batch_size_microbatch_model_sql,
}


class TestInvalidBatchSizeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": invalid_batch_size_microbatch_model_sql,
}


class TestInvalidInputEventTimeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": invalid_event_time_input_model_sql,
"microbatch.sql": valid_microbatch_model_sql,
}


class TestValidBeginMicrobatch(BaseMicrobatchTestNoError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": valid_microbatch_model_no_config_sql,
"schema.yml": valid_microbatch_model_config_yml,
}

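
One note on how the new tests toggle the feature: each case patches the environment for the duration of the test via mock.patch.dict, matching the os.environ.get gate in check_valid_microbatch_config, and the variable is restored afterwards so other tests are unaffected. A minimal standalone sketch of that pattern (the helper name is hypothetical):

import os
from unittest import mock

def microbatch_validation_enabled() -> bool:
    # Same truthiness gate as check_valid_microbatch_config: any non-empty
    # value of the experimental flag turns the parse-time checks on.
    return bool(os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"))

# mock.patch.dict sets the flag only inside the block and restores
# os.environ afterwards, which is how each test class enables the checks.
with mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}):
    assert microbatch_validation_enabled()
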