Skip to content

Commit

Permalink
Add experimental cache_selected_only config (dbt-labs#5036)
Browse files Browse the repository at this point in the history
* cache schema for selected models

* Create Features-20220316-003847.yaml

* rename flag, update postgres adapter

rename flag to cache_selected_only, update postgres adapter: function _relations_cache_for_schemas

* Update Features-20220316-003847.yaml

* added test for cache_selected_only flag

* formatted as per pre-commit

* Add breaking change note for adapter plugin maintainers

* Fix whitespace

* Add a test

Co-authored-by: karunpoudel-chr <poudel.karun@gmail.com>
Co-authored-by: karunpoudel-chr <62040859+karunpoudel@users.noreply.github.com>
  • Loading branch information
3 people authored and Axel Goblet committed May 20, 2022
1 parent 063c0d6 commit 97a52ff
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 13 deletions.
9 changes: 9 additions & 0 deletions .changes/unreleased/Breaking Changes-20220412-152450.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
kind: Breaking Changes
body: 'For adapter plugin maintainers only: Internal adapter methods `set_relations_cache` + `_relations_cache_for_schemas`
each take an additional argument, for use with experimental `CACHE_SELECTED_ONLY`
config'
time: 2022-04-12T15:24:50.159572+02:00
custom:
Author: karunpoudel
Issue: "4688"
PR: "4860"
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20220316-003847.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Features
body: Add `--cache-selected-only` flag to cache schema objects of selected models
only.
time: 2022-03-16T00:38:47.8468296-05:00
custom:
Author: karunpoudel
Issue: "4688"
PR: "4860"
13 changes: 9 additions & 4 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,14 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
# databases
return info_schema_name_map

def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
def _relations_cache_for_schemas(
self, manifest: Manifest, cache_schemas: Set[BaseRelation] = None
) -> None:
"""Populate the relations cache for the given schemas. Returns an
iterable of the schemas populated, as strings.
"""
cache_schemas = self._get_cache_schemas(manifest)
if not cache_schemas:
cache_schemas = self._get_cache_schemas(manifest)
with executor(self.config) as tpe:
futures: List[Future[List[BaseRelation]]] = []
for cache_schema in cache_schemas:
Expand All @@ -370,14 +373,16 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
cache_update.add((relation.database, relation.schema))
self.cache.update_schemas(cache_update)

def set_relations_cache(
    self,
    manifest: Manifest,
    clear: bool = False,
    required_schemas: Optional[Set[BaseRelation]] = None,
) -> None:
    """Run a query that gets a populated cache of the relations in the
    database and set the cache on this adapter.

    The cache lock is held for the whole operation so that the optional
    clear and the repopulation are not interleaved with other cache users.

    :param manifest: forwarded unchanged to `_relations_cache_for_schemas`.
    :param clear: when true, `self.cache.clear()` is called before the
        cache is repopulated.
    :param required_schemas: optional set of schemas forwarded to
        `_relations_cache_for_schemas`. `None` (the default) is forwarded
        as-is, leaving the choice of schemas to that method.
    """
    with self.cache.lock:
        if clear:
            self.cache.clear()
        self._relations_cache_for_schemas(manifest, required_schemas)

@available
def cache_added(self, relation: Optional[BaseRelation]) -> str:
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
use_experimental_parser: Optional[bool] = None
static_parser: Optional[bool] = None
indirect_selection: Optional[str] = None
cache_selected_only: Optional[bool] = None


@dataclass
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
EVENT_BUFFER_SIZE = 100000
QUIET = None
NO_PRINT = None
CACHE_SELECTED_ONLY = None

_NON_BOOLEAN_FLAGS = [
"LOG_FORMAT",
Expand Down Expand Up @@ -69,6 +70,7 @@
"EVENT_BUFFER_SIZE": 100000,
"QUIET": False,
"NO_PRINT": False,
"CACHE_SELECTED_ONLY": False,
}


Expand Down Expand Up @@ -118,7 +120,7 @@ def set_from_args(args, user_config):
global STRICT_MODE, FULL_REFRESH, WARN_ERROR, USE_EXPERIMENTAL_PARSER, STATIC_PARSER
global WRITE_JSON, PARTIAL_PARSE, USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT
global INDIRECT_SELECTION, VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE, QUIET, NO_PRINT
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE, QUIET, NO_PRINT, CACHE_SELECTED_ONLY

STRICT_MODE = False # backwards compatibility
# cli args without user_config or env var option
Expand All @@ -145,6 +147,7 @@ def set_from_args(args, user_config):
EVENT_BUFFER_SIZE = get_flag_value("EVENT_BUFFER_SIZE", args, user_config)
QUIET = get_flag_value("QUIET", args, user_config)
NO_PRINT = get_flag_value("NO_PRINT", args, user_config)
CACHE_SELECTED_ONLY = get_flag_value("CACHE_SELECTED_ONLY", args, user_config)

_set_overrides_from_env()

Expand Down
21 changes: 21 additions & 0 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,27 @@ def parse_args(args, cls=DBTArgumentParser):
""",
)

schema_cache_flag = p.add_mutually_exclusive_group()
schema_cache_flag.add_argument(
"--cache-selected-only",
action="store_const",
const=True,
default=None,
dest="cache_selected_only",
help="""
Pre cache database objects relevant to selected resource only.
""",
)
schema_cache_flag.add_argument(
"--no-cache-selected-only",
action="store_const",
const=False,
dest="cache_selected_only",
help="""
Pre cache all database objects related to the project.
""",
)

subs = p.add_subparsers(title="Available sub-commands")

base_subparser = _build_base_subparser()
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,9 @@ def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):

def before_run(self, adapter, selected_uids: AbstractSet[str]):
with adapter.connection_named("master"):
self.create_schemas(adapter, selected_uids)
self.populate_adapter_cache(adapter)
required_schemas = self.get_model_schemas(adapter, selected_uids)
self.create_schemas(adapter, required_schemas)
self.populate_adapter_cache(adapter, required_schemas)
self.defer_to_manifest(adapter, selected_uids)
self.safe_run_hooks(adapter, RunHookType.Start, {})

Expand Down
10 changes: 6 additions & 4 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,12 @@ def _mark_dependent_errors(self, node_id, result, cause):
for dep_node_id in self.graph.get_dependent_nodes(node_id):
self._skipped_children[dep_node_id] = cause

def populate_adapter_cache(self, adapter):
def populate_adapter_cache(self, adapter, required_schemas: Set[BaseRelation] = None):
start_populate_cache = time.perf_counter()
adapter.set_relations_cache(self.manifest)
if flags.CACHE_SELECTED_ONLY is True:
adapter.set_relations_cache(self.manifest, required_schemas=required_schemas)
else:
adapter.set_relations_cache(self.manifest)
cache_populate_time = time.perf_counter() - start_populate_cache
if dbt.tracking.active_user is not None:
dbt.tracking.track_runnable_timing(
Expand Down Expand Up @@ -504,8 +507,7 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe

return result

def create_schemas(self, adapter, selected_uids: Iterable[str]):
required_schemas = self.get_model_schemas(adapter, selected_uids)
def create_schemas(self, adapter, required_schemas: Set[BaseRelation]):
# we want the string form of the information schema database
required_databases: Set[BaseRelation] = set()
for required in required_schemas:
Expand Down
4 changes: 2 additions & 2 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ def _link_cached_relations(self, manifest):

self._link_cached_database_relations(schemas)

def _relations_cache_for_schemas(self, manifest):
super()._relations_cache_for_schemas(manifest)
def _relations_cache_for_schemas(self, manifest, cache_schemas=None):
super()._relations_cache_for_schemas(manifest, cache_schemas)
self._link_cached_relations(manifest)

def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{{
config(
materialized='table',
schema='another_schema'
)
}}
select 1 as id
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{{
config(
materialized='table'
)
}}
select 1 as id
14 changes: 14 additions & 0 deletions test/integration/038_caching_tests/test_caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,17 @@ def models(self):
@use_profile('postgres')
def test_postgres_cache(self):
self.cache_run()

class TestCachingSelectedSchemaOnly(TestBaseCaching):
    """Caching test that runs dbt with `--cache-selected-only` against the
    `models_multi_schemas` fixture project.
    """

    @property
    def models(self):
        """Name of the model directory used by this test."""
        return "models_multi_schemas"

    def run_and_get_adapter(self):
        """Run dbt for one selected model and return the adapter in use."""
        # Restrict the run to 'model', which lives in the default schema.
        dbt_args = ['--cache-selected-only', 'run', '--select', 'model']
        self.run_dbt(dbt_args)
        adapter = FACTORY.adapters[self.adapter_type]
        return adapter

    @use_profile('postgres')
    def test_postgres_cache(self):
        """Run `cache_run` (presumably provided by TestBaseCaching) on postgres."""
        self.cache_run()
15 changes: 15 additions & 0 deletions test/unit/test_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,18 @@ def test__flags(self):
self.assertEqual(flags.NO_PRINT, True)
# cleanup
self.user_config.no_print = None

# cache_selected_only
self.user_config.cache_selected_only = True
flags.set_from_args(self.args, self.user_config)
self.assertEqual(flags.CACHE_SELECTED_ONLY, True)
os.environ['DBT_CACHE_SELECTED_ONLY'] = 'false'
flags.set_from_args(self.args, self.user_config)
self.assertEqual(flags.CACHE_SELECTED_ONLY, False)
setattr(self.args, 'cache_selected_only', True)
flags.set_from_args(self.args, self.user_config)
self.assertEqual(flags.CACHE_SELECTED_ONLY, True)
# cleanup
os.environ.pop('DBT_CACHE_SELECTED_ONLY')
delattr(self.args, 'cache_selected_only')
self.user_config.cache_selected_only = False

0 comments on commit 97a52ff

Please sign in to comment.