Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer iff unselected reference does not exist in current env #2946

Merged
merged 2 commits into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- dbt list command always return 0 as exit code ([#2886](https://github.com/fishtown-analytics/dbt/issues/2886), [#2892](https://github.com/fishtown-analytics/dbt/issues/2892))
- Set default `materialized` for test node configs to `test` ([#2806](https://github.com/fishtown-analytics/dbt/issues/2806), [#2902](https://github.com/fishtown-analytics/dbt/pull/2902))
- Use original file path instead of absolute path as checksum for big seeds ([#2927](https://github.com/fishtown-analytics/dbt/issues/2927), [#2939](https://github.com/fishtown-analytics/dbt/pull/2939))
- Defer if and only if upstream reference does not exist in current environment namespace ([#2909](https://github.com/fishtown-analytics/dbt/issues/2909), [#2946](https://github.com/fishtown-analytics/dbt/pull/2946))

### Under the hood
- Bump hologram version to 0.0.11. Add scripts/dtr.py ([#2888](https://github.com/fishtown-analytics/dbt/issues/2840),[#2889](https://github.com/fishtown-analytics/dbt/pull/2889))
Expand Down
7 changes: 6 additions & 1 deletion core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ def resolve_doc(

def merge_from_artifact(
self,
adapter,
other: 'WritableManifest',
selected: AbstractSet[UniqueID],
) -> None:
Expand All @@ -898,10 +899,14 @@ def merge_from_artifact(
refables = set(NodeType.refable())
merged = set()
for unique_id, node in other.nodes.items():
current = self.nodes[unique_id]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a scenario where unique_id is present in the other manifest, but not in this manifest? I'm thinking that could happen if a model was deleted, for instance. If I'm thinking about that right, then I think this could raise a KeyError, and we'd probably want to use self.nodes.get(unique_id) and do something smart if the result is None.

I might be way off-base here, it just jumped out at me though and wanted to confirm

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really good point, I was able to produce a KeyError here and changed the line to handle. this also gave me a clue to #2875 :)

if (
node.resource_type in refables and
not node.is_ephemeral and
unique_id not in selected
unique_id not in selected and
not adapter.get_relation(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know if this happens before or after a relation cache has been built? Curious if this implementation is going to run one introspective query per node in the deferral manifest, or if they'll all be cache lookups?

current.database, current.schema, current.identifier
)
):
merged.add(unique_id)
self.nodes[unique_id] = node.replace(deferred=True)
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def _get_deferred_manifest(self) -> Optional[WritableManifest]:
)
return state.manifest

def defer_to_manifest(self, selected_uids: AbstractSet[str]):
def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
deferred_manifest = self._get_deferred_manifest()
if deferred_manifest is None:
return
Expand All @@ -382,17 +382,18 @@ def defer_to_manifest(self, selected_uids: AbstractSet[str]):
'manifest to defer from!'
)
self.manifest.merge_from_artifact(
adapter=adapter,
other=deferred_manifest,
selected=selected_uids,
)
# TODO: is it wrong to write the manifest here? I think it's right...
self.write_manifest()

def before_run(self, adapter, selected_uids: AbstractSet[str]):
self.defer_to_manifest(selected_uids)
with adapter.connection_named('master'):
self.create_schemas(adapter, selected_uids)
self.populate_adapter_cache(adapter)
self.defer_to_manifest(adapter, selected_uids)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok - i am seeing this now, and you can ignore my comment about caching above. Very clever, very cool. Leaving that comment above parce que je suis impressionné

self.safe_run_hooks(adapter, RunHookType.Start, {})

def after_run(self, adapter, results):
Expand Down
24 changes: 23 additions & 1 deletion test/integration/062_defer_state_test/test_defer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,24 @@ def run_switchdirs_defer(self):
expect_pass=False,
)

def run_defer_iff_not_exists(self):
results = self.run_dbt(['seed', '--target', 'otherschema'])
assert len(results) == 1
results = self.run_dbt(['run', '--target', 'otherschema'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()

results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run', '--state', 'state', '--defer'])
assert len(results) == 2

# because the seed now exists in our schema, we shouldn't defer it
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql

@use_profile('postgres')
def test_postgres_state_changetarget(self):
self.run_and_defer()
Expand All @@ -118,8 +136,12 @@ def test_postgres_state_changetarget(self):
self.run_dbt(['snapshot', '--defer'])

@use_profile('postgres')
def test_postgres_stat_changedir(self):
def test_postgres_state_changedir(self):
self.run_switchdirs_defer()

@use_profile('postgres')
def test_postgres_state_defer_iffnotexists(self):
self.run_defer_iff_not_exists()

@use_profile('snowflake')
def test_snowflake_state_changetarget(self):
Expand Down