"""Connect FERC1 plant tables to EIA's plant-parts via record linkage.
FERC plant records are reported very non-uniformly. In the same table there are records
that are reported as whole plants, individual generators, and collections of prime
movers. This means portions of EIA plants that correspond to a plant record in FERC
Form 1 are heterogeneous, which complicates using the two data sets together.
The EIA plant data is much cleaner and more uniformly structured. The are generators
with ids and plants with ids reported in *seperate* tables. Several generator IDs are
typically grouped under a single plant ID. In :mod:`pudl.analysis.plant_parts_eia`,
we create a large number of synthetic aggregated records representing many possible
slices of a power plant which could in theory be what is actually reported in the FERC
Form 1.
In this module we infer which of the many ``plant_parts_eia`` records is most likely to
correspond to an actually reported FERC Form 1 plant record. this is done with a
logistic regression model. The :mod:`recordlinkage` package helps us create feature
vectors (via :meth:`Features.make_features`) for each candidate match between FERC and
EIA.
We train the logistic regression model using manually labeled training data that links
together several thousand EIA and FERC plant records, and use grid search cross
validation to select a best set of hyperparameters. This trained model is used to
predict matches on the full dataset (see :func:`run_model`). The model can return
multiple EIA match options for each FERC1 record, so we rank the matches and choose the
one with the highest score (see :func:`find_best_matches`). Any matches identified by
the model which are in conflict with our training data are overwritten with the manually
assigned associations (see :func:`overwrite_bad_predictions`). The final match results
are the connections we keep as the matches between FERC1 plant records and EIA
plant-parts.
"""
import importlib.resources
from typing import Literal
import numpy as np
import pandas as pd
import recordlinkage as rl
from dagster import asset
from recordlinkage.compare import Exact, Numeric, String # , Date
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_recall_fscore_support
from sklearn.model_selection import GridSearchCV # , cross_val_score
import pudl
import pudl.helpers
from pudl.analysis.plant_parts_eia import match_to_single_plant_part
from pudl.metadata.classes import DataSource, Resource
[docs]
logger = pudl.logging_helpers.get_logger(__name__)
# Silence the recordlinkage logger, which is out of control
@asset(
io_manager_key="pudl_sqlite_io_manager",
compute_kind="Python",
)
[docs]
def out__yearly_plants_all_ferc1_plant_parts_eia(
denorm_plants_all_ferc1: pd.DataFrame,
denorm_fuel_by_plant_ferc1: pd.DataFrame,
plant_parts_eia: pd.DataFrame,
) -> pd.DataFrame:
"""Coordinate the connection between FERC1 plants and EIA plant-parts.
Args:
denorm_plants_all_ferc1: Table of all of the FERC1-reporting plants.
denorm_fuel_by_plant_ferc1: Table of the fuel reported aggregated to the FERC1
plant-level.
plant_parts_eia: The EIA plant parts list.
"""
inputs = InputManager(
denorm_plants_all_ferc1, denorm_fuel_by_plant_ferc1, plant_parts_eia
)
# compile/cache inputs upfront. Hopefully we can catch any errors in inputs early.
inputs.execute()
features_all = Features(feature_type="all", inputs=inputs).get_features(
clobber=False
)
features_train = Features(feature_type="training", inputs=inputs).get_features(
clobber=False
)
match_df = run_model(
features_train=features_train,
features_all=features_all,
train_df=inputs.train_df,
)
# choose one EIA match for each FERC record
best_match_df = find_best_matches(match_df).pipe(
overwrite_bad_predictions, inputs.train_df
)
# join EIA and FERC columns back on
connects_ferc1_eia = prettyify_best_matches(
best_match_df,
train_df=inputs.get_train_df(),
plant_parts_eia_true=inputs.get_plant_parts_eia_true(),
plants_ferc1=inputs.get_plants_ferc1(),
).pipe(add_null_overrides) # Override specified values with NA record_id_eia
connects_ferc1_eia = Resource.from_id(
"out__yearly_plants_all_ferc1_plant_parts_eia"
).enforce_schema(connects_ferc1_eia)
return connects_ferc1_eia
[docs]
class Features:
"""Generate feature vectors for connecting FERC and EIA."""
def __init__(self, feature_type: Literal["training", "all"], inputs: InputManager):
"""Initialize feature generator.
Args:
feature_type: Type of features to compile. Either 'training' or 'all'.
inputs: Instance of :class:`InputManager`.
"""
self.inputs: InputManager = inputs
self.features_df: pd.DataFrame | None = None
if feature_type not in ["all", "training"]:
raise ValueError(
f"feature_type {feature_type} not allowable. Must be either "
"'all' or 'training'"
)
self.feature_type = feature_type
# the input_dict is going to help in standardizing how we generate
# features. Based on the feature_type (keys), the latter methods will
# know which dataframes to use as inputs for ``make_features()``
self.input_dict = {
"all": {
"ferc1_df": self.inputs.get_plants_ferc1,
"eia_df": self.inputs.get_plant_parts_eia_true,
},
"training": {
"ferc1_df": self.inputs.get_train_ferc1,
"eia_df": self.inputs.get_train_eia,
},
}
[docs]
def make_features(
self, ferc1_df: pd.DataFrame, eia_df: pd.DataFrame, block_col: str | None = None
) -> pd.DataFrame:
"""Generate comparison features based on defined features.
The recordlinkage package helps us create feature vectors. For each column that
we have in both datasets, this method generates a column of feature vecotrs,
which contain values between 0 and 1 that are measures of the similarity between
each datapoint the two datasets (1 meaning the two datapoints were exactly the
same and 0 meaning they were not similar at all).
For more details see recordlinkage's documentaion:
https://recordlinkage.readthedocs.io/en/latest/ref-compare.html
Args:
ferc1_df: Either training or all records from ferc plants table
(via :meth:`InputManager.get_train_ferc1` or :meth:`InputManager.get_plants_ferc1`).
eia_df: Either training or all records from the
EIA plant-parts (:meth:`InputManager.get_train_eia` or
`plant_parts_eia_true`).
block_col: If you want to restrict possible matches
between ferc_df and eia_df based on a particular column,
block_col is the column name of blocking column. Default is
None. If None, this method will generate features between all
possible matches.
Returns:
a dataframe of feature vectors between FERC and EIA.
"""
compare_cl = rl.Compare(
features=[
String(
"plant_name_ferc1",
"plant_name_ppe",
label="plant_name",
method="jarowinkler",
),
Numeric(
"net_generation_mwh",
"net_generation_mwh",
label="net_generation_mwh",
method="exp",
scale=1000,
),
Numeric(
"capacity_mw",
"capacity_mw",
label="capacity_mw",
method="exp",
scale=10,
),
Numeric(
"total_fuel_cost",
"total_fuel_cost",
label="total_fuel_cost",
method="exp",
offset=2500,
scale=10000,
missing_value=0.5,
),
Numeric(
"total_mmbtu",
"total_mmbtu",
label="total_mmbtu",
method="exp",
offset=1,
scale=100,
missing_value=0.5,
),
Numeric("capacity_factor", "capacity_factor", label="capacity_factor"),
Numeric(
"fuel_cost_per_mmbtu",
"fuel_cost_per_mmbtu",
label="fuel_cost_per_mmbtu",
),
Numeric(
"heat_rate_mmbtu_mwh",
"heat_rate_mmbtu_mwh",
label="heat_rate_mmbtu_mwh",
),
Exact(
"fuel_type_code_pudl",
"fuel_type_code_pudl",
label="fuel_type_code_pudl",
),
Numeric(
"installation_year", "installation_year", label="installation_year"
),
# Exact('utility_id_pudl', 'utility_id_pudl',
# label='utility_id_pudl'),
]
)
# generate the index of all candidate features
indexer = rl.Index()
indexer.block(block_col)
feature_index = indexer.index(ferc1_df, eia_df)
features = compare_cl.compute(feature_index, ferc1_df, eia_df)
return features
[docs]
def get_features(self, clobber=False):
"""Get the feature vectors for the training matches."""
# generate feature matrixes for known/training data
if clobber or self.features_df is None:
self.features_df = self.make_features(
ferc1_df=self.input_dict[self.feature_type]["ferc1_df"](),
eia_df=self.input_dict[self.feature_type]["eia_df"](),
block_col="plant_id_report_year_util_id",
)
logger.info(
f"Generated {len(self.features_df)} {self.feature_type} "
"candidate features."
)
return self.features_df
[docs]
def run_model(
features_train: pd.DataFrame, features_all: pd.DataFrame, train_df: pd.DataFrame
) -> pd.DataFrame:
"""Train Logistic Regression model using GridSearch cross validation.
Search over the parameter grid for the best fit parameters for the
Logistic Regression estimator on the training data. Predict matches
on all the input features.
Args:
features_train: Dataframe of the feature vectors for the training data.
features_all: Dataframe of the feature vectors for all the input data.
train_df: Dataframe of the training data.
Returns:
A dataframe of matches with record_id_ferc1 and record_id_eia as the
index and a column for the probability of a match.
"""
param_grid = [
{
"solver": ["newton-cg", "lbfgs", "sag"],
"C": [1000, 1, 10, 100],
"class_weight": [None, "balanced"],
"penalty": ["l2"],
},
{
"solver": ["liblinear", "saga"],
"C": [1000, 1, 10, 100],
"class_weight": [None, "balanced"],
"penalty": ["l1", "l2"],
},
{
"solver": ["saga"],
"C": [1000, 1, 10, 100],
"class_weight": [None, "balanced"],
"penalty": ["elasticnet"],
"l1_ratio": [0.1, 0.3, 0.5, 0.7, 0.9],
},
]
x_train = features_train.to_numpy()
y_train = np.where(
features_train.merge(
train_df,
how="left",
left_index=True,
right_index=True,
indicator=True,
)["_merge"]
== "both",
1,
0,
)
lrc = LogisticRegression()
clf = GridSearchCV(estimator=lrc, param_grid=param_grid, verbose=True, n_jobs=-1)
clf.fit(X=x_train, y=y_train)
y_pred = clf.predict(x_train)
precision, recall, f_score, _ = precision_recall_fscore_support(
y_train, y_pred, average="binary"
)
accuracy = clf.best_score_
logger.info(
"Scores from the best model:\n"
f" Accuracy: {accuracy:.02}\n"
f" F-Score: {f_score:.02}\n"
f" Precision: {precision:.02}\n"
f" Recall: {recall:.02}\n"
)
preds = clf.predict(features_all.to_numpy())
probs = clf.predict_proba(features_all.to_numpy())
final_df = pd.DataFrame(
index=features_all.index, data={"match": preds, "prob_of_match": probs[:, 1]}
)
match_df = final_df[final_df.match == 1].sort_values(
by="prob_of_match", ascending=False
)
return match_df
[docs]
def find_best_matches(match_df):
"""Only keep the best EIA match for each FERC record.
We only want one EIA match for each FERC1 plant record. If there are multiple
predicted matches for a FERC1 record, the match with the highest
probability found by the model is chosen.
Args:
match_df: A dataframe of matches with record_id_eia and record_id_ferc1
as the index and a column for the probability of the match.
Returns:
Dataframe of matches with one EIA record for each FERC1 record.
"""
# sort from lowest to highest probability of match
match_df = match_df.reset_index().sort_values(
by=["record_id_ferc1", "prob_of_match"]
)
best_match_df = match_df.groupby("record_id_ferc1").tail(1)
return best_match_df
[docs]
def overwrite_bad_predictions(match_df, train_df):
"""Overwrite incorrect predictions with the correct match from training data.
Args:
match_df: A dataframe of the best matches with only one match for each
FERC1 record.
train_df: A dataframe of the training data.
"""
train_df = train_df.reset_index()
overwrite_df = pd.merge(
match_df,
train_df[["record_id_eia", "record_id_ferc1"]],
on="record_id_ferc1",
how="outer",
suffixes=("_pred", "_train"),
indicator=True,
validate="1:1",
)
# construct new record_id_eia column with incorrect preds overwritten
overwrite_df["record_id_eia"] = np.where(
overwrite_df["_merge"] == "left_only",
overwrite_df["record_id_eia_pred"],
overwrite_df["record_id_eia_train"],
)
# create a the column match_type which indicates whether the match is good
# based on the training data
overwrite_rows = (overwrite_df._merge == "both") & (
overwrite_df.record_id_eia_train != overwrite_df.record_id_eia_pred
)
correct_rows = (overwrite_df._merge == "both") & (
overwrite_df.record_id_eia_train == overwrite_df.record_id_eia_pred
)
incorrect_rows = overwrite_df._merge == "right_only"
overwrite_df.loc[:, "match_type"] = "prediction; not in training data"
overwrite_df.loc[overwrite_rows, "match_type"] = "incorrect prediction; overwritten"
overwrite_df.loc[correct_rows, "match_type"] = "correct match"
overwrite_df.loc[
incorrect_rows, "match_type"
] = "incorrect prediction; no predicted match"
# print out stats
percent_correct = len(
overwrite_df[overwrite_df.match_type == "correct match"]
) / len(train_df)
percent_overwritten = len(
overwrite_df[overwrite_df.match_type == "incorrect prediction; overwritten"]
) / len(train_df)
logger.info(
"Matches stats:\n"
f"Percent of training data matches correctly predicted: {percent_correct:.02}\n"
f"Percent of training data overwritten in matches: {percent_overwritten:.02}\n"
)
overwrite_df = overwrite_df.drop(
columns=["_merge", "record_id_eia_train", "record_id_eia_pred"]
)
return overwrite_df
[docs]
def restrict_train_connections_on_date_range(
train_df: pd.DataFrame,
id_col: Literal["record_id_eia", "record_id_ferc1"],
start_date: pd.Timestamp,
end_date: pd.Timestamp,
) -> pd.DataFrame:
"""Restrict the training data based on the date ranges of the input tables.
The training data for this model spans the full PUDL date range. We don't want to
add training data from dates that are outside of the range of the FERC and EIA data
we are attempting to match. So this function restricts the training data based on
start and end dates.
The training data is only the record IDs, which contain the report year inside them.
This function compiles a regex using the date range to grab only training records
which contain the years in the date range followed by and preceeded by ``_`` - in
the format of ``record_id_eia``and ``record_id_ferc1``. We use that extracted year
to determine
"""
# filter training data by year range
# first get list of all years to grab from training data
date_range_years_str = "|".join(
[
f"{year}"
for year in pd.date_range(start=start_date, end=end_date, freq="AS").year
]
)
logger.info(f"Restricting training data on years: {date_range_years_str}")
train_df = train_df.assign(
year_in_date_range=lambda x: x[id_col].str.extract(
r"_{1}" + f"({date_range_years_str})" + "_{1}"
)
)
# pd.drop returns a copy, so no need to copy this portion of train_df
return train_df.loc[train_df.year_in_date_range.notnull()].drop(
columns=["year_in_date_range"]
)
[docs]
def prep_train_connections(
ppe: pd.DataFrame, start_date: pd.Timestamp, end_date: pd.Timestamp
) -> pd.DataFrame:
"""Get and prepare the training connections for the model.
We have stored training data, which consists of records with ids
columns for both FERC and EIA. Those id columns serve as a connection
between ferc1 plants and the EIA plant-parts. These connections
indicate that a ferc1 plant records is reported at the same granularity
as the connected EIA plant-parts record.
Arguments:
ppe: The EIA plant parts. Records from this dataframe will be connected to the
training data records. This needs to be the full EIA plant parts, not just
the distinct/true granularities because the training data could contain
non-distinct records and this function reassigns those to their distinct
counterparts.
start_date: Beginning date for records from the training data. Should match the
start date of ``ppe``. Default is None and all the training data will be used.
end_date: Ending date for records from the training data. Should match the end
date of ``ppe``. Default is None and all the training data will be used.
Returns:
A dataframe of training connections which has a MultiIndex of ``record_id_eia``
and ``record_id_ferc1``.
"""
ppe_cols = [
"true_gran",
"appro_part_label",
"appro_record_id_eia",
"plant_part",
"ownership_dupe",
]
# Read in one_to_many csv and join corresponding plant_match_ferc1 parts to FERC IDs
one_to_many = (
pd.read_csv(
importlib.resources.files("pudl.package_data.glue")
/ "ferc1_eia_one_to_many.csv"
)
.pipe(pudl.helpers.cleanstrings_snake, ["record_id_eia"])
.drop_duplicates(subset=["record_id_ferc1", "record_id_eia"])
)
# Get the 'm' generator IDs 1:m
one_to_many_single = match_to_single_plant_part(
multi_gran_df=ppe.loc[ppe.index.isin(one_to_many.record_id_eia)].reset_index(),
ppl=ppe.reset_index(),
part_name="plant_gen",
cols_to_keep=["plant_part"],
)[["record_id_eia_og", "record_id_eia"]].rename(
columns={"record_id_eia": "gen_id", "record_id_eia_og": "record_id_eia"}
)
one_to_many = (
one_to_many.merge(
one_to_many_single, # Match plant parts to generators
on="record_id_eia",
how="left",
validate="1:m",
)
.drop_duplicates("gen_id")
.merge( # Match generators to ferc1_generator_agg_id
ppe["ferc1_generator_agg_id"].reset_index(),
left_on="gen_id",
right_on="record_id_eia",
how="left",
validate="1:1",
)
.dropna(subset=["ferc1_generator_agg_id"])
.drop(["record_id_eia_x", "record_id_eia_y"], axis=1)
.merge( # Match ferc1_generator_agg_id to new faked plant part record_id_eia
ppe.loc[
ppe.plant_part == "plant_match_ferc1",
["ferc1_generator_agg_id"],
].reset_index(),
on="ferc1_generator_agg_id",
how="left",
validate="m:1",
)
.drop(["ferc1_generator_agg_id", "gen_id"], axis=1)
.drop_duplicates(subset=["record_id_ferc1", "record_id_eia"])
.set_index("record_id_ferc1")
)
train_df = (
pd.read_csv(
importlib.resources.files("pudl.package_data.glue") / "ferc1_eia_train.csv"
)
.pipe(pudl.helpers.cleanstrings_snake, ["record_id_eia"])
.drop_duplicates(subset=["record_id_ferc1", "record_id_eia"])
.set_index("record_id_ferc1")
)
logger.info(f"Updating {len(one_to_many)} training records with 1:m plant parts.")
train_df.update(one_to_many) # Overwrite FERC records with faked 1:m parts.
train_df = (
# we want to ensure that the records are associated with a
# "true granularity" - which is a way we filter out whether or
# not each record in the EIA plant-parts is actually a
# new/unique collection of plant parts
# once the true_gran is dealt with, we also need to convert the
# records which are ownership dupes to reflect their "total"
# ownership counterparts
train_df.reset_index()
.pipe(
restrict_train_connections_on_date_range,
id_col="record_id_eia",
start_date=start_date,
end_date=end_date,
)
.merge(
ppe[ppe_cols].reset_index(),
how="left",
on=["record_id_eia"],
indicator=True,
)
)
not_in_ppe = train_df[train_df._merge == "left_only"]
if not not_in_ppe.empty:
raise AssertionError(
"Not all training data is associated with EIA records.\n"
"record_id_ferc1's of bad training data records are: "
f"{list(not_in_ppe.reset_index().record_id_ferc1)}"
)
train_df = (
train_df.assign(
plant_part=lambda x: x["appro_part_label"],
record_id_eia=lambda x: x["appro_record_id_eia"],
)
.pipe(pudl.analysis.plant_parts_eia.reassign_id_ownership_dupes)
.fillna(
value={
"record_id_eia": pd.NA,
}
)
.set_index( # recordlinkage and sklearn wants MultiIndexs to do the stuff
[
"record_id_ferc1",
"record_id_eia",
]
)
)
train_df = train_df.drop(columns=ppe_cols + ["_merge"])
return train_df
[docs]
def prettyify_best_matches(
matches_best: pd.DataFrame,
plant_parts_eia_true: pd.DataFrame,
plants_ferc1: pd.DataFrame,
train_df: pd.DataFrame,
debug: bool = False,
) -> pd.DataFrame:
"""Make the EIA-FERC best matches usable.
Use the ID columns from the best matches to merge together both EIA plant-parts data
and FERC plant data. This removes the comparison vectors (the floats between 0 and 1
that compare the two columns from each dataset).
"""
connects_ferc1_eia = (
# first merge in the EIA plant-parts
pd.merge(
matches_best[["record_id_ferc1", "record_id_eia", "match_type"]],
plant_parts_eia_true.reset_index(),
how="left",
on=["record_id_eia"],
validate="m:1", # multiple FERC records can have the same EIA match
)
# then merge in the FERC data we want the backbone of this table to be
# the plant records so we have all possible FERC plant records, even
# the unmapped ones
.merge(
plants_ferc1,
how="outer",
on=["record_id_ferc1"],
suffixes=("_eia", "_ferc1"),
validate="1:1",
indicator=True,
)
)
# now we have some important cols that have dataset suffixes that we want to condense
def fill_eia_w_ferc1(x, col):
return x[f"{col}_eia"].fillna(x[f"{col}_ferc1"])
condense_cols = ["report_year", "plant_id_pudl", "utility_id_pudl"]
connects_ferc1_eia = (
connects_ferc1_eia.assign(
**{col: fill_eia_w_ferc1(connects_ferc1_eia, col) for col in condense_cols}
)
.drop(
columns=[
col + dataset for col in condense_cols for dataset in ["_eia", "_ferc1"]
]
)
.assign(
report_date=lambda x: pd.to_datetime(
x.report_year, format="%Y", errors="coerce"
),
)
)
no_ferc = connects_ferc1_eia[
(connects_ferc1_eia._merge == "left_only")
& (connects_ferc1_eia.record_id_eia.notnull())
& ~(connects_ferc1_eia.record_id_ferc1.str.contains("_hydro_", na=False))
& ~(connects_ferc1_eia.record_id_ferc1.str.contains("_gnrt_plant_", na=False))
]
connects_ferc1_eia = connects_ferc1_eia.drop(columns=["_merge"])
if not no_ferc.empty:
message = (
"Help. \nI'm trapped in this computer and I can't get out.\n"
".... jk there shouldn't be any matches between FERC and EIA\n"
"that have EIA matches but aren't in the FERC plant table, but we\n"
f"got {len(no_ferc)}. Check the training data and "
"prettyify_best_matches()"
)
if debug:
logger.warning(message)
return no_ferc
logger.info(
"grrrr there are some FERC-EIA matches that aren't in the steam \
table but this is because they are linked to retired EIA generators."
)
logger.warning(message)
_log_match_coverage(connects_ferc1_eia)
for match_set in ["all", "overrides"]:
check_match_consistency(
connects_ferc1_eia,
train_df,
match_set=match_set,
)
return connects_ferc1_eia
[docs]
def _log_match_coverage(connects_ferc1_eia):
eia_years = DataSource.from_id("eia860").working_partitions["years"]
# get the matches from just the EIA working years
matches = connects_ferc1_eia[
(connects_ferc1_eia.report_date.dt.year.isin(eia_years))
& (connects_ferc1_eia.record_id_eia.notnull())
]
# get all records from just the EIA working years
possible_records = connects_ferc1_eia[
connects_ferc1_eia.report_date.dt.year.isin(eia_years)
]
fuel_type_coverage = len(matches[matches.energy_source_code_1.notnull()]) / len(
matches
)
tech_type_coverage = len(matches[matches.technology_description.notnull()]) / len(
matches
)
def _get_subtable(table_name):
return possible_records[
possible_records.record_id_ferc1.str.contains(f"{table_name}")
]
def _get_match_pct(df):
return len(df[df["record_id_eia"].notna()]) / len(df)
logger.info(
"Coverage for matches during EIA working years:\n"
f" Fuel type: {fuel_type_coverage:.01%}\n"
f" Tech type: {tech_type_coverage:.01%}\n"
"Coverage for all steam table records during EIA working years:\n"
f" EIA matches: {_get_match_pct(_get_subtable('steam')):.01%}\n"
f"Coverage for all small gen table records during EIA working years:\n"
f" EIA matches: {_get_match_pct(_get_subtable('gnrt_plant')):.01%}\n"
f"Coverage for all hydro table records during EIA working years:\n"
f" EIA matches: {_get_match_pct(_get_subtable('hydro')):.01%}\n"
f"Coverage for all pumped storage table records during EIA working years:\n"
f" EIA matches: {_get_match_pct(_get_subtable('pumped')):.01%}"
)
[docs]
def check_match_consistency(
connects_ferc1_eia: pd.DataFrame,
train_df: pd.DataFrame,
match_set: Literal["all", "overrides"] = "all",
) -> pd.DataFrame:
"""Check how consistent matches are across time.
Args:
connects_ferc1_eia: Matches of FERC1 to EIA.
train_df: training data.
match_set: either ``all`` - to check all of the matches - or ``overrides`` - to
check just the overrides. Default is'``all``. The overrides are less
consistent than all of the data, so this argument changes the consistency
threshold for this check.
"""
# these are the default
consistency = 0.75
consistency_one_cap_ferc = 0.85
mask = connects_ferc1_eia.record_id_eia.notnull()
if match_set == "overrides":
consistency = 0.39
consistency_one_cap_ferc = 0.75
train_ferc1 = train_df.reset_index()
# these bbs were missing from connects_ferc1_eia. not totally sure why
missing = [
"f1_steam_2018_12_51_0_1",
"f1_steam_2018_12_45_2_2",
"f1_steam_2018_12_45_2_1",
"f1_steam_2018_12_45_1_2",
"f1_steam_2018_12_45_1_1",
"f1_steam_2018_12_45_1_5",
"f1_steam_2018_12_45_1_4",
"f1_steam_2018_12_56_2_3",
]
over_f1 = (
train_ferc1[
train_ferc1.record_id_ferc1.str.contains("_steam_")
& ~train_ferc1.record_id_ferc1.isin(missing)
]
.set_index("record_id_ferc1")
.index
)
over_ferc1_ids = (
connects_ferc1_eia.set_index("record_id_ferc1")
.loc[over_f1]
.plant_id_ferc1.unique()
)
mask = mask & connects_ferc1_eia.plant_id_ferc1.isin(over_ferc1_ids)
count = (
connects_ferc1_eia[mask]
.groupby(["plant_id_ferc1"])[["plant_part_id_eia", "capacity_mw_ferc1"]]
.nunique()
)
consist = len(count[count.plant_part_id_eia == 1]) / len(count)
logger.info(
f"Matches with consistency across years of {match_set} matches is "
f"{consist:.1%}"
)
if consist < consistency:
raise AssertionError(
f"Consistency of {match_set} matches across years dipped below "
f"{consistency:.1%} to {consist:.1%}"
)
consist_one_cap_ferc = (
len(count)
- len(count[(count.plant_part_id_eia > 1) & (count.capacity_mw_ferc1 == 1)])
) / len(count)
logger.info(
"Matches with completely consistent FERC capacity have a consistency "
f"of {consist_one_cap_ferc:.1%}"
)
if consist_one_cap_ferc < consistency_one_cap_ferc:
raise AssertionError(
"Consistency of matches with consistent FERC capcity dipped below "
f"{consistency_one_cap_ferc:.1%} to {consist_one_cap_ferc:.1%}"
)
return count
[docs]
def add_null_overrides(connects_ferc1_eia):
"""Override known null matches with pd.NA.
There is no way to indicate in the training data that certain FERC records have no
proper EIA match. That is to say--you can't specifiy a blank match or tell the AI
not to match a given record. Because we've gone through by hand and know for a fact
that some FERC records have no EIA match (even when you aggregate generators), we
have to add in these null matches after the fact.
This function reads in a list of record_id_ferc1 values that are known to have no
cooresponding EIA record match and makes sure they are mapped as NA in the final
record linkage output. It also updates the match_type field to indicate that this
value has been overriden.
"""
logger.info("Overriding specified record_id_ferc1 values with NA record_id_eia")
# Get record_id_ferc1 values that should be overriden to have no EIA match
null_overrides = pd.read_csv(
importlib.resources.files("pudl.package_data.glue") / "ferc1_eia_null.csv"
).pipe(
restrict_train_connections_on_date_range,
id_col="record_id_ferc1",
start_date=min(
connects_ferc1_eia[~(connects_ferc1_eia.record_id_eia.isnull())].report_date
),
end_date=max(
connects_ferc1_eia[~(connects_ferc1_eia.record_id_eia.isnull())].report_date
),
)
# Make sure there is content!
if null_overrides.empty:
raise AssertionError(
f"No null overrides found. Consider checking for file at {null_overrides}"
)
logger.debug(f"Found {len(null_overrides)} null overrides")
# List of EIA columns to null. Ideally would like to get this from elsewhere, but
# compiling this here for now...
eia_cols_to_null = Resource.from_id("plant_parts_eia").get_field_names()
# Make all EIA values NA for record_id_ferc1 values in the Null overrides list and
# make the match_type column say "overriden"
connects_ferc1_eia.loc[
connects_ferc1_eia["record_id_ferc1"].isin(null_overrides.record_id_ferc1),
eia_cols_to_null,
] = np.nan
connects_ferc1_eia.loc[
connects_ferc1_eia["record_id_ferc1"].isin(null_overrides.record_id_ferc1),
"match_type",
] = "overridden"
return connects_ferc1_eia