"""Connect FERC1 plant tables to EIA's plant-parts via record linkage.
FERC plant records are reported very non-uniformly. In the same table there are records
that are reported as whole plants, individual generators, and collections of prime
movers. This means portions of EIA plants that correspond to a plant record in FERC
Form 1 are heterogeneous, which complicates using the two data sets together.
The EIA plant data is much cleaner and more uniformly structured. There are generators
with IDs and plants with IDs reported in *separate* tables. Several generator IDs are
typically grouped under a single plant ID. In :mod:`pudl.analysis.plant_parts_eia`,
we create a large number of synthetic aggregated records representing many possible
slices of a power plant which could in theory be what is actually reported in the FERC
Form 1.
In this module we infer which of the many ``plant_parts_eia`` records is most likely to
correspond to an actually reported FERC Form 1 plant record. This is done with a
logistic regression model.
We train the logistic regression model using manually labeled training data that links
together several thousand EIA and FERC plant records, and use grid search cross
validation to select a best set of hyperparameters. This trained model is used to
predict matches on the full dataset (see :func:`run_model`). The model can return
multiple EIA match options for each FERC1 record, so we rank the matches and choose the
one with the highest score (see :func:`find_best_matches`). Any matches identified by
the model which are in conflict with our training data are overwritten with the manually
assigned associations (see :func:`overwrite_bad_predictions`). The final match results
are the connections we keep as the matches between FERC1 plant records and EIA
plant-parts.
"""
import importlib.resources
from typing import Literal
import mlflow
import numpy as np
import pandas as pd
from dagster import Out, graph, op
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_recall_fscore_support
from sklearn.model_selection import GridSearchCV, train_test_split
import pudl
import pudl.helpers
from pudl.analysis.ml_tools import experiment_tracking, models
from pudl.analysis.plant_parts_eia import match_to_single_plant_part
from pudl.analysis.record_linkage import embed_dataframe
from pudl.metadata.classes import DataSource, Resource
[docs]
# Module-level logger, named after this module and created through PUDL's logging
# helper; every function below reports progress and statistics through it.
logger = pudl.logging_helpers.get_logger(__name__)
[docs]
def _string_similarity_vectorizer(name):
    """Build a vectorizer scoring Jaro-Winkler similarity of a FERC1/EIA name pair.

    Args:
        name: Stem shared by the ``{name}_ferc1`` and ``{name}_eia`` input columns;
            also used as the name of the output similarity feature.
    """
    ferc1_col, eia_col = f"{name}_ferc1", f"{name}_eia"
    return embed_dataframe.ColumnVectorizer(
        transform_steps=[
            embed_dataframe.NameCleaner(),
            embed_dataframe.StringSimilarityScorer(
                metric="jaro_winkler",
                col1=ferc1_col,
                col2=eia_col,
                output_name=name,
            ),
        ],
        columns=[ferc1_col, eia_col],
    )


def _numeric_similarity_vectorizer(
    output_name,
    method,
    *,
    column_stem=None,
    cleaning_function="null_to_zero",
    **scorer_options,
):
    """Build a vectorizer scoring similarity of a FERC1/EIA column pair.

    Args:
        output_name: Name of the output similarity feature.
        method: ``method`` handed to ``NumericSimilarityScorer``.
        column_stem: Stem of the ``_ferc1`` / ``_eia`` input columns when it differs
            from ``output_name``.
        cleaning_function: ``cleaning_function`` for the ``ColumnCleaner`` step that
            runs before scoring, or ``None`` to score the raw columns.
        **scorer_options: Extra keyword arguments forwarded unchanged to
            ``NumericSimilarityScorer`` (e.g. ``scale``, ``offset``,
            ``missing_value``).
    """
    stem = output_name if column_stem is None else column_stem
    ferc1_col, eia_col = f"{stem}_ferc1", f"{stem}_eia"
    steps = []
    if cleaning_function is not None:
        steps.append(embed_dataframe.ColumnCleaner(cleaning_function=cleaning_function))
    steps.append(
        embed_dataframe.NumericSimilarityScorer(
            method=method,
            col1=ferc1_col,
            col2=eia_col,
            output_name=output_name,
            **scorer_options,
        )
    )
    return embed_dataframe.ColumnVectorizer(
        transform_steps=steps,
        columns=[ferc1_col, eia_col],
    )


# One similarity feature per compared attribute. The dictionary order is kept
# exactly as originally declared, since it is the order handed to the factory.
pair_vectorizers = embed_dataframe.dataframe_embedder_factory(
    "ferc1_eia_pair_vectorizers",
    {
        "plant_name": _string_similarity_vectorizer("plant_name"),
        "utility_name": _string_similarity_vectorizer("utility_name"),
        "net_generation_mwh": _numeric_similarity_vectorizer(
            "net_generation_mwh", "exponential", scale=1000
        ),
        "capacity_mw": _numeric_similarity_vectorizer(
            "capacity_mw", "exponential", scale=10
        ),
        "total_fuel_cost": _numeric_similarity_vectorizer(
            "total_fuel_cost",
            "exponential",
            scale=10000,
            offset=2500,
            missing_value=0.5,
        ),
        "total_mmbtu": _numeric_similarity_vectorizer(
            "total_mmbtu",
            "exponential",
            scale=100,
            offset=1,
            missing_value=0.5,
        ),
        "capacity_factor": _numeric_similarity_vectorizer("capacity_factor", "linear"),
        "fuel_cost_per_mmbtu": _numeric_similarity_vectorizer(
            "fuel_cost_per_mmbtu", "linear"
        ),
        # The input columns carry a longer name than the output feature.
        "heat_rate_mmbtu_mwh": _numeric_similarity_vectorizer(
            "heat_rate_mmbtu_mwh",
            "linear",
            column_stem="unit_heat_rate_mmbtu_per_mwh",
        ),
        "fuel_type_code_pudl": _numeric_similarity_vectorizer(
            "fuel_type_code_pudl",
            "exact",
            cleaning_function="null_to_empty_str",
        ),
        # Installation years are scored without any null-cleaning step.
        "installation_year": _numeric_similarity_vectorizer(
            "installation_year", "linear", cleaning_function=None
        ),
    },
)
@op
@op(out={"all_pairs_df": Out(), "train_pairs_df": Out()})
[docs]
def get_pairs_dfs(inputs):
    """Build the candidate FERC1-EIA record pairs for all data and for training.

    FERC and EIA records are inner-merged on the blocking column
    ``plant_id_report_year_util_id``, so only records sharing that value are paired.

    Arguments:
        inputs: :class:`InputManager` object.

    Returns:
        A tuple ``(all_pairs_df, train_pairs_df)``: the candidate pairs built from all
        of the input data and the candidate pairs built from the training data. Both
        are indexed by ``record_id_ferc1`` and ``record_id_eia``; overlapping columns
        are suffixed with ``_ferc1`` / ``_eia``.
    """
    block_col = "plant_id_report_year_util_id"
    pair_index = ["record_id_ferc1", "record_id_eia"]

    def _blocked_pairs(ferc1_records, eia_records):
        # The record IDs live in the index of the inputs; move them into columns so
        # they survive the merge and can become the pair index afterwards.
        merged = pd.merge(
            ferc1_records.reset_index(),
            eia_records.reset_index(),
            how="inner",
            on=block_col,
            suffixes=("_ferc1", "_eia"),
        )
        return merged.set_index(pair_index)

    all_pairs_df = _blocked_pairs(
        inputs.get_plants_ferc1(), inputs.get_plant_parts_eia_true()
    )
    train_pairs_df = _blocked_pairs(inputs.get_train_ferc1(), inputs.get_train_eia())
    return (all_pairs_df, train_pairs_df)
@op
[docs]
def get_y_label_df(train_pairs_df, inputs):
    """Label every candidate training pair as a match (1) or a non-match (0).

    A pair counts as a match when its index entry is also present in the index of
    the training connections returned by ``inputs.get_train_df()``.

    Args:
        train_pairs_df: Candidate record pairs built from the training data.
        inputs: Object providing ``get_train_df()``.

    Returns:
        An array with one 0/1 label per row of ``train_pairs_df``, in row order.
    """
    labeled_pairs = train_pairs_df.merge(
        inputs.get_train_df(),
        how="left",
        left_index=True,
        right_index=True,
        indicator=True,
    )
    # "both" marks pairs that were found in the training connections.
    is_known_match = labeled_pairs["_merge"] == "both"
    return np.where(is_known_match, 1, 0)
@op
[docs]
def get_best_matches_with_overwrites(match_df, inputs):
    """Pick the best EIA match per FERC record, then apply training overwrites.

    Args:
        match_df: Predicted matches, as returned by :func:`run_model`.
        inputs: Object exposing the training connections as ``train_df``.

    Returns:
        The output of :func:`overwrite_bad_predictions` applied to the best matches.
    """
    best_match_df = find_best_matches(match_df)
    # NOTE(review): sibling ops call ``inputs.get_train_df()``; this one reads the
    # ``train_df`` attribute directly -- confirm it is always populated by now.
    return overwrite_bad_predictions(best_match_df, inputs.train_df)
@op
[docs]
def run_matching_model(features_train, features_all, y_df, experiment_tracker):
    """Op wrapper that trains and applies the matching model via :func:`run_model`.

    All four arguments are forwarded to :func:`run_model` by keyword and its result
    is returned unchanged.
    """
    model_inputs = {
        "features_train": features_train,
        "features_all": features_all,
        "y_df": y_df,
        "experiment_tracker": experiment_tracker,
    }
    return run_model(**model_inputs)
@op(
out={
"out_pudl__yearly_assn_eia_ferc1_plant_parts": Out(
io_manager_key="pudl_io_manager"
)
}
)
[docs]
def get_match_full_records(best_match_df, inputs):
    """Attach the full FERC1 and EIA records to the matches and enforce the schema.

    Args:
        best_match_df: One best match per FERC1 record, with a ``match_type`` column.
        inputs: Object providing ``get_plant_parts_eia_true()``,
            ``get_plants_ferc1()`` and ``get_train_df()``.

    Returns:
        The connected table after ``enforce_schema`` of the
        ``out_pudl__yearly_assn_eia_ferc1_plant_parts`` resource.
    """
    pretty_matches = prettyify_best_matches(
        matches_best=best_match_df,
        plant_parts_eia_true=inputs.get_plant_parts_eia_true(),
        plants_ferc1=inputs.get_plants_ferc1(),
        train_df=inputs.get_train_df(),
    )
    # Override specified values with NA record_id_eia
    connected_df = add_null_overrides(pretty_matches)
    output_resource = Resource.from_id("out_pudl__yearly_assn_eia_ferc1_plant_parts")
    return output_resource.enforce_schema(connected_df)
@models.pudl_model(
"out_pudl__yearly_assn_eia_ferc1_plant_parts",
config_from_yaml=False,
)
@graph
[docs]
def ferc_to_eia(
    experiment_tracker: experiment_tracking.ExperimentTracker,
    out_ferc1__yearly_all_plants: pd.DataFrame,
    out_ferc1__yearly_steam_plants_fuel_by_plant_sched402: pd.DataFrame,
    out_eia__yearly_plant_parts: pd.DataFrame,
) -> pd.DataFrame:
    """Coordinate the connection between FERC1 plants and EIA plant-parts.

    The steps are wired together in this order: compile the inputs, build the
    candidate record pairs, vectorize them, label the training pairs, run the model,
    keep one best match per FERC record (with training overwrites), and finally join
    the full records back on.

    Args:
        experiment_tracker: Tracker handed to the vectorizers and to the model op.
        out_ferc1__yearly_all_plants: Table of all of the FERC1-reporting plants.
        out_ferc1__yearly_steam_plants_fuel_by_plant_sched402: Table of the fuel
            reported aggregated to the FERC1 plant-level.
        out_eia__yearly_plant_parts: The EIA plant parts list.
    """
    inputs = get_compiled_input_manager(
        out_ferc1__yearly_all_plants,
        out_ferc1__yearly_steam_plants_fuel_by_plant_sched402,
        out_eia__yearly_plant_parts,
    )
    all_pairs, train_pairs = get_pairs_dfs(inputs)

    # Vectorize the full set of pairs first and the training pairs second; the
    # invocation order is kept exactly as originally written.
    all_features = pair_vectorizers(all_pairs, experiment_tracker)
    train_features = pair_vectorizers(train_pairs, experiment_tracker)
    train_labels = get_y_label_df(train_pairs, inputs)

    predicted_matches = run_matching_model(
        features_train=train_features,
        features_all=all_features,
        y_df=train_labels,
        experiment_tracker=experiment_tracker,
    )
    # choose one EIA match for each FERC record
    best_matches = get_best_matches_with_overwrites(predicted_matches, inputs)
    # join EIA and FERC columns back on
    return get_match_full_records(best_matches, inputs)
[docs]
def run_model(
    features_train: pd.DataFrame,
    features_all: pd.DataFrame,
    y_df: pd.DataFrame,
    experiment_tracker: experiment_tracking.ExperimentTracker,
) -> pd.DataFrame:
    """Train Logistic Regression model using GridSearch cross validation.

    Search over the parameter grid for the best fit parameters for the Logistic
    Regression estimator on a 75% split of the training data, score the best model
    on the held-out 25%, and predict matches on all the input features.

    Args:
        features_train: Feature vectors for the training data; its ``matrix``
            attribute is what gets split and fitted.
        features_all: Feature vectors for all the input data; its ``matrix`` is
            predicted on and its ``index`` becomes the index of the result.
        y_df: Labels with 1 if a pair in ``features_train`` is a match and 0 if a
            pair is not a match.
        experiment_tracker: Tracker through which the best parameters and the model
            metrics are logged to mlflow.

    Returns:
        A dataframe of the predicted matches only, indexed like ``features_all``,
        with a ``match`` column and a ``prob_of_match`` column, sorted from the
        highest to the lowest probability.
    """
    # Options searched for every solver family.
    shared_grid = {
        "C": [1000, 1, 10, 100],
        "class_weight": [None, "balanced"],
    }
    param_grid = [
        {**shared_grid, "solver": ["newton-cg", "lbfgs", "sag"], "penalty": ["l2"]},
        {**shared_grid, "solver": ["liblinear", "saga"], "penalty": ["l1", "l2"]},
        {
            **shared_grid,
            "solver": ["saga"],
            "penalty": ["elasticnet"],
            "l1_ratio": [0.1, 0.3, 0.5, 0.7, 0.9],
        },
    ]

    train_matrix, test_matrix, train_labels, test_labels = train_test_split(
        features_train.matrix, y_df, test_size=0.25, random_state=16
    )
    search = GridSearchCV(
        estimator=LogisticRegression(),
        param_grid=param_grid,
        verbose=True,
        n_jobs=-1,
    )
    search.fit(X=train_matrix, y=train_labels)
    # Log best parameters
    experiment_tracker.execute_logging(lambda: mlflow.log_params(search.best_params_))

    precision, recall, f_score, _ = precision_recall_fscore_support(
        test_labels, search.predict(test_matrix), average="binary"
    )
    # The reported accuracy is the grid search's best cross-validation score.
    accuracy = search.best_score_
    logger.info(
        "Scores from the best model:\n"
        f" Accuracy: {accuracy:.02}\n"
        f" F-Score: {f_score:.02}\n"
        f" Precision: {precision:.02}\n"
        f" Recall: {recall:.02}\n"
    )
    # Log model metrics
    model_metrics = {
        "accuracy": accuracy,
        "f_score": f_score,
        "precision": precision,
        "recall": recall,
    }
    experiment_tracker.execute_logging(lambda: mlflow.log_metrics(model_metrics))

    all_matrix = features_all.matrix
    predicted_labels = search.predict(all_matrix)
    match_probabilities = search.predict_proba(all_matrix)[:, 1]
    predictions = pd.DataFrame(
        index=features_all.index,
        data={"match": predicted_labels, "prob_of_match": match_probabilities},
    )
    # Keep only the pairs predicted to match, most probable first.
    return predictions.loc[predictions["match"] == 1].sort_values(
        by="prob_of_match", ascending=False
    )
[docs]
def find_best_matches(match_df):
    """Reduce the predicted matches to one EIA record per FERC1 record.

    When the model predicts several EIA matches for the same FERC1 record, only the
    one with the highest probability is kept.

    Args:
        match_df: A dataframe of matches with record_id_eia and record_id_ferc1
            as the index and a ``prob_of_match`` column.

    Returns:
        Dataframe of matches with one EIA record for each FERC1 record. The record
        IDs are regular columns in the result.
    """
    # Within each FERC1 record the probabilities ascend, so the last row of every
    # group is its most probable match.
    ranked = match_df.reset_index().sort_values(
        by=["record_id_ferc1", "prob_of_match"], ascending=True
    )
    return ranked.groupby("record_id_ferc1").tail(1)
[docs]
def overwrite_bad_predictions(
    match_df: pd.DataFrame, train_df: pd.DataFrame
) -> pd.DataFrame:
    """Overwrite incorrect predictions with the correct match from training data.

    The predictions and the training connections are outer-merged on
    ``record_id_ferc1``. Wherever a FERC1 record appears in the training data, the
    training ``record_id_eia`` wins. A ``match_type`` column records the outcome:

    * ``"prediction; not in training data"``: predicted, no training record.
    * ``"correct match"``: prediction agrees with the training data.
    * ``"incorrect prediction; overwritten"``: prediction disagreed and was replaced.
    * ``"incorrect prediction; no predicted match"``: only in the training data.

    Args:
        match_df: A dataframe of the best matches with only one match for each
            FERC1 record. Must have ``record_id_ferc1`` and ``record_id_eia``
            columns.
        train_df: A dataframe of the training data whose index (or columns, once
            reset) provides ``record_id_ferc1`` and ``record_id_eia``. May be empty.

    Returns:
        The merged dataframe with the final ``record_id_eia`` and ``match_type``
        columns; the merge bookkeeping columns are dropped.
    """
    train_df = train_df.reset_index()
    overwrite_df = pd.merge(
        match_df,
        train_df[["record_id_eia", "record_id_ferc1"]],
        on="record_id_ferc1",
        how="outer",
        suffixes=("_pred", "_train"),
        indicator=True,
        validate="1:1",
    )
    # construct new record_id_eia column with incorrect preds overwritten
    overwrite_df["record_id_eia"] = np.where(
        overwrite_df["_merge"] == "left_only",
        overwrite_df["record_id_eia_pred"],
        overwrite_df["record_id_eia_train"],
    )
    # create the column match_type which indicates whether the match is good
    # based on the training data
    in_both = overwrite_df["_merge"] == "both"
    overwrite_rows = in_both & (
        overwrite_df["record_id_eia_train"] != overwrite_df["record_id_eia_pred"]
    )
    correct_rows = in_both & (
        overwrite_df["record_id_eia_train"] == overwrite_df["record_id_eia_pred"]
    )
    unpredicted_rows = overwrite_df["_merge"] == "right_only"
    overwrite_df["match_type"] = "prediction; not in training data"
    overwrite_df.loc[overwrite_rows, "match_type"] = "incorrect prediction; overwritten"
    overwrite_df.loc[correct_rows, "match_type"] = "correct match"
    overwrite_df.loc[
        unpredicted_rows, "match_type"
    ] = "incorrect prediction; no predicted match"

    # Report stats. With no training records (e.g. after restricting the training
    # data to a narrow date range) there is nothing to divide by, so report NaN
    # instead of crashing on a ZeroDivisionError.
    n_train = len(train_df)
    if n_train:
        percent_correct = int(correct_rows.sum()) / n_train
        percent_overwritten = int(overwrite_rows.sum()) / n_train
    else:
        percent_correct = percent_overwritten = float("nan")
    logger.info(
        "Matches stats:\n"
        f"Percent of training data matches correctly predicted: {percent_correct:.1%}\n"
        f"Percent of training data overwritten in matches: {percent_overwritten:.1%}\n"
    )
    return overwrite_df.drop(
        columns=["_merge", "record_id_eia_train", "record_id_eia_pred"]
    )
[docs]
def restrict_train_connections_on_date_range(
    train_df: pd.DataFrame,
    id_col: Literal["record_id_eia", "record_id_ferc1"],
    start_date: pd.Timestamp,
    end_date: pd.Timestamp,
) -> pd.DataFrame:
    """Restrict the training data based on the date ranges of the input tables.

    The training data for this model spans the full PUDL date range. We don't want to
    add training data from dates that are outside of the range of the FERC and EIA data
    we are attempting to match. So this function restricts the training data based on
    start and end dates.

    The training data is only the record IDs, which contain the report year inside them.
    This function compiles a regex from every year between the year of ``start_date``
    and the year of ``end_date`` (both inclusive) and keeps only the training records
    whose ``id_col`` contains one of those years preceded and followed by ``_`` - the
    format of ``record_id_eia`` and ``record_id_ferc1``.

    Args:
        train_df: Training records; must contain the string column ``id_col``.
        id_col: Name of the record ID column to search for a report year.
        start_date: First date of the data being matched. Only its year is used.
        end_date: Last date of the data being matched. Only its year is used.

    Returns:
        A copy of the rows of ``train_df`` whose ID contains a year in the range.
        Rows with a null ID are dropped.
    """
    # Use the calendar years of the two endpoints directly. Building the list with
    # ``pd.date_range(..., freq="YS")`` only yields years whose January 1st lies
    # inside the range, which silently drops the first year whenever ``start_date``
    # is not January 1st.
    first_year = pd.Timestamp(start_date).year
    last_year = pd.Timestamp(end_date).year
    date_range_years_str = "|".join(
        str(year) for year in range(first_year, last_year + 1)
    )
    logger.info(f"Restricting training data on years: {date_range_years_str}")
    has_year_in_range = (
        train_df[id_col]
        .str.extract(rf"_({date_range_years_str})_", expand=False)
        .notnull()
    )
    return train_df.loc[has_year_in_range].copy()
[docs]
def prep_train_connections(
    ppe: pd.DataFrame, start_date: pd.Timestamp, end_date: pd.Timestamp
) -> pd.DataFrame:
    """Get and prepare the training connections for the model.

    We have stored training data, which consists of records with ids
    columns for both FERC and EIA. Those id columns serve as a connection
    between ferc1 plants and the EIA plant-parts. These connections
    indicate that a ferc1 plant records is reported at the same granularity
    as the connected EIA plant-parts record.

    Two packaged CSV files from ``pudl.package_data.glue`` are read:
    ``eia_ferc1_train.csv`` (the training connections) and
    ``eia_ferc1_one_to_many.csv`` (FERC records linked to several EIA records). The
    one-to-many links are resolved to ``plant_match_ferc1`` plant-part records and
    overwrite the corresponding rows of the training data.

    Arguments:
        ppe: The EIA plant parts. Records from this dataframe will be connected to the
            training data records. This needs to be the full EIA plant parts, not just
            the distinct/true granularities because the training data could contain
            non-distinct records and this function reassigns those to their distinct
            counterparts. It is indexed by ``record_id_eia`` (it is looked up and
            reset on that name below).
        start_date: Beginning date for records from the training data. Should match the
            start date of ``ppe``. Required; there is no default.
        end_date: Ending date for records from the training data. Should match the end
            date of ``ppe``. Required; there is no default.

    Returns:
        A dataframe of training connections which has a MultiIndex of
        ``record_id_ferc1`` and ``record_id_eia`` (in that order).

    Raises:
        AssertionError: If any training record (after the date restriction) has a
            ``record_id_eia`` that is not found in ``ppe``.
    """
    # Plant-parts columns merged onto the training data; they are only needed
    # while reassigning records and are dropped again before returning.
    ppe_cols = [
        "true_gran",
        "appro_part_label",
        "appro_record_id_eia",
        "plant_part",
        "ownership_dupe",
    ]
    # Read in one_to_many csv and join corresponding plant_match_ferc1 parts to FERC IDs
    one_to_many = (
        pd.read_csv(
            importlib.resources.files("pudl.package_data.glue")
            / "eia_ferc1_one_to_many.csv"
        )
        # NOTE(review): presumably normalizes the EIA record IDs to snake case so
        # they line up with the ``ppe`` index -- confirm against the helper.
        .pipe(pudl.helpers.cleanstrings_snake, ["record_id_eia"])
        .drop_duplicates(subset=["record_id_ferc1", "record_id_eia"])
    )
    # Get the 'm' generator IDs 1:m
    # After the rename, ``record_id_eia`` holds the original (one-to-many) EIA
    # record ID and ``gen_id`` holds the matched ``plant_gen`` record ID.
    one_to_many_single = match_to_single_plant_part(
        multi_gran_df=ppe.loc[ppe.index.isin(one_to_many.record_id_eia)].reset_index(),
        ppl=ppe.reset_index(),
        part_name="plant_gen",
        cols_to_keep=["plant_part"],
    )[["record_id_eia_og", "record_id_eia"]].rename(
        columns={"record_id_eia": "gen_id", "record_id_eia_og": "record_id_eia"}
    )
    one_to_many = (
        one_to_many.merge(
            one_to_many_single,  # Match plant parts to generators
            on="record_id_eia",
            how="left",
            validate="1:m",
        )
        # Keep a single row per generator so the next merge can be validated 1:1.
        .drop_duplicates("gen_id")
        .merge(  # Match generators to ferc1_generator_agg_id
            ppe["ferc1_generator_agg_id"].reset_index(),
            left_on="gen_id",
            right_on="record_id_eia",
            how="left",
            validate="1:1",
        )
        # Generators without an aggregation ID cannot be mapped any further.
        .dropna(subset=["ferc1_generator_agg_id"])
        # Both sides of the previous merge carried ``record_id_eia``; drop the
        # suffixed copies so the next merge can bring in the new one.
        .drop(["record_id_eia_x", "record_id_eia_y"], axis=1)
        .merge(  # Match ferc1_generator_agg_id to new faked plant part record_id_eia
            ppe.loc[
                ppe.plant_part == "plant_match_ferc1",
                ["ferc1_generator_agg_id"],
            ].reset_index(),
            on="ferc1_generator_agg_id",
            how="left",
            validate="m:1",
        )
        .drop(["ferc1_generator_agg_id", "gen_id"], axis=1)
        .drop_duplicates(subset=["record_id_ferc1", "record_id_eia"])
        .set_index("record_id_ferc1")
    )
    # Read the main training connections, indexed by FERC record ID so that they
    # align with ``one_to_many`` in the update below.
    train_df = (
        pd.read_csv(
            importlib.resources.files("pudl.package_data.glue") / "eia_ferc1_train.csv"
        )
        .pipe(pudl.helpers.cleanstrings_snake, ["record_id_eia"])
        .drop_duplicates(subset=["record_id_ferc1", "record_id_eia"])
        .set_index("record_id_ferc1")
    )
    logger.info(f"Updating {len(one_to_many)} training records with 1:m plant parts.")
    # ``DataFrame.update`` works in place, aligning on the ``record_id_ferc1`` index.
    train_df.update(one_to_many)  # Overwrite FERC records with faked 1:m parts.
    train_df = (
        # we want to ensure that the records are associated with a
        # "true granularity" - which is a way we filter out whether or
        # not each record in the EIA plant-parts is actually a
        # new/unique collection of plant parts
        # once the true_gran is dealt with, we also need to convert the
        # records which are ownership dupes to reflect their "total"
        # ownership counterparts
        train_df.reset_index()
        .pipe(
            restrict_train_connections_on_date_range,
            id_col="record_id_eia",
            start_date=start_date,
            end_date=end_date,
        )
        # The indicator column lets us detect training records absent from ``ppe``.
        .merge(
            ppe[ppe_cols].reset_index(),
            how="left",
            on=["record_id_eia"],
            indicator=True,
        )
    )
    # Fail loudly when a training record points at an EIA record we do not have.
    not_in_ppe = train_df[train_df._merge == "left_only"]
    if not not_in_ppe.empty:
        raise AssertionError(
            "Not all training data is associated with EIA records.\n"
            "record_id_ferc1's of bad training data records are: "
            f"{list(not_in_ppe.reset_index().record_id_ferc1)}"
        )
    train_df = (
        # Swap each record for its "appropriate" counterpart as labeled in ``ppe``.
        train_df.assign(
            plant_part=lambda x: x["appro_part_label"],
            record_id_eia=lambda x: x["appro_record_id_eia"],
        )
        .pipe(pudl.analysis.plant_parts_eia.reassign_id_ownership_dupes)
        .fillna(
            value={
                "record_id_eia": pd.NA,
            }
        )
        .set_index(  # sklearn wants a MultiIndex to do the stuff
            [
                "record_id_ferc1",
                "record_id_eia",
            ]
        )
    )
    # Remove the helper plant-parts columns and the merge indicator.
    train_df = train_df.drop(columns=ppe_cols + ["_merge"])
    return train_df
[docs]
def prettyify_best_matches(
    matches_best: pd.DataFrame,
    plant_parts_eia_true: pd.DataFrame,
    plants_ferc1: pd.DataFrame,
    train_df: pd.DataFrame,
    debug: bool = False,
) -> pd.DataFrame:
    """Make the EIA-FERC best matches usable.

    Use the ID columns from the best matches to merge together both EIA plant-parts data
    and FERC plant data. This removes the comparison vectors (the floats between 0 and 1
    that compare the two columns from each dataset).

    Args:
        matches_best: Best matches with ``record_id_ferc1``, ``record_id_eia`` and
            ``match_type`` columns.
        plant_parts_eia_true: EIA plant-parts; its index is reset before merging on
            ``record_id_eia``.
        plants_ferc1: FERC1 plant records with a ``record_id_ferc1`` column.
        train_df: Training connections, handed to :func:`check_match_consistency`.
        debug: If True and there are matched records missing from ``plants_ferc1``,
            warn and return those records instead of the connected table.

    Returns:
        One row per FERC1 record (matched or not) with the matched EIA columns
        attached, or the offending records when ``debug`` is set (see above).
    """
    match_cols = ["record_id_ferc1", "record_id_eia", "match_type"]
    # first merge in the EIA plant-parts
    matches_with_eia = pd.merge(
        matches_best[match_cols],
        plant_parts_eia_true.reset_index(),
        how="left",
        on=["record_id_eia"],
        validate="m:1",  # multiple FERC records can have the same EIA match
    )
    # then merge in the FERC data. We want the backbone of this table to be the
    # plant records so we have all possible FERC plant records, even the unmapped
    # ones
    merged = matches_with_eia.merge(
        plants_ferc1,
        how="outer",
        on=["record_id_ferc1"],
        suffixes=("_eia", "_ferc1"),
        validate="1:1",
        indicator=True,
    )

    # Some important columns now exist once per dataset; condense each pair into a
    # single column, preferring the EIA value and falling back on the FERC1 value.
    condense_cols = ["report_year", "plant_id_pudl", "utility_id_pudl"]
    condensed = {
        col: merged[f"{col}_eia"].fillna(merged[f"{col}_ferc1"])
        for col in condense_cols
    }
    suffixed_cols = [
        f"{col}{suffix}" for col in condense_cols for suffix in ("_eia", "_ferc1")
    ]
    connects_ferc1_eia = merged.assign(**condensed).drop(columns=suffixed_cols)
    connects_ferc1_eia = connects_ferc1_eia.assign(
        report_date=pd.to_datetime(
            connects_ferc1_eia.report_year, format="%Y", errors="coerce"
        )
    )

    # Matched records that are missing from the FERC plant table, ignoring the hydro
    # and small generator records.
    is_hydro_or_gnrt = connects_ferc1_eia.record_id_ferc1.str.contains(
        "_hydro_", na=False
    ) | connects_ferc1_eia.record_id_ferc1.str.contains("_gnrt_plant_", na=False)
    no_ferc = connects_ferc1_eia[
        (connects_ferc1_eia._merge == "left_only")
        & connects_ferc1_eia.record_id_eia.notnull()
        & ~is_hydro_or_gnrt
    ]
    connects_ferc1_eia = connects_ferc1_eia.drop(columns=["_merge"])

    if not no_ferc.empty:
        message = (
            "Help. \nI'm trapped in this computer and I can't get out.\n"
            ".... jk there shouldn't be any matches between FERC and EIA\n"
            "that have EIA matches but aren't in the FERC plant table, but we\n"
            f"got {len(no_ferc)}. Check the training data and "
            "prettyify_best_matches()"
        )
        if debug:
            logger.warning(message)
            return no_ferc
        logger.info(
            "grrrr there are some FERC-EIA matches that aren't in the steam "
            "table but this is because they are linked to retired EIA generators."
        )
        logger.warning(message)

    _log_match_coverage(connects_ferc1_eia)
    for match_set in ("all", "overrides"):
        check_match_consistency(connects_ferc1_eia, train_df, match_set=match_set)
    return connects_ferc1_eia
[docs]
def _log_match_coverage(connects_ferc1_eia):
    """Log how much of the FERC1 data was matched during the EIA working years.

    Only records whose ``report_date`` year is one of the ``eia860`` data source's
    working partition years are considered. Two kinds of coverage are logged:

    * among matched records, the share with a non-null ``energy_source_code_1`` and
      the share with a non-null ``technology_description``;
    * per FERC1 table (identified by a substring of ``record_id_ferc1``), the share
      of records that have an EIA match.

    A share whose denominator is zero (e.g. no pumped storage records in the
    processed years) is reported as NaN rather than raising, so that this
    logging-only helper cannot abort the run.

    Args:
        connects_ferc1_eia: Connected FERC1-EIA table with ``report_date``,
            ``record_id_eia``, ``record_id_ferc1``, ``energy_source_code_1`` and
            ``technology_description`` columns.
    """

    def _share(part, whole):
        # Fraction of ``whole`` covered by ``part``; NaN when ``whole`` is empty.
        return len(part) / len(whole) if len(whole) else float("nan")

    eia_years = DataSource.from_id("eia860").working_partitions["years"]
    in_eia_years = connects_ferc1_eia.report_date.dt.year.isin(eia_years)
    # get all records from just the EIA working years
    possible_records = connects_ferc1_eia[in_eia_years]
    # get the matches from just the EIA working years
    matches = connects_ferc1_eia[
        in_eia_years & connects_ferc1_eia.record_id_eia.notnull()
    ]
    fuel_type_coverage = _share(
        matches[matches.energy_source_code_1.notnull()], matches
    )
    tech_type_coverage = _share(
        matches[matches.technology_description.notnull()], matches
    )

    def _get_subtable(table_name):
        return possible_records[
            possible_records.record_id_ferc1.str.contains(f"{table_name}")
        ]

    def _get_match_pct(df):
        return _share(df[df["record_id_eia"].notna()], df)

    logger.info(
        "Coverage for matches during EIA working years:\n"
        f" Fuel type: {fuel_type_coverage:.01%}\n"
        f" Tech type: {tech_type_coverage:.01%}\n"
        "Coverage for all steam table records during EIA working years:\n"
        f" EIA matches: {_get_match_pct(_get_subtable('steam')):.01%}\n"
        f"Coverage for all small gen table records during EIA working years:\n"
        f" EIA matches: {_get_match_pct(_get_subtable('gnrt_plant')):.01%}\n"
        f"Coverage for all hydro table records during EIA working years:\n"
        f" EIA matches: {_get_match_pct(_get_subtable('hydro')):.01%}\n"
        f"Coverage for all pumped storage table records during EIA working years:\n"
        f" EIA matches: {_get_match_pct(_get_subtable('pumped')):.01%}"
    )
[docs]
def check_match_consistency(
    connects_ferc1_eia: pd.DataFrame,
    train_df: pd.DataFrame,
    match_set: Literal["all", "overrides"] = "all",
) -> pd.DataFrame:
    """Check how consistent FERC-EIA matches are with FERC-FERC matches.

    We have two record linkage processes: one that links FERC plant records across time,
    and another that links FERC plant records to EIA plant-parts. This function checks
    that the two processes are as consistent with each other as we expect. Here
    "consistent" means that each FERC plant ID is associated with a single EIA plant
    parts ID across time. The reverse is not necessarily required -- a single EIA plant
    part ID may be associated with various FERC plant IDs across time.

    Args:
        connects_ferc1_eia: Matches of FERC1 to EIA.
        train_df: training data.
        match_set: either ``all`` - to check all of the matches - or ``overrides`` - to
            check just the overrides. Default is ``all``. The overrides are less
            consistent than all of the data, so this argument changes the consistency
            threshold for this check.

    Returns:
        A dataframe indexed by ``plant_id_ferc1`` holding, for every checked plant, the
        number of distinct ``plant_part_id_eia`` and ``capacity_mw_ferc1`` values.

    Raises:
        AssertionError: If either measured consistency is below its expected value.
    """
    checking_overrides = match_set == "overrides"
    # The overrides get lower thresholds than the default ("all") check.
    expected_consistency = 0.39 if checking_overrides else 0.74
    expected_uniform_capacity_consistency = 0.75 if checking_overrides else 0.85

    mask = connects_ferc1_eia.record_id_eia.notnull()
    if checking_overrides:
        train_ferc1 = train_df.reset_index()
        # these bbs were missing from connects_ferc1_eia. not totally sure why
        missing = [
            "f1_steam_2018_12_51_0_1",
            "f1_steam_2018_12_45_2_2",
            "f1_steam_2018_12_45_2_1",
            "f1_steam_2018_12_45_1_2",
            "f1_steam_2018_12_45_1_1",
            "f1_steam_2018_12_45_1_5",
            "f1_steam_2018_12_45_1_4",
            "f1_steam_2018_12_56_2_3",
        ]
        # Steam-table training records, minus the known-missing ones.
        keep_train = train_ferc1.record_id_ferc1.str.contains(
            "_steam_"
        ) & ~train_ferc1.record_id_ferc1.isin(missing)
        over_f1 = train_ferc1[keep_train].set_index("record_id_ferc1").index
        # Restrict the check to the FERC plants touched by those training records.
        records_by_ferc1_id = connects_ferc1_eia.set_index("record_id_ferc1")
        over_ferc1_ids = records_by_ferc1_id.loc[over_f1].plant_id_ferc1.unique()
        mask = mask & connects_ferc1_eia.plant_id_ferc1.isin(over_ferc1_ids)

    checked_cols = ["plant_part_id_eia", "capacity_mw_ferc1"]
    count = connects_ferc1_eia[mask].groupby(["plant_id_ferc1"])[checked_cols].nunique()
    n_plants = len(count)

    n_single_part = len(count[count.plant_part_id_eia == 1])
    actual_consistency = n_single_part / n_plants
    logger.info(
        f"Matches with consistency across years of {match_set} matches is "
        f"{actual_consistency:.1%}"
    )
    if actual_consistency < expected_consistency:
        raise AssertionError(
            "Inter-year consistency between plant_id_ferc1 and plant_part_id_eia of "
            f"{match_set} matches {actual_consistency:.1%} is less than the expected "
            f"value of {expected_consistency:.1%}."
        )

    # Plants whose FERC capacity never changes but which still map to several EIA
    # plant parts are the inconsistent ones for this second measure.
    n_uniform_but_inconsistent = len(
        count[(count.plant_part_id_eia > 1) & (count.capacity_mw_ferc1 == 1)]
    )
    actual_uniform_capacity_consistency = (
        n_plants - n_uniform_but_inconsistent
    ) / n_plants
    logger.info(
        "Matches with a uniform FERC 1 capacity have an inter-year consistency between "
        "plant_id_ferc1 and plant_part_id_eia of "
        f"{actual_uniform_capacity_consistency:.1%}"
    )
    if actual_uniform_capacity_consistency < expected_uniform_capacity_consistency:
        raise AssertionError(
            "Inter-year consistency between plant_id_ferc1 and plant_part_id_eia of "
            "matches with uniform FERC 1 capacity "
            f"{actual_uniform_capacity_consistency:.1%} is less than the expected "
            f"value of {expected_uniform_capacity_consistency:.1%}."
        )
    return count
[docs]
def add_null_overrides(connects_ferc1_eia):
    """Override known null matches with pd.NA.

    There is no way to indicate in the training data that certain FERC records have no
    proper EIA match. That is to say--you can't specify a blank match or tell the AI
    not to match a given record. Because we've gone through by hand and know for a fact
    that some FERC records have no EIA match (even when you aggregate generators), we
    have to add in these null matches after the fact.

    This function reads in a list of record_id_ferc1 values that are known to have no
    corresponding EIA record match and makes sure they are mapped as NA in the final
    record linkage output. It also updates the match_type field to indicate that this
    value has been overridden.

    The list is read from ``eia_ferc1_null.csv`` in ``pudl.package_data.glue`` and
    restricted to the report years spanned by the matched records.

    Args:
        connects_ferc1_eia: Connected FERC1-EIA table with ``record_id_ferc1``,
            ``record_id_eia``, ``report_date`` and ``match_type`` columns, plus the
            fields of the ``out_eia__yearly_plant_parts`` resource. It is modified
            in place.

    Returns:
        The same ``connects_ferc1_eia`` object, with the EIA columns set to NaN and
        ``match_type`` set to ``"overridden"`` for the listed FERC1 records.

    Raises:
        AssertionError: If no null overrides remain after the date restriction.
    """
    logger.info("Overriding specified record_id_ferc1 values with NA record_id_eia")
    # Keep the path around so it can be reported if nothing usable is found in it.
    null_overrides_path = (
        importlib.resources.files("pudl.package_data.glue") / "eia_ferc1_null.csv"
    )
    all_null_overrides = pd.read_csv(null_overrides_path)
    # Only the report dates of records that actually have an EIA match define the
    # date range used to restrict the overrides.
    matched_report_dates = connects_ferc1_eia.loc[
        connects_ferc1_eia.record_id_eia.notnull(), "report_date"
    ]
    # Get record_id_ferc1 values that should be overridden to have no EIA match
    null_overrides = restrict_train_connections_on_date_range(
        all_null_overrides,
        id_col="record_id_ferc1",
        start_date=min(matched_report_dates),
        end_date=max(matched_report_dates),
    )
    # Make sure there is content!
    if null_overrides.empty:
        raise AssertionError(
            "No null overrides found. Consider checking for file at "
            f"{null_overrides_path}"
        )
    logger.debug(f"Found {len(null_overrides)} null overrides")
    # List of EIA columns to null. Ideally would like to get this from elsewhere, but
    # compiling this here for now...
    eia_cols_to_null = Resource.from_id("out_eia__yearly_plant_parts").get_field_names()
    # Make all EIA values NA for record_id_ferc1 values in the Null overrides list and
    # make the match_type column say "overridden"
    is_null_override = connects_ferc1_eia["record_id_ferc1"].isin(
        null_overrides.record_id_ferc1
    )
    connects_ferc1_eia.loc[is_null_override, eia_cols_to_null] = np.nan
    connects_ferc1_eia.loc[is_null_override, "match_type"] = "overridden"
    return connects_ferc1_eia