"""Generic extractor for all FERC XBRL data."""
import contextlib
import io
import json
from datetime import date, datetime
from pathlib import Path
import sqlalchemy as sa
from dagster import Field, Noneable, op
from ferc_xbrl_extractor import xbrl
from ferc_xbrl_extractor.instance import InstanceBuilder
import pudl
from pudl.settings import FercGenericXbrlToSqliteSettings, XbrlFormNumber
from pudl.workspace.datastore import Datastore
from pudl.workspace.setup import PudlPaths
logger = pudl.logging_helpers.get_logger(__name__)
class FercXbrlDatastore:
    """Simple datastore wrapper for accessing ferc1 xbrl resources."""

    def __init__(self, datastore: Datastore):
        """Instantiate datastore wrapper for ferc1 resources."""
        self.datastore = datastore

    def get_taxonomy(self, year: int, form: XbrlFormNumber) -> tuple[io.BytesIO, str]:
        """Returns the path to the taxonomy entry point within an archive.

        Args:
            year: Year of the archive to retrieve.
            form: FERC form whose taxonomy archive is requested.

        Returns:
            A tuple of the raw archive wrapped in an in-memory buffer and the
            path to the taxonomy ``.xsd`` entry point inside that archive.
        """
        raw_archive = self.datastore.get_unique_resource(
            f"ferc{form.value}", year=year, data_format="xbrl"
        )

        # Construct path to taxonomy entry point within archive.
        # Taxonomies are published with a January 1 effective date.
        taxonomy_date = date(year, 1, 1).isoformat()
        taxonomy_entry_point = f"taxonomy/form{form.value}/{taxonomy_date}/form/form{form.value}/form-{form.value}_{taxonomy_date}.xsd"

        return io.BytesIO(raw_archive), taxonomy_entry_point

    def get_filings(self, year: int, form: XbrlFormNumber) -> list[InstanceBuilder]:
        """Return list of filings from archive.

        Only the most recently published filing for each filer/quarter entry
        in the archive's RSS metadata is returned.

        Args:
            year: Year of the archive to retrieve.
            form: FERC form whose filings are requested.
        """
        archive = self.datastore.get_zipfile_resource(
            f"ferc{form.value}", year=year, data_format="xbrl"
        )

        # Load RSS feed metadata
        filings = []
        with archive.open("rssfeed") as f:
            metadata = json.load(f)

            # Loop through all filings by a given filer in a given quarter
            # And take the most recent one
            for filing_info in metadata.values():
                latest = datetime.min
                # Track the id explicitly so an entry with no parseable
                # filings can't reuse a stale id from a previous iteration.
                latest_filing = None
                for filing_id, info in filing_info.items():
                    # "published_parsed" is an ISO 8601 timestamp string
                    published = datetime.fromisoformat(info["published_parsed"])

                    if published > latest:
                        latest = published
                        latest_filing = filing_id

                if latest_filing is None:
                    continue

                # Create in memory buffers with file data to be used in conversion
                filings.append(
                    InstanceBuilder(
                        io.BytesIO(archive.open(f"{latest_filing}.xbrl").read()),
                        latest_filing,
                    )
                )
        return filings
def _get_sqlite_engine(form_number: int, clobber: bool) -> sa.engine.Engine:
    """Create SQLite engine for specified form and drop tables.

    Args:
        form_number: FERC form number.
        clobber: Flag indicating whether or not to drop tables.

    Returns:
        An engine connected to the form's XBRL-derived SQLite database.
    """
    # Read in the structure of the DB, if it exists
    logger.info(
        f"Dropping the old FERC Form {form_number} XBRL derived SQLite DB if it exists."
    )
    db_path = PudlPaths().sqlite_db(f"ferc{form_number}_xbrl")

    logger.info(f"Connecting to SQLite at {db_path}...")
    engine = sa.create_engine(db_path)
    logger.info(f"Connected to SQLite at {db_path}!")

    # A missing or freshly-created DB raises OperationalError when dropping
    # tables; suppress it so that we can wipe it out regardless.
    with contextlib.suppress(sa.exc.OperationalError):
        pudl.helpers.drop_tables(engine, clobber=clobber)

    return engine
@op(
    config_schema={
        "clobber": Field(
            bool, description="Clobber existing ferc1 database.", default_value=False
        ),
        "workers": Field(
            Noneable(int),
            description="Specify number of worker processes for parsing XBRL filings.",
            default_value=None,
        ),
        "batch_size": Field(
            int,
            description="Specify number of XBRL instances to be processed at a time (defaults to 50)",
            default_value=50,
        ),
    },
    required_resource_keys={"ferc_to_sqlite_settings", "datastore"},
)
def xbrl2sqlite(context) -> None:
    """Clone the FERC XBRL databases to SQLite.

    Iterates over every form in :class:`XbrlFormNumber`, skipping forms with
    no settings or that are disabled, and converts each form's XBRL filings
    into its own SQLite database.
    """
    output_path = PudlPaths().output_dir
    clobber = context.op_config["clobber"]
    batch_size = context.op_config["batch_size"]
    workers = context.op_config["workers"]
    ferc_to_sqlite_settings = context.resources.ferc_to_sqlite_settings
    # Wrap the raw datastore resource for XBRL-specific access.
    datastore = FercXbrlDatastore(context.resources.datastore)

    # Loop through all forms and perform conversion
    for form in XbrlFormNumber:
        # Get desired settings object
        settings = ferc_to_sqlite_settings.get_xbrl_dataset_settings(form)

        # If no settings for form in question, skip
        if settings is None:
            continue
        if settings.disabled:
            logger.info(f"Dataset ferc{form}_xbrl is disabled, skipping")
            continue

        sqlite_engine = _get_sqlite_engine(form.value, clobber)

        convert_form(
            settings,
            form,
            datastore,
            sqlite_engine,
            output_path=output_path,
            batch_size=batch_size,
            workers=workers,
        )