# Source code for pudl.output.eia860

"""Functions for pulling data primarily from the EIA's Form 860."""

import logging

import pandas as pd
import sqlalchemy as sa

import pudl

# Module-level logger, following the standard PUDL logging convention.
logger = logging.getLogger(__name__)
def utilities_eia860(pudl_engine, start_date=None, end_date=None):
    """Pull all fields from the EIA860 Utilities table.

    Args:
        pudl_engine (sqlalchemy.engine.Engine): SQLAlchemy connection engine
            for the PUDL DB.
        start_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.
        end_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.

    Returns:
        pandas.DataFrame: A DataFrame containing all the fields of the EIA 860
        Utilities table.

    """
    pt = pudl.output.pudltabl.get_table_meta(pudl_engine)

    # Static (non-annual) utility attributes from the entity table:
    entity_df = pd.read_sql(
        sa.sql.select(pt['utilities_entity_eia']), pudl_engine)

    # Annually varying utility attributes, restricted to the requested dates:
    annual_tbl = pt['utilities_eia860']
    annual_select = sa.sql.select(annual_tbl)
    if start_date is not None:
        annual_select = annual_select.where(
            annual_tbl.c.report_date >= pd.to_datetime(start_date)
        )
    if end_date is not None:
        annual_select = annual_select.where(
            annual_tbl.c.report_date <= pd.to_datetime(end_date)
        )
    annual_df = pd.read_sql(annual_select, pudl_engine)

    # Glue table mapping EIA utility IDs to PUDL utility IDs:
    glue_tbl = pt['utilities_eia']
    glue_df = pd.read_sql(
        sa.sql.select(
            glue_tbl.c.utility_id_eia,
            glue_tbl.c.utility_id_pudl,
        ),
        pudl_engine,
    )

    out_df = (
        entity_df
        .merge(annual_df, how='left', on=['utility_id_eia'])
        .merge(glue_df, how='left', on=['utility_id_eia'])
        .assign(report_date=lambda x: pd.to_datetime(x.report_date))
        # Records with no report_date or utility ID can't be used downstream:
        .dropna(subset=["report_date", "utility_id_eia"])
        .astype(pudl.helpers.get_pudl_dtypes({
            "utility_id_eia": "eia",
            "utility_id_pudl": "eia",
        }))
    )
    # Put the identifying columns up front for readability:
    return pudl.helpers.organize_cols(out_df, [
        'report_date',
        'utility_id_eia',
        'utility_id_pudl',
        'utility_name_eia',
    ])
def plants_eia860(pudl_engine, start_date=None, end_date=None):
    """Pull all fields from the EIA Plants tables.

    Args:
        pudl_engine (sqlalchemy.engine.Engine): SQLAlchemy connection engine
            for the PUDL DB.
        start_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.
        end_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.

    Returns:
        pandas.DataFrame: A DataFrame containing all the fields of the EIA 860
        Plants table.

    """
    pt = pudl.output.pudltabl.get_table_meta(pudl_engine)

    # Static plant attributes from the entity table:
    entity_df = pd.read_sql(
        sa.sql.select(pt['plants_entity_eia']), pudl_engine)

    # Annually varying plant attributes, restricted to the requested dates:
    annual_tbl = pt['plants_eia860']
    annual_select = sa.sql.select(annual_tbl)
    if start_date is not None:
        annual_select = annual_select.where(
            annual_tbl.c.report_date >= pd.to_datetime(start_date)
        )
    if end_date is not None:
        annual_select = annual_select.where(
            annual_tbl.c.report_date <= pd.to_datetime(end_date)
        )
    annual_df = (
        pd.read_sql(annual_select, pudl_engine)
        .assign(report_date=lambda x: pd.to_datetime(x.report_date))
    )

    # Glue table mapping EIA plant IDs to PUDL plant IDs:
    glue_tbl = pt['plants_eia']
    glue_df = pd.read_sql(
        sa.sql.select(
            glue_tbl.c.plant_id_eia,
            glue_tbl.c.plant_id_pudl,
        ),
        pudl_engine,
    )
    # Utility glue table, used to bring in the utility_id_pudl:
    utils_df = pd.read_sql(
        sa.sql.select(pt['utilities_eia']), pudl_engine)

    out_df = (
        entity_df
        .merge(annual_df, how='left', on=['plant_id_eia'])
        .merge(glue_df, how='left', on=['plant_id_eia'])
        .merge(utils_df, how='left', on=['utility_id_eia'])
        # Records with no report_date or plant ID can't be used downstream:
        .dropna(subset=["report_date", "plant_id_eia"])
        .astype(pudl.helpers.get_pudl_dtypes({
            "plant_id_eia": "eia",
            "plant_id_pudl": "eia",
            "utility_id_eia": "eia",
            "utility_id_pudl": "eia",
        }))
    )
    return out_df
def plants_utils_eia860(pudl_engine, start_date=None, end_date=None):
    """Create a dataframe of plant and utility IDs and names from EIA 860.

    Returns a pandas dataframe with the following columns:

    - report_date (in which data was reported)
    - plant_name_eia (from EIA entity)
    - plant_id_eia (from EIA entity)
    - plant_id_pudl
    - utility_id_eia (from EIA860)
    - utility_name_eia (from EIA860)
    - utility_id_pudl

    Args:
        pudl_engine (sqlalchemy.engine.Engine): SQLAlchemy connection engine
            for the PUDL DB.
        start_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.
        end_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.

    Returns:
        pandas.DataFrame: A DataFrame containing plant and utility IDs and
        names from EIA 860.

    """
    # One-to-one mapping of EIA plants to their operators.  Columns that also
    # appear in the utilities output are dropped up front to avoid duplicate
    # columns on the merge, and records with no operator are unmergeable:
    plants_df = (
        plants_eia860(pudl_engine, start_date=start_date, end_date=end_date)
        .drop(
            ['utility_id_pudl', 'city', 'state',
             'zip_code', 'street_address', 'utility_name_eia'],
            axis='columns',
        )
        .dropna(subset=["utility_id_eia"])
    )
    utils_df = utilities_eia860(
        pudl_engine, start_date=start_date, end_date=end_date)

    keep_cols = [
        'report_date',
        'plant_id_eia',
        'plant_name_eia',
        'plant_id_pudl',
        'utility_id_eia',
        'utility_name_eia',
        'utility_id_pudl',
    ]
    out_df = (
        plants_df
        .merge(utils_df, how='left', on=['report_date', 'utility_id_eia'])
        .loc[:, keep_cols]
        # All three key columns are required for the record to be usable:
        .dropna(subset=["report_date", "plant_id_eia", "utility_id_eia"])
        .astype(pudl.helpers.get_pudl_dtypes({
            "plant_id_eia": "eia",
            "plant_id_pudl": "eia",
            "utility_id_eia": "eia",
            "utility_id_pudl": "eia",
        }))
    )
    return out_df
def generators_eia860(
    pudl_engine,
    start_date=None,
    end_date=None,
    unit_ids=False,
):
    """Pull all fields reported in the generators_eia860 table.

    Merge in other useful fields including the latitude & longitude of the
    plant that the generators are part of, canonical plant & operator names
    and the PUDL IDs of the plant and operator, for merging with other PUDL
    data sources.

    Fill in data for adjacent years if requested, but never fill in earlier
    than the earliest working year of data for EIA923, and never add more than
    one year on after the reported data (since there should at most be a one
    year lag between EIA923 and EIA860 reporting).

    Args:
        pudl_engine (sqlalchemy.engine.Engine): SQLAlchemy connection engine
            for the PUDL DB.
        start_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.
        end_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.
        unit_ids (bool): If True, use several heuristics to assign individual
            generators to functional units. EXPERIMENTAL.

    Returns:
        pandas.DataFrame: A DataFrame containing all the fields of the EIA 860
        Generators table.

    """
    pt = pudl.output.pudltabl.get_table_meta(pudl_engine)

    # Almost all the info we need comes from the annual generators table:
    gens_tbl = pt['generators_eia860']
    gens_select = sa.sql.select(gens_tbl)
    if start_date is not None:
        start_date = pd.to_datetime(start_date)
        gens_select = gens_select.where(
            gens_tbl.c.report_date >= start_date
        )
    if end_date is not None:
        end_date = pd.to_datetime(end_date)
        gens_select = gens_select.where(
            gens_tbl.c.report_date <= end_date
        )
    gens_df = pd.read_sql(gens_select, pudl_engine)

    # Static generator attributes (used e.g. to get plant age):
    gens_entity_df = pd.read_sql(
        sa.sql.select(pt['generators_entity_eia']), pudl_engine)
    # Static plant attributes (used e.g. to get the lat/lon coordinates):
    plants_entity_df = pd.read_sql(
        sa.sql.select(pt['plants_entity_eia']), pudl_engine)

    out_df = (
        gens_df
        .merge(plants_entity_df, how='left', on=['plant_id_eia'])
        .merge(gens_entity_df, how='left',
               on=['plant_id_eia', 'generator_id'])
    )
    out_df.report_date = pd.to_datetime(out_df.report_date)

    # Bring in some generic plant & utility information:
    pu_eia = (
        plants_utils_eia860(
            pudl_engine, start_date=start_date, end_date=end_date)
        .drop(["plant_name_eia", "utility_id_eia"], axis="columns")
    )
    out_df = out_df.merge(
        pu_eia, on=['report_date', 'plant_id_eia'], how="left")

    # Merge in the unit_id_pudl assigned to each generator in the BGA process.
    # Pull the BGA table and reduce it to unit-generator columns only:
    bga_cols = [
        "report_date",
        "plant_id_eia",
        "generator_id",
        "unit_id_pudl",
        "bga_source",
    ]
    out_df = out_df.merge(
        boiler_generator_assn_eia860(
            pudl_engine, start_date=start_date, end_date=end_date
        )[bga_cols].drop_duplicates(),
        on=["report_date", "plant_id_eia", "generator_id"],
        how="left",
        validate="m:1",
    )

    # In order to be able to differentiate between single and multi-fuel
    # plants, count how many different simple energy sources are associated
    # with each plant's generators.  This allows downstream code to lump an
    # entire plant's fuel & generation together when its primary fuels are
    # homogeneous, and to split them out by fuel when they are heterogeneous.
    fuel_counts = (
        out_df[['plant_id_eia', 'fuel_type_code_pudl', 'report_date']]
        .drop_duplicates()
        .groupby(['plant_id_eia', 'report_date'])
        .count()
        .reset_index()
        .rename(columns={'fuel_type_code_pudl': 'fuel_type_count'})
    )
    out_df = (
        out_df
        .merge(fuel_counts, how='left', on=['plant_id_eia', 'report_date'])
        .dropna(subset=["report_date", "plant_id_eia", "generator_id"])
        .astype(pudl.helpers.get_pudl_dtypes({
            "plant_id_eia": "eia",
            "plant_id_pudl": "eia",
            "unit_id_pudl": "eia",
            "utility_id_eia": "eia",
            "utility_id_pudl": "eia",
        }))
    )

    # Augment the base unit_id_pudl values using heuristics (see below):
    if unit_ids:
        out_df = assign_unit_ids(out_df)

    first_cols = [
        'report_date',
        'plant_id_eia',
        'plant_id_pudl',
        'plant_name_eia',
        'utility_id_eia',
        'utility_id_pudl',
        'utility_name_eia',
        'generator_id',
    ]
    # Re-arrange the columns for easier readability:
    out_df = (
        pudl.helpers.organize_cols(out_df, first_cols)
        .sort_values(['report_date', 'plant_id_eia', 'generator_id'])
        .pipe(pudl.helpers.convert_cols_dtypes, data_source="eia")
    )
    return out_df
def boiler_generator_assn_eia860(pudl_engine, start_date=None, end_date=None):
    """Pull all fields from the EIA 860 boiler generator association table.

    Args:
        pudl_engine (sqlalchemy.engine.Engine): SQLAlchemy connection engine
            for the PUDL DB.
        start_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.
        end_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.

    Returns:
        pandas.DataFrame: A DataFrame containing all the fields from the EIA
        860 boiler generator association table.

    """
    pt = pudl.output.pudltabl.get_table_meta(pudl_engine)
    bga_tbl = pt['boiler_generator_assn_eia860']
    bga_select = sa.sql.select(bga_tbl)
    if start_date is not None:
        bga_select = bga_select.where(
            bga_tbl.c.report_date >= pd.to_datetime(start_date)
        )
    if end_date is not None:
        bga_select = bga_select.where(
            bga_tbl.c.report_date <= pd.to_datetime(end_date)
        )
    # Normalize report_date into a proper datetime column before returning:
    return (
        pd.read_sql(bga_select, pudl_engine)
        .assign(report_date=lambda x: pd.to_datetime(x.report_date))
    )
def ownership_eia860(pudl_engine, start_date=None, end_date=None):
    """Pull a useful set of fields related to ownership_eia860 table.

    Args:
        pudl_engine (sqlalchemy.engine.Engine): SQLAlchemy connection engine
            for the PUDL DB.
        start_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.
        end_date (date-like): date-like object, including a string of the
            form 'YYYY-MM-DD' which will be used to specify the date range of
            records to be pulled.  Dates are inclusive.

    Returns:
        pandas.DataFrame: A DataFrame containing a useful set of fields
        related to the EIA 860 Ownership table.

    """
    pt = pudl.output.pudltabl.get_table_meta(pudl_engine)
    own_tbl = pt["ownership_eia860"]
    own_select = sa.sql.select(own_tbl)
    if start_date is not None:
        own_select = own_select.where(
            own_tbl.c.report_date >= pd.to_datetime(start_date)
        )
    if end_date is not None:
        own_select = own_select.where(
            own_tbl.c.report_date <= pd.to_datetime(end_date)
        )
    own_df = (
        pd.read_sql(own_select, pudl_engine)
        .assign(report_date=lambda x: pd.to_datetime(x["report_date"]))
    )

    # Generic plant & utility names and PUDL IDs to merge in:
    pu_df = (
        plants_utils_eia860(
            pudl_engine, start_date=start_date, end_date=end_date)
        .loc[:, ['plant_id_eia', 'plant_id_pudl', 'plant_name_eia',
                 'utility_name_eia', 'utility_id_pudl', 'report_date']]
    )

    out_df = (
        own_df
        .merge(pu_df, how='left', on=['report_date', 'plant_id_eia'])
        # Records missing any of these keys can't be used downstream:
        .dropna(subset=[
            "report_date",
            "plant_id_eia",
            "generator_id",
            "owner_utility_id_eia",
        ])
        .astype(pudl.helpers.get_pudl_dtypes({
            "plant_id_eia": "eia",
            "plant_id_pudl": "eia",
            "utility_id_eia": "eia",
            "utility_id_pudl": "eia",
        }))
    )

    first_cols = [
        'report_date',
        'plant_id_eia',
        'plant_id_pudl',
        'plant_name_eia',
        'utility_id_eia',
        'utility_id_pudl',
        'utility_name_eia',
        'generator_id',
        'owner_utility_id_eia',
        'owner_name',
    ]
    # Re-arrange the columns for easier readability:
    return pudl.helpers.organize_cols(out_df, first_cols)
################################################################################
# Temporary integration of more complete unit_id_pudl assignments.
# Eventually this should go into the boiler-generator-association process,
# and these IDs should probably live in the BGA table with the other
# unit_id_pudl values derived from the BGA table and other heuristics.
################################################################################
def assign_unit_ids(gens_df):
    """
    Group generators into operational units using various heuristics.

    Splits a few columns off from the big generator dataframe and uses several
    heuristic functions to fill in missing unit_id_pudl values beyond those
    that are generated in the boiler generator association process. Then
    merges the new unit ID values back in to the generators dataframe.

    Args:
        gens_df (pandas.DataFrame): An EIA generator table. Must contain at
            least the columns: report_date, plant_id_eia, generator_id,
            unit_id_pudl, bga_source, fuel_type_code_pudl, prime_mover_code,

    Returns:
        pandas.DataFrame: Returned dataframe should only vary from the input
        in that some NA values in the ``unit_id_pudl`` and ``bga_source``
        columns have been filled in with real values.

    Raises:
        ValueError: If the input dataframe is missing required columns.
        ValueError: If any generator is associated with more than one
            unit_id_pudl.
        AssertionError: If row or column indices are changed.
        AssertionError: If pre-existing unit_id_pudl or bga_source values are
            altered.
        AssertionError: If contents of any other columns are altered at all.

    """
    required_cols = [
        "plant_id_eia",
        "generator_id",
        "report_date",
        "unit_id_pudl",
        "bga_source",
        "fuel_type_code_pudl",
        "prime_mover_code",
    ]
    if not set(required_cols).issubset(gens_df.columns):
        missing_cols = set(required_cols).difference(gens_df.columns)
        errstr = f"Input DataFrame missing required columns: {missing_cols}."
        raise ValueError(errstr)

    unit_ids = (
        gens_df.loc[:, required_cols]
        # Forward and back fill preexisting Unit IDs:
        .pipe(fill_unit_ids)
        # Assign Unit IDs to the CT+CA CC generators:
        .pipe(assign_cc_unit_ids)
        # For whole-combined cycle (CC) and single-shaft combined cycle (CS)
        # units, we give each generator their own unit ID. We do the same for
        # internal combustion and simple-cycle gas combustion turbines.
        .pipe(
            assign_single_gen_unit_ids,
            prime_mover_codes=["CC", "CS", "GT", "IC"]
        )
        # Nuclear units don't report in boiler_fuel_eia923 or generation_eia923.
        # Their fuel consumption is reported as mmbtu in generation_fuel_eia923.
        # Their net generation also only shows up in generation_fuel_eia923.
        # The generation_fuel_eia923 table records a "nuclear_unit_id" which
        # appears to be the same as the associated generator_id. However, we
        # can't use that as a unit_id_pudl since it might have a collision with
        # other already assigned unit_id_pudl values in the same plant for
        # generators with other fuel types. Thus we still need to assign them
        # a fuel-and-prime-mover based unit ID here. For now ALL nuclear plants
        # use steam turbines.
        .pipe(
            assign_single_gen_unit_ids,
            prime_mover_codes=["ST"],
            fuel_type_code_pudl="nuclear",
            label_prefix="nuclear",
        )
        # In these next 4 assignments, we lump together all steam turbine (ST)
        # generators that have a consistent simplified fuel_type_code_pudl
        # across all years within a given plant into the same unit, since we
        # won't be able to distinguish them in the generation_fuel_eia923
        # table. This will lump together solid fuels like BIT, LIG, SUB, PC
        # etc. under "coal".  There are a few cases in which a generator has
        # truly changed its fuel type, e.g. coal-to-gas conversions, but these
        # are rare and insubstantial. They will not be assigned a Unit ID in
        # this process. Non-fuel steam generation is also left out (geothermal
        # & solar thermal).
        .pipe(
            assign_prime_fuel_unit_ids,
            prime_mover_code="ST",
            fuel_type_code_pudl="coal"
        )
        .pipe(
            assign_prime_fuel_unit_ids,
            prime_mover_code="ST",
            fuel_type_code_pudl="oil"
        )
        .pipe(
            assign_prime_fuel_unit_ids,
            prime_mover_code="ST",
            fuel_type_code_pudl="gas"
        )
        .pipe(
            assign_prime_fuel_unit_ids,
            prime_mover_code="ST",
            fuel_type_code_pudl="waste"
        )
        # Retain only the merge keys and output columns
        .loc[:, [
            "plant_id_eia",   # Merge key
            "generator_id",   # Merge key
            "report_date",    # Merge key
            "unit_id_pudl",   # Output column
            "bga_source"      # Output column
        ]]
    )
    # Check that each generator is only ever associated with a single unit,
    # at least within the codes that we've just assigned -- the Unit IDs that
    # are based on the EIA boiler-generator-association or other matching
    # methods could legitimately specify different units for generators over
    # time -- which could impact the forward-/back-filled IDs as well:
    old_codes = list(gens_df.bga_source.unique()) + ["bfill_units", "ffill_units"]
    gens_have_unique_unit = (
        unit_ids[~unit_ids.bga_source.isin(old_codes)]
        .groupby(["plant_id_eia", "generator_id"])["unit_id_pudl"]
        .nunique() <= 1  # nunique() == 0 when there are only NA values.
    ).all()
    if not gens_have_unique_unit:
        errstr = "Some generators are associated with more than one unit_id_pudl."
        raise ValueError(errstr)

    # Use natural composite primary key as the index
    gens_idx = ["plant_id_eia", "generator_id", "report_date"]
    unit_ids = unit_ids.set_index(gens_idx).sort_index()
    gens_df = gens_df.set_index(gens_idx).sort_index()

    # Check that our input DataFrame and unit IDs have identical row indices.
    # This is a dumb hack b/c set_index() doesn't preserve index data types
    # under some circumstances, and so we have "object" and "int64" types
    # being used for plant_id_eia at this point, fml. Really this should just
    # be assert_index_equal() for the two df indices:
    pd.testing.assert_frame_equal(
        unit_ids.reset_index()[gens_idx],
        gens_df.reset_index()[gens_idx]
    )
    # Verify that anywhere out_df has a unit_id_pudl, it's identical in unit_ids
    pd.testing.assert_series_equal(
        gens_df.unit_id_pudl.dropna(),
        unit_ids.unit_id_pudl.loc[gens_df.unit_id_pudl.dropna().index]
    )
    # Verify that anywhere out_df has a bga_source, it's identical in unit_ids
    pd.testing.assert_series_equal(
        gens_df.bga_source.dropna(),
        unit_ids.bga_source.loc[gens_df.bga_source.dropna().index]
    )
    # We know that the indices are identical.
    # We know that we aren't going to overwrite anything that isn't NA.
    # Thus we should be able to just assign these values straight across.
    unit_cols = ["unit_id_pudl", "bga_source"]
    gens_df.loc[:, unit_cols] = unit_ids[unit_cols]

    return gens_df.reset_index()
def fill_unit_ids(gens_df):
    """
    Back and forward fill Unit IDs for each plant / gen combination.

    This routine assumes that the mapping of generators to units is constant
    over time, and extends those mappings into years where no boilers have
    been reported -- since in the BGA we can only connect generators to each
    other if they are both connected to a boiler.

    Prior to 2014, combined cycle units didn't report any "boilers" but in
    latter years, they have been given "boilers" that correspond to their
    generators, so that all of their fuel consumption is recorded alongside
    that of other types of generators.  The bga_source field is set to
    "bfill_units" for records that were backfilled, and "ffill_units" for
    those that were forward filled.

    Note: We could back/forward fill the boiler IDs prior to the BGA process
    and we ought to get consistent units across all the years that are the
    same as what we fill in here.  We could also back/forward fill boiler IDs
    and Unit IDs after the fact, and we *should* get the same result.  This
    will address many currently "boilerless" CCNG units that use generator ID
    as boiler ID in the latter years.  Applying this more generally to
    generator IDs that haven't been used as boiler IDs would break the
    foreign key relationship with the boiler table, unless we added them
    there too, which seems like too much deep muddling.

    Args:
        gens_df (pandas.DataFrame): A generators_eia860 dataframe, which must
            contain columns: report_date, plant_id_eia, generator_id,
            unit_id_pudl, bga_source.

    Returns:
        pandas.DataFrame: with the same columns as the input dataframe, but
        having some NA values filled in for both the unit_id_pudl and
        bga_source columns.

    """
    key_cols = ["plant_id_eia", "generator_id"]
    # Chronological order within each generator is what makes the fills valid:
    gens_df = gens_df.sort_values(["report_date", "plant_id_eia", "generator_id"])

    # Backfill first, then forward fill, labeling each fill direction so the
    # provenance of the inferred IDs is visible downstream:
    for label, direction in [("bfill_units", "bfill"), ("ffill_units", "ffill")]:
        filled = getattr(gens_df.groupby(key_cols)["unit_id_pudl"], direction)()
        # Only touch rows that were NA but now have a filled value available:
        fill_me = filled.notnull() & gens_df.unit_id_pudl.isnull()
        gens_df.loc[fill_me, "bga_source"] = label
        gens_df.loc[fill_me, "unit_id_pudl"] = filled.loc[fill_me]

    gens_df["bga_source"] = gens_df["bga_source"].astype(pd.StringDtype())
    return gens_df
def max_unit_id_by_plant(gens_df):
    """
    Identify the largest unit ID associated with each plant so we don't overlap.

    The PUDL Unit IDs are sequentially assigned integers.  To assign a new ID,
    we need to know the largest existing Unit ID within a plant.  This
    function calculates that largest existing ID, or uses zero if no Unit IDs
    are set within the plant.

    Note that this calculation depends on having all of the pre-existing
    generators and units still available in the dataframe!

    Args:
        gens_df (pandas.DataFrame): A generators_eia860 dataframe containing
            at least the columns plant_id_eia and unit_id_pudl.

    Returns:
        pandas.DataFrame: Having two columns: plant_id_eia and
        max_unit_id_pudl, in which each row should be unique.

    """
    plant_max = (
        gens_df[["plant_id_eia", "unit_id_pudl"]]
        .drop_duplicates()
        .groupby("plant_id_eia")
        .agg({"unit_id_pudl": max})
        # Plants with no Unit IDs at all get a maximum of zero:
        .fillna(0)
    )
    plant_max = plant_max.rename(columns={"unit_id_pudl": "max_unit_id_pudl"})
    return plant_max.reset_index()
[docs]def _append_masked_units(gens_df, row_mask, unit_ids, on): """ Replace rows with new PUDL Unit IDs in the original dataframe. Merges the newly assigned Unit IDs found in ``unit_ids`` into the ``gens_df`` dataframe, but only for those rows which are selected by the boolean ``row_mask``. Merges using the column or columns specified by ``on``. This operation should only result in changes to the values of ``unit_id_pudl`` and ``bga_source`` in the output dataframe. All of ``gens_df``, ``unit_ids`` and ``row_mask`` must be similarly indexed for this to work. Args: gens_df (pandas.DataFrame): a gens_eia860 based dataframe. row_mask (boolean mask): A boolean array indicating which records in ``gens_df`` should be replaced using values from ``unit_ids``. unit_ids (pandas.DataFrame): A dataframe containing newly assigned ``unit_id_pudl`` values to be integrated into ``gens_df``. on (str or list): Column or list of columns to merge on. Returns: pandas.DataFrame: """ return gens_df.loc[~row_mask].append( gens_df.loc[row_mask] .drop(["unit_id_pudl", "bga_source"], axis="columns") .merge( unit_ids, on=on, how="left", validate="many_to_one",
) )
def assign_single_gen_unit_ids(
    gens_df, prime_mover_codes, fuel_type_code_pudl=None, label_prefix="single"
):
    """
    Assign a unique PUDL Unit ID to each generator of a given prime mover type.

    Calculate the maximum pre-existing PUDL Unit ID within each plant, and
    assign each as of yet unidentified distinct generator within each plant
    with an incrementing integer unit_id_pudl, beginning with 1 + the previous
    maximum unit_id_pudl found in that plant.  Mark that generator with a
    label in the bga_source column consisting of label_prefix + the prime
    mover code.

    If fuel_type_code_pudl is not None, then only assign new Unit IDs to those
    generators having the specified fuel type code, and use that fuel type
    code as the label prefix, e.g. "coal_st" for a coal-fired steam turbine.

    Only generators having NA unit_id_pudl will be assigned a new ID.

    Args:
        gens_df (pandas.DataFrame): A collection of EIA generator records.
            Must include the ``plant_id_eia``, ``generator_id`` and
            ``prime_mover_code`` and ``unit_id_pudl`` columns.
        prime_mover_codes (list): List of prime mover codes for which we are
            attempting to assign simple Unit IDs.
        fuel_type_code_pudl (str, None): If not None, then limit the records
            assigned a unit_id to those that have the specified
            fuel_type_code_pudl (e.g. "coal", "gas", "oil", "nuclear")
        label_prefix (str): String to use in labeling records as to how their
            unit_id_pudl was set.  Will be concatenated with the prime mover
            code.

    Returns:
        pandas.DataFrame: A new dataframe with the same rows and columns as
        were passed in, but with the unit_id_pudl and bga_source columns
        updated to reflect the newly assigned Unit IDs.

    """
    if fuel_type_code_pudl is not None:
        # NOTE(review): this should probably only apply to generators whose
        # fuel type is consistent across years -- TODO confirm.
        fuel_mask = gens_df.fuel_type_code_pudl == fuel_type_code_pudl
    else:
        fuel_mask = True

    # Target rows: matching prime mover (and optionally fuel) with no Unit ID:
    row_mask = (
        gens_df.prime_mover_code.isin(prime_mover_codes)
        & gens_df.unit_id_pudl.isnull()
        & fuel_mask
    )
    # We only need a few columns to make these assignments:
    keep_cols = ["plant_id_eia", "generator_id", "unit_id_pudl", "prime_mover_code"]
    logger.info(
        "Selected %s %s records lacking Unit IDs from %s records overall. ",
        row_mask.sum(), prime_mover_codes, len(gens_df)
    )
    new_unit_ids = (
        gens_df.loc[row_mask, keep_cols]
        .drop_duplicates()
        .merge(
            max_unit_id_by_plant(gens_df),
            on="plant_id_eia",
            how="left",
            validate="many_to_one",
        )
        # Number the distinct generators within each plant sequentially,
        # starting just above the plant's pre-existing maximum Unit ID:
        .assign(
            unit_id_pudl=lambda x: (
                x.groupby("plant_id_eia")["generator_id"].cumcount()
                + x.max_unit_id_pudl + 1
            ),
            bga_source=lambda x: (
                label_prefix + "_" + x.prime_mover_code.str.lower()
            ),
        )
        .drop(["max_unit_id_pudl", "prime_mover_code"], axis="columns")
    )
    # Split the original dataframe based on row_mask, and merge the new IDs
    # and labels in only on the subset matching our row_mask:
    return _append_masked_units(
        gens_df, row_mask, new_unit_ids, on=["plant_id_eia", "generator_id"]
    )
def assign_cc_unit_ids(gens_df):
    """
    Assign PUDL Unit IDs for combined cycle generation units.

    This applies only to combined cycle units reported as a combination of CT
    and CA prime movers. All CT and CA generators within a plant that do not
    already have a unit_id_pudl assigned will be given the same unit ID. The
    ``bga_source`` column is set to one of several flags indicating what type
    of arrangement was found:

    * ``orphan_ct`` (zero CA gens, 1+ CT gens)
    * ``orphan_ca`` (zero CT gens, 1+ CA gens)
    * ``one_ct_one_ca_inferred`` (1 CT, 1 CA)
    * ``one_ct_many_ca_inferred`` (1 CT, 1+ CA)
    * ``many_ct_one_ca_inferred`` (1+ CT, 1 CA)
    * ``many_ct_many_ca_inferred`` (1+ CT, 1+ CA)

    Orphaned generators are still assigned a ``unit_id_pudl`` so that they can
    potentially be associated with other generators in the same unit across
    years. It's likely that these orphans are a result of mislabled or missing
    generators. Note that as generators are added or removed over time, the
    flags associated with each generator may change, even though it remains
    part of the same inferred unit.

    Args:
        gens_df (pandas.DataFrame): A generators_eia860 dataframe containing
            at least the columns plant_id_eia, generator_id, report_date,
            prime_mover_code, unit_id_pudl and bga_source.

    Returns:
        pandas.DataFrame

    """
    # Calculate the largest preexisting unit_id_pudl within each plant
    max_unit_ids = max_unit_id_by_plant(gens_df)

    # CT/CA generators that still need a Unit ID assigned:
    cc_missing_units = gens_df[
        (gens_df.unit_id_pudl.isna())
        & gens_df.prime_mover_code.isin(["CT", "CA"])
    ]
    # On a per-plant, per-year basis, count up the number of CT and CA
    # generators. Only look at those which don't already have a unit ID
    # assigned:
    cc_pm_counts = (
        cc_missing_units
        .groupby(["plant_id_eia", "report_date"])["prime_mover_code"]
        .value_counts()
        .unstack(fill_value=0)
        .astype(int)
        .reset_index()
    )
    # unstack() names the columns axis after prime_mover_code; clear it:
    cc_pm_counts.columns.name = None

    # Bring the max unit ID and PM counts into the DF so we can select and
    # assign based on them. We're using the cc_missing_units and a temporary
    # dataframe here to avoid interference from the CT & CA generators
    # that do already have unit IDs assigned to them in gens_df.
    tmp_df = (
        cc_missing_units
        .merge(
            max_unit_ids,
            on="plant_id_eia",
            how="left",
            validate="many_to_one",
        )
        .merge(
            cc_pm_counts,
            on=["plant_id_eia", "report_date"],
            how="left",
            validate="many_to_one",
        )
    )
    # Assign the new Unit IDs.
    # All CA and CT units get assigned to the same unit within a plant:
    tmp_df["unit_id_pudl"] = tmp_df["max_unit_id_pudl"] + 1

    # Assign the orphan flags
    tmp_df.loc[tmp_df.CA == 0, "bga_source"] = "orphan_ct"
    tmp_df.loc[tmp_df.CT == 0, "bga_source"] = "orphan_ca"
    # The orphan flags should only have been applied to generators that had
    # at least one prime mover of the orphaned type. Just checking...
    assert (tmp_df.loc[tmp_df.bga_source == "orphan_ct", "CT"] > 0).all()
    assert (tmp_df.loc[tmp_df.bga_source == "orphan_ca", "CA"] > 0).all()

    # Assign flags for various arrangements of CA and CT generators.
    # NOTE: these overwrite the orphan flags only where both counts are
    # non-zero, so the assignment order matters.
    tmp_df.loc[
        ((tmp_df.CT == 1) & (tmp_df.CA == 1)), "bga_source"
    ] = "one_ct_one_ca_inferred"
    tmp_df.loc[
        ((tmp_df.CT == 1) & (tmp_df.CA > 1)), "bga_source"
    ] = "one_ct_many_ca_inferred"
    tmp_df.loc[
        ((tmp_df.CT > 1) & (tmp_df.CA == 1)), "bga_source"
    ] = "many_ct_one_ca_inferred"
    tmp_df.loc[
        ((tmp_df.CT > 1) & (tmp_df.CA > 1)), "bga_source"
    ] = "many_ct_many_ca_inferred"

    # Align the indices of the two dataframes so we can assign directly
    tmp_df = tmp_df.set_index(["plant_id_eia", "generator_id", "report_date"])
    out_df = gens_df.set_index(["plant_id_eia", "generator_id", "report_date"])
    out_df.loc[
        tmp_df.index, ["unit_id_pudl", "bga_source"]
    ] = tmp_df[["unit_id_pudl", "bga_source"]]
    return out_df.reset_index()
def assign_prime_fuel_unit_ids(gens_df, prime_mover_code, fuel_type_code_pudl):
    """
    Assign a PUDL Unit ID to all generators with a given prime mover and fuel.

    Within each plant, assign a Unit ID to all generators that don't have one,
    and that share the same `fuel_type_code_pudl` and `prime_mover_code`.
    This is especially useful for differentiating between different types of
    steam turbine generators, as there are so many different kinds of steam
    turbines, and the only characteristic we have to differentiate between
    them in this context is the fuel they consume.  E.g. nuclear, geothermal,
    solar thermal, natural gas, diesel, and coal can all run steam turbines,
    but it doesn't make sense to lump those turbines together into a single
    unit just because they are located at the same plant.

    This routine only assigns a PUDL Unit ID to generators that have a
    consistently reported value of `fuel_type_code_pudl` across all of the
    years of data in `gens_df`.  This consistency is important because
    otherwise the prime-fuel based unit assignment could put the same
    generator into different units in different years, which is currently not
    compatible with our concept of "units."

    Args:
        gens_df (pandas.DataFrame): A collection of EIA generator records.
            Must include the ``plant_id_eia``, ``generator_id`` and
            ``prime_mover_code`` and ``unit_id_pudl`` columns.
        prime_mover_code (str): The prime mover code for which we are
            attempting to assign simple Unit IDs.
        fuel_type_code_pudl (str): Limit the records assigned a unit_id to
            those that have the specified fuel_type_code_pudl (e.g. "coal",
            "gas", "oil", "nuclear")

    Returns:
        pandas.DataFrame:

    """
    # Generators whose fuel_type_code_pudl never changes across years:
    fuel_is_consistent = (
        gens_df.groupby(["plant_id_eia", "generator_id"])["fuel_type_code_pudl"]
        .transform(lambda x: x.nunique())
        == 1
    )
    # Rows we will alter: right prime mover & fuel, no Unit ID, stable fuel:
    row_mask = (
        (gens_df.prime_mover_code == prime_mover_code)
        & gens_df.unit_id_pudl.isna()
        & (gens_df.fuel_type_code_pudl == fuel_type_code_pudl)
        & fuel_is_consistent
    )
    # We only need a few columns to make these assignments:
    keep_cols = ["plant_id_eia", "generator_id", "unit_id_pudl"]
    logger.info(
        "Selected %s %s records lacking Unit IDs burning %s from %s records overall.",
        row_mask.sum(), prime_mover_code, fuel_type_code_pudl, len(gens_df)
    )
    new_unit_ids = (
        gens_df.loc[row_mask, keep_cols]
        .drop_duplicates()
        .merge(
            max_unit_id_by_plant(gens_df),
            on="plant_id_eia",
            how="left",
            validate="many_to_one",
        )
        # All selected generators within each plant share the next available
        # PUDL Unit ID, and are labeled by their fuel + prime mover:
        .assign(
            unit_id_pudl=lambda x: x.max_unit_id_pudl + 1,
            bga_source=lambda x: (
                fuel_type_code_pudl + "_" + prime_mover_code.lower()
            ),
        )
        .drop(["max_unit_id_pudl"], axis="columns")
    )
    # Split the original dataframe based on row_mask, and merge the new IDs
    # and labels in only on the subset matching our row_mask:
    out_df = _append_masked_units(
        gens_df, row_mask, new_unit_ids, on=["plant_id_eia", "generator_id"]
    )
    # Label (but do not assign IDs to) matching generators whose fuel type is
    # NOT consistent across years, so they can be identified downstream:
    fuel_is_inconsistent = (
        out_df.groupby(["plant_id_eia", "generator_id"])["fuel_type_code_pudl"]
        .transform(lambda x: x.nunique())
        > 1
    )
    label_mask = (
        (out_df.prime_mover_code == prime_mover_code)
        & out_df.unit_id_pudl.isna()
        & (out_df.fuel_type_code_pudl == fuel_type_code_pudl)
        & fuel_is_inconsistent
    )
    out_df.loc[label_mask, "bga_source"] = (
        "inconsistent_" + fuel_type_code_pudl + "_" + prime_mover_code.lower()
    )
    return out_df