Parse statistics

Fran Lozano committed Apr 15, 2021
1 parent 2c6a5d8 commit 7612fcb

Showing 3 changed files with 110 additions and 3 deletions.
8 changes: 7 additions & 1 deletion dbt/adapters/spark/impl.py
@@ -64,6 +64,7 @@ class SparkAdapter(SQLAdapter):
     INFORMATION_COLUMNS_REGEX = re.compile(
         r"\|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
     INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
+    INFORMATION_STATISTICS_REGEX = re.compile(r"^Statistics: (.*)$", re.MULTILINE)
 
     Relation = SparkRelation
     Column = SparkColumn
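
The text these patterns scan is the `relation.information` blob, i.e. the `Database:`/`Owner:`/`Statistics:` block reproduced verbatim in the tests below, so the new pattern simply captures everything after `Statistics: ` on that line. A quick standalone check, using the statistics line from the new parquet test as input:

import re

INFORMATION_STATISTICS_REGEX = re.compile(r"^Statistics: (.*)$", re.MULTILINE)

information = (
    "Provider: parquet\n"
    "Statistics: 1234567890 bytes, 12345678 rows\n"
    "Location: /mnt/vo\n"
)
# findall returns the captured group for each matching line
print(INFORMATION_STATISTICS_REGEX.findall(information))
# -> ['1234567890 bytes, 12345678 rows']
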
@@ -224,6 +225,10 @@ def parse_columns_from_information(
         matches = re.finditer(
             self.INFORMATION_COLUMNS_REGEX, relation.information)
         columns = []
+        stats_match = re.findall(
+            self.INFORMATION_STATISTICS_REGEX, relation.information)
+        raw_table_stats = stats_match[0] if stats_match else None
+        table_stats = SparkColumn.convert_table_stats(raw_table_stats)
         for match_num, match in enumerate(matches):
             column_name, column_type, nullable = match.groups()
             column = SparkColumn(
@@ -234,7 +239,8 @@
                 column_index=match_num,
                 table_owner=owner,
                 column=column_name,
-                dtype=column_type
+                dtype=column_type,
+                table_stats=table_stats
             )
             columns.append(column)
         return columns
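
Note that the statistics are parsed once per relation, before the column loop, so the same `table_stats` dict is attached to every `SparkColumn` of that table. The conversion itself happens in `SparkColumn.convert_table_stats`, which is exercised by the new `test/unit/test_column.py` below but is not among the three files in this commit (it presumably lives in `dbt/adapters/spark/column.py`). A minimal sketch consistent with those unit tests, assuming statistics always arrive as comma-separated `<value> <label>` pairs with integer values:

from typing import Dict, Optional, Union

def convert_table_stats(raw_stats: Optional[str]) -> Dict[str, Union[str, int, bool]]:
    # No "Statistics:" line was found -> no stats to report.
    table_stats = {}
    if raw_stats:
        # e.g. "1234567890 bytes, 12345678 rows"
        #   -> {'bytes': 1234567890, 'rows': 12345678}
        stats = {
            part.split(" ")[1]: int(part.split(" ")[0])
            for part in raw_stats.split(", ")
        }
        for key, value in stats.items():
            table_stats[f"stats:{key}:label"] = key
            table_stats[f"stats:{key}:value"] = value
            table_stats[f"stats:{key}:description"] = ""
            table_stats[f"stats:{key}:include"] = True
    return table_stats

The `stats:<label>:<field>` key layout is exactly what surfaces in the flattened column dictionaries asserted via `to_column_dict(omit_none=False)` in the adapter tests below.
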
67 changes: 65 additions & 2 deletions test/unit/test_adapter.py
@@ -447,7 +447,7 @@ def test_profile_with_cluster_and_sql_endpoint(self):
         with self.assertRaises(RuntimeException):
             config_from_parts_or_dicts(self.project_cfg, profile)
 
-    def test_parse_columns_from_information_with_table_type(self):
+    def test_parse_columns_from_information_with_table_type_and_delta_provider(self):
         self.maxDiff = None
         rel_type = SparkRelation.get_relation_type.Table
 
@@ -495,7 +495,12 @@ def test_parse_columns_from_information_with_table_type(self):
             'dtype': 'decimal(22,0)',
             'numeric_scale': None,
             'numeric_precision': None,
-            'char_size': None
+            'char_size': None,
+
+            'stats:bytes:description': '',
+            'stats:bytes:include': True,
+            'stats:bytes:label': 'bytes',
+            'stats:bytes:value': 123456789,
         })
 
     def test_parse_columns_from_information_with_view_type(self):
@@ -557,3 +562,61 @@ def test_parse_columns_from_information_with_view_type(self):
             'char_size': None
         })
 
+    def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
+        self.maxDiff = None
+        rel_type = SparkRelation.get_relation_type.Table
+
+        information = (
+            "Database: default_schema\n"
+            "Table: mytable\n"
+            "Owner: root\n"
+            "Created Time: Wed Feb 04 18:15:00 UTC 1815\n"
+            "Last Access: Wed May 20 19:25:00 UTC 1925\n"
+            "Created By: Spark 3.0.1\n"
+            "Type: MANAGED\n"
+            "Provider: parquet\n"
+            "Statistics: 1234567890 bytes, 12345678 rows\n"
+            "Location: /mnt/vo\n"
+            "Serde Library: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe\n"
+            "InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat\n"
+            "OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat\n"
+            "Schema: root\n"
+            " |-- col1: decimal(22,0) (nullable = true)\n"
+            " |-- col2: string (nullable = true)\n"
+            " |-- dt: date (nullable = true)\n"
+        )
+        relation = SparkRelation.create(
+            schema='default_schema',
+            identifier='mytable',
+            type=rel_type,
+            information=information
+        )
+
+        config = self._get_target_http(self.project_cfg)
+        columns = SparkAdapter(config).parse_columns_from_information(
+            relation)
+        self.assertEqual(len(columns), 3)
+        self.assertEqual(columns[2].to_column_dict(omit_none=False), {
+            'table_database': None,
+            'table_schema': relation.schema,
+            'table_name': relation.name,
+            'table_type': rel_type,
+            'table_owner': 'root',
+            'column': 'dt',
+            'column_index': 2,
+            'dtype': 'date',
+            'numeric_scale': None,
+            'numeric_precision': None,
+            'char_size': None,
+
+            'stats:bytes:description': '',
+            'stats:bytes:include': True,
+            'stats:bytes:label': 'bytes',
+            'stats:bytes:value': 1234567890,
+
+            'stats:rows:description': '',
+            'stats:rows:include': True,
+            'stats:rows:label': 'rows',
+            'stats:rows:value': 12345678
+        })
+

38 changes: 38 additions & 0 deletions test/unit/test_column.py
@@ -0,0 +1,38 @@
+import unittest
+
+from dbt.adapters.spark import SparkColumn
+
+
+class TestSparkColumn(unittest.TestCase):
+
+    def test_convert_table_stats_with_no_statistics(self):
+        self.assertDictEqual(
+            SparkColumn.convert_table_stats(None),
+            {}
+        )
+
+    def test_convert_table_stats_with_bytes(self):
+        self.assertDictEqual(
+            SparkColumn.convert_table_stats("123456789 bytes"),
+            {
+                'stats:bytes:description': '',
+                'stats:bytes:include': True,
+                'stats:bytes:label': 'bytes',
+                'stats:bytes:value': 123456789
+            }
+        )
+
+    def test_convert_table_stats_with_bytes_and_rows(self):
+        self.assertDictEqual(
+            SparkColumn.convert_table_stats("1234567890 bytes, 12345678 rows"),
+            {
+                'stats:bytes:description': '',
+                'stats:bytes:include': True,
+                'stats:bytes:label': 'bytes',
+                'stats:bytes:value': 1234567890,
+                'stats:rows:description': '',
+                'stats:rows:include': True,
+                'stats:rows:label': 'rows',
+                'stats:rows:value': 12345678
+            }
+        )
