"""Screen timeseries for anomalies and impute missing and anomalous values.
The screening methods were originally designed to identify unrealistic data in the
electricity demand timeseries reported to EIA on Form 930, and have also been
applied to the FERC Form 714, and various historical demand timeseries
published by regional grid operators like MISO, PJM, ERCOT, and SPP.
They are adapted from code published and modified by:
* Tyler Ruggles <truggles@carnegiescience.edu>
* Greg Schivley <greg@carbonimpact.co>
And described at:
* https://doi.org/10.1038/s41597-020-0483-x
* https://zenodo.org/record/3737085
* https://github.com/truggles/EIA_Cleaned_Hourly_Electricity_Demand_Code
The imputation methods were designed for multivariate time series forecasting.
They are adapted from code published by:
* Xinyu Chen <chenxy346@gmail.com>
And described at:
* https://arxiv.org/abs/2006.10436
* https://arxiv.org/abs/2008.03194
* https://github.com/xinychen/tensor-learning
"""
import functools
import warnings
from collections.abc import Iterable, Sequence
from typing import Any
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats
# ---- Helpers ---- #
[docs]
def slice_axis(
x: np.ndarray, start: int = None, end: int = None, step: int = None, axis: int = 0
) -> tuple[slice, ...]:
"""Return an index that slices an array along an axis.
Args:
x: Array to slice.
start: Start index of slice.
end: End index of slice.
step: Step size of slice.
axis: Axis along which to slice.
Returns:
Tuple of :class:`slice` that slices array `x` along axis `axis`
(`x[..., start:stop:step]`).
Examples:
>>> x = np.random.random((3, 4, 5))
>>> np.all(x[1:] == x[slice_axis(x, start=1, axis=0)])
True
>>> np.all(x[:, 1:] == x[slice_axis(x, start=1, axis=1)])
True
>>> np.all(x[:, :, 1:] == x[slice_axis(x, start=1, axis=2)])
True
"""
index = [slice(None)] * np.mod(axis, x.ndim) + [slice(start, end, step)]
return tuple(index)
[docs]
def array_diff(
x: np.ndarray, periods: int = 1, axis: int = 0, fill: Any = np.nan
) -> np.ndarray:
"""First discrete difference of array elements.
This is a fast numpy implementation of :meth:`pd.DataFrame.diff`.
Args:
periods: Periods to shift for calculating difference, accepts negative values.
axis: Array axis along which to calculate the difference.
fill: Value to use at the margins where a difference cannot be calculated.
Returns:
Array of same shape and type as `x` with discrete element differences.
Examples:
>>> x = np.random.random((4, 2))
>>> np.all(array_diff(x, 1)[1:] == pd.DataFrame(x).diff(1).to_numpy()[1:])
True
>>> np.all(array_diff(x, 2)[2:] == pd.DataFrame(x).diff(2).to_numpy()[2:])
True
>>> np.all(array_diff(x, -1)[:-1] == pd.DataFrame(x).diff(-1).to_numpy()[:-1])
True
"""
if not periods:
return x - x
dx = np.empty_like(x)
prepend = slice_axis(x, end=periods, axis=axis)
append = slice_axis(x, start=periods, axis=axis)
if periods > 0:
dx[prepend] = fill
dx[append] = x[append] - x[slice_axis(x, end=-periods, axis=axis)]
else:
dx[prepend] = x[prepend] - x[slice_axis(x, start=-periods, axis=axis)]
dx[append] = fill
return dx
[docs]
def encode_run_length(x: Sequence | np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""Encode vector with run-length encoding.
Args:
x: Vector to encode.
Returns:
Values and their run lengths.
Examples:
>>> x = np.array([0, 1, 1, 0, 1])
>>> encode_run_length(x)
(array([0, 1, 0, 1]), array([1, 2, 1, 1]))
>>> encode_run_length(x.astype('bool'))
(array([False, True, False, True]), array([1, 2, 1, 1]))
>>> encode_run_length(x.astype('<U1'))
(array(['0', '1', '0', '1'], dtype='<U1'), array([1, 2, 1, 1]))
>>> encode_run_length(np.where(x == 0, np.nan, x))
(array([nan, 1., nan, 1.]), array([1, 2, 1, 1]))
"""
# Inspired by https://stackoverflow.com/a/32681075
x = np.asarray(x)
n = len(x)
if not n:
return x, np.array([], dtype=int)
# Pairwise unequal (string safe)
y = np.array(x[1:] != x[:-1])
# Must include last element position
i = np.append(np.where(y), n - 1)
lengths = np.diff(np.append(-1, i))
# starts = np.cumsum(np.append(0, lengths))[:-1]
return x[i], lengths
[docs]
def insert_run_length( # noqa: C901
x: Sequence | np.ndarray,
values: Sequence | np.ndarray,
lengths: Sequence[int],
mask: Sequence[bool] = None,
padding: int = 0,
intersect: bool = False,
) -> np.ndarray:
"""Insert run-length encoded values into a vector.
Args:
x: Vector to insert values into.
values: Values to insert.
lengths: Length of run to insert for each value in `values`.
mask: Boolean mask, of the same length as `x`, where values can be inserted.
By default, values can be inserted anywhere in `x`.
padding: Minimum space between inserted runs and,
if `mask` is provided, the edges of masked-out areas.
intersect: Whether to allow inserted runs to intersect each other.
Raises:
ValueError: Padding must zero or greater.
ValueError: Run length must be greater than zero.
ValueError: Cound not find space for run of length {length}.
Returns:
Copy of array `x` with values inserted.
Example:
>>> x = [0, 0, 0, 0]
>>> mask = [True, False, True, True]
>>> insert_run_length(x, values=[1, 2], lengths=[1, 2], mask=mask)
array([1, 0, 2, 2])
If we use unique values for the background and each inserted run,
the run length encoding of the result (ignoring the background)
is the same as the inserted run, albeit in a different order.
>>> x = np.zeros(10, dtype=int)
>>> values = [1, 2, 3]
>>> lengths = [1, 2, 3]
>>> x = insert_run_length(x, values=values, lengths=lengths)
>>> rvalues, rlengths = encode_run_length(x[x != 0])
>>> order = np.argsort(rvalues)
>>> all(rvalues[order] == values) and all(rlengths[order] == lengths)
True
Null values can be inserted into a vector such that the new null runs
match the run length encoding of the existing null runs.
>>> x = [1, 2, np.nan, np.nan, 5, 6, 7, 8, np.nan]
>>> is_nan = np.isnan(x)
>>> rvalues, rlengths = encode_run_length(is_nan)
>>> xi = insert_run_length(
... x,
... values=[np.nan] * rvalues.sum(),
... lengths=rlengths[rvalues],
... mask=~is_nan
... )
>>> np.isnan(xi).sum() == 2 * is_nan.sum()
True
The same as above, with non-zero `padding`, yields a unique solution:
>>> insert_run_length(
... x,
... values=[np.nan] * rvalues.sum(),
... lengths=rlengths[rvalues],
... mask=~is_nan,
... padding=1
... )
array([nan, 2., nan, nan, 5., nan, nan, 8., nan])
"""
if padding < 0:
raise ValueError("Padding must zero or greater")
# Make a new array to modify in place
x = np.array(x)
# Compute runs available for insertions
if mask is None:
run_starts = np.array([0])
run_lengths = np.array([len(x)])
else:
mask_values, mask_lengths = encode_run_length(mask)
run_starts = np.cumsum(np.append(0, mask_lengths))[:-1][mask_values]
run_lengths = mask_lengths[mask_values]
if padding:
# Constrict runs
run_ends = run_starts + run_lengths
# Move run starts forward, unless endpoint
moved = slice(int(run_starts[0] == 0), None)
run_starts[moved] += padding
# Move run ends backward, unless endpoint
moved = slice(None, -1 if run_ends[-1] == len(x) else None)
run_ends[moved] -= padding
# Recalculate run lengths and keep runs with positive length
run_lengths = run_ends - run_starts
keep = run_lengths > 0
run_starts = run_starts[keep]
run_lengths = run_lengths[keep]
# Grow runs by maximum number of insertions (for speed)
n_runs = len(run_starts)
if not intersect:
buffer = np.zeros(len(values), dtype=int)
run_starts = np.concatenate((run_starts, buffer))
run_lengths = np.concatenate((run_lengths, buffer))
# Initialize random number generator
rng = np.random.default_rng()
# Sort insertions from longest to shortest
order = np.argsort(lengths)[::-1]
values = np.asarray(values)[order]
lengths = np.asarray(lengths)[order]
for value, length in zip(values, lengths, strict=True):
if length < 1:
raise ValueError("Run length must be greater than zero")
# Choose runs of adequate length
choices = np.nonzero(run_lengths[:n_runs] >= length)[0]
if not choices.size:
raise ValueError(f"Could not find space for run of length {length}")
idx = rng.choice(choices)
# Choose adequate start position in run
offset = rng.integers(0, run_lengths[idx] - length, endpoint=True)
start = run_starts[idx] + offset
# Insert value
x[start : start + length] = value
if intersect:
continue
# Update runs
padded_length = length + padding
if offset:
tail = run_lengths[idx] - offset - padded_length
if tail > 0:
# Insert run
run_starts[n_runs] = start + padded_length
run_lengths[n_runs] = tail
n_runs += 1
# Shorten run
run_lengths[idx] = offset - padding
else:
# Shift and shorten run
run_starts[idx] += padded_length
run_lengths[idx] -= padded_length
return x
[docs]
def _mat2ten(matrix: np.ndarray, shape: np.ndarray, mode: int) -> np.ndarray:
"""Fold matrix into a tensor."""
index = [mode] + [i for i in range(len(shape)) if i != mode]
return np.moveaxis(
np.reshape(matrix, newshape=shape[index], order="F"), source=0, destination=mode
)
[docs]
def _ten2mat(tensor: np.ndarray, mode: int) -> np.ndarray:
"""Unfold tensor into a matrix."""
return np.reshape(
np.moveaxis(tensor, source=mode, destination=0),
newshape=(tensor.shape[mode], -1),
order="F",
)
[docs]
def _svt_tnn(matrix: np.ndarray, tau: float, theta: int) -> np.ndarray:
"""Singular value thresholding (SVT) truncated nuclear norm (TNN) minimization."""
[m, n] = matrix.shape
if 2 * m < n:
u, s, v = np.linalg.svd(matrix @ matrix.T, full_matrices=0)
s = np.sqrt(s)
idx = np.sum(s > tau)
mid = np.zeros(idx)
mid[:theta] = 1
mid[theta:idx] = (s[theta:idx] - tau) / s[theta:idx]
return (u[:, :idx] @ np.diag(mid)) @ (u[:, :idx].T @ matrix)
if m > 2 * n:
return _svt_tnn(matrix.T, tau, theta).T
u, s, v = np.linalg.svd(matrix, full_matrices=0)
idx = np.sum(s > tau)
vec = s[:idx].copy()
vec[theta:idx] = s[theta:idx] - tau
return u[:, :idx] @ np.diag(vec) @ v[:idx, :]
[docs]
def impute_latc_tnn(
tensor: np.ndarray,
lags: Sequence[int] = [1],
alpha: Sequence[float] = [1 / 3, 1 / 3, 1 / 3],
rho0: float = 1e-7,
lambda0: float = 2e-7,
theta: int = 20,
epsilon: float = 1e-7,
maxiter: int = 300,
) -> np.ndarray:
"""Impute tensor values with LATC-TNN method by Chen and Sun (2020).
Uses low-rank autoregressive tensor completion (LATC) with
truncated nuclear norm (TNN) minimization.
* description: https://arxiv.org/abs/2006.10436
* code: https://github.com/xinychen/tensor-learning/blob/master/mats
Args:
tensor: Observational series in the form (series, groups, periods).
Null values are replaced with zeros, so any zeros will be treated as null.
lags:
alpha:
rho0:
lambda0:
theta:
epsilon: Convergence criterion. A smaller number will result in more iterations.
maxiter: Maximum number of iterations.
Returns:
Tensor with missing values in `tensor` replaced by imputed values.
"""
rng = np.random.default_rng()
tensor = np.where(np.isnan(tensor), 0, tensor)
dim = np.array(tensor.shape)
dim_time = int(np.prod(dim) / dim[0])
d = len(lags)
max_lag = np.max(lags)
mat = _ten2mat(tensor, mode=0)
pos_missing = np.where(mat == 0)
x = np.zeros(np.insert(dim, 0, len(dim)))
t = np.zeros(np.insert(dim, 0, len(dim)))
z = mat.copy()
z[pos_missing] = np.mean(mat[mat != 0])
a = 0.001 * rng.random(dim[0] * d).reshape([dim[0], d])
it = 0
ind = np.zeros((d, dim_time - max_lag), dtype=int)
for i in range(d):
ind[i, :] = np.arange(max_lag - lags[i], dim_time - lags[i])
last_mat = mat.copy()
snorm = np.linalg.norm(mat, "fro")
rho = rho0
while True:
rho = min(rho * 1.05, 1e5)
for k in range(len(dim)):
x[k] = _mat2ten(
_svt_tnn(
_ten2mat(_mat2ten(z, shape=dim, mode=0) - t[k] / rho, mode=k),
tau=alpha[k] / rho,
theta=theta,
),
shape=dim,
mode=k,
)
tensor_hat = np.einsum("k, kmnt -> mnt", alpha, x)
mat_hat = _ten2mat(tensor_hat, 0)
mat0 = np.zeros((dim[0], dim_time - max_lag))
if lambda0 > 0:
for m in range(dim[0]):
qm = mat_hat[m, ind].T
a[m, :] = np.linalg.pinv(qm) @ z[m, max_lag:]
mat0[m, :] = qm @ a[m, :]
mat1 = _ten2mat(np.mean(rho * x + t, axis=0), 0)
z[pos_missing] = np.append(
(mat1[:, :max_lag] / rho),
(mat1[:, max_lag:] + lambda0 * mat0) / (rho + lambda0),
axis=1,
)[pos_missing]
else:
z[pos_missing] = (_ten2mat(np.mean(x + t / rho, axis=0), 0))[pos_missing]
t = t + rho * (
x - np.broadcast_to(_mat2ten(z, dim, 0), np.insert(dim, 0, len(dim)))
)
tol = np.linalg.norm((mat_hat - last_mat), "fro") / snorm
last_mat = mat_hat.copy()
it += 1
print(f"Iteration: {it}", end="\r")
if tol < epsilon or it >= maxiter:
break
print(f"Iteration: {it}")
return tensor_hat
[docs]
def _tsvt(tensor: np.ndarray, phi: np.ndarray, tau: float) -> np.ndarray:
"""Tensor singular value thresholding (TSVT)."""
dim = tensor.shape
x = np.zeros(dim)
tensor = np.einsum("kt, ijk -> ijt", phi, tensor)
for t in range(dim[2]):
u, s, v = np.linalg.svd(tensor[:, :, t], full_matrices=False)
r = len(np.where(s > tau)[0])
if r >= 1:
s = s[:r]
s[:r] = s[:r] - tau
x[:, :, t] = u[:, :r] @ np.diag(s) @ v[:r, :]
return np.einsum("kt, ijt -> ijk", phi, x)
[docs]
def impute_latc_tubal( # noqa: C901
tensor: np.ndarray,
lags: Sequence[int] = [1],
rho0: float = 1e-7,
lambda0: float = 2e-7,
epsilon: float = 1e-7,
maxiter: int = 300,
) -> np.ndarray:
"""Impute tensor values with LATC-Tubal method by Chen, Chen and Sun (2020).
Uses low-tubal-rank autoregressive tensor completion (LATC-Tubal).
It is much faster than :func:`impute_latc_tnn` for very large datasets,
with comparable accuracy.
* description: https://arxiv.org/abs/2008.03194
* code: https://github.com/xinychen/tensor-learning/blob/master/mats
Args:
tensor: Observational series in the form (series, groups, periods).
Null values are replaced with zeros, so any zeros will be treated as null.
lags:
rho0:
lambda0:
epsilon: Convergence criterion. A smaller number will result in more iterations.
maxiter: Maximum number of iterations.
Returns:
Tensor with missing values in `tensor` replaced by imputed values.
"""
rng = np.random.default_rng()
tensor = np.where(np.isnan(tensor), 0, tensor)
dim = np.array(tensor.shape)
dim_time = int(np.prod(dim) / dim[0])
d = len(lags)
max_lag = np.max(lags)
mat = _ten2mat(tensor, 0)
pos_missing = np.where(mat == 0)
t = np.zeros(dim)
z = mat.copy()
z[pos_missing] = np.mean(mat[mat != 0])
a = 0.001 * rng.random(dim[0] * d).reshape([dim[0], d])
it = 0
ind = np.zeros((d, dim_time - max_lag), dtype=np.int_)
for i in range(d):
ind[i, :] = np.arange(max_lag - lags[i], dim_time - lags[i])
last_mat = mat.copy()
snorm = np.linalg.norm(mat, "fro")
rho = rho0
temp1 = _ten2mat(_mat2ten(z, dim, 0), 2)
_, phi = np.linalg.eig(temp1 @ temp1.T)
del temp1
if dim_time > 5e3 and dim_time <= 1e4:
sample_rate = 0.2
elif dim_time > 1e4:
sample_rate = 0.1
while True:
rho = min(rho * 1.05, 1e5)
x = _tsvt(_mat2ten(z, dim, 0) - t / rho, phi, 1 / rho)
mat_hat = _ten2mat(x, 0)
mat0 = np.zeros((dim[0], dim_time - max_lag))
temp2 = _ten2mat(rho * x + t, 0)
if lambda0 > 0:
if dim_time <= 5e3:
for m in range(dim[0]):
qm = mat_hat[m, ind].T
a[m, :] = np.linalg.pinv(qm) @ z[m, max_lag:]
mat0[m, :] = qm @ a[m, :]
elif dim_time > 5e3:
for m in range(dim[0]):
idx = np.arange(0, dim_time - max_lag)
rng.shuffle(idx)
idx = idx[: int(sample_rate * (dim_time - max_lag))]
qm = mat_hat[m, ind].T
a[m, :] = np.linalg.pinv(qm[idx[:], :]) @ z[m, max_lag:][idx[:]]
mat0[m, :] = qm @ a[m, :]
z[pos_missing] = np.append(
(temp2[:, :max_lag] / rho),
(temp2[:, max_lag:] + lambda0 * mat0) / (rho + lambda0),
axis=1,
)[pos_missing]
else:
z[pos_missing] = temp2[pos_missing] / rho
t = t + rho * (x - _mat2ten(z, dim, 0))
tol = np.linalg.norm((mat_hat - last_mat), "fro") / snorm
last_mat = mat_hat.copy()
it += 1
if not np.mod(it, 10):
temp1 = _ten2mat(_mat2ten(z, dim, 0) - t / rho, 2)
_, phi = np.linalg.eig(temp1 @ temp1.T)
del temp1
print(f"Iteration: {it}", end="\r")
if tol < epsilon or it >= maxiter:
break
print(f"Iteration: {it}")
return x
# ---- Anomaly detection ---- #
[docs]
class Timeseries:
"""Multivariate timeseries for anomalies detection and imputation.
Attributes:
xi: Reference to the original values (can be null).
Many methods assume that these represent chronological, regular timeseries.
x: Copy of :attr:`xi` with any flagged values replaced with null.
flags: Flag label for each value, or null if not flagged.
flagged: Running list of flags that have been checked so far.
index: Row index.
columns: Column names.
"""
def __init__(self, x: np.ndarray | pd.DataFrame) -> None:
"""Initialize a multivariate timeseries.
Args:
x: Timeseries with shape (n observations, m variables).
If :class:`pandas.DataFrame`, :attr:`index` and :attr:`columns`
are equal to `x.index` and `x.columns`, respectively.
Otherwise, :attr:`index` and :attr:`columns` are the default
`pandas.RangeIndex`.
"""
self.xi: np.ndarray
self.index: pd.Index
self.columns: pd.Index
if isinstance(x, pd.DataFrame):
self.xi = x.to_numpy()
self.index = x.index
self.columns = x.columns
else:
self.xi = x
self.index = pd.RangeIndex(x.shape[0])
self.columns = pd.RangeIndex(x.shape[1])
self.x: np.ndarray = self.xi.copy()
self.flags: np.ndarray = np.empty(self.x.shape, dtype=object)
self.flagged: list[str] = []
[docs]
def to_dataframe(self, array: np.ndarray = None, copy: bool = True) -> pd.DataFrame:
"""Return multivariate timeseries as a :class:`pandas.DataFrame`.
Args:
array: Two-dimensional array to use. If `None`, uses :attr:`x`.
copy: Whether to use a copy of `array`.
"""
x = self.x if array is None else array
return pd.DataFrame(x, columns=self.columns, index=self.index, copy=copy)
[docs]
def flag(self, mask: np.ndarray, flag: str) -> None:
"""Flag values.
Flags values (if not already flagged) and nulls flagged values.
Args:
mask: Boolean mask of the values to flag.
flag: Flag name.
"""
# Only flag unflagged values
mask = mask & ~np.isnan(self.x)
self.flags[mask] = flag
self.flagged.append(flag)
# Null flagged values
self.x[mask] = np.nan
# Clear cached metrics
for name in dir(self):
attr = getattr(self, name)
if hasattr(attr, "cache_clear"):
attr.cache_clear()
[docs]
def unflag(self, flags: Iterable[str] = None) -> None:
"""Unflag values.
Unflags values by restoring their original values and removing their flag.
Args:
flags: Flag names. If `None`, all flags are removed.
"""
mask = slice(None) if flags is None else np.isin(self.flags, flags)
self.flags[mask] = None
self.x[mask] = self.xi[mask]
self.flagged = [f for f in self.flagged if flags is not None and f not in flags]
[docs]
def flag_negative_or_zero(self) -> None:
"""Flag negative or zero values (NEGATIVE_OR_ZERO)."""
mask = self.x <= 0
self.flag(mask, "NEGATIVE_OR_ZERO")
[docs]
def flag_identical_run(self, length: int = 3) -> None:
"""Flag the last values in identical runs (IDENTICAL_RUN).
Args:
length: Run length to flag.
If `3`, the third (and subsequent) identical values are flagged.
Raises:
ValueError: Run length must be 2 or greater.
"""
if length < 2:
raise ValueError("Run length must be 2 or greater")
mask = np.ones(self.x.shape, dtype=bool)
mask[0] = False
for n in range(1, length):
mask[n:] &= self.x[n:] == self.x[:-n]
self.flag(mask, "IDENTICAL_RUN")
[docs]
def flag_global_outlier(self, medians: float = 9) -> None:
"""Flag values greater or less than n times the global median (GLOBAL_OUTLIER).
Args:
medians: Number of times the median the value must exceed the median.
"""
median = np.nanmedian(self.x, axis=0)
mask = np.abs(self.x - median) > np.abs(median * medians)
self.flag(mask, "GLOBAL_OUTLIER")
[docs]
def flag_global_outlier_neighbor(self, neighbors: int = 1) -> None:
"""Flag values neighboring global outliers (GLOBAL_OUTLIER_NEIGHBOR).
Args:
neighbors: Number of neighbors to flag on either side of each outlier.
Raises:
ValueError: Global outliers must be flagged first.
"""
if "GLOBAL_OUTLIER" not in self.flagged:
raise ValueError("Global outliers must be flagged first")
mask = np.zeros(self.x.shape, dtype=bool)
outliers = self.flags == "GLOBAL_OUTLIER"
for shift in range(1, neighbors + 1):
# Neighbors before
mask[:-shift][outliers[shift:]] = True
# Neighors after
mask[shift:][outliers[:-shift]] = True
self.flag(mask, "GLOBAL_OUTLIER_NEIGHBOR")
@functools.lru_cache(maxsize=2) # noqa: B019
[docs]
def flag_local_outlier(
self,
window: int = 48,
shifts: Sequence[int] = range(-240, 241, 24),
long_window: int = 480,
iqr_window: int = 240,
multiplier: tuple[float, float] = (3.5, 2.5),
) -> None:
"""Flag local outliers (LOCAL_OUTLIER_HIGH, LOCAL_OUTLIER_LOW).
Flags values which are above or below the :meth:`median_prediction` by more than
a `multiplier` times the :meth:`rolling_iqr_of_rolling_median_offset`.
Args:
window: Number of values in the moving window for the local rolling median.
shifts: Positions to shift the local rolling median offset by,
for computing its median.
long_window: Number of values in the moving window
for the regional (long) rolling median.
iqr_window: Number of values in the moving window
for the rolling interquartile range (IQR).
multiplier: Number of times the :meth:`rolling_iqr_of_rolling_median_offset`
the value must be above (HIGH) and below (LOW)
the :meth:`median_prediction` to be flagged.
"""
# Compute constants
prediction = self.median_prediction(
window=window, shifts=shifts, long_window=long_window
)
iqr = self.rolling_iqr_of_rolling_median_offset(
window=window, iqr_window=iqr_window
)
mask = self.x > prediction + multiplier[0] * iqr
self.flag(mask, "LOCAL_OUTLIER_HIGH")
# As in original code, do not recompute constants with new nulls
mask = self.x < prediction - multiplier[1] * iqr
self.flag(mask, "LOCAL_OUTLIER_LOW")
[docs]
def diff(self, shift: int = 1) -> np.ndarray:
"""Values minus the value of their neighbor.
Args:
shift: Positions to shift for calculating the difference.
Positive values select a preceding (left) neighbor.
"""
# RUGGLES: delta_pre (shift=1), delta_post (shift=-1)
return array_diff(self.x, shift)
[docs]
def rolling_iqr_of_diff(self, shift: int = 1, window: int = 240) -> np.ndarray:
"""Rolling interquartile range (IQR) of difference between neighboring values.
Args:
shift: Positions to shift for calculating the difference.
window: Number of values in the moving window for the rolling IQR.
"""
# RUGGLES: delta_rolling_iqr
diff = self.diff(shift=shift)
df = pd.DataFrame(diff, copy=False)
rolling = df.rolling(window, min_periods=1, center=True)
return (rolling.quantile(0.75) - rolling.quantile(0.25)).to_numpy()
[docs]
def flag_double_delta(self, iqr_window: int = 240, multiplier: float = 2) -> None:
"""Flag values very different from neighbors on either side (DOUBLE_DELTA).
Flags values whose differences to both neighbors on either side exceeds a
`multiplier` times the rolling interquartile range (IQR) of neighbor difference.
Args:
iqr_window: Number of values in the moving window for the rolling IQR
of neighbor difference.
multiplier: Number of times the rolling IQR of neighbor difference
the value's difference to its neighbors must exceed
for the value to be flagged.
"""
before = self.diff(shift=1)
after = self.diff(shift=-1)
iqr = multiplier * self.rolling_iqr_of_diff(shift=1, window=iqr_window)
mask = (np.minimum(before, after) > iqr) | (np.maximum(before, after) < -iqr)
self.flag(mask, "DOUBLE_DELTA")
@functools.lru_cache(maxsize=2) # noqa: B019
[docs]
def _find_single_delta(
self,
relative_median_prediction: np.ndarray,
relative_median_prediction_long: np.ndarray,
rolling_iqr_of_diff: np.ndarray,
iqr_of_diff_of_relative_median_prediction: np.ndarray,
reverse: bool = False,
) -> np.ndarray:
not_nan = ~np.isnan(self.x)
mask = np.zeros(self.x.shape, dtype=bool)
for col in range(self.x.shape[1]):
indices = np.flatnonzero(not_nan[:, col])[:: (-1 if reverse else 1)]
previous, current = indices[:-1], indices[1:]
while len(current):
# Evaluate value pairs
diff = np.abs(self.x[current, col] - self.x[previous, col])
diff_relative_median_prediction = np.abs(
relative_median_prediction[current, col]
- relative_median_prediction[previous, col]
)
# Compare max deviation across short and long rolling median
# to catch when outliers pull short median towards themselves.
previous_max = np.maximum(
np.abs(1 - relative_median_prediction[previous, col]),
np.abs(1 - relative_median_prediction_long[previous, col]),
)
current_max = np.maximum(
np.abs(1 - relative_median_prediction[current, col]),
np.abs(1 - relative_median_prediction_long[current, col]),
)
flagged = (
(diff > rolling_iqr_of_diff[current, col])
& (
diff_relative_median_prediction
> iqr_of_diff_of_relative_median_prediction[col]
)
& (current_max > previous_max)
)
flagged_indices = current[flagged]
if not flagged_indices.size:
break
# Find position of flagged indices in index
if reverse:
indices_idx = indices.size - (
indices[::-1].searchsorted(flagged_indices, side="right")
)
else:
indices_idx = indices.searchsorted(flagged_indices, side="left")
# Only flag first of consecutive flagged indices
# TODO: May not be necessary after first iteration
unflagged = np.concatenate(([False], np.diff(indices_idx) == 1))
flagged_indices = np.delete(flagged_indices, unflagged)
indices_idx = np.delete(indices_idx, unflagged)
mask[flagged_indices, col] = True
flagged[flagged] = ~unflagged
# Bump current index of flagged pairs to next unflagged index
# Next index always unflagged because flagged runs are not permitted
next_indices_idx = indices_idx + 1
if next_indices_idx[-1] == len(indices):
# Drop last index if out of range
next_indices_idx = next_indices_idx[:-1]
current = indices[next_indices_idx]
# Trim previous values to length of current values
previous = previous[flagged][: len(current)]
# Delete flagged indices
indices = np.delete(indices, indices_idx)
return mask
[docs]
def flag_single_delta(
self,
window: int = 48,
shifts: Sequence[int] = range(-240, 241, 24),
long_window: int = 480,
iqr_window: int = 240,
multiplier: float = 5,
rel_multiplier: float = 15,
) -> None:
"""Flag values very different from the nearest unflagged value (SINGLE_DELTA).
Flags values whose difference to the nearest unflagged value,
with respect to value and relative median prediction,
differ by less than a multiplier times the rolling interquartile range (IQR)
of the difference -
`multiplier` times :meth:`rolling_iqr_of_diff` and
`rel_multiplier` times :meth:`iqr_of_diff_of_relative_mean_prediction`,
respectively.
Args:
window: Number of values in the moving window for the rolling median
(for the relative median prediction).
shifts: Positions to shift the local rolling median offset by,
for computing its median (for the relative median prediction).
long_window: Number of values in the moving window for the long rolling
median (for the relative median prediction).
iqr_window: Number of values in the moving window for the rolling IQR
of neighbor difference.
multiplier: Number of times the rolling IQR of neighbor difference
the value's difference to its neighbor must exceed
for the value to be flagged.
rel_multiplier: Number of times the rolling IQR of relative median
prediction the value's prediction difference to its neighbor must exceed
for the value to be flagged.
"""
# Compute constants used in both forward and reverse pass
relative_median_prediction = self.relative_median_prediction(
window=window, shifts=shifts, long_window=long_window
)
relative_median_prediction_long = self.relative_median_prediction(
window=long_window, shifts=shifts, long_window=long_window
)
rolling_iqr_of_diff = multiplier * self.rolling_iqr_of_diff(
shift=1, window=iqr_window
)
iqr_of_diff_of_relative_median_prediction = rel_multiplier * (
self.iqr_of_diff_of_relative_median_prediction(
shift=1, window=window, shifts=shifts, long_window=long_window
)
)
# Set values flagged in forward pass to null before reverse pass
mask = self._find_single_delta(
relative_median_prediction,
relative_median_prediction_long,
rolling_iqr_of_diff,
iqr_of_diff_of_relative_median_prediction,
reverse=False,
)
self.flag(mask, "SINGLE_DELTA")
# Repeat in reverse to get all options.
# As in original code, do not recompute constants with new nulls
mask = self._find_single_delta(
relative_median_prediction,
relative_median_prediction_long,
rolling_iqr_of_diff,
iqr_of_diff_of_relative_median_prediction,
reverse=True,
)
self.flag(mask, "SINGLE_DELTA")
[docs]
def flag_anomalous_region(self, window: int = 48, threshold: float = 0.15) -> None:
"""Flag values surrounded by flagged values (ANOMALOUS_REGION).
Original null values are not considered flagged values.
Args:
window: Width of regions.
threshold: Fraction of flagged values required for a region to be flagged.
"""
# Check whether unflagged
mask = np.equal(self.flags, None)
# Check whether after or before half-width region with 1+ flagged values
half_window = window // 2
is_after = (
pd.DataFrame(mask, copy=False)
.rolling(window=half_window)
.mean()
.lt(1)
.to_numpy()
)
is_before = np.roll(is_after, -(half_window - 1), axis=0)
# Check whether not part of a run of unflagged values longer than a half-width
is_not_run = np.empty_like(mask)
for col in range(mask.shape[1]):
rvalues, rlengths = encode_run_length(mask[:, col])
is_short_run = np.where(rvalues, rlengths, 0) <= half_window
is_not_run[:, col] = np.repeat(is_short_run, rlengths)
# Check whether within full-width region with too many flagged values
is_region = (
pd.DataFrame(~mask, copy=False)
.rolling(window=window, center=True)
.mean()
.gt(threshold)
.rolling(window=window, center=True)
.max()
.eq(True)
.to_numpy()
)
# Flag if all conditions are met
mask &= is_after & is_before & is_not_run & is_region
self.flag(mask, "ANOMALOUS_REGION")
[docs]
def flag_ruggles(self) -> None:
"""Flag values following the method of Ruggles and others (2020).
Assumes values are hourly electricity demand.
* description: https://doi.org/10.1038/s41597-020-0483-x
* code: https://github.com/truggles/EIA_Cleaned_Hourly_Electricity_Demand_Code
"""
# Step 1
self.flag_negative_or_zero()
self.flag_identical_run(length=3)
self.flag_global_outlier(medians=9)
self.flag_global_outlier_neighbor(neighbors=1)
# Step 2
# NOTE: In original code, statistics used for the flags below are precomputed
# here, rather than computed for each flag with nulls added by previous flags.
window = 48
long_window = 480
iqr_window = 240
shifts = range(-240, 241, 24)
self.flag_local_outlier(
window=window,
shifts=shifts,
long_window=long_window,
iqr_window=iqr_window,
multiplier=(3.5, 2.5),
)
self.flag_double_delta(iqr_window=iqr_window, multiplier=2)
self.flag_single_delta(
window=window,
shifts=shifts,
long_window=long_window,
iqr_window=iqr_window,
multiplier=5,
rel_multiplier=15,
)
self.flag_anomalous_region(window=window + 1, threshold=0.15)
[docs]
def summarize_flags(self) -> pd.DataFrame:
"""Summarize flagged values by flag, count and median."""
stats = {}
for col in range(self.xi.shape[1]):
stats[self.columns[col]] = (
pd.Series(self.xi[:, col])
.groupby(self.flags[:, col])
.agg(["count", "median"])
)
df = pd.concat(stats, names=["column", "flag"]).reset_index()
# Sort flags by flagged order
ordered = df["flag"].astype(pd.CategoricalDtype(set(self.flagged)))
return df.assign(flag=ordered).sort_values(["column", "flag"])
[docs]
def plot_flags(self, name: Any = 0) -> None:
"""Plot cleaned series and anomalous values colored by flag.
Args:
name: Series to plot, as either an integer index or name in :attr:`columns`.
"""
if name not in self.columns:
name = self.columns[name]
col = list(self.columns).index(name)
plt.plot(self.index, self.x[:, col], color="lightgrey", marker=".", zorder=1)
colors = {
"NEGATIVE_OR_ZERO": "pink",
"IDENTICAL_RUN": "blue",
"GLOBAL_OUTLIER": "brown",
"GLOBAL_OUTLIER_NEIGHBOR": "brown",
"LOCAL_OUTLIER_HIGH": "purple",
"LOCAL_OUTLIER_LOW": "purple",
"DOUBLE_DELTA": "green",
"SINGLE_DELTA": "red",
"ANOMALOUS_REGION": "orange",
}
for flag in colors:
mask = self.flags[:, col] == flag
x, y = self.index[mask], self.xi[mask, col]
# Set zorder manually to ensure flagged points are drawn on top
plt.scatter(x, y, c=colors[flag], label=flag, zorder=2)
plt.legend()
[docs]
def simulate_nulls(
self,
lengths: Sequence[int] = None,
padding: int = 1,
intersect: bool = False,
overlap: bool = False,
) -> np.ndarray:
"""Find non-null values to null to match a run-length distribution.
Args:
length: Length of null runs to simulate for each series.
By default, uses the run lengths of null values in each series.
padding: Minimum number of non-null values between simulated null runs
and between simulated and existing null runs.
intersect: Whether simulated null runs can intersect each other.
overlap: Whether simulated null runs can overlap existing null runs.
If `True`, `padding` is ignored.
Returns:
Boolean mask of current non-null values to set to null.
Raises:
ValueError: Cound not find space for run of length {length}.
Examples:
>>> x = np.column_stack([[1, 2, np.nan, 4, 5, 6, 7, np.nan, np.nan]])
>>> s = Timeseries(x)
>>> s.simulate_nulls().ravel()
array([ True, False, False, False, True, True, False, False, False])
>>> s.simulate_nulls(lengths=[4], padding=0).ravel()
array([False, False, False, True, True, True, True, False, False])
"""
new_nulls = np.zeros(self.x.shape, dtype=bool)
for col in range(self.x.shape[1]):
is_null = np.isnan(self.x[:, col])
if lengths is None:
run_values, run_lengths = encode_run_length(is_null)
run_lengths = run_lengths[run_values]
else:
run_lengths = lengths
is_new_null = insert_run_length(
new_nulls[:, col],
values=np.ones(len(run_lengths), dtype=bool),
lengths=run_lengths,
mask=None if overlap else ~is_null,
padding=0 if overlap else padding,
intersect=intersect,
)
if overlap:
is_new_null &= ~is_null
new_nulls[:, col] = is_new_null
return new_nulls
[docs]
def fold_tensor(self, x: np.ndarray = None, periods: int = 24) -> np.ndarray:
"""Fold into a 3-dimensional tensor representation.
Folds the series `x` (number of observations, number of series)
into a 3-d tensor (number of series, number of groups, number of periods),
splitting observations into groups of length `periods`.
For example, each group may represent a day and each period the hour of the day.
Args:
x: Series array to fold. Uses :attr:`x` by default.
periods: Number of consecutive values in each series to fold into a group.
Returns:
>>> x = np.column_stack([[1, 2, 3, 4, 5, 6], [10, 20, 30, 40, 50, 60]])
>>> s = Timeseries(x)
>>> tensor = s.fold_tensor(periods=3)
>>> tensor[0]
array([[1, 2, 3],
[4, 5, 6]])
>>> np.all(x == s.unfold_tensor(tensor))
True
"""
tensor_shape = self.x.shape[1], self.x.shape[0] // periods, periods
x = self.x if x is None else x
return x.T.reshape(tensor_shape)
[docs]
def unfold_tensor(self, tensor: np.ndarray) -> np.ndarray:
"""Unfold a 3-dimensional tensor representation.
Performs the reverse of :meth:`fold_tensor`.
"""
return tensor.T.reshape(self.x.shape, order="F")
[docs]
def impute(
self,
mask: np.ndarray = None,
periods: int = 24,
blocks: int = 1,
method: str = "tubal",
**kwargs: Any,
) -> np.ndarray:
"""Impute null values.
.. note::
The imputation method requires that nulls be replaced by zeros,
so the series cannot already contain zeros.
Args:
mask: Boolean mask of values to impute in addition to
any null values in :attr:`x`.
periods: Number of consecutive values in each series to fold into a group.
See :meth:`fold_tensor`.
blocks: Number of blocks into which to split the series for imputation.
This has been found to reduce processing time for `method='tnn'`.
method: Imputation method to use
('tubal': :func:`impute_latc_tubal`, 'tnn': :func:`impute_latc_tnn`).
kwargs: Optional arguments to `method`.
Returns:
Array of same shape as :attr:`x` with all null values
(and those selected by `mask`) replaced with imputed values.
Raises:
ValueError: Zero values present. Replace with very small value.
"""
imputer = {"tubal": impute_latc_tubal, "tnn": impute_latc_tnn}[method]
x = self.x.copy() if mask is None else np.where(mask, np.nan, self.x)
if (x == 0).any():
raise ValueError("Zero values present. Replace with very small value.")
tensor = self.fold_tensor(x, periods=periods)
n = tensor.shape[1]
ends = [*range(0, n, int(np.ceil(n / blocks))), n]
for i in range(blocks):
if blocks > 1:
print(f"Block: {i}")
idx = slice(None), slice(ends[i], ends[i + 1]), slice(None)
tensor[idx] = imputer(tensor[idx], **kwargs)
return self.unfold_tensor(tensor)
[docs]
def summarize_imputed(self, imputed: np.ndarray, mask: np.ndarray) -> pd.DataFrame:
"""Summarize the fit of imputed values to actual values.
Summarizes the agreement between actual and imputed values with the
following statistics:
* `mpe`: Mean percent error, `(actual - imputed) / actual`.
* `mape`: Mean absolute percent error, `abs(mpe)`.
Args:
imputed: Series of same shape as :attr:`x` with imputed values.
See :meth:`impute`.
mask: Boolean mask of imputed values that were not null in :attr:`x`.
See :meth:`simulate_nulls`.
Returns:
Table of imputed value statistics for each series.
"""
stats = []
for col in range(self.x.shape[1]):
x = self.x[mask[:, col], col]
if not x.size:
continue
pe = (x - imputed[mask[:, col], col]) / x
pe = pe[~np.isnan(pe)]
stats.append(
{
"column": self.columns[col],
"count": x.size,
"mpe": np.mean(pe),
"mape": np.mean(np.abs(pe)),
}
)
return pd.DataFrame(stats)