Source code for pudl.transform.ferc714

"""Transformation of the FERC Form 714 data.

FERC Form 714 has two separate raw data sources - CSV and XBRL. For both sources
there is usually some specific processing that needs to happen before the two
data sources get concatenated together to create the full timeseries. We are
currently processing three tables from 714. Each one is processed using a similar
pattern: we've defined a class with a run classmethod as a coordinating method,
any table-specific transforms are defined as staticmethod's within the table
class and any generic 714 transforms are defined as internal module functions.
The table assets are created through a small function that calls the run method.
Any of the methods or functions that only apply to either of the raw data sources
should include a raw datasource suffix.
"""

import importlib
import re
from dataclasses import dataclass
from typing import Literal

import numpy as np
import pandas as pd
from dagster import AssetCheckResult, AssetChecksDefinition, AssetIn, asset, asset_check

import pudl.logging_helpers
from pudl.extract.ferc714 import TABLE_NAME_MAP_FERC714
from pudl.settings import Ferc714Settings
from pudl.transform.classes import (
    RenameColumns,
    rename_columns,
)
from pudl.transform.ferc import filter_for_freshest_data_xbrl, get_primary_key_raw_xbrl

[docs] logger = pudl.logging_helpers.get_logger(__name__)
##############################################################################
# Constants required for transforming FERC 714
##############################################################################

# More detailed fixes on a per respondent basis
[docs] TIMEZONE_OFFSET_CODE_FIXES = {
    2: {"CPT": "CST"},
    10: {"CPT": "EST"},
    15: {"MS": "MST"},
    17: {"CS": "CST", "CD": "CDT"},
    19: {"CTR": "CST", "CSR": "CST", "CPT": "CST", "DST": "CST", np.nan: "CST"},
    27: {
        "AKS": "AKST",
        "AST": "AKST",
        "AKD": "AKDT",
        "ADT": "AKDT",
        "AKT": "AKST",
        "1": "AKST",  # they swap from 1 - 2 in 2023
        "2": "AKDT",
    },
    28: {np.nan: "EST"},
    31: {np.nan: "CST"},
    34: {"1": "EST", "2": "EDT", np.nan: "EST", "UTC": "EST"},  # city of Tallahassee
    35: {np.nan: "CST"},
    37: {"MS": "MST"},
    40: {"DST": "EST"},
    42: {np.nan: "CST"},
    45: {"DST": "CDT", np.nan: "CST"},
    47: {np.nan: "MST"},
    48: {np.nan: "MST"},
    50: {np.nan: "CST"},
    51: {"DST": "EDT", "EPT": "EST"},
    54: {"CPT": "CST"},
    56: {"CPT": "CST"},
    57: {np.nan: "CST"},
    58: {"CS": "CST"},  # Uniform across the year.
    65: {"CPT": "CST", np.nan: "CST"},
    66: {
        "CS": "CDT",  # Only shows up in summer! Seems backwards.
        "CD": "CST",  # Only shows up in winter! Seems backwards.
        "433": "CDT",
    },
    68: {"E": "EST", np.nan: "EST"},
    73: {"PPT": "PDT"},  # Imperial Irrigation District P looks like D
    77: {"EAS": "EST"},
    80: {"CPT": "CST"},
    81: {"CPT": "CST"},
    83: {"CS": "CST", "CD": "CDT"},
    84: {"PPT": "PST"},  # LADWP, constant across all years.
    85: {"CPT": "CST"},
    97: {np.nan: "CST"},
    100: {"206": "EST", "DST": "EDT", np.nan: "EST"},
    102: {"CDS": "CDT", "CDST": "CDT"},
    105: {np.nan: "CDT"},
    106: {"MPP": "MST", "MPT": "MST"},
    113: {"DST": "EST"},
    114: {"EDS": "EDT", "DST": "EDT", "EPT": "EST"},
    115: {"DST": "CDT"},
    119: {"EPT": "EST"},
    122: {"DST": "EDT", "EPT": "EST"},
    123: {"1": "EST", "2": "EDT", "DST": "EDT"},
    128: {"PPT": "PST"},  # Constant across the year. Never another timezone seen.
    132: {"DST": "PST", np.nan: "PST"},
    134: {"CDS": "CDT"},
    137: {"DST": "EDT"},
    142: {"CPT": "CST"},
    143: {"DST": "CDT"},
    146: {"CPT": "CST"},
    148: {"DST": "CDT"},
    153: {"CDS": "CDT"},
    159: {"EDS": "EDT"},
    163: {"CPT": "CST"},
    164: {"CPT": "CST", np.nan: "CST"},
    168: {"CEN": "CST"},
    175: {np.nan: "EST"},
    180: {np.nan: "MST"},
    181: {np.nan: "EST"},
    183: {"CPT": "CST"},
    184: {"CPT": "CST"},
    185: {"CPT": "CST"},
    186: {"CPT": "CST"},
    194: {"PPT": "PST"},  # Pacificorp, constant across the whole year.
    195: {"DST": "EDT", "EDS": "EDT", "EPT": "EST"},
    210: {"EPT": "EST"},
    217: {"CPT": "CST"},
    214: {"EPT": "EST"},
    215: {"EDT/EST": "EST", "EST/EDT": "EST"},  # this is duke.
    211: {  # more recent years have CST & CDT. CDST correspond to DST months
        "CDST": "CDT"
    },
    20: {"3": "MST"},  # black hills (CO). in year after this 3 its all MST
    95: {np.nan: "PST"},  # just empty in 2021, other years is PST
    29: {np.nan: "PST"},  # just empty in 2022, other years is PST
    101: {np.nan: "EST"},  # this was just one lil empty guy
}
[docs] TIMEZONE_OFFSET_CODE_FIXES_BY_YEAR = [
    {"respondent_id_ferc714": 33, "report_year": 2006, "utc_offset_code": "PST"},
    {"respondent_id_ferc714": 124, "report_year": 2015, "utc_offset_code": "MST"},
    {"respondent_id_ferc714": 176, "report_year": 2011, "utc_offset_code": "CST"},
    {"respondent_id_ferc714": 179, "report_year": 2011, "utc_offset_code": "CST"},
]
[docs] BAD_RESPONDENTS = [
    2,
    319,
    99991,
    99992,
    99993,
    99994,
    99995,
]
"""Fake respondent IDs for database test entities."""
[docs] TIMEZONE_OFFSET_CODES = {
    "EST": pd.Timedelta(-5, unit="hours"),  # Eastern Standard
    "EDT": pd.Timedelta(-5, unit="hours"),  # Eastern Daylight
    "CST": pd.Timedelta(-6, unit="hours"),  # Central Standard
    "CDT": pd.Timedelta(-6, unit="hours"),  # Central Daylight
    "MST": pd.Timedelta(-7, unit="hours"),  # Mountain Standard
    "MDT": pd.Timedelta(-7, unit="hours"),  # Mountain Daylight
    "PST": pd.Timedelta(-8, unit="hours"),  # Pacific Standard
    "PDT": pd.Timedelta(-8, unit="hours"),  # Pacific Daylight
    "AKST": pd.Timedelta(-9, unit="hours"),  # Alaska Standard
    "AKDT": pd.Timedelta(-9, unit="hours"),  # Alaska Daylight
    "HST": pd.Timedelta(-10, unit="hours"),  # Hawaii Standard
}
"""A mapping of timezone offset codes to Timedelta offsets from UTC.

Note that the FERC 714 instructions state that all hourly demand is to be reported
in STANDARD time for whatever timezone is being used. Even though many respondents
use daylight savings / standard time abbreviations, a large majority do appear to
conform to using a single UTC offset throughout the year. There are 6 instances in
which the timezone associated with reporting changed.
"""
[docs] TIMEZONE_CODES = {
    "EST": "America/New_York",
    "EDT": "America/New_York",
    "CST": "America/Chicago",
    "CDT": "America/Chicago",
    "MST": "America/Denver",
    "MDT": "America/Denver",
    "PST": "America/Los_Angeles",
    "PDT": "America/Los_Angeles",
    "AKST": "America/Anchorage",
    "AKDT": "America/Anchorage",
    "HST": "Pacific/Honolulu",
}
"""Mapping between standardized time offset codes and canonical timezones."""
[docs] EIA_CODE_FIXES: dict[Literal["combined", "csv", "xbrl"], dict[int | str, int]] = {
    "combined": {  # FERC 714 Respondent ID: EIA BA or Utility ID
        125: 2775,  # EIA BA CAISO (fixing bad EIA Code of 229)
        47: 56812,  # Duke Energy Control Area Services, LLC (Arlington Valley WECC AZ)
        146: 59504,  # Southwest Power Pool (Fixing bad EIA Coding)
        180: 32790,  # New Harquahala.
        # PacifiCorp Utility ID is 14354. It ALSO has 2 BA IDs: (14378, 14379)
        # See https://github.com/catalyst-cooperative/pudl/issues/616
        194: 14379,  # Using this ID for now only b/c it's in the HIFLD geometry
        206: 58791,  # NaturEner Wind Watch LLC (Fixes bad ID 57995)
        201: 56090,  # Griffith Energy (bad id was 55124)
        205: 58790,  # Gridforce Energy Management (missing or 11378 in xbrl)
        213: 64898,  # GridLiance (missing)
    },
    "xbrl": {  # FERC 714 Respondent ID XBRL: EIA BA or Utility ID
        "C011373": 14610,  # Florida Municipal Power Pool (lines up with CSV code & is FL util)
        "C011421": 9617,  # JEA - lines up w/ CSV code and is EIA util
        "C002732": 56365,  # NaturEner Power Watch LLC: Fixes bad ID "57049, 57050"
        "C002447": 7004,  # Buckeye Power: was null or the entity_id
        "C001526": 14369,  # Avangrid Renewables: was null or the entity_id
        "C001132": 15248,  # PGE. Bad id was 43. New one lines up w/ CSV and is EIA util
    },
    "csv": {  # FERC 714 Respondent ID CSV: EIA BA or Utility ID
        134: 5416,  # Duke Energy Corp. (bad id was non-existent 3260)
        203: 12341,  # MidAmerican Energy Co. (fixes typo, from 12431)
        292: 20382,  # City of West Memphis -- (fixes a typo, from 20383)
        295: 40229,  # Old Dominion Electric Cooperative (missing)
        301: 14725,  # PJM Interconnection Eastern Hub (missing)
        302: 14725,  # PJM Interconnection Western Hub (missing)
        303: 14725,  # PJM Interconnection Illinois Hub (missing)
        304: 14725,  # PJM Interconnection Northern Illinois Hub (missing)
        305: 14725,  # PJM Interconnection Dominion Hub (missing)
        306: 14725,  # PJM Interconnection AEP-Dayton Hub (missing)
        309: 12427,  # Michigan Power Pool / Power Coordination Center (missing)
        312: 59435,  # NaturEner Glacier Wind (missing)
        329: 39347,  # East Texas Electricity Cooperative (missing)
    },
}
"""Overrides of FERC 714 respondent IDs with wrong or missing EIA Codes.

This is used in :meth:`RespondentId.spot_fix_eia_codes`. The dictionary is organized
by "source" keys ("combined", "csv", or "xbrl"). Each source's value is a secondary
dictionary which contains source respondent IDs as keys and fixes for EIA codes as
values.

We separated these fixes by whether they come directly from the CSV data, the XBRL
data, or the combined data, and we use the corresponding source or PUDL-derived
respondent ID to identify the EIA code to overwrite. We could have combined all of
these fixes into one set identified by the PUDL-derived ``respondent_id_ferc714``,
but this way we can do more targeted source-based cleaning and test each source's
EIA codes before the sources are concatenated together.
"""
[docs] RENAME_COLS = {
    "core_ferc714__respondent_id": {
        "csv": {
            "respondent_id": "respondent_id_ferc714_csv",
            "respondent_name": "respondent_name_ferc714",
            "eia_code": "eia_code",
        },
        "xbrl": {
            "entity_id": "respondent_id_ferc714_xbrl",
            "respondent_legal_name": "respondent_name_ferc714",
            "respondent_identification_code": "eia_code",
        },
    },
    "out_ferc714__hourly_planning_area_demand": {
        "csv": {
            "report_yr": "report_year",
            "plan_date": "report_date",
            "respondent_id": "respondent_id_ferc714_csv",
            "timezone": "utc_offset_code",
        },
        "xbrl": {
            "entity_id": "respondent_id_ferc714_xbrl",
            "date": "report_date",
            "report_year": "report_year",
            "time_zone": "utc_offset_code",
            "planning_area_hourly_demand_megawatts": "demand_mwh",
        },
    },
    "core_ferc714__yearly_planning_area_demand_forecast": {
        "csv": {
            "respondent_id": "respondent_id_ferc714_csv",
            "report_yr": "report_year",
            "plan_year": "forecast_year",
            "summer_forecast": "summer_peak_demand_forecast_mw",
            "winter_forecast": "winter_peak_demand_forecast_mw",
            "net_energy_forecast": "net_demand_forecast_mwh",
        },
        "xbrl": {
            "entity_id": "respondent_id_ferc714_xbrl",
            "start_date": "start_date",
            "end_date": "end_date",
            "report_year": "report_year",
            "planning_area_hourly_demand_and_forecast_year": "forecast_year",
            "planning_area_hourly_demand_and_forecast_summer_forecast": "summer_peak_demand_forecast_mw",
            "planning_area_hourly_demand_and_forecast_winter_forecast": "winter_peak_demand_forecast_mw",
            "planning_area_hourly_demand_and_forecast_forecast_of_annual_net_energy_for_load": "net_demand_forecast_mwh",
        },
    },
}
##############################################################################
# Internal helper functions.
##############################################################################
[docs] def _pre_process_csv(df: pd.DataFrame, table_name: str) -> pd.DataFrame:
    """A simple transform function for processing the CSV raw data.

    * Removes footnote columns (those ending with ``_f``)
    * Drops the ``report_prd``, ``spplmnt_num``, and ``row_num`` columns
    * Excludes records which pertain to bad (test) respondents.
    """
    logger.info("Removing unneeded columns and dropping bad respondents.")
    out_df = (
        rename_columns(
            df=df, params=RenameColumns(columns=RENAME_COLS[table_name]["csv"])
        )
        .filter(regex=r"^(?!.*_f$).*")
        .drop(
            ["report_prd", "spplmnt_num", "row_num"], axis="columns", errors="ignore"
        )
    )
    # Exclude fake Test IDs -- not real respondents
    out_df = out_df[~out_df.respondent_id_ferc714_csv.isin(BAD_RESPONDENTS)]
    return out_df
[docs] def _assign_respondent_id_ferc714( df: pd.DataFrame, source: Literal["csv", "xbrl"] ) -> pd.DataFrame: """Assign the PUDL-assigned respondent_id_ferc714 based on the native respondent ID. We need to replace the natively reported respondent ID from each of the two FERC714 sources with a PUDL-assigned respondent ID. The mapping between the native ID's and these PUDL-assigned ID's can be accessed in the database tables ``respondents_csv_ferc714`` and ``respondents_xbrl_ferc714``. Args: df: the input table with the native respondent ID column. source: the lower-case string name of the source of the FERC714 data. Either csv or xbrl. Returns: an augmented version of the input ``df`` with a new column that replaces the natively reported respondent ID with the PUDL-assigned respondent ID. """ respondent_map_ferc714 = pd.read_csv( importlib.resources.files("pudl.package_data.glue") / "respondent_id_ferc714.csv" ).convert_dtypes() # use the source utility ID column to get a unique map and for merging resp_id_col = f"respondent_id_ferc714_{source}" resp_map_series = ( respondent_map_ferc714.dropna(subset=[resp_id_col]) .set_index(resp_id_col) .respondent_id_ferc714 ) df["respondent_id_ferc714"] = df[resp_id_col].map(resp_map_series) return df
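# Illustrative sketch (not part of this module): the ID replacement performed by
# _assign_respondent_id_ferc714, using a hypothetical two-row crosswalk in place of
# the real respondent_id_ferc714.csv glue file. All values below are made up.
import pandas as pd

_crosswalk = pd.DataFrame(
    {
        "respondent_id_ferc714": [1, 2],
        "respondent_id_ferc714_csv": [101, 102],
        "respondent_id_ferc714_xbrl": ["C000101", pd.NA],
    }
)
_data = pd.DataFrame({"respondent_id_ferc714_csv": [101, 102, 101]})
_resp_map = (
    _crosswalk.dropna(subset=["respondent_id_ferc714_csv"])
    .set_index("respondent_id_ferc714_csv")
    .respondent_id_ferc714
)
_data["respondent_id_ferc714"] = _data["respondent_id_ferc714_csv"].map(_resp_map)
# _data now carries the PUDL-assigned IDs [1, 2, 1] alongside the native CSV IDs.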
[docs] def _filter_for_freshest_data_xbrl( raw_xbrl: pd.DataFrame, table_name: str, instant_or_duration: Literal["instant", "duration"], ): """Wrapper around filter_for_freshest_data_xbrl. Most of the specific stuff here is in just converting the core table name into the raw instant or duration XBRL table name. """ table_name_raw_xbrl = ( f"{TABLE_NAME_MAP_FERC714[table_name]["xbrl"]}_{instant_or_duration}" ) xbrl = filter_for_freshest_data_xbrl( raw_xbrl, get_primary_key_raw_xbrl(table_name_raw_xbrl, "ferc714"), ) return xbrl
[docs] def _fillna_respondent_id_ferc714_source(
    df: pd.DataFrame, source: Literal["csv", "xbrl"]
) -> pd.DataFrame:
    """Fill in missing CSV or XBRL respondent IDs.

    The source (CSV or XBRL) tables get assigned a PUDL-derived
    ``respondent_id_ferc714`` ID column (via :func:`_assign_respondent_id_ferc714`).
    After we concatenate the source tables, we sometimes backfill and forward-fill
    the source IDs (``respondent_id_ferc714_csv`` and ``respondent_id_ferc714_xbrl``).
    This way the older records from the CSV years will also have the XBRL IDs and
    vice versa. This enables users to find the full timeseries of a respondent given
    either source ID (instead of having to use the source ID to find the
    PUDL-derived ID and then find the records).
    """
    respondent_map_ferc714 = pd.read_csv(
        importlib.resources.files("pudl.package_data.glue")
        / "respondent_id_ferc714.csv"
    ).convert_dtypes()
    # use the source utility ID column to get a unique map and for merging
    resp_id_col = f"respondent_id_ferc714_{source}"
    resp_map_series = respondent_map_ferc714.dropna(subset=[resp_id_col]).set_index(
        "respondent_id_ferc714"
    )[resp_id_col]

    df[resp_id_col] = df[resp_id_col].fillna(
        df["respondent_id_ferc714"].map(resp_map_series)
    )
    return df
[docs] def assign_report_day(df: pd.DataFrame, date_col: str) -> pd.DataFrame: """Add a report_day column.""" return df.assign( report_day=pd.to_datetime(df[date_col], format="%Y-%m-%d", exact=False) )
[docs] class RespondentId:
    """Class for building the :ref:`core_ferc714__respondent_id` asset.

    Most of the methods in this class are staticmethods. The purpose of using a
    class in this instance is mostly for organizing the table-specific transforms
    under the same namespace.
    """

    @classmethod
[docs] def run( cls, raw_csv: pd.DataFrame, raw_xbrl_duration: pd.DataFrame ) -> pd.DataFrame: """Build the table for the :ref:`core_ferc714__respondent_id` asset. Process and combine the CSV and XBRL based data. There are two main threads of transforms happening here: * Table compatibility: The CSV raw table is static (does not even report years) while the xbrl table is reported annually. A lot of the downstream analysis expects this table to be static. So the first step was to check whether or not the columns that we have in the CSV years had consistent data over the few XBRL years that we have. There are a small number of eia_code's we needed to clean up, but besides that it was static. We then convert the XBRL data into a static table, then we concat-ed the tables and checked the static-ness again via :meth:`ensure_eia_code_uniqueness`. * eia_code cleaning: Clean up FERC-714 respondent names and manually assign EIA utility IDs to a few FERC Form 714 respondents that report planning area demand, but which don't have their corresponding EIA utility IDs provided by FERC for some reason (including PacifiCorp). Done all via :meth:`spot_fix_eia_codes` & EIA_CODE_FIXES. """ table_name = "core_ferc714__respondent_id" # CSV STUFF csv = ( _pre_process_csv(raw_csv, table_name) .pipe(_assign_respondent_id_ferc714, source="csv") .astype({"eia_code": pd.Int64Dtype()}) .pipe(cls.spot_fix_eia_codes, "csv") .pipe(cls.ensure_eia_code_uniqueness, "csv") .assign(source="csv") ) # XBRL STUFF xbrl = ( _filter_for_freshest_data_xbrl(raw_xbrl_duration, table_name, "duration") .pipe( rename_columns, params=RenameColumns(columns=RENAME_COLS[table_name]["xbrl"]), ) .pipe(_assign_respondent_id_ferc714, source="xbrl") .pipe(cls.clean_eia_codes_xbrl) .astype({"eia_code": pd.Int64Dtype()}) .pipe(cls.spot_fix_eia_codes, "xbrl") .pipe(cls.ensure_eia_code_uniqueness, "xbrl") .pipe(cls.convert_into_static_table_xbrl) .assign(source="xbrl") ) # CONCATED STUFF df = ( pd.concat([csv, xbrl]) .reset_index(drop=True) .convert_dtypes() .pipe(cls.spot_fix_eia_codes, "combined") .pipe(cls.ensure_eia_code_uniqueness, "combined") .pipe(cls.condense_into_one_source_table) .pipe(_fillna_respondent_id_ferc714_source, "csv") # the xbrl version of this is fillna is not *strictly necessary* # bc we are sorting the records grab the xbrl record if there is one # for each respondent during condense_into_one_source_table. .pipe(_fillna_respondent_id_ferc714_source, "xbrl") ) return df
@staticmethod
[docs] def spot_fix_eia_codes( df: pd.DataFrame, source: Literal["csv", "xbrl", "combined"] ) -> pd.DataFrame: """Spot fix the eia_codes. Using the manually compiled fixes to the ``eia_code`` column stored in :py:const:`EIA_CODE_FIXES`, replace the reported values by respondent. """ df.loc[df.eia_code == 0, "eia_code"] = pd.NA suffix = "" if source == "combined" else f"_{source}" # There are a few utilities that seem mappable, but missing: for rid, new_code in EIA_CODE_FIXES[source].items(): df.loc[df[f"respondent_id_ferc714{suffix}"] == rid, "eia_code"] = new_code return df
@staticmethod
[docs] def ensure_eia_code_uniqueness( df: pd.DataFrame, source: Literal["csv", "xbrl", "combined"] ) -> pd.DataFrame: """Ensure there is only one unique eia_code for each respondent.""" df["eia_code_count"] = ( df.dropna(subset=["eia_code"]) .groupby(["respondent_id_ferc714"])[["eia_code"]] .transform("nunique") ) if not ( multiple_eia_codes := df[(df.eia_code_count != 1) & (df.eia_code.notnull())] ).empty: raise AssertionError( "We expected 0 respondents with multiple different eia_code's " f"reported for each respondent in {source} data, " f"but we found {len(multiple_eia_codes)}" ) return df.drop(columns=["eia_code_count"])
@staticmethod
[docs] def clean_eia_codes_xbrl(xbrl: pd.DataFrame) -> pd.DataFrame:
    """Make the eia_codes coming from the XBRL data cleaner.

    Desired outcomes here include all respondents having only one non-null
    eia_code and all eia_codes that are actually the respondent_id_ferc714_xbrl
    being nulled.
    """
    # we expect all of these submissions to be from the last Q
    assert all(xbrl.report_period == "Q4")
    # first we are gonna null out all of the "EIA" codes that are really just the respondent id
    code_is_respondent_id_mask = xbrl.eia_code.str.startswith("C") & (
        xbrl.respondent_id_ferc714_xbrl == xbrl.eia_code
    )
    xbrl.loc[code_is_respondent_id_mask, "eia_code"] = pd.NA

    # lets null out some of the eia_code's from XBRL that we've manually culled
    # because they were determined to be wrong. These respondents had more than
    # one value for their eia_code and one was always wrong
    respondent_id_xbrl_to_bad_eia_code = {
        "C002422": ["5776"],
        "C011374": ["8376"],
        "C002869": ["F720204"],
        "C002732": ["F720204", "57049, 57050"],
        "C011420": ["16606"],
    }
    for rid_xbrl, bad_eia_codes in respondent_id_xbrl_to_bad_eia_code.items():
        xbrl.loc[
            (xbrl.respondent_id_ferc714_xbrl == rid_xbrl)
            & (xbrl.eia_code.isin(bad_eia_codes)),
            "eia_code",
        ] = pd.NA
    return xbrl
@staticmethod
[docs] def convert_into_static_table_xbrl(xbrl: pd.DataFrame) -> pd.DataFrame: """Convert this annually reported table into a skinnier, static table. The CSV table is entirely static - it doesn't have any reported changes that vary over time. The XBRL table does have start and end dates in it. In order to merge these two sources, we are checking whether or not the shared variables change over time and then converting this table into a non-time-varying table. """ # the CSV data does not vary by year, so we need to check if that is # also going to be the case for the XBRL data. we check the eia_codes # during ensure_eia_code_uniqueness. The name is less crucial but we # should still check. assert all( xbrl.groupby(["respondent_id_ferc714_xbrl"])[ # noqa: PD101 ["respondent_name_ferc714"] ].nunique() == 1 ) cols_to_keep = [ "respondent_id_ferc714", "respondent_id_ferc714_xbrl", "respondent_name_ferc714", "eia_code", ] # we are going to first sort by report year (descending) so the more recent # name is the name we get - just in case - we are checking for consistency of # the name above. return ( xbrl.sort_values(["report_year"], ascending=False)[cols_to_keep] .sort_values(["respondent_id_ferc714", "eia_code"]) .drop_duplicates(subset=["respondent_id_ferc714"], keep="first") )
@staticmethod
[docs] def condense_into_one_source_table(df): """Condense the CSV and XBRL records together into one record. We have two records coming from each of the two sources in this table. This method simply drops duplicates based on the PKs of the table. We know that the names are different in the CSV vs the XBRL source. We are going to grab the XBRL names because they are more recent. NOTE: We could have merged the data in :meth:`run` instead of concatenating along the index. We would have had to develop different methods for :meth:`ensure_eia_code_uniqueness`. """ return df.sort_values(["source"], ascending=False).drop_duplicates( subset=["respondent_id_ferc714", "eia_code"], keep="first" )
@asset( io_manager_key="pudl_io_manager", ins={ "raw_csv": AssetIn(key="raw_ferc714_csv__respondent_id"), "raw_xbrl_duration": AssetIn( key="raw_ferc714_xbrl__identification_and_certification_01_1_duration" ), }, compute_kind="pandas", )
[docs] def core_ferc714__respondent_id( raw_csv: pd.DataFrame, raw_xbrl_duration: pd.DataFrame ) -> pd.DataFrame: """Transform the FERC 714 respondent IDs, names, and EIA utility IDs. This is a light wrapper around :class:`RespondentId` because you need to build an asset from a function - not a staticmethod of a class. Args: raw_csv: Raw table describing the FERC 714 Respondents from the CSV years. raw_xbrl_duration: Raw table describing the FERC 714 Respondents from the XBRL years. Returns: A clean(er) version of the FERC-714 respondents table. """ return RespondentId.run(raw_csv, raw_xbrl_duration)
[docs] class HourlyPlanningAreaDemand:
    """Class for building the :ref:`out_ferc714__hourly_planning_area_demand` asset.

    The :ref:`out_ferc714__hourly_planning_area_demand` table is an hourly time
    series of demand by Planning Area.

    Most of the methods in this class are staticmethods. The purpose of using a
    class in this instance is mostly for organizing the table-specific transforms
    under the same namespace.
    """

    @classmethod
[docs] def run(
    cls,
    raw_csv: pd.DataFrame,
    raw_xbrl_duration: pd.DataFrame,
    raw_xbrl_instant: pd.DataFrame,
) -> pd.DataFrame:
    """Build the :ref:`out_ferc714__hourly_planning_area_demand` asset.

    To transform this table we have to process the instant and duration XBRL
    tables so we can merge them together and process the XBRL data. We also have
    to process the CSV data so we can concatenate it with the XBRL data. Then we
    can process all of the data together.

    For both the CSV and XBRL data, the main transforms that are happening have
    to do with cleaning the timestamps in the data, resulting in timestamps that
    are in a datetime format and are nearly continuous for every respondent.

    Once the CSV and XBRL data are concatenated, the transforms are mostly
    focused on cleaning the timezone codes reported to FERC and then using those
    timezone codes to convert all of the timestamps into UTC datetimes. The
    outcome here is a nearly continuous and non-duplicative time series.
    """
    table_name = "out_ferc714__hourly_planning_area_demand"
    # XBRL STUFF
    duration_xbrl = _filter_for_freshest_data_xbrl(
        raw_xbrl_duration, table_name, "duration"
    ).pipe(cls.remove_yearly_records_duration_xbrl)
    instant_xbrl = _filter_for_freshest_data_xbrl(
        raw_xbrl_instant, table_name, "instant"
    )
    xbrl = (
        cls.merge_instant_and_duration_tables_xbrl(
            instant_xbrl, duration_xbrl, table_name=table_name
        )
        .pipe(
            rename_columns,
            params=RenameColumns(columns=RENAME_COLS[table_name]["xbrl"]),
        )
        .pipe(_assign_respondent_id_ferc714, "xbrl")
        .pipe(cls.convert_dates_to_zero_offset_hours_xbrl)
        .astype({"report_date": "datetime64[ns]"})
        .pipe(cls.convert_dates_to_zero_seconds_xbrl)
        .pipe(cls.spot_fix_records_xbrl)
        .pipe(cls.ensure_dates_are_continuous, source="xbrl")
    )
    # CSV STUFF
    csv = (
        _pre_process_csv(raw_csv, table_name=table_name)
        .pipe(_assign_respondent_id_ferc714, "csv")
        .pipe(cls.melt_hourx_columns_csv)
        .pipe(cls.parse_date_strings_csv)
        .pipe(cls.ensure_dates_are_continuous, source="csv")
    )
    # CONCATED STUFF
    df = (
        pd.concat([csv, xbrl])
        .reset_index(drop=True)
        .assign(
            utc_offset_code=lambda x: cls.standardize_offset_codes(
                x, TIMEZONE_OFFSET_CODE_FIXES
            )
        )
        .pipe(cls.clean_utc_code_offsets_and_set_timezone)
        .pipe(cls.drop_missing_utc_offset)
        .pipe(cls.construct_utc_datetime)
        .pipe(cls.ensure_non_duplicated_datetimes)
        .pipe(cls.spot_fix_values)
        # Convert report_date to first day of year
        .assign(
            report_date=lambda x: x.report_date.dt.to_period("Y").dt.to_timestamp()
        )
        .pipe(_fillna_respondent_id_ferc714_source, "xbrl")
        .pipe(_fillna_respondent_id_ferc714_source, "csv")
        # sort so that the parquet files have all the repeating IDs next
        # to each other for smaller storage
        .sort_values(by=["respondent_id_ferc714", "datetime_utc"])
    )
    return df
@staticmethod
[docs] def melt_hourx_columns_csv(df): """Melt hourX columns into hours. There are some instances of the CSVs with a 25th hour. We drop those entirely because almost all of them are unusable (0.0 or daily totals), and they shouldn't really exist at all based on FERC instructions. """ df = df.drop(columns="hour25") # Melt daily rows with 24 demands to hourly rows with single demand logger.info("Melting daily FERC 714 records into hourly records.") df = df.rename( columns=lambda x: int(re.sub(r"^hour", "", x)) - 1 if "hour" in x else x, ) df = df.melt( id_vars=[ "respondent_id_ferc714", "respondent_id_ferc714_csv", "report_year", "report_date", "utc_offset_code", ], value_vars=range(24), var_name="hour", value_name="demand_mwh", ) return df
@staticmethod
[docs] def parse_date_strings_csv(csv): """Convert report_date into pandas Datetime types. Make the report_date column from the daily string ``report_date`` and the integer ``hour`` column. """ # Parse date strings hour_timedeltas = {i: pd.to_timedelta(i, unit="h") for i in range(24)} # NOTE: Faster to ignore trailing 00:00:00 and use exact=False csv["report_date"] = pd.to_datetime( csv["report_date"], format="%m/%d/%Y", exact=False ) + csv["hour"].map(hour_timedeltas) return csv.drop(columns=["hour"])
@staticmethod
[docs] def remove_yearly_records_duration_xbrl(duration_xbrl):
    """Convert a table of mostly daily records (plus a few annual ones) into a fully daily table.

    Almost all of the records have a start_date that equals the end_date, which
    I'm assuming means the record spans the duration of one day. There is a small
    handful of records which seem to span a full year.
    """
    duration_xbrl = duration_xbrl.astype(
        {"start_date": "datetime64[ns]", "end_date": "datetime64[ns]"}
    )
    one_day_mask = duration_xbrl.start_date == duration_xbrl.end_date
    duration_xbrl_one_day = duration_xbrl[one_day_mask]
    duration_xbrl_one_year = duration_xbrl[~one_day_mask]
    # ensure there are really only a few of these multi-day records
    assert len(duration_xbrl_one_year) / len(duration_xbrl_one_day) < 0.0005
    # ensure all of these records are one year records
    assert all(
        duration_xbrl_one_year.start_date
        + pd.DateOffset(years=1)
        - pd.DateOffset(days=1)
        == duration_xbrl_one_year.end_date
    )
    # these one-year records all show up as one-day records.
    idx = ["entity_id", "report_year", "start_date"]
    assert all(
        duration_xbrl_one_year.merge(
            duration_xbrl_one_day, on=idx, how="left", indicator=True
        )._merge
        == "both"
    )
    # all but two of them have the same timezone as the hourly data.
    # two of them have UTC instead of a local timezone reported in hourly data.
    # this leads me to think these are okay to just drop
    return duration_xbrl_one_day
@staticmethod
[docs] def merge_instant_and_duration_tables_xbrl(
    instant_xbrl: pd.DataFrame, duration_xbrl: pd.DataFrame, table_name: str
) -> pd.DataFrame:
    """Merge XBRL instant and duration tables, reshaping instant as needed.

    A FERC 714 XBRL instant fact is true as of the reported date, while a
    duration fact pertains to the specified time period. The ``date`` column for
    an instant fact corresponds to the ``end_date`` column of a duration fact.

    Args:
        instant_xbrl: table representing XBRL instant facts.
        duration_xbrl: table representing XBRL duration facts.

    Returns:
        A unified table combining the XBRL duration and instant facts, if both
        types of facts were present. If either input dataframe is empty, the
        other dataframe is returned unchanged, except that several unused columns
        are dropped. If both input dataframes are empty, an empty dataframe is
        returned.
    """
    drop_cols = ["filing_name", "index"]
    # Ignore errors in case not all drop_cols are present.
    instant = instant_xbrl.drop(columns=drop_cols, errors="ignore").pipe(
        assign_report_day, "date"
    )
    duration = duration_xbrl.drop(columns=drop_cols, errors="ignore").pipe(
        assign_report_day, "start_date"
    )
    merge_keys = ["entity_id", "report_year", "report_day", "sched_table_name"]
    # Merge instant into duration.
    out_df = pd.merge(
        instant,
        duration,
        how="left",
        on=merge_keys,
        validate="m:1",
    ).drop(columns=["report_day", "start_date", "end_date"])
    return out_df
@staticmethod
[docs] def convert_dates_to_zero_offset_hours_xbrl(xbrl: pd.DataFrame) -> pd.DataFrame:
    """Convert all hours to: Hour (24-hour clock) as a zero-padded decimal number.

    The FERC 714 form includes columns for the hours of each day. Those columns
    are labeled with 1-24 to indicate the hours of the day. The XBRL filings
    themselves have a time-like string associated with each of the facts. These
    strings include both the year-month-day portion (formatted as %Y-%m-%d) as
    well as an hour-minute-second component (semi-formatted as T%H:%M:%S).
    Attempting to simply convert this timestamp information to a datetime using
    the format ``"%Y-%m-%dT%H:%M:%S"`` fails because about a third of the records
    include hour 24 - which is not an accepted hour in standard datetime formats.

    The respondents that report hour 24 do not report hour 00. We have done some
    spot checking of values reported to FERC and have determined that hour 24
    seems to correspond with hour 00 (of the next day). We have not gotten
    complete confirmation from FERC staff that this is always the case, but it
    seems like a decent assumption.

    So, this step converts all of the hour 24 records to be hour 00 of the next
    day.
    """
    bad_24_hour_mask = xbrl.report_date.str.contains("T24:")

    xbrl.loc[bad_24_hour_mask, "report_date"] = pd.to_datetime(
        xbrl[bad_24_hour_mask].report_date.str.replace("T24:", "T23:"),
        format="%Y-%m-%dT%H:%M:%S",
    ) + np.timedelta64(1, "h")
    return xbrl
@staticmethod
[docs] def convert_dates_to_zero_seconds_xbrl(xbrl: pd.DataFrame) -> pd.DataFrame:
    """Convert the last second of the day records to the first (0) second of the next day.

    There is a small number of records which report the last "hour" of the day
    as the last second of the day, as opposed to T24 (cleaned in
    :meth:`convert_dates_to_zero_offset_hours_xbrl`) or T00, which is standard
    for a datetime. This function finds these records, adds one second to them,
    and then ensures all of the records have 0's for seconds.
    """
    last_second_mask = xbrl.report_date.dt.second == 59

    xbrl.loc[last_second_mask, "report_date"] = xbrl.loc[
        last_second_mask, "report_date"
    ] + pd.Timedelta("1s")
    assert xbrl[xbrl.report_date.dt.second != 0].empty
    return xbrl
@staticmethod
[docs] def spot_fix_records_xbrl(xbrl: pd.DataFrame): """Spot fix some specific XBRL records.""" xbrl_years_mask = xbrl.report_date.dt.year >= min(Ferc714Settings().xbrl_years) if (len_csv_years := len(xbrl[~xbrl_years_mask])) > 25: raise AssertionError( "We expected less than 25 XBRL records that have timestamps " f"with years before the XBRL years, but we found {len_csv_years}" ) return xbrl[xbrl_years_mask]
@staticmethod
[docs] def ensure_dates_are_continuous(df: pd.DataFrame, source: Literal["csv", "xbrl"]): """Assert that almost all respondents have continuous timestamps. In the xbrl data, we found 41 gaps in the timeseries! They are almost entirely on the hour in which daylight savings times goes into effect. The csv data had 10 gaps. Pretty good all in all! """ df["gap"] = df[["respondent_id_ferc714", "report_date"]].sort_values( by=["respondent_id_ferc714", "report_date"] ).groupby("respondent_id_ferc714").diff() > pd.to_timedelta("1h") if len(gappy_dates := df[df.gap]) > (41 if source == "xbrl" else 10): raise AssertionError( "We expect there to be nearly no gaps in the time series." f"but we found these gaps:\n{gappy_dates}" ) return df.drop(columns=["gap"])
@staticmethod
[docs] def standardize_offset_codes(df: pd.DataFrame, offset_fixes) -> pd.Series: """Convert to standardized UTC offset abbreviations. This function ensures that all of the 3-4 letter abbreviations used to indicate a timestamp's localized offset from UTC are standardized, so that they can be used to make the timestamps timezone aware. The standard abbreviations we're using are: "HST": Hawaii Standard Time "AKST": Alaska Standard Time "AKDT": Alaska Daylight Time "PST": Pacific Standard Time "PDT": Pacific Daylight Time "MST": Mountain Standard Time "MDT": Mountain Daylight Time "CST": Central Standard Time "CDT": Central Daylight Time "EST": Eastern Standard Time "EDT": Eastern Daylight Time In some cases different respondents use the same non-standard abbreviations to indicate different offsets, and so the fixes are applied on a per-respondent basis, as defined by offset_fixes. Args: df: DataFrame containing a utc_offset_code column that needs to be standardized. offset_fixes: A dictionary with respondent_id_ferc714 values as the keys, and a dictionary mapping non-standard UTC offset codes to the standardized UTC offset codes as the value. Returns: Standardized UTC offset codes. """ logger.info("Standardizing UTC offset codes.") # Clean UTC offset codes df["utc_offset_code"] = df["utc_offset_code"].str.strip().str.upper() # We only need a couple of columns here: codes = df[["respondent_id_ferc714", "utc_offset_code"]].copy() # Set all blank "" missing UTC codes to np.nan codes["utc_offset_code"] = codes.utc_offset_code.mask( codes.utc_offset_code == "" ) # Apply specific fixes on a per-respondent basis: codes = codes.groupby("respondent_id_ferc714").transform( lambda x: x.replace(offset_fixes[x.name]) if x.name in offset_fixes else x ) return codes
@staticmethod
[docs] def clean_utc_code_offsets_and_set_timezone(df): """Clean UTC Codes and set timezone.""" # NOTE: Assumes constant timezone for entire year for fix in TIMEZONE_OFFSET_CODE_FIXES_BY_YEAR: mask = (df["report_year"] == fix["report_year"]) & ( df["respondent_id_ferc714"] == fix["respondent_id_ferc714"] ) df.loc[mask, "utc_offset_code"] = fix["utc_offset_code"] # Replace UTC offset codes with UTC offset and timezone df["utc_offset"] = df["utc_offset_code"].map(TIMEZONE_OFFSET_CODES) df["timezone"] = df["utc_offset_code"].map(TIMEZONE_CODES) return df
@staticmethod
[docs] def drop_missing_utc_offset(df): """Drop records with missing UTC offsets and zero demand.""" # Assert that all records missing UTC offset have zero demand missing_offset = df["utc_offset"].isna() bad_offset_and_demand = df.loc[missing_offset & (df.demand_mwh != 0)] # there are 12 of these bad guys just in the 2023 fast test. if len(bad_offset_and_demand) > 12: raise AssertionError( "We expect all but 12 of the records without a cleaned " "utc_offset to not have any demand data, but we found " f"{len(bad_offset_and_demand)} records.\nUncleaned Codes: " f"{bad_offset_and_demand.utc_offset_code.unique()}\n{bad_offset_and_demand}" ) # Drop these records & then drop the original offset code df = df.query("~@missing_offset").drop(columns="utc_offset_code") return df
@staticmethod
[docs] def construct_utc_datetime(df: pd.DataFrame) -> pd.DataFrame: """Construct datetime_utc column.""" # Construct UTC datetime logger.info("Converting local time + offset code to UTC + timezone.") df["datetime_utc"] = df["report_date"] - df["utc_offset"] df = df.drop(columns=["utc_offset"]) return df
@staticmethod
[docs] def ensure_non_duplicated_datetimes(df): """Report and drop duplicated UTC datetimes.""" # There should be less than 10 of these, # resulting from changes to a planning area's reporting timezone. duplicated = df.duplicated(["respondent_id_ferc714", "datetime_utc"]) if (num_dupes := np.count_nonzero(duplicated)) > 10: raise AssertionError( f"Found {num_dupes} duplicate UTC datetimes, but we expected 10 or less.\n{df[duplicated]}" ) df = df.query("~@duplicated") return df
@staticmethod
[docs] def spot_fix_values(df: pd.DataFrame) -> pd.DataFrame: """Spot fix values.""" # Flip the sign on sections of demand which were reported as negative mask = ( df["report_year"].isin([2006, 2007, 2008, 2009]) & (df["respondent_id_ferc714"] == 156) ) | ( df["report_year"].isin([2006, 2007, 2008, 2009, 2010]) & (df["respondent_id_ferc714"] == 289) ) df.loc[mask, "demand_mwh"] *= -1 return df
@asset( ins={ "raw_csv": AssetIn(key="raw_ferc714_csv__hourly_planning_area_demand"), "raw_xbrl_duration": AssetIn( key="raw_ferc714_xbrl__planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_03_2_duration" ), "raw_xbrl_instant": AssetIn( key="raw_ferc714_xbrl__planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_03_2_instant" ), }, io_manager_key="parquet_io_manager", op_tags={"memory-use": "high"}, compute_kind="pandas", )
[docs] def out_ferc714__hourly_planning_area_demand( raw_csv: pd.DataFrame, raw_xbrl_duration: pd.DataFrame, raw_xbrl_instant: pd.DataFrame, ) -> pd.DataFrame: """Build the :ref:`out_ferc714__hourly_planning_area_demand`. This is a light wrapper around :class:`HourlyPlanningAreaDemand` because it seems you need to build an asset from a function - not a staticmethod of a class. """ return HourlyPlanningAreaDemand.run(raw_csv, raw_xbrl_duration, raw_xbrl_instant)
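# Illustrative sketch (not part of this module): the hour-24 timestamp fix described
# in HourlyPlanningAreaDemand.convert_dates_to_zero_offset_hours_xbrl, applied to a
# couple of made-up XBRL-style timestamp strings.
import numpy as np
import pandas as pd

_stamps = pd.Series(["2022-06-01T24:00:00", "2022-06-01T01:00:00"])
_bad_24 = _stamps.str.contains("T24:")
_fixed = pd.to_datetime(
    _stamps.str.replace("T24:", "T23:"), format="%Y-%m-%dT%H:%M:%S"
)
_fixed[_bad_24] += np.timedelta64(1, "h")
# Hour 24 of June 1st becomes hour 00 of June 2nd; hour 01 is left unchanged.
assert _fixed[0] == pd.Timestamp("2022-06-02 00:00:00")
assert _fixed[1] == pd.Timestamp("2022-06-01 01:00:00")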
[docs] class YearlyPlanningAreaDemandForecast:
    """Class for building the :ref:`core_ferc714__yearly_planning_area_demand_forecast` asset.

    The :ref:`core_ferc714__yearly_planning_area_demand_forecast` table is an
    annual, forecasted time series of demand by Planning Area.

    Most of the methods in this class are staticmethods. The purpose of using a
    class in this instance is mostly for organizing the table-specific transforms
    under the same namespace.
    """

    @classmethod
[docs] def run(
    cls,
    raw_csv: pd.DataFrame,
    raw_xbrl_duration: pd.DataFrame,
) -> pd.DataFrame:
    """Build the :ref:`core_ferc714__yearly_planning_area_demand_forecast` asset.

    To transform this table we have to process the CSV data and the XBRL duration
    data (this data has no instant table), combine the XBRL and CSV data, and
    process the combined dataset. The main transforms include spot-fixing forecast
    years with :meth:`spot_fix_forecast_years_xbrl` and averaging out duplicate
    forecast values for duplicate primary key rows in the CSV table.
    """
    table_name = "core_ferc714__yearly_planning_area_demand_forecast"
    # XBRL STUFF
    xbrl = (
        _filter_for_freshest_data_xbrl(raw_xbrl_duration, table_name, "duration")
        .pipe(
            rename_columns,
            params=RenameColumns(columns=RENAME_COLS[table_name]["xbrl"]),
        )
        .pipe(_assign_respondent_id_ferc714, "xbrl")
        .pipe(cls.spot_fix_forecast_years_xbrl)
    )
    # CSV STUFF
    csv = (
        _pre_process_csv(raw_csv, table_name=table_name)
        .pipe(_assign_respondent_id_ferc714, "csv")
        .pipe(cls.average_duplicate_pks_csv)
    )
    # CONCATED STUFF
    df = pd.concat([csv, xbrl]).reset_index(drop=True)
    return df
@staticmethod
[docs] def spot_fix_forecast_years_xbrl(df):
    """Spot fix forecast year errors.

    This function fixes the following errors:

    - There's one record with an NA forecast_year value. This row also has no
      demand forecast values. Because forecast_year is a primary key we can't
      have any NA values. Because there are no substantive forecasts in this
      row, we can safely remove it.
    - respondent_id_ferc714 number 107 reported their forecast_year as YY
      instead of YYYY values.
    - There's also at least one forecast year value reported as 3033 that
      should be 2033.

    This function also checks that the values for forecast year are within an
    expected range.
    """
    df = df.astype({"forecast_year": "Int64"})
    # Make sure there's only one NA forecast_year value and remove it
    if len(nulls := df[df["forecast_year"].isna()]) > 2:
        raise AssertionError(
            f"We expected one or 0 NA forecast year, but found:\n{nulls}"
        )
    df = df[df["forecast_year"].notna()]
    # Convert YY to YYYY for respondent 107 (the culprit).
    # The earliest forecast year reported as YY is 22. Any numbers
    # lower than that would signify a transition into 2100.
    mask = (df["respondent_id_ferc714"] == 107) & (df["forecast_year"] > 21)
    df.loc[mask, "forecast_year"] = df["forecast_year"] + 2000
    # Fix the extraneous 3033 value from respondent 17
    mask = (
        (df["respondent_id_ferc714"] == 17)
        & (df["report_year"] == 2023)
        & (df["forecast_year"] == 3033)
    )
    df.loc[mask, "forecast_year"] = 2033
    # Make sure forecast_year values are expected
    assert (
        df["forecast_year"].isin(range(2021, 2100)).all()
    ), "Forecast year values not in expected range"
    return df
@staticmethod
[docs] def average_duplicate_pks_csv(df):
    """Average forecast values for duplicate primary keys.

    The XBRL data had duplicate primary keys, but it was easy to parse them by
    keeping rows with the most recent publication_time value. The CSVs have no
    such distinguishing column, despite having some duplicate primary keys. This
    function takes the average of the forecast values for rows with duplicate
    primary keys. There are only 6 respondent/report_year/forecast_year rows
    where the forecast values differ. One of those is a pair where one forecast
    value is 0. We'll take the non-zero value here and average out the rest.
    """
    # Record original length of dataframe
    original_len = len(df)
    # Remove duplicate row with 0 forecast values
    error_mask = (
        (df["respondent_id_ferc714"] == 100)
        & (df["report_year"] == 2013)
        & (df["forecast_year"] == 2014)
        & (df["net_demand_forecast_mwh"] == 0)
    )
    if (len_dupes := len(df[error_mask])) > 1:
        raise AssertionError(
            f"We found {len_dupes} duplicate errors, but expected 1 or less:\n{df[error_mask]}"
        )
    df = df[~error_mask]
    # Take the average of duplicate PK forecast values.
    dupe_mask = df[
        ["respondent_id_ferc714", "report_year", "forecast_year"]
    ].duplicated(keep=False)
    deduped_df = (
        df[dupe_mask]
        .groupby(["respondent_id_ferc714", "report_year", "forecast_year"])[
            [
                "summer_peak_demand_forecast_mw",
                "winter_peak_demand_forecast_mw",
                "net_demand_forecast_mwh",
            ]
        ]
        .mean()
        .reset_index()
    )
    df = pd.concat([df[~dupe_mask], deduped_df])
    # Make sure no more rows were dropped than expected
    assert (
        original_len - len(df) <= 20
    ), f"dropped {original_len - len(df)} rows, expected no more than 20"
    return df
@asset( ins={ "raw_csv": AssetIn(key="raw_ferc714_csv__yearly_planning_area_demand_forecast"), "raw_xbrl_duration": AssetIn( key="raw_ferc714_xbrl__planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_table_03_2_duration" ), }, io_manager_key="pudl_io_manager", compute_kind="pandas", )
[docs] def core_ferc714__yearly_planning_area_demand_forecast( raw_csv: pd.DataFrame, raw_xbrl_duration: pd.DataFrame, ) -> pd.DataFrame: """Build the :ref:`core_ferc714__yearly_planning_area_demand_forecast`. This is a light wrapper around :class:`YearlyPlanningAreaDemandForecast` because it seems you need to build an asset from a function - not a staticmethod of a class. """ return YearlyPlanningAreaDemandForecast.run(raw_csv, raw_xbrl_duration)
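# Illustrative sketch (not part of this module): averaging forecast values for rows
# that share a primary key, as YearlyPlanningAreaDemandForecast.average_duplicate_pks_csv
# does for the CSV years. The numbers below are made up for demonstration.
import pandas as pd

_fc = pd.DataFrame(
    {
        "respondent_id_ferc714": [1, 1, 2],
        "report_year": [2010, 2010, 2010],
        "forecast_year": [2011, 2011, 2011],
        "net_demand_forecast_mwh": [100.0, 120.0, 300.0],
    }
)
_pk = ["respondent_id_ferc714", "report_year", "forecast_year"]
_dupes = _fc[_pk].duplicated(keep=False)
_averaged = _fc[_dupes].groupby(_pk, as_index=False)["net_demand_forecast_mwh"].mean()
_fc = pd.concat([_fc[~_dupes], _averaged], ignore_index=True)
# Respondent 1's two conflicting 2011 forecasts collapse into one row of 110.0 MWh.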
@dataclass
[docs] class Ferc714CheckSpec: """Define some simple checks that can run on FERC 714 assets."""
[docs] name: str
[docs] asset: str
[docs] num_rows_by_report_year: dict[int, int]
[docs] check_specs = [ Ferc714CheckSpec( name="yearly_planning_area_demand_forecast_check_spec", asset="core_ferc714__yearly_planning_area_demand_forecast", num_rows_by_report_year={ 2006: 1819, 2007: 1570, 2008: 1540, 2009: 1269, 2010: 1259, 2011: 1210, 2012: 1210, 2013: 1192, 2014: 1000, 2015: 990, 2016: 990, 2017: 980, 2018: 961, 2019: 950, 2020: 950, 2021: 905, 2022: 904, 2023: 904, }, ) ]
[docs] def make_check(spec: Ferc714CheckSpec) -> AssetChecksDefinition: """Turn the Ferc714CheckSpec into an actual Dagster asset check.""" @asset_check(asset=spec.asset, blocking=True) def _check(df): errors = [] for year, expected_rows in spec.num_rows_by_report_year.items(): if (num_rows := len(df.loc[df.report_year == year])) != expected_rows: errors.append( f"Expected {expected_rows} for report year {year}, found {num_rows}" ) logger.info(errors) if errors: return AssetCheckResult(passed=False, metadata={"errors": errors}) return AssetCheckResult(passed=True) return _check
[docs] _checks = [make_check(spec) for spec in check_specs]