Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support node selector in row count checks #360

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion js/src/components/lineage/NodeView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ export function NodeView({ node, onCloseNode }: NodeViewProps) {
<Box color="gray" paddingLeft={"16px"}>
<HStack spacing={"8px"}>
<ResourceTypeTag node={node} />
{node.resourceType === "model" && (
{(node.resourceType === "model" ||
node.resourceType === "snapshot") && (
<RowCountTag node={node} isInteractive />
)}
</HStack>
Expand Down
5 changes: 5 additions & 0 deletions recce/adapter/dbt_adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ def select_nodes(self, select: Optional[str] = None, exclude: Optional[str] = No
from dbt.graph import NodeSelector
from dbt.compilation import Compiler
from dbt.graph import parse_difference
import dbt.compilation

select_list = [select] if select else None
exclude_list = [exclude] if exclude else None
Expand All @@ -618,7 +619,11 @@ def select_nodes(self, select: Optional[str] = None, exclude: Optional[str] = No
else:
spec = parse_difference(select_list, exclude_list)
compiler = Compiler(self.runtime_config)
# disable to print compile states
tmp_func = dbt.compilation.print_compile_stats
dbt.compilation.print_compile_stats = lambda x: None
graph = compiler.compile(self.manifest, write=False)
dbt.compilation.print_compile_stats = tmp_func
selector = NodeSelector(graph, self.manifest, previous_state=self.previous_state)

return selector.get_selected(spec)
Expand Down
29 changes: 21 additions & 8 deletions recce/tasks/rowcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
class RowCountDiffParams(TypedDict, total=False):
node_names: Optional[list[str]]
node_ids: Optional[list[str]]
select: Optional[str]
exclude: Optional[str]


class RowCountDiffTask(Task, QueryMixin):
Expand All @@ -22,10 +24,10 @@ def _query_row_count(self, dbt_adapter, model_name, base=False):
if node is None:
return None

if node.resource_type != 'model':
if node.resource_type != 'model' and node.resource_type != 'snapshot':
return None

if node.config and node.config.materialized not in ['table', 'view', 'incremental']:
if node.config and node.config.materialized not in ['table', 'view', 'incremental', 'snapshot']:
return None

relation = dbt_adapter.create_relation(model_name, base=base)
Expand All @@ -43,12 +45,23 @@ def execute_dbt(self):
dbt_adapter = default_context().adapter

query_candidates = []
for node_id in self.params.get('node_ids', []):
name = dbt_adapter.get_node_name_by_id(node_id)
if name:
query_candidates.append(name)
for node in self.params.get('node_names', []):
query_candidates.append(node)
if self.params.get('node_ids', []) or self.params.get('node_names', []):
for node_id in self.params.get('node_ids', []):
name = dbt_adapter.get_node_name_by_id(node_id)
if name:
query_candidates.append(name)
for node in self.params.get('node_names', []):
query_candidates.append(node)
elif self.params.get('select', "") or self.params.get('exclude', ""):
def countable(unique_id):
return unique_id.startswith('model') or unique_id.startswith('snapshot')

node_ids = dbt_adapter.select_nodes(self.params.get('select', ""), self.params.get('exclude', ""))
node_ids = list(filter(countable, node_ids))
for node_id in node_ids:
name = dbt_adapter.get_node_name_by_id(node_id)
if name:
query_candidates.append(name)

# Query row count for nodes that are not cached
with dbt_adapter.connection_named("query"):
Expand Down
66 changes: 65 additions & 1 deletion tests/adapter/dbt_adapter/dbt_test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import uuid
from io import StringIO

from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.graph.nodes import ModelNode, SnapshotNode

from recce.adapter.dbt_adapter import DbtAdapter, as_manifest, load_manifest
from recce.core import RecceContext
Expand Down Expand Up @@ -110,3 +110,67 @@ def cleanup(self):
with dbt_adapter.connection_named('cleanup'):
dbt_adapter.execute(f"DROP SCHEMA IF EXISTS {self.base_schema} CASCADE")
dbt_adapter.execute(f"DROP SCHEMA IF EXISTS {self.curr_schema} CASCADE")

def create_snapshot(self, sanpshot_name, base_csv, curr_csv, depends_on=[]):
package_name = "recce_test"
# unique_id = f"model.{package_name}.{model_name}"
unique_id = sanpshot_name

def _add_snapshot_to_manifest(base, raw_code):
if base:
schema = self.base_schema
manifest = self.base_manifest
else:
schema = self.curr_schema
manifest = self.curr_manifest

node = SnapshotNode.from_dict({
"resource_type": "snapshot",
"name": sanpshot_name,
"package_name": package_name,
"path": "",
"original_file_path": "",
"unique_id": unique_id,
"fqn": [
package_name,
sanpshot_name,
],
"schema": schema,
"alias": sanpshot_name,
"checksum": {
"name": "sha256",
"checksum": hash(raw_code),
},
"raw_code": raw_code,
"config": {
"materialized": "snapshot",
"tags": ["test_tag"],
},
"tags": ["test_tag"],
"depends_on": {
"nodes": depends_on
},
})
manifest.add_node_nofile(node)
return node

base_csv = textwrap.dedent(base_csv)
curr_csv = textwrap.dedent(curr_csv)

_add_snapshot_to_manifest(True, base_csv)
_add_snapshot_to_manifest(False, curr_csv)

import pandas as pd
df_base = pd.read_csv(StringIO(base_csv))
df_curr = pd.read_csv(StringIO(curr_csv))
dbt_adapter = self.adapter
with dbt_adapter.connection_named('create model'):
dbt_adapter.execute(f"CREATE TABLE {self.base_schema}.{sanpshot_name} AS SELECT * FROM df_base")
dbt_adapter.execute(f"CREATE TABLE {self.curr_schema}.{sanpshot_name} AS SELECT * FROM df_curr")
self.adapter.set_artifacts(self.base_manifest, self.curr_manifest)

def remove_snapshot(self, snapshot_name):
dbt_adapter = self.adapter
with dbt_adapter.connection_named('cleanup'):
dbt_adapter.execute(f"DROP TABLE IF EXISTS {self.base_schema}.{snapshot_name}")
dbt_adapter.execute(f"DROP TABLE IF EXISTS {self.curr_schema}.{snapshot_name} ")
13 changes: 13 additions & 0 deletions tests/adapter/dbt_adapter/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,16 @@ def test_select(dbt_test_helper):
assert len(node_ids) == 2
node_ids = adapter.select_nodes("+state:modified")
assert len(node_ids) == 1

# Test resource type: snapshot
dbt_test_helper.create_snapshot("snapshot_1", csv_data_base, csv_data_curr)
dbt_test_helper.create_model("use_snapshot", csv_data_base, csv_data_base, ["snapshot_1"])

node_ids = adapter.select_nodes('resource_type:snapshot')
assert len(node_ids) == 1

node_ids = adapter.select_nodes('resource_type:snapshot+')
assert len(node_ids) == 2

node_ids = adapter.select_nodes("state:modified,resource_type:snapshot")
assert len(node_ids) == 1
Loading