"""Module to perform data cleaning functions on EIA923 data tables."""
import logging
from typing import Dict
import numpy as np
import pandas as pd
import pudl
from pudl.constants import PUDL_TABLES
from pudl.metadata import RESOURCE_METADATA
from pudl.metadata.codes import ENERGY_SOURCES_EIA
from pudl.metadata.labels import COALMINE_TYPES_EIA
logger = logging.getLogger(__name__)

# Sphinx "[docs]" viewcode artifacts removed: they made these lines invalid Python.
COALMINE_COUNTRY_CODES: Dict[str, str] = {
    'AU': 'AUS',  # Australia
    'CL': 'COL',  # Colombia
    'CN': 'CAN',  # Canada
    'IS': 'IDN',  # Indonesia
    'PL': 'POL',  # Poland
    'RS': 'RUS',  # Russia
    'UK': 'GBR',  # United Kingdom of Great Britain
    'VZ': 'VEN',  # Venezuela
    'OT': 'other_country',
    'IM': 'unknown'
}
"""
A mapping of EIA foreign coal mine country codes to 3-letter ISO-3166-1 codes.

The EIA-923 lists the US state of origin for coal deliveries using standard
2-letter US state abbreviations. However, foreign countries are also included
as "states" in this category and because some of them have 2-letter abbreviation
collisions with US states, their coding is non-standard.

Instead of using the provided non-standard codes, we convert to the ISO-3166-1
three letter country codes: https://en.wikipedia.org/wiki/ISO_3166-1_alpha-3
"""
###############################################################################
###############################################################################
# HELPER FUNCTIONS
###############################################################################
###############################################################################
[docs]def _get_plant_nuclear_unit_id_map(nuc_fuel: pd.DataFrame) -> Dict[int, str]:
"""Get a plant_id -> nuclear_unit_id mapping for all plants with one nuclear unit.
Parameters:
nuc_fuel: dataframe of nuclear unit fuels.
Returns:
plant_to_nuc_id: one to one mapping of plant_id_eia to nuclear_unit_id.
"""
nuc_fuel = nuc_fuel[nuc_fuel.nuclear_unit_id.notna()].copy()
# Find the plants with one nuclear unit
plant_nuc_unit_counts = nuc_fuel.groupby(
"plant_id_eia").nuclear_unit_id.nunique().copy()
plant_id_with_one_unit = plant_nuc_unit_counts[plant_nuc_unit_counts.eq(1)].index
# get the nuclear_unit_id for the plants with one prime mover
plant_to_nuc_id = nuc_fuel.groupby(
"plant_id_eia").nuclear_unit_id.unique().loc[plant_id_with_one_unit]
plant_to_nuc_id = plant_to_nuc_id.explode()
# check there is one nuclear unit per plant.
assert plant_to_nuc_id.index.is_unique, "Found multiple nuclear units in plant_to_nuc_id mapping."
# Check there are no missing nuclear unit ids.
assert (~plant_to_nuc_id.isna()).all(
), "Found missing nuclear_unit_ids in plant_to_nuc_id mappings."
plant_to_nuc_id = plant_to_nuc_id.astype("string")
return dict(plant_to_nuc_id)
def _backfill_nuclear_unit_id(nuc_fuel: pd.DataFrame) -> pd.DataFrame:
    """Backfill 2001 and 2002 nuclear_unit_id for plants with one nuclear unit.

    2001 and 2002 generation_fuel_eia923 records do not include nuclear_unit_id
    which is required for the primary key of nuclear_unit_fuel_eia923. We backfill
    this field for plants with one nuclear unit. nuclear_unit_id is filled with
    'UNK' if the nuclear_unit_id can't be recovered.

    Parameters:
        nuc_fuel: nuclear fuels dataframe.

    Returns:
        nuc_fuel: nuclear fuels dataframe with backfilled nuclear_unit_id field.
    """
    plant_to_nuc_id_map = _get_plant_nuclear_unit_id_map(nuc_fuel)
    missing_nuclear_unit_id = nuc_fuel.nuclear_unit_id.isna()
    unit_id_fill = (
        nuc_fuel.loc[missing_nuclear_unit_id, "plant_id_eia"]
        .map(plant_to_nuc_id_map)
        # If we aren't able to impute nuclear_unit_id, fill them UNK.
        .fillna("UNK")
    )
    nuc_fuel.loc[missing_nuclear_unit_id, "nuclear_unit_id"] = unit_id_fill
    # NOTE: a second "fill remaining NAs with UNK" pass was removed here as dead
    # code — unit_id_fill has no NAs after fillna("UNK"), so every originally
    # missing nuclear_unit_id is already populated at this point.
    return nuc_fuel
[docs]def _get_plant_prime_mover_map(gen_fuel: pd.DataFrame) -> Dict[int, str]:
"""Get a plant_id -> prime_mover_code mapping for all plants with one prime mover.
Parameters:
gen_fuel: dataframe of generation fuels.
Returns:
fuel_type_map: one to one mapping of plant_id_eia to prime_mover_codes.
"""
# Remove fuels that don't have a prime mover.
gen_fuel = gen_fuel[~gen_fuel.prime_mover_code.isna()].copy()
# find plants with one prime mover
plant_prime_movers_counts = (
gen_fuel.groupby("plant_id_eia")
.prime_mover_code.nunique()
.copy()
)
plant_ids_with_one_pm = plant_prime_movers_counts[plant_prime_movers_counts.eq(
1)].index
# get the prime mover codes for the plants with one prime mover
plant_to_prime_mover = gen_fuel.groupby(
"plant_id_eia").prime_mover_code.unique().loc[plant_ids_with_one_pm]
plant_to_prime_mover = plant_to_prime_mover.explode()
# check there is one prime mover per plant.
assert plant_to_prime_mover.index.is_unique, "Found multiple plants in plant_to_prime_mover mapping."
# Check there are no missing prime mover codes.
assert (plant_to_prime_mover.notnull()).all(
), "Found missing prime_mover_codes in plant_to_prime_mover mappings."
return dict(plant_to_prime_mover)
def _backfill_prime_mover_code(gen_fuel: pd.DataFrame) -> pd.DataFrame:
    """Backfill 2001 and 2002 prime_mover_code for plants with one prime mover.

    2001 and 2002 generation_fuel_eia923 records do not include prime_mover_code
    which is required for the primary key. We backfill this field for plants
    with one prime mover. prime_mover_code is set to 'UNK' if future plants
    have multiple prime movers.

    Parameters:
        gen_fuel: generation fuels dataframe.

    Returns:
        gen_fuel: generation fuels dataframe with backfilled prime_mover_code field.
    """
    # Impute from plants that only ever report a single prime mover.
    single_pm_map = _get_plant_prime_mover_map(gen_fuel)
    needs_pm = gen_fuel.prime_mover_code.isna()
    gen_fuel.loc[needs_pm, "prime_mover_code"] = (
        gen_fuel.loc[needs_pm, "plant_id_eia"]
        .map(single_pm_map)
        .astype("string")
    )
    # Assign prime mover codes for hydro fuels based on the AER fuel code.
    aer_to_hydro_pm = {"HPS": "PS", "HYC": "HY"}
    hydro_needs_pm = (
        gen_fuel.energy_source_code.eq("WAT")
        & gen_fuel.prime_mover_code.isna()
    )
    gen_fuel.loc[hydro_needs_pm, "prime_mover_code"] = (
        gen_fuel.loc[hydro_needs_pm, "fuel_type_code_aer"].map(aer_to_hydro_pm)
    )
    # Whatever is still missing becomes UNK.
    still_missing = gen_fuel.prime_mover_code.isna()
    gen_fuel.loc[still_missing, "prime_mover_code"] = "UNK"
    assert gen_fuel.prime_mover_code.notna().all(
    ), "generation_fuel_923.prime_mover_code has missing values after backfill."
    return gen_fuel
[docs]def _get_most_frequent_energy_source_map(gen_fuel: pd.DataFrame) -> Dict[str, str]:
"""Get the a mapping of the most common energy_source for each fuel_type_code_aer.
Parameters:
gen_fuel: generation_fuel dataframe.
Returns:
energy_source_map: mapping of fuel_type_code_aer to energy_source_code.
"""
energy_source_counts = gen_fuel.groupby(
["fuel_type_code_aer", "energy_source_code"]).plant_id_eia.count()
energy_source_counts = energy_source_counts.reset_index(
).sort_values(by="plant_id_eia", ascending=False)
energy_source_map = (
energy_source_counts
.groupby(["fuel_type_code_aer"]).first()
.reset_index()[["fuel_type_code_aer", "energy_source_code"]]
)
return dict(energy_source_map.values)
def _clean_gen_fuel_energy_sources(gen_fuel: pd.DataFrame) -> pd.DataFrame:
    """Clean the generator_fuel_eia923.energy_source_code field specifically.

    Transformations include:

    * Remap MSW to biogenic and non biogenic fuel types.
    * Fill missing energy_source_code using most common code for each AER fuel codes.

    Parameters:
        gen_fuel: generation fuels dataframe.

    Returns:
        gen_fuel: generation fuels dataframe with cleaned energy_source_code field.
    """
    # Whitespace-only and empty strings are really missing values.
    gen_fuel['energy_source_code'] = gen_fuel['energy_source_code'].replace(
        to_replace=r'^\s*$', value=pd.NA, regex=True
    )
    # Remap MSW: Prior to 2006, MSW contained biogenic and non biogenic fuel types.
    # Starting in 2006 MSW got split into MSB and MSN. However, the AER fuel type
    # codes always differentiated between biogenic and non-biogenic waste, so we can
    # impose the more recent categorization on the older EIA fuel types.
    is_msw = gen_fuel.energy_source_code.eq("MSW")
    msw_split = {
        "OTH": "MSN",  # non-biogenic municipal waste
        "MLG": "MSB",  # biogenic municipal waste
    }
    gen_fuel.loc[is_msw, "energy_source_code"] = (
        gen_fuel.loc[is_msw, "fuel_type_code_aer"].map(msw_split)
    )
    # Make sure we replaced all MSWs
    assert gen_fuel.energy_source_code.ne("MSW").all()
    # Fill in any missing fuel_types with the most common fuel type of each
    # fuel_type_code_aer.
    still_missing = gen_fuel.energy_source_code.isna()
    frequent_energy_source_map = _get_most_frequent_energy_source_map(gen_fuel)
    gen_fuel.loc[still_missing, "energy_source_code"] = (
        gen_fuel.loc[still_missing, "fuel_type_code_aer"]
        .map(frequent_energy_source_map)
    )
    if gen_fuel.energy_source_code.isna().any():
        raise AssertionError("Missing data in generator_fuel_eia923.energy_source_code")
    return gen_fuel
[docs]def _aggregate_generation_fuel_duplicates(
gen_fuel: pd.DataFrame,
nuclear: bool = False,
) -> pd.DataFrame:
"""Aggregate remaining duplicate generation fuels.
There are a handful of plants (< 100) whose prime_mover_code can't be imputed
or duplicates exist in the raw table. We resolve these be aggregate the variable
fields.
Parameters:
gen_fuel: generation fuels dataframe.
nuclear: adds nuclear_unit_id to list of natural key fields.
Returns:
gen_fuel: generation fuels dataframe without duplicates in natural key fields.
"""
natural_key_fields = [
"report_date",
"plant_id_eia",
"energy_source_code",
"prime_mover_code",
]
if nuclear:
natural_key_fields += ["nuclear_unit_id"]
is_duplicate = gen_fuel.duplicated(subset=natural_key_fields, keep=False)
duplicates = gen_fuel[is_duplicate].copy()
fuel_type_code_aer_is_unique = (
duplicates.groupby(natural_key_fields)
.fuel_type_code_aer.nunique().eq(1).all()
)
if not fuel_type_code_aer_is_unique:
raise AssertionError("Duplicate fuels have different fuel_type_code_aer.")
agg_fields = {
'fuel_consumed_units': "sum",
'fuel_consumed_for_electricity_units': "sum",
'fuel_consumed_mmbtu': "sum",
'fuel_consumed_for_electricity_mmbtu': "sum",
'net_generation_mwh': "sum",
# We can safely select the first fuel_type_code_aer because we know they are the same for each group of duplicates.
'fuel_type_code_aer': "first"
}
resolved_duplicates = duplicates.groupby(
natural_key_fields).agg(agg_fields).reset_index()
# Recalculate fuel_mmbtu_per_unit after aggregation.
resolved_duplicates["fuel_mmbtu_per_unit"] = resolved_duplicates["fuel_consumed_mmbtu"] / \
resolved_duplicates["fuel_consumed_units"]
# Add the resolved records back to generation_fuel dataframe.
gen_df = gen_fuel[~is_duplicate].copy()
gen_df = gen_df.append(resolved_duplicates)
assert gen_df[natural_key_fields].notna().all().all(
), f"There are missing values in generation_fuel{'_nuclear' if nuclear else ''}_eia923 natural key fields."
assert (~gen_df.duplicated(subset=natural_key_fields)).all(
), "Duplicate generation fuels have not been resolved."
return gen_df
[docs]def _yearly_to_monthly_records(df: pd.DataFrame) -> pd.DataFrame:
"""Converts an EIA 923 record of 12 months of data into 12 monthly records.
Much of the data reported in EIA 923 is monthly, but all 12 months worth of data is
reported in a single record, with one field for each of the 12 months. This
function converts these annualized composite records into a set of 12 monthly
records containing the same information, by parsing the field names for months, and
adding a month field. Non - time series data is retained in the same format.
Args:
df: A pandas DataFrame containing the annual data to be
converted into monthly records.
Returns:
A dataframe containing the same data as was passed in via df,
but with monthly records as rows instead of as columns.
"""
month_dict = {
'january': 1,
'february': 2,
'march': 3,
'april': 4,
'may': 5,
'june': 6,
'july': 7,
'august': 8,
'september': 9,
'october': 10,
'november': 11,
'december': 12
}
multi_idx = df.columns.str.rsplit(
"_", n=1, expand=True).set_names([None, 'report_month'])
ends_with_month_filter = multi_idx.get_level_values(
'report_month').isin(set(month_dict.keys()))
if not ends_with_month_filter.any():
return df
index_cols = df.columns[~ends_with_month_filter]
# performance note: this was good enough for eia923 data size.
# Using .set_index() is simple but inefficient due to unecessary index creation.
# Performance may be improved by separating into two dataframes,
# .stack()ing the monthly data, then joining back together on the original index.
df = df.set_index(list(index_cols), append=True)
# convert month names to numbers (january -> 1)
col_df = multi_idx[ends_with_month_filter].to_frame(index=False)
col_df.loc[:, 'report_month'] = col_df.loc[:, 'report_month'].map(month_dict)
month_idx = pd.MultiIndex.from_frame(col_df).set_names([None, 'report_month'])
# reshape
df.columns = month_idx
df = df.stack()
# restore original index and columns - reset index except level 0
df = df.reset_index(level=list(range(1, df.index.nlevels)))
return df
def _coalmine_cleanup(cmi_df):
    """Cleans up the coalmine_eia923 table.

    This function does most of the coalmine_eia923 table transformation. It is
    separate from the coalmine() transform function because of the peculiar way
    that we are normalizing the fuel_receipts_costs_eia923() table.

    All of the coalmine information is originally coming from the EIA
    fuel_receipts_costs spreadsheet, but it really belongs in its own table. We
    strip it out of FRC, and create that separate table, but then we need to refer
    to that table through a foreign key. To do so, we actually merge the entire
    contents of the coalmine table into FRC, including the surrogate key, and then
    drop the data fields.

    For this to work, we need to have exactly the same coalmine data fields in
    both the new coalmine table, and the FRC table. To ensure that's true, we
    isolate the transformations here in this function, and apply them to the
    coalmine columns in both the FRC table and the coalmine table.

    Args:
        cmi_df (pandas.DataFrame): A DataFrame to be cleaned, containing coalmine
            information (e.g. name, county, state)

    Returns:
        pandas.DataFrame: A cleaned DataFrame containing coalmine information.
    """
    # The deduplicated values in this table act as a de-facto key that the FRC
    # ingest must reproduce exactly before merging, so every normalization here
    # (case, whitespace, punctuation) must stay in sync with the FRC side.
    # We want to minimize the number of "unique" mines without creating
    # collisions.
    cmi_df = cmi_df.copy()
    # Map mine type codes, which have changed over the years, to a few
    # canonical values:
    cmi_df["mine_type"] = cmi_df["mine_type"].replace(
        {'[pP]': 'P', 'U/S': 'US', 'S/U': 'SU', 'Su': 'S'}, regex=True)
    # replace 2-letter country codes w/ ISO 3 letter as appropriate:
    cmi_df["state"] = cmi_df["state"].replace(COALMINE_COUNTRY_CODES)
    # remove all internal non-alphanumeric characters:
    cmi_df["mine_name"] = cmi_df["mine_name"].replace(
        '[^a-zA-Z0-9 -]', '', regex=True)
    # Homogenize the data type that we're finding inside the county_id_fips
    # field (ugh, Excel sheets!). Mostly these are integers or NA values, but
    # for imported coal, there are both 'IMP' and 'IM' string values.
    cmi_df["county_id_fips"] = cmi_df["county_id_fips"].replace(
        '[a-zA-Z]+', value=np.nan, regex=True)
    # Translate the canonicalized mine type codes via the standard EIA mapping.
    cmi_df["mine_type"] = cmi_df["mine_type"].map(COALMINE_TYPES_EIA)
    # No leading or trailing whitespace:
    cmi_df = pudl.helpers.simplify_strings(cmi_df, columns=["mine_name"])
    # Cast through float so strings like "123.0" survive conversion to Int64.
    cmi_df = (
        cmi_df.astype({"county_id_fips": float})
        .astype({"county_id_fips": pd.Int64Dtype()})
        .fillna({"mine_type": pd.NA})
        .astype({"mine_type": pd.StringDtype()})
    )
    return cmi_df
###############################################################################
###############################################################################
# DATATABLE TRANSFORM FUNCTIONS
###############################################################################
###############################################################################
def plants(eia923_dfs, eia923_transformed_dfs):
    """Transforms the plants_eia923 table.

    Much of the static plant information is reported repeatedly, and scattered
    across several different pages of EIA 923. The data frame that this function
    uses is assembled from those many different pages, and passed in via the same
    dictionary of dataframes that all the other ingest functions use for
    uniformity.

    Transformations include:

    * Map full spelling onto code values.
    * Convert Y/N columns to booleans.
    * Remove excess white space around values.
    * Drop duplicate rows.

    Args:
        eia923_dfs (dictionary of pandas.DataFrame): Each entry in this dictionary
            of DataFrame objects corresponds to a page from the EIA 923 form, as
            reported in the Excel spreadsheets they distribute.
        eia923_transformed_dfs (dict): A dictionary of DataFrame objects in which
            pages from EIA923 form (keys) correspond to normalized DataFrames of
            values from that page (values).

    Returns:
        dict: eia923_transformed_dfs, a dictionary of DataFrame objects in which
        pages from EIA923 form (keys) correspond to normalized DataFrames of
        values from that page (values).
    """
    plant_info_df = eia923_dfs['plant_frame'].copy()
    # There are other fields being compiled in the plant_info_df from all of
    # the various EIA923 spreadsheet pages. Do we want to add them to the
    # database model too? E.g. capacity_mw, operator_name, etc.
    plant_info_df = plant_info_df[['plant_id_eia',
                                   'combined_heat_power',
                                   'plant_state',
                                   'eia_sector',
                                   'naics_code',
                                   'reporting_frequency',
                                   'census_region',
                                   'nerc_region',
                                   'capacity_mw',
                                   'report_year']]
    plant_info_df['reporting_frequency'] = plant_info_df.reporting_frequency.replace(
        {'M': 'monthly', 'A': 'annual'})
    # Since this is a plain Yes/No variable -- just make it a real sa.Boolean.
    # FIX: assign instead of chained inplace replace, which can silently fail
    # to propagate back into the parent dataframe.
    plant_info_df['combined_heat_power'] = plant_info_df.combined_heat_power.replace(
        {'N': False, 'Y': True})
    # Get rid of excessive whitespace introduced to break long lines (ugh)
    plant_info_df['census_region'] = plant_info_df.census_region.str.replace(
        ' ', '')
    # BUG FIX: drop_duplicates() returns a new dataframe; previously the result
    # was discarded, so duplicate plant records survived into the output.
    plant_info_df = plant_info_df.drop_duplicates(subset='plant_id_eia')
    plant_info_df['plant_id_eia'] = plant_info_df['plant_id_eia'].astype(int)
    eia923_transformed_dfs['plants_eia923'] = plant_info_df
    return eia923_transformed_dfs
def nuclear_unit_fuel(
    nuclear_unit_fuel: pd.DataFrame,
    eia923_transformed_dfs: Dict[str, pd.DataFrame]
) -> None:
    """Transforms the generation_fuel_nuclear_eia923 table.

    Transformations include:

    * Backfill nuclear_unit_ids for 2001 and 2002.
    * Set all prime_mover_codes to 'ST'.
    * Aggregate remaining duplicate units.

    Parameters:
        nuclear_unit_fuel: dataframe of nuclear unit fuels.
        eia923_transformed_dfs: dictionary to hold all eia923 tables.
    """
    nuc = nuclear_unit_fuel
    # Normalize the unit id to a nullable string via Int64 (strips float '.0's).
    nuc["nuclear_unit_id"] = nuc["nuclear_unit_id"].astype("Int64").astype("string")
    nuc = _backfill_nuclear_unit_id(nuc)
    # All nuclear plants have steam turbines.
    nuc.loc[:, "prime_mover_code"] = nuc["prime_mover_code"].fillna("ST")
    # Aggregate remaining duplicates and stash the finished table.
    eia923_transformed_dfs["generation_fuel_nuclear_eia923"] = (
        _aggregate_generation_fuel_duplicates(nuc, nuclear=True)
    )
def generation_fuel(eia923_dfs, eia923_transformed_dfs):
    """Transforms the generation_fuel_eia923 table.

    Transformations include:

    * Remove fields implicated elsewhere.
    * Replace . values with NA.
    * Remove rows with utility ids 99999.
    * Create a fuel_type_code_pudl field that organizes fuel types into
      clean, distinguishable categories.
    * Combine year and month columns into a single date column.
    * Clean and impute fuel_type field.
    * Backfill missing prime_mover_codes
    * Create a separate nuclear_unit_fuel table.
    * Aggregate records with duplicate natural keys.

    Args:
        eia923_dfs (dict): Each entry in this dictionary of DataFrame objects
            corresponds to a page from the EIA923 form, as reported in the Excel
            spreadsheets they distribute.
        eia923_transformed_dfs (dict): A dictionary of DataFrame objects in which
            pages from EIA923 form (keys) correspond to normalized DataFrames of
            values from that page (values).

    Returns:
        dict: eia923_transformed_dfs, a dictionary of DataFrame objects in which
        pages from EIA923 form (keys) correspond to normalized DataFrames of
        values from that page (values).
    """
    # Work on a copy so the raw input dataframe is left untouched.
    gf = eia923_dfs['generation_fuel'].copy()
    # Drop fields we're not inserting into the generation_fuel_eia923 table.
    gf = gf.drop(
        columns=[
            'combined_heat_power',
            'plant_name_eia',
            'operator_name',
            'operator_id',
            'plant_state',
            'census_region',
            'nerc_region',
            'naics_code',
            'eia_sector',
            'sector_name',
            'fuel_unit',
            'total_fuel_consumption_quantity',
            'electric_fuel_consumption_quantity',
            'total_fuel_consumption_mmbtu',
            'elec_fuel_consumption_mmbtu',
            'net_generation_megawatthours',
        ]
    )
    # Convert the EIA923 DataFrame from yearly to monthly records.
    gf = _yearly_to_monthly_records(gf)
    # Replace the EIA923 NA value ('.') with a real NA value.
    gf = pudl.helpers.fix_eia_na(gf)
    # Remove "State fuel-level increment" records... which don't pertain to
    # any particular plant (they have plant_id_eia == operator_id == 99999)
    gf = gf[gf.plant_id_eia != 99999]
    # Conservative manual correction for bad prime mover codes: one plant in
    # 2004. Pre-2004 it was '', post-2004 it was broken into combined cycle
    # parts, so blank 'CC' out here and let the backfill below resolve it.
    gf['prime_mover_code'] = (
        gf['prime_mover_code']
        .replace({'CC': ''})
        # Empty strings and whitespace that should be NA.
        .replace(to_replace=r'^\s*$', value=pd.NA, regex=True)
    )
    gf = _clean_gen_fuel_energy_sources(gf)
    gf = PUDL_META.get_resource("generation_fuel_eia923").encode(gf)
    gf['fuel_type_code_pudl'] = gf.energy_source_code.map(
        pudl.helpers.label_map(
            ENERGY_SOURCES_EIA["df"],
            from_col="code",
            to_col="fuel_type_code_pudl",
            null_value=pd.NA,
        )
    )
    # Drop records missing all variable fields.
    variable_fields = [
        "fuel_consumed_units",
        "fuel_consumed_for_electricity_units",
        "fuel_mmbtu_per_unit",
        "fuel_consumed_mmbtu",
        "fuel_consumed_for_electricity_mmbtu",
        "net_generation_mwh",
    ]
    gf = gf.dropna(subset=variable_fields, how="all")
    # Convert Year/Month columns into a single Date column...
    gf = pudl.helpers.convert_to_date(gf)
    # Split the nuclear fuel records out into their own table.
    is_nuclear = gf.nuclear_unit_id.notna() | gf.energy_source_code.eq("NUC")
    nuclear_unit_fuel(gf[is_nuclear].copy(), eia923_transformed_dfs)
    gf = gf[gf.nuclear_unit_id.isna() & gf.energy_source_code.ne("NUC")].copy()
    gf = gf.drop(columns=["nuclear_unit_id"])
    # Backfill 2001, 2002 prime_mover_codes.
    gf = _backfill_prime_mover_code(gf)
    # Aggregate any remaining duplicates.
    gf = _aggregate_generation_fuel_duplicates(gf)
    eia923_transformed_dfs['generation_fuel_eia923'] = gf
    return eia923_transformed_dfs
[docs]def _map_prime_mover_sets(prime_mover_set: np.ndarray) -> str:
"""Map unique prime mover combinations to a single prime mover code.
In 2001-2019 data, the .value_counts() of the combinations is:
(CA, CT) 750
(ST, CA) 101
(ST) 60
(CA) 17
(CS, ST, CT) 2
Args:
prime_mover_set (np.ndarray): unique combinations of prime_mover_code
Returns:
str: single prime mover code
"""
if len(prime_mover_set) == 1: # single valued
return prime_mover_set[0]
elif 'CA' in prime_mover_set:
return 'CA' # arbitrary choice
elif 'CS' in prime_mover_set:
return 'CS'
else:
raise ValueError(
"Dataset contains new kinds of duplicate boiler_fuel rows. "
f"Prime movers are {prime_mover_set}"
)
def _aggregate_duplicate_boiler_fuel_keys(
    boiler_fuel_df: pd.DataFrame
) -> pd.DataFrame:
    """Combine boiler_fuel rows with duplicate keys by aggregating them.

    Boiler_fuel_eia923 contains a few records with duplicate keys, mostly caused by
    CA and CT parts of combined cycle plants being mapped to the same boiler ID.
    This is most likely a data entry error. See GitHub issue #852

    One solution (implemented here) is to simply aggregate those records together.
    This is cheap and easy compared to the more thorough solution of making
    surrogate boiler IDs. Aggregation was preferred to purity due to the low volume
    of affected records (4.5% of combined cycle plants).

    Args:
        boiler_fuel_df: the boiler_fuel dataframe

    Returns:
        A copy of boiler_fuel dataframe with duplicates removed and aggregates appended.
    """
    quantity_cols = ['fuel_consumed_units', ]
    relative_cols = ['ash_content_pct', 'sulfur_content_pct', 'fuel_mmbtu_per_unit']
    key_cols = ['boiler_id', 'energy_source_code', 'plant_id_eia', 'report_date']
    expected_cols = set(quantity_cols + relative_cols + key_cols + ['prime_mover_code'])
    actual_cols = set(boiler_fuel_df.columns)
    difference = actual_cols.symmetric_difference(expected_cols)
    if difference:
        raise AssertionError(
            "Columns were expected to align, instead found this difference: "
            f"{difference}"
        )
    is_duplicate = boiler_fuel_df.duplicated(subset=key_cols, keep=False)
    # BUG FIX: take an explicit copy. The original code mutated a slice of
    # boiler_fuel_df, which raises SettingWithCopyWarning and can silently
    # fail to write (always fails under pandas copy-on-write).
    duplicates: pd.DataFrame = boiler_fuel_df[is_duplicate].copy()
    # For relative columns, take an average weighted by fuel usage: scale each
    # row by its share of group fuel, then sum within the group below.
    total_fuel: pd.Series = duplicates.groupby(
        key_cols)['fuel_consumed_units'].transform('sum')
    # division by zero -> NaN, so fill with 0 in those cases
    fuel_fraction = duplicates['fuel_consumed_units'].div(
        total_fuel.to_numpy()).fillna(0.0)
    # overwrite with weighted values
    duplicates[relative_cols] = duplicates[relative_cols].mul(
        fuel_fraction.to_numpy().reshape(-1, 1))
    # FIX: build the groupby only after the weighting above, so the sums are
    # guaranteed to see the weighted values regardless of pandas version.
    boiler_fuel_groups = duplicates.groupby(key_cols)
    aggregates = boiler_fuel_groups[quantity_cols + relative_cols].sum()
    # apply manual mapping to prime_mover_code
    aggregates['prime_mover_code'] = boiler_fuel_groups['prime_mover_code'].unique().apply(
        _map_prime_mover_sets)
    # NOTE: this changes the order of the data and resets the index.
    # FIX: pd.concat() replaces DataFrame.append() (removed in pandas 2.0).
    modified_boiler_fuel_df = pd.concat(
        [boiler_fuel_df[~is_duplicate], aggregates.reset_index()],
        ignore_index=True,
    )
    return modified_boiler_fuel_df
def boiler_fuel(eia923_dfs, eia923_transformed_dfs):
    """Transforms the boiler_fuel_eia923 table.

    Transformations include:

    * Remove fields implicated elsewhere.
    * Drop values with plant and boiler id values of NA.
    * Replace . values with NA.
    * Create a fuel_type_code_pudl field that organizes fuel types into clean,
      distinguishable categories.
    * Combine year and month columns into a single date column.

    Args:
        eia923_dfs (dict): Each entry in this dictionary of DataFrame objects
            corresponds to a page from the EIA923 form, as reported in the Excel
            spreadsheets they distribute.
        eia923_transformed_dfs (dict): A dictionary of DataFrame objects in which
            pages from EIA923 form (keys) correspond to normalized DataFrames of
            values from that page (values).

    Returns:
        dict: eia923_transformed_dfs, a dictionary of DataFrame objects in which
        pages from EIA923 form (keys) correspond to normalized DataFrames of
        values from that page (values).
    """
    # Need to stop dropping fields that contain harvestable entity attributes.
    # See https://github.com/catalyst-cooperative/pudl/issues/509
    drop_cols = [
        'combined_heat_power',
        'plant_name_eia',
        'operator_name',
        'operator_id',
        'plant_state',
        'census_region',
        'nerc_region',
        'naics_code',
        'eia_sector',
        'sector_name',
        'fuel_unit',
        'total_fuel_consumption_quantity',
        'respondent_frequency',
        'balancing_authority_code_eia',
    ]
    bf_df = (
        eia923_dfs['boiler_fuel'].copy()
        .drop(drop_cols, axis=1)
        .dropna(subset=['boiler_id', 'plant_id_eia'])
        .pipe(_yearly_to_monthly_records)
        # Replace the EIA923 NA value ('.') with a real NA value.
        .pipe(pudl.helpers.fix_eia_na)
        # Convert Year/Month columns into a single Date column...
        .pipe(pudl.helpers.convert_to_date)
    )
    bf_df = PUDL_META.get_resource("boiler_fuel_eia923").encode(bf_df)
    bf_df = _aggregate_duplicate_boiler_fuel_keys(bf_df)
    # Add a simplified PUDL fuel type
    bf_df['fuel_type_code_pudl'] = bf_df.energy_source_code.map(
        pudl.helpers.label_map(
            ENERGY_SOURCES_EIA["df"],
            from_col="code",
            to_col="fuel_type_code_pudl",
            null_value=pd.NA,
        )
    )
    eia923_transformed_dfs['boiler_fuel_eia923'] = bf_df
    return eia923_transformed_dfs
def generation(eia923_dfs, eia923_transformed_dfs):
    """Transforms the generation_eia923 table.

    Transformations include:

    * Drop rows with NA for generator id.
    * Remove fields implicated elsewhere.
    * Replace . values with NA.
    * Drop generator-date row duplicates (all have no data).

    Args:
        eia923_dfs (dict): Each entry in this dictionary of DataFrame objects
            corresponds to a page from the EIA923 form, as reported in the Excel
            spreadsheets they distribute.
        eia923_transformed_dfs (dict): A dictionary of DataFrame objects in which
            pages from EIA923 form (keys) correspond to normalized DataFrames of
            values from that page (values).

    Returns:
        dict: eia923_transformed_dfs, a dictionary of DataFrame objects in which
        pages from EIA923 form (keys) correspond to normalized DataFrames of
        values from that page (values).
    """
    gen_df = (
        eia923_dfs['generator']
        .dropna(subset=['generator_id'])
        .drop(['combined_heat_power',
               'plant_name_eia',
               'operator_name',
               'operator_id',
               'plant_state',
               'census_region',
               'nerc_region',
               'naics_code',
               'eia_sector',
               'sector_name',
               'net_generation_mwh_year_to_date'],
              axis="columns")
        .pipe(_yearly_to_monthly_records)
        .pipe(pudl.helpers.fix_eia_na)
        .pipe(pudl.helpers.convert_to_date)
    )
    # There are a few records that contain (literal) "nan"s in the generator_id
    # field. We are doing a targeted drop here instead of a full drop because
    # we don't want to drop a bunch of data points if new nans are introduced
    # into the data. See issue #1208 for targeted drop reasoning.
    drop_plant_ids = [54587]
    missing_data_strings = ["nan"]
    row_drop_mask = (
        gen_df.plant_id_eia.isin(drop_plant_ids)
        & gen_df.generator_id.isin(missing_data_strings)
    )
    gen_df = gen_df[~row_drop_mask]
    # There are a few hundred (out of a few hundred thousand) records which
    # have duplicate records for a given generator/date combo. However, in all
    # cases one of them has no data (net_generation_mwh) associated with it,
    # so it's pretty clear which one to drop.
    unique_subset = ["report_date", "plant_id_eia", "generator_id"]
    dupes = gen_df[gen_df.duplicated(subset=unique_subset, keep=False)]
    # BUG FIX: the old code dropped `dupes.net_generation_mwh.isna().index`,
    # which is just `dupes.index` — i.e. ALL duplicated rows, including the
    # ones that carry data. Only drop the empty member of each duplicate pair.
    gen_df = gen_df.drop(dupes[dupes.net_generation_mwh.isna()].index)
    gen_df = PUDL_META.get_resource("generation_eia923").encode(gen_df)
    eia923_transformed_dfs['generation_eia923'] = gen_df
    return eia923_transformed_dfs
def coalmine(eia923_dfs, eia923_transformed_dfs):
    """Transform the coalmine_eia923 table.

    Transformations include:

    * Remove fields implicated elsewhere.
    * Drop duplicates with MSHA ID.

    Args:
        eia923_dfs (dict): Each entry in this dictionary of DataFrame objects
            corresponds to a page from the EIA923 form, as reported in the
            Excel spreadsheets they distribute.
        eia923_transformed_dfs (dict): A dictionary of DataFrame objects in
            which pages from EIA923 form (keys) correspond to normalized
            DataFrames of values from that page (values).

    Returns:
        dict: eia923_transformed_dfs, with the normalized coalmine_eia923
        DataFrame added under the 'coalmine_eia923' key.
    """
    # These are the columns that we want to keep from FRC for the
    # coal mine info table.
    coalmine_cols = [
        'mine_name',
        'mine_type',
        'state',
        'county_id_fips',
        'mine_id_msha',
    ]
    # Make a copy so we don't alter the FRC data frame... which we'll need
    # to use again for populating the FRC table (see below)
    cmi_df = eia923_dfs['fuel_receipts_costs'].copy()
    # Apply the shared cleanup (also used before the FRC merge) and keep only
    # the columns listed above:
    cmi_df = _coalmine_cleanup(cmi_df)
    cmi_df = cmi_df[coalmine_cols]
    # If we actually *have* an MSHA ID for a mine, then we have a totally
    # unique identifier for that mine, and we can safely drop duplicates and
    # keep just one copy of that mine, no matter how different all the other
    # fields associated with the mine info are... Here we split out all the
    # coalmine records that have an MSHA ID, remove them from the CMI
    # data frame, drop duplicates, and then bring the unique mine records
    # back into the overall CMI dataframe...
    has_msha = cmi_df['mine_id_msha'] > 0
    cmi_with_msha = cmi_df[has_msha].drop_duplicates(subset=['mine_id_msha'])
    # BUG FIX: previously the results of drop() and append() were discarded
    # (neither operates in place), so the MSHA-based deduplication above never
    # actually took effect. Assign the results, and use pd.concat() since
    # DataFrame.append() has been removed from modern pandas.
    cmi_df = cmi_df[~has_msha]
    cmi_df = pd.concat([cmi_df, cmi_with_msha])
    cmi_df = cmi_df.drop_duplicates(subset=[
        'mine_name',
        'state',
        'mine_id_msha',
        'mine_type',
        'county_id_fips',
    ])
    # drop null values if they occur in vital fields....
    cmi_df = cmi_df.dropna(subset=['mine_name', 'state'])
    # we need an mine id to associate this coalmine table with the frc
    # table. In order to do that, we need to create a clean index, like
    # an autoincremented id column in a db, which will later be used as a
    # primary key in the coalmine table and a foreign key in the frc table.
    # reset_index(drop=True) discards the old index and yields a clean
    # 0..n-1 RangeIndex in one step.
    cmi_df = cmi_df.reset_index(drop=True)
    # then name the index id
    cmi_df.index.name = 'mine_id_pudl'
    # then make the id index a column for simpler transferability
    cmi_df = cmi_df.reset_index()
    cmi_df = PUDL_META.get_resource("coalmine_eia923").encode(cmi_df)
    eia923_transformed_dfs['coalmine_eia923'] = cmi_df
    return eia923_transformed_dfs
def fuel_receipts_costs(eia923_dfs, eia923_transformed_dfs):
    """Transform the fuel_receipts_costs_eia923 dataframe.

    Transformations include:

    * Remove fields implicated elsewhere.
    * Replace . values with NA.
    * Standardize codes values.
    * Fix dates.
    * Replace invalid mercury content values with NA.

    Fuel cost is reported in cents per mmbtu. Converts cents to dollars.

    Args:
        eia923_dfs (dict): Each entry in this dictionary of DataFrame objects
            corresponds to a page from the EIA923 form, as reported in the
            Excel spreadsheets they distribute.
        eia923_transformed_dfs (dict): A dictionary of DataFrame objects in
            which pages from EIA923 form (keys) correspond to normalized
            DataFrames of values from that page (values).

    Returns:
        dict: eia923_transformed_dfs, with the normalized
        fuel_receipts_costs_eia923 DataFrame added.
    """
    frc_df = eia923_dfs['fuel_receipts_costs'].copy()
    # Fields we're not inserting into the fuel_receipts_costs_eia923 table,
    # either because they live in other tables or aren't needed:
    cols_to_drop = [
        'plant_name_eia',
        'plant_state',
        'operator_name',
        'operator_id',
        'mine_id_msha',
        'mine_type',
        'state',
        'county_id_fips',
        'mine_name',
        'regulated',
        'reporting_frequency',
    ]
    # Copy the already-normalized coalmine table so the merge below can pick
    # up mine_id_pudl without mutating eia923_transformed_dfs' copy.
    cmi_df = eia923_transformed_dfs['coalmine_eia923'].copy()
    # The same cleanup routine is applied here and when coalmine_eia923 is
    # populated, so the merge keys match exactly on both sides.
    mine_keys = ['mine_name', 'state', 'mine_id_msha',
                 'mine_type', 'county_id_fips']
    frc_df = _coalmine_cleanup(frc_df)
    frc_df = frc_df.merge(cmi_df, how='left', on=mine_keys)
    frc_df = frc_df.drop(cols_to_drop, axis=1)
    # Replace the EIA923 NA value ('.') with a real NA value.
    frc_df = pudl.helpers.fix_eia_na(frc_df)
    # These come in ALL CAPS from EIA...
    frc_df = pudl.helpers.simplify_strings(frc_df, columns=['supplier_name'])
    frc_df = pudl.helpers.fix_int_na(
        frc_df, columns=['contract_expiration_date', ])
    # Fuel cost is reported in cents per mmbtu; convert to dollars.
    frc_df['fuel_cost_per_mmbtu'] = frc_df.fuel_cost_per_mmbtu / 100
    frc_df['fuel_group_code'] = (
        frc_df.fuel_group_code.str.lower().str.replace(' ', '_'))
    # contract_expiration_date is an M(M)YY string; peel off the month digits
    # (everything before the last two characters):
    frc_df['contract_expiration_month'] = (
        frc_df.contract_expiration_date.apply(
            lambda y: y[:-2] if y != '' else y))
    # Blank out month values that aren't valid calendar months (1-12):
    frc_df['contract_expiration_month'] = (
        frc_df.contract_expiration_month.apply(
            lambda y: y if y != '' and int(y) <= 12 else ''))
    # The trailing two digits are the (20xx) year:
    frc_df['contract_expiration_year'] = (
        frc_df.contract_expiration_date.apply(
            lambda y: '20' + y[-2:] if y != '' else y))
    # A real date field gets built from the year/month columns, so the raw
    # string version can be chucked first:
    frc_df = frc_df.drop('contract_expiration_date', axis=1)
    frc_df = pudl.helpers.convert_to_date(
        frc_df,
        date_col='contract_expiration_date',
        year_col='contract_expiration_year',
        month_col='contract_expiration_month',
    )
    frc_df = pudl.helpers.convert_to_date(frc_df)
    frc_df = pudl.helpers.cleanstrings(
        frc_df,
        ['natural_gas_transport_code',
         'natural_gas_delivery_contract_type_code'],
        [{'firm': ['F'], 'interruptible': ['I']},
         {'firm': ['F'], 'interruptible': ['I']}],
        unmapped='',
    )
    frc_df = PUDL_META.get_resource("fuel_receipts_costs_eia923").encode(frc_df)
    frc_df["fuel_type_code_pudl"] = frc_df.energy_source_code.map(
        pudl.helpers.label_map(
            ENERGY_SOURCES_EIA["df"],
            from_col="code",
            to_col="fuel_type_code_pudl",
            null_value=pd.NA,
        )
    )
    # Remove known to be invalid mercury content values. Almost all of these
    # occur in the 2012 data. Real values should be <0.25ppm.
    invalid_hg = frc_df.mercury_content_ppm >= 7.0
    frc_df.loc[invalid_hg, "mercury_content_ppm"] = np.nan
    eia923_transformed_dfs['fuel_receipts_costs_eia923'] = frc_df
    return eia923_transformed_dfs