Skip to content

Commit

Permalink
move format_batch_start to MicrobatchBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Sep 24, 2024
1 parent efe3a1d commit 6da854a
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 32 deletions.
6 changes: 4 additions & 2 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
SecretEnvVarLocationError,
TargetNotFoundError,
)
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt.node_types import ModelLanguage, NodeType
from dbt.utils import MultiDict, args_to_dict
from dbt_common.clients.jinja import MacroProtocol
Expand Down Expand Up @@ -979,8 +980,9 @@ def write(self, payload: str) -> str:
isinstance(self.model, ModelNode)
and self.model.config.get("incremental_strategy") == "microbatch"
):
split_suffix = self.model.format_batch_start(
self.model.config.get("__dbt_internal_microbatch_event_time_start")
split_suffix = MicrobatchBuilder.format_batch_start(
self.model.config.get("__dbt_internal_microbatch_event_time_start"),
self.model.config.batch_size,
)

Check warning on line 986 in core/dbt/context/providers.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/context/providers.py#L986

Added line #L986 was not covered by tests

self.model.build_path = self.model.get_target_write_path(
Expand Down
11 changes: 0 additions & 11 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.resources.types import BatchSize
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
from dbt.contracts.graph.unparsed import (
Expand Down Expand Up @@ -572,16 +571,6 @@ def infer_primary_key(self, data_tests: List["GenericTestNode"]) -> List[str]:

return []

def format_batch_start(self, batch_start: Optional[datetime]) -> Optional[str]:
if batch_start is None:
return batch_start

return str(
batch_start.date()
if (batch_start and self.config.batch_size != BatchSize.hour)
else batch_start
)

def same_contents(self, old, adapter_type) -> bool:
return super().same_contents(old, adapter_type) and self.same_ref_representation(old)

Expand Down
11 changes: 11 additions & 0 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,14 @@ def truncate_timestamp(timestamp: datetime, batch_size: BatchSize):
truncated = datetime(timestamp.year, 1, 1, 0, 0, 0, 0, pytz.utc)

return truncated

@staticmethod
def format_batch_start(
batch_start: Optional[datetime], batch_size: BatchSize
) -> Optional[str]:
if batch_start is None:
return batch_start

return str(
batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
)
11 changes: 9 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ def describe_node(self) -> str:

def describe_batch(self, batch_start: Optional[datetime]) -> str:
# Only visualize date if batch_start year/month/day
formatted_batch_start = self.node.format_batch_start(batch_start)
formatted_batch_start = MicrobatchBuilder.format_batch_start(
batch_start, self.node.config.batch_size
)

return f"batch {formatted_batch_start} of {self.get_node_representation()}"

Expand Down Expand Up @@ -460,7 +462,12 @@ def _execute_microbatch_materialization(

# Recompile node to re-resolve refs with event time filters rendered, update context
self.compiler.compile_node(
model, manifest, {}, split_suffix=model.format_batch_start(batch[0])
model,
manifest,
{},
split_suffix=MicrobatchBuilder.format_batch_start(
batch[0], model.config.batch_size
),
)
context["model"] = model
context["sql"] = model.compiled_code
Expand Down
17 changes: 0 additions & 17 deletions tests/unit/graph/test_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
Measure,
TestMetadata,
)
from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.resources.v1.semantic_model import NodeRelation
from dbt.contracts.graph.model_config import TestConfig
from dbt.contracts.graph.nodes import ColumnInfo, ModelNode, ParsedNode, SemanticModel
Expand Down Expand Up @@ -111,22 +110,6 @@ def test_all_constraints(

assert default_model_node.all_constraints == expected_all_constraints

@pytest.mark.parametrize(
"batch_size,batch_start,expected_formatted_batch_start",
[
(None, None, None),
(BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.hour, datetime(2020, 1, 1, 1), "2020-01-01 01:00:00"),
],
)
def test_format_batch_start(
self, default_model_node, batch_size, batch_start, expected_formatted_batch_start
):
default_model_node.config.batch_size = batch_size
assert default_model_node.format_batch_start(batch_start) == expected_formatted_batch_start


class TestSemanticModel:
@pytest.fixture(scope="function")
Expand Down
16 changes: 16 additions & 0 deletions tests/unit/materializations/incremental/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,19 @@ def test_offset_timestamp(self, timestamp, batch_size, offset, expected_timestam
)
def test_truncate_timestamp(self, timestamp, batch_size, expected_timestamp):
assert MicrobatchBuilder.truncate_timestamp(timestamp, batch_size) == expected_timestamp

@pytest.mark.parametrize(
"batch_size,batch_start,expected_formatted_batch_start",
[
(None, None, None),
(BatchSize.year, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.month, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.day, datetime(2020, 1, 1, 1), "2020-01-01"),
(BatchSize.hour, datetime(2020, 1, 1, 1), "2020-01-01 01:00:00"),
],
)
def test_format_batch_start(self, batch_size, batch_start, expected_formatted_batch_start):
assert (
MicrobatchBuilder.format_batch_start(batch_start, batch_size)
== expected_formatted_batch_start
)

0 comments on commit 6da854a

Please sign in to comment.