Source code for pudl.settings

"""Module for validating pudl etl settings."""
import pathlib
from typing import ClassVar, List

import pandas as pd
import yaml
from pydantic import BaseModel as PydanticBaseModel
from pydantic import BaseSettings, root_validator, validator

import pudl
import pudl.workspace.setup
from pudl.metadata.classes import DataSource
from pudl.metadata.constants import DBF_TABLES_FILENAMES
from pudl.metadata.resources.eia861 import TABLE_DEPENDENCIES


class BaseModel(PydanticBaseModel):
    """Shared pydantic base class carrying the configuration used module-wide."""

    class Config:
        """Pydantic configuration inherited by every model built on this base."""

        # Inputs naming a field the model does not declare are rejected.
        extra = "forbid"
        # Field assignment after construction is disabled.
        allow_mutation = False
class GenericDatasetSettings(BaseModel):
    """An abstract pydantic model for generic datasets.

    Each dataset must specify working tables and partitions. A dataset can have
    an arbitrary number of partitions.

    Both validators read ``cls.data_source``, so concrete subclasses must define
    a ``data_source`` class attribute.
    """

    tables: List[str]

    @root_validator
    def validate_partitions(cls, partitions):  # noqa: N805
        """Validate the requested data partitions.

        Check that all the partitions defined in the ``working_partitions`` of
        the associated ``data_source`` (e.g. years or states) have been assigned
        in the definition of the class, and that the requested values are a
        subset of the allowable values defined by the ``data_source``.

        Args:
            partitions: Mapping of field name to value, as handed to a
                pydantic root validator.

        Returns:
            The same mapping, with every partition field replaced by a sorted,
            de-duplicated list of the requested values.

        Raises:
            ValueError: If a partition field is absent from ``partitions`` or
                if any requested value is not a working partition.
        """
        for name, working_partitions in cls.data_source.working_partitions.items():
            try:
                partition = partitions[name]
            except KeyError as err:
                raise ValueError(
                    f"{cls.__name__} is missing required '{name}' field."
                ) from err

            requested = set(partition)
            # Sorted so the error message is reproducible between runs; plain
            # set iteration order is arbitrary.
            partitions_not_working = sorted(requested - set(working_partitions))
            if partitions_not_working:
                raise ValueError(
                    f"'{partitions_not_working}' {name} are not available."
                )
            partitions[name] = sorted(requested)
        return partitions

    @validator("tables")
    def validate_tables(cls, tables):  # noqa: N805
        """Validate tables are available.

        Args:
            tables: Requested table names.

        Returns:
            Sorted, de-duplicated list of the requested table names.

        Raises:
            ValueError: If any requested table is not among
                ``cls.data_source.get_resource_ids()``.
        """
        requested = set(tables)
        # Sorted for a reproducible error message (see validate_partitions).
        tables_not_working = sorted(
            requested - set(cls.data_source.get_resource_ids())
        )
        if tables_not_working:
            raise ValueError(f"'{tables_not_working}' tables are not available.")
        return sorted(requested)
class Ferc1Settings(GenericDatasetSettings):
    """Immutable pydantic model holding the validated FERC Form 1 settings.

    Args:
        data_source: DataSource metadata object looked up with id ``"ferc1"``.
        years: Years to validate; defaults to the data source's working years.
        tables: Tables to validate; defaults to the data source's resource ids.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("ferc1")

    # The defaults below are read from the metadata object once, at class
    # creation time.
    years: List[int] = data_source.working_partitions["years"]
    tables: List[str] = data_source.get_resource_ids()
class Ferc714Settings(GenericDatasetSettings):
    """Immutable pydantic model holding the validated FERC Form 714 settings.

    Args:
        data_source: DataSource metadata object looked up with id ``"ferc714"``.
        tables: Tables to validate; defaults to the data source's resource ids.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("ferc714")

    # No partition fields are declared for this dataset; only the table list.
    tables: List[str] = data_source.get_resource_ids()
class EpaCemsSettings(GenericDatasetSettings):
    """Immutable pydantic model holding the validated EPA CEMS settings.

    Args:
        data_source: DataSource metadata object looked up with id ``"epacems"``.
        years: Years to validate; defaults to the data source's working years.
        states: States to validate; defaults to the data source's working
            states. The single-item list ``["all"]`` selects every one of them.
        tables: Tables to validate; defaults to the data source's resource ids.
        partition: Whether to output year-state partitioned Parquet files. If
            True, all available threads / CPUs will be used in parallel.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("epacems")

    years: List[int] = data_source.working_partitions["years"]
    states: List[str] = data_source.working_partitions["states"]
    tables: List[str] = data_source.get_resource_ids()
    partition: bool = False

    @validator("states")
    def allow_all_keyword(cls, states):  # noqa: N805
        """Expand the ``["all"]`` shorthand into the full list of working states.

        Args:
            states: Requested states.

        Returns:
            The data source's working states when ``states`` equals ``["all"]``;
            otherwise ``states`` unchanged.
        """
        if states != ["all"]:
            return states
        return cls.data_source.working_partitions["states"]
class Eia923Settings(GenericDatasetSettings):
    """Immutable pydantic model holding the validated EIA 923 settings.

    Args:
        data_source: DataSource metadata object looked up with id ``"eia923"``.
        years: Years to validate; defaults to the data source's working years.
        tables: Tables to validate; defaults to the data source's resource ids.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("eia923")

    # The defaults below are read from the metadata object once, at class
    # creation time.
    years: List[int] = data_source.working_partitions["years"]
    tables: List[str] = data_source.get_resource_ids()
class Eia861Settings(GenericDatasetSettings):
    """Immutable pydantic model holding the validated EIA 861 settings.

    Args:
        data_source: DataSource metadata object looked up with id ``"eia861"``.
        years: Years to validate; defaults to the data source's working years.
        tables: Tables to validate; defaults to the data source's resource ids.
        transform_functions: Transform functions to be applied to eia861. The
            pre-validation hook below always overwrites this value.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("eia861")

    years: List[int] = data_source.working_partitions["years"]
    tables: List[str] = data_source.get_resource_ids()
    transform_functions: List[str]

    @root_validator(pre=True)
    def generate_transform_functions(cls, values):  # noqa: N805
        """Derive the transform functions implied by the requested tables.

        Args:
            values: eia861 settings.

        Returns:
            values: eia861 settings, with ``transform_functions`` set to a
            sorted list of unique transform function names.
        """
        # balancing_authority_eia861 is always processed
        selected = {"balancing_authority_eia861"}

        requested_tables = values.get("tables")
        if requested_tables:
            # Keep every transform function whose dependency list mentions one
            # of the requested tables.
            for table in requested_tables:
                selected.update(
                    tf_func
                    for tf_func, tf_tables in TABLE_DEPENDENCIES.items()
                    if table in tf_tables
                )
        else:
            # No tables requested: default to all transformation functions.
            selected.update(TABLE_DEPENDENCIES)

        values["transform_functions"] = sorted(selected)
        return values
class Eia860Settings(GenericDatasetSettings):
    """An immutable pydantic model to validate EIA 860 settings.

    This model also checks 860m settings.

    Args:
        data_source: DataSource metadata object for eia860.
        eia860m_data_source: DataSource metadata object for eia860m.
        eia860m_date: The 860m year to date, read from the ``"year_month"``
            working partition of the eia860m data source.
        years: List of years to validate.
        tables: List of tables to validate.
        eia860m: Whether 860m data is requested.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("eia860")
    eia860m_data_source: ClassVar[DataSource] = DataSource.from_id("eia860m")
    eia860m_date: ClassVar[str] = eia860m_data_source.working_partitions["year_month"]

    years: List[int] = data_source.working_partitions["years"]
    tables: List[str] = data_source.get_resource_ids()
    eia860m: bool = True

    @validator("eia860m")
    def check_eia860m_date(cls, eia860m: bool) -> bool:  # noqa: N805
        """Check 860m date year is exactly one year later than most recent working 860 year.

        Args:
            eia860m: True if 860m is requested.

        Returns:
            eia860m: True if 860m is requested.

        Raises:
            AssertionError: 860m is requested and the 860m year is not exactly
                one greater than the latest working eia860 year.
        """
        working_years = cls.data_source.working_partitions["years"]
        eia860m_year = pd.to_datetime(cls.eia860m_date).year
        expected_year = max(working_years) + 1
        if eia860m and (eia860m_year != expected_year):
            # Adjacent string literals are concatenated verbatim, so each
            # fragment carries its own separating space.
            raise AssertionError(
                "Attempting to integrate an eia860m year "
                f"({eia860m_year}) not immediately following the eia860 years: "
                f"{working_years}. Consider switching eia860m "
                "parameter to False."
            )
        return eia860m
class GlueSettings(BaseModel):
    """Immutable pydantic model holding the validated Glue settings.

    Args:
        eia: Include eia in glue settings.
        ferc1: Include ferc1 in glue settings.
    """

    # Both flags are on unless the caller says otherwise.
    eia: bool = True
    ferc1: bool = True
class EiaSettings(BaseModel):
    """Immutable pydantic model holding the validated EIA dataset settings.

    Args:
        eia860: Immutable pydantic model to validate eia860 settings.
        eia923: Immutable pydantic model to validate eia923 settings.
    """

    eia860: Eia860Settings = None
    eia923: Eia923Settings = None

    @root_validator(pre=True)
    def default_load_all(cls, values):  # noqa: N805
        """Fall back to every EIA dataset when none was specified.

        Args:
            values (Dict[str, BaseModel]): dataset settings.

        Returns:
            values (Dict[str, BaseModel]): dataset settings.
        """
        # Something truthy was supplied: leave the input untouched.
        if any(values.values()):
            return values

        values["eia860"] = Eia860Settings()
        values["eia923"] = Eia923Settings()
        return values

    @root_validator
    def check_eia_dependencies(cls, values):  # noqa: N805
        """Fill in whichever EIA dataset is missing so dependencies are met.

        Dependencies:
        * eia860 requires eia923.boiler_fuel_eia923 and eia923.generation_eia923.
        * eia923 requires eia860 for harvesting purposes.

        Args:
            values (Dict[str, BaseModel]): dataset settings.

        Returns:
            values (Dict[str, BaseModel]): dataset settings.
        """
        eia923_settings = values.get("eia923")
        eia860_settings = values.get("eia860")

        if eia860_settings and not eia923_settings:
            # Only eia860 given: add the two eia923 tables it needs, for the
            # same years.
            values["eia923"] = Eia923Settings(
                tables=["boiler_fuel_eia923", "generation_eia923"],
                years=eia860_settings.years,
            )
        elif eia923_settings and not eia860_settings:
            # Only eia923 given: add eia860 for the same years.
            values["eia860"] = Eia860Settings(years=eia923_settings.years)

        return values
class DatasetsSettings(BaseModel):
    """Immutable pydantic model holding the validated PUDL dataset settings.

    Args:
        ferc1: Immutable pydantic model to validate ferc1 settings.
        eia: Immutable pydantic model to validate eia(860, 923) settings.
        glue: Immutable pydantic model to validate glue settings.
        epacems: Immutable pydantic model to validate epacems settings.
    """

    ferc1: Ferc1Settings = None
    eia: EiaSettings = None
    glue: GlueSettings = None
    epacems: EpaCemsSettings = None

    @root_validator(pre=True)
    def default_load_all(cls, values):  # noqa: N805
        """Fall back to every dataset when none was specified.

        Args:
            values (Dict[str, BaseModel]): dataset settings.

        Returns:
            values (Dict[str, BaseModel]): dataset settings.
        """
        # Something truthy was supplied: leave the input untouched.
        if any(values.values()):
            return values

        values["ferc1"] = Ferc1Settings()
        values["eia"] = EiaSettings()
        values["glue"] = GlueSettings()
        values["epacems"] = EpaCemsSettings()
        return values

    @root_validator
    def add_glue_settings(cls, values):  # noqa: N805
        """Set the glue settings from whether ferc1 and eia were requested.

        The ``glue`` entry is always replaced; each of its flags is the
        truthiness of the corresponding validated dataset settings.

        Args:
            values (Dict[str, BaseModel]): dataset settings.

        Returns:
            values (Dict[str, BaseModel]): dataset settings.
        """
        values["glue"] = GlueSettings(
            ferc1=bool(values.get("ferc1")),
            eia=bool(values.get("eia")),
        )
        return values

    def get_datasets(self):
        """Gets dictionary of dataset settings (the instance's ``__dict__``)."""
        return self.__dict__
class Ferc1ToSqliteSettings(GenericDatasetSettings):
    """Immutable pydantic model holding the validated Ferc1 to SQLite settings.

    Args:
        tables: List of tables to validate; defaults to the sorted keys of
            ``DBF_TABLES_FILENAMES``.
        years: List of years to validate; defaults to the working years of the
            ferc1 data source.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("ferc1")

    years: List[int] = data_source.working_partitions["years"]
    tables: List[str] = sorted(DBF_TABLES_FILENAMES)

    # Latest of the default working years.
    refyear: ClassVar[int] = max(years)
    bad_cols: tuple = ()

    @validator("tables")
    def validate_tables(cls, tables):  # noqa: N805
        """Validate tables against the keys of ``DBF_TABLES_FILENAMES``.

        Args:
            tables: Requested table names.

        Returns:
            Sorted, de-duplicated list of the requested table names.

        Raises:
            ValueError: If any requested table is not a key of
                ``DBF_TABLES_FILENAMES``.
        """
        requested = set(tables)
        known_tables = set(DBF_TABLES_FILENAMES)
        tables_not_working = list(requested - known_tables)
        if tables_not_working:
            raise ValueError(f"'{tables_not_working}' tables are not available.")
        return sorted(requested)
class EtlSettings(BaseSettings):
    """Main settings validation class.

    Args:
        ferc1_to_sqlite_settings: Ferc1 to SQLite settings model.
        datasets: PUDL dataset settings model.
        name: Optional string; defaults to None.
        title: Optional string; defaults to None.
        description: Optional string; defaults to None.
        version: Optional string; defaults to None.
        pudl_in: Defaults to the ``"pudl_in"`` entry of
            ``pudl.workspace.setup.get_defaults()``.
        pudl_out: Defaults to the ``"pudl_out"`` entry of
            ``pudl.workspace.setup.get_defaults()``.
    """

    ferc1_to_sqlite_settings: Ferc1ToSqliteSettings = None
    datasets: DatasetsSettings = None

    name: str = None
    title: str = None
    description: str = None
    version: str = None

    # Workspace defaults are looked up once, when the class is created.
    pudl_in: str = pudl.workspace.setup.get_defaults()["pudl_in"]
    pudl_out: str = pudl.workspace.setup.get_defaults()["pudl_out"]

    @classmethod
    def from_yaml(cls, path: str) -> "EtlSettings":
        """Create an EtlSettings instance from a yaml_file path.

        Args:
            path: path to a yaml file.

        Returns:
            An ETL settings object.
        """
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's locale encoding.
        with pathlib.Path(path).open(encoding="utf-8") as f:
            yaml_file = yaml.safe_load(f)
        return cls.parse_obj(yaml_file)