Source code for pudl.analysis.epacamd_eia
"""Helper functions for filtering the EPA CAMD crosswalk table.
These helpers were originally written to trim the crosswalk before any
``subplant_id`` is assigned, so that ``subplant_id`` values are only generated for
records that actually show up in EPA CAMD.
Usage Example:
epacems = pudl.output.epacems.epacems(states=['ID'], years=[2020]) # subset for test
core_epa__assn_eia_epacamd = pudl_out.epacamd_eia()
filtered_crosswalk = filter_crosswalk(core_epa__assn_eia_epacamd, epacems)
crosswalk_with_subplant_ids = pudl.etl.make_subplant_ids(filtered_crosswalk)
"""
import dask.dataframe as dd
import pandas as pd
[docs]
def _get_unique_keys(epacems: pd.DataFrame | dd.DataFrame) -> pd.DataFrame:
    """Return the distinct plant/unit key pairs present in the CEMS data.

    Args:
        epacems: CEMS data as either a pandas or a dask dataframe. Must contain
            the columns ``plant_id_eia`` and ``emissions_unit_id_epa``.

    Returns:
        The de-duplicated ``plant_id_eia`` / ``emissions_unit_id_epa`` pairs
        found in ``epacems``.
    """
    key_columns = ["plant_id_eia", "emissions_unit_id_epa"]
    unique_keys = epacems[key_columns].drop_duplicates()
    # This helper mainly exists so callers need not care whether they hold a
    # dask or a pandas dataframe: a dask selection still has to be computed,
    # while a pandas selection is returned as-is.
    return unique_keys.compute() if isinstance(epacems, dd.DataFrame) else unique_keys
[docs]
def filter_crosswalk_by_epacems(
    crosswalk: pd.DataFrame, epacems: pd.DataFrame | dd.DataFrame
) -> pd.DataFrame:
    """Keep only the crosswalk rows whose unit also appears in the CEMS data.

    The unique ``plant_id_eia`` / ``emissions_unit_id_epa`` pairs found in
    ``epacems`` are inner joined with the core_epa__assn_eia_epacamd crosswalk.
    This acts as an empirical filter on EPA units: rather than relying on the
    construction/retirement dates in the crosswalk (and thereby assuming they
    are accurate), the presence or absence of CEMS data decides which units
    are kept.

    Args:
        crosswalk: The core_epa__assn_eia_epacamd crosswalk. Must contain the
            columns ``plant_id_eia`` and ``emissions_unit_id_epa``.
        epacems: CEMS data (pandas or dask) from which the unique unit keys are
            extracted via ``_get_unique_keys``.

    Returns:
        The inner join of the unique CEMS unit keys (left side) with the
        crosswalk (right side) on the two key columns.
    """
    join_keys = ["plant_id_eia", "emissions_unit_id_epa"]
    cems_units = _get_unique_keys(epacems)
    # The CEMS keys stay on the left of the join, as in the original call.
    return cems_units.merge(right=crosswalk, how="inner", on=join_keys)
[docs]
def filter_out_boiler_rows(crosswalk: pd.DataFrame) -> pd.DataFrame:
    """Remove rows that represent graph edges between generators and boilers.

    Rows are considered duplicates when they share the same ``plant_id_eia``,
    ``emissions_unit_id_epa`` and ``generator_id``; only the first row of each
    such group is kept.

    Args:
        crosswalk: The core_epa__assn_eia_epacamd crosswalk.

    Returns:
        The core_epa__assn_eia_epacamd crosswalk with boiler rows
        (many/one-to-many) removed. The input dataframe is not modified.
    """
    unit_generator_keys = ["plant_id_eia", "emissions_unit_id_epa", "generator_id"]
    # keep="first" is the pandas default, spelled out here for readability.
    return crosswalk.drop_duplicates(subset=unit_generator_keys, keep="first")
[docs]
def filter_crosswalk(
    crosswalk: pd.DataFrame, epacems: pd.DataFrame | dd.DataFrame
) -> pd.DataFrame:
    """Remove unmapped crosswalk rows or duplicates due to m2m boiler relationships.

    Two filters are applied in order: first the boiler rows are dropped with
    ``filter_out_boiler_rows``, then the result is restricted to units present
    in the CEMS data with ``filter_crosswalk_by_epacems``.

    Args:
        crosswalk: The core_epa__assn_eia_epacamd crosswalk.
        epacems: Emissions data (pandas or dask). Must contain columns named
            ["plant_id_eia", "emissions_unit_id_epa"].

    Returns:
        A filtered copy of the core_epa__assn_eia_epacamd crosswalk.
    """
    return filter_crosswalk_by_epacems(filter_out_boiler_rows(crosswalk), epacems)