"""
Run the PUDL ETL Pipeline.
The PUDL project integrates several different public datasets into a well
normalized relational database allowing easier access and interaction between all
datasets. This module coordinates the extract/transform/load process for
data from:
- US Energy Information Administration (EIA):
- Form 860 (eia860)
- Form 923 (eia923)
- US Federal Energy Regulatory Commission (FERC):
- Form 1 (ferc1)
- US Environmental Protection Agency (EPA):
- Continuous Emissions Monitoring System (epacems)
"""
import logging
import time
from pathlib import Path
from typing import Any, Dict
import pandas as pd
import sqlalchemy as sa
import pudl
from pudl import constants as pc
from pudl.metadata import RESOURCE_METADATA
from pudl.metadata.codes import (CONTRACT_TYPES_EIA, ENERGY_SOURCES_EIA,
FUEL_TRANSPORTATION_MODES_EIA,
FUEL_TYPES_AER_EIA, PRIME_MOVERS_EIA,
SECTOR_CONSOLIDATED_EIA)
from pudl.metadata.dfs import FERC_ACCOUNTS, FERC_DEPRECIATION_LINES
from pudl.settings import (EiaSettings, EpaCemsSettings, EtlSettings,
Ferc1Settings, GlueSettings)
from pudl.workspace.datastore import Datastore
# Module-level logger shared by every ETL function in this file; it is named
# after the module so log records can be filtered per module.
logger = logging.getLogger(__name__)
###############################################################################
# EIA EXPORT FUNCTIONS
###############################################################################
def _read_static_tables_eia() -> Dict[str, pd.DataFrame]:
    """Collect the static EIA code tables, keyed by their PUDL table name.

    Some values that appear in the EIA data are essentially constant (e.g. the
    valid fuel type or prime mover codes) but still have to be stored in their
    own tables so that other tables can reference them as foreign keys. Each
    of these tables is taken from the ``"df"`` entry of the corresponding code
    metadata object imported from :mod:`pudl.metadata.codes`.

    Returns:
        A dictionary mapping each static EIA table name to its dataframe.
    """
    # (table name, code metadata) pairs, kept in the order the tables are
    # added to the output dictionary.
    code_tables = (
        ('energy_sources_eia', ENERGY_SOURCES_EIA),
        ('fuel_types_aer_eia', FUEL_TYPES_AER_EIA),
        ('prime_movers_eia', PRIME_MOVERS_EIA),
        ('sector_consolidated_eia', SECTOR_CONSOLIDATED_EIA),
        ('fuel_transportation_modes_eia', FUEL_TRANSPORTATION_MODES_EIA),
        ('contract_types_eia', CONTRACT_TYPES_EIA),
    )
    return {table_name: codes["df"] for table_name, codes in code_tables}
def _etl_eia(
    etl_settings: EiaSettings,
    ds_kwargs: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """Extract and transform the EIA 860 / 923 data into loadable dataframes.

    Args:
        etl_settings: Validated ETL parameters required by this data source.
        ds_kwargs: Keyword arguments for instantiating a PUDL datastore,
            so that the ETL can access the raw input data.

    Returns:
        A dictionary of EIA dataframes ready for loading into the PUDL DB,
        keyed by table name. It contains the static EIA tables, the entity
        tables and the transformed EIA 860 / 923 tables. An empty dictionary
        is returned when neither form has both tables and years selected.
    """
    eia860_tables = etl_settings.eia860.tables
    eia860_years = etl_settings.eia860.years
    eia860m = etl_settings.eia860.eia860m
    eia923_tables = etl_settings.eia923.tables
    eia923_years = etl_settings.eia923.years

    # Nothing to do unless at least one of the two forms has both tables and
    # years requested.
    if (
        (not eia923_tables or not eia923_years)
        and (not eia860_tables or not eia860_years)
    ):
        logger.info('Not loading EIA.')
        # Return an empty dict (not a list) so the result always honors the
        # declared Dict return type, whatever the caller does with it.
        return {}

    # generate dataframes for the static EIA tables
    out_dfs = _read_static_tables_eia()

    ds = Datastore(**ds_kwargs)
    # Extract EIA forms 923, 860
    eia923_raw_dfs = pudl.extract.eia923.Extractor(ds).extract(
        year=eia923_years)
    eia860_raw_dfs = pudl.extract.eia860.Extractor(ds).extract(
        year=eia860_years)
    # if we are trying to add the EIA 860M YTD data, then extract it and append
    if eia860m:
        eia860m_raw_dfs = pudl.extract.eia860m.Extractor(ds).extract(
            year_month=pc.WORKING_PARTITIONS['eia860m']['year_month'])
        eia860_raw_dfs = pudl.extract.eia860m.append_eia860m(
            eia860_raw_dfs=eia860_raw_dfs, eia860m_raw_dfs=eia860m_raw_dfs)

    # Transform EIA forms 923, 860
    eia860_transformed_dfs = pudl.transform.eia860.transform(
        eia860_raw_dfs, eia860_tables=eia860_tables)
    eia923_transformed_dfs = pudl.transform.eia923.transform(
        eia923_raw_dfs, eia923_tables=eia923_tables)

    # Merge both forms into one dictionary. The 860 dict is copied first so
    # the transform output itself is not mutated by the update.
    eia_transformed_dfs = eia860_transformed_dfs.copy()
    eia_transformed_dfs.update(eia923_transformed_dfs.copy())

    # convert types..
    eia_transformed_dfs = pudl.helpers.convert_dfs_dict_dtypes(
        eia_transformed_dfs, 'eia')

    # The combined EIA transform returns the entity tables separately from the
    # remaining transformed tables.
    entities_dfs, eia_transformed_dfs = pudl.transform.eia.transform(
        eia_transformed_dfs,
        eia860_years=eia860_years,
        eia923_years=eia923_years,
        eia860m=eia860m,
    )

    # convert types..
    entities_dfs = pudl.helpers.convert_dfs_dict_dtypes(entities_dfs, 'eia')
    # NOTE(review): PUDL_META is neither defined nor imported in the visible
    # part of this module -- confirm it exists at module level.
    for table in entities_dfs:
        entities_dfs[table] = PUDL_META.get_resource(table).encode(entities_dfs[table])

    out_dfs.update(entities_dfs)
    out_dfs.update(eia_transformed_dfs)
    return out_dfs
###############################################################################
# FERC1 EXPORT FUNCTIONS
###############################################################################
def _read_static_tables_ferc1() -> Dict[str, pd.DataFrame]:
    """Collect the static FERC Form 1 tables, keyed by their PUDL table name.

    These small tables hold essentially constant values which other tables
    refer to as foreign keys. Each one is a column subset of an object
    imported from :mod:`pudl.metadata.dfs` (presumably a dataframe -- the
    objects are indexed with a list of column names).

    Returns:
        A dictionary with the ``ferc_accounts`` and ``ferc_depreciation_lines``
        tables.
    """
    account_columns = ["ferc_account_id", "ferc_account_description"]
    depreciation_columns = ["line_id", "ferc_account_description"]

    static_dfs = {}
    static_dfs['ferc_accounts'] = FERC_ACCOUNTS[account_columns]
    static_dfs['ferc_depreciation_lines'] = FERC_DEPRECIATION_LINES[depreciation_columns]
    return static_dfs
def _etl_ferc1(
    etl_settings: Ferc1Settings,
    pudl_settings: Dict[str, Any],
) -> Dict[str, pd.DataFrame]:
    """Extract and transform the FERC Form 1 data into loadable dataframes.

    Args:
        etl_settings: Validated ETL parameters required by this data source.
        pudl_settings: a dictionary filled with settings that mostly
            describe paths to various resources and outputs.

    Returns:
        Dataframes containing PUDL database tables pertaining to the FERC Form 1
        data, keyed by table name. This includes the static FERC 1 tables. An
        empty dictionary is returned when no years or no tables are selected.
    """
    ferc1_years = etl_settings.years
    ferc1_tables = etl_settings.tables

    if not ferc1_years or not ferc1_tables:
        logger.info('Not loading FERC1')
        # Return an empty dict (not a list) so the result always honors the
        # declared Dict return type, whatever the caller does with it.
        return {}

    # Compile static FERC 1 dataframes
    out_dfs = _read_static_tables_ferc1()

    # Extract FERC form 1
    ferc1_raw_dfs = pudl.extract.ferc1.extract(
        ferc1_tables=ferc1_tables,
        ferc1_years=ferc1_years,
        pudl_settings=pudl_settings)

    # Transform FERC form 1
    ferc1_transformed_dfs = pudl.transform.ferc1.transform(
        ferc1_raw_dfs, ferc1_tables=ferc1_tables)

    out_dfs.update(ferc1_transformed_dfs)
    return out_dfs
###############################################################################
# EPA CEMS EXPORT FUNCTIONS
###############################################################################
def etl_epacems(
    etl_settings: EpaCemsSettings,
    pudl_settings: Dict[str, Any],
    ds_kwargs: Dict[str, Any],
) -> None:
    """Extract, transform and load the EPA CEMS data into Apache Parquet.

    Args:
        etl_settings: Validated ETL parameters required by this data source.
        pudl_settings: a dictionary filled with settings that mostly describe paths to
            various resources and outputs. The ``pudl_db`` and ``parquet_dir``
            entries are read here.
        ds_kwargs: Keyword arguments for instantiating a PUDL datastore, so that the ETL
            can access the raw input data.

    Returns:
        Unlike the other ETL functions, the EPACEMS writes its output to Parquet as it
        goes, since the dataset is too large to hold in memory. So it doesn't return a
        dictionary of dataframes.

    Raises:
        RuntimeError: If the PUDL DB does not contain a ``plants_eia860`` table.
    """
    epacems_years = etl_settings.years
    epacems_states = etl_settings.states

    # If we're not doing CEMS, just stop here to avoid printing messages like
    # "Reading EPA CEMS data...", which could be confusing.
    if not epacems_states or not epacems_years:
        logger.info('Not ingesting EPA CEMS.')
        # Without this return the function would go on to query the PUDL DB
        # and run the extract / transform with nothing selected.
        return

    pudl_engine = sa.create_engine(pudl_settings["pudl_db"])

    # Verify that we have a PUDL DB with plant attributes:
    inspector = sa.inspect(pudl_engine)
    if "plants_eia860" not in inspector.get_table_names():
        raise RuntimeError(
            "No plants_eia860 available in the PUDL DB! Have you run the ETL? "
            f"Trying to access PUDL DB: {pudl_engine}"
        )

    # Years for which the PUDL DB holds EIA plant records.
    eia_plant_years = pd.read_sql(
        """
        SELECT DISTINCT strftime('%Y', report_date)
        AS year
        FROM plants_eia860
        ORDER BY year ASC
        """, pudl_engine).year.astype(int)

    # Sorted so that the log message is deterministic.
    missing_years = sorted(set(epacems_years) - set(eia_plant_years))
    if missing_years:
        logger.info(
            f"EPA CEMS years with no EIA plant data: {missing_years} "
            "Some timezones may be estimated based on plant state."
        )

    # NOTE: This is a generator for raw dataframes
    epacems_raw_dfs = pudl.extract.epacems.extract(
        epacems_years, epacems_states, Datastore(**ds_kwargs))

    # NOTE: This is a generator for transformed dataframes
    epacems_transformed_dfs = pudl.transform.epacems.transform(
        epacems_raw_dfs=epacems_raw_dfs,
        pudl_engine=pudl_engine,
    )

    logger.info("Processing EPA CEMS data and writing it to Apache Parquet.")
    # Always record the start time: it is cheap, and it keeps the variable
    # bound even if the effective log level changes while the loop runs.
    start_time = time.monotonic()

    # run the cems generator dfs through the load step
    for df in epacems_transformed_dfs:
        pudl.load.parquet.epacems_to_parquet(
            df,
            root_path=Path(pudl_settings["parquet_dir"]) / "epacems",
        )

    if logger.isEnabledFor(logging.INFO):
        delta_t = time.strftime("%H:%M:%S", time.gmtime(
            time.monotonic() - start_time))
        time_message = f"Processing EPA CEMS took {delta_t}"
        logger.info(time_message)
###############################################################################
# GLUE EXPORT FUNCTIONS
###############################################################################
def _etl_glue(etl_settings: GlueSettings) -> Dict[str, pd.DataFrame]:
    """Build the glue tables that connect the different datasets.

    Args:
        etl_settings: Validated ETL parameters required by this data source.

    Returns:
        A dictionary of :class:`pandas.DataFrame` objects whose keys are the
        names of the corresponding database tables.
    """
    # The FERC 1 <-> EIA glue is always requested; the settings flags are
    # handed straight to the glue function.
    glue_dfs = pudl.glue.ferc1_eia.glue(
        ferc1=etl_settings.ferc1,
        eia=etl_settings.eia,
    )

    # Without the EIA data the EPA to EIA crosswalk must be left out,
    # otherwise its foreign key references would have nothing to point at.
    if not etl_settings.eia:
        return glue_dfs

    epacems_crosswalk_dfs = pudl.glue.eia_epacems.grab_clean_split()
    glue_dfs.update(epacems_crosswalk_dfs)
    return glue_dfs
###############################################################################
# Coordinating functions
###############################################################################
def etl(  # noqa: C901
    etl_settings: EtlSettings,
    pudl_settings: Dict,
    clobber: bool = False,
    use_local_cache: bool = True,
    gcs_cache_path: str = None,
    check_foreign_keys: bool = True,
    check_types: bool = True,
    check_values: bool = True,
):
    """
    Run the PUDL Extract, Transform, and Load data pipeline.

    The FERC Form 1, EIA and glue tables are processed first and loaded into
    the PUDL SQLite database. Once that is done the EPA CEMS data is
    processed and written to Apache Parquet files, which depends on data that
    has already been loaded into the SQLite DB.

    Args:
        etl_settings: settings that describe datasets to be loaded.
        pudl_settings: a dictionary filled with settings that mostly
            describe paths to various resources and outputs.
        clobber: If True and there is already a pudl.sqlite database
            it will be deleted and a new one will be created.
        use_local_cache: controls whether datastore should be using local
            file cache.
        gcs_cache_path: controls whether datastore should be using Google
            Cloud Storage based cache.
        check_foreign_keys: passed through to
            :func:`pudl.load.sqlite.dfs_to_sqlite`.
        check_types: passed through to
            :func:`pudl.load.sqlite.dfs_to_sqlite`.
        check_values: passed through to
            :func:`pudl.load.sqlite.dfs_to_sqlite`.

    Returns:
        None

    Raises:
        SystemExit: If the PUDL SQLite DB already exists and ``clobber`` is
            False.
    """
    sqlite_path = Path(pudl_settings["sqlite_dir"]) / "pudl.sqlite"
    if not clobber and sqlite_path.exists():
        raise SystemExit(
            "The PUDL DB already exists, and we don't want to clobber it.\n"
            f"Move {sqlite_path} aside or set clobber=True and try again."
        )

    # Configure how we want to obtain raw input data:
    ds_kwargs = {
        "gcs_cache_path": gcs_cache_path,
        "sandbox": pudl_settings.get("sandbox", False),
    }
    if use_local_cache:
        ds_kwargs["local_cache_path"] = Path(pudl_settings["pudl_in"]) / "data"

    datasets = etl_settings.datasets.get_datasets()

    # Prepare the EPA CEMS output directory up front, before any of the SQLite
    # work, so that we don't do a bunch of processing only to discover at the
    # very end that the Parquet outputs can't be written.
    if "epacems" in datasets:
        pudl.helpers.prep_dir(
            Path(pudl_settings["parquet_dir"]) / "epacems",
            clobber=clobber,
        )

    # Gather every dataframe destined for SQLite, in the order
    # FERC 1 -> EIA -> glue.
    sqlite_dfs = {}
    ferc1_settings = datasets.get("ferc1", False)
    if ferc1_settings:
        sqlite_dfs.update(_etl_ferc1(ferc1_settings, pudl_settings))
    eia_settings = datasets.get("eia", False)
    if eia_settings:
        sqlite_dfs.update(_etl_eia(eia_settings, ds_kwargs))
    glue_settings = datasets.get("glue", False)
    if glue_settings:
        sqlite_dfs.update(_etl_glue(glue_settings))

    # Load the ferc1 + eia data directly into the SQLite DB:
    pudl.load.sqlite.dfs_to_sqlite(
        sqlite_dfs,
        engine=sa.create_engine(pudl_settings["pudl_db"]),
        check_foreign_keys=check_foreign_keys,
        check_types=check_types,
        check_values=check_values,
    )

    # Parquet Outputs:
    epacems_settings = datasets.get("epacems", False)
    if epacems_settings:
        etl_epacems(epacems_settings, pudl_settings, ds_kwargs)