Merge pull request #160 from franloza/feature/93-catalog-generation
Parse information returned by list_relations_without_caching macro to speed up catalog generation
jtcohen6 authored Apr 17, 2021
2 parents 40db364 + 36367e6 commit 27409c4
Showing 5 changed files with 269 additions and 8 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,8 +4,14 @@

- Cast `table_owner` to string to avoid errors generating docs ([#158](https://github.com/fishtown-analytics/dbt-spark/pull/158), [#159](https://github.com/fishtown-analytics/dbt-spark/pull/159))

### Under the hood

- Parse information returned by `list_relations_without_caching` macro to speed up catalog generation ([#93](https://github.com/fishtown-analytics/dbt-spark/issues/93), [#160](https://github.com/fishtown-analytics/dbt-spark/pull/160))

### Contributors
- [@friendofasquid](https://github.com/friendofasquid) ([#159](https://github.com/fishtown-analytics/dbt-spark/pull/159))
- [@franloza](https://github.com/franloza) ([#160](https://github.com/fishtown-analytics/dbt-spark/pull/160))


## dbt-spark 0.19.1 (Release TBD)

58 changes: 50 additions & 8 deletions dbt/adapters/spark/impl.py
@@ -1,3 +1,4 @@
import re
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Optional, List, Dict, Any, Union, Iterable
@@ -60,6 +61,11 @@ class SparkAdapter(SQLAdapter):
'stats:rows:description',
'stats:rows:include',
)
INFORMATION_COLUMNS_REGEX = re.compile(
r"\|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
INFORMATION_STATISTICS_REGEX = re.compile(
r"^Statistics: (.*)$", re.MULTILINE)

Relation = SparkRelation
Column = SparkColumn
@@ -139,7 +145,8 @@ def list_relations_without_caching(
schema=_schema,
identifier=name,
type=rel_type,
is_delta=is_delta
information=information,
is_delta=is_delta,
)
relations.append(relation)

@@ -197,19 +204,54 @@ def find_table_information_separator(rows: List[dict]) -> int:
return pos

def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
rows: List[agate.Row] = super().get_columns_in_relation(relation)
return self.parse_describe_extended(relation, rows)
cached_relations = self.cache.get_relations(
relation.database, relation.schema)
cached_relation = next((cached_relation
for cached_relation in cached_relations
if str(cached_relation) == str(relation)),
None)
if cached_relation is None:
rows: List[agate.Row] = super().get_columns_in_relation(relation)
columns = self.parse_describe_extended(relation, rows)
else:
columns = self.parse_columns_from_information(cached_relation)
return columns

def parse_columns_from_information(
self, relation: SparkRelation
) -> List[SparkColumn]:
owner_match = re.findall(
self.INFORMATION_OWNER_REGEX, relation.information)
owner = owner_match[0] if owner_match else None
matches = re.finditer(
self.INFORMATION_COLUMNS_REGEX, relation.information)
columns = []
stats_match = re.findall(
self.INFORMATION_STATISTICS_REGEX, relation.information)
raw_table_stats = stats_match[0] if stats_match else None
table_stats = SparkColumn.convert_table_stats(raw_table_stats)
for match_num, match in enumerate(matches):
column_name, column_type, nullable = match.groups()
column = SparkColumn(
table_database=None,
table_schema=relation.schema,
table_name=relation.table,
table_type=relation.type,
column_index=match_num,
table_owner=owner,
column=column_name,
dtype=column_type,
table_stats=table_stats
)
columns.append(column)
return columns

def _get_columns_for_catalog(
self, relation: SparkRelation
) -> Iterable[Dict[str, Any]]:
properties = self.get_properties(relation)
columns = self.get_columns_in_relation(relation)
owner = properties.get(KEY_TABLE_OWNER)
columns = self.parse_columns_from_information(relation)

for column in columns:
if owner:
column.table_owner = owner
# convert SparkColumns into catalog dicts
as_dict = column.to_column_dict()
as_dict['column_name'] = as_dict.pop('column', None)
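
For reference, the three regexes added above can be exercised on their own against the `information` text that the `list_relations_without_caching` macro returns for each relation. A minimal standalone sketch (the sample blob is abbreviated from the unit tests below and is not part of the committed code):

```python
import re

# Regexes as defined on SparkAdapter in this diff.
INFORMATION_COLUMNS_REGEX = re.compile(
    r"\|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
INFORMATION_STATISTICS_REGEX = re.compile(r"^Statistics: (.*)$", re.MULTILINE)

# Abbreviated example of the `information` text Spark reports for a table.
information = (
    "Database: default_schema\n"
    "Table: mytable\n"
    "Owner: root\n"
    "Type: MANAGED\n"
    "Provider: delta\n"
    "Statistics: 123456789 bytes\n"
    "Schema: root\n"
    " |-- col1: decimal(22,0) (nullable = true)\n"
    " |-- col2: string (nullable = true)\n"
)

owner = INFORMATION_OWNER_REGEX.findall(information)[0]           # 'root'
raw_stats = INFORMATION_STATISTICS_REGEX.findall(information)[0]  # '123456789 bytes'
columns = [m.groups() for m in INFORMATION_COLUMNS_REGEX.finditer(information)]
print(owner, raw_stats, columns)
# root 123456789 bytes [('col1', 'decimal(22,0)', 'true'), ('col2', 'string', 'true')]
```

`parse_columns_from_information` performs exactly this extraction and wraps each match in a `SparkColumn`, which is how catalog generation can skip a per-relation `DESCRIBE TABLE EXTENDED` round trip.
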
1 change: 1 addition & 0 deletions dbt/adapters/spark/relation.py
@@ -26,6 +26,7 @@ class SparkRelation(BaseRelation):
include_policy: SparkIncludePolicy = SparkIncludePolicy()
quote_character: str = '`'
is_delta: Optional[bool] = None
information: str = None

def __post_init__(self):
if self.database != self.schema and self.database:
174 changes: 174 additions & 0 deletions test/unit/test_adapter.py
@@ -446,3 +446,177 @@ def test_profile_with_cluster_and_sql_endpoint(self):
}
with self.assertRaises(RuntimeException):
config_from_parts_or_dicts(self.project_cfg, profile)

def test_parse_columns_from_information_with_table_type_and_delta_provider(self):
self.maxDiff = None
rel_type = SparkRelation.get_relation_type.Table

# Mimics the output of Spark in the information column
information = (
"Database: default_schema\n"
"Table: mytable\n"
"Owner: root\n"
"Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
"Last Access: Wed May 20 19:25:00 UTC 1925\n"
"Created By: Spark 3.0.1\n"
"Type: MANAGED\n"
"Provider: delta\n"
"Statistics: 123456789 bytes\n"
"Location: /mnt/vo\n"
"Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
"InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
"OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
"Partition Provider: Catalog\n"
"Partition Columns: [`dt`]\n"
"Schema: root\n"
" |-- col1: decimal(22,0) (nullable = true)\n"
" |-- col2: string (nullable = true)\n"
" |-- dt: date (nullable = true)\n"
)
relation = SparkRelation.create(
schema='default_schema',
identifier='mytable',
type=rel_type,
information=information
)

config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(
relation)
self.assertEqual(len(columns), 3)
self.assertEqual(columns[0].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'col1',
'column_index': 0,
'dtype': 'decimal(22,0)',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None,

'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 123456789,
})

def test_parse_columns_from_information_with_view_type(self):
self.maxDiff = None
rel_type = SparkRelation.get_relation_type.View
information = (
"Database: default_schema\n"
"Table: myview\n"
"Owner: root\n"
"Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
"Last Access: UNKNOWN\n"
"Created By: Spark 3.0.1\n"
"Type: VIEW\n"
"View Text: WITH base (\n"
" SELECT * FROM source_table\n"
")\n"
"SELECT col1, col2, dt FROM base\n"
"View Original Text: WITH base (\n"
" SELECT * FROM source_table\n"
")\n"
"SELECT col1, col2, dt FROM base\n"
"View Catalog and Namespace: spark_catalog.default\n"
"View Query Output Columns: [col1, col2, dt]\n"
"Table Properties: [view.query.out.col.1=col1, view.query.out.col.2=col2, "
"transient_lastDdlTime=1618324324, view.query.out.col.3=dt, "
"view.catalogAndNamespace.part.0=spark_catalog, "
"view.catalogAndNamespace.part.1=default]\n"
"Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\n"
"InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat\n"
"OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat\n"
"Storage Properties: [serialization.format=1]\n"
"Schema: root\n"
" |-- col1: decimal(22,0) (nullable = true)\n"
" |-- col2: string (nullable = true)\n"
" |-- dt: date (nullable = true)\n"
)
relation = SparkRelation.create(
schema='default_schema',
identifier='myview',
type=rel_type,
information=information
)

config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(
relation)
self.assertEqual(len(columns), 3)
self.assertEqual(columns[1].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'col2',
'column_index': 1,
'dtype': 'string',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None
})

def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
self.maxDiff = None
rel_type = SparkRelation.get_relation_type.Table

information = (
"Database: default_schema\n"
"Table: mytable\n"
"Owner: root\n"
"Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
"Last Access: Wed May 20 19:25:00 UTC 1925\n"
"Created By: Spark 3.0.1\n"
"Type: MANAGED\n"
"Provider: parquet\n"
"Statistics: 1234567890 bytes, 12345678 rows\n"
"Location: /mnt/vo\n"
"Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe\n"
"InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat\n"
"OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat\n"
"Schema: root\n"
" |-- col1: decimal(22,0) (nullable = true)\n"
" |-- col2: string (nullable = true)\n"
" |-- dt: date (nullable = true)\n"
)
relation = SparkRelation.create(
schema='default_schema',
identifier='mytable',
type=rel_type,
information=information
)

config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(
relation)
self.assertEqual(len(columns), 3)
self.assertEqual(columns[2].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'dt',
'column_index': 2,
'dtype': 'date',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None,

'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 1234567890,

'stats:rows:description': '',
'stats:rows:include': True,
'stats:rows:label': 'rows',
'stats:rows:value': 12345678
})

38 changes: 38 additions & 0 deletions test/unit/test_column.py
@@ -0,0 +1,38 @@
import unittest

from dbt.adapters.spark import SparkColumn


class TestSparkColumn(unittest.TestCase):

def test_convert_table_stats_with_no_statistics(self):
self.assertDictEqual(
SparkColumn.convert_table_stats(None),
{}
)

def test_convert_table_stats_with_bytes(self):
self.assertDictEqual(
SparkColumn.convert_table_stats("123456789 bytes"),
{
'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 123456789
}
)

def test_convert_table_stats_with_bytes_and_rows(self):
self.assertDictEqual(
SparkColumn.convert_table_stats("1234567890 bytes, 12345678 rows"),
{
'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 1234567890,
'stats:rows:description': '',
'stats:rows:include': True,
'stats:rows:label': 'rows',
'stats:rows:value': 12345678
}
)
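
`SparkColumn.convert_table_stats`, which these tests exercise, is not part of this diff. A minimal sketch that would satisfy the assertions above, assuming the raw statistics string is a comma-separated list of `<value> <label>` pairs (the committed implementation in `dbt/adapters/spark/column.py` may differ):

```python
from typing import Any, Dict, Optional


def convert_table_stats(raw_stats: Optional[str]) -> Dict[str, Any]:
    """Expand a raw Spark statistics string, e.g. '1234567890 bytes, 12345678 rows',
    into the flat 'stats:<label>:<field>' entries dbt's catalog expects."""
    stats: Dict[str, Any] = {}
    if raw_stats:
        for part in raw_stats.split(', '):  # each chunk looks like '12345678 rows'
            value, label = part.split(' ')
            stats[f'stats:{label}:label'] = label
            stats[f'stats:{label}:value'] = int(value)
            stats[f'stats:{label}:description'] = ''
            stats[f'stats:{label}:include'] = True
    return stats


# Mirrors the expectations in the tests above.
assert convert_table_stats(None) == {}
assert convert_table_stats("1234567890 bytes, 12345678 rows")['stats:rows:value'] == 12345678
```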
