Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: see Presto row and array data types #7413

Merged
merged 11 commits into from
May 1, 2019
206 changes: 201 additions & 5 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@
import re
import textwrap
import time
from typing import List, Tuple

from flask import g
from flask_babel import lazy_gettext as _
import pandas
import sqlalchemy as sqla
from sqlalchemy import Column, select
from sqlalchemy import Column, select, types
from sqlalchemy.engine import create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.result import RowProxy
from sqlalchemy.engine.url import make_url
from sqlalchemy.sql import quoted_name, text
from sqlalchemy.sql.expression import TextAsFrom
Expand All @@ -52,6 +56,7 @@

from superset import app, conf, db, sql_parse
from superset.exceptions import SupersetTemplateException
from superset.models.sql_types.presto_sql_types import type_map as presto_type_map
from superset.utils import core as utils

QueryStatus = utils.QueryStatus
Expand Down Expand Up @@ -105,16 +110,16 @@ class BaseEngineSpec(object):
"""Abstract class for database engine specific configurations"""

engine = 'base' # str as defined in sqlalchemy.engine.engine
time_grain_functions = {}
time_grain_functions: dict = {}
time_groupby_inline = False
limit_method = LimitMethod.FORCE_LIMIT
time_secondary_columns = False
inner_joins = True
allows_subquery = True
supports_column_aliases = True
force_column_alias_quotes = False
arraysize = None
max_column_name_length = None
arraysize = 0
max_column_name_length = 0

@classmethod
def get_time_expr(cls, expr, pdf, time_grain, grain):
Expand Down Expand Up @@ -351,6 +356,10 @@ def get_table_names(cls, inspector, schema):
def get_view_names(cls, inspector, schema):
return sorted(inspector.get_view_names(schema))

@classmethod
def get_columns(cls, inspector: Inspector, table_name: str, schema: str) -> list:
    """
    Get the columns of a table by delegating to the inspector
    :param inspector: object that performs database schema inspection
    :param table_name: table name
    :param schema: schema name
    :return: the value produced by ``inspector.get_columns`` for the table
    """
    column_list = inspector.get_columns(table_name, schema)
    return column_list

@classmethod
def where_latest_partition(
cls, table_name, schema, database, qry, columns=None):
Expand Down Expand Up @@ -735,7 +744,7 @@ class MySQLEngineSpec(BaseEngineSpec):
'INTERVAL DAYOFWEEK(DATE_SUB({col}, INTERVAL 1 DAY)) - 1 DAY))',
}

type_code_map = {} # loaded from get_datatype only if needed
type_code_map: dict = {} # loaded from get_datatype only if needed

@classmethod
def convert_dttm(cls, target_type, dttm):
Expand Down Expand Up @@ -814,6 +823,180 @@ def get_view_names(cls, inspector, schema):
"""
return []

@classmethod
def _create_column_info(cls, column: RowProxy, name: str, data_type: str) -> dict:
"""
Create column info object
:param column: column object
:param name: column name
:param data_type: column data type
:return: column info object
"""
return {
'name': name,
'type': data_type,
# newer Presto no longer includes this column
'nullable': getattr(column, 'Null', True),
'default': None,
}

@classmethod
def _get_full_name(cls, names: List[Tuple[str, str]]) -> str:
"""
Get the full column name
:param names: list of all individual column names
:return: full column name
"""
return '.'.join(column[0] for column in names if column[0])

@classmethod
def _has_nested_data_types(cls, component_type: str) -> bool:
"""
Check if string contains a data type. We determine if there is a data type by
whitespace or multiple data types by commas
:param component_type: data type
:return: boolean
"""
comma_regex = r',(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'
white_space_regex = r'\s(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'
return re.search(comma_regex, component_type) is not None \
or re.search(white_space_regex, component_type) is not None

@classmethod
def _split_data_type(cls, data_type: str, delimiter: str) -> List[str]:
"""
Split data type based on given delimiter. Do not split the string if the
delimiter is enclosed in quotes
:param data_type: data type
:param delimiter: string separator (i.e. open parenthesis, closed parenthesis,
comma, whitespace)
:return: list of strings after breaking it by the delimiter
"""
return re.split(
r'{}(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'.format(delimiter), data_type)

@classmethod
def _parse_structural_column(cls, column: RowProxy, result: List[dict]) -> None:
    """
    Parse a row or array column and append one column info entry per
    component to ``result``.

    The string ``'<column.Column> <column.Type>'`` is split on unquoted open
    parentheses and each piece is split again on unquoted closed parentheses.
    A stack of ``(name, type)`` tuples records the structural types that are
    currently open; the dotted names written to ``result`` are built from it.

    :param column: column; its ``Column`` and ``Type`` attributes are read
    :param result: list tracking the results; it is modified in place
    :return: None
    """
    full_data_type = '{} {}'.format(column.Column, column.Type)
    # split on open parenthesis ( to get the structural
    # data type and its component types
    data_types = cls._split_data_type(full_data_type, r'\(')
    # each entry is (field name, structural type); the name is '' for an
    # unnamed structural type
    stack: List[Tuple[str, str]] = []
    for data_type in data_types:
        # split on closed parenthesis ) to track which component
        # types belong to what structural data type
        inner_types = cls._split_data_type(data_type, r'\)')
        for inner_type in inner_types:
            # We have finished parsing multiple structural data types
            if not inner_type and len(stack) > 0:
                stack.pop()
            elif cls._has_nested_data_types(inner_type):
                # split on comma , to get individual data types
                single_fields = cls._split_data_type(inner_type, ', ')
                for single_field in single_fields:
                    # If component type starts with a comma, the first single field
                    # will be an empty string. Disregard this empty string.
                    if not single_field:
                        continue
                    # split on whitespace to get field name and data type
                    # NOTE(review): indexing field_info[1] below assumes every
                    # field has the form "<name> <type>" - confirm
                    field_info = cls._split_data_type(single_field, r'\s')
                    # check if there is a structural data type within
                    # overall structural data type
                    if field_info[1] == 'array' or field_info[1] == 'row':
                        stack.append((field_info[0], field_info[1]))
                        full_parent_path = cls._get_full_name(stack)
                        result.append(cls._create_column_info(
                            column, full_parent_path,
                            presto_type_map[field_info[1]]()))
                    else:  # otherwise this field is a basic data type
                        full_parent_path = cls._get_full_name(stack)
                        column_name = '{}.{}'.format(full_parent_path, field_info[0])
                        # a type missing from presto_type_map raises KeyError here
                        result.append(cls._create_column_info(
                            column, column_name, presto_type_map[field_info[1]]()))
                # If the component type ends with a structural data type, do not pop
                # the stack. We have run across a structural data type within the
                # overall structural data type. Otherwise, we have completely parsed
                # through the entire structural data type and can move on.
                if not (inner_type.endswith('array') or inner_type.endswith('row')):
                    stack.pop()
            # We have an array of row objects (i.e. array(row(...)))
            elif 'array' == inner_type or 'row' == inner_type:
                # Push a dummy object to represent the structural data type
                stack.append(('', inner_type))
            # We have an array of a basic data type (i.e. array(varchar)).
            elif len(stack) > 0:
                # Because it is an array of a basic data type, we have finished
                # parsing the structural data type and can move on.
                stack.pop()

@classmethod
def _show_columns(
        cls, inspector: Inspector, table_name: str, schema: str) -> List[RowProxy]:
    """
    Run ``SHOW COLUMNS FROM <table>`` against the inspector's bind
    :param inspector: object that performs database schema inspection
    :param table_name: table name
    :param schema: schema name; when falsy the table name is used unqualified
    :return: list of column objects
    """
    # NOTE(review): the value handed back is whatever inspector.bind.execute
    # yields; the List[RowProxy] annotation should be confirmed against it
    # quote both identifiers with the dialect's own identifier preparer
    quote = inspector.engine.dialect.identifier_preparer.quote_identifier
    full_table = quote(table_name)
    if schema:
        full_table = '{}.{}'.format(quote(schema), full_table)
    columns = inspector.bind.execute('SHOW COLUMNS FROM {}'.format(full_table))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick question here, @khtruong / @DiggidyDave: SQLAlchemy already has an interface for getting the columns of a given table. Did you try using that instead? Does it not return complex types?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use pyhive to get the columns, which does not handle complex types. It detects row and array objects as unknown types and assigns them a null data type.

return columns

@classmethod
def get_columns(
        cls, inspector: Inspector, table_name: str, schema: str) -> List[dict]:
    """
    Get columns from a Presto data source. This includes handling row and
    array data types
    :param inspector: object that performs database schema inspection
    :param table_name: table name
    :param schema: schema name
    :return: a list of results that contain column info
            (i.e. column name and data type)
    """
    columns = cls._show_columns(inspector, table_name, schema)
    result: List[dict] = []
    for column in columns:
        try:
            # parse column if it is a row or array; the parser appends its
            # own entries to result, so nothing more is added for the column
            if 'array' in column.Type or 'row' in column.Type:
                cls._parse_structural_column(column, result)
                continue
            # otherwise column is a basic data type
            column_type = presto_type_map[column.Type]()
        except KeyError:
            logging.info('Did not recognize type {} of column {}'.format(
                column.Type, column.Column))
            # Use a NullType *instance*: every other entry in the result
            # carries an instantiated type, so the fallback must not be the
            # bare class.
            column_type = types.NullType()
        result.append(cls._create_column_info(column, column.Column, column_type))
    return result

@classmethod
def select_star(cls, my_db, table_name: str, engine: Engine, schema: str = None,
                limit: int = 100, show_cols: bool = False, indent: bool = True,
                latest_partition: bool = True, cols: List[dict] = None) -> str:
    """
    Temporary method until we have a function that can handle row and array columns

    When ``show_cols`` is set, columns whose name contains a dot outside of
    double quotes are left out before delegating to
    ``BaseEngineSpec.select_star``; all other arguments are passed through.
    :param cols: list of column info dicts (each with a ``name`` key);
        ``None`` is treated as an empty list
    :return: the value returned by ``BaseEngineSpec.select_star``
    """
    # None replaces the former shared mutable default ([])
    presto_cols = [] if cols is None else cols
    if show_cols:
        dot_regex = r'\.(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'
        presto_cols = [
            col for col in presto_cols if re.search(dot_regex, col['name']) is None]
    return BaseEngineSpec.select_star(
        my_db, table_name, engine, schema, limit,
        show_cols, indent, latest_partition, presto_cols,
    )

@classmethod
def adjust_database_uri(cls, uri, selected_schema=None):
database = uri.database
Expand Down Expand Up @@ -1323,6 +1506,11 @@ def handle_cursor(cls, cursor, query, session):
time.sleep(hive_poll_interval)
polled = cursor.poll()

@classmethod
def get_columns(
        cls, inspector: Inspector, table_name: str, schema: str) -> List[dict]:
    """
    Get the columns of a table by delegating to the inspector
    :param inspector: object that performs database schema inspection
    :param table_name: table name
    :param schema: schema name
    :return: the value produced by ``inspector.get_columns`` for the table
    """
    column_list = inspector.get_columns(table_name, schema)
    return column_list

@classmethod
def where_latest_partition(
cls, table_name, schema, database, qry, columns=None):
Expand Down Expand Up @@ -1354,6 +1542,14 @@ def _partition_query(
cls, table_name, limit=0, order_by=None, filters=None):
return f'SHOW PARTITIONS {table_name}'

@classmethod
def select_star(cls, my_db, table_name: str, engine: Engine, schema: str = None,
                limit: int = 100, show_cols: bool = False, indent: bool = True,
                latest_partition: bool = True, cols: List[dict] = None) -> str:
    """
    Delegate to ``BaseEngineSpec.select_star`` with the arguments unchanged
    :param cols: list of column info dicts; ``None`` is treated as an empty
        list
    :return: the value returned by ``BaseEngineSpec.select_star``
    """
    # None replaces the former shared mutable default ([]); an empty list is
    # still what gets forwarded when the caller supplies nothing
    forwarded_cols = [] if cols is None else cols
    return BaseEngineSpec.select_star(
        my_db, table_name, engine, schema, limit,
        show_cols, indent, latest_partition, forwarded_cols)

@classmethod
def modify_url_for_impersonation(cls, url, impersonate_user, username):
"""
Expand Down
2 changes: 1 addition & 1 deletion superset/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ def get_table(self, table_name, schema=None):
autoload_with=self.get_sqla_engine())

def get_columns(self, table_name, schema=None):
    """
    Return the columns of a table as reported by the database engine spec.

    The lookup goes through ``self.db_engine_spec.get_columns`` (passing
    ``self.inspector``) so that an engine spec can supply its own column
    handling instead of the plain inspector result.
    :param table_name: table name
    :param schema: schema name, or None
    :return: the value returned by the engine spec's ``get_columns``
    """
    return self.db_engine_spec.get_columns(self.inspector, table_name, schema)

def get_indexes(self, table_name, schema=None):
    """
    Return the indexes of a table as reported by the inspector
    :param table_name: table name
    :param schema: schema name, or None
    :return: the value returned by ``self.inspector.get_indexes``
    """
    inspector = self.inspector
    return inspector.get_indexes(table_name, schema)
Expand Down
16 changes: 16 additions & 0 deletions superset/models/sql_types/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading