"""
Aggregate plant parts to make an EIA master plant-part table.

Practically speaking, a plant is a collection of generator(s). There are many
attributes of generators (e.g. prime mover, primary fuel source, technology
type). We can use these generator attributes to group generator records into
larger aggregate records which we call "plant-parts". A plant part is a record
which corresponds to a particular collection of generators that all share an
identical attribute, e.g. all of the generators with unit_id=2, or all of the
generators with coal as their primary fuel source.

The EIA data about power plants (from EIA 923 and 860) is reported in tables
with records that mostly correspond to generators and plants. Other datasets
(cough cough FERC1) are less well organized and include plants, generators and
other plant-parts all in the same table without any clear labels. The master
plant-part table is an attempt to create records corresponding to many
different plant-parts in order to connect specific slices of EIA plants to
other datasets.

Because generators are often owned by multiple utilities, another dimension of
the master unit list involves generating two records for each owner: one for
the portion of the plant part they own and one for the plant part as a whole.
The portion records are labeled in the ``ownership`` column as "owned" and the
total records are labeled as "total".

This module refers to "true granularities". Many plant parts we cobble together
here in the master plant-part list refer to the same collection of
infrastructure as other plant-part list records. For example, a
"plant_prime_mover" plant part record and a "plant_unit" plant part record
may be cobbled together from the same two generators. We want to be
able to reduce the plant-part list to only unique collections of generators,
so we label the first unique granularity as a true granularity and label the
subsequent records as false granularities with the ``true_gran`` column. In
order to choose which plant-part to keep in these instances, we assign a
priority order in :py:const:`PLANT_PARTS_ORDERED` and label whichever
plant-part comes first as the true granularity.
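
This is not the actual implementation (see :class:`LabelTrueGranularities`),
but a minimal sketch of how the ordering breaks ties between equivalent
plant-parts:

.. code-block:: python

    # Of two plant-parts built from the same generators, keep whichever
    # appears first in PLANT_PARTS_ORDERED as the true granularity.
    equivalent_parts = ['plant_prime_mover', 'plant_unit']
    true_gran_part = min(equivalent_parts, key=PLANT_PARTS_ORDERED.index)
    # true_gran_part == 'plant_unit'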

**Recipe Book for the plant-part list**

:py:const:`PLANT_PARTS` is the main recipe book for how each of the plant-parts
need to be compiled. These plant-parts represent ways to group generators based
on widely reported values in EIA. All of these are logical ways to group
collections of generators - in most cases - but some groupings of generators
are more prevalent or relevant than others for certain types of plants.

The canonical example here is the ``plant_unit``. A unit is a collection of
generators that operate together - most notably in combined-cycle natural gas
plants. Combined-cycle units generally consist of a number of gas turbines
whose exhaust heat raises steam for a number of steam turbines.

>>> df_gens = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1],
... 'generator_id': ['a', 'b', 'c'],
... 'unit_id_pudl': [1, 1, 1],
... 'prime_mover_code': ['CT', 'CT', 'CA'],
... 'capacity_mw': [50, 50, 100],
... })
>>> df_gens
plant_id_eia generator_id unit_id_pudl prime_mover_code capacity_mw
0 1 a 1 CT 50
1 1 b 1 CT 50
2 1 c 1 CA 100

A good example of a plant-part that isn't really logical also comes from a
combined-cycle unit. Grouping this example plant by the ``prime_mover_code``
would generate two records that would basically never show up in FERC1,
because the generators within the unit are inseparable.

>>> df_plant_prime_mover = pd.DataFrame({
... 'plant_id_eia': [1, 1],
... 'plant_part': ['plant_prime_mover', 'plant_prime_mover'],
... 'prime_mover_code': ['CT', 'CA'],
... 'capacity_mw': [100, 100],
... })
>>> df_plant_prime_mover
plant_id_eia plant_part prime_mover_code capacity_mw
0 1 plant_prime_mover CT 100
1 1 plant_prime_mover CA 100

In this case the unit is more relevant:

>>> df_plant_unit = pd.DataFrame({
... 'plant_id_eia': [1],
... 'plant_part': ['plant_unit'],
... 'unit_id_pudl': [1],
... 'capacity_mw': [200],
... })
>>> df_plant_unit
plant_id_eia plant_part unit_id_pudl capacity_mw
0 1 plant_unit 1 200

But if this same plant had both this combined-cycle unit and two more
self-contained "GT" (gas combustion turbine) generators, the sensible way to
group the generators differs between the combined-cycle unit and the gas
turbines.

>>> df_gens = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1, 1, 1],
... 'generator_id': ['a', 'b', 'c', 'd', 'e'],
... 'unit_id_pudl': [1, 1, 1, 2, 3],
... 'prime_mover_code': ['CT', 'CT', 'CA', 'GT', 'GT'],
... 'capacity_mw': [50, 50, 100, 75, 75],
... })
>>> df_gens
plant_id_eia generator_id unit_id_pudl prime_mover_code capacity_mw
0 1 a 1 CT 50
1 1 b 1 CT 50
2 1 c 1 CA 100
3 1 d 2 GT 75
4 1 e 3 GT 75
>>> df_plant_part = pd.DataFrame({
... 'plant_id_eia': [1, 1],
... 'plant_part': ['plant_unit', 'plant_prime_mover'],
... 'unit_id_pudl': [1, pd.NA],
... 'prime_mover_code': [pd.NA, 'GT',],
... 'capacity_mw': [200, 150],
... })
>>> df_plant_part
plant_id_eia plant_part unit_id_pudl prime_mover_code capacity_mw
0 1 plant_unit 1 <NA> 200
1 1 plant_prime_mover <NA> GT 150

In this last case, the ``plant_unit`` record would have a null
``plant_prime_mover`` because the unit contains more than one
``prime_mover_code``. The same goes for the ``unit_id_pudl`` of the
``plant_prime_mover`` records. This is handled in :class:`AddConsistentAttributes`.

**Overview of flow for generating the master unit list:**

The three main classes which enable the generation of the plant-part table are:

* :class:`MakeMegaGenTbl`: All of the plant parts are compiled from generators,
  so this class generates a big dataframe of generators with any ID and data
  columns we'll need. This is also where we add records regarding utility
  ownership slices. The table includes two records for every generator-owner:
  one for the "total" generator (assuming the owner owns 100% of the generator)
  and one for the reported ownership fraction of that generator with all of the
  data columns scaled to the ownership fraction.
* :class:`LabelTrueGranularities`: This class creates labels for all generators
  which note whether the plant-part records that will be compiled from each
  generator will be a "true granularity", as described above.
* :class:`MakePlantParts`: This class uses the generator dataframe and the
  granularity dataframe from the above two classes, as well as the information
  stored in :py:const:`PLANT_PARTS`, to aggregate each of the plant parts. The
  result is a set of plant-part dataframes with the columns which identify the
  plant part and all of the data columns aggregated to the level of the plant
  part. With those compiled plant-part dataframes we also add in qualifier
  columns with :class:`AddConsistentAttributes`. A qualifier column contains
  data that is not endemic to the plant part record (it is not one of
  the identifying columns or aggregated data columns) but is still
  useful data that is attributable to each of the plant part records. For more
  detail on what a qualifier column is, see :meth:`AddConsistentAttributes.execute`.

**Generating the plant-parts list**

There are two ways to generate the plant-parts table: one directly using the
:class:`pudl.output.pudltabl.PudlTabl` object and the other using the classes
from this module. Either option needs a :class:`pudl.output.pudltabl.PudlTabl`
object.

Create the :class:`pudl.output.pudltabl.PudlTabl` object:

.. code-block:: python

    import sqlalchemy as sa
    import pudl
    pudl_engine = sa.create_engine(pudl.workspace.setup.get_defaults()['pudl_db'])
    pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine, freq='AS')

Then make the table via pudl_out:

.. code-block:: python

    plant_parts_eia = pudl_out.plant_parts_eia()

OR make the table via objects in this module:

.. code-block:: python

    # the mcoe and ownership tables both come from the PudlTabl output object
    mcoe = pudl_out.mcoe()
    own_eia860 = pudl_out.own_eia860()
    gens_mega = MakeMegaGenTbl().execute(mcoe, own_eia860)
    true_grans = LabelTrueGranularities().execute(gens_mega)
    parts_compiler = MakePlantParts(pudl_out)
    plant_parts_eia = parts_compiler.execute(gens_mega=gens_mega, true_grans=true_grans)
"""
import logging
import warnings
from copy import deepcopy
from typing import Dict, List, Literal
import numpy as np
import pandas as pd
import pudl
logger = logging.getLogger(__name__)
# HALP: I need both of these settings set in order for the dfs in the docstrings
# to pass the doctests. Without them the formatting gets all jumbled,
# but obviously this is the wrong place to do this.
# I tried adding these into conftest.py in pandas_terminal_width().
# I tried adding this into __init__.py.
# I tried adding this into the module docstring.
pd.options.display.width = 1000
pd.options.display.max_columns = 1000
PLANT_PARTS: Dict[str, Dict[str, List]] = {
'plant': {
'id_cols': ['plant_id_eia'],
},
'plant_gen': {
'id_cols': ['plant_id_eia', 'generator_id'],
},
'plant_unit': {
'id_cols': ['plant_id_eia', 'unit_id_pudl'],
},
'plant_technology': {
'id_cols': ['plant_id_eia', 'technology_description'],
},
'plant_prime_fuel': { # 'plant_primary_fuel': {
'id_cols': ['plant_id_eia', 'energy_source_code_1'],
},
'plant_prime_mover': {
'id_cols': ['plant_id_eia', 'prime_mover_code'],
},
'plant_ferc_acct': {
'id_cols': ['plant_id_eia', 'ferc_acct_name'],
},
}
"""
dict: this dictionary contains a key for each of the 'plant parts' that should
end up in the master unit list. The top-level value for each key is another
dictionary, which contains keys:

* id_cols (the primary key type id columns for this plant part). The
  plant_id_eia column must come first.
"""
PLANT_PARTS_ORDERED: List[str] = [
'plant',
'plant_unit',
'plant_prime_mover',
'plant_technology',
'plant_prime_fuel',
'plant_ferc_acct',
'plant_gen'
]
IDX_TO_ADD: List[str] = ['report_date', 'operational_status_pudl']
"""
list: list of additional columns to add to the id_cols in :py:const:`PLANT_PARTS`.
The id_cols are the base columns that we need to aggregate on, but we also need
to add the report date to keep the records time sensitive and the
operational_status_pudl to separate the operating plant-parts from the
non-operating plant-parts.
"""
IDX_OWN_TO_ADD: List[str] = ['utility_id_eia', 'ownership']
"""
list: list of additional columns beyond the :py:const:`IDX_TO_ADD` to add to the
id_cols in :py:const:`PLANT_PARTS` when we are dealing with plant-part records
that have been broken out into "owned" and "total" records for each of their
owners.
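
As a minimal illustration, the full set of grouping columns for an
"owned"/"total" plant-part record is the plant-part's ``id_cols`` plus these
two lists:

.. code-block:: python

    # e.g. the grouping key for the plant_unit part, ownership included
    PLANT_PARTS['plant_unit']['id_cols'] + IDX_TO_ADD + IDX_OWN_TO_ADD
    # ['plant_id_eia', 'unit_id_pudl', 'report_date',
    #  'operational_status_pudl', 'utility_id_eia', 'ownership']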
"""
SUM_COLS: List[str] = [
'total_fuel_cost',
'net_generation_mwh',
'capacity_mw',
'capacity_eoy_mw',
'total_mmbtu',
]
"""
list: list of columns to sum when aggregating a table.
"""
WTAVG_DICT = {
'fuel_cost_per_mwh': 'capacity_mw',
'heat_rate_mmbtu_mwh': 'capacity_mw',
'fuel_cost_per_mmbtu': 'capacity_mw',
}
"""
dict: a dictionary of columns (keys) to perform weighted averages on and
the weight column (values).
"""
CONSISTENT_ATTRIBUTE_COLS = [
'fuel_type_code_pudl',
'planned_retirement_date',
'retirement_date',
'generator_id',
'unit_id_pudl',
'technology_description',
'energy_source_code_1',
'prime_mover_code',
'ferc_acct_name',
]
"""
list: a list of column names to add as attributes to the aggregated plant-part
records when they are consistent across each record's constituent generators.
"""
PRIORITY_ATTRIBUTES_DICT = {
'operational_status': ['existing', 'proposed', 'retired'],
}
FIRST_COLS = ['plant_id_eia', 'report_date', 'plant_part', 'generator_id',
'unit_id_pudl', 'prime_mover_code', 'energy_source_code_1',
'technology_description', 'ferc_acct_name',
'utility_id_eia', 'true_gran', 'appro_part_label']
class MakeMegaGenTbl(object):
"""
Compiler for a MEGA generator table with ownership integrated.
Examples
--------
**Input Tables**
Here is an example of one plant with three generators. We will use
``capacity_mw`` as the data column.
>>> mcoe = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1],
... 'report_date': ['2020-01-01', '2020-01-01','2020-01-01'],
... 'generator_id': ['a', 'b', 'c'],
... 'utility_id_eia': [111, 111, 111],
... 'unit_id_pudl': [1, 1, 1],
... 'prime_mover_code': ['CT', 'CT', 'CA'],
... 'technology_description': [
... 'Natural Gas Fired Combined Cycle', 'Natural Gas Fired Combined Cycle', 'Natural Gas Fired Combined Cycle'
... ],
... 'operational_status': ['existing', 'existing','existing'],
... 'retirement_date': [pd.NA, pd.NA, pd.NA],
... 'capacity_mw': [50, 50, 100],
... }).astype({
... 'retirement_date': "datetime64[ns]",
... 'report_date': "datetime64[ns]",
... })
>>> mcoe
plant_id_eia report_date generator_id utility_id_eia unit_id_pudl prime_mover_code technology_description operational_status retirement_date capacity_mw
0 1 2020-01-01 a 111 1 CT Natural Gas Fired Combined Cycle existing NaT 50
1 1 2020-01-01 b 111 1 CT Natural Gas Fired Combined Cycle existing NaT 50
2 1 2020-01-01 c 111 1 CA Natural Gas Fired Combined Cycle existing NaT 100
The ownership table from EIA 860 includes one record for every owner of
each generator. In this example generator ``c`` has two owners.
>>> df_own_eia860 = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1, 1],
... 'report_date': ['2020-01-01', '2020-01-01','2020-01-01', '2020-01-01'],
... 'generator_id': ['a', 'b', 'c', 'c'],
... 'utility_id_eia': [111, 111, 111, 111],
... 'owner_utility_id_eia': [111, 111, 111, 888],
... 'fraction_owned': [1, 1, .75, .25]
... }).astype({'report_date': "datetime64[ns]"})
>>> df_own_eia860
plant_id_eia report_date generator_id utility_id_eia owner_utility_id_eia fraction_owned
0 1 2020-01-01 a 111 111 1.00
1 1 2020-01-01 b 111 111 1.00
2 1 2020-01-01 c 111 111 0.75
3 1 2020-01-01 c 111 888 0.25
**Output Mega Generators Table**
``MakeMegaGenTbl().execute(mcoe, df_own_eia860, slice_cols=['capacity_mw'])``
produces the output table ``gens_mega`` which includes two main sections:
the generators with a "total" ownership stake for each of their owners and
the generators with an "owned" ownership stake for each of their
owners. For the generators that are owned 100% by one utility, the
    records are identical except for the ``ownership`` column. For the
    generators that have more than one owner, there are two "total" records
    with 100% of the capacity of that generator - one for each owner - and
    two "owned" records with the capacity scaled to the ownership stake
    of each of the owner utilities - represented by ``fraction_owned``.
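
    A minimal sketch of the core slicing logic, using only the example tables
    above (the real implementation is :meth:`MakeMegaGenTbl.slice_by_ownership`):

    .. code-block:: python

        owned = (
            mcoe.merge(
                df_own_eia860.drop(columns=['utility_id_eia']),
                on=['plant_id_eia', 'generator_id', 'report_date'],
                how='left',
            )
            # generators missing from the ownership table are assumed to have
            # one 100% owner (the real method also backfills the owner with
            # the operator id)
            .assign(
                fraction_owned=lambda x: x.fraction_owned.fillna(1),
                ownership='owned',
            )
            .drop(columns=['utility_id_eia'])
            .rename(columns={'owner_utility_id_eia': 'utility_id_eia'})
        )
        # duplicate the "owned" records as "total" records with 100% ownership,
        # then scale the data column by the ownership fraction
        gens_mega = pd.concat(
            [owned, owned.assign(fraction_owned=1, ownership='total')]
        )
        gens_mega['capacity_mw'] = gens_mega.capacity_mw * gens_mega.fraction_owned

    Generator ``c`` ends up with "owned" records of 75 MW and 25 MW and two
    100 MW "total" records, one for each owner.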
"""
def __init__(self):
"""
Initialize object which creates a MEGA generator table.
The coordinating function here is :meth:`execute`.
"""
self.id_cols_list = make_id_cols_list()
    def execute(
self,
mcoe: pd.DataFrame,
own_eia860: pd.DataFrame,
slice_cols: List[str] = SUM_COLS
) -> pd.DataFrame:
"""
Make the mega generators table with ownership integrated.
Args:
mcoe: generator-based mcoe table from :meth:`pudl.output.PudlTabl.mcoe()`
own_eia860: ownership table from :meth:`pudl.output.PudlTabl.own_eia860()`
slice_cols: list of columns to slice by ownership fraction in
:meth:`MakeMegaGenTbl.slice_by_ownership`. Default is :py:const:`SUM_COLS`
Returns:
a table of all of the generators with identifying columns and data
columns, sliced by ownership which makes "total" and "owned"
records for each generator owner. The "owned" records have the
generator's data scaled to the ownership percentage (e.g. if a 200
MW generator has a 75% stake owner and a 25% stake owner, this will
result in two "owned" records with 150 MW and 50 MW). The "total"
records correspond to the full plant for every owner (e.g. using
the same 2-owner 200 MW generator as above, each owner will have a
            record with 200 MW).
"""
logger.info('Generating the mega generator table with ownership.')
gens_mega = (
self.get_gens_mega_table(mcoe)
.pipe(self.label_operating_gens)
.pipe(self.slice_by_ownership, own_eia860, slice_cols)
)
return gens_mega
    def get_gens_mega_table(self, mcoe):
"""
Compile the main generators table that will be used as base of PPL.
Get a table of all of the generators there ever were and all of the
data PUDL has to offer about those generators. This generator table
will be used to compile all of the "plant-parts", so we need to ensure
that any of the id columns from the other plant-parts are in this
generator table as well as all of the data columns that we are going to
aggregate to the various plant-parts.
Returns:
pandas.DataFrame
"""
all_gens = ( # Add EIA FERC acct fields
pd.merge(
mcoe,
pudl.helpers.get_eia_ferc_acct_map(),
on=['technology_description', 'prime_mover_code'],
validate='m:1',
how='left'
)
)
return all_gens
    def label_operating_gens(self, gen_df: pd.DataFrame) -> pd.DataFrame:
"""
Label the operating generators.
We want to distinguish between "operating" generators (those that
report as "existing" and those that retire mid-year) and everything
else so that we can group the operating generators into their own
plant-parts separate from retired or proposed generators. We do this by
creating a new label column called "operational_status_pudl".
This method also adds a column called "capacity_eoy_mw", which is the
end of year capacity of the generators. We assume that if a generator
isn't "existing", its EOY capacity should be zero.
Args:
gen_df (pandas.DataFrame): annual table of all generators from EIA.
        Returns:
pandas.DataFrame: annual table of all generators from EIA that
operated within each reporting year.
TODO:
This function results in warning: `PerformanceWarning: DataFrame
is highly fragmented...` I expect this is because of the number of
columns that are being assigned here via `.loc[:, col_to_assign]`.
"""
mid_year_retiree_mask = (
gen_df.retirement_date.dt.year == gen_df.report_date.dt.year)
existing_mask = (gen_df.operational_status == 'existing')
operating_mask = existing_mask | mid_year_retiree_mask
        # we're going to make a new column which combines both the mid-year
        # retirees and the fully existing gens into one code so we can group
        # them together later on
gen_df.loc[:, "operational_status_pudl"] = (
gen_df.loc[:, "operational_status"]
.mask(operating_mask, "operating")
)
gen_df.loc[:, 'capacity_eoy_mw'] = (
gen_df.loc[:, 'capacity_mw'].mask(~existing_mask, 0)
)
logger.info(
f"Labeled {len(gen_df.loc[~existing_mask])/len(gen_df):.02%} of "
"generators as non-operative."
)
return gen_df
    def slice_by_ownership(self, gens_mega, own_eia860, slice_cols=SUM_COLS):
"""
Generate proportional data by ownership %s.
Why do we have to do this at all? Sometimes generators are owned by
many different utility owners that own slices of that generator. EIA
reports which portion of each generator is owned by which utility
relatively clearly in their ownership table. On the other hand, in
FERC1, sometimes a partial owner reports the full plant-part, sometimes
they report only their ownership portion of the plant-part. And of
        course it is not labeled in FERC1. Because of this, we need to compile
all of the possible ownership slices of the EIA generators.
In order to accumulate every possible version of how a generator could
be reported, this method generates two records for each generator's
        reported owners: one for the portion of the plant part they own and one
for the plant-part as a whole. The portion records are labeled in the
``ownership`` column as "owned" and the total records are labeled as
"total".
In this function we merge in the ownership table so that generators
with multiple owners then have one record per owner with the
ownership fraction (in column ``fraction_owned``). Because the ownership
table only contains records for generators that have multiple owners,
we assume that all other generators are owned 100% by their operator.
Then we generate the "total" records by duplicating the "owned" records
but assigning the ``fraction_owned`` to be 1 (i.e. 100%).
"""
# grab the ownership table, and reduce it to only the columns we need
own860 = (
own_eia860
[['plant_id_eia', 'generator_id', 'report_date',
'fraction_owned', 'owner_utility_id_eia']]
.pipe(pudl.helpers.convert_cols_dtypes, 'eia')
)
# we're left merging BC we've removed the retired gens, which are
# reported in the ownership table
gens_mega = (
gens_mega.merge(
own860,
how='left',
on=['plant_id_eia', 'generator_id', 'report_date'],
validate='1:m'
)
.assign( # assume gens that don't show up in the own table have one 100% owner
fraction_owned=lambda x: x.fraction_owned.fillna(value=1),
# assign the operator id as the owner if null bc if a gen isn't
# reported in the own_eia860 table we can assume the operator
# is the owner
owner_utility_id_eia=lambda x:
x.owner_utility_id_eia.fillna(x.utility_id_eia),
ownership='owned'
) # swap in the owner as the utility
.drop(columns=['utility_id_eia'])
.rename(columns={'owner_utility_id_eia': 'utility_id_eia'})
)
        # duplicate all of these "owned" records, assign 1 to all of the
        # fraction_owned column to indicate 100% ownership, and add these new
        # "total" records to the "owned" ones
gens_mega = pd.concat(
[gens_mega,
gens_mega.copy().assign(fraction_owned=1, ownership='total')]
)
gens_mega.loc[:, slice_cols] = (
gens_mega.loc[:, slice_cols]
.multiply(gens_mega['fraction_owned'], axis='index')
)
return gens_mega
class LabelTrueGranularities(object):
"""True Granularity Labeler."""
def __init__(self):
"""
        Initialize the true granularity labeler.

        The coordinating function here is :meth:`execute`.
"""
self.parts_to_parent_parts = self.get_parts_to_parent_parts()
self.id_cols_list = make_id_cols_list()
self.parts_to_ids = make_parts_to_ids_dict()
    def execute(self, gens_mega: pd.DataFrame, drop_extra_cols: bool = True):
"""
        Prep the table that denotes true_gran for all generators.

        This method will generate a dataframe based on ``gens_mega``
        that has boolean columns denoting whether each plant-part is a true
        or false granularity.

        There are four main steps in this process:

        * For every combination of plant-parts, count the number of unique
          types of peer plant-parts (see :meth:`make_all_the_counts` for more
          details).
        * Convert those counts to boolean values indicating whether there is
          more than one unique parent or child plant-part (see
          :meth:`make_all_the_bools` for more details).
        * Using those boolean values, label each plant-part as a true or false
          granularity based on both the parent-to-child and child-to-parent
          booleans (see :meth:`label_true_grans_by_part` for more details).
        * For each plant-part, label it with its appropriate plant-part
          counterpart - if it is a true granularity, the appropriate label is
          itself (see :meth:`label_true_id_by_part` for more details).

Args:
gens_mega: a table of all of the generators with identifying
columns and data columns, sliced by ownership which makes
"total" and "owned" records for each generator owner.
drop_extra_cols: If True, the extra columns used to generate the
true_gran columns are dropped. Default is True. Use False for
debugging only.
"""
true_gran_labels = (
self.make_all_the_counts(gens_mega)
.pipe(self.make_all_the_bools)
.pipe(self.label_true_grans_by_part)
.pipe(self.label_true_id_by_part)
.pipe(pudl.helpers.convert_cols_dtypes, 'eia', 'true_gran')
)
if drop_extra_cols:
for drop_cols in ['_v_', '_has_only_one_', 'count_per']:
true_gran_labels = true_gran_labels.drop(
columns=true_gran_labels.filter(like=drop_cols)
)
return true_gran_labels
    def get_parts_to_parent_parts(self):
"""
Make a dictionary of each plant-part's parent parts.
We have imposed a hierarchy on the plant-parts with the
:py:const:`PLANT_PARTS_ORDERED` list and this method generates a
dictionary of each plant-part's (key) parent-parts (value).
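
        For illustration, the first few entries of the returned mapping look
        like this (given the current :py:const:`PLANT_PARTS_ORDERED`):

        .. code-block:: python

            parts_to_parent_parts = {
                'plant': [],
                'plant_unit': ['plant'],
                'plant_prime_mover': ['plant', 'plant_unit'],
                # ... and so on down PLANT_PARTS_ORDERED
            }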
"""
parts_to_parent_parts = {}
n = 0
for part_name in PLANT_PARTS_ORDERED:
parts_to_parent_parts[part_name] = PLANT_PARTS_ORDERED[:n]
n = n + 1
return parts_to_parent_parts
    def make_all_the_counts(self, gens_mega: pd.DataFrame) -> pd.DataFrame:
"""
For each plant-part, count the unique child and parent parts.
        All plant-parts are situated within a hierarchy that is defined within
        :py:const:`PLANT_PARTS_ORDERED`. Child parts are plant-parts that are
        defined as a lower priority within :py:const:`PLANT_PARTS_ORDERED` and
        parent parts are those with a higher priority.

        In order to determine whether a particular plant-part is a unique
        granularity, we want to know if a corresponding parent or child
        plant-part is comprised of the same collection of generators. In order
        to do that, we count the unique instances of the ID columns in both
        the corresponding parent and child parts. We use this count in
        :meth:`make_all_the_bools` and subsequently
        :meth:`label_true_grans_by_part` to label each plant-part as a unique
        granularity or not based on whether there is only one type of
        plant-part.
Args:
gens_mega: a table of all of the generators with identifying
columns and data columns, sliced by ownership which makes
"total" and "owned" records for each generator owner.
Returns:
            an augmented version of the ``gens_mega`` dataframe with new
columns for each of the child and parent plant-parts with counts of
unique instances of those parts. The columns will be named in the
following format ``{child/parent_part_name}_count_per_{part_name}``
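
        A rough sketch of one such count (a hypothetical, simplified
        illustration assuming a generator table ``gens`` with the columns from
        the module-docstring example): how many unique prime movers are in each
        unit?

        .. code-block:: python

            gens.groupby(['plant_id_eia', 'unit_id_pudl'])[
                'prime_mover_code'].nunique()
            # A combined-cycle unit made of CT, CT and CA generators yields 2,
            # so that unit is not the same collection of generators as any
            # single plant_prime_mover record.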
"""
# id_cols_list already has IDX_TO_ADD
all_the_counts = gens_mega.loc[:, self.id_cols_list + IDX_OWN_TO_ADD].copy()
og_gen_mega_cols = len(all_the_counts.columns)
# grab the plant-part id columns from the generator table
gens_wo_ownership = all_the_counts.loc[
:, self.id_cols_list].drop_duplicates()
# we want to compile the count results on a copy of the generator table
for part_name in PLANT_PARTS_ORDERED:
logger.debug(f"making the counts for: {part_name}")
part_cols = PLANT_PARTS[part_name]['id_cols'] + IDX_TO_ADD
# because the plant_id_eia is always a part of the groupby columns
# and we want to count the plants as well, we need to make a duplicate
# plant_id_eia column to count on
part_count = (
gens_wo_ownership.assign(plant_id_eia_temp=lambda x: x.plant_id_eia)
.groupby(by=part_cols, dropna=False, observed=True)
.nunique()
.rename(columns={'plant_id_eia_temp': 'plant_id_eia'})
.rename(columns={v: k for k, v in self.parts_to_ids.items()})
.add_suffix(f'_count_per_{part_name}')
)
all_the_counts = all_the_counts.merge(
part_count,
how='left',
left_on=part_cols,
right_index=True,
validate='m:1'
)
        # CHECK the expected number of columns: the id columns minus the added columns
len_ids = len(self.id_cols_list) - len(IDX_TO_ADD)
expected_col_len = (
            og_gen_mega_cols +  # the original gens_mega columns
            len_ids * (len_ids - 1) + 1  # the count columns (we add one bc we
            # added a straggler plant_count_per_plant column bc we make a
            # plant_id_eia_temp column)
)
if expected_col_len != len(all_the_counts.columns):
raise AssertionError(
f"We got {len(all_the_counts.columns)} columns from "
f"all_the_counts when we should have gotten {expected_col_len}"
)
return all_the_counts
    def make_all_the_bools(self, counts):
"""
        Make booleans to indicate whether a corresponding plant-part is consistent.

        We've counted all of the child- and parent-parts contained within a
        plant-part in :meth:`make_all_the_counts`. If there is only one
        corresponding plant-part within a different plant-part, then we can
        assume it is in effect a non-unique plant-part. So we convert the count
        columns into boolean columns to indicate whether a plant-part has only
        one corresponding child- or parent-part.

        Args:
            counts (pandas.DataFrame): result of :meth:`make_all_the_counts`

        Returns:
            pandas.DataFrame: a table with generator records where we have new
            boolean columns which indicate whether or not the plant-part
            has more than one child/parent-part. These columns are formatted
            as: ``{child/parent_part_name}_has_only_one_{part_name}``
"""
counts.loc[:, counts.filter(like='_count_per_').columns] = (
counts.loc[:, counts.filter(like='_count_per_').columns]
.astype(pd.Int64Dtype())
.copy()
)
# convert the count columns to bool columns
for col in counts.filter(like='_count_per_').columns:
bool_col = col.replace("_count_per_", "_has_only_one_")
counts.loc[counts[col].notnull(), bool_col] = (
counts.loc[:, col] == 1
)
# force the nullable bool type for all our count cols
counts.loc[:, counts.filter(like='_has_only_one_').columns] = (
counts.filter(like='_has_only_one_')
.astype(pd.BooleanDtype())
)
return counts
    def label_true_grans_by_part(self, part_bools):
"""
        Label the true/false granularities for each part/parent-part combo.

        This method uses the indicator columns which let us know whether or not
        there is more than one unique value for both the parent and child
        plant-part ids to generate an additional indicator column that lets us
        know whether the child plant-part is a true or false granularity when
        compared to the parent plant-part. With all of the indicator columns
        from each plant-part's parent plant-parts, if all of those determined
        that the plant-part is a true granularity, then this method will label
        the plant-part as being a true granularity and vice versa.

        Because we have forced a hierarchy within :py:const:`PLANT_PARTS_ORDERED`,
        the process for labeling true or false granularities must investigate
        bi-directionally. This is because all of the plant-parts besides
        'plant' and 'plant_gen' are not necessarily bigger or smaller than
        their parent plant-part and thus there is overlap. Because of this,
        this method uses the checks in both directions (from parent to child
        and from child to parent).
Args:
part_bools (pandas.DataFrame): result of :meth:`make_all_the_bools`
TODO:
This function results in warning: ``PerformanceWarning: DataFrame
is highly fragmented...`` I expect this is because of the number of
columns that are being assigned here via ``.loc[:, col_to_assign]``.
This warning shows up only after the 5th iteration through the top-level
loop (when part_name = 'plant_prime_fuel').
"""
        # assign a bool for the true gran only if all part-vs-parent checks agree
for part_name, parent_parts in self.parts_to_parent_parts.items():
for parent_part_name in parent_parts:
# let's save the input boolean columns
bool_cols = [
f'{part_name}_has_only_one_{parent_part_name}',
f'{parent_part_name}_has_only_one_{part_name}'
]
                # a plant-part is a false granularity relative to a parent part
                # only if BOTH directions collapse to a single unique value
part_bools.loc[
:, f'true_gran_{part_name}_v_{parent_part_name}'] = (
~(part_bools.loc[:, bool_cols].all(axis='columns'))
.astype(pd.BooleanDtype())
)
            # a plant-part is a true granularity only if it is a true
            # granularity relative to every one of its parent parts; otherwise
            # it is a false granularity
part_bools.loc[:, f'true_gran_{part_name}'] = (
part_bools.filter(like=f'true_gran_{part_name}')
.all(axis='columns'))
trues_found = (
part_bools[part_bools[f'true_gran_{part_name}']]
.drop_duplicates(
subset=[self.parts_to_ids[part_name], 'plant_id_eia']
+ IDX_TO_ADD
)
)
logger.info(
f'true grans found for {part_name}: {len(trues_found)}'
)
return part_bools
    def label_true_id_by_part(self, part_trues):
"""
Label the appropriate plant-part.
        For each plant-part, we need to make a label which indicates what the
        "true" unique plant-part is. If a gen vs a unit is a non-unique set of
        records, we only want to label one of them as a false granularity. We
        are going to use the :attr:`parts_to_parent_parts` dictionary to help us
        with this. We want to "save" the biggest parent plant-part as the true
        granularity.

        Because we have columns in ``part_trues`` that indicate whether a
        plant-part is a true gran vs each parent part, we can cycle through
        the possible parent-parts from biggest to smallest and the first time
        we find that a plant-part is a false gran, we label its true id as
        that parent-part.
"""
part_trues = part_trues.copy()
for part_name, parent_parts in self.parts_to_parent_parts.items():
# make column which will indicate which part is the true/unique
# plant-part...
appro_part_col = f"appro_part_label_{part_name}"
# make this col null so we can fill in
part_trues.loc[:, appro_part_col] = pd.NA
for parent_part_name in parent_parts:
                # find the records where the true gran col is false, and label
# the appropriate part column name with that parent part
mask_loc = (
~part_trues.loc[
:, f'true_gran_{part_name}_v_{parent_part_name}'],
appro_part_col
)
part_trues.loc[mask_loc] = part_trues.loc[mask_loc].fillna(
parent_part_name)
            # for all of the plant-part records where we didn't find any false
            # grans, the appropriate label is itself! it is a unique snowflake
part_trues.loc[:, appro_part_col] = (
part_trues.loc[:, appro_part_col].fillna(part_name)
)
part_trues = (
assign_record_id_eia(
part_trues, plant_part_col=appro_part_col)
.rename(
columns={'record_id_eia': f'record_id_eia_{part_name}'})
)
# do a little check
if not all(part_trues[part_trues[f'true_gran_{part_name}']]
[appro_part_col] == part_name):
raise AssertionError(
                    f'eeeeEEeEe... if true_gran_{part_name} is True, then '
                    f'{appro_part_col} should be {part_name}.'
)
return part_trues
class MakePlantParts(object):
"""
Compile the plant parts for the master unit list.
This object generates a master list of different "plant-parts", which
are various collections of generators - i.e. units, fuel-types, whole
plants, etc. - as well as various ownership arrangements. Each
    plant-part is included in the master plant-part table twice for each of
    the plant-part's owners - once with the data scaled to the fraction of
    that owner's ownership and once as a total plant-part record for that
    owner.
This master plant parts table is generated by first creating a complete
generators table - with all of the data columns we will be aggregating
    to different plant-parts - sliced and scaled by ownership. Then we
make a label for each plant-part record which indicates whether or not
the record is a unique grouping of generator records. Then we use the
complete generator table to aggregate by each of the plant-part
categories.
The coordinating function here is :meth:`execute`.
"""
def __init__(self, pudl_out):
"""
Initialize instance of :class:`MakePlantParts`.
Args:
pudl_out (pudl.output.pudltabl.PudlTabl): An object used to create
the tables for EIA and FERC Form 1 analysis.
"""
self.pudl_out = pudl_out
self.freq = pudl_out.freq
self.parts_to_ids = make_parts_to_ids_dict()
        # get a list of all of the id columns that constitute the primary keys
# for all of the plant parts
self.id_cols_list = make_id_cols_list()
    def execute(self, gens_mega, true_grans):
"""
        Aggregate and slice data points by each plant part.

        Returns:
            pandas.DataFrame: the master plant-parts table, indexed by
            ``record_id_eia``.
"""
        # aggregate everything by each plant part
part_dfs = []
for part_name in PLANT_PARTS_ORDERED:
part_df = PlantPart(part_name).execute(gens_mega)
part_df = (
PartTrueGranLabeler(part_name).execute(part_df, true_grans)
)
# add in the attributes!
for attribute_col in CONSISTENT_ATTRIBUTE_COLS:
part_df = (
AddConsistentAttributes(attribute_col, part_name)
.execute(part_df, gens_mega)
)
for attribute_col in PRIORITY_ATTRIBUTES_DICT.keys():
part_df = (
AddPriorityAttribute(attribute_col, part_name)
.execute(part_df, gens_mega)
)
part_dfs.append(part_df)
plant_parts_eia = pd.concat(part_dfs)
# clean up, add additional columns
self.plant_parts_eia = (
self.add_additonal_cols(plant_parts_eia)
.pipe(pudl.helpers.organize_cols, FIRST_COLS)
.pipe(self._clean_plant_parts)
)
self.validate_ownership_for_owned_records(self.plant_parts_eia)
validate_run_aggregations(self.plant_parts_eia, gens_mega)
return self.plant_parts_eia
#######################################
# Add Entity Columns and Final Cleaning
#######################################
    def add_additonal_cols(self, plant_parts_eia):
"""
        Add additional data and id columns.
This method adds a set of either calculated columns or PUDL ID columns.
Returns:
pandas.DataFrame: master unit list table with these additional
columns:
* utility_id_pudl +
* plant_id_pudl +
* capacity_factor +
* ownership_dupe (boolean): indicator of whether the "owned"
record has a corresponding "total" duplicate.
"""
plant_parts_eia = (
pudl.helpers.calc_capacity_factor(
df=plant_parts_eia,
min_cap_fact=-0.5,
max_cap_fact=1.5,
freq=self.freq
)
.merge(
self.pudl_out.plants_eia860()
[['plant_id_eia', 'plant_id_pudl']]
.drop_duplicates(),
how='left',
on=['plant_id_eia', ]
)
.merge(
self.pudl_out.utils_eia860()
[['utility_id_eia', 'utility_id_pudl']]
.drop_duplicates(),
how='left',
on=['utility_id_eia']
)
.assign(ownership_dupe=lambda x: np.where(
(x.ownership == 'owned') & (x.fraction_owned == 1),
True, False)
)
)
return plant_parts_eia
    def _clean_plant_parts(self, plant_parts_eia):
return (
plant_parts_eia
.assign(
report_year=lambda x: x.report_date.dt.year,
plant_id_report_year=lambda x:
x.plant_id_pudl.astype(str)
+ "_" + x.report_year.astype(str)
)
.pipe(
pudl.helpers.cleanstrings_snake,
['record_id_eia', 'appro_record_id_eia']
)
# we'll eventually take this out... once Issue #20
.drop_duplicates(subset=['record_id_eia'])
.set_index('record_id_eia')
)
#################
# Testing Methods
#################
    def validate_ownership_for_owned_records(self, plant_parts_eia):
"""
Test ownership - fraction owned for owned records.
This test can be run at the end of or with the result of
        :meth:`MakePlantParts.execute`. It tests a few aspects of the
fraction_owned column and raises assertions if the tests fail.
"""
test_own_df = (
plant_parts_eia.groupby(
by=self.id_cols_list + ['plant_part', 'ownership'],
dropna=False,
observed=True,
)
[['fraction_owned', 'capacity_mw']].sum(min_count=1).reset_index())
owned_one_frac = test_own_df[
(~np.isclose(test_own_df.fraction_owned, 1))
& (test_own_df.capacity_mw != 0)
& (test_own_df.capacity_mw.notnull())
& (test_own_df.ownership == 'owned')]
if not owned_one_frac.empty:
self.test_own_df = test_own_df
self.owned_one_frac = owned_one_frac
raise AssertionError(
"Hello friend, you did a bad. It happens... There are "
f"{len(owned_one_frac)} rows where fraction_owned does not sum "
"to 100% for the owned records. "
"Check cached `owned_one_frac` & `test_own_df` and `slice_by_ownership()`"
)
no_frac_n_cap = test_own_df[
(test_own_df.capacity_mw == 0)
& (test_own_df.fraction_owned == 0)
]
        self.no_frac_n_cap = no_frac_n_cap
        if len(no_frac_n_cap) > 60:
            warnings.warn(
                f"""Too many nothings, you nothing. There shouldn't be much
                more than 60 instances of records with zero capacity_mw (and
                therefore zero fraction_owned) and you got {len(no_frac_n_cap)}.
"""
)
class PlantPart(object):
"""
Plant-part table maker.
The coordinating method here is :meth:`execute`.
**Examples**
Below are some examples of how the main processing step in this class
operates: :meth:`PlantPart.ag_part_by_own_slice`. If we have a plant with
four generators that looks like this:
>>> gens_mega = pd.DataFrame({
... 'plant_id_eia': [1, 1, 1, 1],
... 'report_date': ['2020-01-01', '2020-01-01', '2020-01-01', '2020-01-01',],
... 'utility_id_eia': [111, 111, 111, 111],
... 'generator_id': ['a', 'b', 'c', 'd'],
... 'prime_mover_code': ['ST', 'GT', 'CT', 'CA'],
... 'energy_source_code_1': ['BIT', 'NG', 'NG', 'NG'],
... 'ownership': ['total', 'total', 'total', 'total',],
... 'operational_status_pudl': ['operating', 'operating', 'operating', 'operating'],
... 'capacity_mw': [400, 50, 125, 75],
... }).astype({
... 'report_date': 'datetime64[ns]',
... })
>>> gens_mega
plant_id_eia report_date utility_id_eia generator_id prime_mover_code energy_source_code_1 ownership operational_status_pudl capacity_mw
0 1 2020-01-01 111 a ST BIT total operating 400
1 1 2020-01-01 111 b GT NG total operating 50
2 1 2020-01-01 111 c CT NG total operating 125
3 1 2020-01-01 111 d CA NG total operating 75
This ``gens_mega`` table can then be aggregated by ``plant``, ``plant_prime_fuel``,
``plant_prime_mover``, or ``plant_gen``.
"""
def __init__(self, part_name):
"""
Initialize an object which makes a tbl for a specific plant-part.
Args:
part_name (str): the name of the part to aggregate to. Names can be
only those in :py:const:`PLANT_PARTS`
"""
self.part_name = part_name
self.id_cols = PLANT_PARTS[part_name]['id_cols']
    def execute(
self,
gens_mega: pd.DataFrame,
sum_cols: List[str] = SUM_COLS,
wtavg_dict: Dict = WTAVG_DICT
) -> pd.DataFrame:
"""
Get a table of data aggregated by a specific plant-part.
This method will take ``gens_mega`` and aggregate the generator records
to the level of the plant-part. This is mostly done via
:meth:`ag_part_by_own_slice`. Then several additional columns are added
and the records are labeled as true or false granularities.
Returns:
a table with records that have been aggregated to a plant-part.
"""
part_df = (
self.ag_part_by_own_slice(
gens_mega,
sum_cols=sum_cols,
wtavg_dict=wtavg_dict
)
.pipe(self.ag_fraction_owned)
.assign(plant_part=self.part_name)
.pipe(self.add_install_year, gens_mega)
.pipe( # add standard record id w/ year
add_record_id,
id_cols=self.id_cols,
plant_part_col='plant_part',
year=True
)
.pipe( # add additional record id that DOESN'T CARE ABOUT TIME
add_record_id,
id_cols=self.id_cols,
plant_part_col='plant_part',
year=False
)
.pipe(self.add_new_plant_name, gens_mega)
.pipe(self.add_record_count_per_plant)
)
return part_df
    def ag_part_by_own_slice(
self,
gens_mega,
sum_cols=SUM_COLS,
wtavg_dict=WTAVG_DICT,
) -> pd.DataFrame:
"""
        Aggregate the plant part by separating ownership types.

        There are total records and owned records in this master unit list.
        Those records need to be aggregated differently in order to scale
        properly. The "total" ownership slice is grouped and aggregated as a
        single version of the full plant and then the utilities are merged back
        in. The "owned" ownership slice is grouped and aggregated with the
        utility_id_eia, so the portions of generators created by
        :meth:`MakeMegaGenTbl.slice_by_ownership` will be appropriately
        aggregated to each plant-part level.
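
        As a rough sketch (assuming a generator-level dataframe ``df`` with no
        null weights), the capacity-weighted averages configured by
        :py:const:`WTAVG_DICT` boil down to:

        .. code-block:: python

            # capacity-weighted average fuel cost per MWh
            wtavg = (
                df['fuel_cost_per_mwh'] * df['capacity_mw']
            ).sum() / df['capacity_mw'].sum()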
Returns:
pandas.DataFrame: dataframe aggregated to the level of the
part_name
"""
logger.info(f'begin aggregation for: {self.part_name}')
# id_cols = PLANT_PARTS[self.part_name]['id_cols']
# split up the 'owned' slices from the 'total' slices.
# this is because the aggregations are different
part_own = gens_mega.loc[gens_mega.ownership == 'owned'].copy()
part_tot = gens_mega.loc[gens_mega.ownership == 'total'].copy()
if len(gens_mega) != len(part_own) + len(part_tot):
raise AssertionError(
"Error occured in breaking apart ownership types."
"The total and owned slices should equal the total records."
"Check for nulls in the ownership column."
)
part_own = pudl.helpers.sum_and_weighted_average_agg(
df_in=part_own,
by=self.id_cols + IDX_TO_ADD + IDX_OWN_TO_ADD,
sum_cols=sum_cols,
wtavg_dict=wtavg_dict
)
# we want a "total" record for each of the utilities that own any slice
# of a particular plant-part. To achieve this, we are going to remove
# the utility info (and drop duplicates bc a plant-part with many
# generators will have multiple duplicate records for each owner)
# we are going to generate the aggregated output for a utility-less
# "total" record and then merge back in the many utilites so each of
# the utilities is associated with an aggergated "total" plant-part
# record
part_tot_no_utils = (
part_tot.drop(columns=['utility_id_eia'])
.drop_duplicates()
)
# still need to re-calc the fraction owned for the part
part_tot_out = (
pudl.helpers.sum_and_weighted_average_agg(
df_in=part_tot_no_utils,
by=self.id_cols + IDX_TO_ADD,
sum_cols=sum_cols,
wtavg_dict=wtavg_dict
)
.pipe(pudl.helpers.convert_cols_dtypes, 'eia')
.merge(
part_tot[self.id_cols + IDX_TO_ADD + IDX_OWN_TO_ADD].drop_duplicates(),
on=self.id_cols + IDX_TO_ADD,
how='left',
validate='1:m'
)
)
part_ag = (
pd.concat([part_own, part_tot_out])
.pipe(pudl.helpers.convert_cols_dtypes, 'eia')
)
return part_ag
    def ag_fraction_owned(self, part_ag):
"""
Calculate the fraction owned for a plant-part df.
This method takes a dataframe of records that are aggregated to the
level of a plant-part (with certain ``id_cols``) and appends a
fraction_owned column, which indicates the % ownership that a
        particular utility owner has for each aggregated plant-part record.
        For partial owner records (ownership == "owned"), fraction_owned is
        calculated based on the portion of the capacity and the total capacity
of the plant. For total owner records (ownership == "total"), the
fraction_owned is always 1.
This method is meant to be run after :meth:`ag_part_by_own_slice`.
Args:
part_ag (pandas.DataFrame):
"""
# we must first get the total capacity of the full plant
# Note: we could simply not include the ownership == "total" records
        # We automatically assign fraction_owned == 1 to them, but it seems
        # cleaner to run the full df through this same groupby
frac_owned = (
part_ag.groupby(
by=self.id_cols + IDX_TO_ADD + ['ownership'], observed=True)
[['capacity_mw']].sum(min_count=1)
)
# then merge the total capacity with the plant-part capacity to use to
# calculate the fraction_owned
part_frac = (
pd.merge(
part_ag,
frac_owned,
right_index=True,
left_on=frac_owned.index.names,
suffixes=("", "_total")
)
.assign(
fraction_owned=lambda x:
np.where(
x.ownership == 'owned',
x.capacity_mw / x.capacity_mw_total,
1
)
)
.drop(columns=['capacity_mw_total'])
.pipe(pudl.helpers.convert_cols_dtypes, 'eia')
)
return part_frac
    def add_install_year(self, part_df, gens_mega):
"""
Add the install year from the entities table to your plant part.
TODO: This should be converted into an AddAttribute... an
AddSortedAttribute or something like that.
"""
logger.debug(f'pre count of part DataFrame: {len(part_df)}')
# we want to sort to have the most recent on top
install = (
gens_mega
.assign(installation_year=lambda x: x.operating_date.dt.year)
.astype({'installation_year': 'Int64'})
[self.id_cols + IDX_TO_ADD + ['installation_year']]
.sort_values('installation_year', ascending=False)
.drop_duplicates(subset=self.id_cols, keep='first')
.dropna(subset=self.id_cols)
)
part_df = part_df.merge(
install, how='left',
on=self.id_cols + IDX_TO_ADD, validate='m:1')
logger.debug(
f'count of install years for part: {len(install)} \n'
f'post count of part DataFrame: {len(part_df)}'
)
return part_df
    def add_new_plant_name(self, part_df, gens_mega):
"""
        Add plant names into the compiled plant part df.
Args:
part_df (pandas.DataFrame): dataframe containing records associated
with one plant part (ex: all plant's or plant_prime_mover's).
gens_mega (pandas.DataFrame): a table of all of the generators with
identifying columns and data columns, sliced by ownership which
makes "total" and "owned" records for each generator owner.
"""
part_df = (
pd.merge(
part_df,
gens_mega[self.id_cols + ['plant_name_eia']].drop_duplicates(),
on=self.id_cols,
how='left'
)
.assign(plant_name_new=lambda x: x.plant_name_eia))
# we don't want the plant_id_eia to be part of the plant name, but all
# of the other parts should have their id column in the new plant name
if self.part_name != 'plant':
col = [x for x in self.id_cols if x != 'plant_id_eia'][0]
part_df.loc[part_df[col].notnull(), 'plant_name_new'] = (
part_df['plant_name_new'] + " " + part_df[col].astype(str))
return part_df
    def add_record_count_per_plant(self, part_df: pd.DataFrame) -> pd.DataFrame:
"""
Add a record count for each set of plant part records in each plant.
Args:
part_df: dataframe containing records associated
with one plant part (ex: all plant's or plant_prime_mover's).
Returns:
augmented version of ``part_df`` with a new column named
``record_count``
"""
group_cols = ['plant_id_eia'] + IDX_TO_ADD + IDX_OWN_TO_ADD
# count unique records per plant
part_df.loc[:, 'record_count'] = (
part_df.groupby(group_cols, observed=True)
['record_id_eia'].transform("count")
)
return part_df
class PartTrueGranLabeler:
"""Label a plant-part as a unique (or not) granularity."""
def __init__(self, part_name: Literal[PLANT_PARTS_ORDERED]):
"""Initialize a true granularity labeler."""
self.part_name = part_name
self.id_cols = PLANT_PARTS[part_name]['id_cols']
    def execute(self, part_df, true_grans):
"""
Merge the true granularity labels into the plant part df.
Args:
part_df (pandas.DataFrame)
"""
# get only the columns you need for this part and drop duplicates
bool_df = (
true_grans[
self.id_cols + IDX_TO_ADD + IDX_OWN_TO_ADD +
[f'true_gran_{self.part_name}',
f'appro_part_label_{self.part_name}',
f'record_id_eia_{self.part_name}', ]
]
.drop_duplicates()
)
prop_true_len1 = len(
bool_df[bool_df[f'true_gran_{self.part_name}']]) / len(bool_df)
logger.debug(f'proportion of trues: {prop_true_len1:.02}')
logger.debug(f'number of records pre-merge: {len(part_df)}')
part_df = (
part_df.merge(
bool_df,
on=self.id_cols + IDX_TO_ADD + IDX_OWN_TO_ADD,
how='left',
)
.rename(columns={
f'true_gran_{self.part_name}': 'true_gran',
f'appro_part_label_{self.part_name}': 'appro_part_label',
f'record_id_eia_{self.part_name}': 'appro_record_id_eia'
})
)
prop_true_len2 = len(part_df[part_df.true_gran]) / len(part_df)
logger.debug(f'proportion of trues: {prop_true_len2:.02}')
logger.debug(f'number of records post-merge: {len(part_df)}')
return part_df
class AddAttribute(object):
"""Base class for adding attributes to plant-part tables."""
def __init__(self, attribute_col, part_name):
"""
        Initialize an attribute adder.

Args:
            attribute_col (string): name of the qualifier column that you want added.
Must be in :py:const:`CONSISTENT_ATTRIBUTE_COLS` or a key in
:py:const:`PRIORITY_ATTRIBUTES_DICT`.
part_name (str): the name of the part to aggregate to. Names can be
only those in :py:const:`PLANT_PARTS`
"""
assert(attribute_col in CONSISTENT_ATTRIBUTE_COLS +
list(PRIORITY_ATTRIBUTES_DICT.keys()))
self.attribute_col = attribute_col
# the base columns will be the id columns, plus the other two main ids
self.part_name = part_name
self.id_cols = PLANT_PARTS[part_name]['id_cols']
# why no util id? otoh does ownership need to be here at all?
# qualifiers should be independent of ownership
self.base_cols = self.id_cols + IDX_TO_ADD + ['ownership']
class AddConsistentAttributes(AddAttribute):
    """Adder of consistent attribute records to a plant-part table."""

    def execute(self, part_df, gens_mega):
"""
Get qualifier records.
For an individual dataframe of one plant part (e.g. only
"plant_prime_mover" plant part records), we typically have identifying
columns and aggregated data columns. The identifying columns for a
given plant part are only those columns which are required to uniquely
specify a record of that type of plant part. For example, to uniquely
        specify a plant_unit record, we need ``plant_id_eia``,
        ``unit_id_pudl``, and ``report_date`` and nothing else. In other words,
the identifying columns for a given plant part would make up a natural
composite primary key for a table composed entirely of that type of
plant part. Every plant part is cobbled together from generator
records, so each record in each part_df can be thought of as a
collection of generators.
Identifier and qualifier columns are the same columns; whether a column
is an identifier or a qualifier is a function of the plant part you're
considering. All the other columns which could be identifiers in the
        context of other plant parts (but aren't for this plant part) are
qualifiers.
        This method takes a part_df and checks whether or not the data
        we are trying to grab from the attribute column is consistent across
        every component generator of each record.
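
        For example (a minimal, hypothetical illustration of the idea - the
        real check lives in :meth:`get_consistent_qualifiers`): if both
        generators in a ``plant_unit`` report the same ``fuel_type_code_pudl``,
        that value is attached to the unit record; if they disagree, the unit
        record gets a null.

        .. code-block:: python

            gens = pd.DataFrame({
                'plant_id_eia': [1, 1],
                'unit_id_pudl': [1, 1],
                'fuel_type_code_pudl': ['gas', 'coal'],
            })
            # nunique() > 1 means the attribute is inconsistent across the
            # unit's generators, so nothing would be attached to the record
            consistent = (
                gens.groupby(['plant_id_eia', 'unit_id_pudl'])
                ['fuel_type_code_pudl'].nunique() == 1
            )
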
Args:
part_df (pandas.DataFrame): dataframe containing records associated
with one plant part.
gens_mega (pandas.DataFrame): a table of all of the generators with
identifying columns and data columns, sliced by ownership which
makes "total" and "owned" records for each generator owner.
"""
attribute_col = self.attribute_col
if attribute_col in part_df.columns:
logger.debug(f'{attribute_col} already here.. ')
return part_df
record_df = gens_mega.copy()
consistent_records = self.get_consistent_qualifiers(record_df)
non_nulls = consistent_records[
consistent_records[attribute_col].notnull()]
logger.debug(
f'merging in consistent {attribute_col}: {len(non_nulls)}')
return part_df.merge(consistent_records, how='left')
    def get_consistent_qualifiers(self, record_df):
"""
Get fully consistent qualifier records.
        When the data in a qualifier column is identical for every record in a
        plant part, we associate this data point with the record. If the data
        points for the related generator records are not identical, then
        nothing is associated with the record.

        Args:
            record_df (pandas.DataFrame): the dataframe with the records to
                check for consistent qualifier values.
"""
# TODO: determine if we can move this up the chain so we can do this
        # once per plant-part, not once per plant-part * qualifier column
attribute_col = self.attribute_col
base_cols = self.base_cols
entity_count_df = (
pudl.helpers.count_records(
record_df, base_cols, 'entity_occurences')
.pipe(pudl.helpers.convert_cols_dtypes, 'eia')
)
record_count_df = (
pudl.helpers.count_records(
record_df, base_cols + [attribute_col], 'record_occurences')
            .pipe(pudl.helpers.convert_cols_dtypes, 'eia')
)
re_count = (
record_df[base_cols + [attribute_col]]
.merge(entity_count_df, how='left', on=base_cols)
.merge(record_count_df, how='left', on=base_cols + [attribute_col])
)
# find all of the matching records..
consistent_records = (
re_count[
re_count['entity_occurences'] == re_count['record_occurences']]
.drop(columns=['entity_occurences', 'record_occurences'])
.drop_duplicates())
return consistent_records
class AddPriorityAttribute(AddAttribute):
"""
    Add attributes based on a priority sorting from :py:const:`PRIORITY_ATTRIBUTES_DICT`.

    This object associates one attribute from the generators that make up a
    plant-part based on a sorted list within :py:const:`PRIORITY_ATTRIBUTES_DICT`.
    For example, for "operational_status" we will grab the highest level of
    operational status that is associated with each record's component
    generators. The order of operational status is defined in
    :py:const:`PRIORITY_ATTRIBUTES_DICT` as: 'existing', 'proposed', then
    'retired'. For example, if a plant_unit is composed of two generators, and
    one of them is "existing" and another is "retired", the entire plant_unit
    will be considered "existing".
"""
    def execute(self, part_df, gens_mega):
"""
Add the attribute to the plant-part df based on priority.
Args:
part_df (pandas.DataFrame): dataframe containing records associated
with one plant part.
gens_mega (pandas.DataFrame): a table of all of the generators with
identifying columns and data columns, sliced by ownership which
makes "total" and "owned" records for each generator owner.
"""
attribute_col = self.attribute_col
if attribute_col in part_df.columns:
logger.debug(f'{attribute_col} already here.. ')
return part_df
logger.debug(f'getting max {attribute_col}')
consistent_records = pudl.helpers.dedupe_on_category(
gens_mega.copy()[self.base_cols + [attribute_col]],
self.base_cols,
attribute_col,
PRIORITY_ATTRIBUTES_DICT[attribute_col]
)
part_df = part_df.merge(
consistent_records,
on=self.base_cols,
how='left'
)
return part_df
#################
# Data Validation
#################
def validate_run_aggregations(plant_parts_eia, gens_mega):
"""
Run a test of the aggregated columns.
    This test will use the plant_parts_eia table, re-run groupbys and check
    similarity.
"""
for part_name in PLANT_PARTS_ORDERED:
        logger.info(f'Beginning tests for {part_name}:')
test_merge = _test_prep_merge(part_name, plant_parts_eia, gens_mega)
for test_col in SUM_COLS:
            # Check if test aggregation is the same as the generated aggregation
# Apply a boolean column to the test df.
test_merge[f'test_{test_col}'] = (
(test_merge[f'{test_col}_test'] == test_merge[f'{test_col}'])
| (test_merge[f'{test_col}_test'].isnull()
& test_merge[f'{test_col}'].isnull())
| (test_merge.ownership == 'total')
)
result = list(test_merge[f'test_{test_col}'].unique())
logger.info(f' Results for {test_col}: {result}')
if not all(result):
warnings.warn(f"{test_col} done fucked up.")
return test_merge
# raise AssertionError(
# f"{test_col}'s '"
# )
def _test_prep_merge(part_name, plant_parts_eia, gens_mega):
"""Run the test groupby and merge with the aggregations."""
id_cols = PLANT_PARTS[part_name]['id_cols']
plant_cap = (
gens_mega[gens_mega.ownership == 'owned']
.pipe(pudl.helpers.convert_cols_dtypes, 'eia')
.groupby(
by=id_cols + IDX_TO_ADD + IDX_OWN_TO_ADD, observed=True)
[SUM_COLS]
.sum(min_count=1)
.reset_index()
.pipe(pudl.helpers.convert_cols_dtypes, 'eia')
)
test_merge = pd.merge(
plant_parts_eia[plant_parts_eia.plant_part == part_name],
plant_cap,
on=id_cols + IDX_TO_ADD + IDX_OWN_TO_ADD,
how='outer',
indicator=True,
suffixes=('', '_test'))
return test_merge
#########################
# Module Helper Functions
#########################
def make_id_cols_list():
"""
Get a list of the id columns (primary keys) for all of the plant parts.
Returns:
list: a list of the ID columns for all of the plant-parts, including
``report_date``
"""
return (
IDX_TO_ADD + pudl.helpers.dedupe_n_flatten_list_of_lists(
[x['id_cols'] for x in PLANT_PARTS.values()])
)
def make_parts_to_ids_dict():
"""
Make dict w/ plant-part names (keys) to the main id column (values).
    All plant-parts have 1 or 2 ID columns in :py:const:`PLANT_PARTS`: plant_id_eia and
a secondary column (with the exception of the "plant" plant-part). The
plant_id_eia column is always first, so we're going to grab the last column.
Returns:
        dictionary: plant-part names (keys) corresponding to the main ID column
(value).
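
    Given the current :py:const:`PLANT_PARTS`, the result looks like:

    .. code-block:: python

        {
            'plant': 'plant_id_eia',
            'plant_gen': 'generator_id',
            'plant_unit': 'unit_id_pudl',
            'plant_technology': 'technology_description',
            'plant_prime_fuel': 'energy_source_code_1',
            'plant_prime_mover': 'prime_mover_code',
            'plant_ferc_acct': 'ferc_acct_name',
        }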
"""
parts_to_ids = {}
for part, part_dict in PLANT_PARTS.items():
parts_to_ids[part] = PLANT_PARTS[part]['id_cols'][-1]
return parts_to_ids
def add_record_id(part_df, id_cols, plant_part_col='plant_part', year=True):
"""
Add a record id to a compiled part df.
We need a standardized way to refer to these compiled records that
contains enough information in the id itself that in theory we could
deconstruct the id and determine which plant id and plant part id
columns are associated with this record.
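
    For illustration, a ``plant_unit`` record for plant 1, unit 1 in 2020,
    owned in total by utility 111, would get a ``record_id_eia`` along the
    lines of (a hypothetical example following the logic below):

    .. code-block:: python

        # {plant_id_eia}_{other id cols}_{year}_{plant_part}_{ownership}_{utility_id_eia}
        record_id_eia = "1_1_2020_plant_unit_total_111"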
"""
ids = deepcopy(id_cols)
# we want the plant id first... mostly just bc it'll be easier to read
part_df = part_df.assign(
record_id_eia_temp=lambda x: x.plant_id_eia.map(str))
ids.remove('plant_id_eia')
for col in ids:
part_df = part_df.assign(
record_id_eia_temp=lambda x: x.record_id_eia_temp + "_" + x[col].astype(str))
if year:
part_df = part_df.assign(
record_id_eia_temp=lambda x:
x.record_id_eia_temp + "_" +
x.report_date.dt.year.astype(str))
part_df = part_df.assign(
record_id_eia_temp=lambda x:
x.record_id_eia_temp + "_" +
x[plant_part_col] + "_" +
x.ownership.astype(str) + "_" +
x.utility_id_eia.astype('Int64').astype(str)
)
    # add operational status only when records are not "operating" (i.e.
    # existing or retiring mid-year; see MakeMegaGenTbl.label_operating_gens()
    # for more details)
non_op_mask = part_df.operational_status_pudl != 'operating'
part_df.loc[non_op_mask, 'record_id_eia_temp'] = (
part_df.loc[non_op_mask, 'record_id_eia_temp'] + "_"
+ part_df.loc[non_op_mask, 'operational_status_pudl'])
if year:
part_df = part_df.rename(
columns={'record_id_eia_temp': 'record_id_eia'})
else:
part_df = part_df.rename(
columns={'record_id_eia_temp': 'plant_part_id_eia'})
return part_df
def assign_record_id_eia(test_df, plant_part_col='plant_part'):
"""
Assign record ids to a df with a mix of plant parts.
Args:
        test_df (pandas.DataFrame): dataframe containing a mix of plant-part
            records.
        plant_part_col (string): name of the column that identifies each
            record's plant-part. Default is 'plant_part'.
TODO:
This function results in warning: ``PerformanceWarning: DataFrame is
highly fragmented...`` I'm honestly not sure if this is happening because of
    this function specifically or is a result of all of the column
assignments in :meth:`label_true_id_by_part` or :meth:`label_true_grans_by_part`
where we are also getting this warning.
"""
dfs = []
for part in PLANT_PARTS:
dfs.append(
add_record_id(
part_df=test_df[test_df[plant_part_col] == part],
id_cols=PLANT_PARTS[part]['id_cols'],
plant_part_col=plant_part_col)
)
test_df_ids = pd.concat(dfs)
return test_df_ids