Write microbatch compiled + run code to separate target files #10743

Merged 8 commits on Sep 24, 2024

Changes from 7 commits
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240920-172419.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Write microbatch compiled/run targets to separate files, one per batch
time: 2024-09-20T17:24:19.219556+01:00
custom:
Author: michelleark
Issue: "10714"
11 changes: 8 additions & 3 deletions core/dbt/compilation.py
@@ -521,7 +521,9 @@ def write_graph_file(self, linker: Linker, manifest: Manifest):
linker.write_graph(graph_path, manifest)

# writes the "compiled_code" into the target/compiled directory
def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
def _write_node(
self, node: ManifestSQLNode, split_suffix: Optional[str] = None
) -> ManifestSQLNode:
if not node.extra_ctes_injected or node.resource_type in (
NodeType.Snapshot,
NodeType.Seed,
@@ -530,7 +532,9 @@ def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
fire_event(WritingInjectedSQLForNode(node_info=get_node_info()))

if node.compiled_code:
node.compiled_path = node.get_target_write_path(self.config.target_path, "compiled")
node.compiled_path = node.get_target_write_path(
self.config.target_path, "compiled", split_suffix
)
node.write_node(self.config.project_root, node.compiled_path, node.compiled_code)
return node

@@ -540,6 +544,7 @@ def compile_node(
manifest: Manifest,
extra_context: Optional[Dict[str, Any]] = None,
write: bool = True,
split_suffix: Optional[str] = None,
) -> ManifestSQLNode:
"""This is the main entry point into this code. It's called by
CompileRunner.compile, GenericRPCRunner.compile, and
@@ -562,7 +567,7 @@

node, _ = self._recursively_prepend_ctes(node, manifest, extra_context)
if write:
self._write_node(node)
self._write_node(node, split_suffix=split_suffix)
return node


15 changes: 14 additions & 1 deletion core/dbt/context/providers.py
@@ -51,6 +51,7 @@
Exposure,
Macro,
ManifestNode,
ModelNode,
Resource,
SeedNode,
SemanticModel,
@@ -972,7 +973,19 @@
# macros/source defs aren't 'writeable'.
if isinstance(self.model, (Macro, SourceDefinition)):
raise MacrosSourcesUnWriteableError(node=self.model)
self.model.build_path = self.model.get_target_write_path(self.config.target_path, "run")

split_suffix = None
if (
isinstance(self.model, ModelNode)
and self.model.config.get("incremental_strategy") == "microbatch"
):
split_suffix = self.model.format_batch_start(
    self.model.config.get("__dbt_internal_microbatch_event_time_start")
)

self.model.build_path = self.model.get_target_write_path(
self.config.target_path, "run", split_suffix=split_suffix
)
self.model.write_node(self.config.project_root, self.model.build_path, payload)
return ""

25 changes: 24 additions & 1 deletion core/dbt/contracts/graph/nodes.py
@@ -2,6 +2,7 @@
import os
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Dict,
@@ -59,6 +60,7 @@
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.resources.types import BatchSize
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
from dbt.contracts.graph.unparsed import (
@@ -243,14 +245,25 @@ def clear_event_status(self):

@dataclass
class ParsedNode(ParsedResource, NodeInfoMixin, ParsedNodeMandatory, SerializableType):
def get_target_write_path(self, target_path: str, subdirectory: str):
def get_target_write_path(
self, target_path: str, subdirectory: str, split_suffix: Optional[str] = None
):
# This is called for both the "compiled" subdirectory of "target" and the "run" subdirectory
if os.path.basename(self.path) == os.path.basename(self.original_file_path):
# One-to-one relationship of nodes to files.
path = self.original_file_path
else:
# Many-to-one relationship of nodes to files.
path = os.path.join(self.original_file_path, self.path)

if split_suffix:
pathlib_path = Path(path)
path = str(
pathlib_path.parent
/ pathlib_path.stem
/ (pathlib_path.stem + f"_{split_suffix}" + pathlib_path.suffix)
)

target_write_path = os.path.join(target_path, subdirectory, self.package_name, path)
return target_write_path
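
A standalone sketch of the split-suffix rewrite above (hypothetical path, not part of the diff): the batch file is nested under a directory named after the node, with the suffix appended to the file stem.

from pathlib import Path

p = Path("models/microbatch_model.sql")
print(p.parent / p.stem / (p.stem + "_2020-01-01" + p.suffix))
# models/microbatch_model/microbatch_model_2020-01-01.sql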

Expand Down Expand Up @@ -559,6 +572,16 @@ def infer_primary_key(self, data_tests: List["GenericTestNode"]) -> List[str]:

return []

def format_batch_start(self, batch_start: Optional[datetime]) -> Optional[str]:
if batch_start is None:
return batch_start

return str(
batch_start.date()
if (batch_start and self.config.batch_size != BatchSize.hour)
else batch_start
)
Contributor:
nit: Feels somewhat weird that this is on the ParsedNode class instead of the MicrobatchBuilder class. Just feels out of place.

Contributor Author:
totally agree! I think at first I didn't do this because it's necessary in providers.py, which didn't seem like a good place to pull in the MicrobatchBuilder. Added it as a static method. Good call 👍

def same_contents(self, old, adapter_type) -> bool:
return super().same_contents(old, adapter_type) and self.same_ref_representation(old)

12 changes: 5 additions & 7 deletions core/dbt/task/run.py
@@ -14,7 +14,6 @@
)
from dbt.adapters.exceptions import MissingMaterializationError
from dbt.artifacts.resources import Hook
from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.schemas.results import (
BaseResult,
NodeStatus,
@@ -197,11 +196,8 @@

def describe_batch(self, batch_start: Optional[datetime]) -> str:
# Only visualize date if batch_start year/month/day
formatted_batch_start = (
batch_start.date()
if (batch_start and self.node.config.batch_size != BatchSize.hour)
else batch_start
)
formatted_batch_start = self.node.format_batch_start(batch_start)

return f"batch {formatted_batch_start} of {self.get_node_representation()}"

def print_start_line(self):
@@ -463,7 +459,9 @@
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]

# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(model, manifest, {})
self.compiler.compile_node(
    model, manifest, {}, split_suffix=model.format_batch_start(batch[0])
)
context["model"] = model
context["sql"] = model.compiled_code
context["compiled_code"] = model.compiled_code
76 changes: 76 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
@@ -5,6 +5,7 @@

from dbt.tests.util import (
patch_microbatch_end_time,
read_file,
relation_from_name,
run_dbt,
run_dbt_and_capture,
@@ -442,3 +443,78 @@ def test_run_with_event_time(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-01"])
self.assert_row_count(project, "microbatch_model", 2)


class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-01"])

# Compiled paths - compiled model without filter only
Contributor Author:
After discussion with @graciegoheen, we decided it could still be useful to see the non-batched (no filters applied) model file, and the top level is where users would expect it. I've added a test to formalize this expected behaviour, but it's something that would be easy to change if we get beta feedback!

assert read_file(
project.project_root,
"target",
"compiled",
"test",
"models",
"microbatch_model.sql",
)

# Compiled paths - batch compilations
assert read_file(
project.project_root,
"target",
"compiled",
"test",
"models",
"microbatch_model",
"microbatch_model_2020-01-01.sql",
)
assert read_file(
project.project_root,
"target",
"compiled",
"test",
"models",
"microbatch_model",
"microbatch_model_2020-01-02.sql",
)
assert read_file(
project.project_root,
"target",
"compiled",
"test",
"models",
"microbatch_model",
"microbatch_model_2020-01-03.sql",
)

assert read_file(
project.project_root,
"target",
"run",
"test",
"models",
"microbatch_model",
"microbatch_model_2020-01-01.sql",
)
assert read_file(
project.project_root,
"target",
"run",
"test",
"models",
"microbatch_model",
"microbatch_model_2020-01-02.sql",
)
assert read_file(
project.project_root,
"target",
"run",
"test",
"models",
"microbatch_model",
"microbatch_model_2020-01-03.sql",
)
51 changes: 50 additions & 1 deletion tests/unit/graph/test_nodes.py
@@ -13,9 +13,10 @@
Measure,
TestMetadata,
)
from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.resources.v1.semantic_model import NodeRelation
from dbt.contracts.graph.model_config import TestConfig
from dbt.contracts.graph.nodes import ColumnInfo, ModelNode, SemanticModel
from dbt.contracts.graph.nodes import ColumnInfo, ModelNode, ParsedNode, SemanticModel
from dbt.node_types import NodeType
from dbt_common.contracts.constraints import (
ColumnLevelConstraint,
@@ -110,6 +111,22 @@ def test_all_constraints(

assert default_model_node.all_constraints == expected_all_constraints

@pytest.mark.parametrize(
"batch_size,batch_start,expected_formatted_batch_start",
[
(None, None, None),
(BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.hour, datetime(2020, 1, 1, 1), "2020-01-01 01:00:00"),
],
)
def test_format_batch_start(
self, default_model_node, batch_size, batch_start, expected_formatted_batch_start
):
default_model_node.config.batch_size = batch_size
assert default_model_node.format_batch_start(batch_start) == expected_formatted_batch_start


class TestSemanticModel:
@pytest.fixture(scope="function")
@@ -391,3 +408,35 @@ def test_disabled_unique_combo_multiple():

def assertSameContents(list1, list2):
assert sorted(list1) == sorted(list2)


class TestParsedNode:
@pytest.fixture(scope="class")
def parsed_node(self) -> ParsedNode:
return ParsedNode(
resource_type=NodeType.Model,
unique_id="model.test_package.test_name",
name="test_name",
package_name="test_package",
schema="test_schema",
alias="test_alias",
fqn=["models", "test_name"],
original_file_path="test_original_file_path",
checksum=FileHash.from_contents("checksum"),
path="test_path.sql",
database=None,
)

def test_get_target_write_path(self, parsed_node):
write_path = parsed_node.get_target_write_path("target_path", "subdirectory")
assert (
write_path
== "target_path/subdirectory/test_package/test_original_file_path/test_path.sql"
)

def test_get_target_write_path_split(self, parsed_node):
write_path = parsed_node.get_target_write_path("target_path", "subdirectory", "split")
assert (
write_path
== "target_path/subdirectory/test_package/test_original_file_path/test_path/test_path_split.sql"
)