"""Module for validating pudl etl settings."""
import pathlib
from typing import ClassVar, List
import pandas as pd
import yaml
from pydantic import BaseModel as PydanticBaseModel
from pydantic import BaseSettings, root_validator, validator
import pudl
import pudl.workspace.setup
from pudl.metadata.classes import DataSource
from pudl.metadata.constants import DBF_TABLES_FILENAMES
from pudl.metadata.resources.eia861 import TABLE_DEPENDENCIES
class BaseModel(PydanticBaseModel):
    """BaseModel with global configuration.

    All settings models in this module inherit from this class so that
    pydantic configuration is defined in exactly one place.
    """

    class Config:
        """Pydantic config.

        NOTE(review): this extracted copy of the source shows no options in
        the Config body; the docstrings on subclasses say the models are
        immutable, so upstream presumably sets mutability/extra-field
        options here — confirm against the original module.
        """
class GenericDatasetSettings(BaseModel):
    """
    An abstract pydantic model for generic datasets.

    Each dataset must specify working tables and partitions.
    A dataset can have an arbitrary number of partitions.

    Concrete subclasses are expected to define a ``data_source`` ClassVar
    (a :class:`DataSource`), one field per working partition (e.g.
    ``years``, ``states``), and a ``tables`` field; the validators below
    read them through ``cls``.
    """

    @root_validator
    def validate_partitions(cls, partitions):  # noqa: N805
        """
        Validate the requested data partitions.

        Check that all the partitions defined in the ``working_partitions`` of the
        associated ``data_source`` (e.g. years or states) have been assigned in the
        definition of the class, and that the requested values are a subset of the
        allowable values defined by the ``data_source``.

        Args:
            partitions: the full field-values dict (pydantic passes all
                model values to a root validator, not a single field).

        Returns:
            The values dict with each partition field deduplicated and sorted.

        Raises:
            ValueError: if a required partition field is missing, or if any
                requested value is outside the working partitions.
        """
        for name, working_partitions in cls.data_source.working_partitions.items():
            try:
                partition = partitions[name]
            except KeyError:
                raise ValueError(f"{cls.__name__} is missing required '{name}' field.")
            # Anything requested beyond the known-working values is an error.
            partitions_not_working = list(set(partition) - set(working_partitions))
            if partitions_not_working:
                raise ValueError(
                    f"'{partitions_not_working}' {name} are not available.")
            # Normalize: drop duplicates and store in sorted order.
            partitions[name] = sorted(set(partition))
        return partitions

    @validator("tables")
    def validate_tables(cls, tables):  # noqa: N805
        """Validate that the requested tables are resources of this data source."""
        tables_not_working = list(set(tables) - set(cls.data_source.get_resource_ids()))
        if tables_not_working:
            raise ValueError(
                f"'{tables_not_working}' tables are not available.")
        # Deduplicate and sort for deterministic downstream iteration.
        return sorted(set(tables))
class Ferc1Settings(GenericDatasetSettings):
    """
    An immutable pydantic model to validate Ferc1Settings.

    Parameters:
        data_source: DataSource metadata object
        years: List of years to validate.
        tables: List of tables to validate.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("ferc1")

    # Defaults evaluate at class-definition time: everything the data
    # source reports as working.
    years: List[int] = data_source.working_partitions["years"]
    tables: List[str] = data_source.get_resource_ids()
class Ferc714Settings(GenericDatasetSettings):
    """
    An immutable pydantic model to validate Ferc714Settings.

    Parameters:
        data_source: DataSource metadata object
        tables: List of tables to validate.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("ferc714")

    # FERC 714 has no year/state partitions here — only a table selection.
    tables: List[str] = data_source.get_resource_ids()
class EpaCemsSettings(GenericDatasetSettings):
    """
    An immutable pydantic model to validate EPA CEMS settings.

    Parameters:
        data_source: DataSource metadata object
        years: List of years to validate.
        states: List of states to validate.
        tables: List of tables to validate.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("epacems")

    years: List[int] = data_source.working_partitions["years"]
    states: List[str] = data_source.working_partitions["states"]
    tables: List[str] = data_source.get_resource_ids()

    @validator("states")
    def allow_all_keyword(cls, states):  # noqa: N805
        """Allow users to specify ['all'] to get all states."""
        # Expand the sentinel before the parent class's partition
        # validation runs; only the exact list ["all"] triggers expansion.
        if states == ["all"]:
            states = cls.data_source.working_partitions["states"]
        return states
class Eia923Settings(GenericDatasetSettings):
    """
    An immutable pydantic model to validate EIA 923 settings.

    Parameters:
        data_source: DataSource metadata object
        years: List of years to validate.
        tables: List of tables to validate.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("eia923")

    years: List[int] = data_source.working_partitions["years"]
    tables: List[str] = data_source.get_resource_ids()
class Eia861Settings(GenericDatasetSettings):
    """
    An immutable pydantic model to validate EIA 861 settings.

    Parameters:
        data_source: DataSource metadata object
        years: List of years to validate.
        tables: List of tables to validate.
        transform_functions: List of transform functions to be applied to eia861
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("eia861")

    years: List[int] = data_source.working_partitions["years"]
    tables: List[str] = data_source.get_resource_ids()

    # NOTE(review): the docstring documents a ``transform_functions`` field
    # and the decorator below has no function under it in this copy of the
    # source — the validator body (and the field declaration) appear to have
    # been lost in extraction. Restore both from the upstream module; as
    # written, a dangling decorator is a syntax error.
    @root_validator(pre=True)
class Eia860Settings(GenericDatasetSettings):
    """
    An immutable pydantic model to validate EIA 860 settings.

    This model also checks 860m settings.

    Parameters:
        data_source: DataSource metadata object
        years: List of years to validate.
        tables: List of tables to validate.
        eia860m: whether to also integrate the EIA 860m monthly update.
        eia860m_date ClassVar[str]: The 860m year to date.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("eia860")
    eia860m_data_source: ClassVar[DataSource] = DataSource.from_id("eia860m")
    eia860m_date: ClassVar[str] = eia860m_data_source.working_partitions[
        "year_month"]

    years: List[int] = data_source.working_partitions["years"]
    tables: List[str] = data_source.get_resource_ids()
    # Fix: the validator below targets "eia860m", but no such field was
    # declared, which makes pydantic reject the class at definition time.
    # Default False keeps prior behavior for callers that never set it.
    eia860m: bool = False

    @validator("eia860m")
    def check_eia860m_date(cls, eia860m: bool) -> bool:  # noqa: N805
        """
        Check 860m date year is exactly one year later than most recent working 860 year.

        Args:
            eia860m: True if 860m is requested.

        Returns:
            eia860m: True if 860m is requested.

        Raises:
            AssertionError: if 860m is requested but the 860m date does not
                immediately follow the working 860 years. (Docstring fixed:
                it previously claimed ValueError, but the code raises
                AssertionError.)
        """
        eia860m_year = pd.to_datetime(cls.eia860m_date).year
        # 860m data is only integrable when it extends the annual 860
        # data by exactly one year.
        expected_year = max(cls.data_source.working_partitions["years"]) + 1
        if eia860m and (eia860m_year != expected_year):
            raise AssertionError(
                """Attempting to integrate an eia860m year"""
                f"""({eia860m_year}) not immediately following the eia860 years:"""
                f"""{cls.data_source.working_partitions["years"]}. Consider switching eia860m"""
                """parameter to False."""
            )
        return eia860m
class GlueSettings(BaseModel):
    """
    An immutable pydantic model to validate Glue settings.

    Parameters:
        eia: Include eia in glue settings.
        ferc1: Include ferc1 in glue settings.
    """

    # Fix: both parameters are documented above and DatasetsSettings
    # constructs GlueSettings(ferc1=..., eia=...), but neither field was
    # declared, so those keyword arguments had nowhere to land. Default
    # True: glue is included unless explicitly disabled.
    eia: bool = True
    ferc1: bool = True
class EiaSettings(BaseModel):
    """
    An immutable pydantic model to validate EIA datasets settings.

    Parameters:
        eia860: Immutable pydantic model to validate eia860 settings.
        eia923: Immutable pydantic model to validate eia923 settings.
    """

    # None means "not requested"; the validators below may fill these in.
    eia860: Eia860Settings = None
    eia923: Eia923Settings = None

    @root_validator(pre=True)
    def default_load_all(cls, values):  # noqa: N805
        """
        If no datasets are specified default to all.

        Args:
            values (Dict[str, BaseModel]): dataset settings.

        Returns:
            values (Dict[str, BaseModel]): dataset settings.
        """
        # pre=True: runs on the raw input before field validation, so an
        # entirely empty settings block becomes "load everything".
        if not any(values.values()):
            values["eia860"] = Eia860Settings()
            values["eia923"] = Eia923Settings()
        return values

    @root_validator
    def check_eia_dependencies(cls, values):  # noqa: N805
        """
        Make sure the dependencies between the eia datasets are satisfied.

        Dependencies:
            * eia860 requires eia923.boiler_fuel_eia923 and eia923.generation_eia923.
            * eia923 requires eia860 for harvesting purposes.

        Args:
            values (Dict[str, BaseModel]): dataset settings.

        Returns:
            values (Dict[str, BaseModel]): dataset settings.
        """
        eia923 = values.get("eia923")
        eia860 = values.get("eia860")
        if not eia923 and eia860:
            # Pull in only the two eia923 tables that eia860 needs,
            # restricted to the same years as the requested eia860 data.
            values["eia923"] = Eia923Settings(
                tables=['boiler_fuel_eia923', 'generation_eia923'],
                years=eia860.years
            )
        if eia923 and not eia860:
            # eia923 harvesting needs eia860 for the same years.
            values["eia860"] = Eia860Settings(
                years=eia923.years
            )
        return values
class DatasetsSettings(BaseModel):
    """
    An immutable pydantic model to validate PUDL Dataset settings.

    Parameters:
        ferc1: Immutable pydantic model to validate ferc1 settings.
        eia: Immutable pydantic model to validate eia(860, 923) settings.
        glue: Immutable pydantic model to validate glue settings.
        epacems: Immutable pydantic model to validate epacems settings.
    """

    # None means "not requested"; the validators below may fill these in.
    ferc1: Ferc1Settings = None
    eia: EiaSettings = None
    glue: GlueSettings = None
    epacems: EpaCemsSettings = None

    @root_validator(pre=True)
    def default_load_all(cls, values):  # noqa: N805
        """
        If no datasets are specified default to all.

        Args:
            values (Dict[str, BaseModel]): dataset settings.

        Returns:
            values (Dict[str, BaseModel]): dataset settings.
        """
        # pre=True: runs on the raw input before field validation, so an
        # entirely empty settings block becomes "load everything".
        if not any(values.values()):
            values["ferc1"] = Ferc1Settings()
            values["eia"] = EiaSettings()
            values["glue"] = GlueSettings()
            values["epacems"] = EpaCemsSettings()
        return values

    @root_validator
    def add_glue_settings(cls, values):  # noqa: N805
        """
        Add glue settings if ferc1 and eia data are both requested.

        Args:
            values (Dict[str, BaseModel]): dataset settings.

        Returns:
            values (Dict[str, BaseModel]): dataset settings.
        """
        # Glue flags reflect which of the two linked datasets are present;
        # this always (re)constructs the glue settings after the pre
        # validator has run.
        ferc1 = bool(values.get("ferc1"))
        eia = bool(values.get("eia"))
        values["glue"] = GlueSettings(ferc1=ferc1, eia=eia)
        return values

    def get_datasets(self):
        """Get the dictionary of dataset settings for this model instance."""
        return vars(self)
class Ferc1ToSqliteSettings(GenericDatasetSettings):
    """
    An immutable pydantic model to validate Ferc1 to SQLite settings.

    (Docstring typo fixed: "nodel" -> "model".)

    Parameters:
        tables: List of tables to validate.
        years: List of years to validate.
    """

    data_source: ClassVar[DataSource] = DataSource.from_id("ferc1")

    years: List[int] = data_source.working_partitions["years"]
    # sorted() iterates the mapping's keys directly; the intermediate
    # list(...keys()) was redundant.
    tables: List[str] = sorted(DBF_TABLES_FILENAMES)
    # NOTE: computed from the *default* years at class-definition time,
    # not from a user-supplied years value.
    refyear: ClassVar[int] = max(years)

    @validator("tables")
    def validate_tables(cls, tables):  # noqa: N805
        """Validate that requested tables exist in the FERC 1 DBF file set."""
        default_tables = sorted(DBF_TABLES_FILENAMES)
        tables_not_working = list(set(tables) - set(default_tables))
        if tables_not_working:
            raise ValueError(
                f"'{tables_not_working}' tables are not available.")
        # Deduplicate and sort for deterministic downstream iteration.
        return sorted(set(tables))
class EtlSettings(BaseSettings):
    """Main settings validation class.

    Top-level container tying together the ferc1-to-sqlite settings and
    the dataset settings, plus workspace paths.
    """

    ferc1_to_sqlite_settings: Ferc1ToSqliteSettings = None
    datasets: DatasetsSettings = None

    # Free-form description of this settings file.
    description: str = None

    # Workspace locations; defaults are read from the user's configured
    # PUDL workspace at import time.
    pudl_in: str = pudl.workspace.setup.get_defaults()["pudl_in"]
    pudl_out: str = pudl.workspace.setup.get_defaults()["pudl_out"]

    @classmethod
    def from_yaml(cls, path: str):
        """
        Create an EtlSettings instance from a yaml_file path.

        Parameters:
            path: path to a yaml file.

        Returns:
            EtlSettings: etl settings object.
        """
        # safe_load: settings files are data, never executable YAML tags.
        with pathlib.Path(path).open() as f:
            yaml_file = yaml.safe_load(f)
        return cls.parse_obj(yaml_file)