Source code for pudl.transform.gridpathratoolkit

"""Transformations of the GridPath RA Toolkit renewable generation profiles.

Wind and solar profiles are extracted separately, but concatenated into a single table
in this module, as they have exactly the same structure. The generator aggregation group
association tables for various technology types are also concatenated together.
"""

import pandas as pd
from dagster import AssetCheckResult, AssetIn, asset, asset_check

import pudl


def _transform_capacity_factors(
    capacity_factors: pd.DataFrame, utc_offset: pd.Timedelta
) -> pd.DataFrame:
    """Basic transformations that can be applied to many profiles.

    - Construct a datetime column and adjust it to be in UTC.
    - Reshape the table from wide to tidy format.
    - Name columns appropriately.
    """
    # Convert timestamp to UTC before stacking!
    # TODO: Ensure that we're using the same hour starting vs. ending convention
    # here as we use in the FERC-714 and EPA CEMS data. Should this be reflected in
    # the column names here and elsewhere, whatever the convention is?
    datetime_cols = ["year", "month", "day", "hour"]
    capacity_factors["datetime_utc"] = (
        pd.DatetimeIndex(pd.to_datetime(capacity_factors.loc[:, datetime_cols]))
        - utc_offset
    )
    capacity_factors = (
        capacity_factors.drop(columns=datetime_cols)
        .set_index("datetime_utc")
        .stack()
        .reset_index()
    )
    capacity_factors.columns = ["datetime_utc", "aggregation_group", "capacity_factor"]
    capacity_factors = capacity_factors.astype({"aggregation_group": "string"})
    return capacity_factors

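# Illustrative sketch of the reshape above (hypothetical column names and values,
# not drawn from the actual raw data): a wide profile table with one column per
# aggregation group, such as
#
#     year  month  day  hour  solar_group_a  solar_group_b
#     2020      1    1     1           0.00           0.05
#
# becomes a tidy table with one row per (datetime_utc, aggregation_group) pair:
#
#     datetime_utc         aggregation_group  capacity_factor
#     2020-01-01 09:00:00  solar_group_a                 0.00
#     2020-01-01 09:00:00  solar_group_b                 0.05
#
# The 09:00 UTC timestamp assumes the -8 hour Pacific Standard Time offset that is
# passed in by the asset below.
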

@asset(
    io_manager_key="parquet_io_manager",
    compute_kind="Python",
    op_tags={"memory-use": "high"},
)
def out_gridpathratoolkit__hourly_available_capacity_factor(
    raw_gridpathratoolkit__aggregated_extended_solar_capacity: pd.DataFrame,
    raw_gridpathratoolkit__aggregated_extended_wind_capacity: pd.DataFrame,
) -> pd.DataFrame:
    """Transform raw GridPath RA Toolkit renewable generation profiles.

    Concatenates the solar and wind capacity factors into a single table and turns
    the aggregation key into a categorical column to save space.

    Note that this transform is a bit unusual, in that it produces a highly
    processed output table. That's because we're working backwards from an archived
    finished product in order to provide a minimum viable product. Our intent is to
    integrate or reimplement the steps required to produce this output table from
    less processed original inputs in the future.
    """
    pacific_standard_time = pd.Timedelta("-8h")
    return pd.concat(
        [
            _transform_capacity_factors(
                capacity_factors=raw_df, utc_offset=pacific_standard_time
            )
            for raw_df in [
                raw_gridpathratoolkit__aggregated_extended_solar_capacity,
                raw_gridpathratoolkit__aggregated_extended_wind_capacity,
            ]
            if not raw_df.empty
        ]
    ).astype({"aggregation_group": pd.CategoricalDtype()})

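# For context on the categorical cast above: a categorical column stores each
# unique aggregation_group label once plus small integer codes per row, rather than
# one Python string object per row. A rough illustration (hypothetical data, not
# from the pipeline):
#
#     s = pd.Series(["wind_group_a"] * 1_000_000)
#     s.astype("category").memory_usage(deep=True)  # much smaller than
#     s.memory_usage(deep=True)                     # the plain object-dtype series
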
def _transform_aggs(raw_agg: pd.DataFrame) -> pd.DataFrame:
    """Transform raw GridPath RA Toolkit generator aggregations.

    - Split EIA_UniqueID into plant and generator IDs.
    - Rename columns to use PUDL conventions.
    - Verify that split-out plant IDs always match reported plant IDs.
    - Set column dtypes.
    """
    # Get rid of non-alphanumeric characters, and convert to lowercase.
    agg = pudl.helpers.simplify_columns(raw_agg)
    # Split the eia_uniqueid into plant and generator IDs. It should always be an
    # integer (plant ID) followed by an underscore, with anything after the
    # underscore being the generator ID (a string).
    separate_ids = agg["eia_uniqueid"].str.split("_", n=1, expand=True)
    separate_ids.columns = ["plant_id_eia", "generator_id"]
    separate_ids = separate_ids.astype({"plant_id_eia": int, "generator_id": "string"})
    agg = pd.concat([agg, separate_ids], axis="columns")
    # Ensure no mismatch between EIA plant IDs. Only applies to wind data.
    if "eia_plantcode" in agg.columns:
        assert agg.loc[agg.eia_plantcode != agg.plant_id_eia].empty
    agg = (
        agg.drop(
            columns=[
                col for col in ["eia_plantcode", "eia_uniqueid"] if col in agg.columns
            ]
        )
        .rename(
            columns={
                "gp_aggregation": "aggregation_group",
                "eia_nameplatecap": "capacity_mw",
                "include": "include_generator",
            }
        )
        .astype({"include_generator": "boolean", "aggregation_group": "string"})
        .pipe(
            pudl.helpers.remove_leading_zeros_from_numeric_strings,
            col_name="generator_id",
        )
    )
    # Plant 64272 does not exist in the PUDL data. However, plant 64481 is the only
    # plant that has a generator_id of "AEC1", it is solar, it has the same capacity
    # as the GridPath RA Toolkit expects for plant 64272, and it also has generator
    # AEC2, which shows up in the GridPath RA Toolkit data in association with
    # plant 64481.
    agg.loc[
        (agg.plant_id_eia == 64272) & (agg.generator_id == "AEC1"), "plant_id_eia"
    ] = 64481
    return agg

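# Illustrative sketch of the ID split above (the value is chosen to match the
# comments in _transform_aggs, not drawn directly from the raw data):
#
#     pd.Series(["64481_AEC2"]).str.split("_", n=1, expand=True)
#
# yields "64481" and "AEC2", which are then cast to an integer plant_id_eia and a
# string generator_id. Using n=1 means a generator ID that itself contains
# underscores (e.g. a hypothetical "GEN_1") is not split any further.
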

@asset(
    io_manager_key="pudl_io_manager",
    compute_kind="Python",
)
def core_gridpathratoolkit__assn_generator_aggregation_group(
    raw_gridpathratoolkit__wind_capacity_aggregations: pd.DataFrame,
    raw_gridpathratoolkit__solar_capacity_aggregations: pd.DataFrame,
) -> pd.DataFrame:
    """Transform and combine raw GridPath RA Toolkit generator aggregations."""
    return pd.concat(
        [
            _transform_aggs(agg_df)
            for agg_df in [
                raw_gridpathratoolkit__wind_capacity_aggregations,
                raw_gridpathratoolkit__solar_capacity_aggregations,
            ]
            if not agg_df.empty
        ]
    )


@asset_check(
    asset="out_gridpathratoolkit__hourly_available_capacity_factor",
    additional_ins={
        "aggs": AssetIn("core_gridpathratoolkit__assn_generator_aggregation_group")
    },
    blocking=True,
)
def check_valid_aggregation_groups(
    out_gridpathratoolkit__hourly_available_capacity_factor,
    aggs: pd.DataFrame,
) -> AssetCheckResult:
    """Check that every capacity factor aggregation key appears in the aggregations.

    This isn't a normal foreign-key relationship, since the aggregation group isn't
    the primary key in the aggregation tables, and is not unique in either of these
    tables, but if an aggregation group appears in the capacity factor time series
    and never appears in the aggregation table, then something is wrong.
    """
    missing_agg_groups = set(
        out_gridpathratoolkit__hourly_available_capacity_factor.aggregation_group.unique()
    ).difference(set(aggs.aggregation_group.unique()))
    return AssetCheckResult(passed=missing_agg_groups == set())
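
# Illustrative sketch of the set-difference logic above (hypothetical group names):
# if the hourly profiles contain aggregation_group values {"wind_a", "solar_b"} but
# the aggregation table only contains {"wind_a"}, then missing_agg_groups is
# {"solar_b"} and the check fails, because the difference is non-empty.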