pudl.metadata.helpers

Functions for manipulating metadata constants.

Module Contents

Functions

format_errors(*errors: str, title: str = None, pydantic: bool = False) → str

Format multiple errors into a single error.

_parse_field_names(fields: List[Union[str, dict]]) → List[str]

Parse field names.

_parse_foreign_key_rule(rule: dict, name: str, key: List[str]) → List[dict]

Parse foreign key rule from resource descriptor.

_build_foreign_key_tree(resources: Dict[str, dict]) → Dict[str, Dict[Tuple[str, ...], dict]]

Build foreign key tree.

_traverse_foreign_key_tree(tree: Dict[str, Dict[Tuple[str, ...], dict]], name: str, fields: Tuple[str, ...]) → List[Dict[str, Any]]

Traverse foreign key tree.

build_foreign_keys(resources: Dict[str, dict], prune: bool = True) → Dict[str, List[dict]]

Build foreign keys for each resource.

split_period(name: str) → Tuple[str, Optional[str]]

Split the time period from a column name.

expand_periodic_column_names(names: Iterable[str]) → List[str]

Add smaller periods to a list of column names.

most_frequent(x: pandas.Series) → Any

Return most frequent value (or error if none exists).

most_and_more_frequent(x: pandas.Series, min_frequency: float = None) → Any

Return most frequent value if more frequent than minimum (or error if none exists).

unique(x: pandas.Series) → Any

Return single unique value (or error if none exists).

as_dict(x: pandas.Series) → Dict[Any, list]

Return dictionary of values, listed by index.

try_aggfunc(func: Callable, raised: bool = True, error: Union[str, Callable] = None) → Callable

Wrap aggregate function in a try-except for error handling.

groupby_apply(df: pandas.DataFrame, by: Iterable, aggfuncs: Dict[Any, Callable], raised: bool = True, error: Callable = None) → Tuple[pandas.DataFrame, Dict[Any, pandas.Series]]

Aggregate dataframe and capture errors (using apply).

groupby_aggregate(df: pandas.DataFrame, by: Iterable, aggfuncs: Dict[Any, Callable], raised: bool = True, error: Callable = None) → Tuple[pandas.DataFrame, Dict[Any, pandas.Series]]

Aggregate dataframe and capture errors (using aggregate).

pudl.metadata.helpers.format_errors(*errors: str, title: str = None, pydantic: bool = False) → str

Format multiple errors into a single error.

Parameters
  • errors – Error messages.

  • title – Title for error messages.

  • pydantic – Whether to format the errors for use in a pydantic error (note the extra indentation in the examples below).

Examples

>>> e = format_errors('worse', title='bad')
>>> print(e)
bad
* worse
>>> e = format_errors('worse', title='bad', pydantic=True)
>>> print(e)
bad
  * worse
>>> e = format_errors('bad', 'worse')
>>> print(e)
* bad
* worse
>>> e = format_errors('bad', 'worse', pydantic=True)
>>> print(e)
* bad
  * worse
pudl.metadata.helpers._parse_field_names(fields: List[Union[str, dict]]) → List[str]

Parse field names.

Parameters

fields – Either field names or field descriptors with a name key.

Returns

Field names.
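The documented behavior can be sketched in a few lines. This is a hypothetical reimplementation for illustration, not the actual PUDL code:

```python
def parse_field_names(fields):
    # Accept either plain field names or field descriptors
    # carrying a 'name' key, and return just the names.
    return [f if isinstance(f, str) else f["name"] for f in fields]
```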

pudl.metadata.helpers._parse_foreign_key_rule(rule: dict, name: str, key: List[str]) → List[dict]

Parse foreign key rule from resource descriptor.

Parameters
  • rule – Foreign key rule from the resource descriptor’s foreign_key_rules.

  • name – Resource name.

  • key – Resource primary key.

Returns

  • fields (List[str]): Local fields.

  • reference[‘resource’] (str): Reference resource name.

  • reference[‘fields’] (List[str]): Reference primary key fields.

  • exclude (List[str]): Names of resources to exclude, including name.

Return type

Parsed foreign key rules
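A minimal sketch of the return value described above, assuming each set of field names in the rule becomes one parsed entry pointing back at the resource’s primary key (hypothetical reimplementation, not the actual PUDL code):

```python
def parse_foreign_key_rule(rule, name, key):
    # One parsed rule per set of local field names; each references
    # the rule's own resource and primary key, and excludes itself.
    return [
        {
            "fields": fields,
            "reference": {"resource": name, "fields": key},
            "exclude": [name] + rule.get("exclude", []),
        }
        for fields in rule["fields"]
    ]
```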

pudl.metadata.helpers._build_foreign_key_tree(resources: Dict[str, dict]) → Dict[str, Dict[Tuple[str, ...], dict]]

Build foreign key tree.

Parameters

resources – Resource descriptors by name.

Returns

Foreign key tree where the first key is a resource name (str), the second key is resource field names (Tuple[str, …]), and the value describes the reference resource (dict):

  • reference[‘resource’] (str): Reference name.

  • reference[‘fields’] (List[str]): Reference field names.

pudl.metadata.helpers._traverse_foreign_key_tree(tree: Dict[str, Dict[Tuple[str, ...], dict]], name: str, fields: Tuple[str, ...]) → List[Dict[str, Any]]

Traverse foreign key tree.

Parameters
  • tree – Foreign key tree (see _build_foreign_key_tree()).

  • name – Resource name.

  • fields – Resource field names.

Returns

  • fields (List[str]): Local fields.

  • reference[‘resource’] (str): Reference resource name.

  • reference[‘fields’] (List[str]): Reference primary key fields.

Return type

Sequence of foreign keys starting from name and fields

pudl.metadata.helpers.build_foreign_keys(resources: Dict[str, dict], prune: bool = True) → Dict[str, List[dict]]

Build foreign keys for each resource.

A resource’s foreign_key_rules (if present) determines which other resources will be assigned a foreign key (foreign_keys) to the reference’s primary key:

  • fields (List[List[str]]): Sets of field names for which to create a foreign key. These are assumed to match the order of the reference’s primary key fields.

  • exclude (Optional[List[str]]): Names of resources to exclude.

Parameters
  • resources – Resource descriptors by name.

  • prune – Whether to prune redundant foreign keys.

Returns

Foreign keys for each resource (if any), by resource name.

  • fields (List[str]): Field names.

  • reference[‘resource’] (str): Reference resource name.

  • reference[‘fields’] (List[str]): Reference resource field names.

Examples

>>> resources = {
...     'x': {
...         'schema': {
...             'fields': ['z'],
...             'primary_key': ['z'],
...             'foreign_key_rules': {'fields': [['z']]}
...         }
...     },
...     'y': {
...         'schema': {
...             'fields': ['z', 'yy'],
...             'primary_key': ['z', 'yy'],
...             'foreign_key_rules': {'fields': [['z', 'zz']]}
...         }
...     },
...     'z': {'schema': {'fields': ['z', 'zz']}}
... }
>>> keys = build_foreign_keys(resources)
>>> keys['z']
[{'fields': ['z', 'zz'], 'reference': {'resource': 'y', 'fields': ['z', 'yy']}}]
>>> keys['y']
[{'fields': ['z'], 'reference': {'resource': 'x', 'fields': ['z']}}]
>>> keys = build_foreign_keys(resources, prune=False)
>>> keys['z'][0]
{'fields': ['z'], 'reference': {'resource': 'x', 'fields': ['z']}}
pudl.metadata.helpers.split_period(name: str) → Tuple[str, Optional[str]]

Split the time period from a column name.

Parameters

name – Column name.

Returns

Base name and time period, if any.

Examples

>>> split_period('report_date')
('report', 'date')
>>> split_period('report_day')
('report_day', None)
>>> split_period('date')
('date', None)
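The behavior in these examples can be sketched with a single regular expression, assuming the recognized periods are the ones used elsewhere in this module (year, quarter, month, date). This is a hypothetical reimplementation, not the actual PUDL code:

```python
import re

def split_period(name):
    # Split a trailing period suffix off the base name. A bare period
    # word like 'date' has no underscore-separated base, so it is
    # returned unchanged with no period.
    match = re.fullmatch(r"(.+)_(year|quarter|month|date)", name)
    return (match.group(1), match.group(2)) if match else (name, None)
```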
pudl.metadata.helpers.expand_periodic_column_names(names: Iterable[str]) → List[str]

Add smaller periods to a list of column names.

Parameters

names – Column names.

Returns

Column names with additional names for smaller periods.

Examples

>>> expand_periodic_column_names(['id', 'report_year'])
['id', 'report_year', 'report_quarter', 'report_month', 'report_date']
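A sketch of the expansion, with the period ordering inferred from the example output (hypothetical reimplementation, not the actual PUDL code):

```python
import re

# Periods from smallest to largest, as assumed from the example output.
PERIODS = ("date", "month", "quarter", "year")

def expand_periodic_column_names(names):
    # For each name ending in a period, also emit the same base name
    # with every smaller period, largest first.
    out = list(names)
    for name in names:
        match = re.fullmatch(r"(.+)_(date|month|quarter|year)", name)
        if match:
            base, period = match.groups()
            for smaller in reversed(PERIODS[: PERIODS.index(period)]):
                out.append(f"{base}_{smaller}")
    return out
```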
exception pudl.metadata.helpers.AggregationError

Bases: ValueError

Error raised by aggregation functions.

pudl.metadata.helpers.most_frequent(x: pandas.Series) → Any

Return most frequent value (or error if none exists).
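A minimal sketch of the documented semantics, using pandas value counts; the tie-handling matches the “No value is most frequent.” error shown in the try_aggfunc() examples below, but this is not the actual implementation:

```python
import pandas as pd

class AggregationError(ValueError):
    """Stand-in for the module's error class."""

def most_frequent(x):
    # Modal value of the series; error on an empty series or a tie
    # for first place (assumed semantics).
    counts = x.value_counts()
    if counts.empty or (len(counts) > 1 and counts.iloc[0] == counts.iloc[1]):
        raise AggregationError("No value is most frequent.")
    return counts.index[0]
```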

pudl.metadata.helpers.most_and_more_frequent(x: pandas.Series, min_frequency: float = None) → Any

Return most frequent value if more frequent than minimum (or error if none exists).

The minimum frequency ignores null values, so for example, 1 in [1, 1, 1, nan] has a frequency of 1.
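The null-ignoring frequency falls out naturally from pandas value counts, which drop nulls by default. A hypothetical sketch (the exact strictness of the comparison and tie handling are assumptions, not taken from the source):

```python
import pandas as pd

class AggregationError(ValueError):
    """Stand-in for the module's error class."""

def most_and_more_frequent(x, min_frequency):
    # value_counts() drops nulls, so the frequency is computed over
    # non-null values only: 1 in [1, 1, 1, nan] has frequency 3/3.
    counts = x.value_counts()
    if counts.empty or counts.iloc[0] / counts.sum() < min_frequency:
        raise AggregationError("No value is frequent enough.")
    return counts.index[0]
```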

pudl.metadata.helpers.unique(x: pandas.Series) → Any

Return single unique value (or error if none exists).
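A sketch of the documented behavior; the “Not unique.” message matches the groupby examples below, and the null handling is an assumption consistent with the other aggregation helpers:

```python
import pandas as pd

class AggregationError(ValueError):
    """Stand-in for the module's error class."""

def unique(x):
    # A single non-null unique value, else an error.
    values = x.dropna().unique()
    if len(values) != 1:
        raise AggregationError("Not unique.")
    return values[0]
```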

pudl.metadata.helpers.as_dict(x: pandas.Series) → Dict[Any, list]

Return dictionary of values, listed by index.
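A one-line sketch of the documented behavior, matching the output shown in the try_aggfunc() examples below (hypothetical reimplementation, not the actual PUDL code):

```python
import pandas as pd

def as_dict(x):
    # Group the series values into lists keyed by index label.
    return {key: list(values) for key, values in x.groupby(x.index)}
```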

pudl.metadata.helpers.try_aggfunc(func: Callable, raised: bool = True, error: Union[str, Callable] = None) → Callable

Wrap aggregate function in a try-except for error handling.

Parameters
  • func – Aggregate function.

  • raised – Whether AggregationError errors are raised or returned.

  • error

    Error value, whose type and format depend on raised. Below, x is the original input and e is the original error.

    • raised=True: A string with substitutions (e.g. ‘Error at {x.name}: {e}’) that replaces the arguments of the original error. By default, the original error is raised unchanged.

    • raised=False: A function with signature f(x, e) returning a value that replaces the arguments of the original error. By default, the original error is returned unchanged.

Returns

Aggregate function with custom error handling.

Examples

>>> x = pd.Series([0, 0, 1, 1], index=['a', 'a', 'a', 'b'])
>>> most_frequent(x)
Traceback (most recent call last):
AggregationError: No value is most frequent.
>>> try_aggfunc(most_frequent, raised=False)(x)
AggregationError('No value is most frequent.')
>>> try_aggfunc(most_frequent, error='Bad dtype {x.dtype}')(x)
Traceback (most recent call last):
AggregationError: Bad dtype int64
>>> error = lambda x, e: as_dict(x)
>>> try_aggfunc(most_frequent, raised=False, error=error)(x)
AggregationError({'a': [0, 0, 1], 'b': [1]})
pudl.metadata.helpers.groupby_apply(df: pandas.DataFrame, by: Iterable, aggfuncs: Dict[Any, Callable], raised: bool = True, error: Callable = None) → Tuple[pandas.DataFrame, Dict[Any, pandas.Series]]

Aggregate dataframe and capture errors (using apply).

Parameters
  • df – Dataframe to aggregate.

  • by – Column names used to group rows (see pandas.DataFrame.groupby()).

  • aggfuncs – Aggregation functions for columns not in by.

  • raised – Whether AggregationError errors are raised or replaced with np.nan and returned in an error report.

  • error – A function with signature f(x, e) -> Tuple[Any, Any], where x is the original input and e is the original error, used when raised=False. The first and second value of the returned tuple are used as the index and values, respectively, of the pandas.Series returned for each column. By default, the first value is x.name (the values of the by columns for that row group), and the second is the original error.

Returns

Aggregated dataframe with the by columns set as the index, and an error report containing (if raised=False) a pandas.Series for each column where errors occurred.

Examples

>>> df = pd.DataFrame({'x': [0, 0, 1, 1], 'y': pd.Series([2, 2, 2, 3], dtype='Int64')})
>>> df.index = [0, 0, 0, 1]
>>> base = dict(df=df, by='x', aggfuncs={'y': unique})
>>> groupby_apply(**base)
Traceback (most recent call last):
AggregationError: Could not aggregate y at x = 1: Not unique.
>>> _, report = groupby_apply(**base, raised=False)
>>> report['y']
x
1    Not unique.
dtype: object
>>> error = lambda x, e: (x.name, as_dict(x))
>>> _, report = groupby_apply(**base, raised=False, error=error)
>>> report['y']
x
1    {0: [2], 1: [3]}
dtype: object
pudl.metadata.helpers.groupby_aggregate(df: pandas.DataFrame, by: Iterable, aggfuncs: Dict[Any, Callable], raised: bool = True, error: Callable = None) → Tuple[pandas.DataFrame, Dict[Any, pandas.Series]]

Aggregate dataframe and capture errors (using aggregate).

Although faster than groupby_apply(), it has some limitations:

  • Raised errors cannot access the group index.

  • Aggregation functions must return a scalar (must ‘reduce’). This is not a limitation with pandas.Series.apply().

Parameters
  • df – Dataframe to aggregate.

  • by – Column names used to group rows (see pandas.DataFrame.groupby()).

  • aggfuncs – Aggregation functions for columns not in by.

  • raised – Whether AggregationError errors are raised or replaced with np.nan and returned in an error report.

  • error – A function with signature f(x, e) -> Any, where x is the original input and e is the original error, used when raised=False. By default, the original error is returned.

Returns

Aggregated dataframe with the by columns set as the index, and an error report containing (if raised=False) a pandas.Series of errors (or the value returned by error) for each column where errors occurred.

Examples

>>> df = pd.DataFrame({
...     'x': [0, 0, 1, 1],
...     'y': pd.Series([2, 2, 2, 3], dtype='Int64')
... })
>>> df.index = [0, 0, 0, 1]
>>> base = dict(df=df, by='x', aggfuncs={'y': unique})
>>> groupby_aggregate(**base)
Traceback (most recent call last):
AggregationError: Could not aggregate y: Not unique.
>>> result, report = groupby_aggregate(**base, raised=False)
>>> result
      y
x
0     2
1  <NA>
>>> report['y']
x
1    Not unique.
Name: y, dtype: object
>>> error = lambda x, e: as_dict(x)
>>> result, report = groupby_aggregate(**base, raised=False, error=error)
>>> report['y']
x
1    {0: [2], 1: [3]}
Name: y, dtype: object