Source code for pudl.etl.cli

"""A command line interface (CLI) to the main PUDL ETL functionality."""

import pathlib
import sys
from collections.abc import Callable

import click
import fsspec
from dagster import (
    DagsterInstance,
    Definitions,
    JobDefinition,
    build_reconstructable_job,
    define_asset_job,
    execute_job,
)

import pudl
from pudl.helpers import get_dagster_execution_config
from pudl.settings import EpaCemsSettings, EtlSettings
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)


def pudl_etl_job_factory(
    logfile: str | None = None,
    loglevel: str = "INFO",
    process_epacems: bool = True,
) -> Callable[[], JobDefinition]:
    """Factory for parameterizing a reconstructable pudl_etl job.

    Args:
        logfile: Path to a log file for the job's execution.
        loglevel: The log level for the job's execution.
        process_epacems: Include EPA CEMS assets in the job execution.

    Returns:
        A zero-argument callable that builds the job definition to be executed.
    """

    def get_pudl_etl_job():
        """Create a pudl_etl job to be wrapped by reconstructable."""
        pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)
        jobs = [define_asset_job("etl_job")]
        if not process_epacems:
            jobs = [
                define_asset_job(
                    "etl_job",
                    selection=pudl.etl.create_non_cems_selection(
                        pudl.etl.default_assets
                    ),
                )
            ]

        return Definitions(
            assets=pudl.etl.default_assets,
            resources=pudl.etl.default_resources,
            jobs=jobs,
        ).get_job_def("etl_job")

    return get_pudl_etl_job
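
# A minimal in-process usage sketch (illustrative, not part of the CLI flow):
# the factory returns a zero-argument callable so that Dagster's
# build_reconstructable_job() can re-create the job in worker processes, as
# pudl_etl() does below. For quick debugging you could, hypothetically, call
# it directly; a real run would still need the resource config assembled in
# pudl_etl():
#
#   job = pudl_etl_job_factory(loglevel="DEBUG", process_epacems=False)()
#   result = job.execute_in_process()  # standard Dagster JobDefinition API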


@click.command(
    context_settings={"help_option_names": ["-h", "--help"]},
)
@click.argument(
    "etl_settings_yml",
    type=click.Path(
        exists=True,
        dir_okay=False,
        resolve_path=True,
        path_type=pathlib.Path,
    ),
)
@click.option(
    "--dagster-workers",
    default=0,
    type=int,
    help="Max number of processes Dagster can launch. Defaults to the number of CPUs.",
)
@click.option(
    "--gcs-cache-path",
    type=str,
    default="",
    help=(
        "Load cached inputs from Google Cloud Storage if possible. This is usually "
        "much faster and more reliable than downloading from Zenodo directly. The "
        "path should be a URL of the form gs://bucket[/path_prefix]. Internally we use "
        "gs://internal-zenodo-cache.catalyst.coop. A public cache is available at "
        "gs://zenodo-cache.catalyst.coop but requires GCS authentication and a billing "
        "project to pay data egress costs."
    ),
)
@click.option(
    "--logfile",
    help="If specified, write logs to this file.",
    type=click.Path(
        exists=False,
        resolve_path=True,
        path_type=pathlib.Path,
    ),
)
@click.option(
    "--loglevel",
    default="INFO",
    type=click.Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
)
def pudl_etl(
    etl_settings_yml: pathlib.Path,
    dagster_workers: int,
    gcs_cache_path: str,
    logfile: pathlib.Path,
    loglevel: str,
):
    """Use Dagster to run the PUDL ETL, as specified by the file ETL_SETTINGS_YML."""
    # Display logged output from the PUDL package:
    pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)

    etl_settings = EtlSettings.from_yaml(etl_settings_yml)

    dataset_settings_config = etl_settings.datasets.model_dump()
    process_epacems = True
    if etl_settings.datasets.epacems is None or etl_settings.datasets.epacems.disabled:
        process_epacems = False
        # Dagster config expects values for the epacems settings even though
        # the CEMS assets will not be executed. Fill in the config dictionary
        # with default cems values. Replace this workaround once dagster pydantic
        # config classes are available.
        dataset_settings_config["epacems"] = EpaCemsSettings().model_dump()

    pudl_etl_reconstructable_job = build_reconstructable_job(
        "pudl.etl.cli",
        "pudl_etl_job_factory",
        reconstructable_kwargs={
            "loglevel": loglevel,
            "logfile": logfile,
            "process_epacems": process_epacems,
        },
    )
    run_config = {
        "resources": {
            "dataset_settings": {"config": dataset_settings_config},
            "datastore": {
                "config": {
                    "gcs_cache_path": gcs_cache_path,
                },
            },
        },
    }
    # Limit the number of concurrent workers when launching assets that use a
    # lot of memory.
    tag_concurrency_limits = [
        {
            "key": "memory-use",
            "value": "high",
            "limit": 4,
        },
    ]
    run_config.update(
        get_dagster_execution_config(
            num_workers=dagster_workers,
            tag_concurrency_limits=tag_concurrency_limits,
        )
    )

    result = execute_job(
        pudl_etl_reconstructable_job,
        instance=DagsterInstance.get(),
        run_config=run_config,
    )

    # Workaround to reliably get the full stack trace
    if not result.success:
        for event in result.all_events:
            if event.event_type_value == "STEP_FAILURE":
                raise Exception(event.event_specific_data.error)
    else:
        logger.info("ETL job completed successfully, publishing outputs.")
        for output_path in etl_settings.publish_destinations:
            logger.info(f"Publishing outputs to {output_path}")
            fs, _, _ = fsspec.get_fs_token_paths(output_path)
            fs.put(
                PudlPaths().output_dir,
                output_path,
                recursive=True,
            )
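
# For orientation, a hedged sketch of what an ETL_SETTINGS_YML file might
# contain. The top-level fields follow pudl.settings.EtlSettings, but the
# exact dataset schema varies across PUDL versions, so treat this as
# illustrative rather than definitive:
#
#   name: etl_custom
#   title: Custom ETL
#   description: A small, hypothetical ETL run.
#   datasets:
#     ferc1:
#       years: [2021]
#     eia:
#       eia860:
#         years: [2021]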

if __name__ == "__main__":
    sys.exit(pudl_etl())
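
# Example invocation (hypothetical settings path; assumes the `pudl_etl`
# console script that the PUDL package installs for this entry point):
#
#   pudl_etl path/to/etl_settings.yml --dagster-workers 4 --loglevel DEBUG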