"""Aggregate plant parts to make an EIA master plant-part table.
Practically speaking, a plant is a collection of generator(s). There are many attributes
of generators (e.g., prime mover, primary fuel source, technology type). We can use these
generator attributes to group generator records into larger aggregate records which we
call "plant-parts". A plant part is a record which corresponds to a particular
collection of generators that all share an identical attribute. E.g. all of the
generators with unit_id=2, or all of the generators with coal as their primary fuel
source.
The EIA data about power plants (from EIA 923 and 860) is reported in tables with
records that mostly correspond to generators and plants. Other datasets (cough cough
FERC1) are less well organized and include plants, generators and other plant-parts all
in the same table without any clear labels. The master plant-part table is an attempt to
create records corresponding to many different plant-parts in order to connect specific
slices of EIA plants to other datasets.
Because generators are often owned by multiple utilities, another dimension of the
plant-part table involves generating two records for each owner: one for the portion of
the plant part they own and one for the plant part as a whole. The portion records are
labeled in the ``ownership_record_type`` column as "owned" and the total records are
labeled as "total".
This module refers to "true granularities". Many plant parts we cobble together here in
the master plant-part list refer to the same collection of infrastructure as other
plant-part list records. For example, a "plant_prime_mover" plant part record
and a "plant_unit" plant part record may both be cobbled together from the same two
generators. We want to be able to reduce the plant-part list to only unique collections
of generators, so we label the first unique granularity as a true granularity and label
the subsequent records as false granularities with the ``true_gran`` column. In order to
choose which plant-part to keep in these instances, we assign a hierarchy of plant
parts (the order of the keys in :py:const:`PLANT_PARTS`) and label whichever plant-part
comes first as the true granularity.
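As a minimal sketch (assuming a dataframe ``ppl`` with a ``plant_part`` column
and a ``gens_combo`` column identifying each record's underlying set of
generators), the hierarchy-based labeling implemented in :class:`TrueGranLabeler`
boils down to:

.. code-block:: python

    ppl['plant_part'] = pd.Categorical(ppl['plant_part'], PLANT_PARTS.keys())
    ppl = ppl.sort_values('plant_part')
    ppl['true_gran'] = ~ppl.duplicated(subset=['gens_combo'], keep='first')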
**Recipe Book for the plant-part list**
:py:const:`PLANT_PARTS` is the main recipe book for how each of the plant-parts need to
be compiled. These plant-parts represent ways to group generators based on widely
reported values in EIA. All of these are logical ways to group collections of generators
- in most cases - but some groupings of generators are more prevalent or relevant than
others for certain types of plants.
The canonical example here is the ``plant_unit``. A unit is a collection of generators
that operate together - most notably the combined-cycle natural gas plants.
Combined-cycle units generally consist of a number of gas turbines which feed excess
steam to a number of steam turbines.
>>> df_gens = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1],
... 'generator_id': ['a', 'b', 'c'],
... 'unit_id_pudl': [1, 1, 1],
... 'prime_mover_code': ['CT', 'CT', 'CA'],
... 'capacity_mw': [50, 50, 100],
... })
>>> df_gens
plant_id_eia generator_id unit_id_pudl prime_mover_code capacity_mw
0 1 a 1 CT 50
1 1 b 1 CT 50
2 1 c 1 CA 100
A good example of a plant-part that isn't really logical also comes from a
combined-cycle unit. Grouping this example plant by the ``prime_mover_code``
would generate two records that would basically never show up in FERC1.
This stems from the inseparability of the generators.
>>> df_plant_prime_mover = pd.DataFrame({
... 'plant_id_eia': [1, 1],
... 'plant_part': ['plant_prime_mover', 'plant_prime_mover'],
... 'prime_mover_code': ['CT', 'CA'],
... 'capacity_mw': [100, 100],
... })
>>> df_plant_prime_mover
plant_id_eia plant_part prime_mover_code capacity_mw
0 1 plant_prime_mover CT 100
1 1 plant_prime_mover CA 100
In this case the unit is more relevant:
>>> df_plant_unit = pd.DataFrame({
... 'plant_id_eia': [1],
... 'plant_part': ['plant_unit'],
... 'unit_id_pudl': [1],
... 'capacity_mw': [200],
... })
>>> df_plant_unit
plant_id_eia plant_part unit_id_pudl capacity_mw
0 1 plant_unit 1 200
But if this same plant had both this combined-cycle unit and two more
generators that were self-contained "GT" (gas combustion turbine) units, a logical
way to group these generators is to have different records for the
combined-cycle unit and the gas turbines.
>>> df_gens = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1, 1, 1],
... 'generator_id': ['a', 'b', 'c', 'd', 'e'],
... 'unit_id_pudl': [1, 1, 1, 2, 3],
... 'prime_mover_code': ['CT', 'CT', 'CA', 'GT', 'GT'],
... 'capacity_mw': [50, 50, 100, 75, 75],
... })
>>> df_gens
plant_id_eia generator_id unit_id_pudl prime_mover_code capacity_mw
0 1 a 1 CT 50
1 1 b 1 CT 50
2 1 c 1 CA 100
3 1 d 2 GT 75
4 1 e 3 GT 75
>>> df_plant_part = pd.DataFrame({
... 'plant_id_eia': [1, 1],
... 'plant_part': ['plant_unit', 'plant_prime_mover'],
... 'unit_id_pudl': [1, pd.NA],
... 'prime_mover_code': [pd.NA, 'GT',],
... 'capacity_mw': [200, 150],
... })
>>> df_plant_part
plant_id_eia plant_part unit_id_pudl prime_mover_code capacity_mw
0 1 plant_unit 1 <NA> 200
1 1 plant_prime_mover <NA> GT 150
In this case, the ``plant_unit`` record would have a null
``prime_mover_code`` because the unit contains more than one
``prime_mover_code``. Same goes for the ``unit_id_pudl`` of the
``plant_prime_mover`` record. This is handled in :class:`AddConsistentAttributes`.
**Overview of flow for generating the plant-part table:**
The two main classes which enable the generation of the plant-part table are:
* :class:`MakeMegaGenTbl`: All of the plant parts are compiled from generators.
So this class generates a big dataframe of generators with any ID and data
columns we'll need. This is also where we add records regarding utility
ownership slices. The table includes two records for every generator-owner:
one for the "total" generator (assuming the owner owns 100% of the generator)
and one for the reported ownership fraction of that generator with all of the
data columns scaled to the ownership fraction.
* :class:`MakePlantParts`: This class uses the generator dataframe as well as
the information stored in :py:const:`PLANT_PARTS` to know how to aggregate each
of the plant parts. Then we have plant part dataframes with the columns which
identify the plant part and all of the data columns aggregated to the level of
the plant part. With that compiled plant part dataframe we also add in qualifier
columns with :class:`AddConsistentAttributes`. A qualifier column is a column which
contains data that is not endemic to the plant part record (it is not one of
the identifying columns or aggregated data columns) but the data is still
useful data that is attributable to each of the plant part records. For more
detail on what a qualifier column is, see :meth:`AddConsistentAttributes.execute`.
**Generating the plant-parts list**
There are two ways to generate the plant-parts table: one directly using the
:class:`pudl.output.pudltabl.PudlTabl` object and the other using the classes
from this module. Either option needs a :class:`pudl.output.pudltabl.PudlTabl`
object.
Create the :class:`pudl.output.pudltabl.PudlTabl` object:
.. code-block:: python
import sqlalchemy as sa

import pudl
from pudl.workspace.setup import PudlPaths

pudl_engine = sa.create_engine(PudlPaths().pudl_db)
pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine, freq='YS')
Then make the table via pudl_out:
.. code-block:: python
plant_parts_eia = pudl_out.plant_parts_eia()
OR make the table via objects in this module:
.. code-block:: python
gens_mega = MakeMegaGenTbl().execute(mcoe, own_eia860)
parts_compiler = MakePlantParts(pudl_out)
plant_parts_eia = parts_compiler.execute(gens_mega=gens_mega)
"""
from collections import OrderedDict
from copy import deepcopy
from importlib import resources
from pathlib import Path
from typing import Literal
import numpy as np
import pandas as pd
from dagster import AssetCheckResult, AssetIn, asset, asset_check
import pudl
from pudl.metadata.classes import Resource
logger = pudl.logging_helpers.get_logger(__name__)
# HALP: I need both of these settings set in order for the dfs in the docstrings
# to pass the doctests. Without them the formatting gets all jumbled.
# but obviously this is the wrong place to do this.
# I tried adding these into conftest.py in pandas_terminal_width().
# I tried adding this into __init__.py.
# I tried adding this into the module docstring.
pd.options.display.width = 1000
pd.options.display.max_columns = 1000
PLANT_PARTS: OrderedDict[str, dict[str, list]] = OrderedDict(
{
"plant": {
"id_cols": ["plant_id_eia"],
},
"plant_match_ferc1": {
"id_cols": ["plant_id_eia", "ferc1_generator_agg_id"],
},
"plant_unit": {
"id_cols": ["plant_id_eia", "unit_id_pudl"],
},
"plant_prime_mover": {
"id_cols": ["plant_id_eia", "prime_mover_code"],
},
"plant_technology": {
"id_cols": ["plant_id_eia", "technology_description"],
},
"plant_prime_fuel": {
"id_cols": ["plant_id_eia", "energy_source_code_1"],
},
"plant_ferc_acct": {
"id_cols": ["plant_id_eia", "ferc_acct_name"],
},
"plant_operating_year": {
"id_cols": ["plant_id_eia", "generator_operating_year"],
},
"plant_gen": {
"id_cols": ["plant_id_eia", "generator_id"],
},
}
)
"""Dict: this dictionary contains a key for each of the 'plant parts' that should end up
in the plant parts list. The top-level value for each key is another dictionary, which
contains keys:
* id_cols (the primary key type id columns for this plant part). The
plant_id_eia column must come first.
"""
PLANT_PARTS_LITERAL = Literal[
"plant",
"plant_unit",
"plant_prime_mover",
"plant_technology",
"plant_prime_fuel",
"plant_ferc_acct",
"plant_operating_year",
"plant_gen",
]
IDX_TO_ADD: list[str] = ["report_date", "operational_status_pudl"]
"""
list: list of additional columns to add to the id_cols in :py:const:`PLANT_PARTS`.
The id_cols are the base columns that we need to aggregate on, but we also need
to add the report date to keep the records time sensitive and the
operational_status_pudl to separate the operating plant-parts from the
non-operating plant-parts.
"""
IDX_OWN_TO_ADD: list[str] = ["utility_id_eia", "ownership_record_type"]
"""
list: list of additional columns beyond the :py:const:`IDX_TO_ADD` to add to the
id_cols in :py:const:`PLANT_PARTS` when we are dealing with plant-part records
that have been broken out into "owned" and "total" records for each of their
owners.
"""
SUM_COLS: list[str] = [
"total_fuel_cost",
"net_generation_mwh",
"capacity_mw",
"capacity_eoy_mw",
"total_mmbtu",
]
"""List: list of columns to sum when aggregating a table."""
WTAVG_DICT = {
"fuel_cost_per_mwh": "capacity_mw",
"unit_heat_rate_mmbtu_per_mwh": "capacity_mw",
"fuel_cost_per_mmbtu": "capacity_mw",
}
"""Dict: a dictionary of columns (keys) to perform weighted averages on and the weight
column (values)"""
CONSISTENT_ATTRIBUTE_COLS = [
"fuel_type_code_pudl",
"planned_generator_retirement_date",
"generator_retirement_date",
"generator_id",
"unit_id_pudl",
"technology_description",
"energy_source_code_1",
"prime_mover_code",
"ferc_acct_name",
"generator_operating_year",
]
"""List: a list of column names to add as attributes when they are consistent into the
aggregated plant-part records.
All the plant part ID columns must be in consistent attributes.
"""
PRIORITY_ATTRIBUTES_DICT = {
"operational_status": ["existing", "proposed", "retired"],
}
MAX_MIN_ATTRIBUTES_DICT = {
"installation_year": {
"assign_col": {"installation_year": lambda x: x.generator_operating_year},
"dtype": "Int64",
"keep": "first",
},
"construction_year": {
"assign_col": {"construction_year": lambda x: x.generator_operating_year},
"dtype": "Int64",
"keep": "last",
},
}
FIRST_COLS = [
"plant_id_eia",
"report_date",
"plant_part",
"generator_id",
"unit_id_pudl",
"prime_mover_code",
"energy_source_code_1",
"technology_description",
"ferc_acct_name",
"utility_id_eia",
"true_gran",
"appro_part_label",
]
@asset(
io_manager_key="pudl_io_manager",
compute_kind="Python",
op_tags={"memory-use": "high"},
)
def out_eia__yearly_generators_by_ownership(
out_eia__yearly_generators: pd.DataFrame, out_eia860__yearly_ownership: pd.DataFrame
) -> pd.DataFrame:
"""Create mega generators table asset."""
return MakeMegaGenTbl().execute(
mcoe=out_eia__yearly_generators,
own_eia860=out_eia860__yearly_ownership,
)
@asset(
io_manager_key="pudl_io_manager",
compute_kind="Python",
op_tags={"memory-use": "high"},
)
def out_eia__yearly_plant_parts(
out_eia__yearly_generators_by_ownership: pd.DataFrame,
out_eia__yearly_plants: pd.DataFrame,
out_eia__yearly_utilities: pd.DataFrame,
) -> pd.DataFrame:
"""Create plant parts list asset."""
return MakePlantParts().execute(
gens_mega=out_eia__yearly_generators_by_ownership,
plants_eia860=out_eia__yearly_plants,
utils_eia860=out_eia__yearly_utilities,
)
class MakeMegaGenTbl:
"""Compiler for a MEGA generator table with ownership integrated.
Examples:
---------
**Input Tables**
Here is an example of one plant with three generators. We will use
``capacity_mw`` as the data column.
>>> mcoe = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1],
... 'report_date': ['2020-01-01', '2020-01-01','2020-01-01'],
... 'generator_id': ['a', 'b', 'c'],
... 'utility_id_eia': [111, 111, 111],
... 'unit_id_pudl': [1, 1, 1],
... 'prime_mover_code': ['CT', 'CT', 'CA'],
... 'technology_description': [
... 'Natural Gas Fired Combined Cycle', 'Natural Gas Fired Combined Cycle', 'Natural Gas Fired Combined Cycle'
... ],
... 'operational_status': ['existing', 'existing','existing'],
... 'generator_retirement_date': [pd.NA, pd.NA, pd.NA],
... 'capacity_mw': [50, 50, 100],
... }).astype({
... 'generator_retirement_date': "datetime64[ns]",
... 'report_date': "datetime64[ns]",
... })
>>> mcoe
plant_id_eia report_date generator_id utility_id_eia unit_id_pudl prime_mover_code technology_description operational_status generator_retirement_date capacity_mw
0 1 2020-01-01 a 111 1 CT Natural Gas Fired Combined Cycle existing NaT 50
1 1 2020-01-01 b 111 1 CT Natural Gas Fired Combined Cycle existing NaT 50
2 1 2020-01-01 c 111 1 CA Natural Gas Fired Combined Cycle existing NaT 100
The ownership table from EIA 860 includes one record for every owner of
each generator. In this example generator ``c`` has two owners.
>>> df_own_eia860 = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1, 1],
... 'report_date': ['2020-01-01', '2020-01-01','2020-01-01', '2020-01-01'],
... 'generator_id': ['a', 'b', 'c', 'c'],
... 'utility_id_eia': [111, 111, 111, 111],
... 'owner_utility_id_eia': [111, 111, 111, 888],
... 'fraction_owned': [1, 1, .75, .25]
... }).astype({'report_date': "datetime64[ns]"})
>>> df_own_eia860
plant_id_eia report_date generator_id utility_id_eia owner_utility_id_eia fraction_owned
0 1 2020-01-01 a 111 111 1.00
1 1 2020-01-01 b 111 111 1.00
2 1 2020-01-01 c 111 111 0.75
3 1 2020-01-01 c 111 888 0.25
**Output Mega Generators Table**
``MakeMegaGenTbl().execute(mcoe, df_own_eia860, slice_cols=['capacity_mw'])``
produces the output table ``gens_mega`` which includes two main sections:
the generators with a "total" ownership stake for each of their owners and
the generators with an "owned" ownership stake for each of their
owners. For the generators that are owned 100% by one utility, the
records are identical except the ``ownership_record_type`` column. For the
generators that have more than one owner, there are two "total" records
with 100% of the capacity of that generator - one for each owner - and
two "owned" records with the capacity scaled to the ownership stake
of each of the owner utilities - represented by ``fraction_owned``.
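A rough sketch of how the "owned" records are derived (a simplification of
:meth:`pudl.helpers.scale_by_ownership`, not its actual implementation):

.. code-block:: python

    owned = mcoe.merge(
        df_own_eia860,
        on=['plant_id_eia', 'report_date', 'generator_id', 'utility_id_eia'],
    ).assign(
        capacity_mw=lambda x: x.capacity_mw * x.fraction_owned,
        ownership_record_type='owned',
    )

Under this sketch, generator ``c`` gets one 75 MW and one 25 MW "owned" record,
plus two 100 MW "total" records - one for each owner.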
"""
def __init__(self):
"""Initialize object which creates a MEGA generator table.
The coordinating function here is :meth:`execute`.
"""
self.id_cols_list = make_id_cols_list()
def execute(
self,
mcoe: pd.DataFrame,
own_eia860: pd.DataFrame,
slice_cols: list[str] = SUM_COLS,
validate_own_merge: str = "1:m",
) -> pd.DataFrame:
"""Make the mega generators table with ownership integrated.
Args:
mcoe: generator-based mcoe table with DEFAULT_GENS_COLS generator attributes
from :meth:`pudl.output.PudlTabl.mcoe_generators()`
own_eia860: ownership table from :meth:`pudl.output.PudlTabl.own_eia860()`
slice_cols: list of columns to slice by ownership fraction in
:meth:`pudl.helpers.scale_by_ownership`. Default is :py:const:`SUM_COLS`.
validate_own_merge: how the merge between ``mcoe`` and ``own_eia860``
is to be validated via ``pd.merge``. If there should be one
record for each plant/generator/date in ``mcoe`` then the default
`1:m` should be used.
Returns:
a table of all of the generators with identifying columns and data
columns, sliced by ownership which makes "total" and "owned"
records for each generator owner. The "owned" records have the
generator's data scaled to the ownership percentage (e.g. if a 200
MW generator has a 75% stake owner and a 25% stake owner, this will
result in two "owned" records with 150 MW and 50 MW). The "total"
records correspond to the full plant for every owner (e.g. using
the same 2-owner 200 MW generator as above, each owner will have a
record with 200 MW).
"""
logger.info("Generating the mega generator table with ownership.")
gens_mega = (
self.get_gens_mega_table(mcoe)
.pipe(self.label_operating_gens)
.pipe(
pudl.helpers.scale_by_ownership,
own_eia860,
slice_cols,
validate_own_merge,
)
)
gens_mega = gens_mega.convert_dtypes()
return gens_mega
def get_gens_mega_table(self, mcoe):
"""Compile the main generators table that will be used as base of PPL.
Get a table of all of the generators there ever were and all of the
data PUDL has to offer about those generators. This generator table
will be used to compile all of the "plant-parts", so we need to ensure
that any of the id columns from the other plant-parts are in this
generator table as well as all of the data columns that we are going to
aggregate to the various plant-parts.
Returns:
pandas.DataFrame
"""
all_gens = pd.merge( # Add EIA FERC acct fields
mcoe,
pudl.helpers.get_eia_ferc_acct_map(),
on=["technology_description", "prime_mover_code"],
validate="m:1",
how="left",
)
all_gens.loc[:, "generator_operating_year"] = all_gens[
"generator_operating_date"
].dt.year
all_gens = all_gens.astype({"generator_operating_year": "Int64"})
return all_gens
def label_operating_gens(self, gen_df: pd.DataFrame) -> pd.DataFrame:
"""Label the operating generators.
We want to distinguish between "operating" generators (those that
report as "existing" and those that retire mid-year) and everything
else so that we can group the operating generators into their own
plant-parts separate from retired or proposed generators. We do this by
creating a new label column called "operational_status_pudl".
This method also adds a column called "capacity_eoy_mw", which is the
end of year capacity of the generators. We assume that if a generator
isn't "existing", its EOY capacity should be zero.
Args:
gen_df (pandas.DataFrame): annual table of all generators from EIA.
Returns:
pandas.DataFrame: annual table of all generators from EIA that
operated within each reporting year.
Todo:
This function results in warning: `PerformanceWarning: DataFrame
is highly fragmented...` I expect this is because of the number of
columns that are being assigned here via `.loc[:, col_to_assign]`.
"""
mid_year_retiree_mask = (
gen_df.generator_retirement_date.dt.year == gen_df.report_date.dt.year
)
existing_mask = gen_df.operational_status == "existing"
operating_mask = existing_mask | mid_year_retiree_mask
# we're going to make a new column which combines both the mid-year
# retirees and the fully existing gens into one code so we can group
# them together later on
gen_df.loc[:, "operational_status_pudl"] = gen_df.loc[
:, "operational_status"
].mask(operating_mask, "operating")
gen_df.loc[:, "capacity_eoy_mw"] = gen_df.loc[:, "capacity_mw"].mask(
~existing_mask, 0
)
logger.info(
f"Labeled {len(gen_df.loc[~existing_mask])/len(gen_df):.02%} of "
"generators as non-operative."
)
return gen_df
class MakePlantParts:
"""Compile the plant parts for the master unit list.
This object generates a master list of different "plant-parts", which
are various collections of generators - i.e. units, fuel-types, whole
plants, etc. - as well as various ownership arrangements. Each
plant-part is included in the master plant-part table associated with
each of the plant-part's owners twice - once with the data scaled to the
fraction of each owner's ownership and once as a total plant-part
for each owner.
This master plant parts table is generated by first creating a complete
generators table - with all of the data columns we will be aggregating
to different plant-parts - sliced and scaled by ownership. Then we use the
complete generator table to aggregate by each of the plant-part
categories. Next we add a label for each plant-part record which indicates
whether or not the record is a unique grouping of generator records.
The coordinating function here is :meth:`execute`.
"""
def __init__(self):
"""Initialize instance of :class:`MakePlantParts`."""
self.parts_to_ids = make_parts_to_ids_dict()
# get a list of all of the id columns that constitute the primary keys
# for all of the plant parts
self.id_cols_list = make_id_cols_list()
def execute(self, gens_mega, plants_eia860, utils_eia860):
"""Aggregate and slice data points by each plant part.
Returns:
pandas.DataFrame: The complete plant parts list
"""
# aggregate everything by each plant part
part_dfs = []
for part_name in PLANT_PARTS:
if part_name == "plant_match_ferc1":
continue
part_df = PlantPart(part_name).execute(gens_mega)
# add in the attributes!
part_df = self.add_attributes(part_df, gens_mega, part_name)
# assert that all the plant part ID columns are now in part_df
assert {
col
for part in PLANT_PARTS
if part != "plant_match_ferc1"
for col in PLANT_PARTS[part]["id_cols"]
}.issubset(part_df.columns)
part_dfs.append(part_df)
plant_parts_eia = pd.concat(part_dfs)
# Add in 1:m matches.
self.plant_parts_eia = self.add_one_to_many(
plant_parts_eia=plant_parts_eia,
part_name="plant_match_ferc1",
path_to_one_to_many=resources.files("pudl.package_data.glue").joinpath(
"eia_ferc1_one_to_many.csv",
),
)
self.plant_parts_eia = TrueGranLabeler().execute(self.plant_parts_eia)
# clean up, add additional columns
self.plant_parts_eia = (
self.add_additional_cols(
plant_parts_eia=self.plant_parts_eia,
plants_eia860=plants_eia860,
utils_eia860=utils_eia860,
)
.pipe(pudl.helpers.organize_cols, FIRST_COLS)
.pipe(self._clean_plant_parts)
.pipe(Resource.from_id("out_eia__yearly_plant_parts").format_df)
)
return self.plant_parts_eia
#######################################
# Add Entity Columns and Final Cleaning
#######################################
def add_one_to_many(
self,
plant_parts_eia: pd.DataFrame,
part_name: Literal["plant_match_ferc1"],
path_to_one_to_many: Path,
) -> pd.DataFrame:
"""Integrate 1:m FERC-EIA matches into the plant part list.
In the FERC:EIA manual match, more than one EIA record may be matched to a
FERC record. This method reads in a .csv of one-to-many matches generated during
the validation stage of the FERC-EIA record matching process.
Args:
plant_parts_eia (pandas.DataFrame): the master unit list table.
part_name: should always be "plant_match_ferc1".
path_to_one_to_many: a Path to the one_to_many csv
file in `pudl.package_data.glue`.
Returns:
pandas.DataFrame: master unit list table with one-to-many matches aggregated
as plant parts.
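Based on the merge logic below, the CSV is assumed to contain at least
``record_id_ferc1`` and ``record_id_eia`` columns; a purely hypothetical
sketch of its shape:

.. code-block:: python

    one_to_many = pd.DataFrame({
        'record_id_ferc1': ['f1_hypothetical_2020_x', 'f1_hypothetical_2020_x'],
        'record_id_eia': [
            '1_a_2020_plant_gen_total_111',
            '1_b_2020_plant_gen_total_111',
        ],
    })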
"""
# Read in csv.
try:
with resources.as_file(path_to_one_to_many) as override_source:
one_to_many = pd.read_csv(override_source)
except FileNotFoundError:
return plant_parts_eia
logger.info("Aggregate any 1:m FERC-EIA match records.")
# Filter plant part list to records in 1:m
plant_parts_eia_filt = plant_parts_eia.loc[
plant_parts_eia.record_id_eia.isin(one_to_many.record_id_eia)
]
if plant_parts_eia_filt.empty: # If 1:m matches not in plant_part subset
return plant_parts_eia.assign(ferc1_generator_agg_id=pd.NA)
# Get the 'm' generator IDs 1:m
one_to_many_single = match_to_single_plant_part(
multi_gran_df=plant_parts_eia_filt,
ppe=plant_parts_eia,
part_name="plant_gen",
cols_to_keep=["plant_part"],
)[["record_id_eia", "record_id_eia_plant_gen", "plant_part"]].rename(
columns={"record_id_eia_plant_gen": "gen_id"}
)
# If any 'duplicate records' actually match to the same generator, these aren't 1:m matches. Drop them and any corresponding non-matches.
one_to_many = one_to_many.merge(
one_to_many_single, on="record_id_eia", validate="1:m"
).drop_duplicates("gen_id")
one_to_many = one_to_many.loc[
one_to_many.duplicated(subset="record_id_ferc1", keep=False), :
]
# For each FERC ID, group and add a unique ferc1_generator_agg_id.
one_to_many["ferc1_generator_agg_id"] = (
one_to_many.groupby(["record_id_ferc1"]).ngroup().astype("Int64")
)
# Add this column back to the main plant part table to enable later identification of grouped records.
plant_parts_eia = plant_parts_eia.merge(
one_to_many[["gen_id", "ferc1_generator_agg_id"]],
left_on="record_id_eia",
right_on="gen_id",
how="left",
validate="1:1",
)
# Aggregate these records into "plant parts" and concatenate onto plant_part table
part_df = PlantPart(part_name).execute(plant_parts_eia)
# add in the attributes!
part_df = self.add_attributes(part_df, plant_parts_eia, part_name)
# Assert that each 'ferc1_generator_agg_id' only contains one faked record
double_df = part_df[
part_df.groupby("ferc1_generator_agg_id")[
"ferc1_generator_agg_id"
].transform("size")
> 1
]
orig_ids = plant_parts_eia.loc[
plant_parts_eia.ferc1_generator_agg_id.isin(
double_df.ferc1_generator_agg_id
)
]
assert double_df.empty, f"The following record ids have >1 faked part. Double-check these records or move them to the eia_ferc1_null.csv: {one_to_many.loc[one_to_many.gen_id.isin(orig_ids.record_id_eia), 'record_id_ferc1'].drop_duplicates().tolist()}"
return pd.concat([plant_parts_eia, part_df])
def add_additional_cols(self, plant_parts_eia, plants_eia860, utils_eia860):
"""Add additional data and id columns.
This method adds a set of either calculated columns or PUDL ID columns.
Returns:
pandas.DataFrame: master unit list table with these additional
columns:
* utility_id_pudl +
* plant_id_pudl +
* capacity_factor +
* ownership_dupe (boolean): indicator of whether the "owned"
record has a corresponding "total" duplicate.
"""
plant_parts_eia = (
pudl.helpers.calc_capacity_factor(
df=plant_parts_eia, min_cap_fact=-0.5, max_cap_fact=1.5, freq="YS"
)
.merge(
plants_eia860[["plant_id_eia", "plant_id_pudl"]].drop_duplicates(),
how="left",
on=[
"plant_id_eia",
],
)
.merge(
utils_eia860[
["utility_id_eia", "utility_id_pudl", "utility_name_eia"]
].drop_duplicates(),
how="left",
on=["utility_id_eia"],
)
.assign(
ownership_dupe=lambda x: np.where(
(x.ownership_record_type == "owned") & (x.fraction_owned == 1),
True,
False,
)
)
)
return plant_parts_eia
def _clean_plant_parts(self, plant_parts_eia):
plant_parts_eia = plant_parts_eia.assign(
report_year=lambda x: x.report_date.dt.year,
plant_id_report_year=lambda x: x.plant_id_pudl.astype(str)
+ "_"
+ x.report_year.astype(str),
).pipe(
pudl.helpers.cleanstrings_snake,
["record_id_eia", "appro_record_id_eia"],
)
return plant_parts_eia[
~plant_parts_eia["record_id_eia"].duplicated(keep="first")
]
def add_attributes(self, part_df, attribute_df, part_name):
"""Add constant and min/max attributes to plant parts."""
for attribute_col in CONSISTENT_ATTRIBUTE_COLS:
part_df = AddConsistentAttributes(attribute_col, part_name).execute(
part_df, attribute_df
)
for attribute_col in PRIORITY_ATTRIBUTES_DICT:
part_df = AddPriorityAttribute(attribute_col, part_name).execute(
part_df, attribute_df
)
for attribute_col in MAX_MIN_ATTRIBUTES_DICT:
part_df = AddMaxMinAttribute(
attribute_col,
part_name,
assign_col_dict=MAX_MIN_ATTRIBUTES_DICT[attribute_col]["assign_col"],
).execute(
part_df,
attribute_df,
att_dtype=MAX_MIN_ATTRIBUTES_DICT[attribute_col]["dtype"],
keep=MAX_MIN_ATTRIBUTES_DICT[attribute_col]["keep"],
)
return part_df
class PlantPart:
"""Plant-part table maker.
The coordinating method here is :meth:`execute`.
**Examples**
Below are some examples of how the main processing step in this class
operates: :meth:`PlantPart.ag_part_by_own_slice`. If we have a plant with
four generators that looks like this:
>>> gens_mega = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1, 1],
... 'report_date': ['2020-01-01', '2020-01-01', '2020-01-01', '2020-01-01',],
... 'utility_id_eia': [111, 111, 111, 111],
... 'generator_id': ['a', 'b', 'c', 'd'],
... 'prime_mover_code': ['ST', 'GT', 'CT', 'CA'],
... 'energy_source_code_1': ['BIT', 'NG', 'NG', 'NG'],
... 'ownership_record_type': ['total', 'total', 'total', 'total',],
... 'operational_status_pudl': ['operating', 'operating', 'operating', 'operating'],
... 'capacity_mw': [400, 50, 125, 75],
... }).astype({
... 'report_date': 'datetime64[ns]',
... })
>>> gens_mega
plant_id_eia report_date utility_id_eia generator_id prime_mover_code energy_source_code_1 ownership_record_type operational_status_pudl capacity_mw
0 1 2020-01-01 111 a ST BIT total operating 400
1 1 2020-01-01 111 b GT NG total operating 50
2 1 2020-01-01 111 c CT NG total operating 125
3 1 2020-01-01 111 d CA NG total operating 75
This ``gens_mega`` table can then be aggregated by ``plant``, ``plant_prime_fuel``,
``plant_prime_mover``, or ``plant_gen``.
"""
def __init__(self, part_name: Literal[PLANT_PARTS_LITERAL, "plant_match_ferc1"]):
"""Initialize an object which makes a tbl for a specific plant-part.
Args:
part_name (str): the name of the part to aggregate to. Names can be
only those in :py:const:`PLANT_PARTS`
"""
self.part_name = part_name
self.id_cols = PLANT_PARTS[part_name]["id_cols"]
def execute(
self,
gens_mega: pd.DataFrame,
sum_cols: list[str] = SUM_COLS,
wtavg_dict: dict = WTAVG_DICT,
) -> pd.DataFrame:
"""Get a table of data aggregated by a specific plant-part.
This method will take ``gens_mega`` and aggregate the generator records
to the level of the plant-part. This is mostly done via
:meth:`ag_part_by_own_slice`. Then several additional columns are added
and the records are labeled as true or false granularities.
Returns:
a table with records that have been aggregated to a plant-part.
"""
part_df = (
self.ag_part_by_own_slice(
gens_mega, sum_cols=sum_cols, wtavg_dict=wtavg_dict
)
.pipe(self.ag_fraction_owned)
.assign(plant_part=self.part_name)
.pipe( # add standard record id w/ year
add_record_id,
id_cols=self.id_cols,
plant_part_col="plant_part",
year=True,
)
.pipe( # add additional record id that DOESN'T CARE ABOUT TIME
add_record_id,
id_cols=self.id_cols,
plant_part_col="plant_part",
year=False,
)
.pipe(self.add_new_plant_name, gens_mega)
.pipe(self.add_record_count_per_plant)
)
return part_df
def ag_part_by_own_slice(
self,
gens_mega,
sum_cols=SUM_COLS,
wtavg_dict=WTAVG_DICT,
) -> pd.DataFrame:
"""Aggregate the plant part by separating ownership types.
There are total records and owned records in this master unit list.
Those records need to be aggregated differently in order to scale correctly.
ownership slice is now grouped and aggregated as a single version of the
full plant and then the utilities are merged back. The "owned"
ownership slice is grouped and aggregated with the utility_id_eia, so
the portions of generators created by scale_by_ownership will be
appropriately aggregated to each plant part level.
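A minimal sketch (with hypothetical variable names, not the actual
:meth:`pudl.helpers.sum_and_weighted_average_agg` implementation) of the
capacity-weighted average this aggregation performs:

.. code-block:: python

    wtavg = (
        part_own.assign(_wtd=lambda x: x.fuel_cost_per_mwh * x.capacity_mw)
        .groupby(by_cols)[['_wtd', 'capacity_mw']]
        .sum(min_count=1)
        .eval('_wtd / capacity_mw')
    )

where ``by_cols`` stands in for ``self.id_cols + IDX_TO_ADD + IDX_OWN_TO_ADD``.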
Returns:
pandas.DataFrame: dataframe aggregated to the level of the
part_name
"""
logger.info(f"begin aggregation for: {self.part_name}")
# split up the 'owned' slices from the 'total' slices.
# this is because the aggregations are different
part_own = gens_mega.loc[gens_mega.ownership_record_type == "owned"].copy()
part_tot = gens_mega.loc[gens_mega.ownership_record_type == "total"].copy()
if len(gens_mega) != len(part_own) + len(part_tot):
raise AssertionError(
"Error occurred in breaking apart ownership types."
"The total and owned slices should equal the total records."
"Check for nulls in the ownership_record_type column."
)
part_own = pudl.helpers.sum_and_weighted_average_agg(
df_in=part_own,
by=self.id_cols + IDX_TO_ADD + IDX_OWN_TO_ADD,
sum_cols=sum_cols,
wtavg_dict=wtavg_dict,
)
# we want a "total" record for each of the utilities that own any slice
# of a particular plant-part. To achieve this, we are going to remove
# the utility info (and drop duplicates bc a plant-part with many
# generators will have multiple duplicate records for each owner)
# we are going to generate the aggregated output for a utility-less
# "total" record and then merge back in the many utilities so each of
# the utilities is associated with an aggregated "total" plant-part
# record
part_tot_no_utils = part_tot.drop(columns=["utility_id_eia"]).drop_duplicates()
# still need to re-calc the fraction owned for the part
part_tot_out = (
pudl.helpers.sum_and_weighted_average_agg(
df_in=part_tot_no_utils,
by=self.id_cols + IDX_TO_ADD,
sum_cols=sum_cols,
wtavg_dict=wtavg_dict,
)
.pipe(pudl.helpers.convert_cols_dtypes, "eia")
.merge(
part_tot[self.id_cols + IDX_TO_ADD + IDX_OWN_TO_ADD].drop_duplicates(),
on=self.id_cols + IDX_TO_ADD,
how="left",
validate="1:m",
)
)
part_ag = pd.concat([part_own, part_tot_out]).pipe(
pudl.helpers.convert_cols_dtypes, "eia"
)
return part_ag
def ag_fraction_owned(self, part_ag: pd.DataFrame):
"""Calculate the fraction owned for a plant-part df.
This method takes a dataframe of records that are aggregated to the
level of a plant-part (with certain ``id_cols``) and appends a
fraction_owned column, which indicates the % ownership that a
particular utility owner has for each aggregated plant-part record.
For partial owner records (ownership_record_type == "owned"), fraction_owned is
calculated based on the portion of the capacity and the total capacity
of the plant. For total owner records (ownership_record_type == "total"), the
fraction_owned is always 1.
This method is meant to be run after :meth:`ag_part_by_own_slice`.
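In effect (mirroring the implementation below):

.. code-block:: python

    # for "owned" records; "total" records are simply assigned 1
    fraction_owned = capacity_mw / capacity_mw_total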
Args:
part_ag: a dataframe of records aggregated to the level of this plant-part.
"""
# we must first get the total capacity of the full plant
# Note: we could simply skip the ownership_record_type == "total" records,
# since we automatically assign fraction_owned == 1 to them, but it seems
# cleaner to run the full df through this same groupby
frac_owned = part_ag.groupby(
by=self.id_cols + IDX_TO_ADD + ["ownership_record_type"], observed=True
)[["capacity_mw"]].sum(min_count=1)
# then merge the total capacity with the plant-part capacity to use to
# calculate the fraction_owned
part_frac = (
pd.merge(
part_ag,
frac_owned,
right_index=True,
left_on=frac_owned.index.names,
suffixes=("", "_total"),
)
.assign(
fraction_owned=lambda x: np.where(
x.ownership_record_type == "owned",
x.capacity_mw / x.capacity_mw_total,
1,
)
)
.drop(columns=["capacity_mw_total"])
.pipe(pudl.helpers.convert_cols_dtypes, "eia")
)
return part_frac
def add_new_plant_name(self, part_df, gens_mega):
"""Add plants names into the compiled plant part df.
Args:
part_df (pandas.DataFrame): dataframe containing records associated
with one plant part (ex: all plant's or plant_prime_mover's).
gens_mega (pandas.DataFrame): a table of all of the generators with
identifying columns and data columns, sliced by ownership which
makes "total" and "owned" records for each generator owner.
"""
part_df = pd.merge(
part_df,
gens_mega[self.id_cols + ["plant_name_eia"]].drop_duplicates(),
on=self.id_cols,
how="left",
).assign(plant_name_ppe=lambda x: x.plant_name_eia)
# we don't want the plant_id_eia to be part of the plant name, but all
# of the other parts should have their id column in the new plant name
if self.part_name != "plant":
col = [x for x in self.id_cols if x != "plant_id_eia"][0]
part_df.loc[part_df[col].notnull(), "plant_name_ppe"] = (
part_df["plant_name_ppe"] + " " + part_df[col].astype(str)
)
return part_df
def add_record_count_per_plant(self, part_df: pd.DataFrame) -> pd.DataFrame:
"""Add a record count for each set of plant part records in each plant.
Args:
part_df: dataframe containing records associated
with one plant part (ex: all plant's or plant_prime_mover's).
Returns:
augmented version of ``part_df`` with a new column named
``record_count``
"""
group_cols = ["plant_id_eia"] + IDX_TO_ADD + IDX_OWN_TO_ADD
# count unique records per plant
part_df.loc[:, "record_count"] = part_df.groupby(group_cols, observed=True)[
"record_id_eia"
].transform("count")
return part_df
class TrueGranLabeler:
"""Label the plant-part table records with their true granularity.
The coordinating function here is :meth:`execute`.
"""
def execute(self, ppe):
"""Merge the true granularity labels onto the plant part df.
This method will add the columns ``true_gran``, ``appro_part_label``, and
``appro_record_id_eia`` to the plant parts list which denote whether
each plant-part is a true or false granularity.
First the plant part list records are matched to generators. Then
the matched records are sorted by the order of keys in PLANT_PARTS and the
highest granularity record for each generator is marked as the true
granularity. The appropriate true granular part label and record id
is then merged on to get the plant part table with true granularity labels.
Arguments:
ppe: (pd.DataFrame) The plant parts list
"""
parts_to_gens = match_to_single_plant_part(
multi_gran_df=ppe,
ppe=ppe,
part_name="plant_gen",
cols_to_keep=["plant_part"],
one_to_many=True,
)[["record_id_eia", "record_id_eia_plant_gen", "plant_part"]].rename(
columns={"record_id_eia_plant_gen": "gen_id"},
)
# concatenate the gen id's to get the combo of gens for each record
combos = (
parts_to_gens.sort_values(["gen_id"])
.groupby(["record_id_eia"])["gen_id"]
.apply(lambda x: ",".join(x))
.rename("gens_combo")
)
parts_to_gens = parts_to_gens.merge(
combos, how="left", left_on="record_id_eia", right_index=True
)
# categorical columns allow sorting by PLANT_PARTS key order
parts_to_gens["plant_part"] = pd.Categorical(
parts_to_gens["plant_part"], PLANT_PARTS.keys()
)
parts_to_gens = parts_to_gens.sort_values("plant_part")
# get the true gran records by finding duplicate gen combos
# this marks duplicate grans as True except for the first occurrence
# non-duplicated granularities (unique records) are also marked False
dupes = parts_to_gens.duplicated(subset=["gens_combo"], keep="first")
# the False (non duplicated) granularities are now True in true_gran
parts_to_gens.loc[:, "true_gran"] = ~(dupes)
# drop duplicate record ids so there is one row for each record
parts_to_gens = parts_to_gens.drop_duplicates(subset=["record_id_eia"])
true_grans = (
parts_to_gens[parts_to_gens.true_gran][
["record_id_eia", "plant_part", "gens_combo"]
]
.rename(
columns={
"record_id_eia": "appro_record_id_eia",
"plant_part": "appro_part_label",
}
)
.astype({"appro_part_label": "string"})
)
# merge the true gran cols onto the parts to gens dataframe
# drop cols to get a table with just record id and true gran cols
record_id_true_gran = parts_to_gens.merge(
true_grans, on="gens_combo", how="left", validate="m:1"
).drop(["plant_part", "gens_combo", "gen_id"], axis=1)
ppe_true_gran = ppe.merge(
record_id_true_gran, how="left", on="record_id_eia", validate="1:1"
)
return ppe_true_gran
class AddAttribute:
"""Base class for adding attributes to plant-part tables."""
def __init__(
self,
attribute_col: str,
part_name: str,
assign_col_dict: dict[str, str] | None = None,
):
"""Initialize a attribute adder.
Args:
attribute_col (string): name of qualifier record that you want added.
Must be in :py:const:`CONSISTENT_ATTRIBUTE_COLS` or a key in
:py:const:`PRIORITY_ATTRIBUTES_DICT`
or :py:const:`MAX_MIN_ATTRIBUTES_DICT`.
part_name (str): the name of the part to aggregate to. Names can be
only those in :py:const:`PLANT_PARTS` or ``plant_match_ferc1``.
"""
assert attribute_col in CONSISTENT_ATTRIBUTE_COLS + list(
PRIORITY_ATTRIBUTES_DICT.keys()
) + list(MAX_MIN_ATTRIBUTES_DICT.keys())
self.attribute_col = attribute_col
# the base columns will be the id columns, plus the other two main ids
self.part_name = part_name
self.id_cols = PLANT_PARTS[part_name]["id_cols"]
self.base_cols = self.id_cols + IDX_TO_ADD
self.assign_col_dict = assign_col_dict
def assign_col(self, gens_mega):
"""Add a new column to gens_mega."""
if self.assign_col_dict is not None:
return gens_mega.assign(**self.assign_col_dict)
return gens_mega
class AddConsistentAttributes(AddAttribute):
"""Adder of attributes records to a plant-part table."""
def execute(self, part_df, gens_mega):
"""Get qualifier records.
For an individual dataframe of one plant part (e.g. only
"plant_prime_mover" plant part records), we typically have identifying
columns and aggregated data columns. The identifying columns for a
given plant part are only those columns which are required to uniquely
specify a record of that type of plant part. For example, to uniquely
specify a plant_unit record, we need ``plant_id_eia``,
``unit_id_pudl``, and ``report_date`` and nothing else. In other words,
the identifying columns for a given plant part would make up a natural
composite primary key for a table composed entirely of that type of
plant part. Every plant part is cobbled together from generator
records, so each record in each part_df can be thought of as a
collection of generators.
Identifier and qualifier columns are the same columns; whether a column
is an identifier or a qualifier is a function of the plant part you're
considering. All the other columns which could be identifiers in the
context of other plant parts (but aren't for this plant part) are
qualifiers.
This method takes a part_df and checks whether or not the data
we are trying to grab from the attribute column is consistent across
every component generator of each record.
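An approximate sketch of this consistency test in plain pandas (the
implementation below uses PUDL helper functions and handles nulls slightly
differently):

.. code-block:: python

    nunique = record_df.groupby(self.base_cols)[attribute_col].transform('nunique')
    consistent_records = (
        record_df.loc[nunique == 1, self.base_cols + [attribute_col]]
        .drop_duplicates()
    )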
Args:
part_df (pandas.DataFrame): dataframe containing records associated
with one plant part.
gens_mega (pandas.DataFrame): a table of all of the generators with
identifying columns and data columns, sliced by ownership which
makes "total" and "owned" records for each generator owner.
"""
attribute_col = self.attribute_col
if attribute_col in part_df.columns:
logger.debug(f"{attribute_col} already here.. ")
return part_df
record_df = gens_mega.copy()
record_df = self.assign_col(record_df)
consistent_records = self.get_consistent_qualifiers(record_df)
non_nulls = consistent_records[consistent_records[attribute_col].notnull()]
logger.debug(f"merging in consistent {attribute_col}: {len(non_nulls)}")
return part_df.merge(consistent_records, how="left")
def get_consistent_qualifiers(self, record_df):
"""Get fully consistent qualifier records.
When data in a qualifier column is identical for every record in a
plant part, we associate this data point with the record. If the data
points for the related generator records are not identical, then
nothing is associated with the record.
Args:
record_df (pandas.DataFrame): the dataframe of generator records with
the identifying base columns and the qualifier column.
"""
# TODO: determine if we can move this up the chain so we can do this
# once per plant-part, not once per plant-part * qualifier record
attribute_col = self.attribute_col
base_cols = self.base_cols
entity_count_df = pudl.helpers.count_records(
record_df, base_cols, "entity_occurrences"
).pipe(pudl.helpers.convert_cols_dtypes, "eia")
record_count_df = pudl.helpers.count_records(
record_df, base_cols + [attribute_col], "record_occurrences"
).pipe(pudl.helpers.convert_cols_dtypes, "eia")
re_count = (
record_df[base_cols + [attribute_col]]
.merge(entity_count_df, how="left", on=base_cols)
.merge(record_count_df, how="left", on=base_cols + [attribute_col])
)
# find all of the matching records..
consistent_records = (
re_count[re_count["entity_occurrences"] == re_count["record_occurrences"]]
.drop(columns=["entity_occurrences", "record_occurrences"])
.drop_duplicates()
)
return consistent_records
class AddPriorityAttribute(AddAttribute):
"""Add Attributes based on a priority sorting from :py:const:`PRIORITY_ATTRIBUTES`.
This object associates one attribute from the generators that make up a plant-part
based on a sorted list within :py:const:`PRIORITY_ATTRIBUTES`. For example, for
"operational_status" we will grab the highest level of operational status that is
associated with each records' component generators. The order of operational status
is defined within the method as: 'existing', 'proposed', then 'retired'. For example
if a plant_unit is composed of two generators, and one of them is "existing" and
another is "retired" the entire plant_unit will be considered "existing".
"""
def execute(self, part_df, gens_mega):
"""Add the attribute to the plant-part df based on priority.
Args:
part_df (pandas.DataFrame): dataframe containing records associated
with one plant part.
gens_mega (pandas.DataFrame): a table of all of the generators with
identifying columns and data columns, sliced by ownership which
makes "total" and "owned" records for each generator owner.
"""
attribute_col = self.attribute_col
if attribute_col in part_df.columns:
logger.debug(f"{attribute_col} already here.. ")
return part_df
gens_mega = self.assign_col(gens_mega)
logger.debug(f"getting max {attribute_col}")
consistent_records = pudl.helpers.dedupe_on_category(
gens_mega.copy()[self.base_cols + [attribute_col]],
self.base_cols,
attribute_col,
PRIORITY_ATTRIBUTES_DICT[attribute_col],
)
part_df = part_df.merge(consistent_records, on=self.base_cols, how="left")
return part_df
class AddMaxMinAttribute(AddAttribute):
"""Add Attributes based on the maximum or minimum value of a sorted attribute.
This object adds an attribute based on the maximum or minimum of another attribute
within a group of plant parts uniquely identified by their base ID columns.
"""
def execute(
self,
part_df,
gens_mega,
att_dtype: str,
keep: Literal["first", "last"] = "first",
):
"""Add the attribute to the plant part df based on sorting of another attribute.
Args:
part_df (pandas.DataFrame): dataframe containing records associated
with one plant part.
gens_mega (pandas.DataFrame): a table of all of the generators with
identifying columns and data columns, sliced by ownership which
makes "total" and "owned" records for each generator owner.
att_dtype (string): Pandas data type of the new attribute
keep (string): Whether to keep the first or last record in a sorted
grouping of attributes. Passing in "first" indicates the new
attribute is a maximum attribute.
See :meth:`pandas.DataFrame.drop_duplicates`.
"""
attribute_col = self.attribute_col
if attribute_col in part_df.columns:
logger.debug(f"{attribute_col} already here.. ")
return part_df
logger.debug(f"pre count of part DataFrame: {len(part_df)}")
gens_mega = self.assign_col(gens_mega)
new_attribute_df = (
gens_mega.astype({attribute_col: att_dtype})[
self.base_cols + [attribute_col]
]
.sort_values(attribute_col, ascending=False)
.drop_duplicates(subset=self.base_cols, keep=keep)
.dropna(subset=self.base_cols)
)
part_df = part_df.merge(
new_attribute_df, how="left", on=self.base_cols, validate="m:1"
)
logger.debug(f"post count of part DataFrame: {len(part_df)}")
return part_df
#########################
# Module Helper Functions
#########################
def make_id_cols_list():
"""Get a list of the id columns (primary keys) for all of the plant parts.
Returns:
list: a list of the ID columns for all of the plant-parts, including
``report_date``
"""
return IDX_TO_ADD + pudl.helpers.dedupe_n_flatten_list_of_lists(
[x["id_cols"] for x in PLANT_PARTS.values()]
)
def make_parts_to_ids_dict():
"""Make dict w/ plant-part names (keys) to the main id column (values).
All plant-parts have one or two ID columns in :py:const:`PLANT_PARTS`: ``plant_id_eia``
and a secondary column (with the exception of the "plant" plant-part, which has only
``plant_id_eia``). The ``plant_id_eia`` column is always first, so we grab the last column.
Returns:
dictionary: plant-part names (keys) corresponding to the main ID column
(value).
"""
parts_to_ids = {}
for part, part_dict in PLANT_PARTS.items():
parts_to_ids[part] = part_dict["id_cols"][-1]
return parts_to_ids
def add_record_id(part_df, id_cols, plant_part_col="plant_part", year=True):
"""Add a record id to a compiled part df.
We need a standardized way to refer to these compiled records that contains enough
information in the id itself that in theory we could deconstruct the id and
determine which plant id and plant part id columns are associated with this record.
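For example (following the construction below), a "total" ``plant_unit`` record
for plant 1, unit 1, owned by utility 111 in 2020 would get a ``record_id_eia``
like ``"1_1_2020_plant_unit_total_111"``.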
"""
ids = deepcopy(id_cols)
# we want the plant id first... mostly just bc it'll be easier to read
part_df = part_df.assign(record_id_eia_temp=lambda x: x.plant_id_eia.astype(str))
ids.remove("plant_id_eia")
for col in ids:
part_df = part_df.assign(
record_id_eia_temp=lambda x, col=col: x.record_id_eia_temp
+ "_"
+ x[col].astype(str)
)
if year:
part_df = part_df.assign(
record_id_eia_temp=lambda x: x.record_id_eia_temp
+ "_"
+ x.report_date.dt.year.astype(str)
)
part_df = part_df.assign(
record_id_eia_temp=lambda x: x.record_id_eia_temp
+ "_"
+ x[plant_part_col]
+ "_"
+ x.ownership_record_type.astype(str)
+ "_"
+ x.utility_id_eia.astype("Int64").astype(str)
)
# add operational status only when records are not "operating" (i.e.
# existing or retiring mid-year; see MakeMegaGenTbl.label_operating_gens()
# for more details)
non_op_mask = part_df.operational_status_pudl != "operating"
part_df.loc[non_op_mask, "record_id_eia_temp"] = (
part_df.loc[non_op_mask, "record_id_eia_temp"]
+ "_"
+ part_df.loc[non_op_mask, "operational_status_pudl"]
)
if year:
part_df = part_df.rename(columns={"record_id_eia_temp": "record_id_eia"})
part_df["record_id_eia"] = part_df["record_id_eia"].astype("string")
else:
part_df = part_df.rename(columns={"record_id_eia_temp": "plant_part_id_eia"})
part_df["plant_part_id_eia"] = part_df["plant_part_id_eia"].astype("string")
return part_df
def match_to_single_plant_part(
multi_gran_df: pd.DataFrame,
ppe: pd.DataFrame,
part_name: PLANT_PARTS_LITERAL = "plant_gen",
cols_to_keep: list[str] = [],
one_to_many: bool = False,
) -> pd.DataFrame:
"""Match data with a variety of granularities to a single plant-part.
This method merges an input dataframe (``multi_gran_df``) containing data that has a
heterogeneous set of plant-part granularities with a subset of the EIA plant-part
list that has a single granularity. Currently this is only tested where the single
granularity is generators. In general this will be a one-to-many merge in which
values from single records in the input data end up associated with several records
from the plant part list.
First, we select a subset of the full EIA plant-part list corresponding to the plant
part specified by the ``part_name`` argument. In theory this could be the plant,
generator, fuel type, etc. Currently only generators are supported. Then, we iterate
over all the possible plant parts, selecting the subset of records in
``multi_gran_df`` that have that granularity, and merge the homogeneous subset of
the plant part list that we selected above onto that subset of the input data. Each
iteration uses a different set of columns to merge on -- the columns which define
the primary key for the plant part being merged. Each iteration creates a separate
dataframe, corresponding to a particular plant part, and at the end they are all
concatenated together and returned.
Args:
multi_gran_df: a data table whose records have all been linked to the EIA
plant-part list but may be heterogeneous in their plant-part granularities (i.e.
some records could be of ``plant`` plant-part type while others are
``plant_gen`` or ``plant_prime_mover``). All of the plant-part list columns
need to be present in this table.
ppe: the EIA plant-part list.
part_name: name of the single plant part to match to. Must be a key in
PLANT_PARTS dictionary.
cols_to_keep: columns from the original data ``multi_gran_df`` that you want to
show up in the output. These should not be columns that show up in the
``ppe``.
one_to_many: boolean (False by default). If True, include the ``plant_match_ferc1``
plant part when iterating over the plant parts to merge.
Returns:
A dataframe in which records correspond to :attr:`part_name` (in the current
implementation: the records all correspond to EIA generators!). This is an
intermediate table that cannot be used directly for analysis because the data
columns from the original dataset are duplicated and still need to be scaled
up/down.
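A hypothetical usage sketch (variable names are illustrative), mirroring how
this function is called elsewhere in this module:

.. code-block:: python

    gen_level = match_to_single_plant_part(
        multi_gran_df=matched_records,  # records of mixed plant-part granularity
        ppe=plant_parts_eia,  # the full EIA plant-part list
        part_name='plant_gen',
        cols_to_keep=['plant_part'],
    )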
"""
# select only the plant-part records that we are trying to scale to
ppe_part_df = ppe[ppe.plant_part == part_name]
# convert the date to year start - this is necessary because the
# depreciation data is often reported as EOY and the ppe is always SOY
multi_gran_df.loc[:, "report_date"] = pd.to_datetime(
multi_gran_df["report_date"].dt.year, format="%Y"
)
out_dfs = []
for merge_part in PLANT_PARTS:
if (
not one_to_many and merge_part == "plant_match_ferc1"
): # Skip plant_match_ferc1
continue
pk_cols = PLANT_PARTS[merge_part]["id_cols"] + IDX_TO_ADD + IDX_OWN_TO_ADD
part_df = pd.merge(
(
# select just the records that correspond to merge_part
multi_gran_df[multi_gran_df.plant_part == merge_part][
pk_cols + ["record_id_eia"] + cols_to_keep
]
),
ppe_part_df,
on=pk_cols,
how="left",
# this unfortunately needs to be a m:m bc sometimes the df
# multi_gran_df has multiple records associated with the same
# record_id_eia which are unique records and are not aggregated
# in aggregate_duplicate_eia. For instance, the depreciation
# data has both PUC and FERC studies.
validate="m:m",
suffixes=("", f"_{part_name}"),
)
# there should be no records without a matching generator
assert ~(part_df.record_id_eia.isnull().to_numpy().any())
out_dfs.append(part_df)
out_df = pd.concat(out_dfs)
return out_df
def plant_parts_eia_distinct(plant_parts_eia: pd.DataFrame) -> pd.DataFrame:
"""Get the EIA plant_parts with only the unique granularities.
Get only
the records of the PPE that are "true granularities" and those which are not
duplicates based on their ownership so the FERC to EIA matching model
doesn't get confused as to which option to pick if there are many records
with duplicate data.
Arguments:
plant_parts_eia: EIA plant parts table.
"""
plant_parts_eia = plant_parts_eia.assign(
plant_id_report_year_util_id=lambda x: x.plant_id_report_year
+ "_"
+ x.utility_id_pudl.astype(str)
).astype({"installation_year": "float"})
distinct_ppe = plant_parts_eia[
(plant_parts_eia["true_gran"]) & (~plant_parts_eia["ownership_dupe"])
]
if distinct_ppe.index.name != "record_id_eia":
logger.error("Plant parts list index is not record_id_eia.")
return distinct_ppe
def reassign_id_ownership_dupes(plant_parts_eia: pd.DataFrame) -> pd.DataFrame:
"""Reassign the record_id for the records that are labeled ownership_dupe.
This function is used after the EIA plant-parts table is created.
Args:
plant_parts_eia: EIA plant parts table.
"""
# the record_id_eia's need to be a column to mess with it and record_id_eia
# is typically the index of plant_parts_df, so we are going to reset index
# if record_id_eia is the index
og_index = False
if plant_parts_eia.index.name == "record_id_eia":
plant_parts_eia = plant_parts_eia.reset_index()
og_index = True
# reassign the record id and ownership col when the record is a dupe
plant_parts_eia = plant_parts_eia.assign(
record_id_eia=lambda x: np.where(
x.ownership_dupe,
x.record_id_eia.str.replace("owned", "total"),
x.record_id_eia,
)
)
if "ownership" in plant_parts_eia.columns:
plant_parts_eia = plant_parts_eia.assign(
ownership=lambda x: np.where(x.ownership_dupe, "total", x.ownership)
)
# then we reset the index so we return the dataframe in the same structure
# as we got it.
if og_index:
plant_parts_eia = plant_parts_eia.set_index("record_id_eia")
return plant_parts_eia
@asset(
io_manager_key="pudl_io_manager",
compute_kind="Python",
)
def out_eia__yearly_assn_plant_parts_plant_gen(
out_eia__yearly_plant_parts: pd.DataFrame,
) -> pd.DataFrame:
"""Build association table between EIA plant parts and EIA generators.
In order to easily determine what generator records are associated with every
plant part record, we made this association table. This table associates every plant part
record (identified as ``record_id_eia``) to the possibly many 'plant_gen' records
(identified as ``record_id_eia_plant_gen``).
"""
# luckily we already had a function that did the core of the work here to associate
# a ppe record with all of the associated records of a particular granularity (part_name).
ppe_assn = match_to_single_plant_part(
multi_gran_df=out_eia__yearly_plant_parts,
ppe=out_eia__yearly_plant_parts,
part_name="plant_gen",
one_to_many=True,
cols_to_keep=[],
)
# the output of match_to_single_plant_part had the record_id_eia of the
# multi_gran_df, but no other columns because we set cols_to_keep=[], so
# every other column in here is associated with the `plant_gen`. We are
# giving everything a plant_gen suffix so it's clear everything in those
# columns is about the generator ppe record.
ppe_assn = (
ppe_assn.set_index(
["record_id_eia", "record_id_eia_plant_gen", "plant_id_eia", "report_date"]
+ IDX_OWN_TO_ADD
)
.add_suffix("_plant_gen")
.reset_index()
.assign(
generators_number=lambda x: x.groupby(["record_id_eia"])[
"record_id_eia_plant_gen"
].transform("count")
)
)
return ppe_assn
@asset_check(
asset="out_eia__yearly_assn_plant_parts_plant_gen",
additional_ins={"ppe": AssetIn("out_eia__yearly_plant_parts")},
)
def ensure_all_ppe_ids_are_in_assn(ppe_assn, ppe):
"""Check to see if every ID in the EIA plant parts shows up in the assn table."""
assert set(ppe.record_id_eia) == set(ppe_assn.record_id_eia)
return AssetCheckResult(passed=True)