Source code for pudl.analysis.state_demand

"""
Predict state-level electricity demand.

Using the hourly electricity demand reported at the balancing authority and
utility level in the FERC 714, and service territories for utilities and
balancing authorities inferred from the counties served by each utility and
the utilities that make up each balancing authority in the EIA 861, estimate
the total hourly electricity demand for each US state.

This analysis uses the total electricity sales by state reported in the EIA 861
as a scaling factor to ensure that the magnitude of electricity sales is
roughly correct, and obtains the shape of the demand curve from the hourly
planning area demand reported in the FERC 714.

The compilation of historical service territories based on the EIA 861 data is
somewhat manual and could certainly be improved, but overall the results seem
reasonable. Additional predictive spatial variables will be required to obtain
more granular electricity demand estimates (e.g. at the county level).

Currently the script takes no arguments and simply runs a predefined analysis
across all states and all years for which both EIA 861 and FERC 714 data are
available, and outputs the results as a CSV in
PUDL_DIR/local/state-demand/demand.csv

"""

import argparse
import datetime
import logging
import pathlib
import sys
from typing import Any, Dict, Iterable, List, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sqlalchemy as sa

import pudl.analysis.timeseries_cleaning
import pudl.output.censusdp1tract
import pudl.output.ferc714
import pudl.output.pudltabl
import pudl.workspace.setup

logger = logging.getLogger(__name__)

# --- Constants --- #
STATES: List[Dict[str, Union[str, int]]] = [
    {'name': 'Alabama', 'code': 'AL', 'fips': '01'},
    {'name': 'Alaska', 'code': 'AK', 'fips': '02'},
    {'name': 'Arizona', 'code': 'AZ', 'fips': '04'},
    {'name': 'Arkansas', 'code': 'AR', 'fips': '05'},
    {'name': 'California', 'code': 'CA', 'fips': '06'},
    {'name': 'Colorado', 'code': 'CO', 'fips': '08'},
    {'name': 'Connecticut', 'code': 'CT', 'fips': '09'},
    {'name': 'Delaware', 'code': 'DE', 'fips': '10'},
    {'name': 'District of Columbia', 'code': 'DC', 'fips': '11'},
    {'name': 'Florida', 'code': 'FL', 'fips': '12'},
    {'name': 'Georgia', 'code': 'GA', 'fips': '13'},
    {'name': 'Hawaii', 'code': 'HI', 'fips': '15'},
    {'name': 'Idaho', 'code': 'ID', 'fips': '16'},
    {'name': 'Illinois', 'code': 'IL', 'fips': '17'},
    {'name': 'Indiana', 'code': 'IN', 'fips': '18'},
    {'name': 'Iowa', 'code': 'IA', 'fips': '19'},
    {'name': 'Kansas', 'code': 'KS', 'fips': '20'},
    {'name': 'Kentucky', 'code': 'KY', 'fips': '21'},
    {'name': 'Louisiana', 'code': 'LA', 'fips': '22'},
    {'name': 'Maine', 'code': 'ME', 'fips': '23'},
    {'name': 'Maryland', 'code': 'MD', 'fips': '24'},
    {'name': 'Massachusetts', 'code': 'MA', 'fips': '25'},
    {'name': 'Michigan', 'code': 'MI', 'fips': '26'},
    {'name': 'Minnesota', 'code': 'MN', 'fips': '27'},
    {'name': 'Mississippi', 'code': 'MS', 'fips': '28'},
    {'name': 'Missouri', 'code': 'MO', 'fips': '29'},
    {'name': 'Montana', 'code': 'MT', 'fips': '30'},
    {'name': 'Nebraska', 'code': 'NE', 'fips': '31'},
    {'name': 'Nevada', 'code': 'NV', 'fips': '32'},
    {'name': 'New Hampshire', 'code': 'NH', 'fips': '33'},
    {'name': 'New Jersey', 'code': 'NJ', 'fips': '34'},
    {'name': 'New Mexico', 'code': 'NM', 'fips': '35'},
    {'name': 'New York', 'code': 'NY', 'fips': '36'},
    {'name': 'North Carolina', 'code': 'NC', 'fips': '37'},
    {'name': 'North Dakota', 'code': 'ND', 'fips': '38'},
    {'name': 'Ohio', 'code': 'OH', 'fips': '39'},
    {'name': 'Oklahoma', 'code': 'OK', 'fips': '40'},
    {'name': 'Oregon', 'code': 'OR', 'fips': '41'},
    {'name': 'Pennsylvania', 'code': 'PA', 'fips': '42'},
    {'name': 'Rhode Island', 'code': 'RI', 'fips': '44'},
    {'name': 'South Carolina', 'code': 'SC', 'fips': '45'},
    {'name': 'South Dakota', 'code': 'SD', 'fips': '46'},
    {'name': 'Tennessee', 'code': 'TN', 'fips': '47'},
    {'name': 'Texas', 'code': 'TX', 'fips': '48'},
    {'name': 'Utah', 'code': 'UT', 'fips': '49'},
    {'name': 'Vermont', 'code': 'VT', 'fips': '50'},
    {'name': 'Virginia', 'code': 'VA', 'fips': '51'},
    {'name': 'Washington', 'code': 'WA', 'fips': '53'},
    {'name': 'West Virginia', 'code': 'WV', 'fips': '54'},
    {'name': 'Wisconsin', 'code': 'WI', 'fips': '55'},
    {'name': 'Wyoming', 'code': 'WY', 'fips': '56'},
    {'name': 'American Samoa', 'code': 'AS', 'fips': '60'},
    {'name': 'Guam', 'code': 'GU', 'fips': '66'},
    {'name': 'Northern Mariana Islands', 'code': 'MP', 'fips': '69'},
    {'name': 'Puerto Rico', 'code': 'PR', 'fips': '72'},
    {'name': 'Virgin Islands', 'code': 'VI', 'fips': '78'},
]
"""
Attributes of US states and territories.

* `name` (str): Full name.
* `code` (str): US Postal Service (USPS) two-letter alphabetic code.
* `fips` (str): Federal Information Processing Standard (FIPS) code.
"""

STANDARD_UTC_OFFSETS: Dict[str, int] = {
    'Pacific/Honolulu': -10,
    'America/Anchorage': -9,
    'America/Los_Angeles': -8,
    'America/Denver': -7,
    'America/Chicago': -6,
    'America/New_York': -5,
    'America/Halifax': -4,
}
"""
Hour offset from Coordinated Universal Time (UTC) by time zone.

Time zones are canonical names (e.g. 'America/Denver') from tzdata
(https://www.iana.org/time-zones) mapped to their standard-time UTC offset.
"""

UTC_OFFSETS: Dict[str, int] = {
    'HST': -10,
    'AKST': -9,
    'AKDT': -8,
    'PST': -8,
    'PDT': -7,
    'MST': -7,
    'MDT': -6,
    'CST': -6,
    'CDT': -5,
    'EST': -5,
    'EDT': -4,
    'AST': -4,
    'ADT': -3,
}
"""
Hour offset from Coordinated Universal Time (UTC) by time zone.

Time zones are either standard or daylight-savings time zone abbreviations
(e.g. 'MST').
"""


# --- Helpers --- #
def lookup_state(state: Union[str, int]) -> dict:
    """
    Lookup US state by state identifier.

    Args:
        state: State name, two-letter abbreviation, or FIPS code.
            String matching is case-insensitive.

    Returns:
        State identifiers.

    Examples:
        >>> lookup_state('alabama')
        {'name': 'Alabama', 'code': 'AL', 'fips': '01'}
        >>> lookup_state('AL')
        {'name': 'Alabama', 'code': 'AL', 'fips': '01'}
        >>> lookup_state(1)
        {'name': 'Alabama', 'code': 'AL', 'fips': '01'}
    """
    # Try to cast state as an integer to deal with "02", "2", 2.0, np.int64(2)...
    try:
        is_fips = isinstance(int(state), int)
    except ValueError:
        is_fips = False
    if is_fips:
        state = str(int(state)).zfill(2)
        return {x['fips']: x for x in STATES}[state]
    key = 'code' if len(state) == 2 else 'name'
    return {x[key].lower(): x for x in STATES}[state.lower()]

def local_to_utc(local: pd.Series, tz: Iterable, **kwargs: Any) -> pd.Series:
    """
    Convert local times to UTC.

    Args:
        local: Local times (tz-naive `datetime64[ns]`).
        tz: For each time, a timezone (see :meth:`DatetimeIndex.tz_localize`)
            or UTC offset in hours (`int` or `float`).
        kwargs: Optional arguments to :meth:`DatetimeIndex.tz_localize`.

    Returns:
        UTC times (tz-naive `datetime64[ns]`).

    Examples:
        >>> s = pd.Series([pd.Timestamp(2020, 1, 1), pd.Timestamp(2020, 1, 1)])
        >>> local_to_utc(s, [-7, -6])
        0   2020-01-01 07:00:00
        1   2020-01-01 06:00:00
        dtype: datetime64[ns]
        >>> local_to_utc(s, ['America/Denver', 'America/Chicago'])
        0   2020-01-01 07:00:00
        1   2020-01-01 06:00:00
        dtype: datetime64[ns]
    """
    return local.groupby(tz).transform(
        lambda x: x.dt.tz_localize(
            datetime.timezone(datetime.timedelta(hours=x.name))
            if isinstance(x.name, (int, float)) else x.name,
            **kwargs
        ).dt.tz_convert(None)
    )

def utc_to_local(utc: pd.Series, tz: Iterable) -> pd.Series:
    """
    Convert UTC times to local.

    Args:
        utc: UTC times (tz-naive `datetime64[ns]` or `datetime64[ns, UTC]`).
        tz: For each time, a timezone (see :meth:`DatetimeIndex.tz_localize`)
            or UTC offset in hours (`int` or `float`).

    Returns:
        Local times (tz-naive `datetime64[ns]`).

    Examples:
        >>> s = pd.Series([pd.Timestamp(2020, 1, 1), pd.Timestamp(2020, 1, 1)])
        >>> utc_to_local(s, [-7, -6])
        0   2019-12-31 17:00:00
        1   2019-12-31 18:00:00
        dtype: datetime64[ns]
        >>> utc_to_local(s, ['America/Denver', 'America/Chicago'])
        0   2019-12-31 17:00:00
        1   2019-12-31 18:00:00
        dtype: datetime64[ns]
    """
    if utc.dt.tz is None:
        utc = utc.dt.tz_localize('UTC')
    return utc.groupby(tz).transform(
        lambda x: x.dt.tz_convert(
            datetime.timezone(datetime.timedelta(hours=x.name))
            if isinstance(x.name, (int, float)) else x.name
        ).dt.tz_localize(None)
    )

# --- Datasets: References --- #
def load_ventyx_hourly_state_demand(path: str) -> pd.DataFrame:
    """
    Read and format Ventyx hourly state-level demand.

    After manual corrections of the listed time zone, ambiguous time zone
    issues remain. Below is a list of transmission zones
    (by `Transmission Zone ID`) with one or more missing timestamps at
    transitions to or from daylight-savings:

    * 615253 (Indiana)
    * 615261 (Michigan)
    * 615352 (Wisconsin)
    * 615357 (Missouri)
    * 615377 (Saskatchewan)
    * 615401 (Minnesota, Wisconsin)
    * 615516 (Missouri)
    * 615529 (Oklahoma)
    * 615603 (Idaho, Washington)
    * 1836089 (California)

    Args:
        path: Path to the data file
            (published as 'state_level_load_2007_2018.csv').

    Returns:
        Dataframe with hourly state-level demand.

        * `state_id_fips`: FIPS code of US state.
        * `utc_datetime`: UTC time of the start of each hour.
        * `demand_mwh`: Hourly demand in MWh.
    """
    df = pd.read_csv(
        path,
        usecols=[
            'State/Province',
            'Local Datetime (Hour Ending)',
            'Time Zone',
            'Estimated State Load MW - Sum',
        ]
    )
    df.rename(columns={
        'State/Province': 'state',
        'Local Datetime (Hour Ending)': 'datetime',
        'Estimated State Load MW - Sum': 'demand_mwh',
        'Time Zone': 'tz',
    }, inplace=True)
    # Convert state name to FIPS codes and keep only data for US states
    fips = {x['name']: x['fips'] for x in STATES}
    df['state_id_fips'] = df['state'].map(fips)
    df = df[~df['state_id_fips'].isnull()]
    # Parse datetime
    df['datetime'] = pd.to_datetime(df['datetime'], format='%m/%d/%Y %H:%M')
    # Correct timezone errors
    mask = df['state'].eq('Wyoming') & df['tz'].eq('PST')
    df.loc[mask, 'tz'] = 'MST'
    # Sum by local time and timezone
    df = df.groupby(
        ['state_id_fips', 'datetime', 'tz'], as_index=False
    )['demand_mwh'].sum()
    # Convert local times to UTC
    df['utc_datetime'] = local_to_utc(df['datetime'], df['tz'].map(UTC_OFFSETS))
    # Sum by UTC time
    df = df.groupby(
        ['state_id_fips', 'utc_datetime'], as_index=False
    )['demand_mwh'].sum()
    # Roll back one hour to convert hour-ending to hour-starting
    df['utc_datetime'] -= pd.Timedelta(hours=1)
    return df

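# Illustrative usage (sketch, not executed on import): given a local copy of
# the published Ventyx file, the reference demand used for comparison in
# :func:`main` would be loaded as follows (the file path is hypothetical):
#
#   reference = load_ventyx_hourly_state_demand('state_level_load_2007_2018.csv')
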
# --- Datasets: FERC 714 hourly demand --- #
def load_ferc714_hourly_demand_matrix(
    pudl_out: pudl.output.pudltabl.PudlTabl
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Read and format FERC 714 hourly demand into matrix form.

    Args:
        pudl_out: Used to access
            :meth:`pudl.output.pudltabl.PudlTabl.demand_hourly_pa_ferc714`.

    Returns:
        Hourly demand as a matrix with a `datetime` row index
        (e.g. '2006-01-01 00:00:00', ..., '2019-12-31 23:00:00')
        in local time ignoring daylight-savings,
        and a `respondent_id_ferc714` column index (e.g. 101, ..., 329).
        A second Dataframe lists the UTC offset in hours
        of each `respondent_id_ferc714` and reporting `year` (int).
    """
    demand = pudl_out.demand_hourly_pa_ferc714().copy()
    # Convert UTC to local time (ignoring daylight savings)
    demand['utc_offset'] = demand['timezone'].map(STANDARD_UTC_OFFSETS)
    demand['datetime'] = utc_to_local(demand['utc_datetime'], demand['utc_offset'])
    # Pivot to demand matrix: timestamps x respondents
    matrix = demand.pivot(
        index='datetime',
        columns='respondent_id_ferc714',
        values='demand_mwh'
    )
    # List timezone by year for each respondent
    demand['year'] = demand['report_date'].dt.year
    utc_offset = demand.groupby(
        ['respondent_id_ferc714', 'year'], as_index=False
    )['utc_offset'].first()
    return matrix, utc_offset

def clean_ferc714_hourly_demand_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """
    Detect and null anomalous values in FERC 714 hourly demand matrix.

    .. note::
        Takes about 10 minutes.

    Args:
        df: FERC 714 hourly demand matrix, as described in
            :func:`load_ferc714_hourly_demand_matrix`.

    Returns:
        Copy of `df` with nulled anomalous values.
    """
    ts = pudl.analysis.timeseries_cleaning.Timeseries(df)
    ts.flag_ruggles()
    return ts.to_dataframe(copy=False)

def filter_ferc714_hourly_demand_matrix(
    df: pd.DataFrame,
    min_data: int = 100,
    min_data_fraction: float = 0.9,
) -> pd.DataFrame:
    """
    Filter incomplete years from FERC 714 hourly demand matrix.

    Nulls respondent-years with too few data and drops respondents
    with no data across all years.

    Args:
        df: FERC 714 hourly demand matrix, as described in
            :func:`load_ferc714_hourly_demand_matrix`.
        min_data: Minimum number of non-null hours in a year.
        min_data_fraction: Minimum fraction of non-null hours between
            the first and last non-null hour in a year.

    Returns:
        Hourly demand matrix `df` modified in-place.
    """
    # Identify respondent-years where data coverage is below thresholds
    has_data = ~df.isnull()
    coverage = (
        # Last timestamp with demand in year
        has_data[::-1].groupby(df.index.year[::-1]).idxmax() -
        # First timestamp with demand in year
        has_data.groupby(df.index.year).idxmax()
    ).apply(lambda x: 1 + x.dt.days * 24 + x.dt.seconds / 3600, axis=1)
    fraction = has_data.groupby(df.index.year).sum() / coverage
    short = coverage.lt(min_data)
    bad = fraction.gt(0) & fraction.lt(min_data_fraction)
    # Set all values in short or bad respondent-years to null
    mask = (short | bad).loc[df.index.year]
    mask.index = df.index
    df[mask] = np.nan
    # Report nulled respondent-years
    for mask, msg in [
        (short, 'Nulled short respondent-years (below min_data)'),
        (bad, 'Nulled bad respondent-years (below min_data_fraction)'),
    ]:
        row, col = mask.values.nonzero()
        report = (
            pd.DataFrame({'id': mask.columns[col], 'year': mask.index[row]})
            .groupby('id')['year'].apply(lambda x: np.sort(x))
        )
        with pd.option_context('display.max_colwidth', -1):
            logger.info(f'{msg}:\n{report}')
    # Drop respondents with no data
    blank = df.columns[df.isnull().all()].tolist()
    df.drop(columns=blank, inplace=True)
    # Report dropped respondents (with no data)
    logger.info(f'Dropped blank respondents: {blank}')
    return df

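# Worked example of the coverage test above (hypothetical numbers):
# if a respondent's first non-null hour in a year is Jan 1 00:00 and its last
# is Jan 10 23:00, then coverage = 1 + 9 * 24 + 23 = 240 hours. With 200
# non-null hours in that span, fraction = 200 / 240 ≈ 0.83, which falls below
# the default ``min_data_fraction=0.9``, so that respondent-year is nulled.
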
def impute_ferc714_hourly_demand_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """
    Impute null values in FERC 714 hourly demand matrix.

    Imputation is performed separately for each year,
    with only the respondents reporting data in that year.

    .. note::
        Takes about 15 minutes.

    Args:
        df: FERC 714 hourly demand matrix, as described in
            :func:`load_ferc714_hourly_demand_matrix`.

    Returns:
        Copy of `df` with imputed values.
    """
    results = []
    for year, gdf in df.groupby(df.index.year):
        logger.info(f'Imputing year {year}')
        keep = df.columns[~gdf.isnull().all()]
        tsi = pudl.analysis.timeseries_cleaning.Timeseries(gdf[keep])
        result = tsi.to_dataframe(tsi.impute(method='tnn'), copy=False)
        results.append(result)
    return pd.concat(results)

def melt_ferc714_hourly_demand_matrix(
    df: pd.DataFrame, tz: pd.DataFrame
) -> pd.DataFrame:
    """
    Melt FERC 714 hourly demand matrix to long format.

    Args:
        df: FERC 714 hourly demand matrix, as described in
            :func:`load_ferc714_hourly_demand_matrix`.
        tz: FERC 714 respondent time zones, as described in
            :func:`load_ferc714_hourly_demand_matrix`.

    Returns:
        Long-format hourly demand with columns `respondent_id_ferc714`,
        report `year` (int), `utc_datetime`, and `demand_mwh`.
    """
    # Melt demand matrix to long format
    df = df.melt(value_name='demand_mwh', ignore_index=False)
    df.reset_index(inplace=True)
    # Convert local times to UTC
    df['year'] = df['datetime'].dt.year
    df = df.merge(tz, on=['respondent_id_ferc714', 'year'])
    df['utc_datetime'] = local_to_utc(df['datetime'], df['utc_offset'])
    df.drop(columns=['utc_offset', 'datetime'], inplace=True)
    return df

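# Illustrative usage (sketch, not executed on import): assuming a
# ``PudlTabl`` instance ``pudl_out`` built as in :func:`main`, the FERC 714
# preparation steps above chain together as:
#
#   matrix, utc_offset = load_ferc714_hourly_demand_matrix(pudl_out)
#   matrix = clean_ferc714_hourly_demand_matrix(matrix)
#   matrix = filter_ferc714_hourly_demand_matrix(matrix)
#   matrix = impute_ferc714_hourly_demand_matrix(matrix)
#   demand = melt_ferc714_hourly_demand_matrix(matrix, utc_offset)
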
# --- Datasets: Counties --- #
def load_ferc714_county_assignments(
    pudl_out: pudl.output.pudltabl.PudlTabl
) -> pd.DataFrame:
    """
    Load FERC 714 county assignments.

    Args:
        pudl_out: PUDL database extractor.

    Returns:
        Dataframe with columns `respondent_id_ferc714`, report `year` (int),
        and `county_id_fips`.
    """
    respondents = pudl.output.ferc714.Respondents(pudl_out)
    df = respondents.fipsify()[
        ['respondent_id_ferc714', 'county_id_fips', 'report_date']
    ]
    # Drop rows where county is blank or a duplicate
    df = df[~df['county_id_fips'].isnull()].drop_duplicates()
    # Convert date to year
    df['year'] = df['report_date'].dt.year
    df.drop(columns=['report_date'], inplace=True)
    return df

def load_counties(
    pudl_out: pudl.output.pudltabl.PudlTabl, pudl_settings: dict
) -> pd.DataFrame:
    """
    Load county attributes.

    Args:
        pudl_out: PUDL database extractor.
        pudl_settings: PUDL settings.

    Returns:
        Dataframe with columns `county_id_fips` and `population`.
    """
    df = pudl.output.censusdp1tract.get_layer(
        layer="county", pudl_settings=pudl_settings
    )[["geoid10", "dp0010001"]]
    return df.rename(
        columns={"geoid10": "county_id_fips", "dp0010001": "population"}
    )

# --- Allocation --- #
def load_eia861_state_total_sales(
    pudl_out: pudl.output.pudltabl.PudlTabl
) -> pd.DataFrame:
    """
    Read and format EIA 861 sales by state and year.

    Args:
        pudl_out: Used to access
            :meth:`pudl.output.pudltabl.PudlTabl.sales_eia861`.

    Returns:
        Dataframe with columns `state_id_fips`, `year`, `demand_mwh`.
    """
    df = pudl_out.sales_eia861()
    df = df.groupby(['state', 'report_date'], as_index=False)['sales_mwh'].sum()
    # Convert report_date to year
    df['year'] = df['report_date'].dt.year
    # Convert state abbreviations to FIPS codes
    fips = {x['code']: x['fips'] for x in STATES}
    df['state_id_fips'] = df['state'].map(fips)
    # Drop records with zero sales
    df.rename(columns={'sales_mwh': 'demand_mwh'}, inplace=True)
    df = df[df['demand_mwh'].gt(0)]
    return df[['state_id_fips', 'year', 'demand_mwh']]

def predict_state_hourly_demand(
    demand: pd.DataFrame,
    counties: pd.DataFrame,
    assignments: pd.DataFrame,
    state_totals: pd.DataFrame = None,
    mean_overlaps: bool = False,
) -> pd.DataFrame:
    """
    Predict state hourly demand.

    Args:
        demand: Hourly demand timeseries, with columns
            `respondent_id_ferc714`, report `year`, `utc_datetime`,
            and `demand_mwh`.
        counties: Counties, with columns `county_id_fips` and `population`.
        assignments: County assignments for demand respondents, with columns
            `respondent_id_ferc714`, `year`, and `county_id_fips`.
        state_totals: Total annual demand by state, with columns
            `state_id_fips`, `year`, and `demand_mwh`. If provided,
            the predicted hourly demand is scaled to match these totals.
        mean_overlaps: Whether to average the demands predicted for a county
            when the county is assigned to multiple respondents.
            By default, demands are summed.

    Returns:
        Dataframe with columns `state_id_fips`, `utc_datetime`, `demand_mwh`,
        and (if `state_totals` was provided) `scaled_demand_mwh`.
    """
    # Pre-compute list of respondent-years with demand
    with_demand = (
        demand
        .groupby(['respondent_id_ferc714', 'year'], as_index=False)['demand_mwh']
        .sum()
        .query('demand_mwh > 0')
    )[['respondent_id_ferc714', 'year']]
    # Pre-compute state-county assignments
    counties = counties.copy()
    counties['state_id_fips'] = counties['county_id_fips'].str[:2]
    # Merge counties with respondent- and state-county assignments
    df = (
        assignments
        # Drop respondent-years with no demand
        .merge(with_demand, on=['respondent_id_ferc714', 'year'])
        # Merge with counties and state-county assignments
        .merge(counties, on=['county_id_fips'])
    )
    # Divide county population by total population in respondent (by year)
    # TODO: Use more county attributes in the calculation of their weights
    totals = df.groupby(
        ['respondent_id_ferc714', 'year']
    )['population'].transform('sum')
    df['weight'] = df['population'] / totals
    # Normalize county weights by county occurrences (by year)
    if mean_overlaps:
        counts = df.groupby(
            ['county_id_fips', 'year']
        )['county_id_fips'].transform('count')
        df['weight'] /= counts
    # Sum county weights by respondent, year, and state
    weights = df.groupby(
        ['respondent_id_ferc714', 'year', 'state_id_fips'], as_index=False
    )['weight'].sum()
    # Multiply respondent-state weights with demands
    df = weights.merge(demand, on=['respondent_id_ferc714', 'year'])
    df['demand_mwh'] *= df['weight']
    # Scale estimates using state totals
    if state_totals is not None:
        # Compute scale factor between current and target state totals
        totals = (
            df.groupby(['state_id_fips', 'year'], as_index=False)['demand_mwh']
            .sum()
            .merge(state_totals, on=['state_id_fips', 'year'])
        )
        totals['scale'] = totals['demand_mwh_y'] / totals['demand_mwh_x']
        df = df.merge(totals[['state_id_fips', 'year', 'scale']])
        df['scaled_demand_mwh'] = df['demand_mwh'] * df['scale']
    # Sum demand by state by matching UTC time
    fields = [x for x in ['demand_mwh', 'scaled_demand_mwh'] if x in df]
    return df.groupby(
        ['state_id_fips', 'utc_datetime'], as_index=False
    )[fields].sum()

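# Worked example of the allocation above (hypothetical numbers): a respondent
# serving two counties with populations 750,000 (in state A) and 250,000
# (in state B) gets county weights 0.75 and 0.25, so 75% of each of its
# hourly demand values is assigned to state A and 25% to state B. If
# `state_totals` is provided, each state-year series is then multiplied by
# (reported annual total) / (predicted annual total) for that state and year.
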
def plot_demand_timeseries(
    a: pd.DataFrame,
    b: pd.DataFrame = None,
    window: int = 168,
    title: str = None,
    path: str = None,
) -> None:
    """
    Make a timeseries plot of predicted and reference demand.

    Args:
        a: Predicted demand with columns `utc_datetime` and any of
            `demand_mwh` (in grey) and `scaled_demand_mwh` (in orange).
        b: Reference demand with columns `utc_datetime` and `demand_mwh`
            (in red).
        window: Width of window (in rows) to use to compute rolling means,
            or `None` to plot raw values.
        title: Plot title.
        path: Plot path. If provided, the figure is saved to file and closed.
    """
    plt.figure(figsize=(16, 8))
    # Plot predicted
    for field, color in [('demand_mwh', 'grey'), ('scaled_demand_mwh', 'orange')]:
        if field not in a:
            continue
        y = a[field]
        if window:
            y = y.rolling(window).mean()
        plt.plot(
            a['utc_datetime'], y, color=color, alpha=0.5,
            label=f'Predicted ({field})'
        )
    # Plot expected
    if b is not None:
        y = b['demand_mwh']
        if window:
            y = y.rolling(window).mean()
        plt.plot(
            b['utc_datetime'], y, color='red', alpha=0.5,
            label='Reference (demand_mwh)'
        )
    if title:
        plt.title(title)
    plt.ylabel('Demand (MWh)')
    plt.legend()
    if path:
        plt.savefig(path, bbox_inches='tight')
        plt.close()

def plot_demand_scatter(
    a: pd.DataFrame,
    b: pd.DataFrame,
    title: str = None,
    path: str = None,
) -> None:
    """
    Make a scatter plot comparing predicted and reference demand.

    Args:
        a: Predicted demand with columns `utc_datetime` and any of
            `demand_mwh` (in grey) and `scaled_demand_mwh` (in orange).
        b: Reference demand with columns `utc_datetime` and `demand_mwh`.
            Every element in `utc_datetime` must match the one in `a`.
        title: Plot title.
        path: Plot path. If provided, the figure is saved to file and closed.

    Raises:
        ValueError: Datetime columns do not match.
    """
    if not a['utc_datetime'].equals(b['utc_datetime']):
        raise ValueError('Datetime columns do not match')
    plt.figure(figsize=(8, 8))
    plt.gca().set_aspect('equal')
    plt.axline((0, 0), (1, 1), linestyle=':', color='grey')
    for field, color in [('demand_mwh', 'grey'), ('scaled_demand_mwh', 'orange')]:
        if field not in a:
            continue
        plt.scatter(
            b['demand_mwh'], a[field], c=color, s=0.1, alpha=0.5,
            label=f'Prediction ({field})',
        )
    if title:
        plt.title(title)
    plt.xlabel('Reference (MWh)')
    plt.ylabel('Predicted (MWh)')
    plt.legend()
    if path:
        plt.savefig(path, bbox_inches='tight')
        plt.close()

def compare_state_demand(
    a: pd.DataFrame, b: pd.DataFrame, scaled: bool = True
) -> pd.DataFrame:
    """
    Compute statistics comparing predicted and reference demand.

    Statistics are computed for each year.

    Args:
        a: Predicted demand with columns `utc_datetime` and either
            `demand_mwh` (if `scaled=False`) or `scaled_demand_mwh`
            (if `scaled=True`).
        b: Reference demand with columns `utc_datetime` and `demand_mwh`.
            Every element in `utc_datetime` must match the one in `a`.
        scaled: Whether to compare the scaled (`scaled_demand_mwh`) or
            unscaled (`demand_mwh`) prediction to the reference.

    Returns:
        Dataframe with columns `year`, `rmse` (root mean square error),
        and `mae` (mean absolute error).

    Raises:
        ValueError: Datetime columns do not match.
    """
    if not a['utc_datetime'].equals(b['utc_datetime']):
        raise ValueError('Datetime columns do not match')
    field = 'scaled_demand_mwh' if scaled else 'demand_mwh'
    df = pd.DataFrame({
        'year': a['utc_datetime'].dt.year,
        'diff': a[field] - b['demand_mwh'],
    })
    return df.groupby(['year'], as_index=False)['diff'].agg({
        'rmse': lambda x: np.sqrt(np.sum(x**2) / x.size),
        'mae': lambda x: np.sum(np.abs(x)) / x.size,
    })

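# Illustrative check of the statistics above (hypothetical numbers): for a
# single year where the predicted-minus-reference differences are
# [-1.0, 1.0] MWh, rmse = sqrt((1 + 1) / 2) = 1.0 and mae = (1 + 1) / 2 = 1.0.
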
# --- Parse Command Line Args --- #
def parse_command_line(argv):
    """Skeletal command line argument parser to provide a help message."""
    parser = argparse.ArgumentParser(description=__doc__)
    return parser.parse_args(argv[1:])

# --- Example usage --- #
def main():
    """Predict state demand."""
    # --- Connect to PUDL logger --- #
    logger = logging.getLogger('pudl')
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)8s] %(name)s:%(lineno)s %(message)s'
    ))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    # --- Parse command line args --- #
    _ = parse_command_line(sys.argv)
    # --- Connect to PUDL database --- #
    pudl_settings = pudl.workspace.setup.get_defaults()
    pudl_engine = sa.create_engine(pudl_settings["pudl_db"])
    pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine)
    # --- Prepare FERC 714 hourly demand --- #
    df, tz = load_ferc714_hourly_demand_matrix(pudl_out)
    df = clean_ferc714_hourly_demand_matrix(df)
    df = filter_ferc714_hourly_demand_matrix(df, min_data=100, min_data_fraction=0.9)
    df = impute_ferc714_hourly_demand_matrix(df)
    demand = melt_ferc714_hourly_demand_matrix(df, tz)
    # --- Predict demand --- #
    counties = load_counties(pudl_out, pudl_settings)
    assignments = load_ferc714_county_assignments(pudl_out)
    state_totals = load_eia861_state_total_sales(pudl_out)
    prediction = predict_state_hourly_demand(
        demand,
        counties=counties,
        assignments=assignments,
        state_totals=state_totals,
        mean_overlaps=False
    )
    # --- Export results --- #
    local_dir = pathlib.Path(pudl_settings['data_dir']) / 'local'
    ventyx_path = local_dir / 'ventyx/state_level_load_2007_2018.csv'
    base_dir = local_dir / 'state-demand'
    base_dir.mkdir(parents=True, exist_ok=True)
    demand_path = base_dir / 'demand.csv'
    stats_path = base_dir / 'demand-stats.csv'
    timeseries_dir = base_dir / 'timeseries'
    timeseries_dir.mkdir(parents=True, exist_ok=True)
    scatter_dir = base_dir / 'scatter'
    scatter_dir.mkdir(parents=True, exist_ok=True)
    # Write predicted hourly state demand
    prediction.to_csv(
        demand_path, index=False, date_format='%Y%m%dT%H', float_format='%.1f'
    )
    # Load Ventyx as reference if available
    reference = None
    if ventyx_path.exists():
        reference = load_ventyx_hourly_state_demand(ventyx_path)
    # Plots and statistics
    stats = []
    for fips in prediction['state_id_fips'].unique():
        state = lookup_state(fips)
        # Filter demand by state
        a = prediction.query(f"state_id_fips == '{fips}'")
        b = None
        title = f'{state["fips"]}: {state["name"]} ({state["code"]})'
        plot_name = f'{state["fips"]}-{state["name"]}.png'
        if reference is not None:
            b = reference.query(f"state_id_fips == '{fips}'")
        # Save timeseries plot
        plot_demand_timeseries(
            a, b=b, window=168, title=title, path=timeseries_dir / plot_name
        )
        if b is None or b.empty:
            continue
        # Align predicted and reference demand
        a = a.set_index('utc_datetime')
        b = b.set_index('utc_datetime')
        index = a.index.intersection(b.index)
        a = a.loc[index].reset_index()
        b = b.loc[index].reset_index()
        # Compute statistics
        stat = compare_state_demand(a, b, scaled=True)
        stat['state_id_fips'] = fips
        stats.append(stat)
        # Save scatter plot
        plot_demand_scatter(a, b=b, title=title, path=scatter_dir / plot_name)
    # Write statistics
    if reference is not None:
        pd.concat(stats, ignore_index=True).to_csv(
            stats_path, index=False, float_format='%.1f'
        )

if __name__ == '__main__':
    sys.exit(main())