pudl.analysis.timeseries_cleaning

Screen timeseries for anomalies and impute missing and anomalous values.

The screening methods were originally designed to identify unrealistic data in the electricity demand timeseries reported to EIA on Form 930, and have also been applied to the FERC Form 714, and various historical demand timeseries published by regional grid operators like MISO, PJM, ERCOT, and SPP.

They are adapted from code published and modified by:

And described at:

The imputation methods were designed for multivariate time series forecasting.

They are adapted from code published by:

And described at:

Module Contents

Classes

Timeseries

Multivariate timeseries for anomalies detection and imputation.

Functions

slice_axis(x: numpy.ndarray, start: int = None, end: int = None, step: int = None, axis: int = 0) → Tuple[slice, Ellipsis]

Return an index that slices an array along an axis.

array_diff(x: numpy.ndarray, periods: int = 1, axis: int = 0, fill: Any = np.nan) → numpy.ndarray

First discrete difference of array elements.

encode_run_length(x: Union[Sequence, numpy.ndarray]) → Tuple[numpy.ndarray, numpy.ndarray]

Encode vector with run-length encoding.

insert_run_length(x: Union[Sequence, numpy.ndarray], values: Union[Sequence, numpy.ndarray], lengths: Sequence[int], mask: Sequence[bool] = None, padding: int = 0, intersect: bool = False) → numpy.ndarray

Insert run-length encoded values into a vector.

_mat2ten(matrix: numpy.ndarray, shape: numpy.ndarray, mode: int) → numpy.ndarray

Fold matrix into a tensor.

_ten2mat(tensor: numpy.ndarray, mode: int) → numpy.ndarray

Unfold tensor into a matrix.

_svt_tnn(matrix: numpy.ndarray, tau: float, theta: int) → numpy.ndarray

Singular value thresholding (SVT) truncated nuclear norm (TNN) minimization.

impute_latc_tnn(tensor: numpy.ndarray, lags: Sequence[int] = [1], alpha: Sequence[float] = [1 / 3, 1 / 3, 1 / 3], rho0: float = 1e-07, lambda0: float = 2e-07, theta: int = 20, epsilon: float = 1e-07, maxiter: int = 300) → numpy.ndarray

Impute tensor values with LATC-TNN method by Chen and Sun (2020).

_tsvt(tensor: numpy.ndarray, phi: numpy.ndarray, tau: float) → numpy.ndarray

Tensor singular value thresholding (TSVT).

impute_latc_tubal(tensor: numpy.ndarray, lags: Sequence[int] = [1], rho0: float = 1e-07, lambda0: float = 2e-07, epsilon: float = 1e-07, maxiter: int = 300) → numpy.ndarray

Impute tensor values with LATC-Tubal method by Chen, Chen and Sun (2020).

pudl.analysis.timeseries_cleaning.slice_axis(x: numpy.ndarray, start: int = None, end: int = None, step: int = None, axis: int = 0) Tuple[slice, Ellipsis][source]

Return an index that slices an array along an axis.

Parameters
  • x – Array to slice.

  • start – Start index of slice.

  • end – End index of slice.

  • step – Step size of slice.

  • axis – Axis along which to slice.

Returns

Tuple of slice that slices array x along axis axis (x[…, start:stop:step]).

Examples

>>> x = np.random.random((3, 4, 5))
>>> np.all(x[1:] == x[slice_axis(x, start=1, axis=0)])
True
>>> np.all(x[:, 1:] == x[slice_axis(x, start=1, axis=1)])
True
>>> np.all(x[:, :, 1:] == x[slice_axis(x, start=1, axis=2)])
True
pudl.analysis.timeseries_cleaning.array_diff(x: numpy.ndarray, periods: int = 1, axis: int = 0, fill: Any = np.nan) numpy.ndarray[source]

First discrete difference of array elements.

This is a fast numpy implementation of pd.DataFrame.diff().

Parameters
  • periods – Periods to shift for calculating difference, accepts negative values.

  • axis – Array axis along which to calculate the difference.

  • fill – Value to use at the margins where a difference cannot be calculated.

Returns

Array of same shape and type as x with discrete element differences.

Examples

>>> x = np.random.random((4, 2))
>>> np.all(array_diff(x, 1)[1:] == pd.DataFrame(x).diff(1).values[1:])
True
>>> np.all(array_diff(x, 2)[2:] == pd.DataFrame(x).diff(2).values[2:])
True
>>> np.all(array_diff(x, -1)[:-1] == pd.DataFrame(x).diff(-1).values[:-1])
True
pudl.analysis.timeseries_cleaning.encode_run_length(x: Union[Sequence, numpy.ndarray]) Tuple[numpy.ndarray, numpy.ndarray][source]

Encode vector with run-length encoding.

Parameters

x – Vector to encode.

Returns

Values and their run lengths.

Examples

>>> x = np.array([0, 1, 1, 0, 1])
>>> encode_run_length(x)
(array([0, 1, 0, 1]), array([1, 2, 1, 1]))
>>> encode_run_length(x.astype('bool'))
(array([False,  True, False,  True]), array([1, 2, 1, 1]))
>>> encode_run_length(x.astype('<U1'))
(array(['0', '1', '0', '1'], dtype='<U1'), array([1, 2, 1, 1]))
>>> encode_run_length(np.where(x == 0, np.nan, x))
(array([nan,  1., nan,  1.]), array([1, 2, 1, 1]))
pudl.analysis.timeseries_cleaning.insert_run_length(x: Union[Sequence, numpy.ndarray], values: Union[Sequence, numpy.ndarray], lengths: Sequence[int], mask: Sequence[bool] = None, padding: int = 0, intersect: bool = False) numpy.ndarray[source]

Insert run-length encoded values into a vector.

Parameters
  • x – Vector to insert values into.

  • values – Values to insert.

  • lengths – Length of run to insert for each value in values.

  • mask – Boolean mask, of the same length as x, where values can be inserted. By default, values can be inserted anywhere in x.

  • padding – Minimum space between inserted runs and, if mask is provided, the edges of masked-out areas.

  • intersect – Whether to allow inserted runs to intersect each other.

Raises
  • ValueError – Padding must zero or greater.

  • ValueError – Run length must be greater than zero.

  • ValueError – Cound not find space for run of length {length}.

Returns

Copy of array x with values inserted.

Example

>>> x = [0, 0, 0, 0]
>>> mask = [True, False, True, True]
>>> insert_run_length(x, values=[1, 2], lengths=[1, 2], mask=mask)
array([1, 0, 2, 2])

If we use unique values for the background and each inserted run, the run length encoding of the result (ignoring the background) is the same as the inserted run, albeit in a different order.

>>> x = np.zeros(10, dtype=int)
>>> values = [1, 2, 3]
>>> lengths = [1, 2, 3]
>>> x = insert_run_length(x, values=values, lengths=lengths)
>>> rvalues, rlengths = encode_run_length(x[x != 0])
>>> order = np.argsort(rvalues)
>>> all(rvalues[order] == values) and all(rlengths[order] == lengths)
True

Null values can be inserted into a vector such that the new null runs match the run length encoding of the existing null runs.

>>> x = [1, 2, np.nan, np.nan, 5, 6, 7, 8, np.nan]
>>> is_nan = np.isnan(x)
>>> rvalues, rlengths = encode_run_length(is_nan)
>>> xi = insert_run_length(
...     x,
...     values=[np.nan] * rvalues.sum(),
...     lengths=rlengths[rvalues],
...     mask=~is_nan
... )
>>> np.isnan(xi).sum() == 2 * is_nan.sum()
True

The same as above, with non-zero padding, yields a unique solution:

>>> insert_run_length(
...     x,
...     values=[np.nan] * rvalues.sum(),
...     lengths=rlengths[rvalues],
...     mask=~is_nan,
...     padding=1
... )
array([nan,  2., nan, nan,  5., nan, nan,  8., nan])
pudl.analysis.timeseries_cleaning._mat2ten(matrix: numpy.ndarray, shape: numpy.ndarray, mode: int) numpy.ndarray[source]

Fold matrix into a tensor.

pudl.analysis.timeseries_cleaning._ten2mat(tensor: numpy.ndarray, mode: int) numpy.ndarray[source]

Unfold tensor into a matrix.

pudl.analysis.timeseries_cleaning._svt_tnn(matrix: numpy.ndarray, tau: float, theta: int) numpy.ndarray[source]

Singular value thresholding (SVT) truncated nuclear norm (TNN) minimization.

pudl.analysis.timeseries_cleaning.impute_latc_tnn(tensor: numpy.ndarray, lags: Sequence[int] = [1], alpha: Sequence[float] = [1 / 3, 1 / 3, 1 / 3], rho0: float = 1e-07, lambda0: float = 2e-07, theta: int = 20, epsilon: float = 1e-07, maxiter: int = 300) numpy.ndarray[source]

Impute tensor values with LATC-TNN method by Chen and Sun (2020).

Uses low-rank autoregressive tensor completion (LATC) with truncated nuclear norm (TNN) minimization.

Parameters
  • tensor – Observational series in the form (series, groups, periods). Null values are replaced with zeros, so any zeros will be treated as null.

  • lags

  • alpha

  • rho0

  • lambda0

  • theta

  • epsilon – Convergence criterion. A smaller number will result in more iterations.

  • maxiter – Maximum number of iterations.

Returns

Tensor with missing values in tensor replaced by imputed values.

pudl.analysis.timeseries_cleaning._tsvt(tensor: numpy.ndarray, phi: numpy.ndarray, tau: float) numpy.ndarray[source]

Tensor singular value thresholding (TSVT).

pudl.analysis.timeseries_cleaning.impute_latc_tubal(tensor: numpy.ndarray, lags: Sequence[int] = [1], rho0: float = 1e-07, lambda0: float = 2e-07, epsilon: float = 1e-07, maxiter: int = 300) numpy.ndarray[source]

Impute tensor values with LATC-Tubal method by Chen, Chen and Sun (2020).

Uses low-tubal-rank autoregressive tensor completion (LATC-Tubal). It is much faster than impute_latc_tnn() for very large datasets, with comparable accuracy.

Parameters
  • tensor – Observational series in the form (series, groups, periods). Null values are replaced with zeros, so any zeros will be treated as null.

  • lags

  • rho0

  • lambda0

  • epsilon – Convergence criterion. A smaller number will result in more iterations.

  • maxiter – Maximum number of iterations.

Returns

Tensor with missing values in tensor replaced by imputed values.

class pudl.analysis.timeseries_cleaning.Timeseries(x: Union[numpy.ndarray, pandas.DataFrame])[source]

Multivariate timeseries for anomalies detection and imputation.

xi

Reference to the original values (can be null). Many methods assume that these represent chronological, regular timeseries.

x

Copy of xi with any flagged values replaced with null.

flags

Flag label for each value, or null if not flagged.

flagged

Running list of flags that have been checked so far.

index

Row index.

columns

Column names.

to_dataframe(self, array: numpy.ndarray = None, copy: bool = True) pandas.DataFrame[source]

Return multivariate timeseries as a pandas.DataFrame.

Parameters
  • array – Two-dimensional array to use. If None, uses x.

  • copy – Whether to use a copy of array.

flag(self, mask: numpy.ndarray, flag: str) None[source]

Flag values.

Flags values (if not already flagged) and nulls flagged values.

Parameters
  • mask – Boolean mask of the values to flag.

  • flag – Flag name.

unflag(self, flags: Iterable[str] = None) None[source]

Unflag values.

Unflags values by restoring their original values and removing their flag.

Parameters

flags – Flag names. If None, all flags are removed.

flag_negative_or_zero(self) None[source]

Flag negative or zero values (NEGATIVE_OR_ZERO).

flag_identical_run(self, length: int = 3) None[source]

Flag the last values in identical runs (IDENTICAL_RUN).

Parameters

length – Run length to flag. If 3, the third (and subsequent) identical values are flagged.

Raises

ValueError – Run length must be 2 or greater.

flag_global_outlier(self, medians: float = 9) None[source]

Flag values greater or less than n times the global median (GLOBAL_OUTLIER).

Parameters

medians – Number of times the median the value must exceed the median.

flag_global_outlier_neighbor(self, neighbors: int = 1) None[source]

Flag values neighboring global outliers (GLOBAL_OUTLIER_NEIGHBOR).

Parameters

neighbors – Number of neighbors to flag on either side of each outlier.

Raises

ValueError – Global outliers must be flagged first.

rolling_median(self, window: int = 48) numpy.ndarray[source]

Rolling median of values.

Parameters

window – Number of values in the moving window.

rolling_median_offset(self, window: int = 48) numpy.ndarray[source]

Values minus the rolling median.

Estimates the local cycle in cyclical data by removing longterm trends.

Parameters

window – Number of values in the moving window.

median_of_rolling_median_offset(self, window: int = 48, shifts: Sequence[int] = range(- 240, 241, 24)) numpy.ndarray[source]

Median of the offset from the rolling median.

Calculated by shifting the rolling median offset (rolling_median_offset()) by different numbers of values, then taking the median at each position. Estimates the typical local cycle in cyclical data.

Parameters
  • window – Number of values in the moving window for the rolling median.

  • shifts – Number of values to shift the rolling median offset by.

rolling_iqr_of_rolling_median_offset(self, window: int = 48, iqr_window: int = 240) numpy.ndarray[source]

Rolling interquartile range (IQR) of rolling median offset.

Estimates the spread of the local cycles in cyclical data.

Parameters
  • window – Number of values in the moving window for the rolling median.

  • iqr_window – Number of values in the moving window for the rolling IQR.

median_prediction(self, window: int = 48, shifts: Sequence[int] = range(- 240, 241, 24), long_window: int = 480) numpy.ndarray[source]

Values predicted from local and regional rolling medians.

Calculated as { local median } + { median of local median offset } * { local median } / { regional median }.

Parameters
  • window – Number of values in the moving window for the local rolling median.

  • shifts – Positions to shift the local rolling median offset by, for computing its median.

  • long_window – Number of values in the moving window for the regional (long) rolling median.

flag_local_outlier(self, window: int = 48, shifts: Sequence[int] = range(- 240, 241, 24), long_window: int = 480, iqr_window: int = 240, multiplier: Tuple[float, float] = (3.5, 2.5)) None[source]

Flag local outliers (LOCAL_OUTLIER_HIGH, LOCAL_OUTLIER_LOW).

Flags values which are above or below the median_prediction() by more than a multiplier times the rolling_iqr_of_rolling_median_offset().

Parameters
  • window – Number of values in the moving window for the local rolling median.

  • shifts – Positions to shift the local rolling median offset by, for computing its median.

  • long_window – Number of values in the moving window for the regional (long) rolling median.

  • iqr_window – Number of values in the moving window for the rolling interquartile range (IQR).

  • multiplier – Number of times the rolling_iqr_of_rolling_median_offset() the value must be above (HIGH) and below (LOW) the median_prediction() to be flagged.

diff(self, shift: int = 1) numpy.ndarray[source]

Values minus the value of their neighbor.

Parameters

shift – Positions to shift for calculating the difference. Positive values select a preceding (left) neighbor.

rolling_iqr_of_diff(self, shift: int = 1, window: int = 240) numpy.ndarray[source]

Rolling interquartile range (IQR) of difference between neighboring values.

Parameters
  • shift – Positions to shift for calculating the difference.

  • window – Number of values in the moving window for the rolling IQR.

flag_double_delta(self, iqr_window: int = 240, multiplier: float = 2) None[source]

Flag values very different from neighbors on either side (DOUBLE_DELTA).

Flags values whose differences to both neighbors on either side exceeds a multiplier times the rolling interquartile range (IQR) of neighbor difference.

Parameters
  • iqr_window – Number of values in the moving window for the rolling IQR of neighbor difference.

  • multiplier – Number of times the rolling IQR of neighbor difference the value’s difference to its neighbors must exceed for the value to be flagged.

relative_median_prediction(self, **kwargs: Any) numpy.ndarray[source]

Values divided by their value predicted from medians.

Parameters

kwargs – Arguments to median_prediction().

iqr_of_diff_of_relative_median_prediction(self, shift: int = 1, **kwargs: Any) numpy.ndarray[source]

Interquartile range of running difference of relative median prediction.

Parameters
  • shift – Positions to shift for calculating the difference. Positive values select a preceding (left) neighbor.

  • kwargs – Arguments to relative_median_prediction().

_find_single_delta(self, relative_median_prediction: numpy.ndarray, relative_median_prediction_long: numpy.ndarray, rolling_iqr_of_diff: numpy.ndarray, iqr_of_diff_of_relative_median_prediction: numpy.ndarray, reverse: bool = False) numpy.ndarray[source]
flag_single_delta(self, window: int = 48, shifts: Sequence[int] = range(- 240, 241, 24), long_window: int = 480, iqr_window: int = 240, multiplier: float = 5, rel_multiplier: float = 15) None[source]

Flag values very different from the nearest unflagged value (SINGLE_DELTA).

Flags values whose difference to the nearest unflagged value, with respect to value and relative median prediction, differ by less than a multiplier times the rolling interquartile range (IQR) of the difference - multiplier times rolling_iqr_of_diff() and rel_multiplier times iqr_of_diff_of_relative_mean_prediction(), respectively.

Parameters
  • window – Number of values in the moving window for the rolling median (for the relative median prediction).

  • shifts – Positions to shift the local rolling median offset by, for computing its median (for the relative median prediction).

  • long_window – Number of values in the moving window for the long rolling median (for the relative median prediction).

  • iqr_window – Number of values in the moving window for the rolling IQR of neighbor difference.

  • multiplier – Number of times the rolling IQR of neighbor difference the value’s difference to its neighbor must exceed for the value to be flagged.

  • rel_multiplier – Number of times the rolling IQR of relative median prediction the value’s prediction difference to its neighbor must exceed for the value to be flagged.

flag_anomalous_region(self, window: int = 48, threshold: float = 0.15) None[source]

Flag values surrounded by flagged values (ANOMALOUS_REGION).

Original null values are not considered flagged values.

Parameters
  • window – Width of regions.

  • threshold – Fraction of flagged values required for a region to be flagged.

flag_ruggles(self) None[source]

Flag values following the method of Ruggles and others (2020).

Assumes values are hourly electricity demand.

summarize_flags(self) pandas.DataFrame[source]

Summarize flagged values by flag, count and median.

plot_flags(self, name: Any = 0) None[source]

Plot cleaned series and anomalous values colored by flag.

Parameters

name – Series to plot, as either an integer index or name in columns.

simulate_nulls(self, lengths: Sequence[int] = None, padding: int = 1, intersect: bool = False, overlap: bool = False) numpy.ndarray[source]

Find non-null values to null to match a run-length distribution.

Parameters
  • length – Length of null runs to simulate for each series. By default, uses the run lengths of null values in each series.

  • padding – Minimum number of non-null values between simulated null runs and between simulated and existing null runs.

  • intersect – Whether simulated null runs can intersect each other.

  • overlap – Whether simulated null runs can overlap existing null runs. If True, padding is ignored.

Returns

Boolean mask of current non-null values to set to null.

Raises

ValueError – Cound not find space for run of length {length}.

Examples

>>> x = np.column_stack([[1, 2, np.nan, 4, 5, 6, 7, np.nan, np.nan]])
>>> s = Timeseries(x)
>>> s.simulate_nulls().ravel()
array([ True, False, False, False, True, True, False, False, False])
>>> s.simulate_nulls(lengths=[4], padding=0).ravel()
array([False, False, False, True, True, True, True, False, False])
fold_tensor(self, x: numpy.ndarray = None, periods: int = 24) numpy.ndarray[source]

Fold into a 3-dimensional tensor representation.

Folds the series x (number of observations, number of series) into a 3-d tensor (number of series, number of groups, number of periods), splitting observations into groups of length periods. For example, each group may represent a day and each period the hour of the day.

Parameters
  • x – Series array to fold. Uses x by default.

  • periods – Number of consecutive values in each series to fold into a group.

Returns

>>> x = np.column_stack([[1, 2, 3, 4, 5, 6], [10, 20, 30, 40, 50, 60]])
>>> s = Timeseries(x)
>>> tensor = s.fold_tensor(periods=3)
>>> tensor[0]
array([[1, 2, 3],
       [4, 5, 6]])
>>> np.all(x == s.unfold_tensor(tensor))
True

unfold_tensor(self, tensor: numpy.ndarray) numpy.ndarray[source]

Unfold a 3-dimensional tensor representation.

Performs the reverse of fold_tensor().

impute(self, mask: numpy.ndarray = None, periods: int = 24, blocks: int = 1, method: str = 'tubal', **kwargs: Any) numpy.ndarray[source]

Impute null values.

Note

The imputation method requires that nulls be replaced by zeros, so the series cannot already contain zeros.

Parameters
  • mask – Boolean mask of values to impute in addition to any null values in x.

  • periods – Number of consecutive values in each series to fold into a group. See fold_tensor().

  • blocks – Number of blocks into which to split the series for imputation. This has been found to reduce processing time for method=’tnn’.

  • method – Imputation method to use (‘tubal’: impute_latc_tubal(), ‘tnn’: impute_latc_tnn()).

  • kwargs – Optional arguments to method.

Returns

Array of same shape as x with all null values (and those selected by mask) replaced with imputed values.

Raises

ValueError – Zero values present. Replace with very small value.

summarize_imputed(self, imputed: numpy.ndarray, mask: numpy.ndarray) pandas.DataFrame[source]

Summarize the fit of imputed values to actual values.

Summarizes the agreement between actual and imputed values with the following statistics:

  • mpe: Mean percent error, (actual - imputed) / actual.

  • mape: Mean absolute percent error, abs(mpe).

Parameters
  • imputed – Series of same shape as x with imputed values. See impute().

  • mask – Boolean mask of imputed values that were not null in x. See simulate_nulls().

Returns

Table of imputed value statistics for each series.