From b025f208a870bb858d93d80fc67f32ccf208ee41 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Fri, 11 Dec 2020 18:53:57 -0500 Subject: [PATCH 1/2] Check if relation exists before deferring --- CHANGELOG.md | 1 + core/dbt/contracts/graph/manifest.py | 7 +++++- core/dbt/task/run.py | 5 ++-- .../062_defer_state_test/test_defer_state.py | 24 ++++++++++++++++++- 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e596be3ba8..3175f028278 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - dbt list command always return 0 as exit code ([#2886](https://github.com/fishtown-analytics/dbt/issues/2886), [#2892](https://github.com/fishtown-analytics/dbt/issues/2892)) - Set default `materialized` for test node configs to `test` ([#2806](https://github.com/fishtown-analytics/dbt/issues/2806), [#2902](https://github.com/fishtown-analytics/dbt/pull/2902)) - Use original file path instead of absolute path as checksum for big seeds ([#2927](https://github.com/fishtown-analytics/dbt/issues/2927), [#2939](https://github.com/fishtown-analytics/dbt/pull/2939)) +- Defer if and only if upstream reference does not exist in current environment namespace ([#2909](https://github.com/fishtown-analytics/dbt/issues/2909), [#2946](https://github.com/fishtown-analytics/dbt/pull/2946)) ### Under the hood - Bump hologram version to 0.0.11. Add scripts/dtr.py ([#2888](https://github.com/fishtown-analytics/dbt/issues/2840),[#2889](https://github.com/fishtown-analytics/dbt/pull/2889)) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 6f8bb26d8ce..48f68bba150 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -887,6 +887,7 @@ def resolve_doc( def merge_from_artifact( self, + adapter, other: 'WritableManifest', selected: AbstractSet[UniqueID], ) -> None: @@ -898,10 +899,14 @@ def merge_from_artifact( refables = set(NodeType.refable()) merged = set() for unique_id, node in other.nodes.items(): + current = self.nodes[unique_id] if ( node.resource_type in refables and not node.is_ephemeral and - unique_id not in selected + unique_id not in selected and + not adapter.get_relation( + current.database, current.schema, current.identifier + ) ): merged.add(unique_id) self.nodes[unique_id] = node.replace(deferred=True) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 086d423477e..495cd5523bb 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -372,7 +372,7 @@ def _get_deferred_manifest(self) -> Optional[WritableManifest]: ) return state.manifest - def defer_to_manifest(self, selected_uids: AbstractSet[str]): + def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]): deferred_manifest = self._get_deferred_manifest() if deferred_manifest is None: return @@ -382,6 +382,7 @@ def defer_to_manifest(self, selected_uids: AbstractSet[str]): 'manifest to defer from!' ) self.manifest.merge_from_artifact( + adapter=adapter, other=deferred_manifest, selected=selected_uids, ) @@ -389,10 +390,10 @@ def defer_to_manifest(self, selected_uids: AbstractSet[str]): self.write_manifest() def before_run(self, adapter, selected_uids: AbstractSet[str]): - self.defer_to_manifest(selected_uids) with adapter.connection_named('master'): self.create_schemas(adapter, selected_uids) self.populate_adapter_cache(adapter) + self.defer_to_manifest(adapter, selected_uids) self.safe_run_hooks(adapter, RunHookType.Start, {}) def after_run(self, adapter, results): diff --git a/test/integration/062_defer_state_test/test_defer_state.py b/test/integration/062_defer_state_test/test_defer_state.py index 1d3a4fc7b8d..ebdd2421e0c 100644 --- a/test/integration/062_defer_state_test/test_defer_state.py +++ b/test/integration/062_defer_state_test/test_defer_state.py @@ -102,6 +102,24 @@ def run_switchdirs_defer(self): expect_pass=False, ) + def run_defer_iff_not_exists(self): + results = self.run_dbt(['seed', '--target', 'otherschema']) + assert len(results) == 1 + results = self.run_dbt(['run', '--target', 'otherschema']) + assert len(results) == 2 + + # copy files over from the happy times when we had a good target + self.copy_state() + + results = self.run_dbt(['seed']) + assert len(results) == 1 + results = self.run_dbt(['run', '--state', 'state', '--defer']) + assert len(results) == 2 + + # because the seed now exists in our schema, we shouldn't defer it + assert self.other_schema not in results[0].node.compiled_sql + assert self.unique_schema() in results[0].node.compiled_sql + @use_profile('postgres') def test_postgres_state_changetarget(self): self.run_and_defer() @@ -118,8 +136,12 @@ def test_postgres_state_changetarget(self): self.run_dbt(['snapshot', '--defer']) @use_profile('postgres') - def test_postgres_stat_changedir(self): + def test_postgres_state_changedir(self): self.run_switchdirs_defer() + + @use_profile('postgres') + def test_postgres_state_defer_iffnotexists(self): + self.run_defer_iff_not_exists() @use_profile('snowflake') def test_snowflake_state_changetarget(self): From 08e425bcf62b0b4b4cbb728a888435fa1d4b70e6 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Wed, 16 Dec 2020 00:24:00 -0500 Subject: [PATCH 2/2] Handle keyerror if old node missing --- core/dbt/contracts/graph/manifest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 48f68bba150..e3a750e7a66 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -899,8 +899,8 @@ def merge_from_artifact( refables = set(NodeType.refable()) merged = set() for unique_id, node in other.nodes.items(): - current = self.nodes[unique_id] - if ( + current = self.nodes.get(unique_id) + if current and ( node.resource_type in refables and not node.is_ephemeral and unique_id not in selected and