"""A command line interface (CLI) to the main PUDL ETL functionality.
This script cordinates the PUDL ETL process, based on parameters provided via a YAML
settings file.
If the settings for a dataset has empty parameters (meaning there are no years or tables
included), no outputs will be generated. See :doc:`/dev/run_the_etl` for details.
The output SQLite and Parquet files will be stored in ``PUDL_OUTPUT``. To
setup your default ``PUDL_INPUT`` and ``PUDL_OUTPUT`` directories see
``pudl_setup --help``.
"""
import argparse
import sys
from collections.abc import Callable
import fsspec
from dagster import (
DagsterInstance,
Definitions,
JobDefinition,
build_reconstructable_job,
define_asset_job,
execute_job,
)
import pudl
from pudl.settings import EpaCemsSettings, EtlSettings
from pudl.workspace.setup import PudlPaths
[docs]
logger = pudl.logging_helpers.get_logger(__name__)
[docs]
def parse_command_line(argv):
"""Parse script command line arguments. See the -h option.
Args:
argv (list): command line arguments including caller file name.
Returns:
dict: A dictionary mapping command line arguments to their values.
"""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
dest="settings_file", type=str, default="", help="path to ETL settings file."
)
parser.add_argument(
"--logfile",
default=None,
help="If specified, write logs to this file.",
)
parser.add_argument(
"--gcs-cache-path",
type=str,
help="Load datastore resources from Google Cloud Storage. Should be gs://bucket[/path_prefix]",
)
parser.add_argument(
"--loglevel",
help="Set logging level (DEBUG, INFO, WARNING, ERROR, or CRITICAL).",
default="INFO",
)
parser.add_argument(
"--max-concurrent",
help="Set the max number of processes dagster can launch. Defaults to use the number of CPUs on the machine.",
default=0,
)
arguments = parser.parse_args(argv[1:])
return arguments
[docs]
def pudl_etl_job_factory(
logfile: str | None = None, loglevel: str = "INFO", process_epacems: bool = True
) -> Callable[[], JobDefinition]:
"""Factory for parameterizing a reconstructable pudl_etl job.
Args:
loglevel: The log level for the job's execution.
logfile: Path to a log file for the job's execution.
process_epacems: Include EPA CEMS assets in the job execution.
Returns:
The job definition to be executed.
"""
def get_pudl_etl_job():
"""Create an pudl_etl_job wrapped by to be wrapped by reconstructable."""
pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)
jobs = [define_asset_job("etl_job")]
if not process_epacems:
jobs = [
define_asset_job(
"etl_job",
selection=pudl.etl.create_non_cems_selection(
pudl.etl.default_assets
),
)
]
return Definitions(
assets=pudl.etl.default_assets,
resources=pudl.etl.default_resources,
jobs=jobs,
).get_job_def("etl_job")
return get_pudl_etl_job
[docs]
def main():
"""Parse command line and initialize PUDL DB."""
args = parse_command_line(sys.argv)
# Display logged output from the PUDL package:
pudl.logging_helpers.configure_root_logger(
logfile=args.logfile, loglevel=args.loglevel
)
etl_settings = EtlSettings.from_yaml(args.settings_file)
dataset_settings_config = etl_settings.datasets.model_dump()
process_epacems = True
if etl_settings.datasets.epacems is None:
process_epacems = False
# Dagster config expects values for the epacems settings even though
# the CEMS assets will not be executed. Fill in the config dictionary
# with default cems values. Replace this workaround once dagster pydantic
# config classes are available.
dataset_settings_config["epacems"] = EpaCemsSettings().model_dump()
pudl_etl_reconstructable_job = build_reconstructable_job(
"pudl.cli.etl",
"pudl_etl_job_factory",
reconstructable_kwargs={
"loglevel": args.loglevel,
"logfile": args.logfile,
"process_epacems": process_epacems,
},
)
result = execute_job(
pudl_etl_reconstructable_job,
instance=DagsterInstance.get(),
run_config={
"execution": {
"config": {
"multiprocess": {
"max_concurrent": int(args.max_concurrent),
},
}
},
"resources": {
"dataset_settings": {"config": dataset_settings_config},
"datastore": {
"config": {
"gcs_cache_path": args.gcs_cache_path
if args.gcs_cache_path
else "",
},
},
},
},
)
# Workaround to reliably getting full stack trace
if not result.success:
for event in result.all_events:
if event.event_type_value == "STEP_FAILURE":
raise Exception(event.event_specific_data.error)
else:
logger.info("ETL job completed successfully, publishing outputs.")
for output_path in etl_settings.publish_destinations:
logger.info(f"Publishing outputs to {output_path}")
fs, _, _ = fsspec.get_fs_token_paths(output_path)
fs.put(
PudlPaths().output_dir,
output_path,
recursive=True,
)
if __name__ == "__main__":
sys.exit(main())