Source code for pudl.extract.xbrl

"""Generic extractor for all FERC XBRL data."""
import io
from import Callable
from datetime import date
from pathlib import Path

from dagster import op
from ferc_xbrl_extractor.cli import run_main

import pudl
from pudl.resources import RuntimeSettings
from pudl.settings import FercGenericXbrlToSqliteSettings, XbrlFormNumber
from pudl.workspace.datastore import Datastore
from pudl.workspace.setup import PudlPaths

[docs] logger = pudl.logging_helpers.get_logger(__name__)
[docs] class FercXbrlDatastore: """Simple datastore wrapper for accessing ferc1 xbrl resources.""" def __init__(self, datastore: Datastore): """Instantiate datastore wrapper for ferc1 resources.""" self.datastore = datastore
[docs] def get_taxonomy(self, year: int, form: XbrlFormNumber) -> tuple[io.BytesIO, str]: """Returns the path to the taxonomy entry point within the an archive.""" taxonomy_dates = {2021: date(2022, 1, 1), 2022: date(2022, 1, 1)} taxonomy_date = taxonomy_dates[year] raw_archive = self.datastore.get_unique_resource( f"ferc{form.value}", year=taxonomy_date.year, data_format="xbrl_taxonomy", ) taxonomy_entry_point = f"taxonomy/form{form.value}/{taxonomy_date}/form/form{form.value}/form-{form.value}_{taxonomy_date.isoformat()}.xsd" return io.BytesIO(raw_archive), taxonomy_entry_point
[docs] def get_filings(self, year: int, form: XbrlFormNumber) -> io.BytesIO: """Return the corresponding archive full of XBRL filings.""" return io.BytesIO( self.datastore.get_unique_resource( f"ferc{form.value}", year=year, data_format="xbrl" ) )
[docs] def xbrl2sqlite_op_factory(form: XbrlFormNumber) -> Callable: """Generates xbrl2sqlite op for a given FERC form.""" @op( name=f"ferc{form.value}_xbrl", required_resource_keys={ "ferc_to_sqlite_settings", "datastore", "runtime_settings", }, tags={"data_format": "xbrl", "dataset": f"ferc{form.value}"}, ) def inner_op(context) -> None: output_path = PudlPaths().output_dir rs: RuntimeSettings = context.resources.runtime_settings settings = context.resources.ferc_to_sqlite_settings.get_xbrl_dataset_settings( form ) datastore = FercXbrlDatastore(context.resources.datastore)"====== xbrl2sqlite runtime_settings: {rs}") if settings is None or settings.disabled: f"Skipping dataset ferc{form.value}_xbrl: no config or is disabled." ) return sql_path = PudlPaths().sqlite_db_path(f"ferc{form.value}_xbrl") if sql_path.exists(): if rs.clobber: sql_path.unlink() else: raise RuntimeError( f"Found existing DB at {sql_path} and clobber was set to False. Aborting." ) convert_form( settings, form, datastore, output_path=output_path, sql_path=sql_path, batch_size=rs.xbrl_batch_size, workers=rs.xbrl_num_workers, ) return inner_op
[docs] def convert_form( form_settings: FercGenericXbrlToSqliteSettings, form: XbrlFormNumber, datastore: FercXbrlDatastore, output_path: Path, sql_path: Path, batch_size: int | None = None, workers: int | None = None, ) -> None: """Clone a single FERC XBRL form to SQLite. Args: form_settings: Validated settings for converting the desired XBRL form to SQLite. form: FERC form number. datastore: Instance of a FERC XBRL datastore for retrieving data. output_path: PUDL output directory sql_path: path to the SQLite DB we'd like to write to. batch_size: Number of XBRL filings to process in a single CPU process. workers: Number of CPU processes to create for processing XBRL filings. Returns: None """ datapackage_path = str(output_path / f"ferc{form.value}_xbrl_datapackage.json") metadata_path = str(output_path / f"ferc{form.value}_xbrl_taxonomy_metadata.json") # Process XBRL filings for each year requested for year in form_settings.years: taxonomy_archive, taxonomy_entry_point = datastore.get_taxonomy(year, form) filings_archive = datastore.get_filings(year, form) # if we set clobber=True, clobbers on *every* call to run_main; # we already delete the existing base on `clobber=True` in `xbrl2sqlite` run_main( instance_path=filings_archive, sql_path=sql_path, clobber=False, taxonomy=taxonomy_archive, entry_point=taxonomy_entry_point, form_number=form.value, metadata_path=metadata_path, datapackage_path=datapackage_path, workers=workers, batch_size=batch_size, loglevel="INFO", logfile=None, )