"""Compile historical utility and balancing area territories.
Use the mapping of utilities to counties, and balancing areas to utilities, available
within the EIA 861, in conjunction with the US Census geometries for counties, to infer
the historical spatial extent of utility and balancing area territories. Output the
resulting geometries for use in other applications.
"""
import math
import pathlib
import sys
from collections.abc import Iterable
from typing import Literal
import click
import geopandas as gpd
import pandas as pd
import sqlalchemy as sa
from dagster import AssetsDefinition, Field, asset
from matplotlib import pyplot as plt
import pudl
from pudl.workspace.setup import PudlPaths
[docs]
logger = pudl.logging_helpers.get_logger(__name__)
################################################################################
# Coordinate Reference Systems used in different contexts
################################################################################
[docs]
MAP_CRS = "EPSG:3857" # For mapping w/ OSM baselayer tiles
[docs]
CALC_CRS = "ESRI:102003" # For accurate area calculations
[docs]
def utility_ids_all_eia(
out_eia__yearly_utilities: pd.DataFrame,
core_eia861__yearly_service_territory: pd.DataFrame,
) -> pd.DataFrame:
"""Compile IDs and Names of all known EIA Utilities.
Grab all EIA utility names and IDs from both the EIA 861 Service Territory table and
the EIA 860 Utility entity table. This is a temporary function that's only needed
because we haven't integrated the EIA 861 information into the entity harvesting
process and PUDL database yet.
Args:
out_eia__yearly_utilities: De-normalized EIA 860 utility attributes table.
core_eia861__yearly_service_territory: Normalized EIA 861 Service Territory table.
Returns:
A DataFrame having 2 columns ``utility_id_eia`` and ``utility_name_eia``.
"""
return (
pd.concat(
[
out_eia__yearly_utilities[["utility_id_eia", "utility_name_eia"]],
core_eia861__yearly_service_territory[
["utility_id_eia", "utility_name_eia"]
],
]
)
.dropna(subset=["utility_id_eia"])
.drop_duplicates(subset=["utility_id_eia"])
)
################################################################################
# Functions that compile geometries based on EIA 861 data tables:
################################################################################
[docs]
def get_territory_fips(
ids: Iterable[int],
assn: pd.DataFrame,
assn_col: str,
core_eia861__yearly_service_territory: pd.DataFrame,
limit_by_state: bool = True,
) -> pd.DataFrame:
"""Compile county FIPS codes associated with an entity's service territory.
For each entity identified by ids, look up the set of counties associated
with that entity on an annual basis. Optionally limit the set of counties
to those within states where the selected entities reported activity elsewhere
within the EIA 861 data.
Args:
ids: A collection of EIA utility or balancing authority IDs.
assn: Association table, relating ``report_date``,
``state``, and ``utility_id_eia`` to each other, as well as the
column indicated by ``assn_col`` -- if it's not ``utility_id_eia``.
assn_col: Label of the dataframe column in ``assn`` that contains
the ID of the entities of interest. Should probably be either
``balancing_authority_id_eia`` or ``utility_id_eia``.
core_eia861__yearly_service_territory: The EIA 861 Service Territory table.
limit_by_state: Whether to require that the counties associated
with the balancing authority are inside a state that has also been
seen in association with the balancing authority and the utility
whose service territory contians the county.
Returns:
A table associating the entity IDs with a collection of
counties annually, identifying counties both by name and county_id_fips
(both state and state_id_fips are included for clarity).
"""
# Limit the association table to only the relevant values:
assn = assn.loc[assn[assn_col].isin(ids)]
if not limit_by_state:
assn = assn.drop("state", axis="columns")
return (
pd.merge(assn, core_eia861__yearly_service_territory, how="inner")
.loc[
:,
[
"report_date",
assn_col,
"state",
"county",
"state_id_fips",
"county_id_fips",
],
]
.drop_duplicates()
)
[docs]
def add_geometries(
df: pd.DataFrame,
census_gdf: gpd.GeoDataFrame,
dissolve: bool = False,
dissolve_by: list[str] = None,
) -> gpd.GeoDataFrame:
"""Merge census geometries into dataframe on county_id_fips, optionally dissolving.
Merge the US Census county-level geospatial information into the DataFrame df
based on the the column county_id_fips (in df), which corresponds to the column
GEOID10 in census_gdf. Also bring in the population and area of the counties,
summing as necessary in the case of dissolved geometries.
Args:
df: A DataFrame containing a county_id_fips column.
census_gdf (geopandas.GeoDataFrame): A GeoDataFrame based on the US Census
demographic profile (DP1) data at county resolution, with the original
column names as published by US Census.
dissolve: If True, dissolve individual county geometries into larger
service territories.
dissolve_by: The columns to group by in the dissolve. For example,
dissolve_by=["report_date", "utility_id_eia"] might provide annual utility
service territories, while ["report_date", "balancing_authority_id_eia"]
would provide annual balancing authority territories.
Returns:
geopandas.GeoDataFrame
"""
out_gdf = (
census_gdf[["geoid10", "namelsad10", "dp0010001", "geometry"]]
.rename(
columns={
"geoid10": "county_id_fips",
"namelsad10": "county_name_census",
"dp0010001": "population",
}
)
# Calculate county areas using cylindrical equal area projection:
.assign(area_km2=lambda x: x.geometry.to_crs(epsg=6933).area / 1e6)
.merge(df, how="right")
)
if dissolve is True:
# Don't double-count duplicated counties, if any.
out_gdf = out_gdf.drop_duplicates(
subset=dissolve_by
+ [
"county_id_fips",
]
)
# Sum these numerical columns so we can merge with dissolved geometries
summed = (
out_gdf.groupby(dissolve_by)[["population", "area_km2"]].sum().reset_index()
)
out_gdf = (
out_gdf.dissolve(by=dissolve_by)
.drop(
[
"county_id_fips",
"county",
"county_name_census",
"state",
"state_id_fips",
"population",
"area_km2",
],
axis="columns",
)
.reset_index()
.merge(summed)
)
return out_gdf
[docs]
def get_territory_geometries(
ids: Iterable[int],
assn: pd.DataFrame,
assn_col: str,
core_eia861__yearly_service_territory: pd.DataFrame,
census_gdf: gpd.GeoDataFrame,
limit_by_state: bool = True,
dissolve: bool = False,
) -> gpd.GeoDataFrame:
"""Compile service territory geometries based on county_id_fips.
Calls ``get_territory_fips`` to generate the list of counties associated with
each entity identified by ``ids``, and then merges in the corresponding county
geometries from the US Census DP1 data passed in via ``census_gdf``.
Optionally dissolve all of the county level geometries into a single geometry for
each combination of entity and year.
Note:
Dissolving geometires is a costly operation, and may take half an hour or more
if you are processing all entities for all years. Dissolving also means that all
the per-county information will be lost, rendering the output inappropriate for
use in many analyses. Dissolving is mostly useful for generating visualizations.
Args:
ids: A collection of EIA balancing authority IDs.
assn: Association table, relating ``report_date``,
``state``, and ``utility_id_eia`` to each other, as well as the
column indicated by ``assn_col`` -- if it's not ``utility_id_eia``.
assn_col: Label of the dataframe column in ``assn`` that contains
the ID of the entities of interest. Should probably be either
``balancing_authority_id_eia`` or ``utility_id_eia``.
core_eia861__yearly_service_territory: The EIA 861 Service Territory table.
census_gdf: The US Census DP1 county-level geometries.
limit_by_state: Whether to require that the counties associated
with the balancing authority are inside a state that has also been
seen in association with the balancing authority and the utility
whose service territory contians the county.
dissolve: If False, each record in the compiled territory will correspond
to a single county, with a county-level geometry, and there will be many
records enumerating all the counties associated with a given
balancing_authority_id_eia in each year. If dissolve=True, all of the
county-level geometries for each utility in each year will be merged
together ("dissolved") resulting in a single geometry and record for each
balancing_authority-year.
Returns:
A GeoDataFrame with service territory geometries for each entity.
"""
return get_territory_fips(
ids=ids,
assn=assn,
assn_col=assn_col,
core_eia861__yearly_service_territory=core_eia861__yearly_service_territory,
limit_by_state=limit_by_state,
).pipe(
add_geometries,
census_gdf,
dissolve=dissolve,
dissolve_by=["report_date", assn_col],
)
[docs]
def _save_geoparquet(
gdf: gpd.GeoDataFrame,
entity_type: Literal["utility", "balancing_authority"],
dissolve: bool,
limit_by_state: bool,
output_dir: pathlib.Path | None = None,
) -> None:
"""Save utility or balancing authority service territory geometries to GeoParquet.
In order to prevent the geometry data from exceeding the 2GB maximum size of an
Arrow object, we need to keep the row groups small. Sort the dataframe by the
primary key columns to minimize the number of values in any row group. Output
filename is constructed based on input arguments.
Args:
gdf: GeoDataframe containing utility or balancing authority geometries.
entity_type: string indicating whether we're outputting utility or balancing
authority geometries.
dissolve: Wether the individual county geometries making up the service
territories have been merged together. Used to construct filename.
limit_by_state: Whether service territories have been limited to include only
counties in states where the utilities reported sales. Used to construct
filename.
output_dir: Path to the directory where the GeoParquet file will be written.
"""
dissolved = "_dissolved" if dissolve else ""
limited = "_limited" if limit_by_state else ""
if output_dir is None:
output_dir = pathlib.Path.cwd()
file_path = output_dir / f"{entity_type}_geometry{limited}{dissolved}.parquet"
gdf.sort_values(["report_date", f"{entity_type}_id_eia"]).to_parquet(
file_path, row_group_size=512, compression="snappy", index=False
)
[docs]
def compile_geoms(
core_eia861__yearly_balancing_authority: pd.DataFrame,
core_eia861__assn_balancing_authority: pd.DataFrame,
out_eia__yearly_utilities: pd.DataFrame,
core_eia861__yearly_service_territory: pd.DataFrame,
core_eia861__assn_utility: pd.DataFrame,
census_counties: pd.DataFrame,
entity_type: Literal["balancing_authority", "utility"],
save_format: Literal["geoparquet", "geodataframe", "dataframe"],
output_dir: pathlib.Path | None = None,
dissolve: bool = False,
limit_by_state: bool = True,
years: list[int] = [],
) -> pd.DataFrame:
"""Compile all available utility or balancing authority geometries.
Returns a geoparquet file, geopandas GeoDataFrame or a pandas DataFrame with the
geometry column removed depending on the value of the save_format parameter. By
default, this returns only counties with observed EIA 861 data for a utility or
balancing authority, with geometries available at the county level.
"""
logger.info(
f"Compiling {entity_type} geometries with {dissolve=}, {limit_by_state=}, "
f"and {years=}."
)
if save_format == "geoparquet" and output_dir is None:
raise ValueError("No output_dir provided while writing geoparquet.")
if years:
def _limit_years(df: pd.DataFrame) -> pd.DataFrame:
return df[df.report_date.dt.year.isin(years)]
core_eia861__yearly_balancing_authority = _limit_years(
core_eia861__yearly_balancing_authority
)
core_eia861__assn_balancing_authority = _limit_years(
core_eia861__assn_balancing_authority
)
out_eia__yearly_utilities = _limit_years(out_eia__yearly_utilities)
core_eia861__yearly_service_territory = _limit_years(
core_eia861__yearly_service_territory
)
core_eia861__assn_utility = _limit_years(core_eia861__assn_utility)
utilids_all_eia = utility_ids_all_eia(
out_eia__yearly_utilities, core_eia861__yearly_service_territory
)
if entity_type == "balancing_authority":
ids = (
core_eia861__yearly_balancing_authority.balancing_authority_id_eia.unique()
)
assn = core_eia861__assn_balancing_authority
assn_col = "balancing_authority_id_eia"
elif entity_type == "utility":
ids = utilids_all_eia.utility_id_eia.unique()
assn = core_eia861__assn_utility
assn_col = "utility_id_eia"
else:
raise ValueError(
f"Got {entity_type=}, but need either 'balancing_authority' or 'utility'"
)
# Identify all Utility IDs with service territory information
geom = get_territory_geometries(
ids=ids,
assn=assn,
assn_col=assn_col,
core_eia861__yearly_service_territory=core_eia861__yearly_service_territory,
census_gdf=census_counties,
limit_by_state=limit_by_state,
dissolve=dissolve,
)
if save_format == "geoparquet":
# TODO[dagster]: update to use IO Manager.
_save_geoparquet(
geom,
entity_type=entity_type,
dissolve=dissolve,
limit_by_state=limit_by_state,
output_dir=output_dir,
)
elif save_format == "dataframe":
geom = pd.DataFrame(geom.drop(columns="geometry"))
return geom
[docs]
def service_territory_asset_factory(
entity_type: Literal["balancing_authority", "utility"],
io_manager_key: str | None = None,
) -> list[AssetsDefinition]:
"""Build asset definitions for balancing authority and utility territories."""
@asset(
name=f"out_eia861__yearly_{entity_type}_service_territory",
io_manager_key=io_manager_key,
config_schema={
"dissolve": Field(
bool,
default_value=False,
description=(
"If True, dissolve the compiled geometries to the entity level. If False, leave them as counties."
),
),
"limit_by_state": Field(
bool,
default_value=True,
description=(
"If True, only include counties with observed EIA 861 data in association with the state and utility/balancing authority."
),
),
"save_format": Field(
str,
default_value="dataframe",
description=(
"Format of output in PUDL. One of: geoparquet, geodataframe, dataframe."
),
),
},
compute_kind="Python",
)
def _service_territory(
context,
core_eia861__yearly_balancing_authority: pd.DataFrame,
core_eia861__assn_balancing_authority: pd.DataFrame,
out_eia__yearly_utilities: pd.DataFrame,
core_eia861__yearly_service_territory: pd.DataFrame,
core_eia861__assn_utility: pd.DataFrame,
_core_censusdp1tract__counties: pd.DataFrame,
) -> pd.DataFrame:
"""Compile all available utility or balancing authority geometries.
Returns:
A dataframe compiling all available utility or balancing authority geometries.
"""
# Get options from dagster
dissolve = context.op_config["dissolve"]
limit_by_state = context.op_config["limit_by_state"]
save_format = context.op_config["save_format"]
return compile_geoms(
core_eia861__yearly_balancing_authority=core_eia861__yearly_balancing_authority,
core_eia861__assn_balancing_authority=core_eia861__assn_balancing_authority,
out_eia__yearly_utilities=out_eia__yearly_utilities,
core_eia861__yearly_service_territory=core_eia861__yearly_service_territory,
core_eia861__assn_utility=core_eia861__assn_utility,
census_counties=_core_censusdp1tract__counties,
entity_type=entity_type,
dissolve=dissolve,
limit_by_state=limit_by_state,
save_format=save_format,
)
return _service_territory
[docs]
service_territory_eia861_assets = [
service_territory_asset_factory(
entity_type=entity, io_manager_key="pudl_io_manager"
)
for entity in ["balancing_authority", "utility"]
]
################################################################################
# Functions for visualizing the service territory geometries
################################################################################
[docs]
def plot_historical_territory(
gdf: gpd.GeoDataFrame,
id_col: str,
id_val: str | int,
) -> None:
"""Plot all the historical geometries defined for the specified entity.
This is useful for exploring how a particular entity's service territory has evolved
over time, or for identifying individual missing or inaccurate territories.
Args:
gdf: A geodataframe containing geometries pertaining electricity planning areas.
Can be broken down by county FIPS code, or have a single record containing a
geometry for each combination of report_date and the column being used to
select planning areas (see below).
id_col: The label of a column in gdf that identifies the planning area to be
visualized, like ``utility_id_eia``, ``balancing_authority_id_eia``, or
``balancing_authority_code_eia``.
id_val: The ID of the entity whose territory should be plotted.
"""
if id_col not in gdf.columns:
raise ValueError(f"The input id_col {id_col} doesn't exist in this GDF.")
logger.info("Plotting historical territories for %s==%s.", id_col, id_val)
# Pare down the GDF so this all goes faster
entity_gdf = gdf[gdf[id_col] == id_val]
if "county_id_fips" in entity_gdf.columns:
entity_gdf = entity_gdf.drop_duplicates(
subset=["report_date", "county_id_fips"]
)
entity_gdf["report_year"] = entity_gdf.report_date.dt.year
logger.info(
"Plotting service territories of %s %s records.", len(entity_gdf), id_col
)
# Create a grid of subplots sufficient to hold all the years:
years = entity_gdf.report_year.sort_values().unique()
ncols = 5
nrows = math.ceil(len(years) / ncols)
fig, axes = plt.subplots(
ncols=ncols,
nrows=nrows,
figsize=(15, 3 * nrows),
sharex=True,
sharey=True,
facecolor="white",
)
fig.suptitle(f"{id_col} == {id_val}")
for year, ax in zip(years, axes.flat, strict=True):
ax.set_title(f"{year}")
ax.set_xticks([])
ax.set_yticks([])
year_gdf = entity_gdf.loc[entity_gdf.report_year == year]
year_gdf.plot(ax=ax, linewidth=0.1)
plt.show()
[docs]
def plot_all_territories(
gdf: gpd.GeoDataFrame,
report_date: str,
respondent_type: str | Iterable[str] = ("balancing_authority", "utility"),
color: str = "black",
alpha: float = 0.25,
):
"""Plot all of the planning areas of a given type for a given report date.
Todo:
This function needs to be made more general purpose, and less
entangled with the FERC 714 data.
Args:
gdf: GeoDataFrame containing planning area geometries, organized by
``respondent_id_ferc714`` and ``report_date``.
report_date: A string representing a datetime that indicates what year's
planning areas should be displayed.
respondent_type: Type of respondent whose planning
areas should be displayed. Either "utility" or
"balancing_authority" or an iterable collection containing both.
color: Color to use for the planning areas.
alpha: Transparency to use for the planning areas.
Returns:
matplotlib.axes.Axes
"""
unwanted_respondent_ids = ( # noqa: F841 variable is used, in df.query() below
112, # Alaska
133, # Alaska
178, # Hawaii
301, # PJM Dupe
302, # PJM Dupe
303, # PJM Dupe
304, # PJM Dupe
305, # PJM Dupe
306, # PJM Dupe
)
if isinstance(respondent_type, str):
respondent_type = (respondent_type,)
plot_gdf = (
gdf.query("report_date==@report_date")
.query("respondent_id_ferc714 not in @unwanted_respondent_ids")
.query("respondent_type in @respondent_type")
)
ax = plot_gdf.plot(figsize=(20, 20), color=color, alpha=alpha, linewidth=1)
plt.title(f"FERC 714 {', '.join(respondent_type)} planning areas for {report_date}")
plt.show()
return ax
################################################################################
# Provide a CLI for generating service territories
################################################################################
@click.command(
context_settings={"help_option_names": ["-h", "--help"]},
)
@click.option(
"--entity-type",
type=click.Choice(["utility", "balancing_authority"]),
default="util",
show_default=True,
help="What kind of entity's service territories should be generated?",
)
@click.option(
"--limit-by-state/--no-limit-by-state",
default=False,
help=(
"Limit service territories to including only counties located in states where "
"the utility or balancing authority also reported electricity sales in EIA-861 "
"in the year that the geometry pertains to. In theory a utility could serve a "
"county, but not sell any electricity there, but that seems like an unusual "
"situation."
),
show_default=True,
)
@click.option(
"--year",
"-y",
"years",
type=click.IntRange(min=2001),
default=[],
multiple=True,
help=(
"Limit service territories generated to those from the given year. This can "
"dramatically reduce the memory and CPU intensity of the geospatial "
"operations. Especially useful for testing. Option can be used multiple times "
"toselect multiple years."
),
)
@click.option(
"--dissolve/--no-dissolve",
default=True,
help=(
"Dissolve county level geometries to the utility or balancing authority "
"boundaries. The dissolve operation may take several minutes and is quite "
"memory intensive, but results in significantly smaller files, in which each "
"record contains the whole geometry of a utility or balancing authority. The "
"un-dissolved geometries use many records to describe each service territory, "
"with each record containing the geometry of a single constituent county."
),
show_default=True,
)
@click.option(
"--output-dir",
"-o",
type=click.Path(
exists=True,
writable=True,
dir_okay=True,
file_okay=False,
resolve_path=True,
path_type=pathlib.Path,
),
default=pathlib.Path.cwd(),
show_default=True,
help=(
"Path to the directory where the service territory geometries should be saved. "
"Defaults to the current working directory. Filenames are constructed based on "
"the other flags provided."
),
)
@click.option(
"--logfile",
help="If specified, write logs to this file.",
type=click.Path(
exists=False,
resolve_path=True,
path_type=pathlib.Path,
),
)
@click.option(
"--loglevel",
default="INFO",
type=click.Choice(
["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
),
show_default=True,
)
[docs]
def pudl_service_territories(
entity_type: Literal["utility", "balancing_authority"],
dissolve: bool,
output_dir: pathlib.Path,
limit_by_state: bool,
years: list[int],
logfile: pathlib.Path,
loglevel: str,
):
"""Compile historical utility and balancing area service territory geometries.
This script produces GeoParquet files describing the historical service territories
of utilities and balancing authorities based on data reported in the EIA Form 861
and county geometries from the US Census DP1 geodatabase.
See: https://geoparquet.org/ for more on the GeoParquet file format.
Usage examples:
pudl_service_territories --entity-type balancing_authority --dissolve --limit-by-state
pudl_service_territories --entity-type utility
"""
# Display logged output from the PUDL package:
pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)
pudl_engine = sa.create_engine(PudlPaths().pudl_db)
# Load the required US Census DP1 county geometry data:
dp1_engine = PudlPaths().sqlite_db_uri("censusdp1tract")
sql = """
SELECT
geoid10,
namelsad10,
dp0010001,
shape AS geometry
FROM
county_2010census_dp1;
"""
county_gdf = gpd.read_postgis(
sql,
con=dp1_engine,
geom_col="geometry",
crs="EPSG:4326",
)
_ = compile_geoms(
core_eia861__yearly_balancing_authority=pd.read_sql(
"core_eia861__yearly_balancing_authority",
pudl_engine,
),
core_eia861__assn_balancing_authority=pd.read_sql(
"core_eia861__assn_balancing_authority",
pudl_engine,
),
out_eia__yearly_utilities=pd.read_sql("out_eia__yearly_utilities", pudl_engine),
core_eia861__yearly_service_territory=pd.read_sql(
"core_eia861__yearly_service_territory", pudl_engine
),
core_eia861__assn_utility=pd.read_sql("core_eia861__assn_utility", pudl_engine),
census_counties=county_gdf,
dissolve=dissolve,
save_format="geoparquet",
output_dir=output_dir,
entity_type=entity_type,
limit_by_state=limit_by_state,
years=years,
)
if __name__ == "__main__":
sys.exit(pudl_service_territories())