Source code for pudl.convert.epacems_to_parquet

"""A script for converting the EPA CEMS dataset from gzip to Apache Parquet.

The original EPA CEMS data is available as ~12,000 gzipped CSV files, one for
each month for each state, from 1995 to the present. On disk they take up
about 7.3 GB of space, compressed. Uncompressed it is closer to 100 GB. That's
too much data to work with in memory.

Apache Parquet is a compressed, columnar datastore format, widely used in Big
Data applications. It's an open standard, and is very fast to read from disk.
It works especially well with both `Dask dataframes <https://dask.org/>`__ (a
parallel / distributed computing extension of pandas) and Apache Spark (a
cloud-based Big Data processing pipeline system).

Since pulling 100 GB of data into SQLite takes a long time, and working with
that data en masse isn't particularly pleasant on a laptop, this script can be
used to convert the original EPA CEMS data to the more widely usable Apache
Parquet format for use with Dask, either on a multi-core workstation or in an
interactive cloud computing environment like `Pangeo <https://pangeo.io>`__.

"""

import argparse
import logging
import pathlib
import sys
from functools import partial

import coloredlogs
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

import pudl
import pudl.constants as pc

# Because this is an entry point module for a script, we want to gather all
# the pudl logging output, not just output from this module, hence the use of
# the pudl.__name__ logger in main() below.
logger = logging.getLogger(__name__)


def create_in_dtypes():
    """
    Create a dictionary of input data types.

    This specifies the dtypes of the input columns, which is necessary for
    some cases where, e.g., a column is always NaN.

    Returns:
        dict: A mapping of column names to :mod:`pandas` data types.

    """
    # These measurement codes are used by all four of our measurement variables
    common_codes = (
        "LME",
        "Measured",
        "Measured and Substitute",
        "Other",
        "Substitute",
        "Undetermined",
        "Unknown Code",
        "",
    )
    co2_so2_cats = pd.CategoricalDtype(categories=common_codes, ordered=False)
    nox_cats = pd.CategoricalDtype(
        categories=common_codes + ("Calculated",), ordered=False
    )
    state_cats = pd.CategoricalDtype(
        categories=pc.cems_states.keys(), ordered=False)
    in_dtypes = {
        "state": state_cats,
        "plant_id_eia": "int32",
        "unitid": "str",
        # "operating_datetime_utc": "datetime",
        "operating_time_hours": "float32",
        "gross_load_mw": "float32",
        "steam_load_1000_lbs": "float32",
        "so2_mass_lbs": "float32",
        "so2_mass_measurement_code": co2_so2_cats,
        "nox_rate_lbs_mmbtu": "float32",
        "nox_rate_measurement_code": nox_cats,
        "nox_mass_lbs": "float32",
        "nox_mass_measurement_code": nox_cats,
        "co2_mass_tons": "float32",
        "co2_mass_measurement_code": co2_so2_cats,
        "heat_content_mmbtu": "float32",
        # Note: it'd be better to use pandas' nullable integers once this issue
        # is resolved: https://issues.apache.org/jira/browse/ARROW-5379
        # "facility_id": "Int32",
        # "unit_id_epa": "Int32",
        "facility_id": "float32",
        "unit_id_epa": "float32",
    }
    return in_dtypes
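# A minimal sketch of how the dtype map above might be applied when reading a
# single raw CEMS CSV with pandas; the file name here is illustrative:
#
#     df = pd.read_csv(
#         "hourly_emissions_epacems_2018_co.csv.gz",
#         dtype=create_in_dtypes(),
#         parse_dates=["operating_datetime_utc"],
#     )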
def create_cems_schema():
    """Make an explicit Arrow schema for the EPA CEMS data.

    Make changes in the types of the generated parquet files by editing this
    function.

    Note that parquet's internal representation doesn't use unsigned numbers
    or 16-bit ints, so just keep things simple here and always use int32 and
    float32.

    Returns:
        :class:`pyarrow.Schema`: An Arrow schema for the EPA CEMS data.

    """
    int_nullable = partial(pa.field, type=pa.int32(), nullable=True)
    int_not_null = partial(pa.field, type=pa.int32(), nullable=False)
    str_not_null = partial(pa.field, type=pa.string(), nullable=False)
    # Timestamp resolution is hourly, but millisecond is the largest allowed.
    timestamp = partial(pa.field, type=pa.timestamp(
        "ms", tz="utc"), nullable=False)
    float_nullable = partial(pa.field, type=pa.float32(), nullable=True)
    float_not_null = partial(pa.field, type=pa.float32(), nullable=False)
    # (float32 can accurately hold integers up to 16,777,216 so no need for
    # float64)
    dict_nullable = partial(
        pa.field,
        type=pa.dictionary(pa.int8(), pa.string(), ordered=False),
        nullable=True
    )
    return pa.schema([
        int_not_null("year"),
        dict_nullable("state"),
        int_not_null("plant_id_eia"),
        str_not_null("unitid"),
        timestamp("operating_datetime_utc"),
        float_nullable("operating_time_hours"),
        float_not_null("gross_load_mw"),
        float_nullable("steam_load_1000_lbs"),
        float_nullable("so2_mass_lbs"),
        dict_nullable("so2_mass_measurement_code"),
        float_nullable("nox_rate_lbs_mmbtu"),
        dict_nullable("nox_rate_measurement_code"),
        float_nullable("nox_mass_lbs"),
        dict_nullable("nox_mass_measurement_code"),
        float_nullable("co2_mass_tons"),
        dict_nullable("co2_mass_measurement_code"),
        float_not_null("heat_content_mmbtu"),
        int_nullable("facility_id"),
        int_nullable("unit_id_epa"),
    ])
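# A minimal sketch of applying the schema above when converting a transformed
# CEMS dataframe to an Arrow table and writing a single, non-partitioned
# Parquet file; the dataframe and output path are illustrative:
#
#     table = pa.Table.from_pandas(df, preserve_index=False,
#                                  schema=create_cems_schema())
#     pq.write_table(table, "epacems_2018_co.parquet", compression="snappy")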
def epacems_to_parquet(datapkg_path,
                       epacems_years,
                       epacems_states,
                       out_dir,
                       compression='snappy',
                       partition_cols=('year', 'state'),
                       clobber=False):
    """Take transformed EPA CEMS dataframes and output them as Parquet files.

    We need to do a few additional manipulations of the dataframes after they
    have been transformed by PUDL to get them ready for output to the Apache
    Parquet format. Mostly this has to do with ensuring homogeneous data types
    across all of the dataframes, and downcasting to the most efficient data
    type possible for each of them. We also add a 'year' column so that we can
    partition the dataset on disk by year as well as state. (Year partitions
    follow the CEMS input data, based on local plant time. The
    operating_datetime_utc column identifies time in UTC, so there's a
    mismatch of a few hours on December 31 / January 1.)

    Args:
        datapkg_path (path-like): Path to the datapackage.json file describing
            the datapackage containing the EPA CEMS data to be converted.
        epacems_years (list): list of years from which we are trying to read
            CEMS data.
        epacems_states (list): list of states from which we are trying to read
            CEMS data.
        out_dir (path-like): The directory in which to output the Parquet
            files.
        compression (string): Compression algorithm to use for the Parquet
            files, either 'snappy' or 'gzip'.
        partition_cols (tuple): Columns on which to partition the Parquet
            dataset on disk. Defaults to ('year', 'state').
        clobber (bool): If True and there is already a directory with
            out_dir's name, the existing parquet files will be deleted and new
            ones will be generated in their place.

    Raises:
        AssertionError: Raised if an output directory is not specified.

    Todo:
        Return to

    """
    if not out_dir:
        raise AssertionError("Required output directory not specified.")
    out_dir = pudl.helpers.prep_dir(out_dir, clobber=clobber)
    data_dir = pathlib.Path(datapkg_path).parent / "data"

    # Verify that all the requested data files are present:
    epacems_years = list(epacems_years)
    epacems_years.sort()
    epacems_states = list(epacems_states)
    epacems_states.sort()
    for year in epacems_years:
        for state in epacems_states:
            newpath = pathlib.Path(
                data_dir,
                f"hourly_emissions_epacems_{year}_{state.lower()}.csv.gz")
            if not newpath.is_file():
                raise FileNotFoundError(f"EPA CEMS file not found: {newpath}")
    # TODO: Rather than going directly to the data directory, we should really
    # use the metadata inside the datapackage to find the appropriate file
    # paths pertaining to the CEMS years/states of interest.

    in_types = create_in_dtypes()
    schema = create_cems_schema()
    for year in epacems_years:
        for state in epacems_states:
            newpath = pathlib.Path(
                data_dir,
                f"hourly_emissions_epacems_{year}_{state.lower()}.csv.gz")
            df = (
                pd.read_csv(newpath,
                            dtype=in_types,
                            parse_dates=["operating_datetime_utc"])
                .assign(year=year)
            )
            logger.info(f"{year}-{state}: {len(df)} records")
            pq.write_to_dataset(
                pa.Table.from_pandas(
                    df, preserve_index=False, schema=schema),
                root_path=str(out_dir),
                partition_cols=list(partition_cols),
                compression=compression)
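# With the default partition_cols of ('year', 'state'), pyarrow's
# write_to_dataset() lays the output out as a Hive-style partitioned directory
# tree, roughly like the following (directory and file names are illustrative):
#
#     epacems/
#         year=1995/
#             state=AL/
#                 <some-uuid>.parquet
#             state=AR/
#                 <some-uuid>.parquet
#         year=1996/
#             ...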
def parse_command_line(argv):
    """
    Parse command line arguments. See the -h option.

    Args:
        argv (list): Command line arguments, including caller filename.

    Returns:
        argparse.Namespace: Command line arguments and their parsed values.

    """
    parser = argparse.ArgumentParser(description=__doc__)
    defaults = pudl.workspace.setup.get_defaults()
    parser.add_argument(
        'datapkg',
        type=str,
        help="""Path to the datapackage.json file describing the datapackage
        that contains the CEMS data to be converted.""",
    )
    parser.add_argument(
        '-z',
        '--compression',
        type=str,
        choices=["gzip", "snappy"],
        help="""Compression algorithm to use for Parquet files. Can be either
        'snappy' (much faster but larger files) or 'gzip' (slower but better
        compression). (default: %(default)s).""",
        default='snappy'
    )
    parser.add_argument(
        '-i',
        '--pudl_in',
        type=str,
        help="""Path to the top level datastore directory. (default:
        %(default)s).""",
        default=defaults["pudl_in"],
    )
    parser.add_argument(
        '-o',
        '--pudl_out',
        type=str,
        help="""Path to the pudl output directory. (default: %(default)s).""",
        default=str(defaults["pudl_out"])
    )
    parser.add_argument(
        '-y',
        '--years',
        nargs='+',
        type=int,
        help="""Which years of EPA CEMS data should be converted to Apache
        Parquet format. Default is all available years, ranging from 1995 to
        the present. Note that data is typically incomplete before ~2000.""",
        default=pc.data_years['epacems']
    )
    parser.add_argument(
        '-s',
        '--states',
        nargs='+',
        type=str.upper,
        help="""Which states' EPA CEMS data should be converted to Apache
        Parquet format, as a list of two letter US state abbreviations.
        Default is everything: all 48 continental US states plus Washington
        DC.""",
        default=pc.cems_states.keys()
    )
    parser.add_argument(
        '-c',
        '--clobber',
        action='store_true',
        help="""Clobber existing parquet files if they exist. If clobber is
        not included but the parquet directory already exists the build will
        fail.""",
        default=False)
    arguments = parser.parse_args(argv[1:])
    return arguments
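# Given the __main__ entry point at the bottom of this module, the script can
# be invoked directly, for example (the datapackage path and the year/state
# selections here are illustrative):
#
#     python -m pudl.convert.epacems_to_parquet \
#         datapkg/pudl-example/datapackage.json -y 2018 2019 -s CO TX --clobber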
def main():
    """Convert zipped EPA CEMS Hourly data to Apache Parquet format."""
    # Display logged output from the PUDL package:
    logger = logging.getLogger(pudl.__name__)
    log_format = '%(asctime)s [%(levelname)8s] %(name)s:%(lineno)s %(message)s'
    coloredlogs.install(fmt=log_format, level='INFO', logger=logger)

    args = parse_command_line(sys.argv)

    pudl_settings = pudl.workspace.setup.derive_paths(
        pudl_in=args.pudl_in, pudl_out=args.pudl_out)

    epacems_to_parquet(datapkg_path=pathlib.Path(args.datapkg),
                       epacems_years=args.years,
                       epacems_states=args.states,
                       out_dir=pathlib.Path(
                           pudl_settings['parquet_dir'], "epacems"),
                       compression=args.compression,
                       partition_cols=('year', 'state'),
                       clobber=args.clobber)
if __name__ == '__main__':
    sys.exit(main())