pudl.metadata
Metadata constants and methods.
Package Contents
Classes
Package | Tabular data package.
Resource | Tabular data resource (package.resources[...]).
- class pudl.metadata.Package
Bases: Base
Tabular data package.
See https://specs.frictionlessdata.io/data-package.
Examples
Foreign keys between resources are checked for completeness and consistency.
>>> fields = [{'name': 'x', 'type': 'year'}, {'name': 'y', 'type': 'string'}]
>>> fkey = {'fields': ['x', 'y'], 'reference': {'resource': 'b', 'fields': ['x', 'y']}}
>>> schema = {'fields': fields, 'primary_key': ['x'], 'foreign_keys': [fkey]}
>>> a = Resource(name='a', schema=schema)
>>> b = Resource(name='b', schema=Schema(fields=fields, primary_key=['x']))
>>> Package(name='ab', resources=[a, b])
Traceback (most recent call last):
ValidationError: ...
>>> b.schema.primary_key = ['x', 'y']
>>> package = Package(name='ab', resources=[a, b])
SQLAlchemy can sort tables by their foreign keys into the order in which they must be loaded into a database.
>>> metadata = package.to_sql()
>>> [table.name for table in metadata.sorted_tables]
['b', 'a']
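The load order follows from a topological sort of the foreign key graph. The sketch below uses only the standard library and is not SQLAlchemy's implementation; the `load_order` helper and its `foreign_keys` mapping are hypothetical names chosen for illustration. Each table is placed after every table it references.

```python
from graphlib import TopologicalSorter


def load_order(foreign_keys: dict[str, set[str]]) -> list[str]:
    """Return table names so that referenced tables precede referencing ones.

    `foreign_keys` maps each table name to the set of tables it references.
    """
    # TopologicalSorter expects {node: predecessors}; predecessors are emitted first.
    return list(TopologicalSorter(foreign_keys).static_order())


# Hypothetical graph mirroring the example: 'a' has a foreign key to 'b'.
order = load_order({"a": {"b"}, "b": set()})
print(order)
```

A cycle in the foreign keys would make `static_order()` raise `graphlib.CycleError`, which is one reason circular references between tables complicate loading.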
- name :String
- title :String
- description :String
- keywords :List[String] = []
- homepage :HttpUrl = https://catalyst.coop/pudl
- created :Datetime
- contributors :List[Contributor] = []
- sources :List[Source] = []
- licenses :List[License] = []
- resources :StrictList(Resource)
- _check_foreign_keys(cls, value)
- _populate_from_resources(cls, values)
- classmethod from_resource_ids(cls, resource_ids: Iterable[str], resolve_foreign_keys: bool = False) -> Package
Construct a collection of Resources from PUDL identifiers (resource.name).
Identify any fields that have foreign key relationships referencing the coding tables defined in pudl.metadata.codes and, if so, associate the coding table’s encoder with those columns for later use in cleaning them up.
- Parameters
resource_ids – Resource PUDL identifiers (resource.name).
resolve_foreign_keys – Whether to add resources as needed based on foreign keys.
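One plausible reading of `resolve_foreign_keys=True` is that referenced resources are pulled in transitively until every foreign key points at a resource in the package. The sketch below illustrates that idea with plain Python; it is an assumption about the behavior, not the PUDL implementation, and `resolve_resource_ids`, `references`, and the table names are all hypothetical.

```python
def resolve_resource_ids(resource_ids, references):
    """Return the requested ids plus every resource reachable via foreign keys.

    `references` maps a resource name to the names its foreign keys point at.
    """
    resolved = []
    pending = list(resource_ids)
    while pending:
        name = pending.pop(0)
        if name in resolved:
            continue  # Already included; also guards against reference cycles.
        resolved.append(name)
        # Queue the resources this one points at; unknown names reference nothing.
        pending.extend(sorted(references.get(name, ())))
    return resolved


# Hypothetical foreign key references between made-up tables.
references = {"plants": {"utilities", "fuel_codes"}, "utilities": {"states"}}
resolved = resolve_resource_ids(["plants"], references)
print(resolved)
```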
- class pudl.metadata.Resource
Bases: Base
Tabular data resource (package.resources[…]).
See https://specs.frictionlessdata.io/tabular-data-resource.
Examples
A simple example illustrates the conversion to SQLAlchemy objects.
>>> fields = [{'name': 'x', 'type': 'year'}, {'name': 'y', 'type': 'string'}]
>>> fkeys = [{'fields': ['x', 'y'], 'reference': {'resource': 'b', 'fields': ['x', 'y']}}]
>>> schema = {'fields': fields, 'primary_key': ['x'], 'foreign_keys': fkeys}
>>> resource = Resource(name='a', schema=schema)
>>> table = resource.to_sql()
>>> table.columns.x
Column('x', Integer(), ForeignKey('b.x'), CheckConstraint(...), table=<a>, primary_key=True, nullable=False)
>>> table.columns.y
Column('y', Text(), ForeignKey('b.y'), CheckConstraint(...), table=<a>)
To illustrate harvesting operations, say we have a resource with two fields - a primary key (id) and a data field - which we want to harvest from two different dataframes.
>>> import pandas as pd
>>> from pudl.metadata.helpers import unique, as_dict
>>> fields = [
...     {'name': 'id', 'type': 'integer'},
...     {'name': 'x', 'type': 'integer', 'harvest': {'aggregate': unique, 'tolerance': 0.25}}
... ]
>>> resource = Resource(**{
...     'name': 'a',
...     'harvest': {'harvest': True},
...     'schema': {'fields': fields, 'primary_key': ['id']}
... })
>>> dfs = {
...     'a': pd.DataFrame({'id': [1, 1, 2, 2], 'x': [1, 1, 2, 2]}),
...     'b': pd.DataFrame({'id': [2, 3, 3], 'x': [3, 4, 4]})
... }
Skip aggregation to access all the rows concatenated from the input dataframes. The names of the input dataframes are used as the index.
>>> df, _ = resource.harvest_dfs(dfs, aggregate=False)
>>> df
    id  x
df
a    1  1
a    1  1
a    2  2
a    2  2
b    2  3
b    3  4
b    3  4
Field names and data types are enforced.
>>> resource.to_pandas_dtypes() == df.dtypes.apply(str).to_dict()
True
Alternatively, aggregate by primary key (the default when harvest.harvest=True) and report aggregation errors.
>>> df, report = resource.harvest_dfs(dfs)
>>> df
       x
id
1      1
2   <NA>
3      4
>>> report['stats']
{'all': 2, 'invalid': 1, 'tolerance': 0.0, 'actual': 0.5}
>>> report['fields']['x']['stats']
{'all': 3, 'invalid': 1, 'tolerance': 0.25, 'actual': 0.33...}
>>> report['fields']['x']['errors']
id
2    Not unique.
Name: x, dtype: object
Customize the error values in the error report.
>>> error = lambda x, e: as_dict(x)
>>> df, report = resource.harvest_dfs(
...     dfs, aggregate_kwargs={'raised': False, 'error': error}
... )
>>> report['fields']['x']['errors']
id
2    {'a': [2, 2], 'b': [3]}
Name: x, dtype: object
Limit harvesting to the input dataframe of the same name by setting harvest.harvest=False.
>>> resource.harvest.harvest = False
>>> df, _ = resource.harvest_dfs(dfs, aggregate_kwargs={'raised': False})
>>> df
    id  x
df
a    1  1
a    1  1
a    2  2
a    2  2
Harvesting can also convert to longer time periods. Period harvesting requires primary key fields with a datetime data type, except for year fields, which can be integers.
>>> fields = [{'name': 'report_year', 'type': 'year'}]
>>> resource = Resource(**{
...     'name': 'table', 'harvest': {'harvest': True},
...     'schema': {'fields': fields, 'primary_key': ['report_year']}
... })
>>> df = pd.DataFrame({'report_date': ['2000-02-02', '2000-03-03']})
>>> resource.format_df(df)
  report_year
0  2000-01-01
1  2000-01-01
>>> df = pd.DataFrame({'report_year': [2000, 2000]})
>>> resource.format_df(df)
  report_year
0  2000-01-01
1  2000-01-01
- name :SnakeCase
- title :String
- description :String
- harvest :ResourceHarvest
- group :Literal[eia, epacems, ferc1, ferc714, glue, pudl]
- schema_ :Schema
- contributors :List[Contributor] = []
- licenses :List[License] = []
- sources :List[Source] = []
- keywords :List[String] = []
- encoder :Encoder
- _check_unique
- _check_harvest_primary_key(cls, value, values)
- static dict_from_id(x: str) -> dict
Construct dictionary from PUDL identifier (resource.name).
schema.fields: Field names are expanded (Field.from_id()). Field attributes are replaced with any specific to the resource.group and field.name.
sources: Source ids are expanded (Source.from_id()).
licenses: License ids are expanded (License.from_id()).
contributors: Contributor ids are fetched by source ids, then expanded (Contributor.from_id()).
keywords: Keywords are fetched by source ids.
schema.foreign_keys: Foreign keys are fetched by resource name.
- to_sql(self, metadata: sqlalchemy.MetaData = None, check_types: bool = True, check_values: bool = True) -> sqlalchemy.Table
Return equivalent SQL Table.
- to_pandas_dtypes(self, **kwargs: Any) -> Dict[str, Union[str, pandas.CategoricalDtype]]
Return Pandas data type of each field by field name.
- Parameters
kwargs – Arguments to Field.to_pandas_dtype().
- match_primary_key(self, names: Iterable[str]) -> Optional[Dict[str, str]]
Match primary key fields to input field names.
An exact match is required unless harvest.harvest=True, in which case periodic names may also match a basename with a smaller period.
- Parameters
names – Field names.
- Raises
ValueError – Field names are not unique.
ValueError – Multiple field names match primary key field.
- Returns
The name matching each primary key field (if any) as a dict, or None if not all primary key fields have a match.
Examples
>>> fields = [{'name': 'x_year', 'type': 'year'}]
>>> schema = {'fields': fields, 'primary_key': ['x_year']}
>>> resource = Resource(name='r', schema=schema)
By default, when harvest.harvest=False, exact matches are required.
>>> resource.harvest.harvest
False
>>> resource.match_primary_key(['x_month']) is None
True
>>> resource.match_primary_key(['x_year', 'x_month'])
{'x_year': 'x_year'}
When harvest.harvest=True, in the absence of an exact match, periodic names may also match a basename with a smaller period.
>>> resource.harvest.harvest = True
>>> resource.match_primary_key(['x_year', 'x_month'])
{'x_year': 'x_year'}
>>> resource.match_primary_key(['x_month'])
{'x_month': 'x_year'}
>>> resource.match_primary_key(['x_month', 'x_date'])
Traceback (most recent call last):
ValueError: ... {'x_month', 'x_date'} match primary key field 'x_year'
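The matching rule can be sketched without PUDL. This is a simplified illustration, not the library's code: the `PERIODS` list and its ordering, the `split_period` helper, and the standalone `match_primary_key` function are assumptions made for the example. It reproduces the results shown in the doctests above.

```python
# Assumed period suffixes, ordered from shortest to longest.
PERIODS = ["date", "month", "quarter", "year"]


def split_period(name):
    """Split 'x_year' into ('x', 'year'); return (name, None) if not periodic."""
    base, _, suffix = name.rpartition("_")
    if base and suffix in PERIODS:
        return base, suffix
    return name, None


def match_primary_key(primary_key, names, harvest=False):
    """Map input names to primary key fields, or return None if any key is unmatched."""
    names = list(names)
    if len(set(names)) != len(names):
        raise ValueError("Field names are not unique")
    matches = {}
    for key in primary_key:
        if key in names:
            matches[key] = key  # An exact match always wins.
            continue
        base, period = split_period(key)
        if not harvest or period is None:
            continue
        # Accept names with the same basename and a strictly smaller period.
        smaller = PERIODS[: PERIODS.index(period)]
        candidates = [n for n in names if split_period(n) in [(base, p) for p in smaller]]
        if len(candidates) > 1:
            raise ValueError(f"{candidates} match primary key field {key!r}")
        if candidates:
            matches[candidates[0]] = key
    return matches if len(matches) == len(primary_key) else None


print(match_primary_key(["x_year"], ["x_month"], harvest=True))
```

Raising on multiple candidates, rather than picking the closest period, keeps the choice of source column explicit.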
- format_df(self, df: pandas.DataFrame = None, **kwargs: Any) -> pandas.DataFrame
Format a dataframe.
- Parameters
df – Dataframe to format.
kwargs – Arguments to Field.to_pandas_dtypes().
- Returns
Dataframe with column names and data types matching the resource fields. Periodic primary key fields are snapped to the start of the desired period. If the primary key fields could not be matched to columns in df (match_primary_key()) or if df=None, an empty dataframe is returned.
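"Snapping to the start of the desired period" can be illustrated with the standard library alone. The `snap_to_period` helper and its set of period names are hypothetical, introduced only for this sketch; PUDL itself operates on pandas columns rather than individual dates.

```python
from datetime import date


def snap_to_period(d: date, period: str) -> date:
    """Return the first day of the year, quarter, or month containing `d`."""
    if period == "year":
        return d.replace(month=1, day=1)
    if period == "quarter":
        # Quarters start in months 1, 4, 7 and 10.
        return d.replace(month=3 * ((d.month - 1) // 3) + 1, day=1)
    if period == "month":
        return d.replace(day=1)
    if period == "date":
        return d
    raise ValueError(f"Unknown period: {period!r}")


# Mirrors the class-level example: a February date snapped to its report year.
print(snap_to_period(date(2000, 2, 2), "year"))
```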
- aggregate_df(self, df: pandas.DataFrame, raised: bool = False, error: Callable = None) -> Tuple[pandas.DataFrame, dict]
Aggregate dataframe by primary key.
The dataframe is grouped by primary key fields and aggregated with the aggregate function of each field (schema_.fields[*].harvest.aggregate).
The report is formatted as follows:
valid (bool): Whether resource is valid.
stats (dict): Error statistics for resource fields.
fields (dict):
    <field_name> (str)
        valid (bool): Whether field is valid.
        stats (dict): Error statistics for field groups.
        errors (pandas.Series): Error values indexed by primary key.
    …
Each stats (dict) contains the following:
all (int): Number of entities (field or field group).
invalid (int): Number of invalid entities.
tolerance (float): Fraction of invalid entities below which parent entity is considered valid.
actual (float): Actual fraction of invalid entities.
- Parameters
df – Dataframe to aggregate. It is assumed to have column names and data types matching the resource fields.
raised – Whether aggregation errors are raised or replaced with np.nan and returned in an error report.
error – A function with signature f(x, e) -> Any, where x are the original field values as a pandas.Series and e is the original error. If provided, the returned value is reported instead of e.
- Raises
ValueError – A primary key is required for aggregating.
- Returns
The aggregated dataframe indexed by primary key fields, and an aggregation report (described above) that includes all aggregation errors and whether the result meets the resource’s and fields’ tolerance.
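The stats entries reduce to simple arithmetic. The sketch below is illustrative only: `error_stats` and `is_valid` are hypothetical helpers, and treating an entity as valid when `actual <= tolerance` (inclusive) is an assumption about how the tolerance is applied. The inputs are the counts reported in the class-level harvesting example.

```python
def error_stats(n_all: int, n_invalid: int, tolerance: float) -> dict:
    """Build a stats dict with the same keys as the aggregation report."""
    actual = n_invalid / n_all if n_all else 0.0
    return {"all": n_all, "invalid": n_invalid, "tolerance": tolerance, "actual": actual}


def is_valid(stats: dict) -> bool:
    """Assumed rule: valid when the invalid fraction does not exceed the tolerance."""
    return stats["actual"] <= stats["tolerance"]


# Field 'x': 3 primary key groups, 1 failed aggregation, tolerance 0.25.
field_stats = error_stats(3, 1, 0.25)
# Resource: 2 fields, 1 invalid field, tolerance 0.0.
resource_stats = error_stats(2, 1, 0.0)
print(field_stats, is_valid(field_stats))
print(resource_stats, is_valid(resource_stats))
```

Under this rule one invalid group out of three exceeds the field tolerance of 0.25, so the field is invalid, which in turn makes the resource invalid at its tolerance of 0.0.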
- _build_aggregation_report(self, df: pandas.DataFrame, errors: dict) -> dict
Build report from aggregation errors.
- Parameters
df – Harvested dataframe (see harvest_dfs()).
errors – Aggregation errors (see groupby_aggregate()).
- Returns
Aggregation report, as described in aggregate_df().
- harvest_dfs(self, dfs: Dict[str, pandas.DataFrame], aggregate: bool = None, aggregate_kwargs: Dict[str, Any] = {}, format_kwargs: Dict[str, Any] = {}) -> Tuple[pandas.DataFrame, dict]
Harvest from named dataframes.
For standard resources (harvest.harvest=False), the columns matching all primary key fields and any data fields are extracted from the input dataframe of the same name.
For harvested resources (harvest.harvest=True), the columns matching all primary key fields and any data fields are extracted from each compatible input dataframe, and concatenated into a single dataframe. Periodic key fields (e.g. ‘report_month’) are matched to any column of the same name with an equal or smaller period (e.g. ‘report_day’) and snapped to the start of the desired period.
If aggregate=False, rows are indexed by the name of the input dataframe. If aggregate=True, rows are indexed by primary key fields.
- Parameters
dfs – Dataframes to harvest.
aggregate – Whether to aggregate the harvested rows by their primary key. By default, this is True if self.harvest.harvest=True and False otherwise.
aggregate_kwargs – Optional arguments to aggregate_df().
format_kwargs – Optional arguments to format_df().
- Returns
A dataframe harvested from the dataframes, with column names and data types matching the resource fields, alongside an aggregation report.
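The concatenate-then-aggregate flow can be sketched with plain lists of dicts in place of dataframes. This is a simplified stand-in, not the PUDL code path: `harvest_rows` and this `unique` function are hypothetical and only mimic the behavior shown in the class-level example, where a group with conflicting values is nulled and reported.

```python
from collections import defaultdict


def unique(values):
    """Return the single distinct value, or raise if there are several."""
    distinct = set(values)
    if len(distinct) > 1:
        raise ValueError("Not unique.")
    return distinct.pop()


def harvest_rows(tables, key, field):
    """Concatenate rows from named tables, then aggregate `field` by `key`."""
    # Tag each row with the name of the table it came from (aggregate=False view).
    rows = [(name, row) for name, table in tables.items() for row in table]
    groups = defaultdict(list)
    for _, row in rows:
        groups[row[key]].append(row[field])
    result, errors = {}, {}
    for k, values in groups.items():
        try:
            result[k] = unique(values)
        except ValueError as e:
            result[k] = None  # Failed aggregations are nulled and reported.
            errors[k] = str(e)
    return rows, result, errors


tables = {
    "a": [{"id": 1, "x": 1}, {"id": 1, "x": 1}, {"id": 2, "x": 2}, {"id": 2, "x": 2}],
    "b": [{"id": 2, "x": 3}, {"id": 3, "x": 4}, {"id": 3, "x": 4}],
}
rows, result, errors = harvest_rows(tables, "id", "x")
print(result)
print(errors)
```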
- encode(self, df: pandas.DataFrame) -> pandas.DataFrame
Standardize coded columns using the foreign column they refer to.