Source code for pudl.ferc_to_sqlite.cli

"""A script using Dagster to convert FERC data fom DBF and XBRL to SQLite databases."""
import pathlib
import sys
import time
from import Callable

import click
from dagster import (

import pudl
from pudl import ferc_to_sqlite
from pudl.helpers import get_dagster_execution_config
from pudl.settings import EtlSettings

# Create a logger to output any messages we might have...
[docs] logger = pudl.logging_helpers.get_logger(__name__)
[docs] def ferc_to_sqlite_job_factory( logfile: str | None = None, loglevel: str = "INFO", dataset_only: str | None = None, ) -> Callable[[], JobDefinition]: """Factory for parameterizing a reconstructable ferc_to_sqlite job. Args: logfile: Path to a log file for the job's execution. loglevel: The log level for the job's execution. Returns: The job definition to be executed. """ def get_ferc_to_sqlite_job(): """Module level func for creating a job to be wrapped by reconstructable.""" ferc_to_sqlite_graph = ferc_to_sqlite.ferc_to_sqlite op_selection = None if dataset_only is not None: logger.warning(f"Running ferc_to_sqlite restricted to {dataset_only}") op_selection = [dataset_only] return ferc_to_sqlite_graph.to_job( resource_defs=ferc_to_sqlite.default_resources_defs, name="ferc_to_sqlite_job", op_selection=op_selection, ) return get_ferc_to_sqlite_job
@click.command( name="ferc_to_sqlite", context_settings={"help_option_names": ["-h", "--help"]}, ) @click.argument( "etl_settings_yml", type=click.Path( exists=True, dir_okay=False, resolve_path=True, path_type=pathlib.Path, ), ) @click.option( "-b", "--batch-size", type=int, default=50, help="Number of XBRL instances to be processed at a time.", ) @click.option( "--clobber/--no-clobber", type=bool, default=False, help=( "Clobber existing FERC SQLite databases if they exist. If clobber is not " "specified but the SQLite database already exists the run will fail." ), ) @click.option( "-w", "--workers", type=int, default=None, help=( "Number of worker processes to use when parsing XBRL filings. " "Defaults to using the number of CPUs." ), ) @click.option( "--dagster-workers", type=int, default=0, help=( "Set the max number of processes that dagster can launch. " "If set to 1, in-process serial executor will be used. If set to 0, " "dagster will saturate available CPUs (this is the default)." ), ) @click.option( "--gcs-cache-path", type=str, default="", help=( "Load cached inputs from Google Cloud Storage if possible. This is usually " "much faster and more reliable than downloading from Zenodo directly. The " "path should be a URL of the form gs://bucket[/path_prefix]. Internally we use " "gs:// A public cache is available at " "gs:// but requires GCS authentication and a billing " "project to pay data egress costs." ), ) @click.option( "--logfile", type=click.Path( exists=False, resolve_path=True, path_type=pathlib.Path, ), help="If specified, write logs to this file.", ) @click.option( "--loglevel", type=click.Choice( ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False ), default="INFO", ) @click.option( "--dataset-only", type=str, help=( "If specified, restricts processing to only a given dataset. This is" "expected to be in the form of ferc1_dbf, ferc1_xbrl. " "This is intended for ci-integration purposes where we fan-out the " "execution into several parallel small jobs that should finish faster. " "Other operations are still going to be invoked, but they will terminate " "early if this setting is in use." ), )
[docs] def main( etl_settings_yml: pathlib.Path, batch_size: int, workers: int | None, dagster_workers: int, clobber: bool, gcs_cache_path: str, logfile: pathlib.Path, loglevel: str, dataset_only: str, ): """Use Dagster to convert FERC data fom DBF and XBRL to SQLite databases. Reads settings specifying which forms and years to convert from ETL_SETTINGS_YML. Also produces JSON versions of XBRL taxonomies and datapackage descriptors which annotate the XBRL derived SQLite databases. """ # Display logged output from the PUDL package: pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel) etl_settings = EtlSettings.from_yaml(etl_settings_yml) ferc_to_sqlite_reconstructable_job = build_reconstructable_job( "pudl.ferc_to_sqlite.cli", "ferc_to_sqlite_job_factory", reconstructable_kwargs={ "loglevel": loglevel, "logfile": logfile, "dataset_only": dataset_only, }, ) run_config = { "resources": { "ferc_to_sqlite_settings": { "config": etl_settings.ferc_to_sqlite_settings.model_dump() }, "datastore": { "config": {"gcs_cache_path": gcs_cache_path}, }, "runtime_settings": { "config": { "xbrl_num_workers": workers, "xbrl_batch_size": batch_size, "clobber": clobber, }, }, }, } run_config.update(get_dagster_execution_config(dagster_workers)) start_time = time.time() result = execute_job( ferc_to_sqlite_reconstructable_job, instance=DagsterInstance.get(), run_config=run_config, raise_on_error=True, ) end_time = time.time()"FERC to SQLite job completed in {end_time - start_time} seconds.") # Workaround to reliably getting full stack trace if not result.success: for event in result.all_events: if event.event_type_value == "STEP_FAILURE": raise Exception(event.event_specific_data.error)
if __name__ == "__main__": sys.exit(main())