"""
Routines for transforming FERC Form 1 data before loading into the PUDL DB.

This module provides a variety of functions that are used in cleaning up the
FERC Form 1 data prior to loading into our database. This includes adopting
standardized units and column names, standardizing the formatting of some
string values, and correcting data entry errors which we can infer based on
the existing data. It may also include removing bad data, or replacing it
with the appropriate NA values.
"""

import importlib.resources
import logging
import re
from difflib import SequenceMatcher

# NetworkX is used to knit incomplete ferc plant time series together.
import networkx as nx
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
# These modules are required for the FERC Form 1 Plant ID & Time Series
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, Normalizer, OneHotEncoder

import pudl
import pudl.constants as pc

logger = logging.getLogger(__name__)

##############################################################################
# FERC TRANSFORM HELPER FUNCTIONS ############################################
##############################################################################


def _clean_cols(df, table_name):
    """Adds a FERC record ID and drop FERC columns not to be loaded into PUDL.

    It is often useful to be able to tell exactly which record in the FERC Form
    1 database a given record within the PUDL database came from. Within each
    FERC Form 1 table, each record is uniquely identified by the combination
    of:
      - report_year
      - respondent_id
      - spplmnt_num
      - row_number

    So this function takes a dataframe, checks to make sure it contains each of
    those columns and that none of them are NULL, and adds a new column to the
    dataframe containing a string of the format:

    {table_name}_{report_year}_{respondent_id}_{spplmnt_num}_{row_number}

    In addition there are some columns which are not meaningful or useful in
    the context of PUDL, but which show up in virtually every FERC table, and
    this function drops them if they are present. These columns include:
     - spplmnt_num (which goes into the record ID)
     - row_number (which goes into the record ID)
     - row_prvlg
     - row_seq
     - item
     - report_prd
     - record_number (a temporary column used in plants_small)
     - *_f (all footnote columns)

    Args:
        df (pandas.DataFrame): The DataFrame in which the function looks for
            columns for the unique identification of FERC records, and ensures
            that those columns are not NULL.
        table_name (str): The name of the table that we are cleaning.

    Returns:
        pandas.DataFrame: The same DataFrame with a column appended containing
            a string of the format
            {table_name}_{report_year}_{respondent_id}_{spplmnt_num}_{row_number}

    Raises:
        AssertionError: If any of the unique record identification columns
            contain NULL values.
    """
    # Make sure that *all* of these identifying columns are present and
    # contain no null values:
    for field in ['report_year', 'respondent_id', 'spplmnt_num', 'row_number']:
        if df[field].isnull().any():
            raise AssertionError(
                f"Null field {field} found in ferc1 table {table_name}."
            )

    # Create a unique inter-year FERC table record ID:
    df['record_id'] = (
        table_name + '_' +
        df.report_year.astype(str) + '_' +
        df.respondent_id.astype(str) + '_' +
        df.spplmnt_num.astype(str) + '_' +
        df.row_number.astype(str)
    )

    # Check to make sure that the generated record_id is unique... since that's
    # kind of the whole point. There are a couple of genuine bad records here
    # that are taken care of in the transform step, so just print a warning:
    n_dupes = df.record_id.duplicated().values.sum()
    if n_dupes:
        dupe_ids = df.record_id[df.record_id.duplicated()].values
        logger.warning(
            f"{n_dupes} duplicate record_id values found "
            f"in pre-transform table {table_name}: {dupe_ids}."
        )

    unused_cols = [
        'spplmnt_num',
        'row_number',
        'row_prvlg',
        'row_seq',
        'report_prd',
        'item',
        'record_number'
    ]

    # Drop any _f columns... since we're not using the FERC Footnotes...
    footnote_cols = [col for col in df.columns if re.match('.*_f$', col)]
    # Drop these columns and don't complain about it if they don't exist:
    df = df.drop(unused_cols + footnote_cols, errors='ignore', axis=1)

    return df

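# A hedged usage sketch, not part of the transform pipeline: the toy values
# below are invented and 'f1_toy' is a hypothetical table name, used only to
# show the record_id string _clean_cols() generates and which columns it
# drops:
def _example_clean_cols():
    """Illustrate the record_id format produced by _clean_cols()."""
    toy_df = pd.DataFrame({
        'report_year': [2016],
        'respondent_id': [42],
        'spplmnt_num': [0],
        'row_number': [5],
        'row_prvlg': ['x'],     # dropped as an unused FERC bookkeeping column
        'some_value_f': [1.0],  # dropped because it is a footnote (_f) column
    })
    out = _clean_cols(toy_df, 'f1_toy')
    # out.record_id == 'f1_toy_2016_42_0_5'; spplmnt_num and row_number have
    # been folded into record_id and dropped along with the other junk cols.
    return out
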

def _multiplicative_error_correction(tofix, mask, minval, maxval, mults):
    """Corrects data entry errors where data being multiplied by a factor.

    In many cases we know that a particular column in the database should have
    a value in a particular range (e.g. the heat content of a ton of coal is a
    well defined physical quantity -- it can be 15 mmBTU/ton or 22 mmBTU/ton,
    but it can't be 1 mmBTU/ton or 100 mmBTU/ton). Sometimes these fields
    are reported in the wrong units (e.g. kWh of electricity generated rather
    than MWh) resulting in several distributions that have a similar shape
    showing up at different ranges of value within the data.  This function
    takes a one dimensional data series, a description of a valid range for
    the values, and a list of factors by which we expect to see some of the
    data multiplied due to unit errors.  Data found in these "ghost"
    distributions are multiplied by the appropriate factor to bring them into
    the expected range.

    Data values which are not found in one of the acceptable multiplicative
    ranges are set to NA.

    Args:
        tofix (pandas.Series): A 1-dimensional data series containing the
            values to be fixed.
        mask (pandas.Series): A 1-dimensional masking array of True/False
            values, which will be used to select a subset of the tofix
            series onto which we will apply the multiplicative fixes.
        minval (float): the minimum realistic value for the data series.
        maxval (float): the maximum realistic value for the data series.
        mults (list of floats): values by which "real" data may have been
            multiplied due to common data entry errors. These values both
            show us where to look in the full data series to find recoverable
            data, and also tell us by what factor those values need to be
            multiplied to bring them back into the reasonable range.

    Returns:
        fixed (pandas.Series): a data series of the same length as the
            input, but with the transformed values.
    """
    # Grab the subset of the input series we are going to work on:
    records_to_fix = tofix[mask]
    # Drop those records from our output series
    fixed = tofix.drop(records_to_fix.index)
    # Iterate over the multipliers, applying fixes to outlying populations
    for mult in mults:
        records_to_fix = records_to_fix.apply(lambda x: x * mult
                                              if x > minval / mult
                                              and x < maxval / mult
                                              else x)
    # Set any record that wasn't inside one of our identified populations to
    # NA -- we are saying that these are true outliers, which can't be part
    # of the population of values we are examining.
    records_to_fix = records_to_fix.apply(lambda x: np.nan
                                          if x < minval
                                          or x > maxval
                                          else x)
    # Add our fixed records back to the complete data series and return it
    fixed = fixed.append(records_to_fix)
    return fixed

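# A hedged sketch of _multiplicative_error_correction() in action. The values
# are invented; the 10-29 mmBTU/ton bounds mirror the coal heat content
# correction applied in fuel() below:
def _example_multiplicative_fix():
    """Illustrate rescaling of unit-error populations and NA-ing outliers."""
    data = pd.Series([20.0, 0.01, 2.0e-5, 500.0])
    mask = pd.Series([True, True, True, True])
    # 20.0 is already in the valid range and passes through unchanged; 0.01
    # looks like mmBTU/lb and gets multiplied by 2000; 2.0e-5 looks like a
    # 1e6x unit error and gets multiplied by 1e6; 500.0 falls in no
    # recognized error population and is set to NA. Result: [20, 20, 20, NaN]
    return _multiplicative_error_correction(
        data, mask, minval=10.0, maxval=29.0, mults=[2000, 1e6])
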

##############################################################################
# DATABASE TABLE SPECIFIC PROCEDURES #########################################
##############################################################################
def plants_steam(ferc1_raw_dfs, ferc1_transformed_dfs):
    """Transforms FERC Form 1 plant_steam data for loading into PUDL Database.

    This includes converting to our preferred units of MWh and MW, as well as
    standardizing the strings describing the kind of plant and construction.

    Args:
        ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
            objects corresponds to a table from the FERC Form 1 DBC database.
        ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
            transformed.

    Returns:
        dict: The dictionary of transformed dataframes, including the newly
        transformed plants_steam_ferc1 dataframe.
    """
    ferc1_steam_df = (
        ferc1_raw_dfs['plants_steam_ferc1'].
        pipe(_plants_steam_clean).
        pipe(_plants_steam_assign_plant_ids,
             ferc1_transformed_dfs['fuel_ferc1'])
    )
    plants_steam_validate_ids(ferc1_steam_df)
    ferc1_transformed_dfs['plants_steam_ferc1'] = ferc1_steam_df
    return ferc1_transformed_dfs
def _plants_steam_clean(ferc1_steam_df):
    """Clean up the plants_steam_ferc1 dataframe prior to plant ID assignment."""
    # grab table from dictionary of dfs
    ferc1_steam_df = (
        ferc1_steam_df.
        pipe(_clean_cols, 'f1_steam').
        pipe(pudl.helpers.strip_lower, ['plant_name']).
        # Take the messy free-form construction_type and plant_kind fields,
        # and do our best to map them to some canonical categories... this is
        # necessarily imperfect:
        pipe(pudl.helpers.cleanstrings, ['type_const', 'plant_kind'],
             [pc.ferc1_const_type_strings, pc.ferc1_plant_kind_strings],
             unmapped='')
    )

    # Force the construction and installation years to be numeric values, and
    # set them to NA if they can't be converted. (table has some junk values)
    ferc1_steam_df['yr_const'] = pd.to_numeric(
        ferc1_steam_df['yr_const'], errors='coerce')
    ferc1_steam_df['yr_installed'] = pd.to_numeric(
        ferc1_steam_df['yr_installed'], errors='coerce')

    # There are also a few zeroes... which are not valid years for us:
    ferc1_steam_df = ferc1_steam_df.replace(
        {"yr_const": 0, "yr_installed": 0}, np.nan)

    # Converting everything to per MW and MWh units...
    ferc1_steam_df['cost_per_mw'] = 1000 * ferc1_steam_df['cost_per_kw']
    ferc1_steam_df.drop('cost_per_kw', axis=1, inplace=True)
    ferc1_steam_df['net_generation_mwh'] = \
        ferc1_steam_df['net_generation'] / 1000
    ferc1_steam_df.drop('net_generation', axis=1, inplace=True)
    ferc1_steam_df['expns_per_mwh'] = 1000 * ferc1_steam_df['expns_kwh']
    ferc1_steam_df.drop('expns_kwh', axis=1, inplace=True)

    ferc1_steam_df.rename(columns={
        # FERC 1 DB Name      PUDL DB Name
        'respondent_id': 'utility_id_ferc1',
        'yr_const': 'construction_year',
        'plant_kind': 'plant_type',
        'type_const': 'construction_type',
        'asset_retire_cost': 'asset_retirement_cost',
        'yr_installed': 'installation_year',
        'tot_capacity': 'capacity_mw',
        'peak_demand': 'peak_demand_mw',
        'plant_hours': 'plant_hours_connected_while_generating',
        'plnt_capability': 'plant_capability_mw',
        'when_limited': 'water_limited_capacity_mw',
        'when_not_limited': 'not_water_limited_capacity_mw',
        'avg_num_of_emp': 'avg_num_employees',
        'net_generation': 'net_generation_mwh',
        'cost_land': 'capex_land',
        'cost_structure': 'capex_structures',
        'cost_equipment': 'capex_equipment',
        'cost_of_plant_to': 'capex_total',
        'cost_per_mw': 'capex_per_mw',
        'expns_operations': 'opex_operations',
        'expns_fuel': 'opex_fuel',
        'expns_coolants': 'opex_coolants',
        'expns_steam': 'opex_steam',
        'expns_steam_othr': 'opex_steam_other',
        'expns_transfer': 'opex_transfer',
        'expns_electric': 'opex_electric',
        'expns_misc_power': 'opex_misc_power',
        'expns_rents': 'opex_rents',
        'expns_allowances': 'opex_allowances',
        'expns_engnr': 'opex_engineering',
        'expns_structures': 'opex_structures',
        'expns_boiler': 'opex_boiler',
        'expns_plants': 'opex_plants',
        'expns_misc_steam': 'opex_misc_steam',
        'tot_prdctn_expns': 'opex_production_total',
        'expns_per_mwh': 'opex_per_mwh'},
        inplace=True)

    return ferc1_steam_df


def _plants_steam_assign_plant_ids(ferc1_steam_df, ferc1_fuel_df):
    """Assign IDs to the large steam plants."""
    ###########################################################################
    # FERC PLANT ID ASSIGNMENT
    ###########################################################################
    # Now we need to assign IDs to the large steam plants, since FERC doesn't
    # do this for us.
    logger.info("Identifying distinct large FERC plants for ID assignment.")

    # scikit-learn still doesn't deal well with NA values (this will be fixed
    # eventually) so we need to massage the types and missing data for the
    # classifier to work.
    ferc1_steam_df = pudl.helpers.fix_int_na(
        ferc1_steam_df, columns=['construction_year'])

    # Grab fuel consumption proportions for use in assigning plant IDs:
    fuel_fractions = fuel_by_plant_ferc1(ferc1_fuel_df)
    ffc = list(fuel_fractions.filter(regex='.*_fraction_mmbtu$').columns)

    ferc1_steam_df = (
        ferc1_steam_df.merge(
            fuel_fractions[
                ['utility_id_ferc1', 'plant_name', 'report_year'] + ffc],
            on=['utility_id_ferc1', 'plant_name', 'report_year'],
            how='left'
        )
    )
    ferc1_steam_df[ffc] = ferc1_steam_df[ffc].fillna(value=0.0)

    # Train the classifier using DEFAULT weights, parameters not listed here.
    ferc_clf = pudl.transform.ferc1.make_ferc_clf(ferc1_steam_df)
    ferc_clf = ferc_clf.fit_transform(ferc1_steam_df)

    # Use the classifier to generate groupings of similar records:
    record_groups = ferc_clf.predict(ferc1_steam_df.record_id)
    n_tot = len(ferc1_steam_df)
    n_grp = len(record_groups)
    pct_grp = n_grp / n_tot
    logger.info(
        f"Successfully associated {n_grp} of {n_tot} ({pct_grp:.2%}) "
        f"FERC Form 1 plant records with multi-year plant entities.")

    record_groups.columns = record_groups.columns.astype(str)
    cols = record_groups.columns
    record_groups = record_groups.reset_index()

    # Now we are going to create a graph (network) that describes all of the
    # binary relationships between a seed_id and the record_ids that it has
    # been associated with in any other year. Each connected component of that
    # graph is a ferc plant time series / plant_id
    logger.info("Assigning IDs to multi-year FERC plant entities.")
    edges_df = pd.DataFrame(columns=['source', 'target'])
    for col in cols:
        new_edges = record_groups[['seed_id', col]]
        new_edges = new_edges.rename(
            {'seed_id': 'source', col: 'target'}, axis=1)
        edges_df = pd.concat([edges_df, new_edges], sort=True)

    # Drop any records where there's no target ID (no match in a year)
    edges_df = edges_df[edges_df.target != '']

    # We still have to deal with the orphaned records -- any record which
    # wasn't placed in a time series but is still valid should be included as
    # its own independent "plant" for completeness, and use in aggregate
    # analysis.
    orphan_record_ids = np.setdiff1d(ferc1_steam_df.record_id.unique(),
                                     record_groups.values.flatten())
    logger.info(
        f"Identified {len(orphan_record_ids)} orphaned FERC plant records. "
        f"Adding orphans to list of plant entities.")
    orphan_df = pd.DataFrame({'source': orphan_record_ids,
                              'target': orphan_record_ids})
    edges_df = pd.concat([edges_df, orphan_df], sort=True)

    # Use the data frame we've compiled to create a graph
    G = nx.from_pandas_edgelist(edges_df,  # noqa: N806
                                source='source',
                                target='target')
    # Find the connected components of the graph
    ferc_plants = (G.subgraph(c) for c in nx.connected_components(G))

    # Now we'll iterate through the connected components and assign each of
    # them a FERC Plant ID, and pull the results back out into a dataframe:
    plants_w_ids = pd.DataFrame()
    for plant_id_ferc1, plant in enumerate(ferc_plants):
        nx.set_edge_attributes(plant,
                               plant_id_ferc1 + 1,
                               name='plant_id_ferc1')
        new_plant_df = nx.to_pandas_edgelist(plant)
        plants_w_ids = plants_w_ids.append(new_plant_df)
    logger.info(
        f"Successfully identified "
        f"{plant_id_ferc1 + 1 - len(orphan_record_ids)} "
        f"multi-year plant entities.")

    # Set the construction year back to numeric because it is.
    ferc1_steam_df['construction_year'] = pd.to_numeric(
        ferc1_steam_df['construction_year'], errors='coerce')
    # We don't actually want to save the fuel fractions in this table... they
    # were only here to help us match up the plants.
    ferc1_steam_df = ferc1_steam_df.drop(ffc, axis=1)

    # Now we need a list of all the record IDs, with their associated
    # FERC 1 plant IDs. However, the source-target listing isn't
    # guaranteed to list every one of the nodes in either list, so we
    # need to compile them together to ensure that we get every single one.
    sources = (
        plants_w_ids.
        drop('target', axis=1).
        drop_duplicates().
        rename({'source': 'record_id'}, axis=1)
    )
    targets = (
        plants_w_ids.
        drop('source', axis=1).
        drop_duplicates().
        rename({'target': 'record_id'}, axis=1)
    )
    plants_w_ids = (
        pd.concat([sources, targets]).
        drop_duplicates().
        sort_values(['plant_id_ferc1', 'record_id'])
    )
    steam_rids = ferc1_steam_df.record_id.values
    pwids_rids = plants_w_ids.record_id.values
    missing_ids = [rid for rid in steam_rids if rid not in pwids_rids]
    if missing_ids:
        raise AssertionError(
            f"Uh oh, we lost {abs(len(steam_rids) - len(pwids_rids))} FERC "
            f"steam plant record IDs: {missing_ids}"
        )
    ferc1_steam_df = pd.merge(ferc1_steam_df, plants_w_ids, on='record_id')
    return ferc1_steam_df
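
# A hedged sketch of the graph-based ID assignment above: record IDs matched
# across years become edges, and each connected component of the resulting
# graph is treated as one multi-year plant. The record IDs here are invented:
def _example_plant_id_graph():
    """Illustrate connected components acting as multi-year plant entities."""
    edges = pd.DataFrame({
        'source': ['f1_steam_2014_1_0_1', 'f1_steam_2015_1_0_1',
                   'f1_steam_2014_2_0_7'],
        'target': ['f1_steam_2015_1_0_1', 'f1_steam_2016_1_0_1',
                   'f1_steam_2014_2_0_7'],  # an orphan linked to itself
    })
    graph = nx.from_pandas_edgelist(edges, source='source', target='target')
    # Two components: a three-year plant time series, plus the lone orphan.
    # Each component would be assigned its own plant_id_ferc1:
    return list(nx.connected_components(graph))
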
def plants_steam_validate_ids(ferc1_steam_df):
    """Tests that each plant_id_ferc1 time series has only one record per year.

    Args:
        ferc1_steam_df (pandas.DataFrame): A DataFrame of the data from the
            FERC 1 Steam table.

    Returns:
        None
    """
    ##########################################################################
    # FERC PLANT ID ERROR CHECKING STUFF
    ##########################################################################
    # Test to make sure that we don't have any plant_id_ferc1 time series
    # which include more than one record from a given year. Warn the user
    # if we find such cases (which... we do, as of this writing)
    year_dupes = (
        ferc1_steam_df.
        groupby(['plant_id_ferc1', 'report_year'])['utility_id_ferc1'].
        count().
        reset_index().
        rename(columns={'utility_id_ferc1': 'year_dupes'}).
        query('year_dupes>1')
    )
    if len(year_dupes) > 0:
        for dupe in year_dupes.itertuples():
            logger.error(
                f"Found report_year={dupe.report_year} "
                f"{dupe.year_dupes} times in "
                f"plant_id_ferc1={dupe.plant_id_ferc1}"
            )
    else:
        logger.info("No duplicate years found in any plant_id_ferc1. Hooray!")
def fuel(ferc1_raw_dfs, ferc1_transformed_dfs):
    """Transforms FERC Form 1 fuel data for loading into PUDL Database.

    This process includes converting some columns to be in terms of our
    preferred units, like MWh and mmbtu instead of kWh and btu. Plant names
    are also standardized (stripped & lower case). Fuel and fuel unit strings
    are also standardized using our cleanstrings() function and string
    cleaning dictionaries found in pudl.constants.

    Args:
        ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
            objects corresponds to a table from the FERC Form 1 DBC database.
        ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
            transformed.

    Returns:
        dict: The dictionary of transformed dataframes.
    """
    # grab table from dictionary of dfs, clean it up a bit
    fuel_ferc1_df = (
        _clean_cols(ferc1_raw_dfs['fuel_ferc1'], 'f1_fuel').
        # Standardize plant_name capitalization and remove leading/trailing
        # white space -- necessary b/c plant_name is part of many foreign
        # keys.
        pipe(pudl.helpers.strip_lower, ['plant_name']).
        # Take the messy free-form fuel & fuel_unit fields, and do our best to
        # map them to some canonical categories... this is necessarily
        # imperfect:
        pipe(pudl.helpers.cleanstrings, ['fuel', 'fuel_unit'],
             [pc.ferc1_fuel_strings, pc.ferc1_fuel_unit_strings],
             unmapped='').
        # Fuel cost per kWh is a per-unit value that doesn't make sense to
        # report for a single fuel that may be only a small part of the fuel
        # consumed. "fuel generaton" is heat rate, but as it's based only on
        # the heat content of a given fuel, which may be only a small portion
        # of the overall fuel consumption, it doesn't make any sense here.
        # Drop both.
        drop(['fuel_cost_kwh', 'fuel_generaton'], axis=1).
        # Convert from BTU/unit of fuel to 1e6 BTU/unit.
        assign(fuel_avg_mmbtu_per_unit=lambda x: x.fuel_avg_heat / 1e6).
        drop('fuel_avg_heat', axis=1).
        # Rename the columns to match our DB definitions
        rename(columns={
            # FERC 1 DB Name      PUDL DB Name
            'respondent_id': 'utility_id_ferc1',
            'fuel': 'fuel_type_code_pudl',
            'fuel_avg_mmbtu_per_unit': 'fuel_mmbtu_per_unit',
            'fuel_quantity': 'fuel_qty_burned',
            'fuel_cost_burned': 'fuel_cost_per_unit_burned',
            'fuel_cost_delvd': 'fuel_cost_per_unit_delivered',
            'fuel_cost_btu': 'fuel_cost_per_mmbtu'})
    )

    #########################################################################
    # CORRECT DATA ENTRY ERRORS #############################################
    #########################################################################
    coal_mask = fuel_ferc1_df['fuel_type_code_pudl'] == 'coal'
    gas_mask = fuel_ferc1_df['fuel_type_code_pudl'] == 'gas'
    oil_mask = fuel_ferc1_df['fuel_type_code_pudl'] == 'oil'

    corrections = [
        # mult = 2000: reported in units of lbs instead of short tons
        # mult = 1e6: reported BTUs instead of mmBTUs
        # minval and maxval of 10 and 29 mmBTUs are the range of values
        # specified by EIA 923 instructions at:
        # https://www.eia.gov/survey/form/eia_923/instructions.pdf
        ['fuel_mmbtu_per_unit', coal_mask, 10.0, 29.0, (2e3, 1e6)],

        # mult = 1e-2: reported cents/mmBTU instead of USD/mmBTU
        # minval and maxval of .5 and 7.5 dollars per mmBTU are the
        # endpoints of the primary distribution of EIA 923 fuel receipts
        # and cost per mmBTU data, weighted by quantity delivered
        ['fuel_cost_per_mmbtu', coal_mask, 0.5, 7.5, (1e-2, )],

        # mult = 1e3: reported fuel quantity in cubic feet, not mcf
        # mult = 1e6: reported fuel quantity in BTU, not mmBTU
        # minval and maxval of .8 and 1.2 mmBTUs are the range of values
        # specified by EIA 923 instructions
        ['fuel_mmbtu_per_unit', gas_mask, 0.8, 1.2, (1e3, 1e6)],

        # mult = 1e-2: reported in cents/mmBTU instead of USD/mmBTU
        # minval and maxval of 1 and 35 dollars per mmBTU are the
        # endpoints of the primary distribution of EIA 923 fuel receipts
        # and cost per mmBTU data, weighted by quantity delivered
        ['fuel_cost_per_mmbtu', gas_mask, 1, 35, (1e-2, )],

        # mult = 42: reported fuel quantity in gallons, not barrels
        # mult = 1e6: reported fuel quantity in BTU, not mmBTU
        # minval and maxval of 3 and 6.9 mmBTUs are the range of values
        # specified by EIA 923 instructions
        ['fuel_mmbtu_per_unit', oil_mask, 3, 6.9, (42, )],

        # mult = 1e-2: reported in cents/mmBTU instead of USD/mmBTU
        # minval and maxval of 5 and 33 dollars per mmBTU are the
        # endpoints of the primary distribution of EIA 923 fuel receipts
        # and cost per mmBTU data, weighted by quantity delivered
        ['fuel_cost_per_mmbtu', oil_mask, 5, 33, (1e-2, )]
    ]

    for (coltofix, mask, minval, maxval, mults) in corrections:
        fuel_ferc1_df[coltofix] = _multiplicative_error_correction(
            fuel_ferc1_df[coltofix], mask, minval, maxval, mults)

    #########################################################################
    # REMOVE BAD DATA #######################################################
    #########################################################################
    # Drop any records that are missing data. This is a blunt instrument, to
    # be sure. In some cases we lose data here, because some utilities have
    # (for example) a "Total" line w/ only fuel_mmbtu_per_kwh on it. Grr.
    fuel_ferc1_df.dropna(inplace=True)

    ferc1_transformed_dfs['fuel_ferc1'] = fuel_ferc1_df

    return ferc1_transformed_dfs
def plants_small(ferc1_raw_dfs, ferc1_transformed_dfs):
    """Transforms FERC Form 1 plant_small data for loading into PUDL Database.

    This FERC Form 1 table contains information about a large number of small
    plants, including many small hydroelectric and other renewable generation
    facilities. Unfortunately the data is not well standardized, and so the
    plants have been categorized manually, with the results of that
    categorization stored in an Excel spreadsheet. This function reads in the
    plant type data from the spreadsheet and merges it with the rest of the
    information from the FERC DB based on record number, FERC respondent ID,
    and report year. When possible the FERC license number for small hydro
    plants is also manually extracted from the data.

    This categorization will need to be renewed with each additional year of
    FERC data we pull in. As of v0.1 the small plants have been categorized
    for 2004-2015.

    Args:
        ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
            objects corresponds to a table from the FERC Form 1 DBC database.
        ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
            transformed.

    Returns:
        dict: The dictionary of transformed dataframes.
    """
    # grab table from dictionary of dfs
    ferc1_small_df = ferc1_raw_dfs['plants_small_ferc1']
    # Standardize plant_name capitalization and remove leading/trailing
    # white space -- necessary b/c plant_name is part of many foreign keys.
    ferc1_small_df = pudl.helpers.strip_lower(
        ferc1_small_df, ['plant_name', 'kind_of_fuel']
    )

    # Force the construction years to be numeric values, and set them to NA
    # if they can't be converted. (table has some junk values)
    ferc1_small_df['yr_constructed'] = pd.to_numeric(
        ferc1_small_df['yr_constructed'], errors='coerce')

    # Convert from cents per mmbtu to dollars per mmbtu to be consistent
    # with the f1_fuel table data. Also, let's use a clearer name.
    ferc1_small_df['fuel_cost_per_mmbtu'] = ferc1_small_df['fuel_cost'] / 100.0
    ferc1_small_df.drop('fuel_cost', axis=1, inplace=True)

    # Create a single "record number" for the individual lines in the FERC
    # Form 1 that report different small plants, so that we can more easily
    # tell whether they are adjacent to each other in the reporting.
    ferc1_small_df['record_number'] = (
        46 * ferc1_small_df['spplmnt_num'] + ferc1_small_df['row_number']
    )

    # Unfortunately the plant types were not able to be parsed automatically
    # in this table. It's been done manually for 2004-2015, and the results
    # get merged in in the following section.
    small_types_file = importlib.resources.open_binary(
        'pudl.package_data.ferc.form1', 'small_plants_2004-2016.xlsx')
    small_types_df = pd.read_excel(small_types_file)

    # Only rows with plant_type set will give us novel information.
    small_types_df.dropna(subset=['plant_type', ], inplace=True)
    # We only need this small subset of the columns to extract the plant type.
    small_types_df = small_types_df[['report_year', 'respondent_id',
                                     'record_number', 'plant_name_clean',
                                     'plant_type', 'ferc_license']]

    # Munge the two dataframes together, keeping everything from the
    # frame we pulled out of the FERC1 DB, and supplementing it with the
    # plant_name, plant_type, and ferc_license fields from our hand
    # made file.
    ferc1_small_df = pd.merge(ferc1_small_df,
                              small_types_df,
                              how='left',
                              on=['report_year',
                                  'respondent_id',
                                  'record_number'])

    # Remove extraneous columns and add a record ID
    ferc1_small_df = _clean_cols(ferc1_small_df, 'f1_gnrt_plant')

    # Standardize plant_name_clean capitalization and remove leading/trailing
    # white space, so that it matches the formatting of plant_name
    ferc1_small_df = pudl.helpers.strip_lower(
        ferc1_small_df, ['plant_name_clean'])

    # In order to create one complete column of plant names, we have to use
    # the cleaned plant names when available and the original plant names
    # when the cleaned version is not available, but the strings first need
    # cleaning.
    ferc1_small_df['plant_name_clean'] = \
        ferc1_small_df['plant_name_clean'].fillna(value="")
    ferc1_small_df['plant_name_clean'] = ferc1_small_df.apply(
        lambda row: row['plant_name']
        if (row['plant_name_clean'] == "")
        else row['plant_name_clean'],
        axis=1)

    # now we don't need the uncleaned version anymore
    # ferc1_small_df.drop(['plant_name'], axis=1, inplace=True)

    ferc1_small_df.rename(columns={
        # FERC 1 DB Name      PUDL DB Name
        'respondent_id': 'utility_id_ferc1',
        'plant_name': 'plant_name_original',
        'plant_name_clean': 'plant_name',
        'ferc_license': 'ferc_license_id',
        'yr_constructed': 'construction_year',
        'capacity_rating': 'capacity_mw',
        'net_demand': 'peak_demand_mw',
        'net_generation': 'net_generation_mwh',
        'plant_cost': 'total_cost_of_plant',
        'plant_cost_mw': 'capex_per_mw',
        'operation': 'opex_total',
        'expns_fuel': 'opex_fuel',
        'expns_maint': 'opex_maintenance',
        'kind_of_fuel': 'fuel_type',
        'fuel_cost': 'fuel_cost_per_mmbtu'},
        inplace=True)

    ferc1_transformed_dfs['plants_small_ferc1'] = ferc1_small_df
    return ferc1_transformed_dfs
def plants_hydro(ferc1_raw_dfs, ferc1_transformed_dfs):
    """Transforms FERC Form 1 plant_hydro data for loading into PUDL Database.

    Standardizes plant names (stripping whitespace and converting to lower
    case). Also converts into our preferred units of MW and MWh.

    Args:
        ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
            objects corresponds to a table from the FERC Form 1 DBC database.
        ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
            transformed.

    Returns:
        dict: The dictionary of transformed dataframes.
    """
    # grab table from dictionary of dfs
    ferc1_hydro_df = (
        _clean_cols(ferc1_raw_dfs['plants_hydro_ferc1'], 'f1_hydro').
        # Standardize plant_name capitalization and remove leading/trailing
        # white space -- necessary b/c plant_name is part of many foreign
        # keys.
        pipe(pudl.helpers.strip_lower, ['plant_name']).
        pipe(pudl.helpers.cleanstrings, ['plant_const'],
             [pc.ferc1_const_type_strings], unmapped='').
        assign(
            # Converting kWh to MWh
            net_generation_mwh=lambda x: x.net_generation / 1000.0,
            # Converting cost per kW installed to cost per MW installed:
            cost_per_mw=lambda x: x.cost_per_kw * 1000.0,
            # Converting expenses per kWh to expenses per MWh:
            expns_per_mwh=lambda x: x.expns_kwh * 1000.0,
            yr_const=lambda x: pd.to_numeric(x.yr_const, errors='coerce'),
            yr_installed=lambda x: pd.to_numeric(
                x.yr_installed, errors='coerce'),
        ).
        drop(columns=['net_generation', 'cost_per_kw', 'expns_kwh']).
        dropna().
        rename(columns={
            # FERC1 DB Name      PUDL DB Name
            'respondent_id': 'utility_id_ferc1',
            'project_no': 'project_num',
            'yr_const': 'construction_year',
            'plant_kind': 'plant_type',
            'plant_const': 'construction_type',
            'yr_installed': 'installation_year',
            'tot_capacity': 'capacity_mw',
            'peak_demand': 'peak_demand_mw',
            'plant_hours': 'plant_hours_connected_while_generating',
            'favorable_cond': 'net_capacity_favorable_conditions_mw',
            'adverse_cond': 'net_capacity_adverse_conditions_mw',
            'avg_num_of_emp': 'avg_num_employees',
            'cost_of_land': 'capex_land',
            'cost_structure': 'capex_structures',
            'cost_facilities': 'capex_facilities',
            'cost_equipment': 'capex_equipment',
            'cost_roads': 'capex_roads',
            'cost_plant_total': 'capex_total',
            'cost_per_mw': 'capex_per_mw',
            'expns_operations': 'opex_operations',
            'expns_water_pwr': 'opex_water_for_power',
            'expns_hydraulic': 'opex_hydraulic',
            'expns_electric': 'opex_electric',
            'expns_generation': 'opex_generation_misc',
            'expns_rents': 'opex_rents',
            'expns_engineering': 'opex_engineering',
            'expns_structures': 'opex_structures',
            'expns_dams': 'opex_dams',
            'expns_plant': 'opex_plant',
            'expns_misc_plant': 'opex_misc_plant',
            'expns_per_mwh': 'opex_per_mwh',
            'expns_engnr': 'opex_engineering',
            'expns_total': 'opex_total',
            'asset_retire_cost': 'asset_retirement_cost',
        })
    )

    ferc1_transformed_dfs['plants_hydro_ferc1'] = ferc1_hydro_df

    return ferc1_transformed_dfs
def plants_pumped_storage(ferc1_raw_dfs, ferc1_transformed_dfs):
    """Transforms FERC Form 1 pumped storage data for loading into PUDL.

    Standardizes plant names (stripping whitespace and converting to lower
    case). Also converts into our preferred units of MW and MWh.

    Args:
        ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
            objects corresponds to a table from the FERC Form 1 DBC database.
        ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
            transformed.

    Returns:
        dict: The dictionary of transformed dataframes.
    """
    # grab table from dictionary of dfs
    ferc1_pump_df = (
        _clean_cols(ferc1_raw_dfs['plants_pumped_storage_ferc1'],
                    'f1_pumped_storage').
        # Standardize plant_name capitalization and remove leading/trailing
        # white space -- necessary b/c plant_name is part of many foreign
        # keys.
        pipe(pudl.helpers.strip_lower, ['plant_name']).
        # Clean up the messy plant construction type column:
        pipe(pudl.helpers.cleanstrings, ['plant_kind'],
             [pc.ferc1_const_type_strings], unmapped='').
        assign(
            # Converting from kW/kWh to MW/MWh
            net_generation_mwh=lambda x: x.net_generation / 1000.0,
            energy_used_for_pumping_mwh=lambda x: x.energy_used / 1000.0,
            net_load_mwh=lambda x: x.net_load / 1000.0,
            cost_per_mw=lambda x: x.cost_per_kw * 1000.0,
            expns_per_mwh=lambda x: x.expns_kwh * 1000.0,
            yr_const=lambda x: pd.to_numeric(x.yr_const, errors='coerce'),
            yr_installed=lambda x: pd.to_numeric(
                x.yr_installed, errors='coerce'),
        ).
        drop(columns=['net_generation', 'energy_used', 'net_load',
                      'cost_per_kw', 'expns_kwh']).
        dropna().
        rename(columns={
            # FERC1 DB Name      PUDL DB Name
            'respondent_id': 'utility_id_ferc1',
            'project_number': 'project_num',
            'tot_capacity': 'capacity_mw',
            'project_no': 'project_num',
            'plant_kind': 'construction_type',
            'peak_demand': 'peak_demand_mw',
            'yr_const': 'construction_year',
            'yr_installed': 'installation_year',
            'plant_hours': 'plant_hours_connected_while_generating',
            'plant_capability': 'plant_capability_mw',
            'avg_num_of_emp': 'avg_num_employees',
            'cost_wheels': 'capex_wheels_turbines_generators',
            'cost_land': 'capex_land',
            'cost_structures': 'capex_structures',
            'cost_facilties': 'capex_facilities',
            'cost_wheels_turbines_generators':
                'capex_wheels_turbines_generators',
            'cost_electric': 'capex_equipment_electric',
            'cost_misc_eqpmnt': 'capex_equipment_misc',
            'cost_roads': 'capex_roads',
            'asset_retire_cost': 'asset_retirement_cost',
            'cost_of_plant': 'capex_total',
            'cost_per_mw': 'capex_per_mw',
            'expns_operations': 'opex_operations',
            'expns_water_pwr': 'opex_water_for_power',
            'expns_pump_strg': 'opex_pumped_storage',
            'expns_electric': 'opex_electric',
            'expns_misc_power': 'opex_generation_misc',
            'expns_rents': 'opex_rents',
            'expns_engneering': 'opex_engineering',
            'expns_structures': 'opex_structures',
            'expns_dams': 'opex_dams',
            'expns_plant': 'opex_plant',
            'expns_misc_plnt': 'opex_misc_plant',
            'expns_producton': 'opex_production_before_pumping',
            'pumping_expenses': 'opex_pumping',
            'tot_prdctn_exns': 'opex_total',
            'expns_per_mwh': 'opex_per_mwh',
        })
    )

    ferc1_transformed_dfs['plants_pumped_storage_ferc1'] = ferc1_pump_df

    return ferc1_transformed_dfs
def plant_in_service(ferc1_raw_dfs, ferc1_transformed_dfs):
    """Transforms FERC Form 1 plant_in_service data for loading into PUDL.

    This information is organized by FERC account, with each line of the FERC
    Form 1 having a different FERC account id (most are numeric and
    correspond to FERC's Uniform Electric System of Accounts). As of PUDL
    v0.1, this data is only valid from 2007 onward, as the line numbers for
    several accounts are different in earlier years.

    Args:
        ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
            objects corresponds to a table from the FERC Form 1 DBC database.
        ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
            transformed.

    Returns:
        dict: The dictionary of the transformed DataFrames.
    """
    # grab table from dictionary of dfs
    ferc1_pis_df = ferc1_raw_dfs['plant_in_service_ferc1']

    # Now we need to add a column to the DataFrame that has the FERC account
    # IDs corresponding to the row_number that's already in there...
    ferc_accts_df = pc.ferc_electric_plant_accounts.drop(
        ['ferc_account_description'], axis=1)
    ferc_accts_df.dropna(inplace=True)
    ferc_accts_df['row_number'] = ferc_accts_df['row_number'].astype(int)

    ferc1_pis_df = pd.merge(ferc1_pis_df, ferc_accts_df,
                            how='left', on='row_number')
    ferc1_pis_df = _clean_cols(ferc1_pis_df, 'f1_plant_in_srvce')

    ferc1_pis_df.rename(columns={
        # FERC 1 DB Name      PUDL DB Name
        'respondent_id': 'utility_id_ferc1',
        'begin_yr_bal': 'beginning_year_balance',
        'addition': 'additions',
        'yr_end_bal': 'year_end_balance'},
        inplace=True)

    ferc1_transformed_dfs['plant_in_service_ferc1'] = ferc1_pis_df

    return ferc1_transformed_dfs
def purchased_power(ferc1_raw_dfs, ferc1_transformed_dfs):
    """Transforms FERC Form 1 purchased power data for loading into PUDL.

    This table includes data about inter-utility power purchases, including
    how much electricity was purchased, how much it cost, and who it was
    purchased from. Unfortunately the field describing which other utility
    the power was being bought from is poorly standardized, making it
    difficult to correlate with other data. It will need to be categorized by
    hand or with some fuzzy matching eventually.

    Args:
        ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
            objects corresponds to a table from the FERC Form 1 DBC database.
        ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
            transformed.

    Returns:
        dict: The dictionary of the transformed DataFrames.
    """
    # grab table from dictionary of dfs
    df = (
        _clean_cols(ferc1_raw_dfs['purchased_power_ferc1'],
                    'f1_purchased_pwr')
        .replace({"": "NA"}, "")
        .replace(to_replace='', value=np.nan)
        .rename(columns={
            'respondent_id': 'utility_id_ferc1',
            'athrty_co_name': 'seller_name',
            'sttstcl_clssfctn': 'purchase_type',
            'rtsched_trffnbr': 'tariff',
            'avgmth_bill_dmnd': 'billing_demand_mw',
            'avgmth_ncp_dmnd': 'non_coincident_peak_demand_mw',
            'avgmth_cp_dmnd': 'coincident_peak_demand_mw',
            'mwh_purchased': 'purchased_mwh',
            'mwh_recv': 'received_mwh',
            'mwh_delvd': 'delivered_mwh',
            'dmnd_charges': 'demand_charges',
            'erg_charges': 'energy_charges',
            'othr_charges': 'other_charges',
            'settlement_tot': 'total_settlement'})
        .replace({  # Remove all non-numeric characters from these columns
            "billing_demand_mw": r"[^0-9\.]",
            "non_coincident_peak_demand_mw": r"[^0-9\.]",
            "coincident_peak_demand_mw": r"[^0-9\.]"}, '', regex=True)
        .replace({  # If all that's left is a period, set to NaN.
            "billing_demand_mw": ".",
            "non_coincident_peak_demand_mw": ".",
            "coincident_peak_demand_mw": "."}, np.nan)
        .replace({  # Replace empty fields with NaN
            "billing_demand_mw": "",
            "non_coincident_peak_demand_mw": "",
            "coincident_peak_demand_mw": ""}, np.nan)
        .astype({  # Whatever is left can be cast to a float.
            "billing_demand_mw": float,
            "non_coincident_peak_demand_mw": float,
            "coincident_peak_demand_mw": float})
        .fillna({  # Replace blanks w/ 0.0 in data columns.
            "purchased_mwh": 0.0,
            "received_mwh": 0.0,
            "delivered_mwh": 0.0,
            "demand_charges": 0.0,
            "energy_charges": 0.0,
            "other_charges": 0.0,
            "total_settlement": 0.0})
    )

    # Replace any invalid purchase types with the empty string
    bad_rows = (~df.purchase_type.isin(pc.ferc1_power_purchase_type.keys()))
    df.loc[bad_rows, 'purchase_type'] = ""
    # Replace inscrutable two letter codes with descriptive codes:
    df['purchase_type'] = df.purchase_type.replace(
        pc.ferc1_power_purchase_type)

    # Drop records containing no useful data.
    df = df.drop(df.loc[((df.purchased_mwh == 0)
                         & (df.received_mwh == 0)
                         & (df.delivered_mwh == 0)
                         & (df.demand_charges == 0)
                         & (df.energy_charges == 0)
                         & (df.other_charges == 0)
                         & (df.total_settlement == 0)), :].index)

    ferc1_transformed_dfs['purchased_power_ferc1'] = df

    return ferc1_transformed_dfs
def accumulated_depreciation(ferc1_raw_dfs, ferc1_transformed_dfs):
    """Transforms FERC Form 1 depreciation data for loading into PUDL.

    This information is organized by FERC account, with each line of the FERC
    Form 1 having a different descriptive identifier like
    'balance_end_of_year' or 'transmission'.

    Args:
        ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
            objects corresponds to a table from the FERC Form 1 DBC database.
        ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
            transformed.

    Returns:
        dict: The dictionary of the transformed DataFrames.
    """
    # grab table from dictionary of dfs
    ferc1_apd_df = ferc1_raw_dfs['accumulated_depreciation_ferc1']

    ferc1_acct_apd = pc.ferc_accumulated_depreciation.drop(
        ['ferc_account_description'], axis=1)
    ferc1_acct_apd.dropna(inplace=True)
    ferc1_acct_apd['row_number'] = ferc1_acct_apd['row_number'].astype(int)

    ferc1_accumdepr_prvsn_df = pd.merge(ferc1_apd_df, ferc1_acct_apd,
                                        how='left', on='row_number')
    ferc1_accumdepr_prvsn_df = _clean_cols(
        ferc1_accumdepr_prvsn_df, 'f1_accumdepr_prvsn')

    ferc1_accumdepr_prvsn_df.rename(columns={
        # FERC1 DB Name      PUDL DB Name
        'respondent_id': 'utility_id_ferc1',
        'total_cde': 'total'},
        inplace=True)

    ferc1_transformed_dfs['accumulated_depreciation_ferc1'] = (
        ferc1_accumdepr_prvsn_df
    )

    return ferc1_transformed_dfs
def transform(ferc1_raw_dfs, ferc1_tables=pc.pudl_tables['ferc1']):
    """Transforms FERC Form 1 raw dataframes into PUDL dataframes.

    Args:
        ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
            objects corresponds to a table from the FERC Form 1 DBC database.
        ferc1_tables (tuple): A tuple containing the set of tables which
            have been successfully integrated into PUDL.

    Returns:
        dict: A dictionary of the transformed DataFrames.
    """
    ferc1_transform_functions = {
        # fuel must come before steam b/c fuel proportions are used to aid in
        # plant ID assignment.
        'fuel_ferc1': fuel,
        'plants_steam_ferc1': plants_steam,
        'plants_small_ferc1': plants_small,
        'plants_hydro_ferc1': plants_hydro,
        'plants_pumped_storage_ferc1': plants_pumped_storage,
        'plant_in_service_ferc1': plant_in_service,
        'purchased_power_ferc1': purchased_power,
        'accumulated_depreciation_ferc1': accumulated_depreciation
    }
    # create an empty dictionary to fill up through the transform functions
    ferc1_transformed_dfs = {}

    # for each ferc table,
    for table in ferc1_transform_functions:
        if table in ferc1_tables:
            logger.info(
                f"Transforming raw FERC Form 1 dataframe for "
                f"loading into {table}")
            ferc1_transform_functions[table](ferc1_raw_dfs,
                                             ferc1_transformed_dfs)

    return ferc1_transformed_dfs
###############################################################################
# Identifying FERC Plants
###############################################################################
# Sadly FERC doesn't provide any kind of real IDs for the plants that report
# to them -- all we have is their names (a freeform string) and the data that
# is reported alongside them. This is often enough information to be able to
# recognize which records ought to be associated with each other year to year
# to create a continuous time series. However, we want to do that
# programmatically, which means using some clustering / categorization tools
# from scikit-learn
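
# A hedged sketch of the core similarity measure used by the classifier
# below: character n-gram TF-IDF vectors built from the free-form plant
# names, compared via cosine similarity. The plant names are invented:
def _example_name_similarity():
    """Illustrate TF-IDF + cosine similarity on messy plant name strings."""
    names = ['big bend', 'big bend unit 1', 'riverton']
    tfidf = TfidfVectorizer(analyzer='char', ngram_range=(2, 10))
    name_vecs = tfidf.fit_transform(names)
    # Rows/columns follow the input order: the two "big bend" variants score
    # far closer to one another than either does to "riverton".
    return cosine_similarity(name_vecs)
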
class FERCPlantClassifier(BaseEstimator, ClassifierMixin):
    """A classifier for identifying FERC plant time series in FERC Form 1 data.

    We want to be able to give the classifier a FERC plant record, and get
    back the group of records (or the ID of the group of records) that it
    ought to be part of.

    There are hundreds of different groups of records, and we can only know
    what they are by looking at the whole dataset ahead of time. This is the
    "fitting" step, in which the groups of records resulting from a
    particular set of model parameters (e.g. the weights that are attributes
    of the class) are generated.

    Once we have that set of record categories, we can test how well the
    classifier performs, by checking it against test / training data which we
    have already classified by hand. The test / training set is a list of
    lists of unique FERC plant record IDs (each record ID is the
    concatenation of: report year, respondent id, supplement number, and row
    number). It could also be stored as a dataframe where each column is
    associated with a year of data (some of which could be empty). Not sure
    what the best structure would be.

    If it's useful, we can assign each group a unique ID that is the time
    ordered concatenation of each of the constituent record IDs. Need to
    understand what the process for checking the classification of an input
    record looks like.

    To score a given classifier, we can look at what proportion of the
    records in the test dataset are assigned to the same group as in our
    manual classification of those records. There are much more complicated
    ways to do the scoring too... but for now let's just keep it as simple as
    possible.
    """

    def __init__(self, min_sim=0.75, plants_df=None):
        """
        Initialize the classifier.

        Args:
            min_sim : Number between 0.0 and 1.0, indicating the minimum
                value of cosine similarity that we are willing to accept as
                indicating two records are part of the same plant record time
                series. All entries in the pairwise similarity matrix below
                this value will be zeroed out.
            plants_df : The entire FERC Form 1 plant table as a dataframe.
                Needed in order to calculate the distance metrics between all
                of the records so we can group the plants in the fit() step,
                so we can check how well they are categorized later...

        Todo:
            Zane revisit plants_df
        """
        self.min_sim = min_sim
        self.plants_df = plants_df
        self._years = self.plants_df.report_year.unique()
    def fit(self, X, y=None):  # noqa: N803 Canonical capital letter...
        """
        Use weighted FERC plant features to group records into time series.

        The fit method takes the vectorized, normalized, weighted FERC plant
        features (X) as input, calculates the pairwise cosine similarity
        matrix between all records, and groups the records into their best
        time series. The similarity matrix and best time series are stored as
        data members in the object for later use in scoring & predicting.

        This isn't quite the way a fit method would normally work.

        Args:
            X (): a sparse matrix of size n_samples x n_features.
            y ():

        Returns:
            pandas.DataFrame:

        Todo:
            Zane revisit args and returns
        """
        self._cossim_df = pd.DataFrame(cosine_similarity(X))
        self._best_of = self._best_by_year()
        # Make the best match indices integers rather than floats w/ NA
        # values.
        self._best_of[self._years] = (
            self._best_of[self._years].fillna(-1).astype(int)
        )
        return self
    def transform(self, X, y=None):  # noqa: N803
        """Passthrough transform method -- just returns self."""
        return self
    def predict(self, X, y=None):  # noqa: N803
        """
        Identify time series of similar records to input record_ids.

        Given a one-dimensional dataframe X, containing FERC record IDs,
        return a dataframe in which each row corresponds to one of the input
        record_id values (ordered as the input was ordered), with each column
        corresponding to one of the years worth of data. Values in the
        returned dataframe are the FERC record_ids of the record most similar
        to the input record within that year. Some of them may be null, if
        there was no sufficiently good match.

        Row index is the seed record IDs. Column index is years.

        Todo:
            * This method is hideously inefficient. It should be vectorized.
            * There's a line that throws a FutureWarning that needs to be
              fixed.
        """
        try:
            getattr(self, "_cossim_df")
        except AttributeError:
            raise RuntimeError(
                "You must train the classifier before predicting data!")

        out_df = pd.DataFrame(columns=self._years)
        out_idx = []
        # For each record_id we've been given:
        for x in X:
            # Find the index associated with the record ID we are predicting
            # a grouping for:
            idx = self._best_of[self._best_of.record_id == x].index.values[0]

            # Mask the best_of dataframe, keeping only those entries where
            # the index of the chosen record_id appears -- this results in a
            # huge dataframe almost full of NaN values.
            w_m = (
                self._best_of[self._years][self._best_of[self._years] == idx]
            )

            # Grab the index values of the rows in the masked dataframe which
            # are NOT all NaN -- these are the indices of the *other* records
            # which found the record x to be one of their best matches.
            w_m = w_m.dropna(how='all').index.values

            # Now look up the indices of the records which were found to be
            # best matches to the record x.
            b_m = self._best_of.loc[idx, self._years].astype(int)

            # Here we require that there is no conflict between the two sets
            # of indices -- that every time a record shows up in a grouping,
            # that grouping is either the same, or a subset of the other
            # groupings that it appears in. When no sufficiently good match
            # is found the "index" in the _best_of array is set to -1, so
            # requiring that the b_m value be >= 0 screens out those no-match
            # cases. This is okay -- we're just trying to require that the
            # groupings be internally self-consistent, not that they are
            # completely identical. Being flexible on this dramatically
            # increases the number of records that get assigned a plant ID.
            if np.array_equiv(w_m, b_m[b_m >= 0].values):
                # This line is causing a warning. In cases where there are
                # some years for which no sufficiently good match exists, b_m
                # doesn't contain an index. Instead, it has a -1 sentinel
                # value, which isn't a label for which a record exists, which
                # upsets .loc. Need to find some way around this... but for
                # now it does what we want. We could use .iloc instead, but
                # then the -1 sentinel value maps to the last entry in the
                # dataframe, which also isn't what we want. Blargh.
                new_grp = self._best_of.loc[b_m, 'record_id']

                # Reshape into a row, rather than a column:
                new_grp = new_grp.values.reshape(1, len(self._years))

                # Stack the new list of record_ids on our output DataFrame:
                new_grp = pd.DataFrame(new_grp, columns=self._years)
                out_df = pd.concat([out_df, new_grp])
                # Save the seed record_id for use in indexing the output:
                out_idx = out_idx + [self._best_of.loc[idx, 'record_id']]

        out_df['seed_id'] = out_idx
        out_df = out_df.set_index('seed_id')
        out_df = out_df.fillna('')
        return out_df
    def score(self, X, y=None):  # noqa: N803
        """Scores a collection of FERC plant categorizations.

        For every record ID in X, predict its record group and calculate a
        metric of similarity between the prediction and the "ground truth"
        group that was passed in for that value of X.

        Args:
            X (pandas.DataFrame): an n_samples x 1 pandas dataframe of FERC
                Form 1 record IDs.
            y (pandas.DataFrame): a dataframe of "ground truth" FERC Form 1
                record groups, corresponding to the list of record IDs in X.

        Returns:
            numpy.ndarray: The average of all the similarity metrics as the
            score.
        """
        scores = []
        for true_group in y:
            true_group = str.split(true_group, sep=',')
            true_group = [s for s in true_group if s != '']
            predicted_groups = self.predict(pd.DataFrame(true_group))
            for rec_id in true_group:
                sm = SequenceMatcher(None, true_group,
                                     predicted_groups.loc[rec_id])
                scores = scores + [sm.ratio()]

        return np.mean(scores)
    def _best_by_year(self):
        """Finds the best match for each plant record in each other year."""
        # only keep similarity matrix entries above our minimum threshold:
        out_df = self.plants_df.copy()
        sim_df = self._cossim_df[self._cossim_df >= self.min_sim]

        # Add a column for each of the years, in which we will store indices
        # of the records which best match the record in question:
        for yr in self._years:
            newcol = yr
            out_df[newcol] = -1

        # seed_yr is the year we are matching *from* -- we do the entire
        # matching process from each year, since it may not be symmetric:
        for seed_yr in self._years:
            seed_idx = self.plants_df.index[
                self.plants_df.report_year == seed_yr]
            # match_yr is all the other years, in which we are finding the
            # best match
            for match_yr in self._years:
                best_of_yr = match_yr
                match_idx = self.plants_df.index[
                    self.plants_df.report_year == match_yr]
                # For each record specified by seed_idx, obtain the index of
                # the record within match_idx that is the most similar.
                best_idx = sim_df.iloc[seed_idx, match_idx].idxmax(axis=1)
                out_df.iloc[seed_idx,
                            out_df.columns.get_loc(best_of_yr)] = best_idx

        return out_df
def make_ferc_clf(plants_df,
                  ngram_min=2,
                  ngram_max=10,
                  min_sim=0.75,
                  plant_name_wt=2.0,
                  plant_type_wt=2.0,
                  construction_type_wt=1.0,
                  capacity_mw_wt=1.0,
                  construction_year_wt=1.0,
                  utility_id_ferc1_wt=1.0,
                  fuel_fraction_wt=1.0):
    """
    Create a FERC Plant Classifier using several weighted features.

    Given a FERC steam plants dataframe plants_df, which also includes fuel
    consumption information, transform a selection of useful columns into
    features suitable for use in calculating inter-record cosine
    similarities. Individual features are weighted according to the keyword
    arguments.

    Features include:

      * plant_name (via TF-IDF, with ngram_min and ngram_max as parameters)
      * plant_type (OneHot encoded categorical feature)
      * construction_type (OneHot encoded categorical feature)
      * capacity_mw (MinMax scaled numerical feature)
      * construction year (OneHot encoded categorical feature)
      * utility_id_ferc1 (OneHot encoded categorical feature)
      * fuel_fraction_mmbtu (several MinMax scaled numerical columns, which
        are normalized and treated as a single feature.)

    This feature matrix is then used to instantiate a FERCPlantClassifier.
    The ColumnTransformer and FERCPlantClassifier are combined in an sklearn
    Pipeline, which is returned by the function.

    Arguments:
        plants_df (pandas.DataFrame): The FERC Form 1 steam plants dataframe,
            which must also include the fuel consumption (fuel fraction)
            columns.
        ngram_min (int): the minimum n-gram length to consider in the
            vectorization of the plant_name feature.
        ngram_max (int): the maximum n-gram length to consider in the
            vectorization of the plant_name feature.
        min_sim (float): the minimum cosine similarity between two records
            that can be considered a "match" (a number between 0.0 and 1.0).
        plant_name_wt (float): weight used to determine the relative
            importance of each of the features in the feature matrix used to
            calculate the cosine similarity between records. Used to scale
            each individual feature before the vectors are normalized.
        plant_type_wt (float): weight used to determine the relative
            importance of each of the features in the feature matrix used to
            calculate the cosine similarity between records. Used to scale
            each individual feature before the vectors are normalized.
        construction_type_wt (float): weight used to determine the relative
            importance of each of the features in the feature matrix used to
            calculate the cosine similarity between records. Used to scale
            each individual feature before the vectors are normalized.
        capacity_mw_wt (float): weight used to determine the relative
            importance of each of the features in the feature matrix used to
            calculate the cosine similarity between records. Used to scale
            each individual feature before the vectors are normalized.
        construction_year_wt (float): weight used to determine the relative
            importance of each of the features in the feature matrix used to
            calculate the cosine similarity between records. Used to scale
            each individual feature before the vectors are normalized.
        utility_id_ferc1_wt (float): weight used to determine the relative
            importance of each of the features in the feature matrix used to
            calculate the cosine similarity between records. Used to scale
            each individual feature before the vectors are normalized.
        fuel_fraction_wt (float): weight used to determine the relative
            importance of each of the features in the feature matrix used to
            calculate the cosine similarity between records. Used to scale
            each individual feature before the vectors are normalized.

    Returns:
        sklearn.pipeline.Pipeline: an sklearn Pipeline that performs
        preprocessing and classification with a FERCPlantClassifier object.
    """
    # Make a list of all the fuel fraction columns for use as one feature.
    fuel_cols = list(plants_df.filter(regex='.*_fraction_mmbtu$').columns)

    ferc_pipe = Pipeline([
        ('preprocessor', ColumnTransformer(
            transformers=[
                ('plant_name', TfidfVectorizer(
                    analyzer='char',
                    ngram_range=(ngram_min, ngram_max)), 'plant_name'),
                ('plant_type', OneHotEncoder(
                    categories='auto'), ['plant_type']),
                ('construction_type', OneHotEncoder(
                    categories='auto'), ['construction_type']),
                ('capacity_mw', MinMaxScaler(), ['capacity_mw']),
                ('construction_year', OneHotEncoder(
                    categories='auto'), ['construction_year']),
                ('utility_id_ferc1', OneHotEncoder(
                    categories='auto'), ['utility_id_ferc1']),
                ('fuel_fraction_mmbtu', Pipeline([
                    ('scaler', MinMaxScaler()),
                    ('norm', Normalizer())
                ]), fuel_cols),
            ],
            transformer_weights={
                'plant_name': plant_name_wt,
                'plant_type': plant_type_wt,
                'construction_type': construction_type_wt,
                'capacity_mw': capacity_mw_wt,
                'construction_year': construction_year_wt,
                'utility_id_ferc1': utility_id_ferc1_wt,
                'fuel_fraction_mmbtu': fuel_fraction_wt,
            })
         ),
        ('classifier', pudl.transform.ferc1.FERCPlantClassifier(
            min_sim=min_sim, plants_df=plants_df))
    ])

    return ferc_pipe
def fuel_by_plant_ferc1(fuel_df, thresh=0.5):
    """Calculates useful FERC Form 1 fuel metrics on a per plant-year basis.

    Each record in the FERC Form 1 corresponds to a particular type of fuel.
    Many plants -- especially coal plants -- use more than one fuel, with gas
    and/or diesel serving as startup fuels. In order to be able to classify
    the type of plant based on relative proportions of fuel consumed or fuel
    costs it is useful to aggregate these per-fuel records into a single
    record for each plant.

    Fuel cost (in nominal dollars) and fuel heat content (in mmBTU) are
    calculated for each fuel based on the cost and heat content per unit, and
    the number of units consumed, and then summed by fuel type (there can be
    more than one record for a given type of fuel in each plant because we
    are simplifying the fuel categories). The per-fuel records are then
    pivoted to create one column per fuel type. The total is summed and
    stored separately, and the individual fuel costs & heat contents are
    divided by that total, to yield fuel proportions. Based on those
    proportions and a minimum threshold that's passed in, a "primary" fuel
    type is then assigned to the plant-year record and given a string label.

    Args:
        fuel_df (pandas.DataFrame): Pandas DataFrame resembling the
            post-transform result for the fuel_ferc1 table.
        thresh (float): A value between 0.5 and 1.0 indicating the minimum
            fraction of overall heat content that must have been provided by
            a fuel in a plant-year for it to be considered the "primary" fuel
            for the plant in that year. Default value: 0.5.

    Returns:
        pandas.DataFrame: A DataFrame with a single record for each
        plant-year, including the columns required to merge it with the
        plants_steam_ferc1 table/DataFrame (report_year, utility_id_ferc1,
        and plant_name) as well as totals for fuel mmbtu consumed in that
        plant-year, and the cost of fuel in that year, the proportions of
        heat content and fuel costs for each fuel in that year, and a column
        that labels the plant's primary fuel for that year.

    Raises:
        AssertionError: If the DataFrame input does not have the columns
            required to run the function.
    """
    keep_cols = [
        'report_year',  # key
        'utility_id_ferc1',  # key
        'plant_name',  # key
        'fuel_type_code_pudl',  # pivot
        'fuel_qty_burned',  # value
        'fuel_mmbtu_per_unit',  # value
        'fuel_cost_per_unit_burned',  # value
    ]

    # Ensure that the dataframe we've gotten has all the information we need:
    for col in keep_cols:
        if col not in fuel_df.columns:
            raise AssertionError(
                f"Required column {col} not found in input fuel_df."
            )

    # Calculate per-fuel derived values and add them to the DataFrame
    df = (
        # Really there should *not* be any duplicates here but... there's a
        # bug somewhere that introduces them into the fuel_ferc1 table.
        fuel_df[keep_cols].drop_duplicates().
        # Calculate totals for each record based on per-unit values:
        assign(fuel_mmbtu=lambda x:
               x.fuel_qty_burned * x.fuel_mmbtu_per_unit).
        assign(fuel_cost=lambda x:
               x.fuel_qty_burned * x.fuel_cost_per_unit_burned).
        # Drop the ratios and heterogeneous fuel "units"
        drop(['fuel_mmbtu_per_unit', 'fuel_cost_per_unit_burned',
              'fuel_qty_burned'], axis=1).
        # Group by the keys and fuel type, and sum:
        groupby(['utility_id_ferc1', 'plant_name', 'report_year',
                 'fuel_type_code_pudl']).
        agg(sum).reset_index().
        # Set the index to the keys, and pivot to get per-fuel columns:
        set_index(['utility_id_ferc1', 'plant_name', 'report_year']).
        pivot(columns='fuel_type_code_pudl').fillna(0.0)
    )

    # Calculate total heat content burned for each plant, and divide it out
    mmbtu_group = (
        pd.merge(
            # Sum up all the fuel heat content, and divide the individual
            # fuel heat contents by it (they are all contained in a single
            # higher level group of columns labeled fuel_mmbtu)
            df.loc[:, 'fuel_mmbtu'].div(
                df.loc[:, 'fuel_mmbtu'].sum(axis=1), axis='rows'),
            # Merge that same total into the dataframe separately as well.
            df.sum(level=0, axis=1).loc[:, 'fuel_mmbtu'],
            right_index=True, left_index=True).
        rename(columns=lambda x: re.sub(r'$', '_fraction_mmbtu', x)).
        rename(columns=lambda x: re.sub(r'_mmbtu_fraction_mmbtu$',
                                        '_mmbtu', x))
    )

    # Calculate total fuel cost for each plant, and divide it out
    cost_group = (
        pd.merge(
            # Sum up all the fuel costs, and divide the individual fuel
            # costs by it (they are all contained in a single higher
            # level group of columns labeled fuel_cost)
            df.loc[:, 'fuel_cost'].div(
                df.loc[:, 'fuel_cost'].sum(axis=1), axis='rows'),
            # Merge that same total into the dataframe separately as well.
            df.sum(level=0, axis=1).loc[:, 'fuel_cost'],
            right_index=True, left_index=True).
        rename(columns=lambda x: re.sub(r'$', '_fraction_cost', x)).
        rename(columns=lambda x: re.sub(r'_cost_fraction_cost$', '_cost', x))
    )

    # Re-unify the cost and heat content information:
    df = pd.merge(mmbtu_group, cost_group,
                  left_index=True, right_index=True).reset_index()

    # Label each plant-year record by primary fuel:
    for fuel_str in pc.ferc1_fuel_strings.keys():
        try:
            mmbtu_mask = df[f'{fuel_str}_fraction_mmbtu'] > thresh
            df.loc[mmbtu_mask, 'primary_fuel_by_mmbtu'] = fuel_str
        except KeyError:
            pass

        try:
            cost_mask = df[f'{fuel_str}_fraction_cost'] > thresh
            df.loc[cost_mask, 'primary_fuel_by_cost'] = fuel_str
        except KeyError:
            pass

    df[['primary_fuel_by_cost', 'primary_fuel_by_mmbtu']] = (
        df[['primary_fuel_by_cost', 'primary_fuel_by_mmbtu']].fillna('')
    )

    return df
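
# A hedged sketch of fuel_by_plant_ferc1() on an invented two-fuel plant:
# coal supplies the vast majority of the heat input, so with the default
# thresh=0.5 it is labeled the primary fuel by both mmBTU and cost:
def _example_fuel_by_plant():
    """Illustrate per-plant fuel aggregation and primary fuel labeling."""
    toy_fuel_df = pd.DataFrame({
        'report_year': [2016, 2016],
        'utility_id_ferc1': [1, 1],
        'plant_name': ['toy plant', 'toy plant'],
        'fuel_type_code_pudl': ['coal', 'gas'],
        'fuel_qty_burned': [1000.0, 50.0],  # tons of coal, mcf of gas
        'fuel_mmbtu_per_unit': [20.0, 1.0],
        'fuel_cost_per_unit_burned': [40.0, 5.0],
    })
    # Yields one row for the plant-year, with coal/gas fraction columns and
    # primary_fuel_by_mmbtu == primary_fuel_by_cost == 'coal'.
    return fuel_by_plant_ferc1(toy_fuel_df, thresh=0.5)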