-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
cache.py
501 lines (413 loc) · 18.8 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
import threading
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from dbt.adapters.reference_keys import _make_key, _ReferenceKey
import dbt.exceptions
from dbt.events.functions import fire_event
from dbt.events.types import (
AddLink,
AddRelation,
DropCascade,
DropMissingRelation,
DropRelation,
DumpAfterAddGraph,
DumpAfterRenameSchema,
DumpBeforeAddGraph,
DumpBeforeRenameSchema,
RenameSchema,
TemporaryRelation,
UncachedRelation,
UpdateReference,
)
from dbt.utils import lowercase
from dbt.helper_types import Lazy
def dot_separated(key: _ReferenceKey) -> str:
    """Render a reference key as a single dotted string.

    :param _ReferenceKey key: The key whose parts are joined.
    :return str: Every part of the key converted with ``str`` and joined
        with ``"."``.
    """
    parts = [str(part) for part in key]
    return ".".join(parts)
class _CachedRelation:
"""Nothing about _CachedRelation is guaranteed to be thread-safe!
:attr str schema: The schema of this relation.
:attr str identifier: The identifier of this relation.
:attr Dict[_ReferenceKey, _CachedRelation] referenced_by: The relations
that refer to this relation.
:attr BaseRelation inner: The underlying dbt relation.
"""
def __init__(self, inner):
self.referenced_by = {}
self.inner = inner
def __str__(self) -> str:
return ("_CachedRelation(database={}, schema={}, identifier={}, inner={})").format(
self.database, self.schema, self.identifier, self.inner
)
@property
def database(self) -> Optional[str]:
return lowercase(self.inner.database)
@property
def schema(self) -> Optional[str]:
return lowercase(self.inner.schema)
@property
def identifier(self) -> Optional[str]:
return lowercase(self.inner.identifier)
def __copy__(self):
new = self.__class__(self.inner)
new.__dict__.update(self.__dict__)
return new
def __deepcopy__(self, memo):
new = self.__class__(self.inner.incorporate())
new.__dict__.update(self.__dict__)
new.referenced_by = deepcopy(self.referenced_by, memo)
def is_referenced_by(self, key):
return key in self.referenced_by
def key(self):
"""Get the _ReferenceKey that represents this relation
:return _ReferenceKey: A key for this relation.
"""
return _make_key(self)
def add_reference(self, referrer: "_CachedRelation"):
"""Add a reference from referrer to self, indicating that if this node
were drop...cascaded, the referrer would be dropped as well.
:param _CachedRelation referrer: The node that refers to this node.
"""
self.referenced_by[referrer.key()] = referrer
def collect_consequences(self):
"""Recursively collect a set of _ReferenceKeys that would
consequentially get dropped if this were dropped via
"drop ... cascade".
:return Set[_ReferenceKey]: All the relations that would be dropped
"""
consequences = {self.key()}
for relation in self.referenced_by.values():
consequences.update(relation.collect_consequences())
return consequences
def release_references(self, keys):
"""Non-recursively indicate that an iterable of _ReferenceKey no longer
exist. Unknown keys are ignored.
:param Iterable[_ReferenceKey] keys: The keys to drop.
"""
keys = set(self.referenced_by) & set(keys)
for key in keys:
self.referenced_by.pop(key)
def rename(self, new_relation):
"""Rename this cached relation to new_relation.
Note that this will change the output of key(), all refs must be
updated!
:param _CachedRelation new_relation: The new name to apply to the
relation
"""
# Relations store this stuff inside their `path` dict. But they
# also store a table_name, and usually use it in their .render(),
# so we need to update that as well. It doesn't appear that
# table_name is ever anything but the identifier (via .create())
self.inner = self.inner.incorporate(
path={
"database": new_relation.inner.database,
"schema": new_relation.inner.schema,
"identifier": new_relation.inner.identifier,
},
)
def rename_key(self, old_key, new_key):
"""Rename a reference that may or may not exist. Only handles the
reference itself, so this is the other half of what `rename` does.
If old_key is not in referenced_by, this is a no-op.
:param _ReferenceKey old_key: The old key to be renamed.
:param _ReferenceKey new_key: The new key to rename to.
:raises InternalError: If the new key already exists.
"""
if new_key in self.referenced_by:
dbt.exceptions.raise_cache_inconsistent(
'in rename of "{}" -> "{}", new name is in the cache already'.format(
old_key, new_key
)
)
if old_key not in self.referenced_by:
return
value = self.referenced_by.pop(old_key)
self.referenced_by[new_key] = value
def dump_graph_entry(self):
"""Return a key/value pair representing this key and its referents.
return List[str]: The dot-separated form of all referent keys.
"""
return [dot_separated(r) for r in self.referenced_by]
class RelationsCache:
    """A cache of the relations known to dbt. Keeps track of relationships
    declared between tables and handles renames/drops as a real database would.

    :attr Dict[_ReferenceKey, _CachedRelation] relations: The known relations.
    :attr threading.RLock lock: The lock around relations, held during updates.
        The adapters also hold this lock while filling the cache.
    :attr Set[Tuple[Optional[str], Optional[str]]] schemas: The set of
        known/cached (database, schema) pairs, all lowercased.
    """

    def __init__(self) -> None:
        self.relations: Dict[_ReferenceKey, _CachedRelation] = {}
        self.lock = threading.RLock()
        self.schemas: Set[Tuple[Optional[str], Optional[str]]] = set()

    def add_schema(
        self,
        database: Optional[str],
        schema: Optional[str],
    ) -> None:
        """Add a schema to the set of known schemas (case-insensitive)

        :param database: The database name to add.
        :param schema: The schema name to add.
        """
        self.schemas.add((lowercase(database), lowercase(schema)))

    def drop_schema(
        self,
        database: Optional[str],
        schema: Optional[str],
    ) -> None:
        """Drop the given schema and remove it from the set of known schemas.

        Then remove all its contents (and their dependents, etc) as well.

        :param database: The database name of the schema to drop.
        :param schema: The schema name to drop.
        """
        key = (lowercase(database), lowercase(schema))
        if key not in self.schemas:
            return

        # avoid iterating over self.relations while removing things by
        # collecting the list first.

        with self.lock:
            to_remove = self._list_relations_in_schema(database, schema)
            self._remove_all(to_remove)
            # handle a drop_schema race by using discard() over remove()
            self.schemas.discard(key)

    def update_schemas(self, schemas: Iterable[Tuple[Optional[str], str]]):
        """Add multiple schemas to the set of known schemas (case-insensitive)

        :param schemas: An iterable of the (database, schema) pairs to add.
        """
        # Normalize both parts with lowercase(), exactly as add_schema does,
        # so every entry in self.schemas is stored the same way.
        self.schemas.update((lowercase(d), lowercase(s)) for (d, s) in schemas)

    def __contains__(self, schema_id: Tuple[Optional[str], str]):
        """A schema is 'in' the relations cache if it is in the set of cached
        schemas.

        :param schema_id: The db name and schema name to look up.
        """
        db, schema = schema_id
        # Use the same normalization as add_schema so lookups match what was
        # stored, including keys whose schema is None.
        return (lowercase(db), lowercase(schema)) in self.schemas

    def dump_graph(self):
        """Dump a key-only representation of the schema to a dictionary. Every
        known relation is a key with a value of a list of keys it is referenced
        by.
        """
        # we have to hold the lock for the entire dump, if other threads modify
        # self.relations or any cache entry's referenced_by during iteration
        # it's a runtime error!
        with self.lock:
            return {dot_separated(k): v.dump_graph_entry() for k, v in self.relations.items()}

    def _setdefault(self, relation: "_CachedRelation"):
        """Add a relation to the cache, or return it if it already exists.

        :param _CachedRelation relation: The relation to set or get.
        :return _CachedRelation: The relation stored under the given relation's
            key
        """
        self.add_schema(relation.database, relation.schema)
        key = relation.key()
        return self.relations.setdefault(key, relation)

    def _add_link(self, referenced_key, dependent_key):
        """Add a link between two relations in the cache.

        If the referenced key is not in the cache, this is a no-op. The
        dependent entry must already exist in the cache.

        :param _ReferenceKey referenced_key: The key identifying the referenced
            model (the one that if dropped will drop the dependent model).
        :param _ReferenceKey dependent_key: The key identifying the dependent
            model.
        :raises InternalError: If the dependent entry does not exist.
        """
        referenced = self.relations.get(referenced_key)
        if referenced is None:
            return

        dependent = self.relations.get(dependent_key)
        if dependent is None:
            dbt.exceptions.raise_cache_inconsistent(
                "in add_link, dependent link key {} not in cache!".format(dependent_key)
            )

        assert dependent is not None  # we just raised!

        referenced.add_reference(dependent)

    # TODO: Is this dead code?  I can't seem to find it grepping the codebase.
    def add_link(self, referenced, dependent):
        """Add a link between two relations to the database. If either relation
        does not exist, it will be added as an "external" relation.

        The dependent model refers _to_ the referenced model. So, given
        arguments of (jake_test, bar, jake_test, foo):
        both values are in the schema jake_test and foo is a view that refers
        to bar, so "drop bar cascade" will drop foo and all of foo's
        dependents.

        If the referenced relation's schema is not cached at all, no link is
        made and an UncachedRelation event is fired instead.

        :param BaseRelation referenced: The referenced model.
        :param BaseRelation dependent: The dependent model.
        """
        ref_key = _make_key(referenced)
        dep_key = _make_key(dependent)
        if (ref_key.database, ref_key.schema) not in self:
            # if we have not cached the referenced schema at all, we must be
            # referring to a table outside our control. There's no need to make
            # a link - we will never drop the referenced relation during a run.
            fire_event(UncachedRelation(dep_key=dep_key, ref_key=ref_key))
            return
        if ref_key not in self.relations:
            # Insert a dummy "external" relation.
            referenced = referenced.replace(type=referenced.External)
            self.add(referenced)
        if dep_key not in self.relations:
            # Insert a dummy "external" relation.
            dependent = dependent.replace(type=referenced.External)
            self.add(dependent)
        fire_event(AddLink(dep_key=dep_key, ref_key=ref_key))
        with self.lock:
            self._add_link(ref_key, dep_key)

    def add(self, relation):
        """Add the relation inner to the cache, under the schema schema and
        identifier identifier

        :param BaseRelation relation: The underlying relation.
        """
        cached = _CachedRelation(relation)
        fire_event(AddRelation(relation=_make_key(cached)))
        fire_event(DumpBeforeAddGraph(dump=Lazy.defer(lambda: self.dump_graph())))

        with self.lock:
            self._setdefault(cached)
        fire_event(DumpAfterAddGraph(dump=Lazy.defer(lambda: self.dump_graph())))

    def _remove_refs(self, keys):
        """Removes all references to all entries in keys. This does not
        cascade!

        :param Iterable[_ReferenceKey] keys: The keys to remove.
        """
        # remove direct refs
        for key in keys:
            del self.relations[key]
        # then remove all entries from each child
        for cached in self.relations.values():
            cached.release_references(keys)

    def _drop_cascade_relation(self, dropped_key):
        """Drop the given relation and cascade it appropriately to all
        dependent relations.

        A key that is not in the cache only fires a DropMissingRelation event.

        :param _ReferenceKey dropped_key: The key of the relation to drop.
        """
        if dropped_key not in self.relations:
            fire_event(DropMissingRelation(relation=dropped_key))
            return
        consequences = self.relations[dropped_key].collect_consequences()
        fire_event(DropCascade(dropped=dropped_key, consequences=consequences))
        self._remove_refs(consequences)

    def drop(self, relation):
        """Drop the named relation and cascade it appropriately to all
        dependent relations.

        Because dbt proactively does many `drop relation if exist ... cascade`
        that are noops, nonexistent relation drops cause a debug log and no
        other actions.

        :param relation: The relation to drop; it is converted to a cache key
            with ``_make_key``.
        """
        dropped_key = _make_key(relation)
        fire_event(DropRelation(dropped=dropped_key))
        with self.lock:
            self._drop_cascade_relation(dropped_key)

    def _rename_relation(self, old_key, new_relation):
        """Rename a relation named old_key to new_relation's key, updating
        references. Return whether or not there was a key to rename.

        :param _ReferenceKey old_key: The existing key, to rename from.
        :param _CachedRelation new_relation: The new relation, to rename to.
        :return bool: Always True; old_key must be present in the cache.
        """
        # On the database level, a rename updates all values that were
        # previously referenced by old_name to be referenced by new_name.
        # basically, the name changes but some underlying ID moves. Kind of
        # like an object reference!
        relation = self.relations.pop(old_key)
        new_key = new_relation.key()

        # relation has to rename its innards, so it needs the _CachedRelation.
        relation.rename(new_relation)
        # update all the relations that refer to it
        for cached in self.relations.values():
            if cached.is_referenced_by(old_key):
                fire_event(
                    UpdateReference(old_key=old_key, new_key=new_key, cached_key=cached.key())
                )
                cached.rename_key(old_key, new_key)

        self.relations[new_key] = relation
        # also fixup the schemas!
        self.add_schema(new_key.database, new_key.schema)

        return True

    def _check_rename_constraints(self, old_key, new_key):
        """Check the rename constraints, and return whether or not the rename
        can proceed.

        If the new key is already present, that is an error.
        If the old key is absent, we debug log and return False, assuming it's
        a temp table being renamed.

        :param _ReferenceKey old_key: The existing key, to rename from.
        :param _ReferenceKey new_key: The new key, to rename to.
        :return bool: If the old relation exists for renaming.
        :raises InternalError: If the new key is already present.
        """
        if new_key in self.relations:
            dbt.exceptions.raise_cache_inconsistent(
                "in rename, new key {} already in cache: {}".format(
                    new_key, list(self.relations.keys())
                )
            )

        if old_key not in self.relations:
            fire_event(TemporaryRelation(key=old_key))
            return False
        return True

    def rename(self, old, new):
        """Rename the old schema/identifier to the new schema/identifier and
        update references.

        If the new schema/identifier is already present, that is an error.
        If the schema/identifier key is absent, we only debug log and cache
        the new relation, assuming it's a temp table being renamed.

        :param BaseRelation old: The existing relation name information.
        :param BaseRelation new: The new relation name information.
        :raises InternalError: If the new key is already present.
        """
        old_key = _make_key(old)
        new_key = _make_key(new)
        fire_event(RenameSchema(old_key=old_key, new_key=new_key))

        fire_event(DumpBeforeRenameSchema(dump=Lazy.defer(lambda: self.dump_graph())))

        with self.lock:
            if self._check_rename_constraints(old_key, new_key):
                self._rename_relation(old_key, _CachedRelation(new))
            else:
                self._setdefault(_CachedRelation(new))

        fire_event(DumpAfterRenameSchema(dump=Lazy.defer(lambda: self.dump_graph())))

    def get_relations(self, database: Optional[str], schema: Optional[str]) -> List[Any]:
        """Case-insensitively list all relations matching the given database
        and schema.

        :param database: The case-insensitive database name to list from.
        :param schema: The case-insensitive schema name to list from.
        :return List[BaseRelation]: The list of relations with the given
            schema
        :raises InternalError: If a cached relation has a None inner relation.
        """
        database = lowercase(database)
        schema = lowercase(schema)
        with self.lock:
            results = [
                r.inner
                for r in self.relations.values()
                if (lowercase(r.schema) == schema and lowercase(r.database) == database)
            ]

        if None in results:
            dbt.exceptions.raise_cache_inconsistent(
                "in get_relations, a None relation was found in the cache!"
            )
        return results

    def clear(self):
        """Clear the cache"""
        with self.lock:
            self.relations.clear()
            self.schemas.clear()

    def _list_relations_in_schema(
        self, database: Optional[str], schema: Optional[str]
    ) -> List["_CachedRelation"]:
        """Get the relations in a schema. Callers should hold the lock."""
        key = (lowercase(database), lowercase(schema))

        to_remove: List[_CachedRelation] = []
        for cachekey, relation in self.relations.items():
            if (cachekey.database, cachekey.schema) == key:
                to_remove.append(relation)
        return to_remove

    def _remove_all(self, to_remove: List["_CachedRelation"]):
        """Remove all the listed relations. Ignore relations that have been
        cascaded out.
        """
        for relation in to_remove:
            # it may have been cascaded out already
            drop_key = _make_key(relation)
            if drop_key in self.relations:
                self.drop(drop_key)