From bbdb98fa5d0890616e78348e17abb28930f261e6 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 24 Sep 2024 13:56:34 -0500 Subject: [PATCH] Microbatch Config Validation (#10752) --- .../unreleased/Features-20240924-154639.yaml | 7 + core/dbt/parser/manifest.py | 65 ++++++ .../functional/microbatch/test_microbatch.py | 8 +- .../test_microbatch_config_validation.py | 185 ++++++++++++++++++ 4 files changed, 261 insertions(+), 4 deletions(-) create mode 100644 .changes/unreleased/Features-20240924-154639.yaml create mode 100644 tests/functional/microbatch/test_microbatch_config_validation.py diff --git a/.changes/unreleased/Features-20240924-154639.yaml b/.changes/unreleased/Features-20240924-154639.yaml new file mode 100644 index 00000000000..41fbdeaaa6f --- /dev/null +++ b/.changes/unreleased/Features-20240924-154639.yaml @@ -0,0 +1,7 @@ +kind: Features +body: 'Parse-time validation of microbatch configs: require event_time, batch_size, + lookback and validate input event_time' +time: 2024-09-24T15:46:39.83112+01:00 +custom: + Author: michelleark + Issue: "10709" diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index d54aa898713..e265408602b 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -24,6 +24,7 @@ register_adapter, ) from dbt.artifacts.resources import FileHash, NodeRelation, NodeVersion +from dbt.artifacts.resources.types import BatchSize from dbt.artifacts.schemas.base import Writable from dbt.clients.jinja import MacroStack, get_rendered from dbt.clients.jinja_static import statically_extract_macro_calls @@ -468,6 +469,7 @@ def load(self) -> Manifest: self.check_valid_group_config() self.check_valid_access_property() self.check_valid_snapshot_config() + self.check_valid_microbatch_config() semantic_manifest = SemanticManifest(self.manifest) if not semantic_manifest.validate(): @@ -1355,6 +1357,69 @@ def check_valid_snapshot_config(self): continue node.config.final_validate() + def check_valid_microbatch_config(self): + if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"): + for node in self.manifest.nodes.values(): + if ( + node.config.materialized == "incremental" + and node.config.incremental_strategy == "microbatch" + ): + # Required configs: event_time, batch_size, begin + event_time = node.config.event_time + if event_time is None: + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide an 'event_time' (string) config that indicates the name of the event time column." + ) + if not isinstance(event_time, str): + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide an 'event_time' config of type string, but got: {type(event_time)}." + ) + + begin = node.config.begin + if begin is None: + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide a 'begin' (datetime) config that indicates the earliest timestamp the microbatch model should be built from." + ) + + # Try to cast begin to a datetime using same format as mashumaro for consistency with other yaml-provided datetimes + # Mashumaro default: https://github.com/Fatal1ty/mashumaro/blob/4ac16fd060a6c651053475597b58b48f958e8c5c/README.md?plain=1#L1186 + if isinstance(begin, str): + try: + begin = datetime.datetime.fromisoformat(begin) + node.config.begin = begin + except Exception: + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide a 'begin' config of valid datetime (ISO format), but got: {begin}." + ) + + if not isinstance(begin, datetime.datetime): + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide a 'begin' config of type datetime, but got: {type(begin)}." + ) + + batch_size = node.config.batch_size + valid_batch_sizes = [size.value for size in BatchSize] + if batch_size not in valid_batch_sizes: + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide a 'batch_size' config that is one of {valid_batch_sizes}, but got: {batch_size}." + ) + + # Optional config: lookback (int) + lookback = node.config.lookback + if not isinstance(lookback, int) and lookback is not None: + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide the optional 'lookback' config as type int, but got: {type(lookback)})." + ) + + # Validate upstream node event_time (if configured) + for input_unique_id in node.depends_on.nodes: + input_node = self.manifest.expect(unique_id=input_unique_id) + input_event_time = input_node.config.event_time + if input_event_time and not isinstance(input_event_time, str): + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}." + ) + def write_perf_info(self, target_path: str): path = os.path.join(target_path, PERF_INFO_FILE_NAME) write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4)) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index fc27482a2df..7b6699334b2 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -39,7 +39,7 @@ microbatch_model_ref_render_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('input_model').render() }} """ @@ -369,7 +369,7 @@ def test_run_with_event_time(self, project): microbatch_model_context_vars = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} {{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}} {{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}} select * from {{ ref('input_model') }} @@ -400,7 +400,7 @@ def test_run_with_event_time_logs(self, project): microbatch_model_failing_incremental_partition_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} {% if '2020-01-02' in (model.config.__dbt_internal_microbatch_event_time_start | string) %} invalid_sql {% endif %} @@ -425,7 +425,7 @@ def test_run_with_event_time(self, project): microbatch_model_first_partition_failing_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} {% if '2020-01-01' in (model.config.__dbt_internal_microbatch_event_time_start | string) %} invalid_sql {% endif %} diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py new file mode 100644 index 00000000000..cdebd3a791b --- /dev/null +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -0,0 +1,185 @@ +import os +from unittest import mock + +import pytest + +from dbt.exceptions import ParsingError +from dbt.tests.util import run_dbt + +valid_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time') }} +select * from {{ ref('input_model') }} +""" + +valid_microbatch_model_no_config_sql = """ +select * from {{ ref('input_model') }} +""" + +valid_microbatch_model_config_yml = """ +models: + - name: microbatch + config: + materialized: incremental + incremental_strategy: microbatch + batch_size: day + event_time: event_time + begin: 2020-01-01 +""" + +invalid_microbatch_model_config_yml = """ +models: + - name: microbatch + config: + materialized: incremental + incremental_strategy: microbatch + batch_size: day + event_time: event_time + begin: 2020-01-01 11 PM +""" + +missing_event_time_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day') }} +select * from {{ ref('input_model') }} +""" + +invalid_event_time_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time=2) }} +select * from {{ ref('input_model') }} +""" + +missing_begin_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time') }} +select * from {{ ref('input_model') }} +""" + +invalid_begin_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time', begin=2) }} +select * from {{ ref('input_model') }} +""" + + +missing_batch_size_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time') }} +select * from {{ ref('input_model') }} +""" + +invalid_batch_size_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='invalid', event_time='event_time') }} +select * from {{ ref('input_model') }} +""" + +invalid_event_time_input_model_sql = """ +{{ config(materialized='table', event_time=1) }} + +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +""" + +valid_input_model_sql = """ +{{ config(materialized='table') }} + +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +""" + + +class BaseMicrobatchTestParseError: + @pytest.fixture(scope="class") + def models(self): + return {} + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_parsing_error_raised(self, project): + with pytest.raises(ParsingError): + run_dbt(["parse"]) + + +class BaseMicrobatchTestNoError: + @pytest.fixture(scope="class") + def models(self): + return {} + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_parsing_error_not_raised(self, project): + run_dbt(["parse"]) + + +class TestMissingEventTimeMicrobatch(BaseMicrobatchTestParseError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": missing_event_time_microbatch_model_sql, + } + + +class TestInvalidEventTimeMicrobatch(BaseMicrobatchTestParseError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": invalid_event_time_microbatch_model_sql, + } + + +class TestMissingBeginMicrobatch(BaseMicrobatchTestParseError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": missing_begin_microbatch_model_sql, + } + + +class TestInvaliBeginTypeMicrobatch(BaseMicrobatchTestParseError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": invalid_begin_microbatch_model_sql, + } + + +class TestInvaliBegiFormatMicrobatch(BaseMicrobatchTestParseError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": valid_microbatch_model_no_config_sql, + "microbatch.yml": invalid_microbatch_model_config_yml, + } + + +class TestMissingBatchSizeMicrobatch(BaseMicrobatchTestParseError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": missing_batch_size_microbatch_model_sql, + } + + +class TestInvalidBatchSizeMicrobatch(BaseMicrobatchTestParseError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": invalid_batch_size_microbatch_model_sql, + } + + +class TestInvalidInputEventTimeMicrobatch(BaseMicrobatchTestParseError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": invalid_event_time_input_model_sql, + "microbatch.sql": valid_microbatch_model_sql, + } + + +class TestValidBeginMicrobatch(BaseMicrobatchTestNoError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": valid_microbatch_model_no_config_sql, + "schema.yml": valid_microbatch_model_config_yml, + }