Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental cache_selected_only config #5036

Merged
merged 9 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changes/unreleased/Breaking Changes-20220412-152450.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
kind: Breaking Changes
body: 'For adapter plugin maintainers only: Internal adapter methods `set_relations_cache` + `_relations_cache_for_schemas`
each take an additional argument, for use with experimental `CACHE_SELECTED_ONLY`
config'
time: 2022-04-12T15:24:50.159572+02:00
custom:
Author: karunpoudel
Issue: "4688"
PR: "4860"
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20220316-003847.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Features
body: Add `--cache-selected-only` flag to cache schema objects of selected models
only.
time: 2022-03-16T00:38:47.8468296-05:00
custom:
Author: karunpoudel
Issue: "4688"
PR: "4860"
13 changes: 9 additions & 4 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,14 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
# databases
return info_schema_name_map

def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
def _relations_cache_for_schemas(
self, manifest: Manifest, cache_schemas: Set[BaseRelation] = None
) -> None:
"""Populate the relations cache for the given schemas. Returns an
iterable of the schemas populated, as strings.
"""
cache_schemas = self._get_cache_schemas(manifest)
if not cache_schemas:
cache_schemas = self._get_cache_schemas(manifest)
with executor(self.config) as tpe:
futures: List[Future[List[BaseRelation]]] = []
for cache_schema in cache_schemas:
Expand All @@ -370,14 +373,16 @@ def _relations_cache_for_schemas(self, manifest: Manifest) -> None:
cache_update.add((relation.database, relation.schema))
self.cache.update_schemas(cache_update)

def set_relations_cache(self, manifest: Manifest, clear: bool = False) -> None:
def set_relations_cache(
self, manifest: Manifest, clear: bool = False, required_schemas: Set[BaseRelation] = None
) -> None:
"""Run a query that gets a populated cache of the relations in the
database and set the cache on this adapter.
"""
with self.cache.lock:
if clear:
self.cache.clear()
self._relations_cache_for_schemas(manifest)
self._relations_cache_for_schemas(manifest, required_schemas)

@available
def cache_added(self, relation: Optional[BaseRelation]) -> str:
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
use_experimental_parser: Optional[bool] = None
static_parser: Optional[bool] = None
indirect_selection: Optional[str] = None
cache_selected_only: Optional[bool] = None


@dataclass
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
EVENT_BUFFER_SIZE = 100000
QUIET = None
NO_PRINT = None
CACHE_SELECTED_ONLY = None

_NON_BOOLEAN_FLAGS = [
"LOG_FORMAT",
Expand Down Expand Up @@ -69,6 +70,7 @@
"EVENT_BUFFER_SIZE": 100000,
"QUIET": False,
"NO_PRINT": False,
"CACHE_SELECTED_ONLY": False,
}


Expand Down Expand Up @@ -118,7 +120,7 @@ def set_from_args(args, user_config):
global STRICT_MODE, FULL_REFRESH, WARN_ERROR, USE_EXPERIMENTAL_PARSER, STATIC_PARSER
global WRITE_JSON, PARTIAL_PARSE, USE_COLORS, STORE_FAILURES, PROFILES_DIR, DEBUG, LOG_FORMAT
global INDIRECT_SELECTION, VERSION_CHECK, FAIL_FAST, SEND_ANONYMOUS_USAGE_STATS
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE, QUIET, NO_PRINT
global PRINTER_WIDTH, WHICH, LOG_CACHE_EVENTS, EVENT_BUFFER_SIZE, QUIET, NO_PRINT, CACHE_SELECTED_ONLY

STRICT_MODE = False # backwards compatibility
# cli args without user_config or env var option
Expand All @@ -145,6 +147,7 @@ def set_from_args(args, user_config):
EVENT_BUFFER_SIZE = get_flag_value("EVENT_BUFFER_SIZE", args, user_config)
QUIET = get_flag_value("QUIET", args, user_config)
NO_PRINT = get_flag_value("NO_PRINT", args, user_config)
CACHE_SELECTED_ONLY = get_flag_value("CACHE_SELECTED_ONLY", args, user_config)

_set_overrides_from_env()

Expand Down
21 changes: 21 additions & 0 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,27 @@ def parse_args(args, cls=DBTArgumentParser):
""",
)

schema_cache_flag = p.add_mutually_exclusive_group()
schema_cache_flag.add_argument(
"--cache-selected-only",
action="store_const",
const=True,
default=None,
dest="cache_selected_only",
help="""
Pre cache database objects relevant to selected resource only.
""",
)
schema_cache_flag.add_argument(
"--no-cache-selected-only",
action="store_const",
const=False,
dest="cache_selected_only",
help="""
Pre cache all database objects related to the project.
""",
)

subs = p.add_subparsers(title="Available sub-commands")

base_subparser = _build_base_subparser()
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,9 @@ def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):

def before_run(self, adapter, selected_uids: AbstractSet[str]):
with adapter.connection_named("master"):
self.create_schemas(adapter, selected_uids)
self.populate_adapter_cache(adapter)
required_schemas = self.get_model_schemas(adapter, selected_uids)
self.create_schemas(adapter, required_schemas)
self.populate_adapter_cache(adapter, required_schemas)
self.defer_to_manifest(adapter, selected_uids)
self.safe_run_hooks(adapter, RunHookType.Start, {})

Expand Down
10 changes: 6 additions & 4 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,12 @@ def _mark_dependent_errors(self, node_id, result, cause):
for dep_node_id in self.graph.get_dependent_nodes(node_id):
self._skipped_children[dep_node_id] = cause

def populate_adapter_cache(self, adapter):
def populate_adapter_cache(self, adapter, required_schemas: Set[BaseRelation] = None):
start_populate_cache = time.perf_counter()
adapter.set_relations_cache(self.manifest)
if flags.CACHE_SELECTED_ONLY is True:
adapter.set_relations_cache(self.manifest, required_schemas=required_schemas)
else:
adapter.set_relations_cache(self.manifest)
cache_populate_time = time.perf_counter() - start_populate_cache
if dbt.tracking.active_user is not None:
dbt.tracking.track_runnable_timing(
Expand Down Expand Up @@ -504,8 +507,7 @@ def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRe

return result

def create_schemas(self, adapter, selected_uids: Iterable[str]):
required_schemas = self.get_model_schemas(adapter, selected_uids)
def create_schemas(self, adapter, required_schemas: Set[BaseRelation]):
# we want the string form of the information schema database
required_databases: Set[BaseRelation] = set()
for required in required_schemas:
Expand Down
4 changes: 2 additions & 2 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ def _link_cached_relations(self, manifest):

self._link_cached_database_relations(schemas)

def _relations_cache_for_schemas(self, manifest):
super()._relations_cache_for_schemas(manifest)
def _relations_cache_for_schemas(self, manifest, cache_schemas=None):
super()._relations_cache_for_schemas(manifest, cache_schemas)
self._link_cached_relations(manifest)

def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{{
config(
materialized='table',
schema='another_schema'
)
}}
select 1 as id
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{{
config(
materialized='table'
)
}}
select 1 as id
14 changes: 14 additions & 0 deletions test/integration/038_caching_tests/test_caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,17 @@ def models(self):
@use_profile('postgres')
def test_postgres_cache(self):
self.cache_run()

class TestCachingSelectedSchemaOnly(TestBaseCaching):
@property
def models(self):
return "models_multi_schemas"

def run_and_get_adapter(self):
# select only the 'model' in the default schema
self.run_dbt(['--cache-selected-only', 'run', '--select', 'model'])
return FACTORY.adapters[self.adapter_type]

@use_profile('postgres')
def test_postgres_cache(self):
self.cache_run()
15 changes: 15 additions & 0 deletions test/unit/test_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,18 @@ def test__flags(self):
self.assertEqual(flags.NO_PRINT, True)
# cleanup
self.user_config.no_print = None

# cache_selected_only
self.user_config.cache_selected_only = True
flags.set_from_args(self.args, self.user_config)
self.assertEqual(flags.CACHE_SELECTED_ONLY, True)
os.environ['DBT_CACHE_SELECTED_ONLY'] = 'false'
flags.set_from_args(self.args, self.user_config)
self.assertEqual(flags.CACHE_SELECTED_ONLY, False)
setattr(self.args, 'cache_selected_only', True)
flags.set_from_args(self.args, self.user_config)
self.assertEqual(flags.CACHE_SELECTED_ONLY, True)
# cleanup
os.environ.pop('DBT_CACHE_SELECTED_ONLY')
delattr(self.args, 'cache_selected_only')
self.user_config.cache_selected_only = False