Source code for openoa.utils.filters

"""
This module provides functions for flagging pandas data series based on a range of criteria. The functions are largely
intended for application in wind plant operational energy analysis, particularly wind speed vs. power curves.
"""

from __future__ import annotations

import numpy as np
import scipy as sp
import pandas as pd
from sklearn.cluster import KMeans

from openoa.utils._converters import (
    series_to_df,
    series_method,
    dataframe_method,
    convert_args_to_lists,
)


[docs] def range_flag( data: pd.DataFrame | pd.Series, lower: float | list[float], upper: float | list[float], col: list[str] | None = None, ) -> pd.Series | pd.DataFrame: """Flag data for which the specified data is outside the provided range of [lower, upper]. Args: data (:obj:`pandas.Series` | `pandas.DataFrame`): data frame containing the column to be flagged; can either be a ``pandas.Series`` or ``pandas.DataFrame``. If a ``pandas.DataFrame``, a list of threshold values and columns (if checking a subset of the columns) must be provided. col (:obj:`list[str]`): column(s) in :pyattr:`data` to be flagged, by default None. Only required when the `data` is a ``pandas.DataFrame`` and a subset of the columns will be checked. Must be the same length as :py:attr:`lower` and :py:attr:`upper`. lower (:obj:`float` | `list[float]`): lower threshold (inclusive) for each element of :py:attr:`data`, if it's a ``pd.Series``, or the list of lower thresholds for each column in `col`. If the same threshold is applied to each column, then pass the single value, otherwise, it must be the same length as :py:attr:`col` and :py:attr:`upper`. upper (:obj:`float` | `list[float]`): upper threshold (inclusive) for each element of :py:attr:`data`, if it's a ``pd.Series``, or the list of upper thresholds for each column in :py:attr:`col`. If the same threshold is applied to each column, then pass the single value, otherwise, it must be the same length as :py:attr:`lower` and :py:attr:`col`. Returns: :obj:`pandas.Series` | `pandas.DataFrame`: Series or DataFrame (depending on :py:attr:`data` type) with boolean entries. """ # Prepare the inputs to be standardized for use with DataFrames if to_series := isinstance(data, pd.Series): data, col = series_to_df(data) if col is None: col = data.columns.tolist() upper, lower = convert_args_to_lists(len(col), upper, lower) if len(col) != len(lower) != len(upper): raise ValueError("The inputs to `col`, `above`, and `below` must be the same length.") # Only flag the desired columns subset = data.loc[:, col].copy() flag = ~(subset.ge(lower) & subset.le(upper)) # Return back a pd.Series if one was provided, else a pd.DataFrame return flag[col[0]] if to_series else flag
[docs] def unresponsive_flag( data: pd.DataFrame | pd.Series, threshold: int = 3, col: list[str] | None = None, ) -> pd.Series | pd.DataFrame: """Flag time stamps for which the reported data does not change for `threshold` repeated intervals. Args: data (:obj:`pandas.Series` | `pandas.DataFrame`): data frame containing the column to be flagged; can either be a `pandas.Series` or ``pandas.DataFrame``. If a ``pandas.DataFrame``, a list of threshold values and columns (if checking a subset of the columns) must be provided. col (:obj:`list[str]`): column(s) in `data` to be flagged, by default None. Only required when the `data` is a ``pandas.DataFrame`` and a subset of the columns will be checked. Must be the same length as :py:attr:`lower` and :py:attr:`upper`. threshold (:obj:`int`): number of intervals over which measurment does not change for each element of :py:attr:`data`, regardless if it's a ``pd.Series`` or ``pd.DataFrame``. Defaults to 3. Returns: :obj:`pandas.Series` | `pandas.DataFrame`: Series or DataFrame (depending on ``data`` type) with boolean entries. """ # Prepare the inputs to be standardized for use with DataFrames if to_series := isinstance(data, pd.Series): data, col = series_to_df(data) if col is None: col = data.columns.tolist() if not isinstance(threshold, int): raise TypeError("The input to `threshold` must be an integer.") # Get boolean value of the difference in successive time steps is not equal to zero, and take the # rolling sum of the boolean diff column in period lengths defined by threshold subset = data.loc[:, col].copy() flag = subset.diff(axis=0).ne(0).rolling(threshold - 1).sum() # Create boolean series that is True if rolling sum is zero flag = flag == 0 # Need to flag preceding `threshold` values as well flag = flag | np.any([flag.shift(-1 - i, axis=0) for i in range(threshold - 1)], axis=0) # Return back a pd.Series if one was provided, else a pd.DataFrame return flag[col[0]] if to_series else flag
[docs] def std_range_flag( data: pd.DataFrame | pd.Series, threshold: float | list[float] = 2.0, col: list[str] | None = None, ) -> pd.Series | pd.DataFrame: """Flag time stamps for which the measurement is outside of the threshold number of standard deviations from the mean across the data. ... note:: This method does not distinguish between asset IDs. Args: data (:obj:`pandas.Series` | `pandas.DataFrame`): data frame containing the column to be flagged; can either be a ``pandas.Series`` or ``pandas.DataFrame``. If a ``pandas.DataFrame``, a list of threshold values and columns (if checking a subset of the columns) must be provided. col (:obj:`list[str]`): column(s) in :py:attr:`data` to be flagged, by default None. Only required when the :py:attr:`data` is a `pandas.DataFrame` and a subset of the columns will be checked. Must be the same length as :py:attr:`lower` and :py:attr:`upper`. threshold (:obj:`float` | `list[float]`): multiplicative factor on the standard deviation of :py:attr:`data`, if it's a ``pd.Series``, or the list of multiplicative factors on the standard deviation for each column in :py:attr:`col`. If the same factor is applied to each column, then pass the single value, otherwise, it must be the same length as :py:attr:`col` and :py:attr:`upper`. Returns: :obj:`pandas.Series` | `pandas.DataFrame`: Series or DataFrame (depending on :py:attr:`data` type) with boolean entries. """ # Prepare the inputs to be standardized for use with DataFrames if to_series := isinstance(data, pd.Series): data, col = series_to_df(data) if col is None: col = data.columns.tolist() threshold, *_ = convert_args_to_lists(len(col), threshold) if len(col) != len(threshold): raise ValueError("The inputs to `col` and `threshold` must be the same length.") subset = data.loc[:, col].copy() data_mean = np.nanmean(subset.values, axis=0) data_std = np.nanstd(subset.values, ddof=1, axis=0) * np.array(threshold) flag = subset.le(data_mean - data_std) | subset.ge(data_mean + data_std) # Return back a pd.Series if one was provided, else a pd.DataFrame return flag[col[0]] if to_series else flag
[docs] @series_method(data_cols=["window_col", "value_col"]) def window_range_flag( window_col: str | pd.Series = None, window_start: float = -np.inf, window_end: float = np.inf, value_col: str | pd.Series = None, value_min: float = -np.inf, value_max: float = np.inf, data: pd.DataFrame = None, ) -> pd.Series: """Flag time stamps for which measurement in `window_col` are within the range: [`window_start`, `window_end`], and the measurements in `value_col` are outside of the range [`value_min`, `value_max`]. Args: data (:obj:`pandas.DataFrame`): data frame containing the columns :py:attr:`window_col` and `value_col`, by default None. window_col (:obj:`str` | `pandas.Series`): Name of the column or used to define the window range or the data as a pandas Series, by default None. window_start(:obj:`float`): minimum value for the inclusive window, by default -np.inf. window_end(:obj:`float`): maximum value for the inclusive window, by default np.inf. value_col (:obj:`str` | `pandas.Series`): Name of the column used to define the value range or the data as a pandas Series, by default None. value_max(:obj:`float`): upper threshold for the inclusive data range; default np.inf value_min(:obj:`float`): lower threshold for the inclusive data range; default -np.inf Returns: :obj:`pandas.Series`: Series with boolean entries. """ flag = window_col.between(window_start, window_end) & ~value_col.between(value_min, value_max) return flag
[docs] @series_method(data_cols=["bin_col", "value_col"]) def bin_filter( bin_col: pd.Series | str, value_col: pd.Series | str, bin_width: float, threshold: float = 2, center_type: str = "mean", bin_min: float = None, bin_max: float = None, threshold_type: str = "std", direction: str = "all", data: pd.DataFrame = None, ): """Flag time stamps for which data in `value_col` when binned by data in `bin_col` into bins of width `bin_width` are outside the `threhsold` bin. The `center_type` of each bin can be either the median or mean, and flagging can be applied directionally (i.e. above or below the center, or both) Args: bin_col(:obj:`pandas.Series` | `str`): The Series or column in :py:attr:`data` to be used for binning. value_col(:obj:`pandas.Series`): The Series or column in :py:attr:`data` to be flagged. bin_width(:obj:`float`): Width of bin in units of :py:attr:`bin_col` threshold(:obj:`float`): Outlier threshold (multiplicative factor of std of `value_col` in bin) bin_min(:obj:`float`): Minimum bin value below which flag should not be applied bin_max(:obj:`float`): Maximum bin value above which flag should not be applied threshold_type(:obj:`str`): Option to apply a 'std', 'scalar', or 'mad' (median absolute deviation) based threshold center_type(:obj:`str`): Option to use a 'mean' or 'median' center for each bin direction(:obj:`str`): Option to apply flag only to data 'above' or 'below' the mean, by default 'all' data(:obj:`pd.DataFrame`): DataFrame containing both :py:attr:`bin_col` and :py:attr:`value_col`, if data are part of the same DataFrame, by default None. Returns: :obj:`pandas.Series(bool)`: Array-like object with boolean entries. """ if center_type not in ("mean", "median"): raise ValueError("Incorrect `center_type` specified; must be one of 'mean' or 'median'.") if threshold_type not in ("std", "scalar", "mad"): raise ValueError("Incorrect `threshold_type` specified; must be one of 'std' or 'scalar'.") if direction not in ("all", "above", "below"): raise ValueError( "Incorrect `direction` specified; must be one of 'all', 'above', or 'below'." ) # Set bin min and max values if not passed to function if bin_min is None: bin_min = np.min(bin_col.values) if bin_max is None: bin_max = np.max(bin_col.values) # Define bin edges bin_edges = np.arange(bin_min, bin_max, bin_width) # Ensure the last bin edge value is bin_max bin_edges = np.unique(np.clip(np.append(bin_edges, bin_max), bin_min, bin_max)) # Bin the data and recreate the comparison data as a multi-column data frame which_bin_col = np.digitize(bin_col, bin_edges, right=True) # Create the flag values as a matrix with each column being the timestamp's binned value, # e.g., all columns values are NaN if the data point is not in that bin flag_vals = ( value_col.to_frame().set_index(pd.Series(which_bin_col, name="bin"), append=True).unstack() ) drop = [i for i, el in enumerate(flag_vals.columns.names) if el != "bin"] flag_vals.columns = flag_vals.columns.droplevel(drop).rename(None) # Create a False array as default, so flags are set to True flag_df = pd.DataFrame(np.zeros_like(flag_vals, dtype=bool), index=flag_vals.index) # Get center of binned data if center_type == "median": center = np.nanmedian(flag_vals.values, axis=0) else: center = np.nanmean(flag_vals.values, axis=0) center = pd.DataFrame( np.full(flag_vals.shape, center), index=flag_vals.index, columns=flag_vals.columns, ) # Define threshold of data flag if threshold_type == "std": deviation = np.nanstd(flag_vals.values, ddof=1, axis=0) * threshold elif threshold_type == "scalar": deviation = threshold else: # median absolute deviation (mad) deviation = np.nanmedian(np.abs(flag_vals.values - center.values), axis=0) * threshold # Perform flagging depending on specfied direction if direction in ("above", "all"): flag_df |= flag_vals > center + deviation if direction in ("below", "all"): flag_df |= flag_vals < center - deviation # Get all instances where the value is True, and reset any values outside the bin limits flag = pd.Series(np.nanmax(flag_df, axis=1), index=flag_df.index, dtype="bool") flag.loc[(bin_col <= bin_min) | (bin_col > bin_max)] = False return flag
[docs] @dataframe_method(data_cols=["data_col1", "data_col2"]) def cluster_mahalanobis_2d( data_col1: pd.Series | str, data_col2: pd.Series | str, n_clusters: int = 13, dist_thresh: float = 3.0, data: pd.DataFrame = None, ) -> pd.Series: """K-means clustering of data into `n_cluster` clusters; Mahalanobis distance evaluated for each cluster and points with distances outside of `dist_thresh` are flagged; distinguishes between asset IDs. Args: data_col1(:obj:`pandas.Series` | `str`): Series or column :py:attr:`data` corresponding to the first data column in a 2D cluster analysis data_col2(:obj:`pandas.Series` | `str`): Series or column :py:attr:`data` corresponding to the second data column in a 2D cluster analysis n_clusters(:obj:`int`):' number of clusters to use dist_thresh(:obj:`float`): maximum Mahalanobis distance within each cluster for data to be remain unflagged data(:obj:`pd.DataFrame`): DataFrame containing both :py:attr:`data_col1` and :py:attr:`data_col2`, if data are part of the same DataFrame, by default None. Returns: :obj:`pandas.Series(bool)`: Array-like object with boolean entries. """ data = data.loc[:, [data_col1, data_col2]].copy() kmeans = KMeans(n_clusters=n_clusters).fit(data) # Define empty flag of 'False' values with indices matching value_col flag = pd.Series(index=data.index, data=False) # Loop through clusters and flag data that fall outside a threshold distance from cluster center for i in range(n_clusters): # Extract data for cluster clust_sub = kmeans.labels_ == i cluster = data.loc[clust_sub] # Cluster centroid centroid = kmeans.cluster_centers_[i] # Cluster covariance and inverse covariance covmx = cluster.cov() invcovmx = sp.linalg.inv(covmx) # Compute mahalnobis distance of each point in cluster mahalanobis_dist = cluster.apply( lambda r: sp.spatial.distance.mahalanobis(r.values, centroid, invcovmx), axis=1 ) # Flag data outside the distance threshold flag_bin = mahalanobis_dist > dist_thresh # Record flags in final flag column flag.loc[flag_bin.index] = flag_bin return flag