"""Routines used for extracting the raw FERC 714 data."""
import json
from collections import OrderedDict
from itertools import chain
from pathlib import Path
from typing import Any
import pandas as pd
from dagster import AssetKey, AssetsDefinition, SourceAsset, asset
import pudl
from pudl.workspace.setup import PudlPaths
[docs]
# Module-level logger obtained from PUDL's logging helpers, keyed on this module's name.
logger = pudl.logging_helpers.get_logger(__name__)
[docs]
FERC714_CSV_ENCODING: OrderedDict[str, dict[str, str]] = OrderedDict(
    (pudl_table, {"name": csv_filename, "encoding": charset})
    # Each row: (PUDL table key, CSV filename inside the archive, text encoding).
    for pudl_table, csv_filename, charset in (
        (
            "yearly_id_certification",
            "Part 1 Schedule 1 - Identification Certification.csv",
            "iso-8859-1",
        ),
        (
            "yearly_balancing_authority_plants",
            "Part 2 Schedule 1 - Balancing Authority Generating Plants.csv",
            "iso-8859-1",
        ),
        (
            "monthly_balancing_authority_demand",
            "Part 2 Schedule 2 - Balancing Authority Monthly Demand.csv",
            "utf-8",
        ),
        (
            "yearly_balancing_authority_net_energy_load",
            "Part 2 Schedule 3 - Balancing Authority Net Energy for Load.csv",
            "utf-8",
        ),
        (
            "yearly_balancing_authority_adjacency",
            "Part 2 Schedule 4 - Adjacent Balancing Authorities.csv",
            "iso-8859-1",
        ),
        (
            "yearly_balancing_authority_interchange",
            "Part 2 Schedule 5 - Balancing Authority Interchange.csv",
            "iso-8859-1",
        ),
        (
            "hourly_balancing_authority_lambda",
            "Part 2 Schedule 6 - Balancing Authority Hourly System Lambda.csv",
            "utf-8",
        ),
        (
            "yearly_lambda_description",
            "Part 2 Schedule 6 - System Lambda Description.csv",
            "iso-8859-1",
        ),
        (
            "yearly_planning_area_description",
            "Part 3 Schedule 1 - Planning Area Description.csv",
            "iso-8859-1",
        ),
        (
            "yearly_planning_area_demand_forecast",
            "Part 3 Schedule 2 - Planning Area Forecast Demand.csv",
            "utf-8",
        ),
        (
            "hourly_planning_area_demand",
            "Part 3 Schedule 2 - Planning Area Hourly Demand.csv",
            "utf-8",
        ),
        (
            "respondent_id",
            "Respondent IDs.csv",
            "utf-8",
        ),
    )
)
"""Ordered mapping from PUDL table key to the FERC-714 CSV filename and its character encoding."""
[docs]
TABLE_NAME_MAP_FERC714: OrderedDict[str, dict[str, str]] = OrderedDict(
    (pudl_table, {"csv": csv_source, "xbrl": xbrl_source})
    # Each row: (PUDL DB table name, CSV source filename, XBRL source table name).
    for pudl_table, csv_source, xbrl_source in (
        (
            "core_ferc714__yearly_planning_area_demand_forecast",
            "Part 3 Schedule 2 - Planning Area Forecast Demand.csv",
            "planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_table_03_2",
        ),
        (
            "out_ferc714__hourly_planning_area_demand",
            "Part 3 Schedule 2 - Planning Area Hourly Demand.csv",
            "planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_03_2",
        ),
        (
            "core_ferc714__respondent_id",
            "Respondent IDs.csv",
            "identification_and_certification_01_1",
        ),
    )
)
"""Ordered mapping from PUDL DB table name to its CSV and XBRL source table names."""
[docs]
def raw_ferc714_csv_asset_factory(table_name: str) -> AssetsDefinition:
    """Generate an asset for building one raw CSV-based FERC 714 dataframe.

    Args:
        table_name: Key of ``FERC714_CSV_ENCODING`` selecting which CSV file to
            read and which character encoding to read it with.

    Returns:
        A dagster asset named ``raw_ferc714_csv__{table_name}`` whose compute
        function returns the CSV contents as a pandas DataFrame.

    Raises:
        AssertionError: If ``table_name`` is not a key of ``FERC714_CSV_ENCODING``.
    """
    assert table_name in FERC714_CSV_ENCODING, (
        f"Unknown FERC 714 CSV table: {table_name!r}"
    )

    @asset(
        name=f"raw_ferc714_csv__{table_name}",
        required_resource_keys={"datastore", "dataset_settings"},
        compute_kind="pandas",
    )
    def _extract_raw_ferc714_csv(context):
        """Extract the raw FERC Form 714 dataframes from their original CSV files.

        Args:
            context: dagster keyword that provides access to resources and config.

        Returns:
            The requested CSV table as a pandas DataFrame. Every table except
            ``respondent_id`` is restricted to the configured CSV report years.
        """
        ds = context.resources.datastore
        csv_spec = FERC714_CSV_ENCODING[table_name]
        # Read the year selection once so the log message and the row filter
        # below are guaranteed to agree.
        csv_years = context.resources.dataset_settings.ferc714.csv_years
        years = ", ".join(map(str, csv_years))
        logger.info(
            f"Extracting {table_name} from CSV into pandas DataFrame (years: {years})."
        )
        with (
            ds.get_zipfile_resource("ferc714", name="ferc714.zip") as zf,
            zf.open(csv_spec["name"]) as csv_file,
        ):
            df = pd.read_csv(csv_file, encoding=csv_spec["encoding"])
        # respondent_id is deliberately not filtered by year
        # (presumably it has no report_yr column -- TODO confirm).
        if table_name != "respondent_id":
            df = df.query("report_yr in @csv_years")
        return df

    return _extract_raw_ferc714_csv
def create_raw_ferc714_xbrl_assets() -> list[SourceAsset]:
    """Create SourceAssets for raw FERC 714 XBRL tables.

    SourceAssets allow you to access assets that are generated elsewhere.
    In our case, the XBRL database contains the raw FERC 714 assets from
    2021 onward. Prior to that, the assets are distributed as CSVs and
    are extracted with the ``raw_ferc714_csv_asset_factory`` function.

    This is a plain helper function and must not itself be decorated as a
    dagster asset: it is called directly at module import time to build the
    list of SourceAssets.

    Returns:
        A list of FERC 714 SourceAssets, one ``_instant`` and one ``_duration``
        asset per distinct XBRL table named in ``TABLE_NAME_MAP_FERC714``,
        ordered by table name.
    """
    xbrls = (v["xbrl"] for v in TABLE_NAME_MAP_FERC714.values())
    # An "xbrl" entry may be a single table name or a list of table names.
    flattened_xbrls = chain.from_iterable(
        x if isinstance(x, list) else [x] for x in xbrls
    )
    # Create names for the duration and instant tables. The set removes
    # duplicates; sorting makes the resulting order reproducible across runs.
    xbrl_table_names = sorted(
        {
            f"{tn}_{period}"
            for tn in flattened_xbrls
            for period in ("instant", "duration")
        }
    )
    return [
        SourceAsset(
            key=AssetKey(f"raw_ferc714_xbrl__{table_name}"),
            io_manager_key="ferc714_xbrl_sqlite_io_manager",
        )
        for table_name in xbrl_table_names
    ]
[docs]
# One raw CSV extraction asset per table key in FERC714_CSV_ENCODING, in that
# mapping's key order.
raw_ferc714_csv_assets = list(
    map(raw_ferc714_csv_asset_factory, FERC714_CSV_ENCODING)
)
[docs]
# Build the raw FERC 714 XBRL asset list once, when this module is imported.
raw_ferc714_xbrl_assets = create_raw_ferc714_xbrl_assets()