Commit

Track publication time in primary key & stop deduping rows (#151)
* Track publication time, stop deduping

Update all test archives to new format.

* Update tests to expect publication time

* Always read the published_parsed as America/New_York
jdangerx committed Oct 12, 2023
1 parent 959138b commit cd9d241
Showing 20 changed files with 175 additions and 115 deletions.
9 changes: 9 additions & 0 deletions docs/release_notes.rst
@@ -2,6 +2,15 @@
PACKAGE_NAME Release Notes
=======================================================================================

.. _release-v1-2-0:

---------------------------------------------------------------------------------------
1.2.0 (2023-10-06)
---------------------------------------------------------------------------------------

* Instead of combining multiple filings from one year into a single set of
  facts, we now track the publication time of each filing and keep all
  filings. This allows downstream users to deduplicate facts from multiple
  filings themselves (illustrated below).
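
For illustration, here is a minimal sketch of how a downstream user could keep
only the newest filing for each fact. It assumes a pandas DataFrame indexed by a
table's primary key; the helper name newest_filing_wins is hypothetical and not
part of this package. It simply mirrors the _dedupe_newer_report_wins helper
removed from xbrl.py further down, keyed on publication_time instead of
report_date.

import pandas as pd


def newest_filing_wins(df: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
    """Keep only the most recently published row for each logical fact (sketch)."""
    if df.empty:
        return df
    # Everything in the primary key except the filing-specific columns
    # identifies "the same fact" across refilings.
    unique_cols = [
        col for col in primary_key
        if col not in ("filing_name", "publication_time")
    ]
    old_index = df.index.names
    return (
        df.reset_index()
        .sort_values("publication_time")  # oldest first, newest last
        .groupby(unique_cols)
        .last()  # the newest filing wins
        .reset_index()
        .set_index(old_index)
    )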

.. _release-v1-1-0:

2 changes: 1 addition & 1 deletion src/ferc_xbrl_extractor/cli.py
@@ -123,7 +123,7 @@ def run_main(
loglevel: str,
logfile: Path | None,
requested_tables: list[str] | None = None,
instance_pattern: str = None,
instance_pattern: str = r"",
):
"""Log setup, taxonomy finding, and SQL IO."""
logger = get_logger("ferc_xbrl_extractor")
20 changes: 16 additions & 4 deletions src/ferc_xbrl_extractor/datapackage.py
@@ -62,6 +62,16 @@ def __hash__(self):
Field representing the filing name (Present in all tables).
"""

PUBLICATION_TIME = Field(
name="publication_time",
title="Publication Time",
type="date",
description="Time the filing was made available on the FERC RSS feed.",
)
"""
Field representing the publication time (injected into all tables).
"""

START_DATE = Field(
name="start_date",
title="Start Date",
@@ -89,12 +99,12 @@ def __hash__(self):
Field representing an instant date (Present in all instant tables).
"""

DURATION_COLUMNS = [ENTITY_ID, FILING_NAME, START_DATE, END_DATE]
DURATION_COLUMNS = [ENTITY_ID, FILING_NAME, PUBLICATION_TIME, START_DATE, END_DATE]
"""
Fields common to all duration tables.
"""

INSTANT_COLUMNS = [ENTITY_ID, FILING_NAME, INSTANT_DATE]
INSTANT_COLUMNS = [ENTITY_ID, FILING_NAME, PUBLICATION_TIME, INSTANT_DATE]
"""
Fields common to all instant tables.
"""
@@ -358,7 +368,9 @@ def construct_dataframe(self, instance: Instance) -> pd.DataFrame:
instance.used_fact_ids |= {f.f_id() for f in raw_facts}

if not raw_facts:
return pd.DataFrame()
return pd.DataFrame(columns=self.columns.keys()).set_index(
self.schema.primary_key
)

fact_index = ["c_id", "name"]
facts = (
@@ -377,7 +389,7 @@
.reindex(columns=self.data_columns)
)

facts["report_date"] = instance.report_date
facts["publication_time"] = instance.publication_time

contexts = facts.index.to_series().apply(
lambda c_id: pd.Series(
Expand Down
60 changes: 57 additions & 3 deletions src/ferc_xbrl_extractor/instance.py
@@ -2,11 +2,13 @@
import datetime
import io
import itertools
import json
import zipfile
from collections import Counter, defaultdict
from enum import Enum, auto
from pathlib import Path
from typing import BinaryIO
from zoneinfo import ZoneInfo

import stringcase
from lxml import etree # nosec: B410
@@ -243,6 +245,7 @@ def __init__(
instant_facts: dict[str, list[Fact]],
duration_facts: dict[str, list[Fact]],
filing_name: str,
publication_time: datetime.datetime,
):
"""Construct Instance from parsed contexts and facts.
@@ -255,6 +258,7 @@ def __init__(
instant_facts: Dictionary mapping concept name to list of instant facts.
duration_facts: Dictionary mapping concept name to list of duration facts.
filing_name: Name of parsed filing.
publication_time: the time at which the filing was made available online.
"""
self.logger = get_logger(__name__)
self.instant_facts = instant_facts
@@ -290,6 +294,7 @@ def __init__(
self.report_date = datetime.date.fromisoformat(
duration_facts["certifying_official_date"][0].value
)
self.publication_time = publication_time

def get_facts(
self, instant: bool, concept_names: list[str], primary_key: list[str]
@@ -316,15 +321,22 @@ def get_facts(
class InstanceBuilder:
"""Class to manage parsing XBRL filings."""

def __init__(self, file_info: str | BinaryIO, name: str):
def __init__(
self,
file_info: str | BinaryIO,
name: str,
publication_time: datetime.datetime,
):
"""Construct InstanceBuilder class.
Args:
file_info: Either path to filing, or file data.
name: Name of filing.
publication_time: Time this filing was published.
"""
self.name = name
self.file = file_info
self.publication_time = publication_time

def parse(self, fact_prefix: str = "ferc") -> Instance:
"""Parse a single XBRL instance using XML library directly.
@@ -377,7 +389,13 @@ def parse(self, fact_prefix: str = "ferc") -> Instance:
else:
duration_facts[new_fact.name].append(new_fact)

return Instance(context_dict, instant_facts, duration_facts, self.name)
return Instance(
context_dict,
instant_facts,
duration_facts,
self.name,
publication_time=self.publication_time,
)


def instances_from_zip(instance_path: Path | io.BytesIO) -> list[InstanceBuilder]:
@@ -390,16 +408,52 @@

archive = zipfile.ZipFile(instance_path)

with archive.open("rssfeed") as f:
filings_metadata = json.loads(f.read())

publication_times = {
get_filing_name(metadata): datetime.datetime.fromisoformat(
metadata["published_parsed"]
)
for metadata in itertools.chain.from_iterable(
e.values() for e in filings_metadata.values()
)
}

# Read files into in memory buffers to parse
return [
InstanceBuilder(
io.BytesIO(archive.open(filename).read()), filename.split(".")[0]
io.BytesIO(archive.open(filename).read()),
Path(filename).stem,
publication_time=publication_times[filename],
)
for filename in archive.namelist()
if Path(filename).suffix in allowable_suffixes
]
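
For reference, a rough, hypothetical illustration of the nested layout that the
publication_times comprehension above assumes for the parsed rssfeed JSON. Only
the inner metadata keys ("title", "ferc_formname", "ferc_period",
"published_parsed") are taken from the code in this commit; the outer grouping
keys and all values are invented.

# Hypothetical rssfeed contents: {outer key -> {filing id -> metadata}}.
filings_metadata = {
    "some-feed-entry": {
        "some-filing-id": {
            "title": "Example Utility Co",
            "ferc_formname": "ferc_form_1",
            "ferc_period": "Q4",
            "published_parsed": "2023-01-01T00:00:01",
        },
    },
}
# itertools.chain.from_iterable(e.values() for e in filings_metadata.values())
# yields each inner metadata dict; get_filing_name() (defined below) turns it
# into the archive member name used as the key of publication_times.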


def get_filing_name(filing_metadata: dict[str, str | int]) -> str:
"""Generate the filing filename based on its metadata, as seen in `rssfeed`.
This uses the same logic as `pudl_archiver.archivers.ferc.xbrl.archive_year`.
NOTE: the published time appears to be in America/New_York. We need to make the
archivers explicitly use UTC everywhere, but until then we will force America/New_York
in this function.
"""
# TODO (daz): just put the expected filename in rssfeed also, so we don't
# have to reconstruct the name generation logic.
published_time = datetime.datetime.fromisoformat(
filing_metadata["published_parsed"]
).replace(tzinfo=ZoneInfo("America/New_York"))
return (
f"{filing_metadata['title']}_"
f"form{filing_metadata['ferc_formname'].split('_')[-1]}_"
f"{filing_metadata['ferc_period']}_"
f"{round(published_time.timestamp())}.xbrl".replace(" ", "_")
)
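
A hedged usage sketch, reusing the invented metadata entry from the illustration
above, to show the shape of the name this reconstructs. The exact epoch suffix
depends on the America/New_York UTC offset in force on the publication date
(UTC-5 on 2023-01-01).

# Invented metadata for illustration only.
metadata = {
    "title": "Example Utility Co",
    "ferc_formname": "ferc_form_1",
    "ferc_period": "Q4",
    "published_parsed": "2023-01-01T00:00:01",
}
get_filing_name(metadata)
# -> "Example_Utility_Co_form1_Q4_1672549201.xbrl"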


def get_instances(instance_path: Path | io.BytesIO) -> list[InstanceBuilder]:
"""Get list of instances from specified path.
28 changes: 1 addition & 27 deletions src/ferc_xbrl_extractor/xbrl.py
@@ -81,27 +81,6 @@ def extract(
return ExtractOutput(table_defs=table_defs, table_data=table_data, stats=stats)


def _dedupe_newer_report_wins(df: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
"""Collapse all facts for a primary key into one row.
If a primary key corresponds to multiple rows of data from different
filings, treat the newer filings as updates to the older ones.
"""
if df.empty:
return df
unique_cols = [col for col in primary_key if col != "filing_name"]
old_index = df.index.names
return (
df.reset_index()
.sort_values("report_date")
.groupby(unique_cols)
.last()
.drop("report_date", axis="columns")
.reset_index()
.set_index(old_index)
)


def table_data_from_instances(
instance_builders: list[InstanceBuilder],
table_defs: dict[str, FactTable],
@@ -148,12 +127,7 @@ def table_data_from_instances(
for instance_name, fact_ids in batch["metadata"].items():
results["metadata"][instance_name] |= fact_ids

filings = {
table: pd.concat(dfs).pipe(
_dedupe_newer_report_wins, table_defs[table].schema.primary_key
)
for table, dfs in results["dfs"].items()
}
filings = {table: pd.concat(dfs) for table, dfs in results["dfs"].items()}
metadata = results["metadata"]
return filings, metadata

Binary file modified tests/integration/data/ferc1-xbrl-2021.zip
Binary file modified tests/integration/data/ferc1-xbrl-2022.zip
Binary file modified tests/integration/data/ferc2-xbrl-2021.zip
Binary file modified tests/integration/data/ferc2-xbrl-2022.zip
Binary file modified tests/integration/data/ferc6-xbrl-2021.zip
Binary file modified tests/integration/data/ferc6-xbrl-2022.zip
Binary file modified tests/integration/data/ferc60-xbrl-2021.zip
Binary file modified tests/integration/data/ferc60-xbrl-2022.zip
Binary file modified tests/integration/data/ferc714-xbrl-2021.zip
Binary file modified tests/integration/data/ferc714-xbrl-2022.zip
41 changes: 31 additions & 10 deletions tests/integration/data_quality_test.py
@@ -54,11 +54,11 @@ def test_lost_facts_pct(extracted, request):
# We have unallocated data for Form 6 for some reason.
total_threshold = 0.9
per_filing_threshold = 0.8
# Assert that this is < 0.95 so we remember to fix this test once we
# Assert that this is < 0.96 so we remember to fix this test once we
# fix the bug. We don't use xfail here because the parametrization is
# at the *fixture* level, and only the lost facts tests should fail
# for form 6.
assert used_fact_ratio > total_threshold and used_fact_ratio <= 0.95
assert used_fact_ratio > total_threshold and used_fact_ratio <= 0.96
else:
total_threshold = 0.99
per_filing_threshold = 0.95
@@ -71,17 +71,38 @@ def test_lost_facts_pct(extracted, request):
assert instance_used_ratio > per_filing_threshold and instance_used_ratio <= 1


def test_deduplication(extracted):
def test_publication_time(extracted):
table_defs, table_data, _stats = extracted

for table_name, table in table_defs.items():
date_cols = ["date"] if table.instant else ["start_date", "end_date"]
# we want to make sure that any fact only comes from one filing, so we
# don't want to include filing_name in the unique_cols
if (df := table_data[table_name]).empty:
continue
unique_cols = ["entity_id"] + date_cols + table.axes
assert not df.reset_index().duplicated(unique_cols).any()
assert (
table_data[table_name]
.reset_index(level="publication_time")
.publication_time.notna()
.all()
)


def test_all_data_has_corresponding_id(extracted):
table_defs, table_data, _stats = extracted

[id_table_name] = [
name
for name in table_defs
if name.startswith("ident") and name.endswith("_duration")
]
id_table = table_data[id_table_name].reset_index()

for table_name, table in table_defs.items():
data_table = table_data[table_name]
data_table = data_table.reset_index()
merged = data_table.merge(
id_table,
how="left",
on=["entity_id", "filing_name"],
indicator=True,
)
assert (merged._merge != "left_only").all()


def test_null_values(extracted):
46 changes: 29 additions & 17 deletions tests/integration/datapackage_test.py
@@ -1,4 +1,5 @@
"""Test datapackage descriptor from taxonomy."""
import datetime
import io
from pathlib import Path

@@ -72,46 +73,57 @@ def _create_schema(instant=True, axes=None):
"duration",
pd.read_csv(
io.StringIO(
"c_id,entity_id,filing_name,start_date,end_date,column_one,column_two,null_col\n"
'cid_1,EID1,filing,2021-01-01,2021-12-31,"value 1","value 2",\n'
'cid_4,EID1,filing,2020-01-01,2020-12-31,"value 3","value 4",\n'
)
"c_id,entity_id,filing_name,publication_time,start_date,end_date,column_one,column_two,null_col\n"
'cid_1,EID1,filing,2023-01-01T00:00:01,2021-01-01,2021-12-31,"value 1","value 2",\n'
'cid_4,EID1,filing,2023-01-01T00:00:01,2020-01-01,2020-12-31,"value 3","value 4",\n'
),
parse_dates=["publication_time"],
),
),
(
_create_schema(instant=False, axes=["dimension_one_axis"]),
"duration",
pd.read_csv(
io.StringIO(
"c_id,entity_id,filing_name,start_date,end_date,dimension_one_axis,column_one,column_two,null_col\n"
'cid_1,EID1,filing,2021-01-01,2021-12-31,total,"value 1","value 2",\n'
'cid_4,EID1,filing,2020-01-01,2020-12-31,total,"value 3","value 4",\n'
'cid_5,EID1,filing,2020-01-01,2020-12-31,"Dim 1 Value","value 9","value 10",\n'
)
"c_id,entity_id,filing_name,publication_time,start_date,end_date,dimension_one_axis,column_one,column_two,null_col\n"
'cid_1,EID1,filing,2023-01-01T00:00:01,2021-01-01,2021-12-31,total,"value 1","value 2",\n'
'cid_4,EID1,filing,2023-01-01T00:00:01,2020-01-01,2020-12-31,total,"value 3","value 4",\n'
'cid_5,EID1,filing,2023-01-01T00:00:01,2020-01-01,2020-12-31,"Dim 1 Value","value 9","value 10",\n'
),
parse_dates=["publication_time"],
),
),
(
_create_schema(axes=["dimension_one_axis", "dimension_two_axis"]),
"instant",
pd.read_csv(
io.StringIO(
"c_id,entity_id,filing_name,date,dimension_one_axis,dimension_two_axis,column_one,column_two,null_col\n"
'cid_2,EID1,filing,2021-12-31,total,total,"value 5","value 6",\n'
'cid_3,EID1,filing,2021-12-31,"Dim 1 Value","ferc:Dimension2Value","value 7","value 8",\n'
)
"c_id,entity_id,filing_name,publication_time,date,dimension_one_axis,dimension_two_axis,column_one,column_two,null_col\n"
'cid_2,EID1,filing,2023-01-01T00:00:01,2021-12-31,total,total,"value 5","value 6",\n'
'cid_3,EID1,filing,2023-01-01T00:00:01,2021-12-31,"Dim 1 Value","ferc:Dimension2Value","value 7","value 8",\n'
),
parse_dates=["publication_time"],
),
),
],
)
def test_construct_dataframe(table_schema, period, df, in_memory_filing):
"""Test dataframe construction."""
instance_builder = InstanceBuilder(in_memory_filing, "filing")
instance_builder = InstanceBuilder(
in_memory_filing,
"filing",
publication_time=datetime.datetime(2023, 1, 1, 0, 0, 1),
)
instance = instance_builder.parse()

fact_table = FactTable(table_schema, period)

constructed_df = fact_table.construct_dataframe(instance).drop(
"report_date", axis="columns"
constructed_df = fact_table.construct_dataframe(instance).reset_index()
constructed_df = constructed_df.astype({"publication_time": "datetime64[s]"})
expected_df = (
df.set_index(table_schema.primary_key)
.drop("c_id", axis="columns")
.reset_index()
)
expected_df = df.set_index(table_schema.primary_key).drop("c_id", axis="columns")
expected_df = expected_df.astype({"publication_time": "datetime64[s]"})
pd.testing.assert_frame_equal(expected_df, constructed_df)