fix(ingest/redshift): Temp table lineage fix (#10008)
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
treff7es and hsheth2 authored Mar 12, 2024
1 parent 4155a62 commit 4535f2a
Showing 6 changed files with 258 additions and 46 deletions.
97 changes: 59 additions & 38 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
@@ -455,49 +455,70 @@ def list_insert_create_queries_sql(
         db_name: str, start_time: datetime, end_time: datetime
     ) -> str:
         return """
-            select
-                distinct cluster,
-                target_schema,
-                target_table,
-                username,
-                query as query_id,
-                LISTAGG(CASE WHEN LEN(RTRIM(querytxt)) = 0 THEN querytxt ELSE RTRIM(querytxt) END) WITHIN GROUP (ORDER BY sequence) as ddl,
-                ANY_VALUE(pid) as session_id,
-                starttime as timestamp
-            from
-                (
+            with query_txt as
+                (
                 select
-                    distinct tbl as target_table_id,
-                    sti.schema as target_schema,
-                    sti.table as target_table,
-                    sti.database as cluster,
-                    usename as username,
-                    text as querytxt,
-                    sq.query,
-                    sequence,
-                    si.starttime as starttime,
-                    pid
+                    query,
+                    pid,
+                    LISTAGG(case
+                        when LEN(RTRIM(text)) = 0 then text
+                        else RTRIM(text)
+                    end) within group (
+                order by
+                    sequence) as ddl
                 from
-                    stl_insert as si
-                join SVV_TABLE_INFO sti on
-                    sti.table_id = tbl
-                left join svl_user_info sui on
-                    si.userid = sui.usesysid
-                left join STL_QUERYTEXT sq on
-                    si.query = sq.query
-                left join stl_load_commits slc on
-                    slc.query = si.query
-                where
+                    (
+                    select
+                        query,
+                        pid,
+                        text,
+                        sequence
+                    from
+                        STL_QUERYTEXT
+                    where
+                        sequence < 320
+                    order by
+                        sequence
+                    )
+                group by
+                    query,
+                    pid
+                )
+            select
+                distinct tbl as target_table_id,
+                sti.schema as target_schema,
+                sti.table as target_table,
+                sti.database as cluster,
+                usename as username,
+                ddl,
+                sq.query as query_id,
+                min(si.starttime) as starttime,
+                ANY_VALUE(pid) as session_id
+            from
+                stl_insert as si
+            left join SVV_TABLE_INFO sti on
+                sti.table_id = tbl
+            left join svl_user_info sui on
+                si.userid = sui.usesysid
+            left join query_txt sq on
+                si.query = sq.query
+            left join stl_load_commits slc on
+                slc.query = si.query
+            where
                 sui.usename <> 'rdsdb'
+                and slc.query IS NULL
                 and cluster = '{db_name}'
-                and slc.query IS NULL
                 and si.starttime >= '{start_time}'
                 and si.starttime < '{end_time}'
-                and sequence < 320
-                ) as target_tables
-            group by cluster, query_id, target_schema, target_table, username, starttime
-            order by cluster, query_id, target_schema, target_table, starttime asc
-        """.format(
+            group by
+                target_table_id,
+                target_schema,
+                target_table,
+                cluster,
+                username,
+                ddl,
+                sq.query
+        """.format(
             # We need the original database name for filtering
             db_name=db_name,
             start_time=start_time.strftime(redshift_datetime_format),
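
The restructured query assembles each statement's full text once in the query_txt CTE (LISTAGG over the first 320 STL_QUERYTEXT fragments) and only then joins it to stl_insert, so insert rows are no longer multiplied per text fragment before aggregation. Below is a minimal sketch of how the template gets rendered; the trimmed-down template string and the exact value of redshift_datetime_format are illustrative assumptions, not the source's constants:

    from datetime import datetime

    # Assumed to mirror the module's Redshift timestamp format.
    redshift_datetime_format = "%Y-%m-%d %H:%M:%S"

    # Hypothetical one-line stand-in for the SQL template above; same placeholders.
    template = (
        "select 1 where cluster = '{db_name}' "
        "and starttime >= '{start_time}' and starttime < '{end_time}'"
    )

    sql = template.format(
        db_name="dev",
        start_time=datetime(2024, 3, 1).strftime(redshift_datetime_format),
        end_time=datetime(2024, 3, 2).strftime(redshift_datetime_format),
    )
    # sql now embeds literal timestamps such as '2024-03-01 00:00:00'.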
Expand Down Expand Up @@ -615,7 +636,7 @@ def temp_table_ddl_query(start_time: datetime, end_time: datetime) -> str:
)
where
rn = 1;
rn = 1
"""

# Add this join to the sql query for more metrics on completed queries

@@ -107,6 +107,12 @@ class AlterTableRow:
     start_time: datetime
 
 
+def _stringy(x: Optional[int]) -> Optional[str]:
+    if x is None:
+        return None
+    return str(x)
+
+
 # this is a class to be a proxy to query Redshift
 class RedshiftDataDictionary:
     def __init__(self, is_serverless):
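
A behavioral note on the new helper: unlike the inline truthiness check it replaces in get_lineage_rows below, _stringy preserves falsy-but-valid ids such as 0. A quick sketch of its behavior:

    assert _stringy(None) is None
    assert _stringy(0) == "0"      # falsy int is still a real session id
    assert _stringy(12345) == "12345"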
@@ -419,9 +425,8 @@ def get_lineage_rows(
                     else None
                 ),
                 session_id=(
-                    str(row[field_names.index("session_id")])
+                    _stringy(row[field_names.index("session_id")])
                     if "session_id" in field_names
-                    and row[field_names.index("session_id")]
                     else None
                 ),
             )
@@ -441,9 +446,13 @@ def get_temporary_rows(
         rows = cursor.fetchmany()
         while rows:
             for row in rows:
+                # Skip rows with no session_id
+                session_id = _stringy(row[field_names.index("session_id")])
+                if session_id is None:
+                    continue
                 yield TempTableRow(
                     transaction_id=row[field_names.index("transaction_id")],
-                    session_id=row[field_names.index("session_id")],
+                    session_id=session_id,
                     # See https://docs.aws.amazon.com/redshift/latest/dg/r_STL_QUERYTEXT.html
                     # for why we need to replace the \n with a newline.
                     query_text=row[field_names.index("query_text")].replace(
@@ -468,9 +477,12 @@ def get_alter_table_commands(
         rows = cursor.fetchmany()
         while rows:
             for row in rows:
+                session_id = _stringy(row[field_names.index("session_id")])
+                if session_id is None:
+                    continue
                 yield AlterTableRow(
                     transaction_id=row[field_names.index("transaction_id")],
-                    session_id=row[field_names.index("session_id")],
+                    session_id=session_id,
                     query_text=row[field_names.index("query_text")],
                     start_time=row[field_names.index("start_time")],
                 )
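
As the linked STL_QUERYTEXT documentation explains, newlines in the stored query text come back as the literal two-character sequence backslash plus n, so the text must be unescaped before SQL parsing. The replace(...) call above is truncated in this view; the snippet below sketches the likely replacement, and the exact arguments are an assumption:

    raw = r"create table #temp1 as\nselect a, 2*b as b from upstream1"
    query_text = raw.replace("\\n", "\n")
    # query_text now contains a real newline between the two clauses.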

@@ -1066,9 +1066,12 @@ def _recurse_into_query(
         # - Update the query text to combine the queries
 
         composite_query_id = self._composite_query_id(composed_of_queries)
-        self.report.queries_with_temp_upstreams.setdefault(
-            composite_query_id, LossyList()
-        ).extend(composed_of_queries)
+        composed_of_queries_truncated: LossyList[str] = LossyList()
+        for query_id in composed_of_queries:
+            composed_of_queries_truncated.append(query_id)
+        self.report.queries_with_temp_upstreams[
+            composite_query_id
+        ] = composed_of_queries_truncated
 
         merged_query_text = ";\n\n".join(
             [
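
A plausible reason for swapping setdefault(...).extend(...) for an explicit append loop: LossyList implements its size-bounded sampling in append, while extend is plain inherited list behavior, so extending could grow the report without truncation. The sketch below illustrates that idea; LossyListSketch is a hypothetical stand-in, not DataHub's actual LossyList API:

    from typing import Generic, List, TypeVar

    T = TypeVar("T")

    class LossyListSketch(Generic[T]):
        """Keeps at most max_elements items while counting every append."""

        def __init__(self, max_elements: int = 20) -> None:
            self.items: List[T] = []
            self.total = 0
            self.max_elements = max_elements

        def append(self, item: T) -> None:
            self.total += 1
            if len(self.items) < self.max_elements:
                self.items.append(item)  # overflow is counted but not stored

    truncated: LossyListSketch[str] = LossyListSketch()
    for query_id in ["q1", "q2", "q3"]:
        truncated.append(query_id)
    assert truncated.items == ["q1", "q2", "q3"] and truncated.total == 3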
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/redshift_query_mocker.py
@@ -94,7 +94,7 @@ def mock_stl_insert_table_cursor(cursor: MagicMock) -> None:
"TABLE volt_tt_'\n -- We need to filter out our query and it was not possible "
"earlier when we did not have any comment in the query\n and query_text not ilike "
"'%https://stackoverflow.com/questions/72770890/redshift-result-size-exceeds-listagg-limit-on-svl"
"-statementtext%'\n\n )\n where\n rn = 1;\n "
"-statementtext%'\n\n )\n where\n rn = 1\n "
): mock_temp_table_cursor,
"select * from test_collapse_temp_lineage": mock_stl_insert_table_cursor,
}

122 changes: 122 additions & 0 deletions test_multistep_temp_table.json (new golden file referenced by the test below)
@@ -0,0 +1,122 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.prod_foo,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
},
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD),a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.prod_foo,PROD),a)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD),b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.prod_foo,PROD),b)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD),c)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.prod_foo,PROD),c)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "create table #temp2 as select b, c from upstream2;\n\ncreate table #temp1 as select a, 2*b as b from upstream1;\n\ncreate temp table staging_foo as select up1.a, up1.b, up2.c from #temp1 up1 left join #temp2 up2 on up1.b = up2.b where up1.b > 0;\n\ninsert into table prod_foo\nselect * from staging_foo",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.prod_foo,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD)"
}
]
}
}
}
]
54 changes: 54 additions & 0 deletions metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -128,6 +128,60 @@ def test_temp_table(pytestconfig: pytest.Config) -> None:
     )
 
 
+@freeze_time(FROZEN_TIME)
+def test_multistep_temp_table(pytestconfig: pytest.Config) -> None:
+    aggregator = SqlParsingAggregator(
+        platform="redshift",
+        generate_lineage=True,
+        generate_usage_statistics=False,
+        generate_operations=False,
+    )
+
+    aggregator.add_observed_query(
+        query="create table #temp1 as select a, 2*b as b from upstream1",
+        default_db="dev",
+        default_schema="public",
+        session_id="session1",
+    )
+    aggregator.add_observed_query(
+        query="create table #temp2 as select b, c from upstream2",
+        default_db="dev",
+        default_schema="public",
+        session_id="session1",
+    )
+    aggregator.add_observed_query(
+        query="create temp table staging_foo as select up1.a, up1.b, up2.c from #temp1 up1 left join #temp2 up2 on up1.b = up2.b where up1.b > 0",
+        default_db="dev",
+        default_schema="public",
+        session_id="session1",
+    )
+    aggregator.add_observed_query(
+        query="insert into table prod_foo\nselect * from staging_foo",
+        default_db="dev",
+        default_schema="public",
+        session_id="session1",
+    )
+
+    mcps = list(aggregator.gen_metadata())
+
+    report = aggregator.report
+    assert len(report.queries_with_temp_upstreams) == 1
+    assert (
+        len(
+            report.queries_with_temp_upstreams[
+                "composite_c89ee7c127c64a5d3a42ee875305087991891c80f42a25012910524bd2c77c45"
+            ]
+        )
+        == 4
+    )
+
+    mce_helpers.check_goldens_stream(
+        pytestconfig,
+        outputs=mcps,
+        golden_path=RESOURCE_DIR / "test_multistep_temp_table.json",
+    )
+
+
 @freeze_time(FROZEN_TIME)
 def test_aggregate_operations(pytestconfig: pytest.Config) -> None:
     aggregator = SqlParsingAggregator(