"""A collection of denormalized FERC assets and helper functions."""
import importlib
import re
from copy import deepcopy
from functools import cached_property
from typing import Any, Literal, NamedTuple, Self
import networkx as nx
import numpy as np
import pandas as pd
from dagster import AssetIn, AssetsDefinition, Field, Mapping, asset
from matplotlib import pyplot as plt
from networkx.drawing.nx_agraph import graphviz_layout
from pandas._libs.missing import NAType as pandas_NAType
from pydantic import (
BaseModel,
ConfigDict,
ValidationInfo,
field_validator,
model_validator,
)
import pudl
from pudl.transform.ferc1 import (
GroupMetricChecks,
GroupMetricTolerances,
MetricTolerances,
)
[docs]
# Module-level logger, named after this module via ``__name__``.
logger = pudl.logging_helpers.get_logger(__name__)
[docs]
# Calculation-check settings for each exploded root table, keyed by the name of
# the core FERC Form 1 table. Every entry checks the same four groupings
# ("ungrouped", "report_year", "xbrl_factoid", "utility_id_ferc1") and supplies
# one ``MetricTolerances`` per grouping.
# NOTE(review): ``null_calculated_value_frequency`` is 1.0 for every grouping
# here; presumably that makes the null-frequency check always pass -- confirm
# against ``GroupMetricChecks`` in ``pudl.transform.ferc1``.
EXPLOSION_CALCULATION_TOLERANCES: dict[str, GroupMetricChecks] = {
    # Income statement (sched114) explosion.
    "core_ferc1__yearly_income_statements_sched114": GroupMetricChecks(
        groups_to_check=[
            "ungrouped",
            "report_year",
            "xbrl_factoid",
            "utility_id_ferc1",
        ],
        group_metric_tolerances=GroupMetricTolerances(
            ungrouped=MetricTolerances(
                error_frequency=0.02,
                relative_error_magnitude=0.04,
                null_calculated_value_frequency=1.0,
            ),
            report_year=MetricTolerances(
                error_frequency=0.036,
                relative_error_magnitude=0.048,
                null_calculated_value_frequency=1.0,
            ),
            xbrl_factoid=MetricTolerances(
                error_frequency=0.35,
                relative_error_magnitude=0.17,
                null_calculated_value_frequency=1.0,
            ),
            utility_id_ferc1=MetricTolerances(
                error_frequency=0.13,
                relative_error_magnitude=0.42,
                null_calculated_value_frequency=1.0,
            ),
        ),
    ),
    # Balance sheet assets (sched110) explosion.
    "core_ferc1__yearly_balance_sheet_assets_sched110": GroupMetricChecks(
        groups_to_check=[
            "ungrouped",
            "report_year",
            "xbrl_factoid",
            "utility_id_ferc1",
        ],
        group_metric_tolerances=GroupMetricTolerances(
            ungrouped=MetricTolerances(
                error_frequency=0.011,
                relative_error_magnitude=0.004,
                null_calculated_value_frequency=1.0,
            ),
            report_year=MetricTolerances(
                error_frequency=0.12,
                relative_error_magnitude=0.12,
                null_calculated_value_frequency=1.0,
            ),
            xbrl_factoid=MetricTolerances(
                error_frequency=0.41,
                relative_error_magnitude=0.22,
                null_calculated_value_frequency=1.0,
            ),
            utility_id_ferc1=MetricTolerances(
                error_frequency=0.27,
                relative_error_magnitude=0.4,
                null_calculated_value_frequency=1.0,
            ),
        ),
    ),
    # Balance sheet liabilities (sched110) explosion.
    "core_ferc1__yearly_balance_sheet_liabilities_sched110": GroupMetricChecks(
        groups_to_check=[
            "ungrouped",
            "report_year",
            "xbrl_factoid",
            "utility_id_ferc1",
        ],
        group_metric_tolerances=GroupMetricTolerances(
            ungrouped=MetricTolerances(
                error_frequency=0.048,
                relative_error_magnitude=0.019,
                null_calculated_value_frequency=1.0,
            ),
            report_year=MetricTolerances(
                error_frequency=0.048,
                relative_error_magnitude=0.05,
                null_calculated_value_frequency=1.0,
            ),
            xbrl_factoid=MetricTolerances(
                error_frequency=0.65,  # worst fact: retained_earnings
                relative_error_magnitude=0.17,
                null_calculated_value_frequency=1.0,
            ),
            utility_id_ferc1=MetricTolerances(
                error_frequency=0.063,
                relative_error_magnitude=0.16,
                null_calculated_value_frequency=1.0,
            ),
        ),
    ),
}
"""Manually compiled metadata from DBF-only or PUDL-generated xbrl_factoids.
Note: the factoids beginning with "less" here could be removed after a transition
of expectations from assuming the calculation components in any given explosion
are a tree structure to being a DAG. These xbrl_factoids were added in
`transform.ferc1` and could be removed upon this transition.
"""
[docs]
def get_core_ferc1_asset_description(asset_name: str) -> str:
    """Get the asset description portion of a core FERC Form 1 asset name.

    This is useful when programmatically constructing output assets
    from core assets using asset factories.

    The description is the text found between ``yearly_`` and the first
    following ``_sched`` in the name, e.g. ``income_statements`` for
    ``core_ferc1__yearly_income_statements_sched114``.

    Args:
        asset_name: The name of the core asset.

    Returns:
        asset_description: The asset description portion of the asset name.

    Raises:
        ValueError: If ``asset_name`` does not contain the
            ``yearly_<description>_sched`` pattern.
    """
    match = re.search(r"yearly_(.*?)_sched", asset_name)
    if match is None:
        # The two literals are concatenated, so the separating space has to be
        # part of one of them.
        raise ValueError(
            f"The asset description can not be parsed from {asset_name} "
            "because it is not a valid core FERC Form 1 asset name."
        )
    return match.group(1)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def _out_ferc1__yearly_plants_utilities(
    core_pudl__assn_ferc1_pudl_plants: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Build a denormalized table of FERC plant and utility names and IDs.

    The plant association table is joined to the utility association table on
    ``utility_id_ferc1`` using pandas' default (inner) merge.
    """
    plants_with_utilities = core_pudl__assn_ferc1_pudl_plants.merge(
        core_pudl__assn_ferc1_pudl_utilities,
        on="utility_id_ferc1",
    )
    return plants_with_utilities
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_steam_plants_sched402(
    _out_ferc1__yearly_plants_utilities: pd.DataFrame,
    _out_ferc1__yearly_steam_plants_sched402_with_plant_ids: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 steam plants table and add derived metrics.

    The steam records are left-joined to the plant/utility lookup on
    ``utility_id_ferc1`` and ``plant_name_ferc1``. The following columns are
    then derived:

    * ``capacity_factor``: ``net_generation_mwh / (8760 * capacity_mw)``
    * ``opex_fuel_per_mwh``: ``opex_fuel / net_generation_mwh``
    * ``opex_total_nonfuel``: ``opex_production_total`` minus ``opex_fuel``
      (null fuel costs treated as zero)
    * ``opex_nonfuel_per_mwh``: ``opex_total_nonfuel / net_generation_mwh``
      where net generation is positive, otherwise NaN

    Annual capital addition columns are added by
    :func:`calc_annual_capital_additions_ferc1`, and the identifying columns
    are moved to the front.

    Args:
        _out_ferc1__yearly_plants_utilities: Denormalized dataframe of FERC Form 1
            plants and utilities data.
        _out_ferc1__yearly_steam_plants_sched402_with_plant_ids: The FERC Form 1 steam
            table with imputed plant IDs to group plants across report years.

    Returns:
        A DataFrame containing useful fields from the FERC Form 1 steam table.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "plant_id_pudl",
        "plant_id_ferc1",
        "plant_name_ferc1",
    ]
    steam_with_ids = _out_ferc1__yearly_steam_plants_sched402_with_plant_ids.merge(
        _out_ferc1__yearly_plants_utilities,
        on=["utility_id_ferc1", "plant_name_ferc1"],
        how="left",
    )
    # Later keyword arguments may use columns created by earlier ones, so
    # ``opex_total_nonfuel`` must stay ahead of ``opex_nonfuel_per_mwh``.
    steam_with_metrics = steam_with_ids.assign(
        capacity_factor=lambda df: df.net_generation_mwh / (8760 * df.capacity_mw),
        opex_fuel_per_mwh=lambda df: df.opex_fuel / df.net_generation_mwh,
        opex_total_nonfuel=lambda df: (
            df.opex_production_total - df.opex_fuel.fillna(0)
        ),
        opex_nonfuel_per_mwh=lambda df: np.where(
            df.net_generation_mwh > 0,
            df.opex_total_nonfuel / df.net_generation_mwh,
            np.nan,
        ),
    )
    steam_with_capex = calc_annual_capital_additions_ferc1(steam_with_metrics)
    return pudl.helpers.organize_cols(steam_with_capex, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_small_plants_sched410(
    core_ferc1__yearly_small_plants_sched410: pd.DataFrame,
    _out_ferc1__yearly_plants_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 small plants table and total its opex.

    The small plant records are left-joined to the plant/utility lookup on
    ``utility_id_ferc1`` and ``plant_name_ferc1``. ``opex_total`` is the row-wise
    sum of fuel, maintenance and operations expenses with nulls treated as
    zero, and ``opex_total_nonfuel`` is that total minus the fuel expense.
    """
    opex_components = ["opex_fuel", "opex_maintenance", "opex_operations"]
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "plant_id_pudl",
        "plant_name_ferc1",
        "record_id",
    ]
    small_with_ids = core_ferc1__yearly_small_plants_sched410.merge(
        _out_ferc1__yearly_plants_utilities,
        on=["utility_id_ferc1", "plant_name_ferc1"],
        how="left",
    )
    # ``opex_total_nonfuel`` reads the ``opex_total`` created just before it.
    small_with_opex = small_with_ids.assign(
        opex_total=lambda df: df[opex_components].fillna(0).sum(axis=1),
        opex_total_nonfuel=lambda df: df.opex_total - df.opex_fuel.fillna(0),
    )
    return pudl.helpers.organize_cols(small_with_opex, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_hydroelectric_plants_sched406(
    core_ferc1__yearly_hydroelectric_plants_sched406: pd.DataFrame,
    _out_ferc1__yearly_plants_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 hydroelectric plants table.

    The hydro records are left-joined to the plant/utility lookup on
    ``utility_id_ferc1`` and ``plant_name_ferc1``. ``capacity_factor`` is
    ``net_generation_mwh / (8760 * capacity_mw)`` and ``opex_total_nonfuel`` is
    a copy of ``opex_total``.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "plant_name_ferc1",
        "record_id",
    ]
    hydro_with_ids = core_ferc1__yearly_hydroelectric_plants_sched406.merge(
        _out_ferc1__yearly_plants_utilities,
        on=["utility_id_ferc1", "plant_name_ferc1"],
        how="left",
    )
    hydro_with_metrics = hydro_with_ids.assign(
        capacity_factor=lambda df: df.net_generation_mwh / (8760 * df.capacity_mw),
        opex_total_nonfuel=lambda df: df.opex_total,
    )
    return pudl.helpers.organize_cols(hydro_with_metrics, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_pumped_storage_plants_sched408(
    core_ferc1__yearly_pumped_storage_plants_sched408: pd.DataFrame,
    _out_ferc1__yearly_plants_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 pumped storage plants table.

    The pumped storage records are left-joined to the plant/utility lookup on
    ``utility_id_ferc1`` and ``plant_name_ferc1``. ``capacity_factor`` is
    ``net_generation_mwh / (8760 * capacity_mw)`` and ``opex_total_nonfuel`` is
    a copy of ``opex_total``.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "plant_name_ferc1",
        "record_id",
    ]
    pumped_with_ids = core_ferc1__yearly_pumped_storage_plants_sched408.merge(
        _out_ferc1__yearly_plants_utilities,
        on=["utility_id_ferc1", "plant_name_ferc1"],
        how="left",
    )
    pumped_with_metrics = pumped_with_ids.assign(
        capacity_factor=lambda df: df.net_generation_mwh / (8760 * df.capacity_mw),
        opex_total_nonfuel=lambda df: df.opex_total,
    )
    return pudl.helpers.organize_cols(pumped_with_metrics, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_steam_plants_fuel_sched402(
    core_ferc1__yearly_steam_plants_fuel_sched402: pd.DataFrame,
    _out_ferc1__yearly_plants_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 steam plant fuel table and add fuel totals.

    Two derived columns are computed from the reported per-unit values:

    * ``fuel_consumed_mmbtu``: ``fuel_consumed_units * fuel_mmbtu_per_unit``
    * ``fuel_consumed_total_cost``:
      ``fuel_consumed_units * fuel_cost_per_unit_burned``

    The result is then joined to the plant/utility lookup on
    ``utility_id_ferc1`` and ``plant_name_ferc1`` (pandas' default inner
    merge), and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "plant_id_pudl",
        "plant_name_ferc1",
    ]
    fuel_with_totals = core_ferc1__yearly_steam_plants_fuel_sched402.assign(
        fuel_consumed_mmbtu=lambda df: (
            df["fuel_consumed_units"] * df["fuel_mmbtu_per_unit"]
        ),
        fuel_consumed_total_cost=lambda df: (
            df["fuel_consumed_units"] * df["fuel_cost_per_unit_burned"]
        ),
    )
    fuel_with_ids = fuel_with_totals.merge(
        _out_ferc1__yearly_plants_utilities,
        on=["utility_id_ferc1", "plant_name_ferc1"],
    )
    return pudl.helpers.organize_cols(fuel_with_ids, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_purchased_power_and_exchanges_sched326(
    core_ferc1__yearly_purchased_power_and_exchanges_sched326: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 purchased power and exchanges table.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "seller_name",
        "record_id",
    ]
    merged = core_ferc1__yearly_purchased_power_and_exchanges_sched326.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_plant_in_service_sched204(
    core_ferc1__yearly_plant_in_service_sched204: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 electric plant in service table.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
    ]
    merged = core_ferc1__yearly_plant_in_service_sched204.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_balance_sheet_assets_sched110(
    core_ferc1__yearly_balance_sheet_assets_sched110: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 balance sheet assets table.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "asset_type",
    ]
    merged = core_ferc1__yearly_balance_sheet_assets_sched110.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_balance_sheet_liabilities_sched110(
    core_ferc1__yearly_balance_sheet_liabilities_sched110: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 balance sheet liabilities table.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "liability_type",
    ]
    merged = core_ferc1__yearly_balance_sheet_liabilities_sched110.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_cash_flows_sched120(
    core_ferc1__yearly_cash_flows_sched120: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 cash flows table.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "amount_type",
    ]
    merged = core_ferc1__yearly_cash_flows_sched120.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_depreciation_summary_sched336(
    core_ferc1__yearly_depreciation_summary_sched336: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 depreciation and amortization summary table.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "plant_function",
        "ferc_account_label",
    ]
    merged = core_ferc1__yearly_depreciation_summary_sched336.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_energy_dispositions_sched401(
    core_ferc1__yearly_energy_dispositions_sched401: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Denormalize the FERC Form 1 energy dispositions table.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "energy_disposition_type",
    ]
    merged = core_ferc1__yearly_energy_dispositions_sched401.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_energy_sources_sched401(
    core_ferc1__yearly_energy_sources_sched401: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 energy sources data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "energy_source_type",
    ]
    merged = core_ferc1__yearly_energy_sources_sched401.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_operating_expenses_sched320(
    core_ferc1__yearly_operating_expenses_sched320: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 operating expenses data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "expense_type",
    ]
    merged = core_ferc1__yearly_operating_expenses_sched320.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_operating_revenues_sched300(
    core_ferc1__yearly_operating_revenues_sched300: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 operating revenues data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "revenue_type",
    ]
    merged = core_ferc1__yearly_operating_revenues_sched300.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_depreciation_changes_sched219(
    core_ferc1__yearly_depreciation_changes_sched219: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 depreciation changes data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "depreciation_type",
        "plant_status",
        "utility_type",
    ]
    merged = core_ferc1__yearly_depreciation_changes_sched219.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_depreciation_by_function_sched219(
    core_ferc1__yearly_depreciation_by_function_sched219: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 depreciation by function data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "plant_function",
        "plant_status",
        "utility_type",
    ]
    merged = core_ferc1__yearly_depreciation_by_function_sched219.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_sales_by_rate_schedules_sched304(
    core_ferc1__yearly_sales_by_rate_schedules_sched304: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 sales by rate schedules data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
    ]
    merged = core_ferc1__yearly_sales_by_rate_schedules_sched304.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_income_statements_sched114(
    core_ferc1__yearly_income_statements_sched114: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 income statements data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "utility_type",
        "income_type",
    ]
    merged = core_ferc1__yearly_income_statements_sched114.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_other_regulatory_liabilities_sched278(
    core_ferc1__yearly_other_regulatory_liabilities_sched278: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 other regulatory liabilities data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
    ]
    merged = core_ferc1__yearly_other_regulatory_liabilities_sched278.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_retained_earnings_sched118(
    core_ferc1__yearly_retained_earnings_sched118: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 retained earnings data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "earnings_type",
    ]
    merged = core_ferc1__yearly_retained_earnings_sched118.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_transmission_lines_sched422(
    core_ferc1__yearly_transmission_lines_sched422: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 transmission lines data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
    ]
    merged = core_ferc1__yearly_transmission_lines_sched422.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_utility_plant_summary_sched200(
    core_ferc1__yearly_utility_plant_summary_sched200: pd.DataFrame,
    core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Pull a useful dataframe of FERC Form 1 utility plant summary data.

    Utility names and PUDL IDs are joined in on ``utility_id_ferc1`` (pandas'
    default inner merge) and the identifying columns are moved to the front.
    """
    leading_cols = [
        "report_year",
        "utility_id_ferc1",
        "utility_id_pudl",
        "utility_name_ferc1",
        "record_id",
        "utility_type",
        "utility_plant_asset_type",
    ]
    merged = core_ferc1__yearly_utility_plant_summary_sched200.merge(
        core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
    )
    return pudl.helpers.organize_cols(merged, leading_cols)
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def out_ferc1__yearly_all_plants(
    out_ferc1__yearly_steam_plants_sched402: pd.DataFrame,
    out_ferc1__yearly_small_plants_sched410: pd.DataFrame,
    out_ferc1__yearly_hydroelectric_plants_sched406: pd.DataFrame,
    out_ferc1__yearly_pumped_storage_plants_sched408: pd.DataFrame,
) -> pd.DataFrame:
    """Stack the steam, small, hydro and pumped storage plant tables.

    While this table may have many purposes, the main one is to prepare it for
    integration with the EIA Master Unit List (MUL). All subtables included in
    this output table must have pudl ids.

    Column names are harmonized before and after concatenation so that like
    columns share a name across the plant types, and empty strings are
    replaced with NaN.
    """
    license_rename = {"project_num": "ferc_license_id"}
    harmonized_names = {
        "fuel_cost": "total_fuel_cost",
        "fuel_mmbtu": "total_mmbtu",
        "opex_fuel_per_mwh": "fuel_cost_per_mwh",
        "primary_fuel_by_mmbtu": "fuel_type_code_pudl",
    }

    logger.debug("prepping steam table")
    steam = out_ferc1__yearly_steam_plants_sched402.rename(
        columns={"opex_plants": "opex_plant"}
    )

    # Hydro and pumped storage share the same license-id rename.
    logger.debug("prepping hydro tables")
    hydro = out_ferc1__yearly_hydroelectric_plants_sched406.rename(
        columns=license_rename
    )
    pumped_storage = out_ferc1__yearly_pumped_storage_plants_sched408.rename(
        columns=license_rename
    )

    logger.debug("combining all tables")
    stacked = pd.concat(
        [steam, out_ferc1__yearly_small_plants_sched410, hydro, pumped_storage]
    )
    renamed = stacked.rename(columns=harmonized_names)
    return renamed.replace({"": np.nan})
@asset(
    io_manager_key="pudl_io_manager",
    config_schema={
        "thresh": Field(
            float,
            default_value=0.5,
            description=(
                "Minimum fraction of fuel (cost and mmbtu) required in order for a "
                "plant to be assigned a primary fuel. Must be between 0.5 and 1.0. "
                "Default value is 0.5."
            ),
        )
    },
    compute_kind="Python",
)
def out_ferc1__yearly_steam_plants_fuel_by_plant_sched402(
    context,
    core_ferc1__yearly_steam_plants_fuel_sched402: pd.DataFrame,
    _out_ferc1__yearly_plants_utilities: pd.DataFrame,
) -> pd.DataFrame:
    """Summarize FERC fuel data by plant for output.

    This is mostly a wrapper around
    :func:`pudl.analysis.fuel_by_plant.fuel_by_plant_ferc1`
    which calculates some summary values on a per-plant basis (as indicated
    by ``utility_id_ferc1`` and ``plant_name_ferc1``) related to fuel
    consumption.

    The input fuel table is not modified; the dtype conversion is applied to
    a new frame.

    Args:
        context: Dagster context object; ``context.op_config["thresh"]`` is
            passed through as the primary fuel threshold.
        core_ferc1__yearly_steam_plants_fuel_sched402: Normalized FERC fuel table.
        _out_ferc1__yearly_plants_utilities: Denormalized table of FERC1 plant & utility
            IDs.

    Returns:
        A DataFrame with fuel use summarized by plant.
    """

    def drop_other_fuel_types(df):
        """Internal function to drop other fuel type.

        Fuel type other indicates we didn't know how to categorize the reported fuel
        type, which leads to records with incomplete and unusable data.
        """
        return df[df.fuel_type_code_pudl != "other"].copy()

    thresh = context.op_config["thresh"]
    fuel_categories = list(
        pudl.transform.ferc1.SteamPlantsFuelTableTransformer()
        .params.categorize_strings["fuel_type_code_pudl"]
        .categories.keys()
    )
    fbp_df = (
        # The existing function expects `fuel_type_code_pudl` to be an object,
        # rather than a category. This is a legacy of pre-dagster code, and we
        # convert here to prevent further retooling in the code-base. ``assign``
        # returns a new frame, so the caller's input is left untouched.
        core_ferc1__yearly_steam_plants_fuel_sched402.assign(
            fuel_type_code_pudl=lambda x: x["fuel_type_code_pudl"].astype(str)
        )
        .pipe(drop_other_fuel_types)
        .pipe(
            pudl.analysis.fuel_by_plant.fuel_by_plant_ferc1,
            fuel_categories=fuel_categories,
            thresh=thresh,
        )
        .pipe(pudl.analysis.fuel_by_plant.revert_filled_in_float_nulls)
        .pipe(pudl.analysis.fuel_by_plant.revert_filled_in_string_nulls)
        .merge(
            _out_ferc1__yearly_plants_utilities,
            on=["utility_id_ferc1", "plant_name_ferc1"],
        )
        .pipe(
            pudl.helpers.organize_cols,
            [
                "report_year",
                "utility_id_ferc1",
                "utility_id_pudl",
                "utility_name_ferc1",
                "plant_id_pudl",
                "plant_name_ferc1",
            ],
        )
    )
    return fbp_df
###########################################################################
# HELPER FUNCTIONS
###########################################################################
[docs]
def calc_annual_capital_additions_ferc1(
    steam_df: pd.DataFrame, window: int = 3
) -> pd.DataFrame:
    """Calculate annual capital additions for FERC1 steam records.

    The capex columns are cumulative capital spent on the plant over time.
    This function sums equipment, land and structures capex (nulls treated as
    zero) into ``capex_wo_retirement_total``, takes the year-over-year
    difference within each plant to get the annual capital additions, and
    generates a rolling average to smooth out the big annual fluctuations.

    Args:
        steam_df: FERC Form 1 steam plant records. The columns read here are
            ``report_year``, ``utility_id_ferc1``, ``plant_id_ferc1``,
            ``capex_equipment``, ``capex_land``, ``capex_structures``,
            ``net_generation_mwh`` and ``capacity_mw``.
        window: number of years for window to generate rolling average. Argument for
            :func:`pudl.helpers.generate_rolling_avg`

    Returns:
        Augmented version of steam_df with these additional columns:
        ``capex_wo_retirement_total``, ``capex_annual_addition``,
        ``capex_annual_addition_rolling``, ``capex_annual_per_mwh``,
        ``capex_annual_per_mw``, ``capex_annual_per_kw``,
        ``capex_annual_per_mwh_rolling`` and ``capex_annual_per_mw_rolling``.
    """
    idx_steam_no_date = ["utility_id_ferc1", "plant_id_ferc1"]
    # we need to sort the df so it lines up w/ the groupby
    steam_df = steam_df.assign(
        report_date=lambda x: pd.to_datetime(x.report_year, format="%Y")
    ).sort_values(idx_steam_no_date + ["report_date"])

    steam_df = steam_df.assign(
        capex_wo_retirement_total=lambda x: x.capex_equipment.fillna(0)
        + x.capex_land.fillna(0)
        + x.capex_structures.fillna(0)
    )
    # we group on everything but the year so the groups are multi-year unique
    # plants; the shift happens within these multi-year plant groups
    steam_df["capex_total_shifted"] = steam_df.groupby(idx_steam_no_date)[
        ["capex_wo_retirement_total"]
    ].shift()
    steam_df = steam_df.assign(
        capex_annual_addition=lambda x: x.capex_wo_retirement_total
        - x.capex_total_shifted
    )

    # NOTE(review): the result is expected to carry a
    # ``capex_annual_addition_rolling`` column -- confirm against
    # ``pudl.helpers.generate_rolling_avg``.
    addts = pudl.helpers.generate_rolling_avg(
        steam_df,
        group_cols=idx_steam_no_date,
        data_col="capex_annual_addition",
        window=window,
    )
    steam_df_w_addts = pd.merge(
        steam_df,
        addts[
            idx_steam_no_date
            + [
                "report_date",
                "capex_wo_retirement_total",
                "capex_annual_addition_rolling",
            ]
        ],
        on=idx_steam_no_date + ["report_date", "capex_wo_retirement_total"],
        how="left",
    ).assign(
        capex_annual_per_mwh=lambda x: x.capex_annual_addition / x.net_generation_mwh,
        capex_annual_per_mw=lambda x: x.capex_annual_addition / x.capacity_mw,
        capex_annual_per_kw=lambda x: x.capex_annual_addition / x.capacity_mw / 1000,
        capex_annual_per_mwh_rolling=lambda x: x.capex_annual_addition_rolling
        / x.net_generation_mwh,
        capex_annual_per_mw_rolling=lambda x: x.capex_annual_addition_rolling
        / x.capacity_mw,
    )

    steam_df_w_addts = add_mean_cap_additions(steam_df_w_addts)

    # Sanity check on the volume of negative annual capex. An empty frame has
    # no negative records, and skipping the division avoids ZeroDivisionError.
    n_records = len(steam_df_w_addts)
    if n_records:
        negative_addts = steam_df_w_addts[
            steam_df_w_addts.capex_annual_addition_rolling < 0
        ]
        neg_cap_addts = len(negative_addts) / n_records
        neg_cap_addts_net_gen = (
            negative_addts.net_generation_mwh.sum()
            / steam_df_w_addts.net_generation_mwh.sum()
        )
    else:
        neg_cap_addts = 0.0
        neg_cap_addts_net_gen = 0.0
    # The second share is a fraction of net generation (MWh), not of capacity.
    message = (
        f"{neg_cap_addts:.2%} records have negative capital additions"
        f": {neg_cap_addts_net_gen:.2%} of net generation"
    )
    if neg_cap_addts > 0.1:
        logger.warning(message)
    else:
        logger.info(message)

    # Drop the helper columns that were only needed for intermediate steps.
    return steam_df_w_addts.drop(
        columns=[
            "report_date",
            "capex_total_shifted",
            "capex_annual_addition_gen_mean",
            "capex_annual_addition_gen_std",
            "capex_annual_addition_diff_mean",
        ]
    )
[docs]
def add_mean_cap_additions(steam_df):
    """Attach per-plant capital addition statistics to each steam record.

    For every (``utility_id_ferc1``, ``plant_id_ferc1``) group, the standard
    deviation and mean of ``capex_annual_addition`` are computed and merged
    back as ``capex_annual_addition_gen_std`` and
    ``capex_annual_addition_gen_mean``. ``capex_annual_addition_diff_mean`` is
    each record's addition minus its group mean.
    """
    plant_keys = ["utility_id_ferc1", "plant_id_ferc1"]
    capex_by_plant = steam_df.groupby(plant_keys)[["capex_annual_addition"]]

    def _as_merge_table(stat_df, suffix):
        # Suffix the statistic column, restore the group keys as columns and
        # apply the "ferc1" dtype conversion before merging.
        return pudl.helpers.convert_cols_dtypes(
            stat_df.add_suffix(suffix).reset_index(), "ferc1"
        )

    per_plant_std = _as_merge_table(capex_by_plant.std(), "_gen_std")
    per_plant_mean = _as_merge_table(capex_by_plant.mean(), "_gen_mean")

    with_std = steam_df.merge(
        per_plant_std, how="left", on=plant_keys, validate="m:1"
    )
    with_mean = with_std.merge(
        per_plant_mean, how="left", on=plant_keys, validate="m:1"
    )
    return with_mean.assign(
        capex_annual_addition_diff_mean=lambda df: (
            df.capex_annual_addition - df.capex_annual_addition_gen_mean
        ),
    )
#########
# Explode
#########
[docs]
class NodeId(NamedTuple):
"""The primary keys which identify a node in a calculation tree.
Since NodeId is just a :class:`NamedTuple` a list of NodeId instances can also be
used to index into a :class:pandas.DataFrame` that uses these fields as its index.
This is convenient since many :mod:`networkx` functions and methods return iterable
containers of graph nodes, which can be turned into lists and used directly to index
into dataframes.
The additional dimensions (``utility_type``, ``plant_status``, and
``plant_function``) each have a small number of allowable values, which we could
further impose as constraints on the values here using Pydantic if we wanted.
"""
[docs]
utility_type: str | pandas_NAType
[docs]
plant_status: str | pandas_NAType
[docs]
plant_function: str | pandas_NAType
[docs]
class OffByFactoid(NamedTuple):
"""A calculated factoid which is off by one other factoid.
A factoid where a sizeable majority of utilities are using a non-standard and
non-reported calculation to generate it. These calculated factoids are either
missing one factoid, or include an additional factoid not included in the FERC
metadata. Thus, the calculations are 'off by' this factoid.
"""
[docs]
utility_type: str | pandas_NAType
[docs]
plant_status: str | pandas_NAType
[docs]
plant_function: str | pandas_NAType
[docs]
xbrl_factoid_off_by: str
[docs]
utility_type_off_by: str | pandas_NAType
[docs]
plant_status_off_by: str | pandas_NAType
[docs]
plant_function_off_by: str | pandas_NAType
def exploded_table_asset_factory(
    root_table: str,
    table_names: list[str],
    seed_nodes: list[NodeId],
    group_metric_checks: GroupMetricChecks,
    off_by_facts: list[OffByFactoid],
    io_manager_key: str | None = None,
) -> AssetsDefinition:
    """Create an exploded table based on a set of related input tables.

    This is a plain factory function: it must not itself be decorated as an asset,
    because it is called with explicit arguments and returns the asset it builds.

    Args:
        root_table: name of the table used to name the resulting asset and passed to
            :class:`Exploder` as its ``root_table``.
        table_names: names of the table assets that are inputs to the explosion.
        seed_nodes: nodes passed to :class:`Exploder` as ``seed_nodes``.
        group_metric_checks: passed to :class:`Exploder` as ``group_metric_checks``.
        off_by_facts: passed to :class:`Exploder` as ``off_by_facts``.
        io_manager_key: IO manager key for the generated asset, if any.

    Returns:
        The asset definition which produces the exploded table.
    """
    # Inputs that every explosion needs in addition to the tables being exploded.
    shared_inputs = (
        "_core_ferc1_xbrl__metadata",
        "_core_ferc1_xbrl__calculation_components",
        "_out_ferc1__detailed_tags",
    )
    ins: Mapping[str, AssetIn] = {name: AssetIn(name) for name in shared_inputs}
    ins |= {table_name: AssetIn(table_name) for table_name in table_names}

    @asset(
        name=f"_out_ferc1__detailed_{get_core_ferc1_asset_description(root_table)}",
        ins=ins,
        io_manager_key=io_manager_key,
    )
    def exploded_tables_asset(
        **kwargs: pd.DataFrame,
    ) -> pd.DataFrame:
        # Everything that is not one of the shared inputs is a table to explode.
        tables_to_explode = {
            name: df for (name, df) in kwargs.items() if name not in shared_inputs
        }
        return Exploder(
            # Pass a real list rather than a dict view, matching the annotation.
            table_names=list(tables_to_explode),
            root_table=root_table,
            metadata_xbrl_ferc1=kwargs["_core_ferc1_xbrl__metadata"],
            calculation_components_xbrl_ferc1=kwargs[
                "_core_ferc1_xbrl__calculation_components"
            ],
            seed_nodes=seed_nodes,
            tags=kwargs["_out_ferc1__detailed_tags"],
            group_metric_checks=group_metric_checks,
            off_by_facts=off_by_facts,
        ).boom(tables_to_explode=tables_to_explode)

    return exploded_tables_asset
[docs]
# Keyword arguments for each exploded table asset. Every dictionary in this list is
# unpacked into one call to exploded_table_asset_factory() by
# create_exploded_table_assets().
EXPLOSION_ARGS = [
# Explosion rooted in the income statement table.
{
"root_table": "core_ferc1__yearly_income_statements_sched114",
"table_names": [
"core_ferc1__yearly_income_statements_sched114",
"core_ferc1__yearly_depreciation_summary_sched336",
"core_ferc1__yearly_operating_expenses_sched320",
"core_ferc1__yearly_operating_revenues_sched300",
],
"group_metric_checks": EXPLOSION_CALCULATION_TOLERANCES[
"core_ferc1__yearly_income_statements_sched114"
],
"seed_nodes": [
NodeId(
table_name="core_ferc1__yearly_income_statements_sched114",
xbrl_factoid="net_income_loss",
utility_type="total",
plant_status=pd.NA,
plant_function=pd.NA,
),
],
"off_by_facts": [],
},
# Explosion rooted in the balance sheet assets table.
{
"root_table": "core_ferc1__yearly_balance_sheet_assets_sched110",
"table_names": [
"core_ferc1__yearly_balance_sheet_assets_sched110",
"core_ferc1__yearly_utility_plant_summary_sched200",
"core_ferc1__yearly_plant_in_service_sched204",
"core_ferc1__yearly_depreciation_by_function_sched219",
],
"group_metric_checks": EXPLOSION_CALCULATION_TOLERANCES[
"core_ferc1__yearly_balance_sheet_assets_sched110"
],
"seed_nodes": [
NodeId(
table_name="core_ferc1__yearly_balance_sheet_assets_sched110",
xbrl_factoid="assets_and_other_debits",
utility_type="total",
plant_status=pd.NA,
plant_function=pd.NA,
)
],
# Each OffByFactoid is built positionally from ten values: five identifying the
# calculated factoid, then five identifying the factoid it is off by.
# NOTE(review): the meaning of each position depends on the field order of
# OffByFactoid -- confirm against its definition before editing.
"off_by_facts": [
OffByFactoid(
"core_ferc1__yearly_utility_plant_summary_sched200",
"utility_plant_in_service_classified_and_property_under_capital_leases",
"electric",
pd.NA,
pd.NA,
"core_ferc1__yearly_utility_plant_summary_sched200",
"utility_plant_in_service_completed_construction_not_classified",
"electric",
pd.NA,
pd.NA,
),
OffByFactoid(
"core_ferc1__yearly_utility_plant_summary_sched200",
"utility_plant_in_service_classified_and_property_under_capital_leases",
"electric",
pd.NA,
pd.NA,
"core_ferc1__yearly_utility_plant_summary_sched200",
"utility_plant_in_service_property_under_capital_leases",
"electric",
pd.NA,
pd.NA,
),
OffByFactoid(
"core_ferc1__yearly_utility_plant_summary_sched200",
"depreciation_utility_plant_in_service",
"electric",
pd.NA,
pd.NA,
"core_ferc1__yearly_utility_plant_summary_sched200",
"amortization_of_other_utility_plant_utility_plant_in_service",
"electric",
pd.NA,
pd.NA,
),
],
},
# Explosion rooted in the balance sheet liabilities table.
{
"root_table": "core_ferc1__yearly_balance_sheet_liabilities_sched110",
"table_names": [
"core_ferc1__yearly_balance_sheet_liabilities_sched110",
"core_ferc1__yearly_retained_earnings_sched118",
],
"group_metric_checks": EXPLOSION_CALCULATION_TOLERANCES[
"core_ferc1__yearly_balance_sheet_liabilities_sched110"
],
"seed_nodes": [
NodeId(
table_name="core_ferc1__yearly_balance_sheet_liabilities_sched110",
xbrl_factoid="liabilities_and_other_credits",
utility_type="total",
plant_status=pd.NA,
plant_function=pd.NA,
)
],
"off_by_facts": [],
},
]
def create_exploded_table_assets() -> list[AssetsDefinition]:
    """Build one exploded FERC Form 1 asset per entry in ``EXPLOSION_ARGS``.

    Returns:
        A list of :class:`AssetsDefinition` objects, one for each set of explosion
        arguments, each of which produces an exploded FERC Form 1 table.
    """
    exploded_assets = [
        exploded_table_asset_factory(**explosion_kwargs)
        for explosion_kwargs in EXPLOSION_ARGS
    ]
    return exploded_assets


exploded_ferc1_assets = create_exploded_table_assets()
class Exploder:
    """Get unique, granular datapoints from a set of related, nested FERC1 tables."""

    def __init__(
        self: Self,
        table_names: list[str],
        root_table: str,
        metadata_xbrl_ferc1: pd.DataFrame,
        calculation_components_xbrl_ferc1: pd.DataFrame,
        seed_nodes: list[NodeId],
        tags: pd.DataFrame | None = None,
        group_metric_checks: GroupMetricChecks | None = None,
        off_by_facts: list[OffByFactoid] | None = None,
    ):
        """Instantiate an Exploder class.

        Args:
            table_names: names of the tables to explode. Any iterable of names is
                accepted; it is copied into a list.
            root_table: the table at the base of the tree of tables_to_explode.
            metadata_xbrl_ferc1: table of factoid-level metadata.
            calculation_components_xbrl_ferc1: table of calculation components.
            seed_nodes: NodeIds to use as seeds for the calculation forest.
            tags: Additional metadata to merge onto the exploded dataframe. Defaults
                to a new empty dataframe.
            group_metric_checks: checks applied when reconciling inter-table
                calculations. Defaults to a new ``GroupMetricChecks()``.
            off_by_facts: calculated factoids that some utilities report off by one
                other factoid. ``None`` or an empty list disables the corrections.
        """
        self.table_names: list[str] = list(table_names)
        self.root_table: str = root_table
        # Build the defaults per instance rather than once at definition time.
        self.group_metric_checks = (
            GroupMetricChecks() if group_metric_checks is None else group_metric_checks
        )
        self.metadata_xbrl_ferc1 = metadata_xbrl_ferc1
        self.calculation_components_xbrl_ferc1 = calculation_components_xbrl_ferc1
        self.seed_nodes = seed_nodes
        self.tags = pd.DataFrame() if tags is None else tags
        self.off_by_facts = off_by_facts

    @cached_property
    def exploded_calcs(self: Self) -> pd.DataFrame:
        """Select and clean the calculation components relevant to the explosion.

        The steps are:

        - Keep only calculations whose ``table_name_parent`` is one of the tables in
          the explosion.
        - Of those, keep only calculations in which *all* components come from
          tables in the explosion.
        - Restrict the columns to the node and parent-node identifiers plus
          ``weight``, ``is_within_table_calc`` and ``is_total_to_subdimensions_calc``,
          drop exact duplicates and sort.
        - Append the correction components for :attr:`off_by_facts`.
        - Drop duplicated parent-child associations that have a null weight, and
          drop records in which only one of ``table_name`` or ``xbrl_factoid`` is
          null.

        Raises:
            AssertionError: if duplicated parent-child associations remain, or if
                ``table_name_parent`` or ``xbrl_factoid_parent`` contain nulls.
        """
        calc_cols = list(NodeId._fields)
        parent_cols = [col + "_parent" for col in calc_cols]

        # Keep only records whose parent fact is in one of the exploded tables:
        calc_explode = self.calculation_components_xbrl_ferc1[
            self.calculation_components_xbrl_ferc1.table_name_parent.isin(
                self.table_names
            )
        ].copy()
        # Groupby parent factoids
        gb = calc_explode.groupby(
            ["table_name_parent", "xbrl_factoid_parent"], as_index=False
        )
        # Identify groups where **all** calculation components are within the explosion
        calc_explode["is_in_explosion"] = gb.table_name.transform(
            lambda x: x.isin(self.table_names).all()
        )
        # Keep only calculations in which ALL calculation components are in explosion
        # Restrict columns to the ones we actually need. Drop duplicates and order
        # things for legibility.
        calc_explode = (
            calc_explode[calc_explode.is_in_explosion]
            .loc[
                :,
                parent_cols
                + calc_cols
                + ["weight", "is_within_table_calc", "is_total_to_subdimensions_calc"],
            ]
            .drop_duplicates()
            .set_index(parent_cols + calc_cols)
            .sort_index()
            .reset_index()
        )
        calc_explode = self.add_sizable_minority_corrections_to_calcs(calc_explode)

        ##############################################################################
        # Everything below here is error checking / debugging / temporary fixes
        dupes = calc_explode.duplicated(subset=parent_cols + calc_cols, keep=False)
        if dupes.any():
            logger.warning(
                "Consolidating non-unique associations found in exploded_calcs:\n"
                f"{calc_explode.loc[dupes]}"
            )
        # Drop all duplicates with null weights -- this is a temporary fix to an issue
        # from upstream.
        # TODO: remove once things are fixed in calculation_components_xbrl_ferc1
        calc_explode = calc_explode.loc[~(dupes & calc_explode.weight.isna())]
        # Raise explicitly so that the checks are not stripped under ``python -O``.
        if calc_explode.duplicated(subset=parent_cols + calc_cols, keep=False).any():
            raise AssertionError(
                "Non-unique parent-child associations remain in exploded_calcs."
            )

        # There should be no cases where only one of table_name or xbrl_factoid is NA:
        is_partially_null = (
            calc_explode.table_name.isna() ^ calc_explode.xbrl_factoid.isna()
        )
        partially_null = calc_explode[is_partially_null]
        if not partially_null.empty:
            logger.error(
                "Found unacceptably null calculation components. Dropping!\n"
                f"{partially_null[['table_name', 'xbrl_factoid']]}"
            )
            # Filter with the boolean mask. Passing the dataframe itself to ``drop``
            # would be interpreted as a list of index labels (its column names).
            calc_explode = calc_explode.loc[~is_partially_null]

        # These should never be NA:
        if calc_explode.xbrl_factoid_parent.isna().any():
            raise AssertionError("Null xbrl_factoid_parent found in exploded_calcs.")
        if calc_explode.table_name_parent.isna().any():
            raise AssertionError("Null table_name_parent found in exploded_calcs.")

        return calc_explode

    def add_sizable_minority_corrections_to_calcs(
        self: Self, exploded_calcs: pd.DataFrame
    ) -> pd.DataFrame:
        """Add correction calculation components for the off-by-one-factoid facts.

        For every entry in :attr:`off_by_facts` a calculation component is added
        whose parent is the calculated factoid and whose ``xbrl_factoid`` is named
        ``{parent}_off_by_{other factoid}_correction``. The component has a weight
        of 1 and is marked as neither a within-table nor a total-to-subdimensions
        calculation. The remaining child identifiers are taken from the ``*_off_by``
        fields.

        Args:
            exploded_calcs: the calculation components to append the corrections to.

        Returns:
            ``exploded_calcs`` unchanged when there are no :attr:`off_by_facts`,
            otherwise ``exploded_calcs`` with the correction components appended.
        """
        if not self.off_by_facts:
            return exploded_calcs
        facts_to_fix = (
            pd.DataFrame(self.off_by_facts)
            .rename(columns={col: f"{col}_parent" for col in NodeId._fields})
            .assign(
                xbrl_factoid=(
                    lambda x: x.xbrl_factoid_parent
                    + "_off_by_"
                    + x.xbrl_factoid_off_by
                    + "_correction"
                ),
                weight=1,
                is_total_to_subdimensions_calc=False,
                # these fixes are currently only from inter-table calcs.
                # if they weren't we'd need to check within the group of
                # the parent fact like in process_xbrl_metadata_calculations
                is_within_table_calc=False,
            )
            .drop(columns=["xbrl_factoid_off_by"])
        )
        # The remaining ``*_off_by`` columns become the child identifiers.
        facts_to_fix.columns = facts_to_fix.columns.str.removesuffix("_off_by")
        return pd.concat(
            [
                exploded_calcs,
                facts_to_fix[exploded_calcs.columns].astype(exploded_calcs.dtypes),
            ]
        )

    # NOTE(review): other methods of this class read ``self.exploded_meta``, which is
    # not defined in the portion of the class visible here -- confirm it is provided.

    @cached_property
    def calculation_forest(self: Self) -> "XbrlCalculationForestFerc1":
        """Construct a calculation forest based on class attributes."""
        return XbrlCalculationForestFerc1(
            exploded_calcs=self.exploded_calcs,
            seeds=self.seed_nodes,
            tags=self.tags,
            group_metric_checks=self.group_metric_checks,
        )

    @cached_property
    def dimensions(self: Self) -> list[str]:
        """Get all of the column names for the other dimensions."""
        return pudl.transform.ferc1.other_dimensions(table_names=self.table_names)

    @cached_property
    def exploded_pks(self: Self) -> list[str]:
        """Get the joint primary keys of the exploded tables.

        Each table's own factoid column is left out of its primary key, and
        ``table_name`` and ``xbrl_factoid`` are put first instead.
        """
        # The lookup does not depend on the table, so build it once.
        factoid_names = pudl.transform.ferc1.table_to_xbrl_factoid_name()
        pks = [
            [
                col
                for col in pudl.metadata.classes.Resource.from_id(
                    table_name
                ).schema.primary_key
                if col != factoid_names[table_name]
            ]
            for table_name in self.table_names
        ]
        # Some xbrl_factoid names are the same in more than one table, so we also add
        # table_name here.
        return [
            "table_name",
            "xbrl_factoid",
        ] + pudl.helpers.dedupe_n_flatten_list_of_lists(pks)

    @cached_property
    def value_col(self: Self) -> str:
        """Get the value column for the exploded tables.

        Raises:
            ValueError: if the exploded tables do not all share exactly one value
                column.
        """
        value_cols = {
            pudl.transform.ferc1.FERC1_TFR_CLASSES[
                table_name
            ]().params.reconcile_table_calculations.column_to_check
            for table_name in self.table_names
        }
        if len(value_cols) != 1:
            raise ValueError(
                "Exploding FERC tables requires tables with only one value column. Got: "
                f"{value_cols}"
            )
        return next(iter(value_cols))

    @property
    def calc_idx(self: Self) -> list[str]:
        """Primary key columns for calculations in this explosion."""
        return [col for col in list(NodeId._fields) if col in self.exploded_pks]

    def prep_table_to_explode(
        self: Self, table_name: str, table_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Assign table name and rename factoid column in preparation for explosion."""
        xbrl_factoid_name = pudl.transform.ferc1.FERC1_TFR_CLASSES[
            table_name
        ]().params.xbrl_factoid_name
        table_df = table_df.assign(table_name=table_name).rename(
            columns={xbrl_factoid_name: "xbrl_factoid"}
        )
        return table_df

    def boom(self: Self, tables_to_explode: dict[str, pd.DataFrame]) -> pd.DataFrame:
        """Explode a set of nested tables.

        There are five main stages of this process:

        #. Prep all of the individual tables for explosion.
        #. Concatenate all of the tables together.
        #. Remove duplication in the concatenated exploded table.
        #. Annotate the fine-grained data with additional metadata.
        #. Validate that calculated top-level values are correct. (not implemented)

        Args:
            tables_to_explode: dictionary of table name (key) to transformed table
                (value).

        Returns:
            The exploded data, restricted to the primary key, dimension, value, tag,
            ``ferc_account`` and ``row_type_xbrl`` columns.
        """
        exploded = (
            self.initial_explosion_concatenation(tables_to_explode)
            .pipe(self.calculate_intertable_non_total_calculations)
            .pipe(self.add_sizable_minority_corrections)
            .pipe(self.reconcile_intertable_calculations)
            .pipe(self.calculation_forest.leafy_data, value_col=self.value_col)
        )
        # Identify which columns should be kept in the output...
        # TODO: Define schema for the tables explicitly.
        cols_to_keep = list(set(self.exploded_pks + self.dimensions + [self.value_col]))
        if ("utility_type" in cols_to_keep) and (
            "utility_type_other" in exploded.columns
        ):
            cols_to_keep += ["utility_type_other"]
        cols_to_keep += exploded.filter(regex="tags.*").columns.to_list()
        cols_to_keep += [
            "ferc_account",
            "row_type_xbrl",
        ]

        # TODO: Validate the root node calculations.
        # Verify that we get the same values for the root nodes using only the input
        # data from the leaf nodes:
        # root_calcs = self.calculation_forest.root_calculations

        return exploded[cols_to_keep].convert_dtypes()

    def initial_explosion_concatenation(
        self, tables_to_explode: dict[str, pd.DataFrame]
    ) -> pd.DataFrame:
        """Concatenate all of the tables for the explosion.

        Merge in some basic pieces of each table's metadata and add ``table_name``.
        At this point in the explosion, there will be a lot of duplication in the
        output.

        Raises:
            AssertionError: if a dimension that is missing from the data is not
                entirely null in the metadata.
        """
        logger.info("Explode: CONCAT!")
        # GRAB/PREP EACH TABLE
        exploded = pd.concat(
            [
                self.prep_table_to_explode(table_name, table_df)
                for table_name, table_df in tables_to_explode.items()
            ]
        )

        # Identify which dimensions apply to the current explosion -- not all
        # collections of tables have all dimensions.
        meta_idx = list(NodeId._fields)
        missing_dims = list(set(meta_idx).difference(exploded.columns))
        # Missing dimensions SHOULD be entirely null in the metadata, if so we can drop
        if not self.exploded_meta.loc[:, missing_dims].isna().all(axis=None):
            raise AssertionError(
                f"Expected missing metadata dimensions {missing_dims} to be null."
            )
        exploded_meta = self.exploded_meta.drop(columns=missing_dims)
        meta_idx = list(set(meta_idx).difference(missing_dims))

        # drop any metadata columns that appear in the data tables, because we may have
        # edited them in the metadata table, and want the edited version to take
        # precedence
        cols_to_keep = list(
            set(exploded.columns).difference(exploded_meta.columns).union(meta_idx)
        )
        exploded = pd.merge(
            left=exploded.loc[:, cols_to_keep],
            right=exploded_meta,
            how="left",
            on=meta_idx,
            validate="m:1",
        )
        return exploded

    def calculate_intertable_non_total_calculations(
        self, exploded: pd.DataFrame
    ) -> pd.DataFrame:
        """Calculate the inter-table non-total calculable xbrl_factoids."""
        # add the abs_diff column for the calculated fields
        calculated_df = pudl.transform.ferc1.calculate_values_from_components(
            calculation_components=self.exploded_calcs[
                ~self.exploded_calcs.is_within_table_calc
                & ~self.exploded_calcs.is_total_to_subdimensions_calc
            ],
            data=exploded,
            calc_idx=self.calc_idx,
            value_col=self.value_col,
            calc_to_data_merge_validation="many_to_many",
        )
        return calculated_df

    def add_sizable_minority_corrections(
        self: Self, calculated_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Identify and fix the utilities that report calcs off-by one other fact.

        We noticed that there are a sizable minority of utilities that report some
        calculated values with a different set of child subcomponents. Our tooling
        for these calculations expects all utilities to report in the same manner.
        So we have identified the handful of worst calculable ``xbrl_factoid``
        offenders :attr:`self.off_by_facts`. This method identifies data corrections
        for those :attr:`self.off_by_facts` and adds them into the exploded data table.

        The data corrections are identified by calculating the absolute difference
        between the reported value and calculable value from the standard set of
        subcomponents (via :func:`pudl.transform.ferc1.calculate_values_from_components`)
        and finding the child factoids that have the same value as the absolute
        difference. This indicates that the calculable parent factoid is off by that
        corresponding child fact.

        Relatedly, :meth:`add_sizable_minority_corrections_to_calcs` adds these
        :attr:`self.off_by_facts` to :attr:`self.exploded_calcs`.
        """
        if not self.off_by_facts:
            return calculated_df

        off_by = pd.DataFrame(self.off_by_facts)
        not_close = (
            calculated_df[~np.isclose(calculated_df.abs_diff, 0)]
            .set_index(list(NodeId._fields))
            # grab the parent side of the off_by facts
            .loc[off_by.set_index(list(NodeId._fields)).index.unique()]
            .reset_index()
        )
        off_by_fact = (
            calculated_df[calculated_df.row_type_xbrl != "correction"]
            .set_index(list(NodeId._fields))
            # grab the child side of the off_by facts
            .loc[
                off_by.set_index(
                    [f"{col}_off_by" for col in list(NodeId._fields)]
                ).index.unique()
            ]
            .reset_index()
        )
        # Identify the calculations with one missing other fact
        # when the diff is the same as the value of another fact, then that
        # calculated value could have been perfect with the addition of the
        # missing factoid.
        data_corrections = (
            pd.merge(
                left=not_close,
                right=off_by_fact,
                left_on=["report_year", "utility_id_ferc1", "abs_diff"],
                right_on=["report_year", "utility_id_ferc1", self.value_col],
                how="inner",
                suffixes=("", "_off_by"),
            )
            # use a dict/kwarg for assign so we can dynamically set the name of value_col
            # the correction value is the diff - not the abs_diff from not_close or value_col
            # from off_by_fact bc often the utility included a fact so this diff is negative
            .assign(
                **{
                    "xbrl_factoid": (
                        lambda x: x.xbrl_factoid
                        + "_off_by_"
                        + x.xbrl_factoid_off_by
                        + "_correction"
                    ),
                    self.value_col: lambda x: x["diff"],
                    "row_type_xbrl": "correction",
                }
            )[
                # drop all the _off_by and calc cols
                list(NodeId._fields)
                + ["report_year", "utility_id_ferc1", self.value_col, "row_type_xbrl"]
            ]
        )
        bad_utils = data_corrections.utility_id_ferc1.unique()
        logger.info(
            f"Adding {len(data_corrections)} from {len(bad_utils)} utilities that report differently."
        )
        dtype_calced = {
            col: dtype
            for (col, dtype) in calculated_df.dtypes.items()
            if col in data_corrections.columns
        }
        corrected = pd.concat(
            [calculated_df, data_corrections.astype(dtype_calced)]
        ).set_index(self.calc_idx)
        # now we are going to re-calculate just the fixed facts
        fixed_facts = corrected.loc[off_by.set_index(self.calc_idx).index.unique()]
        unfixed_facts = corrected.loc[
            corrected.index.difference(off_by.set_index(self.calc_idx).index.unique())
        ].reset_index()
        fixed_facts = self.calculate_intertable_non_total_calculations(
            exploded=fixed_facts.drop(
                columns=["calculated_value", "is_calc"]
            ).reset_index()
        )

        return pd.concat([unfixed_facts, fixed_facts.astype(unfixed_facts.dtypes)])

    def reconcile_intertable_calculations(
        self: Self, exploded: pd.DataFrame
    ) -> pd.DataFrame:
        """Check inter-table calculations and add correction records.

        The input is expected to already contain the calculated values added by
        :meth:`calculate_intertable_non_total_calculations`. Those calculations are
        checked against :attr:`group_metric_checks` and correction records are added.
        The total-to-subdimension calculations are then calculated and checked as
        well, but their results are not added to the output.

        Args:
            exploded: concatenated tables for table explosion.

        Returns:
            ``exploded`` unchanged if there are no inter-table calculations,
            otherwise the checked data with correction records added.
        """
        calculations_intertable = self.exploded_calcs[
            ~self.exploded_calcs.is_within_table_calc
        ]
        if calculations_intertable.empty:
            return exploded
        logger.info(
            f"{self.root_table}: Reconcile inter-table calculations: "
            f"{list(calculations_intertable.xbrl_factoid.unique())}."
        )
        logger.info("Checking inter-table, non-total to subdimension calcs.")
        # we've added calculated fields via calculate_intertable_non_total_calculations
        calculated_df = pudl.transform.ferc1.check_calculation_metrics(
            calculated_df=exploded, group_metric_checks=self.group_metric_checks
        )
        calculated_df = pudl.transform.ferc1.add_corrections(
            calculated_df=calculated_df,
            value_col=self.value_col,
            is_close_tolerance=pudl.transform.ferc1.IsCloseTolerance(),
            table_name=self.root_table,
            is_subdimension=False,
        )
        logger.info("Checking sub-total calcs.")
        subdimension_calcs = pudl.transform.ferc1.calculate_values_from_components(
            calculation_components=calculations_intertable[
                calculations_intertable.is_total_to_subdimensions_calc
            ],
            data=exploded.drop(columns=["calculated_value", "is_calc"]),
            calc_idx=self.calc_idx,
            value_col=self.value_col,
        )
        # Only the check itself matters here; its result is not part of the output.
        pudl.transform.ferc1.check_calculation_metrics(
            calculated_df=subdimension_calcs,
            group_metric_checks=self.group_metric_checks,
        )
        return calculated_df
def in_explosion_tables(table_name: str, in_explosion_table_names: list[str]) -> bool:
    """Determine whether a table is one of the tables in an explosion.

    Args:
        table_name: name of the table to look for. Typically from the
            ``source_tables`` element of an XBRL calculation component.
        in_explosion_table_names: names of the tables involved in a particular set
            of exploded tables.

    Returns:
        True if ``table_name`` is in ``in_explosion_table_names``, otherwise False.
    """
    is_in_explosion = table_name in in_explosion_table_names
    return is_in_explosion
################################################################################
# XBRL Calculation Forests
################################################################################
[docs]
class XbrlCalculationForestFerc1(BaseModel):
"""A class for manipulating groups of hierarchically nested XBRL calculations.
We expect that the facts reported in high-level FERC tables like
:ref:`core_ferc1__yearly_income_statements_sched114` and
:ref:`core_ferc1__yearly_balance_sheet_assets_sched110` should be
calculable from many individually reported granular values, based on the
calculations encoded in the XBRL Metadata, and that these relationships should have
a hierarchical tree structure. Several individual values from the higher level
tables will appear as root nodes at the top of each hierarchy, and the leaves in
the underlying tree structure are the individually reported non-calculated values
that make them up. Because the top-level tables have several distinct values in
them, composed of disjunct sets of reported values, we have a forest (a group of
several trees) rather than a single tree.
The information required to build a calculation forest is most readily found in the
:meth:`Exploder.exploded_calcs` A list of seed nodes can also be supplied,
indicating which nodes must be present in the resulting forest. This can be used to
prune irrelevant portions of the overall forest out of the exploded metadata. If no
seeds are provided, then all of the nodes referenced in the exploded_calcs input
dataframe will be used as seeds.
This class makes heavy use of :mod:`networkx` to manage the graph that we build
from calculation relationships.
"""
# Not sure if dynamically basing this on NodeId is really a good idea here.
# Names of the columns which identify a node; the parent columns are derived from it.
calc_cols: list[str] = list(NodeId._fields)
# The calculation components from which the forest is built.
exploded_calcs: pd.DataFrame = pd.DataFrame()
# Nodes which must be present in the resulting forest.
seeds: list[NodeId] = []
# Manually assigned tags, one row per node. The field is declared here because the
# model is constructed with ``tags=...``, reads ``self.tags`` and validates "tags".
# NOTE(review): type and default mirror ``exploded_calcs`` -- confirm.
tags: pd.DataFrame = pd.DataFrame()
# Checks handed over by the Exploder together with the calculations.
group_metric_checks: GroupMetricChecks = GroupMetricChecks()
# Dataframes are arbitrary types to pydantic; cached properties are not fields.
model_config = ConfigDict(
    arbitrary_types_allowed=True, ignored_types=(cached_property,)
)
@property
[docs]
def parent_cols(self: Self) -> list[str]:
"""Construct parent_cols based on the provided calc_cols."""
return [col + "_parent" for col in self.calc_cols]
@model_validator(mode="after")
def unique_associations(self: Self):
    """Ensure parent-child associations in exploded calculations are unique.

    An association is identified by the combination of ``calc_cols`` and
    ``parent_cols``. Any duplicated associations are logged before failing.

    Raises:
        AssertionError: if any association appears more than once in
            ``exploded_calcs``.
    """
    pks = self.calc_cols + self.parent_cols
    dupes = self.exploded_calcs.duplicated(subset=pks, keep=False)
    if dupes.any():
        logger.warning(
            "Consolidating non-unique associations found in exploded_calcs:\n"
            f"{self.exploded_calcs.loc[dupes]}"
        )
        # Raise explicitly rather than with ``assert`` so that the check is not
        # stripped when running under ``python -O``.
        raise AssertionError(
            "Non-unique parent-child associations found in exploded_calcs."
        )
    return self
@model_validator(mode="after")
def calcs_have_required_cols(self: Self):
    """Check for the required columns and drop all others from exploded_calcs.

    The required columns are ``parent_cols``, ``calc_cols`` and ``weight``.

    Raises:
        ValueError: if any required column is absent from ``exploded_calcs``.
    """
    required_cols = self.parent_cols + self.calc_cols + ["weight"]
    available_cols = self.exploded_calcs.columns
    missing_cols = list(
        filter(lambda col: col not in available_cols, required_cols)
    )
    if missing_cols:
        raise ValueError(
            f"Exploded calculations missing expected columns: {missing_cols=}"
        )
    self.exploded_calcs = self.exploded_calcs.loc[:, required_cols]
    return self
@model_validator(mode="after")
def calc_parents_notna(self: Self):
    """Reject calculations whose parent table name or factoid is null.

    Raises:
        AssertionError: if ``table_name_parent`` or ``xbrl_factoid_parent`` contain
            any null value.
    """
    parent_id_cols = ["table_name_parent", "xbrl_factoid_parent"]
    has_null_parent = self.exploded_calcs[parent_id_cols].isna().any(axis=None)
    if has_null_parent:
        raise AssertionError("Null parent table name or xbrl_factoid found.")
    return self
@model_validator(mode="after")
def seeds_within_bounds(self: Self):
    """Ensure that all seeds are present within exploded_calcs index.

    A seed is only acceptable if it appears as a parent node in
    ``exploded_calcs``, i.e. in the index built from ``parent_cols``.

    Raises:
        ValueError: if any seed does not appear as a parent in ``exploded_calcs``.
    """
    all_nodes = self.exploded_calcs.set_index(self.parent_cols).index
    bad_seeds = [seed for seed in self.seeds if seed not in all_nodes]
    if bad_seeds:
        raise ValueError(f"Seeds missing from exploded_calcs index: {bad_seeds=}")
    return self
def exploded_calcs_to_digraph(
    self: Self,
    exploded_calcs: pd.DataFrame,
) -> nx.DiGraph:
    """Construct :class:`networkx.DiGraph` of all calculations in exploded_calcs.

    Every record in the calculation components becomes one directed edge. The
    "parent" or "source" node is the result of the calculation, and the "child" or
    "target" node is the individual calculation component. The structure of the
    graph is determined entirely by the node identifier columns (``parent_cols``
    and ``calc_cols``) of the calculation components.

    Args:
        exploded_calcs: calculation components containing both ``parent_cols`` and
            ``calc_cols``.

    Returns:
        A directed graph whose nodes are :class:`NodeId` instances.
    """
    parent_keys = exploded_calcs.set_index(self.parent_cols).index.to_list()
    child_keys = exploded_calcs.set_index(self.calc_cols).index.to_list()
    edgelist = pd.DataFrame(
        {
            "source": [NodeId(*key) for key in parent_keys],
            "target": [NodeId(*key) for key in child_keys],
        }
    )
    return nx.from_pandas_edgelist(edgelist, create_using=nx.DiGraph)
@cached_property
def node_attrs(self: Self) -> dict[NodeId, dict[str, dict[str, str]]]:
    """Construct a dictionary of node attributes for application to the forest.

    Node attributes consist of the manually assigned tags. The result maps each
    node to a dictionary with a single ``tags`` entry.
    """
    # One dictionary of tag values per node. Keeping the tags together in one
    # dictionary makes it easier to add arbitrary sets of tags later on.
    per_node_tags = (
        self.tags.convert_dtypes()
        .set_index(self.calc_cols)
        .dropna(how="all")
        .to_dict(orient="index")
    )
    # Drop None tags created by combining multiple tagging CSVs
    non_null_tags = {
        node: {tag: value for tag, value in node_tags.items() if value is not None}
        for node, node_tags in per_node_tags.items()
    }
    tags_by_node = pd.DataFrame(
        index=pd.MultiIndex.from_tuples(non_null_tags.keys(), names=self.calc_cols),
        data={"tags": list(non_null_tags.values())},
    ).reset_index()
    # Type conversion is necessary to get pd.NA in the index:
    tags_by_node = tags_by_node.astype(
        {col: pd.StringDtype() for col in self.calc_cols}
    )
    tags_by_node = tags_by_node.assign(
        tags=lambda x: np.where(x["tags"].isna(), {}, x["tags"])
    )
    return tags_by_node.set_index(self.calc_cols).to_dict(orient="index")
@cached_property
def edge_attrs(self: Self) -> dict[Any, Any]:
    """Construct a dictionary of edge attributes for application to the forest.

    Each ``(parent, child)`` pair of nodes is mapped to a dictionary holding the
    ``weight`` of that calculation component, which is the only edge attribute.
    """

    def to_nodes(id_cols: list[str]) -> list[NodeId]:
        # One NodeId per record, in record order.
        keys = self.exploded_calcs.set_index(id_cols).index.to_list()
        return [NodeId(*key) for key in keys]

    parents = to_nodes(self.parent_cols)
    children = to_nodes(self.calc_cols)
    weights = self.exploded_calcs["weight"].to_list()
    return {
        (parent, child): {"weight": weight}
        for parent, child, weight in zip(parents, children, weights, strict=True)
    }
@cached_property
def annotated_forest(self: Self) -> nx.DiGraph:
    """Annotate the calculation forest with node calculation weights and tags.

    A copy of the forest is annotated with the node attributes (tags) and the edge
    attributes (weights), and the tags are then propagated through the graph. The
    forest itself is left untouched.

    Afterwards we check whether any of the nodes that were orphaned or pruned had
    tags assigned to them, and whether the annotated forest contains conflicting
    tags. If tagged nodes were lost, then the final exploded data table may not
    capture all of the manually assigned metadata, and we either need to edit the
    metadata, or figure out why those nodes aren't being included in the final
    calculation forest.
    """
    graph = deepcopy(self.forest)
    nx.set_node_attributes(graph, self.node_attrs)
    nx.set_edge_attributes(graph, self.edge_attrs)
    graph = self.propagate_node_attributes(graph)

    logger.info("Checking whether any pruned nodes were also tagged.")
    self.check_lost_tags(lost_nodes=self.pruned)
    logger.info("Checking whether any orphaned nodes were also tagged.")
    self.check_lost_tags(lost_nodes=self.orphans)
    self.check_conflicting_tags(graph)
    return graph
def propagate_node_attributes(self: Self, annotated_forest: nx.DiGraph):
    """Propagate tags through the annotated forest.

    The ``in_rate_base`` tag is propagated leafward and then rootward, after which
    tags are propagated to the ``_correction`` nodes.

    Args:
        annotated_forest: the forest with node and edge attributes already set.

    Returns:
        The forest returned by the last propagation step.
    """
    leafward = _propagate_tags_leafward(annotated_forest, ["in_rate_base"])
    rootward = _propagate_tag_rootward(leafward, "in_rate_base")
    return _propagate_tags_to_corrections(rootward)
@cached_property
def full_digraph(self: Self) -> nx.DiGraph:
    """A digraph of all calculations described by the exploded metadata.

    The number of connected components is logged at debug level, and a critical
    message is logged if the graph contains cycles. The graph is returned either
    way.
    """
    full_digraph = self.exploded_calcs_to_digraph(
        exploded_calcs=self.exploded_calcs,
    )
    connected_components = list(
        nx.connected_components(full_digraph.to_undirected())
    )
    logger.debug(
        f"Full digraph contains {len(connected_components)} connected components."
    )
    if not nx.is_directed_acyclic_graph(full_digraph):
        logger.critical(
            "Calculations in Exploded Metadata contain cycles, which is invalid."
        )
    return full_digraph
def prune_unrooted(self: Self, graph: nx.DiGraph) -> nx.DiGraph:
    """Prune those parts of the input graph that aren't reachable from the roots.

    Build a table of exploded calculations that includes only those nodes that
    are part of the input graph, and that are reachable from the seeds of the
    calculation forest. Then use that set of exploded calculations to construct a
    new graph.

    This is complicated by the fact that some nodes may have already been pruned
    from the input graph, and so when selecting both parent and child nodes from
    the calculations, we need to make sure that they are present in the input graph,
    as well as the complete set of calculation components.

    Args:
        graph: the graph to prune. Every seed must be a node in it.

    Returns:
        A new graph built from the calculations that are reachable from the seeds.
    """
    # Collect the seeds and all of their descendants from the graph. The nodes are
    # accumulated in a set for the whole loop so that any number of seeds works.
    seeded_node_set = set(self.seeds)
    for seed in self.seeds:
        seeded_node_set |= nx.descendants(graph, seed)
    seeded_nodes = list(seeded_node_set)
    # Any seeded node that appears in the input graph and is also a parent.
    seeded_parents = [
        node
        for node, degree in dict(graph.out_degree(seeded_nodes)).items()
        if degree > 0
    ]
    # Any calculation where the parent is one of the seeded parents.
    seeded_calcs = (
        self.exploded_calcs.set_index(self.parent_cols)
        .loc[seeded_parents]
        .reset_index()
    )
    # All child nodes in the seeded calculations that are part of the input graph.
    seeded_child_nodes = list(
        set(
            seeded_calcs[self.calc_cols].itertuples(index=False, name="NodeId")
        ).intersection(graph.nodes)
    )
    # This seeded calcs includes only calculations where both the parent and child
    # nodes were part of the input graph.
    seeded_calcs = (
        seeded_calcs.set_index(self.calc_cols).loc[seeded_child_nodes].reset_index()
    )
    return self.exploded_calcs_to_digraph(exploded_calcs=seeded_calcs)
@cached_property
[docs]
def seeded_digraph(self: Self) -> nx.DiGraph:
    """The portion of the full digraph that contributes to the seed values.

    This hands :meth:`full_digraph` to :meth:`prune_unrooted`, which keeps only
    the seed nodes and the nodes reachable from them along directed edges, and
    rebuilds the graph from the matching exploded calculations so the associated
    metadata is carried along.

    Returns:
        The pruned digraph produced by :meth:`prune_unrooted`.
    """
    unpruned = self.full_digraph
    return self.prune_unrooted(unpruned)
@cached_property
[docs]
def forest(self: Self) -> nx.DiGraph:
    """Turn the seeded digraph into (what should be) a collection of trees.

    Works on a deep copy of :meth:`seeded_digraph` and applies the special-case
    logic needed to get rid of duplicated calculation paths:

    * Every "pure stepparent" is removed: a node that has children, all of which
      have more than one parent. Stepchildren are recomputed for each candidate
      because earlier removals change the in-degrees of the remaining nodes.
    * The graph is re-pruned to what is reachable from the seeds. This must not
      drop any node; if it does an ``AssertionError`` is raised.
    * A hard-coded list of "almost pure" stepparents is removed, followed by
      another pruning pass.

    Problems with the final result (not a forest, or stepparents remaining) are
    only logged as errors; the graph is returned regardless.

    Returns:
        The pruned calculation graph.

    Raises:
        AssertionError: If pruning after the stepparent removal dropped any nodes.
    """
    forest = deepcopy(self.seeded_digraph)
    # A stepchild is a node with more than one parent; a stepparent is any
    # parent of a stepchild. See self.stepparents and self.stepchildren.
    pure_stepparents = []
    stepparents = sorted(self.stepparents(forest))
    logger.info(f"Investigating {len(stepparents)=}")
    for candidate in stepparents:
        offspring = set(forest.successors(candidate))
        # Recomputed on every pass since the forest shrinks as we go.
        shared_offspring = offspring.intersection(self.stepchildren(forest))
        if offspring and offspring == shared_offspring:
            pure_stepparents.append(candidate)
            forest.remove_node(candidate)
    logger.info(f"Removed {len(pure_stepparents)} redundant/stepparent nodes.")
    logger.debug(f"Removed redunant/stepparent nodes: {sorted(pure_stepparents)}")

    # Defensive check: dropping pure stepparents should never disconnect any
    # other node from the seeds.
    nodes_before_pruning = forest.nodes
    forest = self.prune_unrooted(forest)
    pruned_nodes = set(nodes_before_pruning).difference(forest.nodes)
    if pruned_nodes:
        raise AssertionError(f"Unexpectedly pruned stepchildren: {pruned_nodes=}")

    # HACK: these parents have different sets of dimensions and share some, but
    # not all, of their children, so the pure-stepparent removal above does not
    # catch them. A generalization here would be good.
    almost_pure_stepparents = [
        NodeId(
            "core_ferc1__yearly_utility_plant_summary_sched200",
            factoid,
            "total",
            pd.NA,
            pd.NA,
        )
        for factoid in (
            "depreciation_amortization_and_depletion_utility_plant_leased_to_others",
            "depreciation_and_amortization_utility_plant_held_for_future_use",
            "utility_plant_in_service_classified_and_unclassified",
        )
    ]
    forest.remove_nodes_from(almost_pure_stepparents)
    forest = self.prune_unrooted(forest)

    if not nx.is_forest(forest):
        logger.error(
            "Calculations in Exploded Metadata can not be represented as a forest!"
        )
    remaining_stepparents = set(self.stepparents(forest))
    if remaining_stepparents:
        logger.error(f"{remaining_stepparents=}")
    return forest
@staticmethod
[docs]
def roots(graph: nx.DiGraph) -> list[NodeId]:
    """Return every node of the digraph that has no incoming edges."""
    root_nodes = []
    for node, n_parents in graph.in_degree():
        if n_parents == 0:
            root_nodes.append(node)
    return root_nodes
@cached_property
[docs]
def full_digraph_roots(self: Self) -> list[NodeId]:
    """Root nodes of the full digraph built from the exploded metadata."""
    graph = self.full_digraph
    return self.roots(graph=graph)
@cached_property
[docs]
def seeded_digraph_roots(self: Self) -> list[NodeId]:
    """Root nodes of the seeded digraph."""
    graph = self.seeded_digraph
    return self.roots(graph=graph)
@cached_property
[docs]
def forest_roots(self: Self) -> list[NodeId]:
    """Root nodes of the pruned calculation forest."""
    graph = self.forest
    return self.roots(graph=graph)
@staticmethod
[docs]
def leaves(graph: nx.DiGraph) -> list[NodeId]:
    """Return every node of the digraph that has no outgoing edges."""
    leaf_nodes = []
    for node, n_children in graph.out_degree():
        if n_children == 0:
            leaf_nodes.append(node)
    return leaf_nodes
@cached_property
[docs]
def full_digraph_leaves(self: Self) -> list[NodeId]:
    """Leaf nodes of the full digraph."""
    graph = self.full_digraph
    return self.leaves(graph=graph)
@cached_property
[docs]
def seeded_digraph_leaves(self: Self) -> list[NodeId]:
    """Leaf nodes of the seeded digraph."""
    graph = self.seeded_digraph
    return self.leaves(graph=graph)
@cached_property
[docs]
def forest_leaves(self: Self) -> list[NodeId]:
    """Leaf nodes of the pruned forest."""
    graph = self.forest
    return self.leaves(graph=graph)
@cached_property
[docs]
def orphans(self: Self) -> list[NodeId]:
    """Nodes referenced by ``exploded_calcs`` that are missing from the full digraph.

    Both the calculation-component columns and the parent columns of
    ``exploded_calcs`` are scanned. Since the graph is now built entirely from
    ``exploded_calcs``, this is expected to come back empty and is kept mostly as
    a redundant safety net.

    Returns:
        The de-duplicated missing nodes, in no particular order.
    """
    graph_nodes = self.full_digraph.nodes
    missing = set()
    for id_cols in (self.calc_cols, self.parent_cols):
        for raw_id in self.exploded_calcs.set_index(id_cols).index:
            candidate = NodeId(*raw_id)
            if candidate not in graph_nodes:
                missing.add(candidate)
    return list(missing)
@cached_property
[docs]
def pruned(self: Self) -> list[NodeId]:
    """Nodes that are in the full digraph but did not survive into the forest."""
    all_nodes = set(self.full_digraph.nodes)
    kept_nodes = self.forest.nodes
    return list(all_nodes.difference(kept_nodes))
[docs]
def stepchildren(self: Self, graph: nx.DiGraph) -> list[NodeId]:
    """Return the nodes of the graph that have more than one parent."""
    multi_parent_nodes = []
    for node, n_parents in graph.in_degree():
        if n_parents > 1:
            multi_parent_nodes.append(node)
    return multi_parent_nodes
[docs]
def stepparents(self: Self, graph: nx.DiGraph) -> list[NodeId]:
    """Return every node that is a parent of at least one stepchild.

    A stepchild is a node with more than one parent (see :meth:`stepchildren`).
    The result is de-duplicated and has no particular order.
    """
    parent_groups = (
        graph.predecessors(stepchild) for stepchild in self.stepchildren(graph)
    )
    return list(set().union(*parent_groups))
[docs]
def _get_path_weight(self, path: list[NodeId], graph: nx.DiGraph) -> float:
"""Multiply all weights along a path together."""
leaf_weight = 1.0
for parent, child in zip(path, path[1:], strict=False):
leaf_weight *= graph.get_edge_data(parent, child)["weight"]
return leaf_weight
@cached_property
def root_calculations(self: Self) -> pd.DataFrame:
    """Produce a calculation components dataframe containing only roots and leaves.

    This dataframe has a format similar to exploded_calcs and can be used with the
    exploded data to verify that the root values can still be correctly calculated
    from the leaf values.

    The decorator is applied exactly once: stacking ``cached_property`` on itself
    wraps a non-callable descriptor and makes every access raise ``TypeError``.

    Returns:
        ``self.leafy_meta`` with each column suffix ``_root`` renamed to
        ``_parent``.
    """
    return self.leafy_meta.rename(columns=lambda x: re.sub("_root$", "_parent", x))
@cached_property
[docs]
def table_names(self: Self) -> list[str]:
    """List the distinct parent tables that take part in this explosion."""
    parent_tables = self.exploded_calcs["table_name_parent"]
    return list(parent_tables.unique())
[docs]
def plot_graph(self: Self, graph: nx.DiGraph) -> None:
    """Visualize a CalculationForest graph.

    Nodes are laid out left-to-right with graphviz ``dot`` and colored by the
    table they belong to; one legend entry is drawn per table.

    Args:
        graph: The graph to draw. Its nodes must expose a ``table_name`` attribute.

    Raises:
        ValueError: If there are more tables than available colors, because the
            table/color pairing is zipped with ``strict=True``.
    """
    colors = ["red", "yellow", "green", "blue", "orange", "cyan", "purple"]
    color_map = dict(
        zip(self.table_names, colors[: len(self.table_names)], strict=True)
    )
    pos = graphviz_layout(graph, prog="dot", args='-Grankdir="LR"')
    for table, color in color_map.items():
        nodes = [node for node in graph.nodes if node.table_name == table]
        # The first argument is the graph; the subset to draw goes in nodelist.
        nx.draw_networkx_nodes(
            graph, pos, nodelist=nodes, node_color=color, label=table
        )
    nx.draw_networkx_edges(graph, pos)
    # TODO: node labels are currently too unwieldy to draw. Once they are fixed,
    # nx.draw_networkx could draw nodes, edges and labels in one go.
    plt.legend(scatterpoints=1)
    plt.show()
[docs]
def plot_nodes(self: Self, nodes: list[NodeId]) -> None:
    """Plot a list of nodes based on edges found in exploded_calcs.

    Args:
        nodes: The nodes to keep; every other node of the full digraph is dropped
            from the plotted copy.
    """
    # Work on a copy: full_digraph is a cached property, so removing nodes from
    # it directly would corrupt it for every later use.
    graph_to_plot = deepcopy(self.full_digraph)
    nodes_to_remove = set(graph_to_plot.nodes()).difference(nodes)
    # NOTE: this doesn't and can't revise the graph to identify which nodes are
    # still connected to the root we care about after removing these nodes.
    # We might want to use a more intelligent method of building the graph.
    graph_to_plot.remove_nodes_from(nodes_to_remove)
    self.plot_graph(graph_to_plot)
[docs]
def plot(
    self: Self, graph: Literal["full_digraph", "seeded_digraph", "forest"]
) -> None:
    """Visualize various stages of the calculation forest.

    Args:
        graph: Name of the attribute on this object holding the graph to draw.
    """
    # Use the getattr builtin rather than calling the dunder method directly.
    self.plot_graph(getattr(self, graph))
[docs]
def leafy_data(
    self: Self, exploded_data: pd.DataFrame, value_col: str
) -> pd.DataFrame:
    """Reduce the exploded data to leaf facts and apply the leaf weights.

    The leafy metadata is inner-merged onto the exploded data on their shared
    columns, so only rows that are leaves in the forest *and* appear in the data
    survive. A node that is absent from the data has no non-node values such as
    ``utility_id_ferc1``, ``report_year`` or dollar amounts to report. The value
    column is then multiplied by the ``weight`` column from the metadata.

    TODO: This method could live here or in the Exploder class, which also has
    access to exploded_meta, exploded_data and the calculation forest. Validating
    that reported root values can be rebuilt from leaf values is not implemented.

    Args:
        exploded_data: The exploded data table to prune.
        value_col: Name of the data column to scale by the leaf weights.

    Returns:
        The merged, weight-adjusted table with a fresh index and converted dtypes.
    """
    merged = self.leafy_meta.merge(
        exploded_data,
        how="inner",
        validate="one_to_many",
    )
    # Scale the data column of interest by the weight of each leaf.
    merged[value_col] = merged[value_col].mul(merged["weight"])
    merged = merged.reset_index(drop=True)
    return merged.convert_dtypes()
@cached_property
[docs]
def forest_as_table(self: Self) -> pd.DataFrame:
    """Construct a tabular representation of the calculation forest.

    The root nodes of the annotated forest form the first set of columns
    (suffixed ``_layer0``). Each further generation of nodes is then added as
    another set of columns by :meth:`_add_layers_to_forest_as_table`.

    Returns:
        The layered table with converted dtypes.
    """
    logger.info("Recursively building a tabular version of the calculation forest.")
    # The roots of the forest are the nodes without any incoming edge.
    root_nodes = []
    for node, n_parents in self.annotated_forest.in_degree():
        if n_parents == 0:
            root_nodes.append(node)
    # They become the first layer of the table.
    root_layer = pd.DataFrame(root_nodes).rename(
        columns=lambda name: name + "_layer0"
    )
    table = self._add_layers_to_forest_as_table(df=root_layer)
    return table.convert_dtypes()
[docs]
def _add_layers_to_forest_as_table(self: Self, df: pd.DataFrame) -> pd.DataFrame:
    """Recursively add additional layers of nodes from the forest to the table.

    Given a dataframe with one or more set of columns with names corresponding to
    the components of a NodeId with suffixes of the form _layerN, identify the
    children of the nodes in the set of columns with the largest N, and merge them
    onto the table, recursively until there are no more children to add. Creating a
    tabular representation of the calculation forest that can be inspected in Excel.

    Include inter-layer calculation weights and tags associated with the nodes pre
    propagation.

    Args:
        df: The table built so far. Every column name must end in ``_layerN``,
            because the layer number is parsed out of each column name with
            ``int()``.

    Returns:
        The input table with one extra set of ``_layerN`` columns per generation
        of children found, or the input unchanged if the last layer has no
        children.
    """
    # Identify the last layer of nodes present in the input dataframe.
    # Each column name is reduced to its trailing layer number; the max is the
    # deepest layer added so far.
    current_layer = df.rename(
        columns=lambda x: int(re.sub(r"^.*_layer(\d+)$", r"\1", x))
    ).columns.max()
    logger.info(f"{current_layer=}")
    suffix = f"_layer{current_layer}"
    parent_cols = [col + suffix for col in self.calc_cols]
    # Identify the list of nodes that are part of that last layer:
    # Rows that are entirely null in the last layer belong to branches that
    # ended earlier, so they are dropped before rebuilding the node tuples.
    parent_nodes = list(
        df[parent_cols]
        .drop_duplicates()
        .dropna(how="all", axis="rows")
        .rename(columns=lambda x: x.removesuffix(suffix))
        .itertuples(name="NodeId", index=False)
    )
    # Identify the successors (children), if any, of each node in the last layer:
    successor_dfs = []
    for node in parent_nodes:
        # Children are looked up in self.forest, while their annotations are read
        # from self.annotated_forest below.
        successor_nodes = list(self.forest.successors(node))
        # If this particular node has no successors, skip to the next one.
        if not successor_nodes:
            continue
        # Convert the list of successor nodes into a dataframe with layer = n+1
        successor_df = nodes_to_df(
            calc_forest=self.annotated_forest, nodes=successor_nodes
        ).rename(columns=lambda x: x + f"_layer{current_layer + 1}")
        # Add a set of parent columns that all have the same values so we can merge
        # this onto the previous layer
        successor_df[parent_cols] = node
        successor_dfs.append(successor_df)
    # If any child nodes were found, merge them onto the input dataframe creating
    # a new layer , and recurse:
    # The outer merge keeps parents that have no children alongside those that do.
    if successor_dfs:
        new_df = df.merge(pd.concat(successor_dfs), on=parent_cols, how="outer")
        df = self._add_layers_to_forest_as_table(df=new_df)
    # If no child nodes were found return the dataframe terminating the recursion.
    return df
[docs]
def nodes_to_df(calc_forest: nx.DiGraph, nodes: list[NodeId]) -> pd.DataFrame:
    """Construct a dataframe from a list of nodes, including their annotations.

    NodeIds that are not present in the calculation forest will be ignored. Rows
    follow the node order of the calculation forest, not the order of ``nodes``.

    Args:
        calc_forest: A calculation forest made of nodes with "weight" and "tags" data.
        nodes: List of :class:`NodeId` values to extract from the calculation forest.

    Returns:
        A tabular dataframe representation of the nodes, including their tags, extracted
        from the calculation forest. All columns are cast to the ``string`` dtype. If
        reading or normalizing the tags raises ``AttributeError`` (for example when none
        of the selected nodes has a ``tags`` attribute), no tag columns are included.
    """
    # Hash-based membership keeps the filter linear in the size of the forest
    # instead of scanning the requested list once per forest node.
    wanted = set(nodes)
    node_dict = {
        k: v for k, v in dict(calc_forest.nodes(data=True)).items() if k in wanted
    }
    index = pd.DataFrame(node_dict.keys()).astype("string")
    data = pd.DataFrame(node_dict.values())
    try:
        tags = pd.json_normalize(data.tags).astype("string")
    except AttributeError:
        tags = pd.DataFrame()
    return pd.concat([index, tags], axis="columns")
[docs]
def _propagate_tag_rootward(
    annotated_forest: nx.DiGraph, tag_name: Literal["in_rate_base"]
) -> nx.DiGraph:
    """Tag untagged parents whose children all share the same non-null tag value.

    The forest is walked one topological generation at a time, starting with the
    last generation and moving toward the roots, so a tag assigned to a node is
    visible when its own parents are examined. Children whose ``xbrl_factoid``
    ends in ``_correction`` are ignored when comparing child tags. Tags already
    present on a parent are preserved.

    Args:
        annotated_forest: The forest to update. It is modified in place.
        tag_name: Name of the tag to propagate.

    Returns:
        The same ``annotated_forest`` object, with the propagated tags set.
    """

    def _get_tag(annotated_forest, node, tag_name):
        return annotated_forest.nodes.get(node, {}).get("tags", {}).get(tag_name)

    generations = list(nx.topological_generations(annotated_forest))
    for gen in reversed(generations):
        untagged_nodes = {
            node_id
            for node_id in gen
            if _get_tag(annotated_forest, node_id, tag_name) is None
        }
        for parent_node in untagged_nodes:
            child_tags = {
                _get_tag(annotated_forest, c, tag_name)
                for c in annotated_forest.successors(parent_node)
                if not c.xbrl_factoid.endswith("_correction")
            }
            non_null_tags = child_tags - {None}
            # sometimes, all children can share same tag but it's null.
            if len(child_tags) == 1 and non_null_tags:
                # actually assign the tag here but don't wipe out any other tags
                new_node_tag = non_null_tags.pop()
                # Read this node's tags directly rather than collecting the tags
                # of every node in the graph for each assignment.
                existing_tags = annotated_forest.nodes[parent_node].get("tags", {})
                node_tags = {
                    parent_node: {"tags": {tag_name: new_node_tag} | existing_tags}
                }
                nx.set_node_attributes(annotated_forest, node_tags)
    return annotated_forest
[docs]
def check_for_correction_xbrl_factoids_with_tag(
    df: pd.DataFrame, propagated_tag: Literal["in_rate_base"]
):
    """Verify that at least one correction record carries the propagated tag.

    Args:
        df: table to check. This should be either the
            :func:`out_ferc1__yearly_rate_base`, ``exploded_balance_sheet_assets_ferc1``
            or ``exploded_balance_sheet_liabilities_ferc1``. The
            ``exploded_income_statement_ferc1`` table does not currently have
            propagated tags.
        propagated_tag: name of tag. Currently ``in_rate_base`` is the only
            propagated tag. The column inspected is ``tags_<propagated_tag>``.

    Raises:
        AssertionError: If there are zero correction ``xbrl_factoids`` in ``df``
            with tags.
    """
    has_tag = df[f"tags_{propagated_tag}"].notnull()
    is_correction = df.xbrl_factoid.str.endswith("_correction")
    detailed_tagged_corrections = df.loc[has_tag & is_correction, "xbrl_factoid"].unique()
    if len(detailed_tagged_corrections) > 0:
        return
    raise AssertionError(
        "We expect there to be more than zero correction recrods with tags, but "
        f"found {len(detailed_tagged_corrections)}."
    )
@asset
[docs]
def out_ferc1__yearly_rate_base(
    _out_ferc1__detailed_balance_sheet_assets: pd.DataFrame,
    _out_ferc1__detailed_balance_sheet_liabilities: pd.DataFrame,
    core_ferc1__yearly_operating_expenses_sched320: pd.DataFrame,
    _out_ferc1__detailed_tags: pd.DataFrame,
) -> pd.DataFrame:
    """Make a table of granular utility rate-base data.

    This table contains granular data consisting of what utilities can include in
    their rate bases. The information comes from two core inputs:
    ``exploded_balance_sheet_assets_ferc1`` and
    ``exploded_balance_sheet_liabilities_ferc1``. These tables include granular
    data from the nested calculations that are built into the accounting tables.
    See :class:`Exploder` for more details. Liability balances are negated before
    being combined with the assets.

    The table also contains one addition derived from
    :ref:`core_ferc1__yearly_operating_expenses_sched320`. In standard ratemaking
    processes, utilities are enabled to include working capital - sometimes
    referred to as cash on hand or cash reserves. A standard approach is to
    consider the rate-baseable working capital to be one eighth of the average
    operations and maintenance expense. That expense is selected here, divided by
    eight, tagged, and concatenated with the in-rate-base assets and liabilities.

    Returns:
        The combined rate-base records, sorted in descending order by report year,
        utility ID and table name.
    """
    opex_table = "core_ferc1__yearly_operating_expenses_sched320"
    # Name of the column that holds the factoid in the operating expense table.
    factoid_col = pudl.transform.ferc1.FERC1_TFR_CLASSES[
        opex_table
    ]().params.xbrl_factoid_name

    # Cash working capital: one eighth of electric O&M expenses, reshaped so it
    # can be stacked with the balance sheet records.
    opex = core_ferc1__yearly_operating_expenses_sched320
    electric_om = opex[
        opex[factoid_col] == "operations_and_maintenance_expenses_electric"
    ]
    cash_working_capital = (
        electric_om.assign(
            dollar_value=lambda x: x.dollar_value.divide(8),
            # newly defined factoid (do we need to add it anywhere?)
            xbrl_factoid="cash_working_capital",
            tags_rate_base_category="net_working_capital",
            tags_aggregatable_utility_type="electric",
            table_name=opex_table,
        )
        .drop(columns=[factoid_col])
        # the assets/liabilities both use ending_balance for their main $$ column
        .rename(columns={"dollar_value": "ending_balance"})
    )

    # Keep only the leafy exploded records that are (at least partly) in rate base.
    rate_base_flags = ["yes", "partial"]
    assets = _out_ferc1__detailed_balance_sheet_assets
    liabilities = _out_ferc1__detailed_balance_sheet_liabilities
    rate_base_assets = assets[assets.tags_in_rate_base.isin(rate_base_flags)]
    rate_base_liabilities = liabilities[
        liabilities.tags_in_rate_base.isin(rate_base_flags)
    ].assign(ending_balance=lambda x: -x.ending_balance)

    in_rate_base = pd.concat(
        [rate_base_assets, rate_base_liabilities, cash_working_capital]
    ).sort_values(by=["report_year", "utility_id_ferc1", "table_name"], ascending=False)

    # note: we need the `tags_in_rate_base` column for these checks
    check_tag_propagation_compared_to_compiled_tags(
        in_rate_base, "in_rate_base", _out_ferc1__detailed_tags
    )
    check_for_correction_xbrl_factoids_with_tag(in_rate_base, "in_rate_base")
    return in_rate_base