Source code for pudl.settings

"""Module for validating pudl etl settings."""
import pathlib
from enum import Enum, unique
from typing import ClassVar

import pandas as pd
import yaml
from pydantic import AnyHttpUrl
from pydantic import BaseModel as PydanticBaseModel
from pydantic import BaseSettings, root_validator, validator

import pudl
import pudl.workspace.setup
from pudl.metadata.classes import DataSource
from pudl.metadata.constants import DBF_TABLES_FILENAMES, XBRL_TABLES
from pudl.metadata.resources.eia861 import TABLE_DEPENDENCIES


@unique
[docs]class XbrlFormNumber(Enum): """Contains full list of supported FERC XBRL forms."""
[docs] FORM1 = 1
[docs] FORM2 = 2
[docs] FORM6 = 6
[docs] FORM60 = 60
[docs] FORM714 = 714
[docs]class BaseModel(PydanticBaseModel): """BaseModel with global configuration."""
[docs] class Config: """Pydantic config."""
[docs] allow_mutation = False
[docs] extra = "forbid"
[docs]class GenericDatasetSettings(BaseModel): """An abstract pydantic model for generic datasets. Each dataset must specify working tables and partitions. A dataset can have an arbitrary number of partitions. """
[docs] tables: list[str]
@root_validator
[docs] def validate_partitions(cls, partitions): # noqa: N805 """Validate the requested data partitions. Check that all the partitions defined in the ``working_partitions`` of the associated ``data_source`` (e.g. years or states) have been assigned in the definition of the class, and that the requested values are a subset of the allowable values defined by the ``data_source``. """ for name, working_partitions in cls.data_source.working_partitions.items(): try: partition = partitions[name] except KeyError: raise ValueError(f"{cls.__name__} is missing required '{name}' field.") partitions_not_working = list(set(partition) - set(working_partitions)) if partitions_not_working: raise ValueError( f"'{partitions_not_working}' {name} are not available." ) partitions[name] = sorted(set(partition)) return partitions
@validator("tables")
[docs] def validate_tables(cls, tables): # noqa: N805 """Validate tables are available.""" tables_not_working = list(set(tables) - set(cls.data_source.get_resource_ids())) if tables_not_working: raise ValueError(f"'{tables_not_working}' tables are not available.") return sorted(set(tables))
[docs]class Ferc1Settings(GenericDatasetSettings): """An immutable pydantic model to validate Ferc1Settings. Args: data_source: DataSource metadata object years: list of years to validate. tables: list of tables to validate. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("ferc1")
[docs] years: list[int] = data_source.working_partitions["years"]
[docs] tables: list[str] = data_source.get_resource_ids()
@validator("tables")
[docs] def validate_tables(cls, tables): # noqa: N805 """Validate tables are available.""" unavailable_tables = list(set(tables) - set(cls.data_source.get_resource_ids())) if unavailable_tables: raise ValueError(f"'{unavailable_tables}' tables are not available.") return sorted(set(tables))
@property
[docs] def dbf_years(self): """Return validated years for which DBF data is available.""" return [year for year in self.years if year <= 2020]
@property
[docs] def xbrl_years(self): """Return validated years for which DBF data is available.""" return [year for year in self.years if year >= 2021]
[docs]class Ferc714Settings(GenericDatasetSettings): """An immutable pydantic model to validate Ferc714Settings. Args: data_source: DataSource metadata object tables: list of tables to validate. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("ferc714")
[docs] tables: list[str] = data_source.get_resource_ids()
[docs] years: list[int] = data_source.working_partitions[ "years" ] # Years only apply to XBRL
[docs]class EpaCemsSettings(GenericDatasetSettings): """An immutable pydantic model to validate EPA CEMS settings. Args: data_source: DataSource metadata object years: list of years to validate. states: list of states to validate. tables: list of tables to validate. partition: Whether to output year-state partitioned Parquet files. If True, all available threads / CPUs will be used in parallel. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("epacems")
[docs] years: list[int] = data_source.working_partitions["years"]
[docs] states: list[str] = data_source.working_partitions["states"]
[docs] tables: list[str] = data_source.get_resource_ids()
[docs] partition: bool = False
@validator("states")
[docs] def allow_all_keyword(cls, states): # noqa: N805 """Allow users to specify ['all'] to get all states.""" if states == ["all"]: states = cls.data_source.working_partitions["states"] return states
[docs]class Eia923Settings(GenericDatasetSettings): """An immutable pydantic model to validate EIA 923 settings. Args: data_source: DataSource metadata object years: list of years to validate. tables: list of tables to validate. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("eia923")
[docs] years: list[int] = data_source.working_partitions["years"]
[docs] tables: list[str] = data_source.get_resource_ids()
[docs]class Eia861Settings(GenericDatasetSettings): """An immutable pydantic model to validate EIA 861 settings. Args: data_source: DataSource metadata object years: list of years to validate. tables: list of tables to validate. transform_functions: list of transform functions to be applied to eia861 """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("eia861")
[docs] years: list[int] = data_source.working_partitions["years"]
[docs] tables: list[str] = data_source.get_resource_ids()
[docs] transform_functions: list[str]
@root_validator(pre=True)
[docs] def generate_transform_functions(cls, values): # noqa: N805 """Map tables to transform functions. Args: values: eia861 settings. Returns: values: eia861 settings. """ # balancing_authority_eia861 is always processed transform_functions = ["balancing_authority_eia861"] # Defaults to all transformation functions if not values.get("tables"): transform_functions.extend(list(TABLE_DEPENDENCIES)) else: for table in values["tables"]: transform_functions.extend( [ tf_func for tf_func, tables in TABLE_DEPENDENCIES.items() if table in tables ] ) values["transform_functions"] = sorted(set(transform_functions)) return values
[docs]class Eia860Settings(GenericDatasetSettings): """An immutable pydantic model to validate EIA 860 settings. This model also check 860m settings. Args: data_source: DataSource metadata object years: list of years to validate. tables: list of tables to validate. eia860m_date ClassVar[str]: The 860m year to date. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("eia860")
[docs] eia860m_data_source: ClassVar[DataSource] = DataSource.from_id("eia860m")
[docs] eia860m_date: ClassVar[str] = eia860m_data_source.working_partitions["year_month"]
[docs] years: list[int] = data_source.working_partitions["years"]
[docs] tables: list[str] = data_source.get_resource_ids()
[docs] eia860m: bool = True
@validator("eia860m")
[docs] def check_eia860m_date(cls, eia860m: bool) -> bool: # noqa: N805 """Check 860m date-year is exactly one year after most recent working 860 year. Args: eia860m: True if 860m is requested. Returns: eia860m: True if 860m is requested. Raises: ValueError: the 860m date is within 860 working years. """ eia860m_year = pd.to_datetime(cls.eia860m_date).year expected_year = max(cls.data_source.working_partitions["years"]) + 1 if eia860m and (eia860m_year != expected_year): raise AssertionError( """Attempting to integrate an eia860m year""" f"""({eia860m_year}) not immediately following the eia860 years:""" f"""{cls.data_source.working_partitions["years"]}. Consider switching eia860m""" """parameter to False.""" ) return eia860m
[docs]class GlueSettings(BaseModel): """An immutable pydantic model to validate Glue settings. Args: eia: Include eia in glue settings. ferc1: Include ferc1 in glue settings. """
[docs] eia: bool = True
[docs] ferc1: bool = True
[docs]class EiaSettings(BaseModel): """An immutable pydantic model to validate EIA datasets settings. Args: eia860: Immutable pydantic model to validate eia860 settings. eia923: Immutable pydantic model to validate eia923 settings. """
[docs] eia860: Eia860Settings = None
[docs] eia923: Eia923Settings = None
@root_validator(pre=True)
[docs] def default_load_all(cls, values): # noqa: N805 """If no datasets are specified default to all. Args: values (Dict[str, BaseModel]): dataset settings. Returns: values (Dict[str, BaseModel]): dataset settings. """ if not any(values.values()): values["eia860"] = Eia860Settings() values["eia923"] = Eia923Settings() return values
@root_validator
[docs] def check_eia_dependencies(cls, values): # noqa: N805 """Make sure the dependencies between the eia datasets are satisfied. Dependencies: * eia860 requires eia923.boiler_fuel_eia923 and eia923.generation_eia923. * eia923 requires eia860 for harvesting purposes. Args: values (Dict[str, BaseModel]): dataset settings. Returns: values (Dict[str, BaseModel]): dataset settings. """ eia923 = values.get("eia923") eia860 = values.get("eia860") if not eia923 and eia860: values["eia923"] = Eia923Settings( tables=["boiler_fuel_eia923", "generation_eia923"], years=eia860.years ) if eia923 and not eia860: values["eia860"] = Eia860Settings(years=eia923.years) return values
[docs]class DatasetsSettings(BaseModel): """An immutable pydantic model to validate PUDL Dataset settings. Args: ferc1: Immutable pydantic model to validate ferc1 settings. eia: Immutable pydantic model to validate eia(860, 923) settings. glue: Immutable pydantic model to validate glue settings. epacems: Immutable pydantic model to validate epacems settings. """
[docs] ferc1: Ferc1Settings = None
[docs] eia: EiaSettings = None
[docs] glue: GlueSettings = None
[docs] epacems: EpaCemsSettings = None
@root_validator(pre=True)
[docs] def default_load_all(cls, values): # noqa: N805 """If no datasets are specified default to all. Args: values (Dict[str, BaseModel]): dataset settings. Returns: values (Dict[str, BaseModel]): dataset settings. """ if not any(values.values()): values["ferc1"] = Ferc1Settings() values["eia"] = EiaSettings() values["glue"] = GlueSettings() values["epacems"] = EpaCemsSettings() return values
@root_validator
[docs] def add_glue_settings(cls, values): # noqa: N805 """Add glue settings if ferc1 and eia data are both requested. Args: values (Dict[str, BaseModel]): dataset settings. Returns: values (Dict[str, BaseModel]): dataset settings. """ ferc1 = bool(values.get("ferc1")) eia = bool(values.get("eia")) values["glue"] = GlueSettings(ferc1=ferc1, eia=eia) return values
[docs] def get_datasets(self): # noqa: N805 """Gets dictionary of dataset settings.""" return vars(self)
[docs]class Ferc1DbfToSqliteSettings(GenericDatasetSettings): """An immutable Pydantic model to validate FERC 1 to SQLite settings. Args: tables: List of tables to validate. years: List of years to validate. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("ferc1")
[docs] years: list[int] = [ year for year in data_source.working_partitions["years"] if year <= 2020 ]
[docs] tables: list[str] = sorted(list(DBF_TABLES_FILENAMES.keys()))
[docs] refyear: ClassVar[int] = max(years)
@validator("tables")
[docs] def validate_tables(cls, tables): # noqa: N805 """Validate tables.""" default_tables = sorted(list(DBF_TABLES_FILENAMES.keys())) tables_not_working = list(set(tables) - set(default_tables)) if len(tables_not_working) > 0: raise ValueError(f"'{tables_not_working}' tables are not available.") return sorted(set(tables))
[docs]class FercGenericXbrlToSqliteSettings(BaseSettings): """An immutable pydantic model to validate Ferc1 to SQLite settings. Args: taxonomy: URL of XBRL taxonomy used to create structure of SQLite DB. tables: list of tables to validate. years: list of years to validate. """
[docs] taxonomy: AnyHttpUrl
[docs] tables: list[int] | None = None
[docs] years: list[int]
[docs]class Ferc1XbrlToSqliteSettings(FercGenericXbrlToSqliteSettings): """An immutable pydantic model to validate Ferc1 to SQLite settings. Args: taxonomy: URL of taxonomy used to . years: list of years to validate. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("ferc1")
[docs] years: list[int] = [ year for year in data_source.working_partitions["years"] if year >= 2021 ]
[docs] taxonomy: AnyHttpUrl = "https://eCollection.ferc.gov/taxonomy/form1/2022-01-01/form/form1/form-1_2022-01-01.xsd"
[docs] tables: list[str] = XBRL_TABLES
@validator("tables")
[docs] def validate_tables(cls, tables): # noqa: N805 """Validate tables.""" default_tables = sorted(list(XBRL_TABLES)) tables_not_working = list(set(tables) - set(default_tables)) if len(tables_not_working) > 0: raise ValueError(f"'{tables_not_working}' tables are not available.") return sorted(set(tables))
[docs]class Ferc2XbrlToSqliteSettings(FercGenericXbrlToSqliteSettings): """An immutable pydantic model to validate FERC from 2 XBRL to SQLite settings. Args: years: List of years to validate. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("ferc2")
[docs] years: list[int] = data_source.working_partitions["years"]
[docs] taxonomy: AnyHttpUrl = "https://eCollection.ferc.gov/taxonomy/form2/2022-01-01/form/form2/form-2_2022-01-01.xsd"
[docs]class Ferc6XbrlToSqliteSettings(FercGenericXbrlToSqliteSettings): """An immutable pydantic model to validate FERC from 6 XBRL to SQLite settings. Args: years: List of years to validate. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("ferc6")
[docs] years: list[int] = data_source.working_partitions["years"]
[docs] taxonomy: AnyHttpUrl = "https://eCollection.ferc.gov/taxonomy/form6/2022-01-01/form/form6/form-6_2022-01-01.xsd"
[docs]class Ferc60XbrlToSqliteSettings(FercGenericXbrlToSqliteSettings): """An immutable pydantic model to validate FERC from 60 XBRL to SQLite settings. Args: years: List of years to validate. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("ferc60")
[docs] years: list[int] = data_source.working_partitions["years"]
[docs] taxonomy: AnyHttpUrl = "https://eCollection.ferc.gov/taxonomy/form60/2022-01-01/form/form60/form-60_2022-01-01.xsd"
[docs]class Ferc714XbrlToSqliteSettings(FercGenericXbrlToSqliteSettings): """An immutable pydantic model to validate FERC from 714 XBRL to SQLite settings. Args: years: List of years to validate. """
[docs] data_source: ClassVar[DataSource] = DataSource.from_id("ferc714")
[docs] years: list[int] = [2021]
[docs] taxonomy: AnyHttpUrl = "https://eCollection.ferc.gov/taxonomy/form714/2022-01-01/form/form714/form-714_2022-01-01.xsd"
[docs]class FercToSqliteSettings(BaseSettings): """An immutable pydantic model to validate FERC XBRL to SQLite settings. Args: ferc1_dbf_to_sqlite_settings: Settings for converting FERC 1 DBF data to SQLite. ferc1_xbrl_to_sqlite_settings: Settings for converting FERC 1 XBRL data to SQLite. other_xbrl_forms: List of non-FERC1 forms to convert from XBRL to SQLite. """
[docs] ferc1_dbf_to_sqlite_settings: Ferc1DbfToSqliteSettings = None
[docs] ferc1_xbrl_to_sqlite_settings: Ferc1XbrlToSqliteSettings = None
[docs] ferc2_xbrl_to_sqlite_settings: Ferc2XbrlToSqliteSettings = None
[docs] ferc6_xbrl_to_sqlite_settings: Ferc6XbrlToSqliteSettings = None
[docs] ferc60_xbrl_to_sqlite_settings: Ferc60XbrlToSqliteSettings = None
[docs] ferc714_xbrl_to_sqlite_settings: Ferc714XbrlToSqliteSettings = None
[docs] def get_xbrl_dataset_settings( self, form_number: XbrlFormNumber ) -> FercGenericXbrlToSqliteSettings: """Return a list with all requested FERC XBRL to SQLite datasets. Args: form_number: Get settings by FERC form number. """ # Get requested settings object match form_number: case XbrlFormNumber.FORM1: settings = self.ferc1_xbrl_to_sqlite_settings case XbrlFormNumber.FORM2: settings = self.ferc2_xbrl_to_sqlite_settings case XbrlFormNumber.FORM6: settings = self.ferc6_xbrl_to_sqlite_settings case XbrlFormNumber.FORM60: settings = self.ferc60_xbrl_to_sqlite_settings case XbrlFormNumber.FORM714: settings = self.ferc714_xbrl_to_sqlite_settings return settings
[docs]class EtlSettings(BaseSettings): """Main settings validation class."""
[docs] ferc_to_sqlite_settings: FercToSqliteSettings = None
[docs] datasets: DatasetsSettings = None
[docs] name: str = None
[docs] title: str = None
[docs] description: str = None
[docs] version: str = None
[docs] pudl_in: str = pudl.workspace.setup.get_defaults()["pudl_in"]
[docs] pudl_out: str = pudl.workspace.setup.get_defaults()["pudl_out"]
@classmethod
[docs] def from_yaml(cls, path: str) -> "EtlSettings": """Create an EtlSettings instance from a yaml_file path. Args: path: path to a yaml file. Returns: An ETL settings object. """ with pathlib.Path(path).open() as f: yaml_file = yaml.safe_load(f) return cls.parse_obj(yaml_file)