pudl.metadata.helpers
=====================

.. py:module:: pudl.metadata.helpers

.. autoapi-nested-parse::

   Functions for manipulating metadata constants.


Exceptions
----------

.. autoapisummary::

   pudl.metadata.helpers.AggregationError


Functions
---------

.. autoapisummary::

   pudl.metadata.helpers.format_errors
   pudl.metadata.helpers._parse_field_names
   pudl.metadata.helpers._parse_foreign_key_rule
   pudl.metadata.helpers._build_foreign_key_tree
   pudl.metadata.helpers._traverse_foreign_key_tree
   pudl.metadata.helpers.build_foreign_keys
   pudl.metadata.helpers.split_period
   pudl.metadata.helpers.expand_periodic_column_names
   pudl.metadata.helpers.most_frequent
   pudl.metadata.helpers.most_and_more_frequent
   pudl.metadata.helpers.unique
   pudl.metadata.helpers.as_dict
   pudl.metadata.helpers.try_aggfunc
   pudl.metadata.helpers.groupby_apply
   pudl.metadata.helpers.groupby_aggregate


Module Contents
---------------

.. py:function:: format_errors(*errors: str, title: str = None, pydantic: bool = False) -> str

   Format multiple errors into a single error.

   :param errors: Error messages.
   :param title: Title for error messages.

   .. rubric:: Examples

   >>> e = format_errors('worse', title='bad')
   >>> print(e)
   bad
   * worse

   >>> e = format_errors('worse', title='bad', pydantic=True)
   >>> print(e)
   bad
   * worse

   >>> e = format_errors('bad', 'worse')
   >>> print(e)
   * bad
   * worse

   >>> e = format_errors('bad', 'worse', pydantic=True)
   >>> print(e)
   * bad
   * worse


.. py:function:: _parse_field_names(fields: list[str | dict]) -> list[str]

   Parse field names.

   :param fields: Either field names or field descriptors with a `name` key.

   :returns: Field names.


.. py:function:: _parse_foreign_key_rule(rule: dict, name: str, key: list[str]) -> list[dict]

   Parse foreign key rule from resource descriptor.

   :param rule: Foreign key rule from the resource descriptor.
   :param name: Resource name.
   :param key: Resource primary key.

   :returns: * `fields` (List[str]): Local fields.
             * `reference['resource']` (str): Reference resource name.
             * `reference['fields']` (List[str]): Reference primary key fields.
             * `exclude` (List[str]): Names of resources to exclude, including `name`.
   :rtype: Parsed foreign key rules


.. py:function:: _build_foreign_key_tree(resources: dict[str, dict]) -> dict[str, dict[tuple[str, Ellipsis], dict]]

   Build foreign key tree.

   :param resources: Resource descriptors by name.

   :returns: Foreign key tree where the first key is a resource name (str),
             the second key is resource field names (Tuple[str, ...]),
             and the value describes the reference resource (dict):

             * `reference['resource']` (str): Reference name.
             * `reference['fields']` (List[str]): Reference field names.


.. py:function:: _traverse_foreign_key_tree(tree: dict[str, dict[tuple[str, Ellipsis], dict]], name: str, fields: tuple[str, Ellipsis]) -> list[dict[str, Any]]

   Traverse foreign key tree.

   :param tree: Foreign key tree (see :func:`_build_foreign_key_tree`).
   :param name: Local resource name.
   :param fields: Local resource fields.

   :returns: * `fields` (List[str]): Local fields.
             * `reference['resource']` (str): Reference resource name.
             * `reference['fields']` (List[str]): Reference primary key fields.
   :rtype: Sequence of foreign keys starting from `name` and `fields`


.. py:function:: build_foreign_keys(resources: dict[str, dict], prune: bool = True) -> dict[str, list[dict]]

   Build foreign keys for each resource.

   A resource's `foreign_key_rules` (if present) determines which other resources will
   be assigned a foreign key (`foreign_keys`) to the reference's primary key:

   * `fields` (list[list[str]]): Sets of field names for which to create a foreign key.
     These are assumed to match the order of the reference's primary key fields.
   * `exclude` (Optional[list[str]]): Names of resources to exclude.

   :param resources: Resource descriptors by name.
   :param prune: Whether to prune redundant foreign keys.

   :returns: Foreign keys for each resource (if any), by resource name.

             * `fields` (list[str]): Field names.
             * `reference['resource']` (str): Reference resource name.
             * `reference['fields']` (list[str]): Reference resource field names.

   .. rubric:: Examples

   >>> resources = {
   ...     'x': {
   ...         'schema': {
   ...             'fields': ['z'],
   ...             'primary_key': ['z'],
   ...             'foreign_key_rules': {'fields': [['z']]}
   ...         }
   ...     },
   ...     'y': {
   ...         'schema': {
   ...             'fields': ['z', 'yy'],
   ...             'primary_key': ['z', 'yy'],
   ...             'foreign_key_rules': {'fields': [['z', 'zz']]}
   ...         }
   ...     },
   ...     'z': {'schema': {'fields': ['z', 'zz']}}
   ... }
   >>> keys = build_foreign_keys(resources)
   >>> keys['z']
   [{'fields': ['z', 'zz'], 'reference': {'resource': 'y', 'fields': ['z', 'yy']}}]
   >>> keys['y']
   [{'fields': ['z'], 'reference': {'resource': 'x', 'fields': ['z']}}]
   >>> keys = build_foreign_keys(resources, prune=False)
   >>> keys['z'][0]
   {'fields': ['z'], 'reference': {'resource': 'x', 'fields': ['z']}}


.. py:function:: split_period(name: str) -> tuple[str, str | None]

   Split the time period from a column name.

   :param name: Column name.

   :returns: Base name and time period, if any.

   .. rubric:: Examples

   >>> split_period('report_date')
   ('report', 'date')
   >>> split_period('report_day')
   ('report_day', None)
   >>> split_period('date')
   ('date', None)


.. py:function:: expand_periodic_column_names(names: collections.abc.Iterable[str]) -> list[str]

   Add smaller periods to a list of column names.

   :param names: Column names.

   :returns: Column names with additional names for smaller periods.

   .. rubric:: Examples

   >>> expand_periodic_column_names(['id', 'report_year'])
   ['id', 'report_year', 'report_quarter', 'report_month', 'report_date']


.. py:exception:: AggregationError

   Bases: :py:obj:`ValueError`

   Error raised by aggregation functions.


.. py:function:: most_frequent(x: pandas.Series) -> Any

   Return most frequent value (or error if none exists).


.. py:function:: most_and_more_frequent(x: pandas.Series, min_frequency: float = None) -> Any

   Return the most frequent value if more frequent than ``min_frequency``.

   The minimum frequency ignores null values, so for example,
   `1` in `[1, 1, 1, nan]` has a frequency of 1.

   :raises AggregationError: if no value is more frequent than ``min_frequency``.


.. py:function:: unique(x: pandas.Series) -> Any

   Return single unique value (or error if none exists).


.. py:function:: as_dict(x: pandas.Series) -> dict[Any, list]

   Return dictionary of values, listed by index.


.. py:function:: try_aggfunc(func: collections.abc.Callable, raised: bool = True, error: str | collections.abc.Callable = None) -> collections.abc.Callable

   Wrap aggregate function in a try-except for error handling.

   :param func: Aggregate function.
   :param raised: Whether :class:`AggregationError` errors are raised or returned.
   :param error: Error value, whose type and format depend on `raised`.
                 Below, `x` is the original input and `e` is the original error.

                 * `raised=True`: A string with substitutions
                   (e.g. 'Error at {x.name}: {e}')
                   that replaces the arguments of the original error.
                   By default, the original error is raised unchanged.
                 * `raised=False`: A function with signature `f(x, e)`
                   returning a value that replaces the arguments of the original error.
                   By default, the original error is returned unchanged.

   :returns: Aggregate function with custom error handling.

   .. rubric:: Examples

   >>> x = pd.Series([0, 0, 1, 1], index=['a', 'a', 'a', 'b'])
   >>> most_frequent(x)
   Traceback (most recent call last):
   AggregationError: No value is most frequent.
   >>> try_aggfunc(most_frequent, raised=False)(x)
   AggregationError('No value is most frequent.')
   >>> try_aggfunc(most_frequent, error='Bad dtype {x.dtype}')(x)
   Traceback (most recent call last):
   AggregationError: Bad dtype int64
   >>> error = lambda x, e: as_dict(x)
   >>> try_aggfunc(most_frequent, raised=False, error=error)(x)
   AggregationError({'a': [0, 0, 1], 'b': [1]})


.. py:function:: groupby_apply(df: pandas.DataFrame, by: collections.abc.Iterable, aggfuncs: dict[Any, collections.abc.Callable], raised: bool = True, error: collections.abc.Callable = None) -> tuple[pandas.DataFrame, dict[Any, pandas.Series]]

   Aggregate dataframe and capture errors (using apply).

   :param df: Dataframe to aggregate.
   :param by: Column names to use to group rows (see :meth:`pandas.DataFrame.groupby`).
   :param aggfuncs: Aggregation functions for columns not in `by`.
   :param raised: Whether :class:`AggregationError` errors are raised or
                  replaced with :obj:`np.nan` and returned in an error report.
   :param error: A function with signature `f(x, e) -> Tuple[Any, Any]`,
                 where `x` is the original input and `e` is the original error,
                 used when `raised=False`.
                 The first and second values of the returned tuple are used as the
                 index and values, respectively,
                 of the :class:`pandas.Series` returned for each column.
                 By default, the first value is `x.name`
                 (the values of columns `by` for that row group),
                 and the second is the original error.

   :returns: Aggregated dataframe with `by` columns set as the index and
             an error report with (if `raised=False`)
             a :class:`pandas.Series` for each column where errors occurred.

   .. rubric:: Examples

   >>> df = pd.DataFrame({'x': [0, 0, 1, 1], 'y': pd.Series([2, 2, 2, 3], dtype='Int64')})
   >>> df.index = [0, 0, 0, 1]
   >>> base = dict(df=df, by='x', aggfuncs={'y': unique})
   >>> groupby_apply(**base)
   Traceback (most recent call last):
   AggregationError: Could not aggregate y at x = 1: Not unique.
   >>> _, report = groupby_apply(**base, raised=False)
   >>> report['y']
   x
   1    Not unique.
   dtype: object
   >>> error = lambda x, e: (x.name, as_dict(x))
   >>> _, report = groupby_apply(**base, raised=False, error=error)
   >>> report['y']
   x
   1    {0: [2], 1: [3]}
   dtype: object


.. py:function:: groupby_aggregate(df: pandas.DataFrame, by: collections.abc.Iterable, aggfuncs: dict[Any, collections.abc.Callable], raised: bool = True, error: collections.abc.Callable = None) -> tuple[pandas.DataFrame, dict[Any, pandas.Series]]

   Aggregate dataframe and capture errors (using aggregate).

   Although faster than :func:`groupby_apply`, it has some limitations:

   * Raised errors cannot access the group index.
   * Aggregation functions must return a scalar (must 'reduce').
     This is not a limitation with :meth:`pandas.Series.apply`.

   :param df: Dataframe to aggregate.
   :param by: Column names to use to group rows (see :meth:`pandas.DataFrame.groupby`).
   :param aggfuncs: Aggregation functions for columns not in `by`.
   :param raised: Whether :class:`AggregationError` errors are raised or
                  replaced with :obj:`np.nan` and returned in an error report.
   :param error: A function with signature `f(x, e) -> Any`,
                 where `x` is the original input and `e` is the original error,
                 used when `raised=False`.
                 By default, the original error is returned.

   :returns: Aggregated dataframe with `by` columns set as the index and
             an error report with (if `raised=False`)
             a :class:`pandas.Series` of errors (or the value returned by `error`)
             for each column where errors occurred.

   .. rubric:: Examples

   >>> df = pd.DataFrame({
   ...     'x': [0, 0, 1, 1],
   ...     'y': pd.Series([2, 2, 2, 3], dtype='Int64')
   ... })
   >>> df.index = [0, 0, 0, 1]
   >>> base = dict(df=df, by='x', aggfuncs={'y': unique})
   >>> groupby_aggregate(**base)
   Traceback (most recent call last):
   AggregationError: Could not aggregate y: Not unique.
   >>> result, report = groupby_aggregate(**base, raised=False)
   >>> result
         y
   x
   0     2
   1  <NA>
   >>> report['y']
   x
   1    Not unique.
   Name: y, dtype: object
   >>> error = lambda x, e: as_dict(x)
   >>> result, report = groupby_aggregate(**base, raised=False, error=error)
   >>> report['y']
   x
   1    {0: [2], 1: [3]}
   Name: y, dtype: object
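
The ``raised`` switch shared by the two group-by helpers above (raise on the first failure, or substitute a null and collect an error report per column) can be illustrated without pandas. The following is a minimal sketch under stated assumptions, not PUDL's implementation: ``aggregate_with_report`` and the list-based ``unique`` are hypothetical stand-ins that operate on plain dictionaries, and ``None`` takes the place of ``np.nan``.

```python
from collections import defaultdict
from typing import Any, Callable


class AggregationError(ValueError):
    """Stand-in for the exception documented above."""


def unique(values: list) -> Any:
    """Hypothetical, simplified aggregator: return the single distinct value."""
    distinct = set(values)
    if len(distinct) != 1:
        raise AggregationError("Not unique.")
    return distinct.pop()


def aggregate_with_report(
    rows: list[dict],
    by: str,
    aggfuncs: dict[str, Callable[[list], Any]],
    raised: bool = True,
) -> tuple[dict, dict]:
    """Group rows by one column, aggregate the others, and optionally collect errors."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[by]].append(row)

    result: dict = {}
    report: dict = defaultdict(dict)
    for key, members in groups.items():
        result[key] = {}
        for column, func in aggfuncs.items():
            values = [member[column] for member in members]
            try:
                result[key][column] = func(values)
            except AggregationError as e:
                if raised:
                    # Re-raise with the column and group that failed.
                    raise AggregationError(
                        f"Could not aggregate {column} at {by} = {key}: {e}"
                    ) from e
                # Otherwise record a null result and keep the error for the report.
                result[key][column] = None
                report[column][key] = e
    return result, dict(report)


rows = [{"x": 0, "y": 2}, {"x": 0, "y": 2}, {"x": 1, "y": 2}, {"x": 1, "y": 3}]
result, report = aggregate_with_report(rows, by="x", aggfuncs={"y": unique}, raised=False)
print(result)               # {0: {'y': 2}, 1: {'y': None}}
print(report["y"][1])       # Not unique.
```

With ``raised=True`` (the default in this sketch, mirroring the documented signatures), the same call stops at the first failing group and raises an ``AggregationError`` that names the column and group.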