Source code for pudl.extract.epacems

"""
Retrieve data from EPA CEMS hourly zipped CSVs.

This module pulls data from EPA's published CSV files.
"""
import logging

import pandas as pd

import pudl.constants as pc
import pudl.workspace.datastore as datastore

logger = logging.getLogger(__name__)


def read_cems_csv(filename):
    """Read one CEMS CSV file.

    Note that some columns are not read. See epacems_columns_to_ignore.

    Args:
        filename (str): The name of the file to be read.

    Returns:
        pandas.DataFrame: A DataFrame containing the contents of the
        CSV file.

    """
    df = pd.read_csv(
        filename,
        index_col=False,
        usecols=lambda col: col not in pc.epacems_columns_to_ignore,
        dtype=pc.epacems_csv_dtypes,
    ).rename(columns=pc.epacems_rename_dict)
    return df
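
# A minimal usage sketch for read_cems_csv(). The filename below is
# hypothetical; real paths come from datastore.path(), as shown in
# extract() below. pandas infers zip compression from the file extension,
# so a single-file zipped CSV can be passed directly:
#
#     df = read_cems_csv("epacems_2018_co_01.zip")
#     df.head()
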
def extract(epacems_years, states, data_dir):
    """Coordinate the extraction of EPA CEMS hourly DataFrames.

    Args:
        epacems_years (list): List of years for which CEMS data should
            be read.
        states (list): List of states for which CEMS data should be read.
        data_dir (path-like): Path to the top directory of the PUDL
            datastore.

    Yields:
        dict: A one-item dictionary mapping a (year, state) tuple to a
        DataFrame of that state's CEMS data for that year.

    Todo:
        This is really slow. Can we do some parallel processing?

    """
    for year in epacems_years:
        # The keys of the us_states dictionary are the state abbreviations.
        for state in states:
            dfs = []
            for month in range(1, 13):
                filename = datastore.path(
                    'epacems',
                    year=year,
                    month=month,
                    state=state,
                    data_dir=data_dir,
                )
                logger.info(
                    f"Performing ETL for EPA CEMS hourly "
                    f"{state}-{year}-{month:02}")
                dfs.append(read_cems_csv(filename))
            # Yield a dictionary whose key identifies this dataset, just
            # like the other extract functions. Unlike the others, this
            # function is a generator of one-item dictionaries.
            yield {
                (year, state): pd.concat(
                    dfs, sort=True, copy=False, ignore_index=True)
            }
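

if __name__ == "__main__":
    # A minimal usage sketch, not part of the original module: pull one
    # year of Colorado CEMS data and report how many hourly records were
    # read per (year, state). The data_dir value is hypothetical and must
    # point at a populated local PUDL datastore.
    for raw_dfs in extract([2018], ["CO"], data_dir="/path/to/pudl/data"):
        for (year, state), df in raw_dfs.items():
            print(f"{state}-{year}: {len(df)} hourly records")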