Skip to content

Commit

Permalink
Write microbatch compiled + run code to separate target files (#10743)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk authored Sep 24, 2024
1 parent 3ac20ce commit a1e4753
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 12 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240920-172419.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Write microbatch compiled/run targets to separate files, one per batch
time: 2024-09-20T17:24:19.219556+01:00
custom:
Author: michelleark
Issue: "10714"
11 changes: 8 additions & 3 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,9 @@ def write_graph_file(self, linker: Linker, manifest: Manifest):
linker.write_graph(graph_path, manifest)

# writes the "compiled_code" into the target/compiled directory
def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
def _write_node(
self, node: ManifestSQLNode, split_suffix: Optional[str] = None
) -> ManifestSQLNode:
if not node.extra_ctes_injected or node.resource_type in (
NodeType.Snapshot,
NodeType.Seed,
Expand All @@ -530,7 +532,9 @@ def _write_node(self, node: ManifestSQLNode) -> ManifestSQLNode:
fire_event(WritingInjectedSQLForNode(node_info=get_node_info()))

if node.compiled_code:
node.compiled_path = node.get_target_write_path(self.config.target_path, "compiled")
node.compiled_path = node.get_target_write_path(
self.config.target_path, "compiled", split_suffix
)
node.write_node(self.config.project_root, node.compiled_path, node.compiled_code)
return node

Expand All @@ -540,6 +544,7 @@ def compile_node(
manifest: Manifest,
extra_context: Optional[Dict[str, Any]] = None,
write: bool = True,
split_suffix: Optional[str] = None,
) -> ManifestSQLNode:
"""This is the main entry point into this code. It's called by
CompileRunner.compile, GenericRPCRunner.compile, and
Expand All @@ -562,7 +567,7 @@ def compile_node(

node, _ = self._recursively_prepend_ctes(node, manifest, extra_context)
if write:
self._write_node(node)
self._write_node(node, split_suffix=split_suffix)
return node


Expand Down
17 changes: 16 additions & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
Exposure,
Macro,
ManifestNode,
ModelNode,
Resource,
SeedNode,
SemanticModel,
Expand All @@ -77,6 +78,7 @@
SecretEnvVarLocationError,
TargetNotFoundError,
)
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt.node_types import ModelLanguage, NodeType
from dbt.utils import MultiDict, args_to_dict
from dbt_common.clients.jinja import MacroProtocol
def write(self, payload: str) -> str:
    """Write `payload` (the rendered materialization SQL) to this node's
    path under target/run and record it as the node's build_path.

    Raises MacrosSourcesUnWriteableError for macros and source definitions,
    which have no writeable target. Returns "" so the jinja `{{ write(...) }}`
    call renders as nothing.
    """
    # macros/source defs aren't 'writeable'.
    if isinstance(self.model, (Macro, SourceDefinition)):
        raise MacrosSourcesUnWriteableError(node=self.model)

    # Microbatch models are written once per batch: suffix the target file
    # with the formatted batch start so batches don't overwrite each other.
    split_suffix = None
    if (
        isinstance(self.model, ModelNode)
        and self.model.config.get("incremental_strategy") == "microbatch"
    ):
        split_suffix = MicrobatchBuilder.format_batch_start(
            self.model.config.get("__dbt_internal_microbatch_event_time_start"),
            self.model.config.batch_size,
        )

    self.model.build_path = self.model.get_target_write_path(
        self.config.target_path, "run", split_suffix=split_suffix
    )
    self.model.write_node(self.config.project_root, self.model.build_path, payload)
    return ""

Expand Down
14 changes: 13 additions & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Dict,
Expand Down Expand Up @@ -243,14 +244,25 @@ def clear_event_status(self):

@dataclass
class ParsedNode(ParsedResource, NodeInfoMixin, ParsedNodeMandatory, SerializableType):
def get_target_write_path(self, target_path: str, subdirectory: str):
def get_target_write_path(
    self, target_path: str, subdirectory: str, split_suffix: Optional[str] = None
):
    """Build the path under `target_path`/`subdirectory` where this node's
    compiled or run artifact is written.

    When `split_suffix` is given (e.g. a microbatch batch-start label), the
    artifact goes into a subdirectory named after the file's stem and the
    suffix is appended to the filename: `.../<stem>/<stem>_<suffix><ext>`.
    """
    # This is called for both the "compiled" subdirectory of "target" and the "run" subdirectory
    if os.path.basename(self.path) == os.path.basename(self.original_file_path):
        # One-to-one relationship of nodes to files.
        path = self.original_file_path
    else:
        # Many-to-one relationship of nodes to files.
        path = os.path.join(self.original_file_path, self.path)

    if split_suffix:
        # e.g. models/my_model.sql -> models/my_model/my_model_<suffix>.sql
        pathlib_path = Path(path)
        path = str(
            pathlib_path.parent
            / pathlib_path.stem
            / (pathlib_path.stem + f"_{split_suffix}" + pathlib_path.suffix)
        )

    target_write_path = os.path.join(target_path, subdirectory, self.package_name, path)
    return target_write_path

Expand Down
11 changes: 11 additions & 0 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,14 @@ def truncate_timestamp(timestamp: datetime, batch_size: BatchSize):
truncated = datetime(timestamp.year, 1, 1, 0, 0, 0, 0, pytz.utc)

return truncated

@staticmethod
def format_batch_start(
batch_start: Optional[datetime], batch_size: BatchSize
) -> Optional[str]:
if batch_start is None:
return batch_start

return str(
batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
)
17 changes: 11 additions & 6 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
)
from dbt.adapters.exceptions import MissingMaterializationError
from dbt.artifacts.resources import Hook
from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.schemas.results import (
BaseResult,
NodeStatus,
Expand Down Expand Up @@ -197,11 +196,10 @@ def describe_node(self) -> str:

def describe_batch(self, batch_start: Optional[datetime]) -> str:
    """Return a one-line, human-readable label for a single batch of this node."""
    # format_batch_start renders hour-sized batches with the time component
    # and coarser batch sizes as a date only.
    formatted_batch_start = MicrobatchBuilder.format_batch_start(
        batch_start, self.node.config.batch_size
    )

    return f"batch {formatted_batch_start} of {self.get_node_representation()}"

def print_start_line(self):
Expand Down Expand Up @@ -463,7 +461,14 @@ def _execute_microbatch_materialization(
model.config["__dbt_internal_microbatch_event_time_end"] = batch[1]

# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(model, manifest, {})
self.compiler.compile_node(
model,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], model.config.batch_size
),
)
context["model"] = model
context["sql"] = model.compiled_code
context["compiled_code"] = model.compiled_code
Expand Down
76 changes: 76 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from dbt.tests.util import (
patch_microbatch_end_time,
read_file,
relation_from_name,
run_dbt,
run_dbt_and_capture,
Expand Down Expand Up @@ -442,3 +443,78 @@ def test_run_with_event_time(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run", "--event-time-start", "2020-01-01"])
self.assert_row_count(project, "microbatch_model", 2)


class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest):
    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
    def test_run_with_event_time(self, project):
        """Each microbatch batch is written to its own compiled/run file."""
        # Run all batches from the configured event-time start.
        with patch_microbatch_end_time("2020-01-03 13:57:00"):
            run_dbt(["run", "--event-time-start", "2020-01-01"])

        # The unfiltered compiled model is still written to the usual path.
        assert read_file(
            project.project_root,
            "target",
            "compiled",
            "test",
            "models",
            "microbatch_model.sql",
        )

        # Each batch gets its own file under a <model_name>/ subdirectory,
        # suffixed with the batch start date, in both compiled and run.
        for subdirectory in ("compiled", "run"):
            for batch_date in ("2020-01-01", "2020-01-02", "2020-01-03"):
                assert read_file(
                    project.project_root,
                    "target",
                    subdirectory,
                    "test",
                    "models",
                    "microbatch_model",
                    f"microbatch_model_{batch_date}.sql",
                )
34 changes: 33 additions & 1 deletion tests/unit/graph/test_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)
from dbt.artifacts.resources.v1.semantic_model import NodeRelation
from dbt.contracts.graph.model_config import TestConfig
from dbt.contracts.graph.nodes import ColumnInfo, ModelNode, SemanticModel
from dbt.contracts.graph.nodes import ColumnInfo, ModelNode, ParsedNode, SemanticModel
from dbt.node_types import NodeType
from dbt_common.contracts.constraints import (
ColumnLevelConstraint,
Expand Down Expand Up @@ -391,3 +391,35 @@ def test_disabled_unique_combo_multiple():

def assertSameContents(list1, list2):
assert sorted(list1) == sorted(list2)


class TestParsedNode:
    @pytest.fixture(scope="class")
    def parsed_node(self) -> ParsedNode:
        # Minimal ParsedNode whose path basename differs from the original
        # file path basename, exercising the many-to-one branch.
        return ParsedNode(
            resource_type=NodeType.Model,
            unique_id="model.test_package.test_name",
            name="test_name",
            package_name="test_package",
            schema="test_schema",
            alias="test_alias",
            fqn=["models", "test_name"],
            original_file_path="test_original_file_path",
            checksum=FileHash.from_contents("checksum"),
            path="test_path.sql",
            database=None,
        )

    def test_get_target_write_path(self, parsed_node):
        import os

        write_path = parsed_node.get_target_write_path("target_path", "subdirectory")
        # get_target_write_path joins with os.path.join, so the expected
        # value must too — a hard-coded "/" string would fail on Windows.
        assert write_path == os.path.join(
            "target_path",
            "subdirectory",
            "test_package",
            "test_original_file_path",
            "test_path.sql",
        )

    def test_get_target_write_path_split(self, parsed_node):
        import os

        write_path = parsed_node.get_target_write_path("target_path", "subdirectory", "split")
        # The split suffix nests the file under the stem directory and
        # appends "_<suffix>" to the filename.
        assert write_path == os.path.join(
            "target_path",
            "subdirectory",
            "test_package",
            "test_original_file_path",
            "test_path",
            "test_path_split.sql",
        )
16 changes: 16 additions & 0 deletions tests/unit/materializations/incremental/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,19 @@ def test_offset_timestamp(self, timestamp, batch_size, offset, expected_timestam
)
def test_truncate_timestamp(self, timestamp, batch_size, expected_timestamp):
assert MicrobatchBuilder.truncate_timestamp(timestamp, batch_size) == expected_timestamp

@pytest.mark.parametrize(
    "batch_size,batch_start,expected",
    [
        (None, None, None),
        (BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
        (BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
        (BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),
        (BatchSize.hour, datetime(2020, 1, 1, 1), "2020-01-01 01:00:00"),
    ],
)
def test_format_batch_start(self, batch_size, batch_start, expected):
    # Only hour-sized batches keep the time component in the label;
    # coarser sizes are rendered as a date, and None passes through.
    actual = MicrobatchBuilder.format_batch_start(batch_start, batch_size)
    assert actual == expected

0 comments on commit a1e4753

Please sign in to comment.