"""Metadata data classes."""
import copy
import datetime
import logging
import os
import re
from pathlib import Path
from typing import (Any, Callable, Dict, Iterable, List, Literal, Optional,
Tuple, Type, Union)
import jinja2
import pandas as pd
import pydantic
import sqlalchemy as sa
from .constants import (CONSTRAINT_DTYPES, CONTRIBUTORS,
CONTRIBUTORS_BY_SOURCE, FIELD_DTYPES, FIELD_DTYPES_SQL,
KEYWORDS_BY_SOURCE, LICENSES, PERIODS, SOURCES)
from .fields import (FIELD_METADATA, FIELD_METADATA_BY_GROUP,
FIELD_METADATA_BY_RESOURCE)
from .helpers import (expand_periodic_column_names, format_errors,
groupby_aggregate, most_and_more_frequent, split_period)
from .resources import FOREIGN_KEYS, RESOURCE_METADATA
# Module-level logger named after this module, per stdlib logging convention.
logger = logging.getLogger(__name__)
# ---- Helpers ---- #
[docs]def _unique(*args: Iterable) -> list:
"""
Return a list of all unique values, in order of first appearance.
Args:
args: Iterables of values.
Examples:
>>> _unique([0, 2], (2, 1))
[0, 2, 1]
>>> _unique([{'x': 0, 'y': 1}, {'y': 1, 'x': 0}], [{'z': 2}])
[{'x': 0, 'y': 1}, {'z': 2}]
"""
values = []
for parent in args:
for child in parent:
if child not in values:
values.append(child)
return values
# Jinja environment that loads templates from the "templates" directory
# shipped alongside this module. autoescape=True makes rendered values
# escaped by default.
JINJA_ENVIRONMENT: jinja2.Environment = jinja2.Environment(
    loader=jinja2.FileSystemLoader(
        os.path.join(os.path.dirname(__file__), "templates")
    ),
    autoescape=True
)
# ---- Base ---- #
class Base(pydantic.BaseModel):
    """
    Custom Pydantic base class.

    It overrides :meth:`fields` and :meth:`schema` to allow properties with those names.
    To use them in a class, use an underscore prefix and an alias.

    Examples:
        >>> class Class(Base):
        ...     fields_: List[str] = pydantic.Field(alias="fields")
        >>> m = Class(fields=['x'])
        >>> m
        Class(fields=['x'])
        >>> m.fields
        ['x']
        >>> m.fields = ['y']
        >>> m.dict()
        {'fields': ['y']}
    """

    class Config:
        """Custom Pydantic configuration."""

        # Validate default values and attribute assignments, not just
        # constructor input.
        validate_all: bool = True
        validate_assignment: bool = True
        # Permit attribute types pydantic has no validator for
        # (e.g. pd.DataFrame).
        arbitrary_types_allowed = True

    def dict(self, *args, by_alias=True, **kwargs) -> dict:  # noqa: A003
        """Return as a dictionary (field aliases such as 'fields' by default)."""
        return super().dict(*args, by_alias=by_alias, **kwargs)

    def json(self, *args, by_alias=True, **kwargs) -> str:
        """Return as JSON (field aliases such as 'fields' by default)."""
        return super().json(*args, by_alias=by_alias, **kwargs)

    def __getattribute__(self, name: str) -> Any:
        """Get attribute, redirecting 'fields'/'schema' to their aliased storage."""
        # Only redirect when the underscore-suffixed attribute actually exists,
        # so BaseModel's own fields()/schema() remain reachable otherwise.
        if name in ("fields", "schema") and f"{name}_" in self.__dict__:
            name = f"{name}_"
        return super().__getattribute__(name)

    def __setattr__(self, name, value) -> None:
        """Set attribute, redirecting 'fields'/'schema' to their aliased storage."""
        if name in ("fields", "schema") and f"{name}_" in self.__dict__:
            name = f"{name}_"
        super().__setattr__(name, value)

    def __repr_args__(self) -> List[Tuple[str, Any]]:
        """Returns the attributes to show in __str__, __repr__, and __pretty__."""
        # Strip the trailing underscore so reprs show the public alias names.
        return [
            (a[:-1] if a in ("fields_", "schema_") else a, v)
            for a, v in self.__dict__.items()
        ]
# ---- Class attribute types ---- #

# NOTE: Using regex=r"^\S(.*\S)*$" to fail on whitespace is too slow
String = pydantic.constr(min_length=1, strict=True, regex=r"^\S+(\s+\S+)*$")
"""Non-empty :class:`str` with no trailing or leading whitespace."""

SnakeCase = pydantic.constr(
    min_length=1, strict=True, regex=r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$"
)
"""Snake-case variable name :class:`str` (e.g. 'pudl', 'entity_eia860')."""

Bool = pydantic.StrictBool
"""Any :class:`bool` (`True` or `False`)."""

Float = pydantic.StrictFloat
"""Any :class:`float`."""

Int = pydantic.StrictInt
"""Any :class:`int`."""

# NOTE(review): ge=0 admits zero, so these two are non-negative rather than
# strictly positive despite the names — confirm intended.
PositiveInt = pydantic.conint(ge=0, strict=True)
"""Positive :class:`int`."""

PositiveFloat = pydantic.confloat(ge=0, strict=True)
"""Positive :class:`float`."""

Email = pydantic.EmailStr
"""String representing an email."""

HttpUrl = pydantic.AnyHttpUrl
"""Http(s) URL."""
class BaseType:
    """Base class for custom pydantic types."""

    @classmethod
    def __get_validators__(cls) -> Callable:
        """Yield the validator callables pydantic should run for this type."""
        # Subclasses provide `validate`; looked up lazily when iterated.
        yield from (cls.validate,)
class Date(BaseType):
    """Any :class:`datetime.date`."""

    @classmethod
    def validate(cls, value: Any) -> datetime.date:
        """Return ``value`` unchanged if it is a date; raise TypeError otherwise."""
        if isinstance(value, datetime.date):
            return value
        raise TypeError("value is not a date")
class Datetime(BaseType):
    """Any :class:`datetime.datetime`."""

    @classmethod
    def validate(cls, value: Any) -> datetime.datetime:
        """Return ``value`` unchanged if it is a datetime; raise TypeError otherwise."""
        if isinstance(value, datetime.datetime):
            return value
        raise TypeError("value is not a datetime")
class Pattern(BaseType):
    """Regular expression pattern."""

    @classmethod
    def validate(cls, value: Any) -> re.Pattern:
        """
        Validate as pattern.

        A compiled pattern is returned as-is; a string is compiled.

        Raises:
            TypeError: ``value`` is neither a string nor a compiled pattern.
            ValueError: ``value`` is a string that is not a valid regular
                expression (the original :class:`re.error` is chained as the
                cause so the details of the failure are preserved).
        """
        if isinstance(value, re.Pattern):
            return value
        if not isinstance(value, str):
            raise TypeError("value is not a string or compiled regular expression")
        try:
            return re.compile(value)
        except re.error as err:
            # Chain the original error (was silently dropped before) so the
            # invalid-pattern diagnostics survive in the traceback.
            raise ValueError("string is not a valid regular expression") from err
def StrictList(item_type: Type = Any) -> pydantic.ConstrainedList:  # noqa: N802
    """
    Non-empty :class:`list`.

    Allows :class:`list`, :class:`tuple`, :class:`set`, :class:`frozenset`,
    :class:`collections.deque`, or generators and casts to a :class:`list`.

    Args:
        item_type: Type of the list items (defaults to anything).
    """
    constrained = pydantic.conlist(min_items=1, item_type=item_type)
    return constrained
# ---- Class attribute validators ---- #
[docs]def _check_unique(value: list = None) -> Optional[list]:
"""Check that input list has unique values."""
if value:
for i in range(len(value)):
if value[i] in value[:i]:
raise ValueError(f"contains duplicate {value[i]}")
return value
def _validator(*names, fn: Callable) -> Callable:
    """
    Construct reusable Pydantic validator.

    Args:
        names: Names of attributes to validate.
        fn: Validation function (see :meth:`pydantic.validator`).

    Examples:
        >>> class Class(Base):
        ...     x: list = None
        ...     _check_unique = _validator("x", fn=_check_unique)
        >>> Class(x=[0, 0])
        Traceback (most recent call last):
        ValidationError: ...
    """
    # allow_reuse lets the same fn back validators in multiple model classes.
    return pydantic.validator(*names, allow_reuse=True)(fn)
# ---- Classes: Field ---- #
class FieldConstraints(Base):
    """
    Field constraints (`resource.schema.fields[...].constraints`).

    See https://specs.frictionlessdata.io/table-schema/#constraints.
    """

    # Restored declarations: `required` and `unique` are read by
    # Field.to_sql() (nullable/unique column options) and set by Schema's
    # primary-key validator, but their declarations were lost in extraction.
    required: Bool = False
    unique: Bool = False
    min_length: PositiveInt = None
    max_length: PositiveInt = None
    minimum: Union[Int, Float, Date, Datetime] = None
    maximum: Union[Int, Float, Date, Datetime] = None
    pattern: Pattern = None
    # TODO: Replace with String (min_length=1) once "" removed from enums
    enum: StrictList(Union[pydantic.StrictStr, Int, Float, Bool, Date, Datetime]) = None

    _check_unique = _validator("enum", fn=_check_unique)

    @pydantic.validator("max_length")
    def _check_max_length(cls, value, values):  # noqa: N805
        """Check max_length is the same type as, and not less than, min_length."""
        minimum, maximum = values.get("min_length"), value
        if minimum is not None and maximum is not None:
            if type(minimum) is not type(maximum):
                raise ValueError("must be same type as min_length")
            if maximum < minimum:
                raise ValueError("must be greater or equal to min_length")
        return value

    @pydantic.validator("maximum")
    def _check_max(cls, value, values):  # noqa: N805
        """Check maximum is the same type as, and not less than, minimum."""
        minimum, maximum = values.get("minimum"), value
        if minimum is not None and maximum is not None:
            if type(minimum) is not type(maximum):
                raise ValueError("must be same type as minimum")
            if maximum < minimum:
                raise ValueError("must be greater or equal to minimum")
        return value
class FieldHarvest(Base):
    """Field harvest parameters (`resource.schema.fields[...].harvest`)."""

    # NOTE: Callables with defaults must use pydantic.Field() to not bind to self
    aggregate: Callable[[pd.Series], pd.Series] = pydantic.Field(
        default=lambda x: most_and_more_frequent(x, min_frequency=0.7)
    )
    """Computes a single value from all field values in a group."""

    tolerance: PositiveFloat = 0.0
    """Fraction of invalid groups above which result is considered invalid."""
class Encoder(Base):
    """
    A class that allows us to standardize reported categorical codes.

    Often the original data we are integrating uses short codes to indicate a
    categorical value, like ``ST`` in place of "steam turbine" or ``LIG`` in place of
    "lignite coal". Many of these coded fields contain non-standard codes due to
    data-entry errors. The codes have also evolved over the years.

    In order to allow easy comparison of records across all years and tables, we define
    a standard set of codes, a mapping from non-standard codes to standard codes (where
    possible), and a set of known but unfixable codes which will be ignored and replaced
    with NA values. These definitions can be found in :mod:`pudl.metadata.codes` and we
    refer to these as coding tables.

    In our metadata structures, each coding table is defined just like any other DB
    table, with the addition of an associated ``Encoder`` object defining the standard,
    fixable, and ignored codes.

    In addition, a :class:`Package` class that has been instantiated using the
    :meth:`Package.from_resource_ids` method will associate an `Encoder` object with any
    column that has a foreign key constraint referring to a coding table (This
    column-level encoder is same as the encoder associated with the referenced table).
    This `Encoder` can be used to standardize the codes found within the column.

    :class:`Field` and :class:`Resource` objects have ``encode()`` methods that will
    use the column-level encoders to recode the original values, either for a single
    column or for all coded columns within a Resource, given either a corresponding
    :class:`pandas.Series` or :class:`pandas.DataFrame` containing actual values.

    If any unrecognized values are encountered, an exception will be raised, alerting
    us that a new code has been identified, and needs to be classified as fixable or
    to be ignored.
    """

    # Restored declaration: every validator below and `code_map` read `df`,
    # and its attribute docstring survived, but the declaration itself was
    # lost in extraction.
    df: pd.DataFrame
    """
    A table associating short codes with long descriptions and other information.

    Each coding table contains at least a ``code`` column containing the standard codes
    and a ``definition`` column with a human readable explanation of what the code
    stands for. Additional metadata pertaining to the codes and their categories may
    also appear in this dataframe, which will be loaded into the PUDL DB as a static
    table. The ``code`` column is a natural primary key and must contain no duplicate
    values.
    """

    ignored_codes: List[Union[Int, String]] = []
    """
    A list of non-standard codes which appear in the data, and will be set to NA.

    These codes may be the result of data entry errors, and we are unable to map them
    to the appropriate canonical code. They are discarded from the raw input data.
    """

    code_fixes: Dict[Union[Int, String], Union[Int, String]] = {}
    """
    A dictionary mapping non-standard codes to canonical, standardized codes.

    The intended meanings of some non-standard codes are clear, and therefore they can
    be mapped to the standardized, canonical codes with confidence. Sometimes these are
    the result of data entry errors or changes in the standard codes over time.
    """

    @pydantic.validator("df")
    def _df_is_encoding_table(cls, df):  # noqa: N805
        """Verify that the coding table provides both codes and descriptions."""
        # NOTE(review): the attribute docstring above mentions a ``definition``
        # column, but this validator requires ``description`` — confirm which
        # column name is correct.
        errors = []
        if "code" not in df.columns or "description" not in df.columns:
            errors.append(
                "Encoding tables must contain both 'code' & 'description' columns."
            )
        if len(df.code) != len(df.code.unique()):
            dupes = df[df.duplicated("code")].code.to_list()
            errors.append(f"Duplicate codes {dupes} found in coding table")
        if errors:
            raise ValueError(format_errors(*errors, pydantic=True))
        return df

    @pydantic.validator("ignored_codes")
    def _good_and_ignored_codes_are_disjoint(cls, ignored_codes, values):  # noqa: N805
        """Check that there's no overlap between good and ignored codes."""
        # If df failed validation it is absent from `values`; skip the check.
        if "df" not in values:
            return ignored_codes
        errors = []
        overlap = set(values["df"]["code"]).intersection(ignored_codes)
        if overlap:
            errors.append(
                f"Overlap found between good and ignored codes: {overlap}."
            )
        if errors:
            raise ValueError(format_errors(*errors, pydantic=True))
        return ignored_codes

    @pydantic.validator("code_fixes")
    def _good_and_fixable_codes_are_disjoint(cls, code_fixes, values):  # noqa: N805
        """Check that there's no overlap between the good and fixable codes."""
        if "df" not in values:
            return code_fixes
        errors = []
        overlap = set(values["df"]["code"]).intersection(code_fixes)
        if overlap:
            errors.append(
                f"Overlap found between good and fixable codes: {overlap}"
            )
        if errors:
            raise ValueError(format_errors(*errors, pydantic=True))
        return code_fixes

    @pydantic.validator("code_fixes")
    def _fixable_and_ignored_codes_are_disjoint(cls, code_fixes, values):  # noqa: N805
        """Check that there's no overlap between the ignored and fixable codes."""
        if "ignored_codes" not in values:
            return code_fixes
        errors = []
        overlap = set(code_fixes).intersection(values["ignored_codes"])
        if overlap:
            errors.append(
                f"Overlap found between fixable and ignored codes: {overlap}"
            )
        if errors:
            raise ValueError(format_errors(*errors, pydantic=True))
        return code_fixes

    @pydantic.validator("code_fixes")
    def _check_fixed_codes_are_good_codes(cls, code_fixes, values):  # noqa: N805
        """Check that every fixed code is also one of the good codes."""
        if "df" not in values:
            return code_fixes
        errors = []
        bad_codes = set(code_fixes.values()).difference(values["df"]["code"])
        if bad_codes:
            errors.append(
                f"Some fixed codes aren't in the list of good codes: {bad_codes}"
            )
        if errors:
            raise ValueError(format_errors(*errors, pydantic=True))
        return code_fixes

    @property
    def code_map(self) -> Dict[str, Union[str, type(pd.NA)]]:
        """A mapping of all known codes to their standardized values, or NA."""
        code_map = {code: code for code in self.df["code"]}
        code_map.update(self.code_fixes)
        code_map.update({code: pd.NA for code in self.ignored_codes})
        return code_map

    def encode(
        self,
        col: pd.Series,
        dtype: Union[type, None] = None,
    ) -> pd.Series:
        """Apply the stored code mapping to an input Series."""
        # Every value in the Series should appear in the map. If that's not the
        # case we want to hear about it so we don't wipe out data unknowingly.
        unknown_codes = set(col.dropna()).difference(self.code_map)
        if unknown_codes:
            raise ValueError(f"Found unknown codes while encoding: {unknown_codes=}")
        col = col.map(self.code_map)
        if dtype:
            col = col.astype(dtype)
        return col

    @staticmethod
    def dict_from_id(x: str) -> dict:
        """Look up the encoder by coding table name in the metadata."""
        return copy.deepcopy(RESOURCE_METADATA[x]).get("encoder", None)

    @classmethod
    def from_id(cls, x: str) -> 'Encoder':
        """Construct an Encoder based on `Resource.name` of a coding table."""
        return cls(**cls.dict_from_id(x))
class Field(Base):
    """
    Field (`resource.schema.fields[...]`).

    See https://specs.frictionlessdata.io/table-schema/#field-descriptors.

    Examples:
        >>> field = Field(name='x', type='string', constraints={'enum': ['x', 'y']})
        >>> field.to_pandas_dtype()
        CategoricalDtype(categories=['x', 'y'], ordered=False)
        >>> field.to_sql()
        Column('x', Enum('x', 'y'), CheckConstraint(...), table=None)
        >>> field = Field.from_id('utility_id_eia')
        >>> field.name
        'utility_id_eia'
    """

    # Restored declaration: `name` is used by the docstring examples,
    # `dict_from_id`, and `to_sql`, but the declaration itself was lost
    # in extraction.
    name: SnakeCase
    type: Literal[  # noqa: A003
        "string", "number", "integer", "boolean", "date", "datetime", "year"
    ]
    description: String = None
    constraints: FieldConstraints = {}
    harvest: FieldHarvest = {}
    encoder: Encoder = None

    @pydantic.validator("constraints")
    def _check_constraints(cls, value, values):  # noqa: N805, C901
        """Check that the constraints are consistent with the field type."""
        if "type" not in values:
            return value
        dtype = values["type"]
        errors = []
        # String-only constraints
        for key in ("min_length", "max_length", "pattern"):
            if getattr(value, key) is not None and dtype != "string":
                errors.append(f"{key} not supported by {dtype} field")
        # Ordered-type constraints must match the field type
        for key in ("minimum", "maximum"):
            x = getattr(value, key)
            if x is not None:
                if dtype in ("string", "boolean"):
                    errors.append(f"{key} not supported by {dtype} field")
                elif not isinstance(x, CONSTRAINT_DTYPES[dtype]):
                    errors.append(f"{key} not {dtype}")
        if value.enum:
            for x in value.enum:
                if not isinstance(x, CONSTRAINT_DTYPES[dtype]):
                    errors.append(f"enum value {x} not {dtype}")
        if errors:
            raise ValueError(format_errors(*errors, pydantic=True))
        return value

    @pydantic.validator("encoder")
    def _check_encoder(cls, value, values):  # noqa: N805
        """Only string and integer fields may carry an encoder."""
        if "type" not in values or value is None:
            return value
        errors = []
        dtype = values["type"]
        if dtype not in ["string", "integer"]:
            errors.append(
                "Encoding only supported for string and integer fields, found "
                f"{dtype}"
            )
        if errors:
            raise ValueError(format_errors(*errors, pydantic=True))
        return value

    @staticmethod
    def dict_from_id(x: str) -> dict:
        """Construct dictionary from PUDL identifier (`Field.name`)."""
        return {'name': x, **copy.deepcopy(FIELD_METADATA[x])}

    @classmethod
    def from_id(cls, x: str) -> 'Field':
        """Construct from PUDL identifier (`Field.name`)."""
        return cls(**cls.dict_from_id(x))

    def to_pandas_dtype(
        self, compact: bool = False
    ) -> Union[str, pd.CategoricalDtype]:
        """
        Return Pandas data type.

        Args:
            compact: Whether to return a low-memory data type
                (32-bit integer or float).
        """
        if self.constraints.enum:
            return pd.CategoricalDtype(self.constraints.enum)
        if compact:
            if self.type == "integer":
                return "Int32"
            if self.type == "number":
                return "float32"
        return FIELD_DTYPES[self.type]

    def to_sql_dtype(self) -> sa.sql.visitors.VisitableType:
        """Return SQLAlchemy data type."""
        if self.constraints.enum and self.type == "string":
            return sa.Enum(*self.constraints.enum)
        return FIELD_DTYPES_SQL[self.type]

    def to_sql(  # noqa: C901
        self,
        dialect: Literal["sqlite"] = "sqlite",
        check_types: bool = True,
        check_values: bool = True,
    ) -> sa.Column:
        """
        Return equivalent SQL column.

        Args:
            dialect: SQL dialect (only "sqlite" is supported).
            check_types: Whether to add type-enforcing CHECK constraints.
            check_values: Whether to add value-constraint CHECK constraints.

        Raises:
            NotImplementedError: `dialect` is not "sqlite".
        """
        if dialect != "sqlite":
            raise NotImplementedError(f"Dialect {dialect} is not supported")
        checks = []
        name = _format_for_sql(self.name, identifier=True)
        if check_types:
            # Required with TYPEOF since TYPEOF(NULL) = 'null'
            prefix = "" if self.constraints.required else f"{name} IS NULL OR "
            # Field type
            if self.type == "string":
                checks.append(f"{prefix}TYPEOF({name}) = 'text'")
            elif self.type in ("integer", "year"):
                checks.append(f"{prefix}TYPEOF({name}) = 'integer'")
            elif self.type == "number":
                checks.append(f"{prefix}TYPEOF({name}) = 'real'")
            elif self.type == "boolean":
                # Just IN (0, 1) accepts floats equal to 0, 1 (0.0, 1.0)
                checks.append(
                    f"{prefix}(TYPEOF({name}) = 'integer' AND {name} IN (0, 1))")
            elif self.type == "date":
                checks.append(f"{name} IS DATE({name})")
            elif self.type == "datetime":
                checks.append(f"{name} IS DATETIME({name})")
        if check_values:
            # Field constraints
            if self.constraints.min_length is not None:
                checks.append(f"LENGTH({name}) >= {self.constraints.min_length}")
            if self.constraints.max_length is not None:
                checks.append(f"LENGTH({name}) <= {self.constraints.max_length}")
            if self.constraints.minimum is not None:
                minimum = _format_for_sql(self.constraints.minimum)
                checks.append(f"{name} >= {minimum}")
            if self.constraints.maximum is not None:
                maximum = _format_for_sql(self.constraints.maximum)
                checks.append(f"{name} <= {maximum}")
            if self.constraints.pattern:
                pattern = _format_for_sql(self.constraints.pattern)
                checks.append(f"{name} REGEXP {pattern}")
            if self.constraints.enum:
                enum = [_format_for_sql(x) for x in self.constraints.enum]
                checks.append(f"{name} IN ({', '.join(enum)})")
        return sa.Column(
            self.name,
            self.to_sql_dtype(),
            *[sa.CheckConstraint(check) for check in checks],
            nullable=not self.constraints.required,
            unique=self.constraints.unique,
            comment=self.description
        )

    def encode(
        self,
        col: pd.Series,
        dtype: Union[type, None] = None
    ) -> pd.Series:
        """Recode the Field if it has an associated encoder."""
        return self.encoder.encode(col, dtype=dtype) if self.encoder else col
# ---- Classes: Resource ---- #
class ForeignKeyReference(Base):
    """
    Foreign key reference (`resource.schema.foreign_keys[...].reference`).

    See https://specs.frictionlessdata.io/table-schema/#foreign-keys.
    """

    # Restored declaration: ForeignKey.to_sql() reads `reference.resource`
    # and the Resource doctests pass `'resource': 'b'`, but the declaration
    # itself was lost in extraction.
    resource: SnakeCase
    fields_: StrictList(SnakeCase) = pydantic.Field(alias="fields")

    _check_unique = _validator("fields_", fn=_check_unique)
class ForeignKey(Base):
    """
    Foreign key (`resource.schema.foreign_keys[...]`).

    See https://specs.frictionlessdata.io/table-schema/#foreign-keys.
    """

    fields_: StrictList(SnakeCase) = pydantic.Field(alias="fields")
    reference: ForeignKeyReference

    _check_unique = _validator("fields_", fn=_check_unique)

    @pydantic.validator("reference")
    def _check_fields_equal_length(cls, value, values):  # noqa: N805
        """Check that local and referenced field lists are the same length."""
        if "fields_" in values:
            if len(value.fields) != len(values["fields_"]):
                raise ValueError("fields and reference.fields are not equal length")
        return value

    def is_simple(self) -> bool:
        """Indicate whether the FK relationship contains a single column."""
        # Idiom fix: return the comparison directly rather than
        # `True if ... else False`.
        return len(self.fields) == 1

    def to_sql(self) -> sa.ForeignKeyConstraint:
        """Return equivalent SQL Foreign Key."""
        return sa.ForeignKeyConstraint(
            self.fields,
            [f"{self.reference.resource}.{field}" for field in self.reference.fields]
        )
class Schema(Base):
    """
    Table schema (`resource.schema`).

    See https://specs.frictionlessdata.io/table-schema.
    """

    fields_: StrictList(Field) = pydantic.Field(alias="fields")
    missing_values: List[pydantic.StrictStr] = [""]
    primary_key: StrictList(SnakeCase) = None
    foreign_keys: List[ForeignKey] = []

    _check_unique = _validator(
        "missing_values", "primary_key", "foreign_keys", fn=_check_unique
    )

    @pydantic.validator("fields_")
    def _check_field_names_unique(cls, value):  # noqa: N805
        """Raise if any two fields share a name."""
        _check_unique([f.name for f in value])
        return value

    @pydantic.validator("primary_key")
    def _check_primary_key_in_fields(cls, value, values):  # noqa: N805
        """Check primary key names exist among the fields; mark them required."""
        if value is not None and "fields_" in values:
            missing = []
            names = [f.name for f in values['fields_']]
            for name in value:
                if name in names:
                    # Flag primary key fields as required
                    field = values['fields_'][names.index(name)]
                    field.constraints.required = True
                else:
                    # BUG FIX: was `missing.append(field.name)`, which appended
                    # the previous iteration's field name (or raised NameError
                    # on the first iteration). Report the missing key itself.
                    missing.append(name)
            if missing:
                raise ValueError(f"names {missing} missing from fields")
        return value

    @pydantic.validator("foreign_keys", each_item=True)
    def _check_foreign_key_in_fields(cls, value, values):  # noqa: N805
        """Check that each foreign key's local fields exist in the schema."""
        if value and "fields_" in values:
            names = [f.name for f in values['fields_']]
            missing = [x for x in value.fields if x not in names]
            if missing:
                raise ValueError(f"names {missing} missing from fields")
        return value
class License(Base):
    """
    Data license (`package|resource.licenses[...]`).

    See https://specs.frictionlessdata.io/data-package/#licenses.
    """

    # NOTE(review): no attribute declarations are visible here, yet from_id()
    # constructs instances from LICENSES entries — confirm whether field
    # declarations (e.g. name/title/path) were lost from this view.

    @staticmethod
    def dict_from_id(x: str) -> dict:
        """Construct dictionary from PUDL identifier."""
        return copy.deepcopy(LICENSES[x])

    @classmethod
    def from_id(cls, x: str) -> "License":
        """Construct from PUDL identifier."""
        return cls(**cls.dict_from_id(x))
class Source(Base):
    """
    Data source (`package|resource.sources[...]`).

    See https://specs.frictionlessdata.io/data-package/#sources.
    """

    # NOTE(review): no attribute declarations are visible here, yet from_id()
    # constructs instances from SOURCES entries — confirm whether field
    # declarations (e.g. title/path) were lost from this view.

    @staticmethod
    def dict_from_id(x: str) -> dict:
        """Construct dictionary from PUDL identifier."""
        return copy.deepcopy(SOURCES[x])

    @classmethod
    def from_id(cls, x: str) -> "Source":
        """Construct from PUDL identifier."""
        return cls(**cls.dict_from_id(x))
class Contributor(Base):
    """
    Data contributor (`package.contributors[...]`).

    See https://specs.frictionlessdata.io/data-package/#contributors.
    """

    # NOTE(review): only `role` and `organization` are visible here, yet
    # from_id() constructs instances from CONTRIBUTORS entries — confirm
    # whether other field declarations (e.g. title/email/path) were lost
    # from this view.
    role: Literal[
        "author", "contributor", "maintainer", "publisher", "wrangler"
    ] = "contributor"
    organization: String = None

    @staticmethod
    def dict_from_id(x: str) -> dict:
        """Construct dictionary from PUDL identifier."""
        return copy.deepcopy(CONTRIBUTORS[x])

    @classmethod
    def from_id(cls, x: str) -> "Contributor":
        """Construct from PUDL identifier."""
        return cls(**cls.dict_from_id(x))
class ResourceHarvest(Base):
    """Resource harvest parameters (`resource.harvest`)."""

    # Restored declaration: the attribute docstring below survived and
    # Resource._check_harvest_primary_key reads `values["harvest"].harvest`,
    # but the declaration itself was lost in extraction.
    harvest: Bool = False
    """
    Whether to harvest from dataframes based on field names.

    If `False`, the dataframe with the same name is used
    and the process is limited to dropping unwanted fields.
    """

    tolerance: PositiveFloat = 0.0
    """Fraction of invalid fields above which result is considered invalid."""
[docs]class Resource(Base):
"""
Tabular data resource (`package.resources[...]`).
See https://specs.frictionlessdata.io/tabular-data-resource.
Examples:
A simple example illustrates the conversion to SQLAlchemy objects.
>>> fields = [{'name': 'x', 'type': 'year'}, {'name': 'y', 'type': 'string'}]
>>> fkeys = [{'fields': ['x', 'y'], 'reference': {'resource': 'b', 'fields': ['x', 'y']}}]
>>> schema = {'fields': fields, 'primary_key': ['x'], 'foreign_keys': fkeys}
>>> resource = Resource(name='a', schema=schema)
>>> table = resource.to_sql()
>>> table.columns.x
Column('x', Integer(), ForeignKey('b.x'), CheckConstraint(...), table=<a>, primary_key=True, nullable=False)
>>> table.columns.y
Column('y', Text(), ForeignKey('b.y'), CheckConstraint(...), table=<a>)
To illustrate harvesting operations,
say we have a resource with two fields - a primary key (`id`) and a data field -
which we want to harvest from two different dataframes.
>>> from pudl.metadata.helpers import unique, as_dict
>>> fields = [
... {'name': 'id', 'type': 'integer'},
... {'name': 'x', 'type': 'integer', 'harvest': {'aggregate': unique, 'tolerance': 0.25}}
... ]
>>> resource = Resource(**{
... 'name': 'a',
... 'harvest': {'harvest': True},
... 'schema': {'fields': fields, 'primary_key': ['id']}
... })
>>> dfs = {
... 'a': pd.DataFrame({'id': [1, 1, 2, 2], 'x': [1, 1, 2, 2]}),
... 'b': pd.DataFrame({'id': [2, 3, 3], 'x': [3, 4, 4]})
... }
Skip aggregation to access all the rows concatenated from the input dataframes.
The names of the input dataframes are used as the index.
>>> df, _ = resource.harvest_dfs(dfs, aggregate=False)
>>> df
id x
df
a 1 1
a 1 1
a 2 2
a 2 2
b 2 3
b 3 4
b 3 4
Field names and data types are enforced.
>>> resource.to_pandas_dtypes() == df.dtypes.apply(str).to_dict()
True
Alternatively, aggregate by primary key
(the default when :attr:`harvest`. `harvest=True`)
and report aggregation errors.
>>> df, report = resource.harvest_dfs(dfs)
>>> df
x
id
1 1
2 <NA>
3 4
>>> report['stats']
{'all': 2, 'invalid': 1, 'tolerance': 0.0, 'actual': 0.5}
>>> report['fields']['x']['stats']
{'all': 3, 'invalid': 1, 'tolerance': 0.25, 'actual': 0.33...}
>>> report['fields']['x']['errors']
id
2 Not unique.
Name: x, dtype: object
Customize the error values in the error report.
>>> error = lambda x, e: as_dict(x)
>>> df, report = resource.harvest_dfs(
... dfs, aggregate_kwargs={'raised': False, 'error': error}
... )
>>> report['fields']['x']['errors']
id
2 {'a': [2, 2], 'b': [3]}
Name: x, dtype: object
Limit harvesting to the input dataframe of the same name
by setting :attr:`harvest`. `harvest=False`.
>>> resource.harvest.harvest = False
>>> df, _ = resource.harvest_dfs(dfs, aggregate_kwargs={'raised': False})
>>> df
id x
df
a 1 1
a 1 1
a 2 2
a 2 2
Harvesting can also handle conversion to longer time periods.
Period harvesting requires primary key fields with a `datetime` data type,
except for `year` fields which can be integer.
>>> fields = [{'name': 'report_year', 'type': 'year'}]
>>> resource = Resource(**{
... 'name': 'table', 'harvest': {'harvest': True},
... 'schema': {'fields': fields, 'primary_key': ['report_year']}
... })
>>> df = pd.DataFrame({'report_date': ['2000-02-02', '2000-03-03']})
>>> resource.format_df(df)
report_year
0 2000-01-01
1 2000-01-01
>>> df = pd.DataFrame({'report_year': [2000, 2000]})
>>> resource.format_df(df)
report_year
0 2000-01-01
1 2000-01-01
"""
[docs] description: String = None
[docs] harvest: ResourceHarvest = {}
[docs] group: Literal["eia", "epacems", "ferc1", "ferc714", "glue", "pudl"] = None
[docs] schema_: Schema = pydantic.Field(alias='schema')
[docs] contributors: List[Contributor] = []
[docs] licenses: List[License] = []
[docs] sources: List[Source] = []
[docs] keywords: List[String] = []
[docs] encoder: Encoder = None
[docs] _check_unique = _validator(
"contributors",
"keywords",
"licenses",
"sources",
fn=_check_unique
)
@pydantic.validator("schema_")
[docs] def _check_harvest_primary_key(cls, value, values): # noqa: N805
if values["harvest"].harvest:
if not value.primary_key:
raise ValueError("Harvesting requires a primary key")
return value
    @staticmethod
    def dict_from_id(x: str) -> dict:  # noqa: C901
        """
        Construct dictionary from PUDL identifier (`resource.name`).

        * `schema.fields`

          * Field names are expanded (:meth:`Field.from_id`).
          * Field attributes are replaced with any specific to the
            `resource.group` and `field.name`.

        * `sources`: Source ids are expanded (:meth:`Source.from_id`).
        * `licenses`: License ids are expanded (:meth:`License.from_id`).
        * `contributors`: Contributor ids are fetched by source ids,
          then expanded (:meth:`Contributor.from_id`).
        * `keywords`: Keywords are fetched by source ids.
        * `schema.foreign_keys`: Foreign keys are fetched by resource name.
        """
        obj = copy.deepcopy(RESOURCE_METADATA[x])
        obj["name"] = x
        schema = obj["schema"]
        # Expand fields
        if "fields" in schema:
            fields = []
            for name in schema["fields"]:
                # Lookup field by name
                value = Field.dict_from_id(name)
                # Update with any custom group-level metadata
                group = obj.get("group")
                if name in FIELD_METADATA_BY_GROUP.get(group, {}):
                    value = {**value, **FIELD_METADATA_BY_GROUP[group][name]}
                # Update with any custom resource-level metadata
                if name in FIELD_METADATA_BY_RESOURCE.get(x, {}):
                    value = {**value, **FIELD_METADATA_BY_RESOURCE[x][name]}
                fields.append(value)
            schema["fields"] = fields
        # Expand sources
        sources = obj.get("sources", [])
        obj["sources"] = [Source.dict_from_id(value) for value in sources]
        # Carry any encoder metadata through unchanged (None when absent)
        encoder = obj.get("encoder", None)
        obj["encoder"] = encoder
        # Expand licenses (assign CC-BY-4.0 by default)
        licenses = obj.get("licenses", ["cc-by-4.0"])
        obj["licenses"] = [License.dict_from_id(value) for value in licenses]
        # Lookup and insert contributors
        # NOTE(review): this guard tests `schema`, but explicit contributors
        # would presumably live on the resource dict (`obj`) — confirm the
        # intended key.
        if "contributors" in schema:
            raise ValueError("Resource metadata contains explicit contributors")
        cids = []
        for source in sources:
            cids.extend(CONTRIBUTORS_BY_SOURCE.get(source, []))
        obj["contributors"] = [Contributor.dict_from_id(cid) for cid in set(cids)]
        # Lookup and insert keywords
        # NOTE(review): same `schema` vs `obj` question as for contributors.
        if "keywords" in schema:
            raise ValueError("Resource metadata contains explicit keywords")
        keywords = []
        for source in sources:
            keywords.extend(KEYWORDS_BY_SOURCE.get(source, []))
        obj["keywords"] = sorted(set(keywords))
        # Insert foreign keys
        if "foreign_keys" in schema:
            raise ValueError("Resource metadata contains explicit foreign keys")
        schema["foreign_keys"] = FOREIGN_KEYS.get(x, [])
        # Delete foreign key rules
        if "foreign_key_rules" in schema:
            del schema["foreign_key_rules"]
        return obj
@classmethod
def from_id(cls, x: str) -> "Resource":
    """Construct from PUDL identifier (`resource.name`)."""
    metadata = cls.dict_from_id(x)
    return cls(**metadata)
def get_field(self, name: str) -> Field:
    """Return field with the given name if it's part of the Resources."""
    for field in self.schema.fields:
        if field.name == name:
            return field
    raise KeyError(
        f"The field {name} is not part of the {self.name} schema."
    )
def to_sql(
    self,
    metadata: sa.MetaData = None,
    check_types: bool = True,
    check_values: bool = True,
) -> sa.Table:
    """Return equivalent SQL Table."""
    if metadata is None:
        metadata = sa.MetaData()
    # Build one SQL column per schema field.
    columns = [
        field.to_sql(check_types=check_types, check_values=check_values)
        for field in self.schema.fields
    ]
    # Primary key constraint (if any) first, then foreign key constraints.
    constraints = []
    if self.schema.primary_key:
        constraints.append(sa.PrimaryKeyConstraint(*self.schema.primary_key))
    constraints.extend(key.to_sql() for key in self.schema.foreign_keys)
    return sa.Table(self.name, metadata, *columns, *constraints)
def to_pandas_dtypes(
    self, **kwargs: Any
) -> Dict[str, Union[str, pd.CategoricalDtype]]:
    """
    Return Pandas data type of each field by field name.

    Args:
        kwargs: Arguments to :meth:`Field.to_pandas_dtype`.
    """
    dtypes = {}
    for field in self.schema.fields:
        dtypes[field.name] = field.to_pandas_dtype(**kwargs)
    return dtypes
def match_primary_key(self, names: Iterable[str]) -> Optional[Dict[str, str]]:
    """
    Match primary key fields to input field names.

    An exact match is required unless :attr:`harvest` .`harvest=True`,
    in which case periodic names may also match a basename with a smaller period.

    Args:
        names: Field names. Any iterable is accepted; it is materialized
            internally, so one-shot iterables (generators) are safe.

    Raises:
        ValueError: Field names are not unique.
        ValueError: Multiple field names match primary key field.

    Returns:
        The name matching each primary key field (if any) as a :class:`dict`,
        or `None` if not all primary key fields have a match.

    Examples:
        >>> fields = [{'name': 'x_year', 'type': 'year'}]
        >>> schema = {'fields': fields, 'primary_key': ['x_year']}
        >>> resource = Resource(name='r', schema=schema)

        By default, when :attr:`harvest` .`harvest=False`,
        exact matches are required.

        >>> resource.harvest.harvest
        False
        >>> resource.match_primary_key(['x_month']) is None
        True
        >>> resource.match_primary_key(['x_year', 'x_month'])
        {'x_year': 'x_year'}

        When :attr:`harvest` .`harvest=True`,
        in the absence of an exact match,
        periodic names may also match a basename with a smaller period.

        >>> resource.harvest.harvest = True
        >>> resource.match_primary_key(['x_year', 'x_month'])
        {'x_year': 'x_year'}
        >>> resource.match_primary_key(['x_month'])
        {'x_month': 'x_year'}
        >>> resource.match_primary_key(['x_month', 'x_date'])
        Traceback (most recent call last):
        ValueError: ... {'x_month', 'x_date'} match primary key field 'x_year'
    """
    # Materialize so len()/set()/membership work on one-shot iterables too.
    names = list(names)
    if len(names) != len(set(names)):
        raise ValueError("Field names are not unique")
    keys = self.schema.primary_key or []
    if self.harvest.harvest:
        remaining = set(names)
        matches = {}
        for key in keys:
            match = None
            if key in remaining:
                # Use exact match if present
                match = key
            elif split_period(key)[1]:
                # Try periodic alternatives
                periods = expand_periodic_column_names([key])
                matching = remaining.intersection(periods)
                if len(matching) > 1:
                    raise ValueError(
                        f"Multiple field names {matching} "
                        f"match primary key field '{key}'"
                    )
                if len(matching) == 1:
                    match = list(matching)[0]
            if match:
                matches[match] = key
                # Each input name may match at most one key.
                remaining.remove(match)
    else:
        matches = {key: key for key in keys if key in names}
    return matches if len(matches) == len(keys) else None
def aggregate_df(
    self, df: pd.DataFrame, raised: bool = False, error: Callable = None
) -> Tuple[pd.DataFrame, dict]:
    """
    Aggregate dataframe by primary key.

    The dataframe is grouped by primary key fields
    and aggregated with the aggregate function of each field
    (:attr:`schema_`. `fields[*].harvest.aggregate`).

    The report is formatted as follows:

    * `valid` (bool): Whether resouce is valid.
    * `stats` (dict): Error statistics for resource fields.
    * `fields` (dict):

      * `<field_name>` (str)

        * `valid` (bool): Whether field is valid.
        * `stats` (dict): Error statistics for field groups.
        * `errors` (:class:`pandas.Series`): Error values indexed by primary key.

    Each `stats` (dict) contains the following:

    * `all` (int): Number of entities (field or field group).
    * `invalid` (int): Invalid number of entities.
    * `tolerance` (float): Fraction of invalid entities below which
      parent entity is considered valid.
    * `actual` (float): Actual fraction of invalid entities.

    Args:
        df: Dataframe to aggregate. It is assumed to have column names and
            data types matching the resource fields.
        raised: Whether aggregation errors are raised or
            replaced with :obj:`np.nan` and returned in an error report.
        error: A function with signature `f(x, e) -> Any`,
            where `x` are the original field values as a :class:`pandas.Series`
            and `e` is the original error.
            If provided, the returned value is reported instead of `e`.

    Raises:
        ValueError: A primary key is required for aggregating.

    Returns:
        The aggregated dataframe indexed by primary key fields,
        and an aggregation report (described above)
        that includes all aggregation errors and whether the result
        meets the resource's and fields' tolerance.
    """
    if not self.schema.primary_key:
        raise ValueError("A primary key is required for aggregating")
    key_fields = set(self.schema.primary_key)
    # Only data (non-key) fields are aggregated; key fields become the index.
    aggfuncs = {
        field.name: field.harvest.aggregate
        for field in self.schema.fields
        if field.name not in key_fields
    }
    aggregated, errors = groupby_aggregate(
        df,
        by=self.schema.primary_key,
        aggfuncs=aggfuncs,
        raised=raised,
        error=error,
    )
    return aggregated, self._build_aggregation_report(aggregated, errors)
[docs] def _build_aggregation_report(self, df: pd.DataFrame, errors: dict) -> dict:
"""
Build report from aggregation errors.
Args:
df: Harvested dataframe (see :meth:`harvest_dfs`).
errors: Aggregation errors (see :func:`groupby_aggregate`).
Returns:
Aggregation report, as described in :meth:`aggregate_df`.
"""
nrows, ncols = df.reset_index().shape
freports = {}
for field in self.schema.fields:
if field.name in errors:
nerrors = errors[field.name].size
else:
nerrors = 0
stats = {
"all": nrows,
"invalid": nerrors,
"tolerance": field.harvest.tolerance,
"actual": nerrors / nrows if nrows else 0,
}
freports[field.name] = {
"valid": stats["actual"] <= stats["tolerance"],
"stats": stats,
"errors": errors.get(field.name, None),
}
nerrors = sum([not f["valid"] for f in freports.values()])
stats = {
"all": ncols,
"invalid": nerrors,
"tolerance": self.harvest.tolerance,
"actual": nerrors / ncols,
}
return {
"valid": stats["actual"] <= stats["tolerance"],
"stats": stats,
"fields": freports,
}
def harvest_dfs(
    self,
    dfs: Dict[str, pd.DataFrame],
    aggregate: Optional[bool] = None,
    aggregate_kwargs: Optional[Dict[str, Any]] = None,
    format_kwargs: Optional[Dict[str, Any]] = None,
) -> Tuple[pd.DataFrame, dict]:
    """
    Harvest from named dataframes.

    For standard resources (:attr:`harvest`. `harvest=False`), the columns
    matching all primary key fields and any data fields are extracted from
    the input dataframe of the same name.

    For harvested resources (:attr:`harvest`. `harvest=True`), the columns
    matching all primary key fields and any data fields are extracted from
    each compatible input dataframe, and concatenated into a single
    dataframe. Periodic key fields (e.g. 'report_month') are matched to any
    column of the same name with an equal or smaller period (e.g.
    'report_day') and snapped to the start of the desired period.

    If `aggregate=False`, rows are indexed by the name of the input dataframe.
    If `aggregate=True`, rows are indexed by primary key fields.

    Args:
        dfs: Dataframes to harvest.
        aggregate: Whether to aggregate the harvested rows by their primary key.
            By default, this is `True` if `self.harvest.harvest=True` and
            `False` otherwise.
        aggregate_kwargs: Optional arguments to :meth:`aggregate_df`.
        format_kwargs: Optional arguments to :meth:`format_df`.

    Returns:
        A dataframe harvested from the dataframes, with column names and
        data types matching the resource fields, alongside an aggregation
        report.
    """
    # Avoid mutable default arguments: normalize None to fresh dicts.
    aggregate_kwargs = aggregate_kwargs or {}
    format_kwargs = format_kwargs or {}
    if aggregate is None:
        aggregate = self.harvest.harvest
    if self.harvest.harvest:
        # Harvest resource from all inputs where all primary key fields are present.
        # NOTE(review): assumes `dfs` is non-empty here; pd.concat raises on
        # an empty sequence — confirm callers guarantee this.
        samples = {}
        for name, df in dfs.items():
            samples[name] = self.format_df(df, **format_kwargs)
            # Pass input names to aggregate via the index
            samples[name].index = pd.Index([name] * len(samples[name]), name="df")
        df = pd.concat(samples.values())
    elif self.name in dfs:
        # Subset resource from input of same name
        df = self.format_df(dfs[self.name], **format_kwargs)
        # Pass input names to aggregate via the index
        df.index = pd.Index([self.name] * df.shape[0], name="df")
    else:
        # No compatible input: return an empty, correctly-typed dataframe.
        return self.format_df(df=None, **format_kwargs), {}
    if aggregate:
        return self.aggregate_df(df, **aggregate_kwargs)
    return df, {}
def to_rst(self, path: str) -> None:
    """Output to an RST file."""
    rendered = JINJA_ENVIRONMENT.get_template("resource.rst.jinja").render(
        resource=self
    )
    Path(path).write_text(rendered)
def encode(self, df: pd.DataFrame) -> pd.DataFrame:
    """Standardize coded columns using the foreign column they refer to."""
    for field in self.schema.fields:
        # Only fields with an attached encoder are recoded.
        if not field.encoder:
            continue
        logger.info(f"Recoding {self.name}.{field.name}")
        df[field.name] = field.encoder.encode(
            col=df[field.name],
            dtype=field.to_pandas_dtype(),
        )
    return df
# ---- Package ---- #
class Package(Base):
    """
    Tabular data package.

    See https://specs.frictionlessdata.io/data-package.

    Examples:
        Foreign keys between resources are checked for completeness and consistency.

        >>> fields = [{'name': 'x', 'type': 'year'}, {'name': 'y', 'type': 'string'}]
        >>> fkey = {'fields': ['x', 'y'], 'reference': {'resource': 'b', 'fields': ['x', 'y']}}
        >>> schema = {'fields': fields, 'primary_key': ['x'], 'foreign_keys': [fkey]}
        >>> a = Resource(name='a', schema=schema)
        >>> b = Resource(name='b', schema=Schema(fields=fields, primary_key=['x']))
        >>> Package(name='ab', resources=[a, b])
        Traceback (most recent call last):
        ValidationError: ...
        >>> b.schema.primary_key = ['x', 'y']
        >>> package = Package(name='ab', resources=[a, b])

        SQL Alchemy can sort tables, based on foreign keys,
        in the order in which they need to be loaded into a database.

        >>> metadata = package.to_sql()
        >>> [table.name for table in metadata.sorted_tables]
        ['b', 'a']
    """

    # Free-text description of the package.
    description: String = None
    # Keyword tags; extended with keywords harvested from `resources` by the
    # `_populate_from_resources` root validator.
    keywords: List[String] = []
    homepage: HttpUrl = "https://catalyst.coop/pudl"
    # NOTE(review): this default is evaluated once at class-definition
    # (import) time, so every Package created in the same process shares the
    # same timestamp — confirm that is intended (a pydantic `default_factory`
    # would yield a per-instance timestamp instead).
    created: Datetime = datetime.datetime.utcnow()
    # The following lists are also extended with values harvested from
    # `resources` by the `_populate_from_resources` root validator.
    contributors: List[Contributor] = []
    sources: List[Source] = []
    licenses: List[License] = []
    # Resources bundled in this package; StrictList presumably rejects empty
    # lists (defined earlier in this module — verify).
    resources: StrictList(Resource)
@pydantic.validator("resources")
def _check_foreign_keys(cls, value):  # noqa: N805
    """Check that foreign keys reference resources in the package and their primary keys."""
    rnames = [resource.name for resource in value]
    errors = []
    for resource in value:
        for foreign_key in resource.schema.foreign_keys:
            rname = foreign_key.reference.resource
            tag = f"[{resource.name} -> {rname}]"
            # The referenced resource must be part of the package.
            if rname not in rnames:
                errors.append(f"{tag}: Reference not found")
                continue
            reference = value[rnames.index(rname)]
            primary_key = reference.schema.primary_key
            # The referenced resource must have a primary key ...
            if not primary_key:
                errors.append(f"{tag}: Reference missing primary key")
                continue
            # ... that covers all referenced fields.
            missing = [
                x for x in foreign_key.reference.fields if x not in primary_key
            ]
            if missing:
                errors.append(f"{tag}: Reference primary key missing {missing}")
    if errors:
        raise ValueError(
            format_errors(*errors, title="Foreign keys", pydantic=True)
        )
    return value
@pydantic.root_validator(skip_on_failure=True)
def _populate_from_resources(cls, values):  # noqa: N805
    """Merge package-level lists with the corresponding lists harvested from resources."""
    for key in ('keywords', 'contributors', 'sources', 'licenses'):
        harvested = [getattr(resource, key) for resource in values['resources']]
        # _unique preserves order of first appearance across all lists.
        values[key] = _unique(values[key], *harvested)
    return values
@classmethod
def from_resource_ids(
    cls, resource_ids: Iterable[str], resolve_foreign_keys: bool = False
) -> "Package":
    """
    Construct a collection of Resources from PUDL identifiers (`resource.name`).

    Identify any fields that have foreign key relationships referencing the
    coding tables defined in :mod:`pudl.metadata.codes` and if so, associate the
    coding table's encoder with those columns for later use cleaning them up.

    Args:
        resource_ids: Resource PUDL identifiers (`resource.name`). Any
            iterable is accepted; it is materialized internally so one-shot
            iterables (generators) are safe.
        resolve_foreign_keys: Whether to add resources as needed based on
            foreign keys.
    """
    # Materialize once: `resource_ids` is consumed twice below (the resource
    # lookup and the `names` seed list), which would silently break for a
    # generator input.
    resource_ids = list(resource_ids)
    resources = [Resource.dict_from_id(x) for x in resource_ids]
    if resolve_foreign_keys:
        # Add missing resources based on foreign keys, repeating until no
        # new references are discovered (newly added resources may
        # themselves reference further resources).
        names = list(resource_ids)
        i = 0
        while i < len(resources):
            # Only scan resources added since the previous pass.
            for resource in resources[i:]:
                for key in resource["schema"].get("foreign_keys", []):
                    name = key.get("reference", {}).get("resource")
                    if name and name not in names:
                        names.append(name)
            i = len(resources)
            if len(names) > i:
                resources += [Resource.dict_from_id(x) for x in names[i:]]
    # Add per-column encoders for each resource
    for resource in resources:
        # Foreign key relationships determine the set of codes to use
        for fk in resource["schema"]["foreign_keys"]:
            # Only referenced tables with an associated encoder indicate
            # that the column we're looking at should have an encoder
            # attached to it. All of these FK relationships must have simple
            # single-column keys.
            encoder = Encoder.dict_from_id(fk["reference"]["resource"])
            if len(fk["fields"]) == 1 and encoder:
                # fk["fields"] is a one element list, get the one element:
                field = fk["fields"][0]
                field_names = [f["name"] for f in resource["schema"]["fields"]]
                idx = field_names.index(field)
                resource["schema"]["fields"][idx]["encoder"] = encoder
    return cls(name="pudl", resources=resources)
def get_resource(self, name: str) -> Resource:
    """Return the resource with the given name if it is in the Package."""
    index = [resource.name for resource in self.resources].index(name)
    return self.resources[index]
def to_rst(self, path: str) -> None:
    """Output to an RST file."""
    rendered = JINJA_ENVIRONMENT.get_template("package.rst.jinja").render(
        package=self
    )
    Path(path).write_text(rendered)
def to_sql(
    self,
    check_types: bool = True,
    check_values: bool = True,
) -> sa.MetaData:
    """Return equivalent SQL MetaData."""
    metadata = sa.MetaData()
    # Each resource registers its Table on the shared MetaData object.
    for resource in self.resources:
        resource.to_sql(
            metadata,
            check_types=check_types,
            check_values=check_values,
        )
    return metadata