Skip to content

Commit

Permalink
Merge pull request #2946 from fishtown-analytics/fix/defer-if-not-exist
Browse files Browse the repository at this point in the history
Defer iff unselected reference does not exist in current env
  • Loading branch information
jtcohen6 authored Dec 16, 2020
2 parents 454ddc6 + 08e425b commit aff7299
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- Set default `materialized` for test node configs to `test` ([#2806](https://github.com/fishtown-analytics/dbt/issues/2806), [#2902](https://github.com/fishtown-analytics/dbt/pull/2902))
- Allow docs blocks in exposure descriptions ([#2913](https://github.com/fishtown-analytics/dbt/issues/2913), [#2920](https://github.com/fishtown-analytics/dbt/pull/2920))
- Use original file path instead of absolute path as checksum for big seeds ([#2927](https://github.com/fishtown-analytics/dbt/issues/2927), [#2939](https://github.com/fishtown-analytics/dbt/pull/2939))
- Defer if and only if upstream reference does not exist in current environment namespace ([#2909](https://github.com/fishtown-analytics/dbt/issues/2909), [#2946](https://github.com/fishtown-analytics/dbt/pull/2946))

### Under the hood
- Bump hologram version to 0.0.11. Add scripts/dtr.py ([#2888](https://github.com/fishtown-analytics/dbt/issues/2840),[#2889](https://github.com/fishtown-analytics/dbt/pull/2889))
Expand Down
9 changes: 7 additions & 2 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ def resolve_doc(

def merge_from_artifact(
self,
adapter,
other: 'WritableManifest',
selected: AbstractSet[UniqueID],
) -> None:
Expand All @@ -898,10 +899,14 @@ def merge_from_artifact(
refables = set(NodeType.refable())
merged = set()
for unique_id, node in other.nodes.items():
if (
current = self.nodes.get(unique_id)
if current and (
node.resource_type in refables and
not node.is_ephemeral and
unique_id not in selected
unique_id not in selected and
not adapter.get_relation(
current.database, current.schema, current.identifier
)
):
merged.add(unique_id)
self.nodes[unique_id] = node.replace(deferred=True)
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ def _get_deferred_manifest(self) -> Optional[WritableManifest]:
)
return state.manifest

def defer_to_manifest(self, selected_uids: AbstractSet[str]):
def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
deferred_manifest = self._get_deferred_manifest()
if deferred_manifest is None:
return
Expand All @@ -390,17 +390,18 @@ def defer_to_manifest(self, selected_uids: AbstractSet[str]):
'manifest to defer from!'
)
self.manifest.merge_from_artifact(
adapter=adapter,
other=deferred_manifest,
selected=selected_uids,
)
# TODO: is it wrong to write the manifest here? I think it's right...
self.write_manifest()

def before_run(self, adapter, selected_uids: AbstractSet[str]):
self.defer_to_manifest(selected_uids)
with adapter.connection_named('master'):
self.create_schemas(adapter, selected_uids)
self.populate_adapter_cache(adapter)
self.defer_to_manifest(adapter, selected_uids)
self.safe_run_hooks(adapter, RunHookType.Start, {})

def after_run(self, adapter, results):
Expand Down
24 changes: 23 additions & 1 deletion test/integration/062_defer_state_test/test_defer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,24 @@ def run_switchdirs_defer(self):
expect_pass=False,
)

def run_defer_iff_not_exists(self):
results = self.run_dbt(['seed', '--target', 'otherschema'])
assert len(results) == 1
results = self.run_dbt(['run', '--target', 'otherschema'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()

results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run', '--state', 'state', '--defer'])
assert len(results) == 2

# because the seed now exists in our schema, we shouldn't defer it
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql

@use_profile('postgres')
def test_postgres_state_changetarget(self):
self.run_and_defer()
Expand All @@ -118,8 +136,12 @@ def test_postgres_state_changetarget(self):
self.run_dbt(['snapshot', '--defer'])

@use_profile('postgres')
def test_postgres_stat_changedir(self):
def test_postgres_state_changedir(self):
self.run_switchdirs_defer()

@use_profile('postgres')
def test_postgres_state_defer_iffnotexists(self):
self.run_defer_iff_not_exists()

@use_profile('snowflake')
def test_snowflake_state_changetarget(self):
Expand Down

0 comments on commit aff7299

Please sign in to comment.