"""
Routines for transforming FERC Form 1 data before loading into the PUDL DB.
This module provides a variety of functions that are used in cleaning up the
FERC Form 1 data prior to loading into our database. This includes adopting
standardized units and column names, standardizing the formatting of some
string values, and correcting data entry errors which we can infer based on
the existing data. It may also include removing bad data, or replacing it
with the appropriate NA values.
"""
import importlib.resources
import logging
import re
from difflib import SequenceMatcher
# NetworkX is used to knit incomplete ferc plant time series together.
import networkx as nx
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
# These modules are required for the FERC Form 1 Plant ID & Time Series
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, Normalizer, OneHotEncoder
import pudl
import pudl.constants as pc
logger = logging.getLogger(__name__)
##############################################################################
# FERC TRANSFORM HELPER FUNCTIONS ############################################
##############################################################################
[docs]def unpack_table(ferc1_df, table_name, data_cols, data_rows):
"""
Normalize a row-and-column based FERC Form 1 table.
Pulls the named database table from the FERC Form 1 DB and uses the
corresponding ferc1_row_map to unpack the row_number coded data.
Args:
ferc1_df (pandas.DataFrame): Raw FERC Form 1 DataFrame from the DB.
table_name (str): Original name of the FERC Form 1 DB table.
data_cols (list): List of strings corresponding to the original FERC
Form 1 database table column labels -- these are the columns of
data that we are extracting (it can be a subset of the columns
which are present in the original database).
data_rows (list): List of row_names to extract, as defined in the
FERC 1 row maps. Set to slice(None) if you want all rows.
Returns:
pandas.DataFrame
"""
# Read in the corresponding row map:
row_map = (
pd.read_csv(
importlib.resources.open_text(
"pudl.package_data.meta.ferc1_row_maps",
f"{table_name}.csv"),
index_col=0, comment="#")
.copy().transpose()
.rename_axis(index="year_index", columns=None)
)
row_map.index = row_map.index.astype(int)
# For each year, rename row numbers to variable names based on row_map.
rename_dict = {}
out_df = pd.DataFrame()
for year in row_map.index:
rename_dict = {v: k for k, v in dict(row_map.loc[year, :]).items()}
_ = rename_dict.pop(-1, None)
df = ferc1_df.loc[ferc1_df.report_year == year].copy()
df.loc[:, "row_name"] = (
df.loc[:, "row_number"]
.replace(rename_dict, value=None)
)
# The concatenate according to row_name
out_df = pd.concat([out_df, df], axis="index")
# Is this list of index columns universal? Or should they be an argument?
idx_cols = [
"respondent_id",
"report_year",
"report_prd",
"spplmnt_num",
"row_name"
]
logger.info(
f"{len(out_df[out_df.duplicated(idx_cols)])/len(out_df):.4%} "
f"of unpacked records were duplicates, and discarded."
)
# Index the dataframe based on the list of index_cols
# Unstack the dataframe based on variable names
out_df = (
out_df.loc[:, idx_cols + data_cols]
# These lost records should be minimal. If not, something's wrong.
.drop_duplicates(subset=idx_cols)
.set_index(idx_cols)
.unstack("row_name")
.loc[:, (slice(None), data_rows)]
)
return out_df
[docs]def cols_to_cats(df, cat_name, col_cats):
"""
Turn top-level MultiIndex columns into a categorial column.
In some cases FERC Form 1 data comes with many different types of related
values interleaved in the same table -- e.g. current year and previous year
income -- this can result in DataFrames that are hundreds of columns wide,
which is unwieldy. This function takes those top level MultiIndex labels
and turns them into categories in a single column, which can be used to
select a particular type of report.
Args:
df (pandas.DataFrame): the dataframe to be simplified.
cat_name (str): the label of the column to be created indicating what
MultiIndex label the values came from.
col_cats (dict): a dictionary with top level MultiIndex labels as keys,
and the category to which they should be mapped as values.
Returns:
pandas.DataFrame: A re-shaped/re-labeled dataframe with one fewer
levels of MultiIndex in the columns, and an additional column
containing the assigned labels.
"""
out_df = pd.DataFrame()
for col, cat in col_cats.items():
logger.info(f"Col: {col}, Cat: {cat}")
tmp_df = df.loc[:, col].copy().dropna(how='all')
tmp_df.loc[:, cat_name] = cat
out_df = pd.concat([out_df, tmp_df])
return out_df.reset_index()
def _clean_cols(df, table_name):
"""Adds a FERC record ID and drop FERC columns not to be loaded into PUDL.
It is often useful to be able to tell exactly which record in the FERC Form
1 database a given record within the PUDL database came from. Within each
FERC Form 1 table, each record is supposed to be uniquely identified by the
combination of: report_year, report_prd, respondent_id, spplmnt_num,
row_number.
So this function takes a dataframe, checks to make sure it contains each of
those columns and that none of them are NULL, and adds a new column to the
dataframe containing a string of the format:
{table_name}_{report_year}_{report_prd}_{respondent_id}_{spplmnt_num}_{row_number}
In some PUDL FERC Form 1 tables (e.g. plant_in_service_ferc1) a single row
is re-organized into several new records in order to normalize the data and
ensure it is stored in a "tidy" format. In such cases each of the resulting
PUDL records will have the same ``record_id``. Otherwise, the
``record_id`` is expected to be unique within each FERC Form 1 table.
However there are a handful of cases in which this uniqueness constraint is
violated due to data reporting issues in FERC Form 1.
In addition to those primary key columns, there are some columns which are
not meaningful or useful in the context of PUDL, but which show up in
virtually every FERC table, and this function drops them if they are
present. These columns include: row_prvlg, row_seq, item, record_number (a
temporary column used in plants_small) and all the footnote columns, which
end in "_f".
Args:
df (pandas.DataFrame): The DataFrame in which the function looks
for columns for the unique identification of FERC records, and
ensures that those columns are not NULL.
table_name (str): The name of the table that we are cleaning.
Returns:
pandas.DataFrame: The same DataFrame with a column appended
containing a string of the format {table_name}_{report_year}_{report_prd}_{respondent_id}_{spplmnt_num}_{row_number}
Raises:
AssertionError: If the table input contains NULL columns
"""
# Make sure that *all* of these columns exist in the proffered table:
for field in ['report_year', 'report_prd', 'respondent_id', 'spplmnt_num', 'row_number']:
if field in df.columns:
if df[field].isnull().any():
raise AssertionError(
f"Null field {field} found in ferc1 table {table_name}."
)
# Create a unique inter-year FERC table record ID:
df['record_id'] = (
table_name + '_' +
df.report_year.astype(str) + '_' +
df.report_prd.astype(str) + '_' +
df.respondent_id.astype(str) + '_' +
df.spplmnt_num.astype(str)
)
# Because of the way we are re-organizing columns and rows to create well
# normalized tables, there may or may not be a row number available.
if "row_number" in df.columns:
df["record_id"] = df["record_id"] + "_" + df.row_number.astype(str)
# Check to make sure that the generated record_id is unique... since
# that's kind of the whole point. There are couple of genuine bad
# records here that are taken care of in the transform step, so just
# print a warning.
n_dupes = df.record_id.duplicated().values.sum()
if n_dupes:
dupe_ids = df.record_id[df.record_id.duplicated()].values
logger.warning(
f"{n_dupes} duplicate record_id values found "
f"in pre-transform table {table_name}: {dupe_ids}."
)
# Drop any _f columns... since we're not using the FERC Footnotes...
# Drop columns and don't complain about it if they don't exist:
no_f = [c for c in df.columns if not re.match(".*_f$", c)]
df = (
df.loc[:, no_f]
.drop(['spplmnt_num', 'row_number', 'row_prvlg', 'row_seq',
'report_prd', 'item', 'record_number'],
errors='ignore', axis="columns")
.rename(columns={"respondent_id": "utility_id_ferc1"})
)
return df
def _multiplicative_error_correction(tofix, mask, minval, maxval, mults):
"""Corrects data entry errors where data being multiplied by a factor.
In many cases we know that a particular column in the database should have
a value in a particular rage (e.g. the heat content of a ton of coal is a
well defined physical quantity -- it can be 15 mmBTU/ton or 22 mmBTU/ton,
but it can't be 1 mmBTU/ton or 100 mmBTU/ton). Sometimes these fields
are reported in the wrong units (e.g. kWh of electricity generated rather
than MWh) resulting in several distributions that have a similar shape
showing up at different ranges of value within the data. This function
takes a one dimensional data series, a description of a valid range for
the values, and a list of factors by which we expect to see some of the
data multiplied due to unit errors. Data found in these "ghost"
distributions are multiplied by the appropriate factor to bring them into
the expected range.
Data values which are not found in one of the acceptable multiplicative
ranges are set to NA.
Args:
tofix (pandas.Series): A 1-dimensional data series containing the
values to be fixed.
mask (pandas.Series): A 1-dimensional masking array of True/False
values, which will be used to select a subset of the tofix
series onto which we will apply the multiplicative fixes.
min (float): the minimum realistic value for the data series.
max (float): the maximum realistic value for the data series.
mults (list of floats): values by which "real" data may have been
multiplied due to common data entry errors. These values both
show us where to look in the full data series to find recoverable
data, and also tell us by what factor those values need to be
multiplied to bring them back into the reasonable range.
Returns:
fixed (pandas.Series): a data series of the same length as the
input, but with the transformed values.
"""
# Grab the subset of the input series we are going to work on:
records_to_fix = tofix[mask]
# Drop those records from our output series
fixed = tofix.drop(records_to_fix.index)
# Iterate over the multipliers, applying fixes to outlying populations
for mult in mults:
records_to_fix = records_to_fix.apply(lambda x: x * mult
if x > minval / mult
and x < maxval / mult
else x)
# Set any record that wasn't inside one of our identified populations to
# NA -- we are saying that these are true outliers, which can't be part
# of the population of values we are examining.
records_to_fix = records_to_fix.apply(lambda x: np.nan
if x < minval
or x > maxval
else x)
# Add our fixed records back to the complete data series and return it
fixed = fixed.append(records_to_fix)
return fixed
##############################################################################
# DATABASE TABLE SPECIFIC PROCEDURES ##########################################
##############################################################################
[docs]def plants_steam(ferc1_raw_dfs, ferc1_transformed_dfs):
"""Transforms FERC Form 1 plant_steam data for loading into PUDL Database.
This includes converting to our preferred units of MWh and MW, as well as
standardizing the strings describing the kind of plant and construction.
Args:
ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
objects corresponds to a table from the FERC Form 1 DBC database.
ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
transformed.
Returns:
dict: of transformed dataframes, including the newly transformed
plants_steam_ferc1 dataframe.
"""
ferc1_steam_df = (
ferc1_raw_dfs['plants_steam_ferc1'].
pipe(_plants_steam_clean).
pipe(_plants_steam_assign_plant_ids,
ferc1_transformed_dfs['fuel_ferc1'])
)
plants_steam_validate_ids(ferc1_steam_df)
ferc1_transformed_dfs['plants_steam_ferc1'] = ferc1_steam_df
return ferc1_transformed_dfs
def _plants_steam_clean(ferc1_steam_df):
ferc1_steam_df = (
ferc1_steam_df.rename(columns={
"plant_name": "plant_name_ferc1",
"yr_const": 'construction_year',
"plant_kind": 'plant_type',
"type_const": 'construction_type',
"asset_retire_cost": 'asset_retirement_cost',
"yr_installed": 'installation_year',
"tot_capacity": 'capacity_mw',
"peak_demand": 'peak_demand_mw',
"plant_hours": 'plant_hours_connected_while_generating',
"plnt_capability": 'plant_capability_mw',
"when_limited": 'water_limited_capacity_mw',
"when_not_limited": 'not_water_limited_capacity_mw',
"avg_num_of_emp": 'avg_num_employees',
"net_generation": 'net_generation_kwh',
"cost_land": 'capex_land',
"cost_structure": 'capex_structures',
"cost_equipment": 'capex_equipment',
"cost_of_plant_to": 'capex_total',
"cost_per_kw": 'capex_per_kw',
"expns_operations": 'opex_operations',
"expns_fuel": 'opex_fuel',
"expns_coolants": 'opex_coolants',
"expns_steam": 'opex_steam',
"expns_steam_othr": 'opex_steam_other',
"expns_transfer": 'opex_transfer',
"expns_electric": 'opex_electric',
"expns_misc_power": 'opex_misc_power',
"expns_rents": 'opex_rents',
"expns_allowances": 'opex_allowances',
"expns_engnr": 'opex_engineering',
"expns_structures": 'opex_structures',
"expns_boiler": 'opex_boiler',
"expns_plants": 'opex_plants',
"expns_misc_steam": 'opex_misc_steam',
"tot_prdctn_expns": 'opex_production_total',
"expns_kwh": 'opex_per_kwh'})
.pipe(_clean_cols, "f1_steam")
.pipe(pudl.helpers.strip_lower, ['plant_name_ferc1'])
.pipe(pudl.helpers.cleanstrings,
['construction_type', 'plant_type'],
[pc.ferc1_const_type_strings, pc.ferc1_plant_kind_strings],
unmapped='')
.pipe(pudl.helpers.oob_to_nan,
cols=["construction_year", "installation_year"],
lb=1850, ub=max(pc.working_years["ferc1"]) + 1)
.assign(
capex_per_mw=lambda x: 1000.0 * x.capex_per_kw,
opex_per_mwh=lambda x: 1000.0 * x.opex_per_kwh,
net_generation_mwh=lambda x: x.net_generation_kwh / 1000.0,
)
.drop(columns=["capex_per_kw", "opex_per_kwh", "net_generation_kwh"])
)
return ferc1_steam_df
def _plants_steam_assign_plant_ids(ferc1_steam_df, ferc1_fuel_df):
"""Assign IDs to the large steam plants."""
###########################################################################
# FERC PLANT ID ASSIGNMENT
###########################################################################
# Now we need to assign IDs to the large steam plants, since FERC doesn't
# do this for us.
logger.info("Identifying distinct large FERC plants for ID assignment.")
# scikit-learn still doesn't deal well with NA values (this will be fixed
# eventually) We need to massage the type and missing data for the
# Classifier to work.
ferc1_steam_df = pudl.helpers.fix_int_na(
ferc1_steam_df, columns=['construction_year'])
# Grab fuel consumption proportions for use in assigning plant IDs:
fuel_fractions = fuel_by_plant_ferc1(ferc1_fuel_df)
ffc = list(fuel_fractions.filter(regex='.*_fraction_mmbtu$').columns)
ferc1_steam_df = (
ferc1_steam_df.merge(
fuel_fractions[
['utility_id_ferc1', 'plant_name_ferc1', 'report_year'] + ffc],
on=['utility_id_ferc1', 'plant_name_ferc1', 'report_year'],
how='left'
)
)
ferc1_steam_df[ffc] = ferc1_steam_df[ffc].fillna(value=0.0)
# Train the classifier using DEFAULT weights, parameters not listed here.
ferc1_clf = pudl.transform.ferc1.make_ferc1_clf(ferc1_steam_df)
ferc1_clf = ferc1_clf.fit_transform(ferc1_steam_df)
# Use the classifier to generate groupings of similar records:
record_groups = ferc1_clf.predict(ferc1_steam_df.record_id)
n_tot = len(ferc1_steam_df)
n_grp = len(record_groups)
pct_grp = n_grp / n_tot
logger.info(
f"Successfully associated {n_grp} of {n_tot} ({pct_grp:.2%}) "
f"FERC Form 1 plant records with multi-year plant entities.")
record_groups.columns = record_groups.columns.astype(str)
cols = record_groups.columns
record_groups = record_groups.reset_index()
# Now we are going to create a graph (network) that describes all of the
# binary relationships between a seed_id and the record_ids that it has
# been associated with in any other year. Each connected component of that
# graph is a ferc plant time series / plant_id
logger.info("Assigning IDs to multi-year FERC plant entities.")
edges_df = pd.DataFrame(columns=['source', 'target'])
for col in cols:
new_edges = record_groups[['seed_id', col]]
new_edges = new_edges.rename(
{'seed_id': 'source', col: 'target'}, axis=1)
edges_df = pd.concat([edges_df, new_edges], sort=True)
# Drop any records where there's no target ID (no match in a year)
edges_df = edges_df[edges_df.target != '']
# We still have to deal with the orphaned records -- any record which
# wasn't place in a time series but is still valid should be included as
# its own independent "plant" for completeness, and use in aggregate
# analysis.
orphan_record_ids = np.setdiff1d(ferc1_steam_df.record_id.unique(),
record_groups.values.flatten())
logger.info(
f"Identified {len(orphan_record_ids)} orphaned FERC plant records. "
f"Adding orphans to list of plant entities.")
orphan_df = pd.DataFrame({'source': orphan_record_ids,
'target': orphan_record_ids})
edges_df = pd.concat([edges_df, orphan_df], sort=True)
# Use the data frame we've compiled to create a graph
G = nx.from_pandas_edgelist(edges_df, # noqa: N806
source='source',
target='target')
# Find the connected components of the graph
ferc1_plants = (G.subgraph(c) for c in nx.connected_components(G))
# Now we'll iterate through the connected components and assign each of
# them a FERC Plant ID, and pull the results back out into a dataframe:
plants_w_ids = pd.DataFrame()
for plant_id_ferc1, plant in enumerate(ferc1_plants):
nx.set_edge_attributes(plant,
plant_id_ferc1 + 1,
name='plant_id_ferc1')
new_plant_df = nx.to_pandas_edgelist(plant)
plants_w_ids = plants_w_ids.append(new_plant_df)
logger.info(
f"Successfully Identified {plant_id_ferc1+1-len(orphan_record_ids)} "
f"multi-year plant entities.")
# Set the construction year back to numeric because it is.
ferc1_steam_df['construction_year'] = pd.to_numeric(
ferc1_steam_df['construction_year'], errors='coerce')
# We don't actually want to save the fuel fractions in this table... they
# were only here to help us match up the plants.
ferc1_steam_df = ferc1_steam_df.drop(ffc, axis=1)
# Now we need a list of all the record IDs, with their associated
# FERC 1 plant IDs. However, the source-target listing isn't
# guaranteed to list every one of the nodes in either list, so we
# need to compile them together to ensure that we get every single
sources = (
plants_w_ids.
drop('target', axis=1).
drop_duplicates().
rename({'source': 'record_id'}, axis=1)
)
targets = (
plants_w_ids.
drop('source', axis=1).
drop_duplicates().
rename({'target': 'record_id'}, axis=1)
)
plants_w_ids = (
pd.concat([sources, targets]).
drop_duplicates().
sort_values(['plant_id_ferc1', 'record_id'])
)
steam_rids = ferc1_steam_df.record_id.values
pwids_rids = plants_w_ids.record_id.values
missing_ids = [rid for rid in steam_rids if rid not in pwids_rids]
if missing_ids:
raise AssertionError(
f"Uh oh, we lost {abs(len(steam_rids)-len(pwids_rids))} FERC "
f"steam plant record IDs: {missing_ids}"
)
ferc1_steam_df = pd.merge(ferc1_steam_df, plants_w_ids, on='record_id')
return ferc1_steam_df
[docs]def plants_steam_validate_ids(ferc1_steam_df):
"""Tests that plant_id_ferc1 times series includes one record per year.
Args:
ferc1_steam_df (pandas.DataFrame): A DataFrame of the data from the
FERC 1 Steam table.
Returns:
None
"""
##########################################################################
# FERC PLANT ID ERROR CHECKING STUFF
##########################################################################
# Test to make sure that we don't have any plant_id_ferc1 time series
# which include more than one record from a given year. Warn the user
# if we find such cases (which... we do, as of writing)
year_dupes = (
ferc1_steam_df.
groupby(['plant_id_ferc1', 'report_year'])['utility_id_ferc1'].
count().
reset_index().
rename(columns={'utility_id_ferc1': 'year_dupes'}).
query('year_dupes>1')
)
if len(year_dupes) > 0:
for dupe in year_dupes.itertuples():
logger.error(
f"Found report_year={dupe.report_year} "
f"{dupe.year_dupes} times in "
f"plant_id_ferc1={dupe.plant_id_ferc1}"
)
else:
logger.info(
f"No duplicate years found in any plant_id_ferc1. Hooray!"
)
[docs]def fuel(ferc1_raw_dfs, ferc1_transformed_dfs):
"""Transforms FERC Form 1 fuel data for loading into PUDL Database.
This process includes converting some columns to be in terms of our
preferred units, like MWh and mmbtu instead of kWh and btu. Plant names are
also standardized (stripped & Title Case). Fuel and fuel unit strings are
also standardized using our cleanstrings() function and string cleaning
dictionaries found in pudl.constants.
Args:
ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
objects corresponds to a table from the FERC Form 1 DBC database.
ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
transformed.
Returns:
dict: The dictionary of transformed dataframes.
"""
# grab table from dictionary of dfs, clean it up a bit
fuel_ferc1_df = (
_clean_cols(ferc1_raw_dfs['fuel_ferc1'], 'f1_fuel').
# Standardize plant_name capitalization and remove leading/trailing
# white space -- necesary b/c plant_name is part of many foreign keys.
pipe(pudl.helpers.strip_lower, ['plant_name']).
# Take the messy free-form fuel & fuel_unit fields, and do our best to
# map them to some canonical categories... this is necessarily
# imperfect:
pipe(pudl.helpers.cleanstrings, ['fuel', 'fuel_unit'],
[pc.ferc1_fuel_strings, pc.ferc1_fuel_unit_strings],
unmapped='').
# Fuel cost per kWh is a per-unit value that doesn't make sense to
# report for a single fuel that may be only a small part of the fuel
# consumed. "fuel generaton" is heat rate, but as it's based only on
# the heat content of a given fuel which may only be a small portion of
# the overall fuel # consumption, it doesn't make any sense here. Drop
# it.
drop(['fuel_cost_kwh', 'fuel_generaton'], axis=1).
# Convert from BTU/unit of fuel to 1e6 BTU/unit.
assign(fuel_avg_mmbtu_per_unit=lambda x: x.fuel_avg_heat / 1e6).
drop('fuel_avg_heat', axis=1).
# Rename the columns to match our DB definitions
rename(columns={
# FERC 1 DB Name PUDL DB Name
"plant_name": "plant_name_ferc1",
'fuel': 'fuel_type_code_pudl',
'fuel_avg_mmbtu_per_unit': 'fuel_mmbtu_per_unit',
'fuel_quantity': 'fuel_qty_burned',
'fuel_cost_burned': 'fuel_cost_per_unit_burned',
'fuel_cost_delvd': 'fuel_cost_per_unit_delivered',
'fuel_cost_btu': 'fuel_cost_per_mmbtu'})
)
#########################################################################
# CORRECT DATA ENTRY ERRORS #############################################
#########################################################################
coal_mask = fuel_ferc1_df['fuel_type_code_pudl'] == 'coal'
gas_mask = fuel_ferc1_df['fuel_type_code_pudl'] == 'gas'
oil_mask = fuel_ferc1_df['fuel_type_code_pudl'] == 'oil'
corrections = [
# mult = 2000: reported in units of lbs instead of short tons
# mult = 1e6: reported BTUs instead of mmBTUs
# minval and maxval of 10 and 29 mmBTUs are the range of values
# specified by EIA 923 instructions at:
# https://www.eia.gov/survey/form/eia_923/instructions.pdf
['fuel_mmbtu_per_unit', coal_mask, 10.0, 29.0, (2e3, 1e6)],
# mult = 1e-2: reported cents/mmBTU instead of USD/mmBTU
# minval and maxval of .5 and 7.5 dollars per mmBTUs are the
# end points of the primary distribution of EIA 923 fuel receipts
# and cost per mmBTU data weighted by quantity delivered
['fuel_cost_per_mmbtu', coal_mask, 0.5, 7.5, (1e-2, )],
# mult = 1e3: reported fuel quantity in cubic feet, not mcf
# mult = 1e6: reported fuel quantity in BTU, not mmBTU
# minval and maxval of .8 and 1.2 mmBTUs are the range of values
# specified by EIA 923 instructions
['fuel_mmbtu_per_unit', gas_mask, 0.8, 1.2, (1e3, 1e6)],
# mult = 1e-2: reported in cents/mmBTU instead of USD/mmBTU
# minval and maxval of 1 and 35 dollars per mmBTUs are the
# end points of the primary distribution of EIA 923 fuel receipts
# and cost per mmBTU data weighted by quantity delivered
['fuel_cost_per_mmbtu', gas_mask, 1, 35, (1e-2, )],
# mult = 42: reported fuel quantity in gallons, not barrels
# mult = 1e6: reported fuel quantity in BTU, not mmBTU
# minval and maxval of 3 and 6.9 mmBTUs are the range of values
# specified by EIA 923 instructions
['fuel_mmbtu_per_unit', oil_mask, 3, 6.9, (42, )],
# mult = 1e-2: reported in cents/mmBTU instead of USD/mmBTU
# minval and maxval of 5 and 33 dollars per mmBTUs are the
# end points of the primary distribution of EIA 923 fuel receipts
# and cost per mmBTU data weighted by quantity delivered
['fuel_cost_per_mmbtu', oil_mask, 5, 33, (1e-2, )]
]
for (coltofix, mask, minval, maxval, mults) in corrections:
fuel_ferc1_df[coltofix] = \
_multiplicative_error_correction(fuel_ferc1_df[coltofix],
mask, minval, maxval, mults)
#########################################################################
# REMOVE BAD DATA #######################################################
#########################################################################
# Drop any records that are missing data. This is a blunt instrument, to
# be sure. In some cases we lose data here, because some utilities have
# (for example) a "Total" line w/ only fuel_mmbtu_per_kwh on it. Grr.
fuel_ferc1_df.dropna(inplace=True)
ferc1_transformed_dfs['fuel_ferc1'] = fuel_ferc1_df
return ferc1_transformed_dfs
[docs]def plants_small(ferc1_raw_dfs, ferc1_transformed_dfs):
"""Transforms FERC Form 1 plant_small data for loading into PUDL Database.
This FERC Form 1 table contains information about a large number of small
plants, including many small hydroelectric and other renewable generation
facilities. Unfortunately the data is not well standardized, and so the
plants have been categorized manually, with the results of that
categorization stored in an Excel spreadsheet. This function reads in the
plant type data from the spreadsheet and merges it with the rest of the
information from the FERC DB based on record number, FERC respondent ID,
and report year. When possible the FERC license number for small hydro
plants is also manually extracted from the data.
This categorization will need to be renewed with each additional year of
FERC data we pull in. As of v0.1 the small plants have been categorized
for 2004-2015.
Args:
ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
objects corresponds to a table from the FERC Form 1 DBC database.
ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
transformed.
Returns:
dict: The dictionary of transformed dataframes.
"""
# grab table from dictionary of dfs
ferc1_small_df = ferc1_raw_dfs['plants_small_ferc1']
# Standardize plant_name_raw capitalization and remove leading/trailing
# white space -- necesary b/c plant_name_raw is part of many foreign keys.
ferc1_small_df = pudl.helpers.strip_lower(
ferc1_small_df, ['plant_name', 'kind_of_fuel']
)
# Force the construction and installation years to be numeric values, and
# set them to NA if they can't be converted. (table has some junk values)
ferc1_small_df = pudl.helpers.oob_to_nan(
ferc1_small_df, cols=["yr_constructed"],
lb=1850, ub=max(pc.working_years["ferc1"]) + 1)
# Convert from cents per mmbtu to dollars per mmbtu to be consistent
# with the f1_fuel table data. Also, let's use a clearer name.
ferc1_small_df['fuel_cost_per_mmbtu'] = ferc1_small_df['fuel_cost'] / 100.0
ferc1_small_df.drop('fuel_cost', axis=1, inplace=True)
# Create a single "record number" for the individual lines in the FERC
# Form 1 that report different small plants, so that we can more easily
# tell whether they are adjacent to each other in the reporting.
ferc1_small_df['record_number'] = 46 * ferc1_small_df['spplmnt_num'] + \
ferc1_small_df['row_number']
# Unforunately the plant types were not able to be parsed automatically
# in this table. It's been done manually for 2004-2015, and the results
# get merged in in the following section.
small_types_file = importlib.resources.open_binary(
'pudl.package_data.ferc.form1', 'small_plants_2004-2016.xlsx')
small_types_df = pd.read_excel(small_types_file)
# Only rows with plant_type set will give us novel information.
small_types_df.dropna(subset=['plant_type', ], inplace=True)
# We only need this small subset of the columns to extract the plant type.
small_types_df = small_types_df[['report_year', 'respondent_id',
'record_number', 'plant_name_clean',
'plant_type', 'ferc_license']]
# Munge the two dataframes together, keeping everything from the
# frame we pulled out of the FERC1 DB, and supplementing it with the
# plant_name, plant_type, and ferc_license fields from our hand
# made file.
ferc1_small_df = pd.merge(ferc1_small_df,
small_types_df,
how='left',
on=['report_year',
'respondent_id',
'record_number'])
# Remove extraneous columns and add a record ID
ferc1_small_df = _clean_cols(ferc1_small_df, 'f1_gnrt_plant')
# Standardize plant_name capitalization and remove leading/trailing white
# space, so that plant_name matches formatting of plant_name_raw
ferc1_small_df = pudl.helpers.strip_lower(
ferc1_small_df, ['plant_name_clean'])
# in order to create one complete column of plant names, we have to use the
# cleaned plant names when available and the orignial plant names when the
# cleaned version is not available, but the strings first need cleaning
ferc1_small_df['plant_name_clean'] = ferc1_small_df['plant_name_clean'].fillna(
value="")
ferc1_small_df['plant_name_clean'] = ferc1_small_df.apply(
lambda row: row['plant_name']
if (row['plant_name_clean'] == "")
else row['plant_name_clean'],
axis=1)
# now we don't need the uncleaned version anymore
# ferc1_small_df.drop(['plant_name'], axis=1, inplace=True)
ferc1_small_df.rename(columns={
# FERC 1 DB Name PUDL DB Name
'plant_name': 'plant_name_original',
'plant_name_clean': 'plant_name_ferc1',
'ferc_license': 'ferc_license_id',
'yr_constructed': 'construction_year',
'capacity_rating': 'capacity_mw',
'net_demand': 'peak_demand_mw',
'net_generation': 'net_generation_mwh',
'plant_cost': 'total_cost_of_plant',
'plant_cost_mw': 'capex_per_mw',
'operation': 'opex_total',
'expns_fuel': 'opex_fuel',
'expns_maint': 'opex_maintenance',
'kind_of_fuel': 'fuel_type',
'fuel_cost': 'fuel_cost_per_mmbtu'},
inplace=True)
ferc1_transformed_dfs['plants_small_ferc1'] = ferc1_small_df
return ferc1_transformed_dfs
[docs]def plants_hydro(ferc1_raw_dfs, ferc1_transformed_dfs):
"""Transforms FERC Form 1 plant_hydro data for loading into PUDL Database.
Standardizes plant names (stripping whitespace and Using Title Case). Also
converts into our preferred units of MW and MWh.
Args:
ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
objects corresponds to a table from the FERC Form 1 DBC database.
ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
transformed.
Returns:
dict: The dictionary of transformed dataframes.
"""
# grab table from dictionary of dfs
ferc1_hydro_df = (
_clean_cols(ferc1_raw_dfs['plants_hydro_ferc1'], 'f1_hydro')
# Standardize plant_name capitalization and remove leading/trailing
# white space -- necesary b/c plant_name is part of many foreign keys.
.pipe(pudl.helpers.strip_lower, ['plant_name'])
.pipe(pudl.helpers.cleanstrings, ['plant_const'],
[pc.ferc1_const_type_strings], unmapped='')
.assign(
# Converting kWh to MWh
net_generation_mwh=lambda x: x.net_generation / 1000.0,
# Converting cost per kW installed to cost per MW installed:
cost_per_mw=lambda x: x.cost_per_kw * 1000.0,
# Converting kWh to MWh
expns_per_mwh=lambda x: x.expns_kwh * 1000.0)
.pipe(pudl.helpers.oob_to_nan, cols=["yr_const", "yr_installed"],
lb=1850, ub=max(pc.working_years["ferc1"]) + 1)
.drop(columns=['net_generation', 'cost_per_kw', 'expns_kwh'])
.rename(columns={
# FERC1 DB PUDL DB
"plant_name": "plant_name_ferc1",
'project_no': 'project_num',
'yr_const': 'construction_year',
'plant_kind': 'plant_type',
'plant_const': 'construction_type',
'yr_installed': 'installation_year',
'tot_capacity': 'capacity_mw',
'peak_demand': 'peak_demand_mw',
'plant_hours': 'plant_hours_connected_while_generating',
'favorable_cond': 'net_capacity_favorable_conditions_mw',
'adverse_cond': 'net_capacity_adverse_conditions_mw',
'avg_num_of_emp': 'avg_num_employees',
'cost_of_land': 'capex_land',
'cost_structure': 'capex_structures',
'cost_facilities': 'capex_facilities',
'cost_equipment': 'capex_equipment',
'cost_roads': 'capex_roads',
'cost_plant_total': 'capex_total',
'cost_per_mw': 'capex_per_mw',
'expns_operations': 'opex_operations',
'expns_water_pwr': 'opex_water_for_power',
'expns_hydraulic': 'opex_hydraulic',
'expns_electric': 'opex_electric',
'expns_generation': 'opex_generation_misc',
'expns_rents': 'opex_rents',
'expns_engineering': 'opex_engineering',
'expns_structures': 'opex_structures',
'expns_dams': 'opex_dams',
'expns_plant': 'opex_plant',
'expns_misc_plant': 'opex_misc_plant',
'expns_per_mwh': 'opex_per_mwh',
'expns_engnr': 'opex_engineering',
'expns_total': 'opex_total',
'asset_retire_cost': 'asset_retirement_cost',
'': '',
})
.drop_duplicates(
subset=["report_year",
"utility_id_ferc1",
"plant_name_ferc1",
"capacity_mw"],
keep=False)
)
ferc1_transformed_dfs['plants_hydro_ferc1'] = ferc1_hydro_df
return ferc1_transformed_dfs
[docs]def plants_pumped_storage(ferc1_raw_dfs, ferc1_transformed_dfs):
"""Transforms FERC Form 1 pumped storage data for loading into PUDL.
Standardizes plant names (stripping whitespace and Using Title Case). Also
converts into our preferred units of MW and MWh.
Args:
ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
objects corresponds to a table from the FERC Form 1 DBC database.
ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
transformed.
Returns:
dict: The dictionary of transformed dataframes.
"""
# grab table from dictionary of dfs
ferc1_pump_df = (
_clean_cols(
ferc1_raw_dfs['plants_pumped_storage_ferc1'], 'f1_pumped_storage')
# Standardize plant_name capitalization and remove leading/trailing
# white space -- necesary b/c plant_name is part of many foreign keys.
.pipe(pudl.helpers.strip_lower, ['plant_name'])
# Clean up the messy plant construction type column:
.pipe(pudl.helpers.cleanstrings, ['plant_kind'],
[pc.ferc1_const_type_strings], unmapped='')
.assign(
# Converting from kW/kWh to MW/MWh
net_generation_mwh=lambda x: x.net_generation / 1000.0,
energy_used_for_pumping_mwh=lambda x: x.energy_used / 1000.0,
net_load_mwh=lambda x: x.net_load / 1000.0,
cost_per_mw=lambda x: x.cost_per_kw * 1000.0,
expns_per_mwh=lambda x: x.expns_kwh * 1000.0)
.pipe(pudl.helpers.oob_to_nan, cols=["yr_const", "yr_installed"],
lb=1850, ub=max(pc.working_years["ferc1"]) + 1)
.drop(columns=['net_generation', 'energy_used', 'net_load',
'cost_per_kw', 'expns_kwh'])
.rename(columns={
# FERC1 DB PUDL DB
"plant_name": "plant_name_ferc1",
'project_number': 'project_num',
'tot_capacity': 'capacity_mw',
'project_no': 'project_num',
'plant_kind': 'construction_type',
'peak_demand': 'peak_demand_mw',
'yr_const': 'construction_year',
'yr_installed': 'installation_year',
'plant_hours': 'plant_hours_connected_while_generating',
'plant_capability': 'plant_capability_mw',
'avg_num_of_emp': 'avg_num_employees',
'cost_wheels': 'capex_wheels_turbines_generators',
'cost_land': 'capex_land',
'cost_structures': 'capex_structures',
'cost_facilties': 'capex_facilities',
'cost_wheels_turbines_generators': 'capex_wheels_turbines_generators',
'cost_electric': 'capex_equipment_electric',
'cost_misc_eqpmnt': 'capex_equipment_misc',
'cost_roads': 'capex_roads',
'asset_retire_cost': 'asset_retirement_cost',
'cost_of_plant': 'capex_total',
'cost_per_mw': 'capex_per_mw',
'expns_operations': 'opex_operations',
'expns_water_pwr': 'opex_water_for_power',
'expns_pump_strg': 'opex_pumped_storage',
'expns_electric': 'opex_electric',
'expns_misc_power': 'opex_generation_misc',
'expns_rents': 'opex_rents',
'expns_engneering': 'opex_engineering',
'expns_structures': 'opex_structures',
'expns_dams': 'opex_dams',
'expns_plant': 'opex_plant',
'expns_misc_plnt': 'opex_misc_plant',
'expns_producton': 'opex_production_before_pumping',
'pumping_expenses': 'opex_pumping',
'tot_prdctn_exns': 'opex_total',
'expns_per_mwh': 'opex_per_mwh',
})
.drop_duplicates(
subset=["report_year",
"utility_id_ferc1",
"plant_name_ferc1",
"capacity_mw"],
keep=False)
)
ferc1_transformed_dfs['plants_pumped_storage_ferc1'] = ferc1_pump_df
return ferc1_transformed_dfs
[docs]def plant_in_service(ferc1_raw_dfs, ferc1_transformed_dfs):
"""Transforms FERC Form 1 Plant in Service data for loading into PUDL.
Re-organizes the original FERC Form 1 Plant in Service data by unpacking
the rows as needed on a year by year basis, to organize them into columns.
The "columns" in the original FERC Form 1 denote starting balancing, ending
balance, additions, retirements, adjustments, and transfers -- these
categories are turned into labels in a column called "amount_type". Because
each row in the transformed table is composed of many individual records
(rows) from the original table, row_number can't be part of the record_id,
which means they are no longer unique. To infer exactly what record a given
piece of data came from, the record_id and the row_map (found in the PUDL
package_data directory) can be used.
Args:
ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
objects corresponds to a table from the FERC Form 1 DBC database.
ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
transformed.
Returns:
dict: The dictionary of the transformed DataFrames.
"""
pis_df = (
unpack_table(
ferc1_df=ferc1_raw_dfs["plant_in_service_ferc1"],
table_name="f1_plant_in_srvce",
data_rows=slice(None), # Gotta catch 'em all!
data_cols=[
"begin_yr_bal",
"addition",
"retirements",
"adjustments",
"transfers",
"yr_end_bal"
])
.pipe( # Convert top level of column index into a categorical column:
cols_to_cats,
cat_name="amount_type",
col_cats={
"begin_yr_bal": "starting_balance",
"addition": "additions",
"retirements": "retirements",
"adjustments": "adjustments",
"transfers": "transfers",
"yr_end_bal": "ending_balance",
})
.rename_axis(columns=None)
.pipe(_clean_cols, "f1_plant_in_srvce")
.set_index([
"utility_id_ferc1",
"report_year",
"amount_type",
"record_id"])
.reset_index()
)
# Get rid of the columns corresponding to "header" rows in the FERC
# form, which should *never* contain data... but in about 2 dozen cases,
# they do. See this issue on Github for more information:
# https://github.com/catalyst-cooperative/pudl/issues/471
pis_df = pis_df.drop(columns=pis_df.filter(regex=".*_head$").columns)
ferc1_transformed_dfs["plant_in_service_ferc1"] = pis_df
return ferc1_transformed_dfs
[docs]def purchased_power(ferc1_raw_dfs, ferc1_transformed_dfs):
"""Transforms FERC Form 1 pumped storage data for loading into PUDL.
This table has data about inter-utility power purchases into the PUDL DB.
This includes how much electricty was purchased, how much it cost, and who
it was purchased from. Unfortunately the field describing which other
utility the power was being bought from is poorly standardized, making it
difficult to correlate with other data. It will need to be categorized by
hand or with some fuzzy matching eventually.
Args:
ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
objects corresponds to a table from the FERC Form 1 DBC database.
ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
transformed.
Returns:
dict: The dictionary of the transformed DataFrames.
"""
# grab table from dictionary of dfs
df = (_clean_cols(ferc1_raw_dfs['purchased_power_ferc1'],
'f1_purchased_pwr')
.rename(columns={
'athrty_co_name': 'seller_name',
'sttstcl_clssfctn': 'purchase_type',
'rtsched_trffnbr': 'tariff',
'avgmth_bill_dmnd': 'billing_demand_mw',
'avgmth_ncp_dmnd': 'non_coincident_peak_demand_mw',
'avgmth_cp_dmnd': 'coincident_peak_demand_mw',
'mwh_purchased': 'purchased_mwh',
'mwh_recv': 'received_mwh',
'mwh_delvd': 'delivered_mwh',
'dmnd_charges': 'demand_charges',
'erg_charges': 'energy_charges',
'othr_charges': 'other_charges',
'settlement_tot': 'total_settlement'})
.assign( # Require these columns to numeric, or NaN
billing_demand_mw=lambda x: pd.to_numeric(
x.billing_demand_mw, errors="coerce"),
non_coincident_peak_demand_mw=lambda x: pd.to_numeric(
x.non_coincident_peak_demand_mw, errors="coerce"),
coincident_peak_demand_mw=lambda x: pd.to_numeric(
x.coincident_peak_demand_mw, errors="coerce"))
.fillna({ # Replace blanks w/ 0.0 in data columns.
"purchased_mwh": 0.0,
"received_mwh": 0.0,
"delivered_mwh": 0.0,
"demand_charges": 0.0,
"energy_charges": 0.0,
"other_charges": 0.0,
"total_settlement": 0.0}))
# Replace any invalid purchase types with the empty string
bad_rows = (~df.purchase_type.isin(pc.ferc1_power_purchase_type.keys()))
df.loc[bad_rows, 'purchase_type'] = ""
# Replace inscrutable two letter codes with descriptive codes:
df['purchase_type'] = df.purchase_type.replace(
pc.ferc1_power_purchase_type)
# Drop records containing no useful data and also any completely duplicate
# records -- there are 6 in 1998 for utility 238 for some reason...
df = (
df.drop_duplicates()
.drop(df.loc[((df.purchased_mwh == 0)
& (df.received_mwh == 0)
& (df.delivered_mwh == 0)
& (df.demand_charges == 0)
& (df.energy_charges == 0)
& (df.other_charges == 0)
& (df.total_settlement == 0)), :].index)
)
ferc1_transformed_dfs['purchased_power_ferc1'] = df
return ferc1_transformed_dfs
[docs]def accumulated_depreciation(ferc1_raw_dfs, ferc1_transformed_dfs):
"""Transforms FERC Form 1 depreciation data for loading into PUDL.
This information is organized by FERC account, with each line of the FERC
Form 1 having a different descriptive identifier like 'balance_end_of_year'
or 'transmission'.
Args:
ferc1_raw_dfs (dict): Each entry in this dictionary of DataFrame
objects corresponds to a table from the FERC Form 1 DBC database.
ferc1_transformed_dfs (dict): A dictionary of DataFrames to be
transformed.
Returns:
dict: The dictionary of the transformed DataFrames.
"""
# grab table from dictionary of dfs
ferc1_apd_df = ferc1_raw_dfs['accumulated_depreciation_ferc1']
ferc1_acct_apd = pc.ferc_accumulated_depreciation.drop(
['ferc_account_description'], axis=1)
ferc1_acct_apd.dropna(inplace=True)
ferc1_acct_apd['row_number'] = ferc1_acct_apd['row_number'].astype(int)
ferc1_accumdepr_prvsn_df = pd.merge(ferc1_apd_df, ferc1_acct_apd,
how='left', on='row_number')
ferc1_accumdepr_prvsn_df = _clean_cols(
ferc1_accumdepr_prvsn_df, 'f1_accumdepr_prvsn')
ferc1_accumdepr_prvsn_df.rename(columns={
# FERC1 DB PUDL DB
'total_cde': 'total'},
inplace=True)
ferc1_transformed_dfs['accumulated_depreciation_ferc1'] = ferc1_accumdepr_prvsn_df
return ferc1_transformed_dfs
###############################################################################
# Identifying FERC Plants
###############################################################################
# Sadly FERC doesn't provide any kind of real IDs for the plants that report to
# them -- all we have is their names (a freeform string) and the data that is
# reported alongside them. This is often enough information to be able to
# recognize which records ought to be associated with each other year to year
# to create a continuous time series. However, we want to do that
# programmatically, which means using some clustering / categorization tools
# from scikit-learn
[docs]class FERCPlantClassifier(BaseEstimator, ClassifierMixin):
"""A classifier for identifying FERC plant time series in FERC Form 1 data.
We want to be able to give the classifier a FERC plant record, and get back
the group of records(or the ID of the group of records) that it ought to
be part of.
There are hundreds of different groups of records, and we can only know
what they are by looking at the whole dataset ahead of time. This is the
"fitting" step, in which the groups of records resulting from a particular
set of model parameters(e.g. the weights that are attributes of the class)
are generated.
Once we have that set of record categories, we can test how well the
classifier performs, by checking it against test / training data which we
have already classified by hand. The test / training set is a list of lists
of unique FERC plant record IDs(each record ID is the concatenation of:
report year, respondent id, supplement number, and row number). It could
also be stored as a dataframe where each column is associated with a year
of data(some of which could be empty). Not sure what the best structure
would be.
If it's useful, we can assign each group a unique ID that is the time
ordered concatenation of each of the constituent record IDs. Need to
understand what the process for checking the classification of an input
record looks like.
To score a given classifier, we can look at what proportion of the records
in the test dataset are assigned to the same group as in our manual
classification of those records. There are much more complicated ways to
do the scoring too... but for now let's just keep it as simple as possible.
"""
def __init__(self, min_sim=0.75, plants_df=None):
"""
Initialize the classifier.
Args:
min_sim : Number between 0.0 and 1.0, indicating the minimum value
of cosine similarity that we are willing to accept as
indicating two records are part of the same plant record time
series. All entries in the pairwise similarity matrix below
this value will be zeroed out.
plants_df : The entire FERC Form 1 plant table as a dataframe.
Needed in order to calculate the distance metrics between all
of the records so we can group the plants in the fit() step, so
we can check how well they are categorized later...
Todo:
Zane revisit plants_df
"""
self.min_sim = min_sim
self.plants_df = plants_df
self._years = self.plants_df.report_year.unique()
[docs] def fit(self, X, y=None): # noqa: N803 Canonical capital letter...
"""
Use weighted FERC plant features to group records into time series.
The fit method takes the vectorized, normalized, weighted FERC plant
features (X) as input, calculates the pairwise cosine similarity matrix
between all records, and groups the records in their best time series.
The similarity matrix and best time series are stored as data members
in the object for later use in scoring & predicting.
This isn't quite the way a fit method would normally work.
Args:
X (): a sparse matrix of size n_samples x n_features.
y ():
Returns:
pandas.DataFrame:
TODO:
Zane revisit args and returns
"""
self._cossim_df = pd.DataFrame(cosine_similarity(X))
self._best_of = self._best_by_year()
# Make the best match indices integers rather than floats w/ NA values.
self._best_of[self._years] = self._best_of[self._years].fillna(
-1).astype(int)
return self
[docs] def predict(self, X, y=None): # noqa: N803
"""
Identify time series of similar records to input record_ids.
Given a one-dimensional dataframe X, containing FERC record IDs, return
a dataframe in which each row corresponds to one of the input record_id
values (ordered as the input was ordered), with each column
corresponding to one of the years worth of data. Values in the returned
dataframe are the FERC record_ids of the record most similar to the
input record within that year. Some of them may be null, if there was
no sufficiently good match.
Row index is the seed record IDs. Column index is years.
TODO:
* This method is hideously inefficient. It should be vectorized.
* There's a line that throws a FutureWarning that needs to be fixed.
"""
try:
getattr(self, "_cossim_df")
except AttributeError:
raise RuntimeError(
"You must train classifer before predicting data!")
out_df = pd.DataFrame(columns=self._years)
out_idx = []
# For each record_id we've been given:
for x in X:
# Find the index associated with the record ID we are predicting
# a grouping for:
idx = self._best_of[self._best_of.record_id == x].index.values[0]
# Mask the best_of dataframe, keeping only those entries where
# the index of the chosen record_id appears -- this results in a
# huge dataframe almost full of NaN values.
w_m = self._best_of[self._years][self._best_of[self._years] == idx]
# Grab the index values of the rows in the masked dataframe which
# are NOT all NaN -- these are the indices of the *other* records
# which found the record x to be one of their best matches.
w_m = w_m.dropna(how='all').index.values
# Now look up the indices of the records which were found to be
# best matches to the record x.
b_m = self._best_of.loc[idx, self._years].astype(int)
# Here we require that there is no conflict between the two sets
# of indices -- that every time a record shows up in a grouping,
# that grouping is either the same, or a subset of the other
# groupings that it appears in. When no sufficiently good match
# is found the "index" in the _best_of array is set to -1, so
# requiring that the b_m value be >=0 screens out those no-match
# cases. This is okay -- we're just trying to require that the
# groupings be internally self-consistent, not that they are
# completely identical. Being flexible on this dramatically
# increases the number of records that get assigned a plant ID.
if np.array_equiv(w_m, b_m[b_m >= 0].values):
# This line is causing a warning. In cases where there are
# some years no sufficiently good match exists, and so b_m
# doesn't contain an index. Instead, it has a -1 sentinel
# value, which isn't a label for which a record exists, which
# upsets .loc. Need to find some way around this... but for
# now it does what we want. We could use .iloc instead, but
# then the -1 sentinel value maps to the last entry in the
# dataframe, which also isn't what we want. Blargh.
new_grp = self._best_of.loc[b_m, 'record_id']
# reshape into row, rather than column,
new_grp = new_grp.values.reshape(1, len(self._years))
# Stack the new list of record_ids on our output DataFrame:
new_grp = pd.DataFrame(new_grp, columns=self._years)
out_df = pd.concat([out_df, new_grp])
# Save the seed record_id for use in indexing the output:
out_idx = out_idx + [self._best_of.loc[idx, 'record_id']]
out_df['seed_id'] = out_idx
out_df = out_df.set_index('seed_id')
out_df = out_df.fillna('')
return out_df
[docs] def score(self, X, y=None): # noqa: N803
"""Scores a collection of FERC plant categorizations.
For every record ID in X, predict its record group and calculate
a metric of similarity between the prediction and the "ground
truth" group that was passed in for that value of X.
Args:
X (pandas.DataFrame): an n_samples x 1 pandas dataframe of FERC
Form 1 record IDs.
y (pandas.DataFrame): a dataframe of "ground truth" FERC Form 1
record groups, corresponding to the list record IDs in X
Returns:
numpy.ndarray: The average of all the similarity metrics as the
score.
"""
scores = []
for true_group in y:
true_group = str.split(true_group, sep=',')
true_group = [s for s in true_group if s != '']
predicted_groups = self.predict(pd.DataFrame(true_group))
for rec_id in true_group:
sm = SequenceMatcher(None, true_group,
predicted_groups.loc[rec_id])
scores = scores + [sm.ratio()]
return np.mean(scores)
def _best_by_year(self):
"""Finds the best match for each plant record in each other year."""
# only keep similarity matrix entries above our minimum threshold:
out_df = self.plants_df.copy()
sim_df = self._cossim_df[self._cossim_df >= self.min_sim]
# Add a column for each of the years, in which we will store indices
# of the records which best match the record in question:
for yr in self._years:
newcol = yr
out_df[newcol] = -1
# seed_yr is the year we are matching *from* -- we do the entire
# matching process from each year, since it may not be symmetric:
for seed_yr in self._years:
seed_idx = self.plants_df.index[
self.plants_df.report_year == seed_yr]
# match_yr is all the other years, in which we are finding the best
# match
for match_yr in self._years:
best_of_yr = match_yr
match_idx = self.plants_df.index[
self.plants_df.report_year == match_yr]
# For each record specified by seed_idx, obtain the index of
# the record within match_idx that that is the most similar.
best_idx = sim_df.iloc[seed_idx, match_idx].idxmax(axis=1)
out_df.iloc[seed_idx,
out_df.columns.get_loc(best_of_yr)] = best_idx
return out_df
[docs]def make_ferc1_clf(plants_df,
ngram_min=2,
ngram_max=10,
min_sim=0.75,
plant_name_ferc1_wt=2.0,
plant_type_wt=2.0,
construction_type_wt=1.0,
capacity_mw_wt=1.0,
construction_year_wt=1.0,
utility_id_ferc1_wt=1.0,
fuel_fraction_wt=1.0):
"""
Create a FERC Plant Classifier using several weighted features.
Given a FERC steam plants dataframe plants_df, which also includes fuel
consumption information, transform a selection of useful columns into
features suitable for use in calculating inter-record cosine similarities.
Individual features are weighted according to the keyword arguments.
Features include:
* plant_name (via TF-IDF, with ngram_min and ngram_max as parameters)
* plant_type (OneHot encoded categorical feature)
* construction_type (OneHot encoded categorical feature)
* capacity_mw (MinMax scaled numerical feature)
* construction year (OneHot encoded categorical feature)
* utility_id_ferc1 (OneHot encoded categorical feature)
* fuel_fraction_mmbtu (several MinMax scaled numerical columns, which
are normalized and treated as a single feature.)
This feature matrix is then used to instantiate a FERCPlantClassifier.
The combination of the ColumnTransformer and FERCPlantClassifier are
combined in a sklearn Pipeline, which is returned by the function.
Arguments:
ngram_min (int): the minimum lengths to consider in the vectorization
of the plant_name feature.
ngram_max (int): the maximum n-gram lengths to consider in the
vectorization of the plant_name feature.
min_sim (float): the minimum cosine similarity between two records that
can be considered a "match" (a number between 0.0 and 1.0).
plant_name_ferc1_wt (float): weight used to determine the relative
importance of each of the features in the feature matrix used to
calculate the cosine similarity between records. Used to scale each
individual feature before the vectors are normalized.
plant_type_wt (float): weight used to determine the relative
importance of each of the features in the feature matrix used to
calculate the cosine similarity between records. Used to scale each
individual feature before the vectors are normalized.
construction_type_wt (float): weight used to determine the relative
importance of each of the features in the feature matrix used to
calculate the cosine similarity between records. Used to scale each
individual feature before the vectors are normalized.
capacity_mw_wt (float):weight used to determine the relative
importance of each of the features in the feature matrix used to
calculate the cosine similarity between records. Used to scale each
individual feature before the vectors are normalized.
construction_year_wt (float): weight used to determine the relative
importance of each of the features in the feature matrix used to
calculate the cosine similarity between records. Used to scale each
individual feature before the vectors are normalized.
utility_id_ferc1_wt (float): weight used to determine the relative
importance of each of the features in the feature matrix used to
calculate the cosine similarity between records. Used to scale each
individual feature before the vectors are normalized.
fuel_fraction_wt (float): weight used to determine the relative
importance of each of the features in the feature matrix used to
calculate the cosine similarity between records. Used to scale each
individual feature before the vectors are normalized.
Returns:
sklearn.pipeline.Pipeline: an sklearn Pipeline that performs
reprocessing and classification with a FERCPlantClassifier object.
"""
# Make a list of all the fuel fraction columns for use as one feature.
fuel_cols = list(plants_df.filter(regex='.*_fraction_mmbtu$').columns)
ferc1_pipe = Pipeline([
('preprocessor', ColumnTransformer(
transformers=[
('plant_name_ferc1', TfidfVectorizer(
analyzer='char',
ngram_range=(ngram_min, ngram_max)),
'plant_name_ferc1'),
('plant_type', OneHotEncoder(
categories='auto'), ['plant_type']),
('construction_type', OneHotEncoder(
categories='auto'), ['construction_type']),
('capacity_mw', MinMaxScaler(), ['capacity_mw']),
('construction_year', OneHotEncoder(
categories='auto'), ['construction_year']),
('utility_id_ferc1', OneHotEncoder(
categories='auto'), ['utility_id_ferc1']),
('fuel_fraction_mmbtu', Pipeline([
('scaler', MinMaxScaler()),
('norm', Normalizer())
]), fuel_cols),
],
transformer_weights={
'plant_name_ferc1': plant_name_ferc1_wt,
'plant_type': plant_type_wt,
'construction_type': construction_type_wt,
'capacity_mw': capacity_mw_wt,
'construction_year': construction_year_wt,
'utility_id_ferc1': utility_id_ferc1_wt,
'fuel_fraction_mmbtu': fuel_fraction_wt,
})
),
('classifier', pudl.transform.ferc1.FERCPlantClassifier(
min_sim=min_sim, plants_df=plants_df))
])
return ferc1_pipe
[docs]def fuel_by_plant_ferc1(fuel_df, thresh=0.5):
"""Calculates useful FERC Form 1 fuel metrics on a per plant-year basis.
Each record in the FERC Form 1 corresponds to a particular type of fuel.
Many plants -- especially coal plants -- use more than one fuel, with gas
and/or diesel serving as startup fuels. In order to be able to classify
the type of plant based on relative proportions of fuel consumed or
fuel costs it is useful to aggregate these per-fuel records into a single
record for each plant.
Fuel cost (in nominal dollars) and fuel heat content (in mmBTU) are
calculated for each fuel based on the cost and heat content per unit, and
the number of units consumed, and then summed by fuel type (there can be
more than one record for a given type of fuel in each plant because we
are simplifying the fuel categories). The per-fuel records are then
pivoted to create one column per fuel type. The total is summed and
stored separately, and the individual fuel costs & heat contents are
divided by that total, to yield fuel proportions. Based on those
proportions and a minimum threshold that's passed in, a "primary" fuel
type is then assigned to the plant-year record and given a string label.
Args:
fuel_df (pandas.DataFrame): Pandas DataFrame resembling the
post-transform result for the fuel_ferc1 table.
thresh (float): A value between 0.5 and 1.0 indicating the minimum
fraction of overall heat content that must have been provided by a
fuel in a plant-year for it to be considered the "primary" fuel for
the plant in that year. Default value: 0.5.
Returns:
pandas.DataFrame: A DataFrame with a single record for each
plant-year, including the columns required to merge it with the
plants_steam_ferc1 table/DataFrame (report_year, utility_id_ferc1,
and plant_name) as well as totals for fuel mmbtu consumed in that
plant-year, and the cost of fuel in that year, the proportions of
heat content and fuel costs for each fuel in that year, and a
column that labels the plant's primary fuel for that year.
Raises:
AssertionError: If the DataFrame input does not have the columns
required to run the function.
"""
keep_cols = [
'report_year', # key
'utility_id_ferc1', # key
'plant_name_ferc1', # key
'fuel_type_code_pudl', # pivot
'fuel_qty_burned', # value
'fuel_mmbtu_per_unit', # value
'fuel_cost_per_unit_burned', # value
]
# Ensure that the dataframe we've gotten has all the information we need:
for col in keep_cols:
if col not in fuel_df.columns:
raise AssertionError(
f"Required column {col} not found in input fuel_df."
)
# Calculate per-fuel derived values and add them to the DataFrame
df = (
# Really there should *not* be any duplicates here but... there's a
# bug somewhere that introduces them into the fuel_ferc1 table.
fuel_df[keep_cols].drop_duplicates().
# Calculate totals for each record based on per-unit values:
assign(fuel_mmbtu=lambda x: x.fuel_qty_burned * x.fuel_mmbtu_per_unit).
assign(fuel_cost=lambda x: x.fuel_qty_burned * x.fuel_cost_per_unit_burned).
# Drop the ratios and heterogeneous fuel "units"
drop(['fuel_mmbtu_per_unit', 'fuel_cost_per_unit_burned', 'fuel_qty_burned'], axis=1).
# Group by the keys and fuel type, and sum:
groupby(['utility_id_ferc1', 'plant_name_ferc1', 'report_year', 'fuel_type_code_pudl']).
agg(sum).reset_index().
# Set the index to the keys, and pivot to get per-fuel columns:
set_index(['utility_id_ferc1', 'plant_name_ferc1', 'report_year']).
pivot(columns='fuel_type_code_pudl').fillna(0.0)
)
# Calculate total heat content burned for each plant, and divide it out
mmbtu_group = (
pd.merge(
# Sum up all the fuel heat content, and divide the individual fuel
# heat contents by it (they are all contained in single higher
# level group of columns labeled fuel_mmbtu)
df.loc[:, 'fuel_mmbtu'].div(
df.loc[:, 'fuel_mmbtu'].sum(axis=1), axis='rows'),
# Merge that same total into the dataframe separately as well.
df.sum(level=0, axis=1).loc[:, 'fuel_mmbtu'],
right_index=True, left_index=True).
rename(columns=lambda x: re.sub(r'$', '_fraction_mmbtu', x)).
rename(columns=lambda x: re.sub(r'_mmbtu_fraction_mmbtu$', '_mmbtu', x))
)
# Calculate total fuel cost for each plant, and divide it out
cost_group = (
pd.merge(
# Sum up all the fuel costs, and divide the individual fuel
# costs by it (they are all contained in single higher
# level group of columns labeled fuel_cost)
df.loc[:, 'fuel_cost'].div(
df.loc[:, 'fuel_cost'].sum(axis=1), axis='rows'),
# Merge that same total into the dataframe separately as well.
df.sum(level=0, axis=1).loc[:, 'fuel_cost'],
right_index=True, left_index=True).
rename(columns=lambda x: re.sub(r'$', '_fraction_cost', x)).
rename(columns=lambda x: re.sub(r'_cost_fraction_cost$', '_cost', x))
)
# Re-unify the cost and heat content information:
df = pd.merge(mmbtu_group, cost_group,
left_index=True, right_index=True).reset_index()
# Label each plant-year record by primary fuel:
for fuel_str in pc.ferc1_fuel_strings.keys():
try:
mmbtu_mask = df[f'{fuel_str}_fraction_mmbtu'] > thresh
df.loc[mmbtu_mask, 'primary_fuel_by_mmbtu'] = fuel_str
except KeyError:
pass
try:
cost_mask = df[f'{fuel_str}_fraction_cost'] > thresh
df.loc[cost_mask, 'primary_fuel_by_cost'] = fuel_str
except KeyError:
pass
df[['primary_fuel_by_cost', 'primary_fuel_by_mmbtu']] = df[[
'primary_fuel_by_cost', 'primary_fuel_by_mmbtu']].fillna('')
return df