from __future__ import annotations
import sys
import logging
import itertools
from typing import Callable, Optional, Sequence
from pathlib import Path
import yaml
import attrs
import numpy as np
import pandas as pd
from attrs import field, define
from pyproj import Transformer
from tabulate import tabulate
from IPython.display import Markdown, display
from shapely.geometry import Point
import openoa.utils.timeseries as ts
import openoa.utils.met_data_processing as met
from openoa.logging import set_log_level, setup_logging, logged_method_call
from openoa.schema.metadata import ANALYSIS_REQUIREMENTS, PlantMetaData
from openoa.utils.metadata_fetch import attach_eia_data
from openoa.utils.unit_conversion import convert_power_to_energy
setup_logging(level="WARNING")
logger = logging.getLogger(__name__)
# ****************************************
# Validators, Loading, and General methods
# ****************************************
@logged_method_call
def _analysis_filter(
error_dict: dict, metadata: PlantMetaData, analysis_types: list[str] = ["all"]
) -> dict:
"""Filters the errors found by the analysis requirements provided by the
:py:attr:`analysis_types`.
Args:
error_dict (:obj:`dict`): The dictionary of errors separated by the keys:
"missing", "dtype", and "frequency".
metadata (:obj:`PlantMetaData`): The ``PlantMetaData`` object containing the column
mappings for each data type.
analysis_types (:obj:`list[str]`, optional): The list of analysis types to
consider for validation. If "all" is contained in the list, then all errors
are returned; if ``None`` is the only entry, then no errors are returned;
otherwise, the errors for the union of the analysis requirements are returned.
Defaults to ["all"].
Returns:
dict: The missing column, bad dtype, and incorrect timestamp frequency errors
corresponding to the user's analysis types.
"""
if "all" in analysis_types:
return error_dict
if analysis_types == [None]:
return {}
if None in analysis_types:
_ = analysis_types.pop(analysis_types.index(None))
categories = ("scada", "meter", "tower", "curtail", "reanalysis", "asset")
requirements = {key: ANALYSIS_REQUIREMENTS[key] for key in analysis_types}
column_requirements = {
cat: set(
itertools.chain(*[r.get(cat, {}).get("columns", []) for r in requirements.values()])
)
for cat in categories
}
for key, value in column_requirements.items():
if key == "reanalysis":
reanalysis_keys = [k for k in error_dict["missing"] if k.startswith(key)]
_add = {}
for k in reanalysis_keys:
name = k.split("-")[1]
col_map = getattr(metadata, key)[name].col_map
_add[k] = {col_map[v] for v in value}
else:
col_map = getattr(metadata, key).col_map
column_requirements.update({key: {col_map[v] for v in value}})
column_requirements.update(_add)
# Filter the missing columns, so only analysis-specific columns are provided
error_dict["missing"] = {
key: values.intersection(error_dict["missing"].get(key, []))
for key, values in column_requirements.items()
}
# Filter the bad dtype columns, so only analysis-specific columns are provided
error_dict["dtype"] = {
key: values.intersection(error_dict["dtype"].get(key, []))
for key, values in column_requirements.items()
}
return error_dict
@logged_method_call
def _compose_error_message(
error_dict: dict, metadata: PlantMetaData, analysis_types: list[str] = ["all"]
) -> str:
"""Takes a dictionary of error messages from the ``PlantData`` validation routines,
filters out errors unrelated to the intended analysis types, and creates a
human-readable error message.
Args:
error_dict (dict): See ``PlantData._errors`` for more details.
metadata (PlantMetaData): The ``PlantMetaData`` object containing the column
mappings for each data type.
analysis_types (list[str], optional): The user-input analysis types, which are
used to filter out unrelated errors. Defaults to ["all"].
Returns:
str: The human-readable error message breakdown.
"""
if analysis_types == [None]:
return ""
if "all" not in analysis_types:
error_dict = _analysis_filter(error_dict, metadata, analysis_types)
messages = [
f"`{name}` data is missing the following columns: {cols}"
for name, cols in error_dict["missing"].items()
if len(cols) > 0
]
messages.extend(
[
f"`{name}` data columns were of the wrong type: {cols}"
for name, cols in error_dict["dtype"].items()
if len(cols) > 0
]
)
messages.extend(
[
f"`{name}` data is of the wrong frequency: {freq}"
for name, freq in error_dict["frequency"].items()
]
)
return "\n".join(messages)
@logged_method_call
def frequency_validator(
actual_freq: str | int | float | None,
desired_freq: str | set[str] | None,
exact: bool,
) -> bool:
"""Helper function to check if the actual datetime stamp frequency is valid compared
to what is required.
Args:
actual_freq(:obj:`str` | :obj:`int` | :obj:`float` | :obj:`None`): The frequency of the
timestamp, either as an offset alias or manually determined in seconds between timestamps.
desired_freq (:obj:`str` | :obj:`set[str]` | :obj:`None`): Either the single required
frequency, or a set of valid frequency options. For non-exact string comparisons,
any numeric information encoded in ``actual_freq`` is dropped before matching.
exact(:obj:`bool`): If the provided frequency codes should be exact matches (``True``),
or, if ``False``, the check should be for a combination of matches.
Returns:
(:obj:`bool`): If the actual datetime frequency is sufficient, per the match requirements.
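Example:
    An illustrative sketch of the two matching modes (the offset aliases and second
    counts shown are assumptions, not required inputs)::

        frequency_validator("10min", {"10min", "h"}, exact=True)  # True: exact alias match
        frequency_validator(600, {"h"}, exact=False)  # True: 600 s is finer than the 3,600 s maximum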
"""
if desired_freq is None:
return True
if actual_freq is None:
return False
if isinstance(desired_freq, str):
desired_freq = {desired_freq}
# If an offset alias couldn't be found, then convert the desired frequency strings to seconds,
# excluding monthly encodings ("MS"/"ME"), which have no fixed duration in seconds.
if not isinstance(actual_freq, str):
desired_freq = {ts.offset_to_seconds(el) for el in desired_freq if el not in ("MS", "ME")}
if exact:
return actual_freq in desired_freq
if isinstance(actual_freq, str):
actual_freq = "".join(filter(str.isalpha, actual_freq))
return actual_freq in desired_freq
# For non-exact matches, just check that the actual is less than the maximum allowable frequency
return actual_freq < max(desired_freq)
def convert_to_list(
value: Sequence | str | int | float | None,
manipulation: Callable | None = None,
) -> list:
"""Converts an unknown element that could be a list or single, non-sequence element
to a list of elements.
Args:
value(:obj:`Sequence` | :obj:`str` | :obj:`int` | :obj:`float` | :obj:`None`): The unknown element to be
converted to a list of element(s).
manipulation(:obj:`Callable` | :obj:`None`): A function to be applied to each individual element, by default None.
Returns:
(:obj:`list`): The new list of elements.
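Example:
    A minimal sketch of the conversion, with an optional manipulation applied to each
    element (the values shown are illustrative)::

        convert_to_list("era5")  # ["era5"]
        convert_to_list(["era5", "merra2"], str.upper)  # ["ERA5", "MERRA2"]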
"""
if isinstance(value, (str, int, float)) or value is None:
value = [value]
if manipulation is not None:
return [manipulation(el) for el in value]
return list(value)
@logged_method_call
def column_validator(df: pd.DataFrame, column_names={}) -> None | list[str]:
"""Validates that the column names exist as provided for each expected column.
Args:
df (pd.DataFrame): The DataFrame for column naming validation
column_names (dict, optional): Dictionary of column type (key) to real column
value (value) pairs. Defaults to {}.
Returns:
None | list[str]: A list of error messages that can be raised at a later step
in the validation process.
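Example:
    A minimal sketch, assuming hypothetical column names (the mapping keys and values
    are illustrative only)::

        df = pd.DataFrame(columns=["Timestamp", "Power_kW"])
        column_validator(df, column_names={"time": "Timestamp", "WTUR_W": "Power_kW", "asset_id": "Turbine"})
        # -> ["Turbine"], because the mapped asset ID column is missing from the DataFrame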
"""
try:
missing = set(column_names.values()).difference(df.columns)
except AttributeError:
# Catches 'NoneType' object has no attribute 'columns' for no data
missing = column_names.values()
if missing:
return list(missing)
return []
@logged_method_call
def dtype_converter(df: pd.DataFrame, column_types={}) -> list[str]:
"""Converts the columns provided in :py:attr:`column_types` of :py:attr:`df` to the appropriate
data type.
Args:
df (pd.DataFrame): The DataFrame for type validation/conversion
column_types (dict, optional): Dictionary of column name (key) and data type
(value) pairs. Defaults to {}.
Returns:
list[str]: List of column names that could not be converted to the specified data
type; these errors are raised at another step of the data validation.
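Example:
    A minimal sketch, assuming hypothetical column names and types (illustrative only)::

        df = pd.DataFrame({"Timestamp": ["2020-01-01 00:00"], "Power_kW": ["10.5"]})
        errors = dtype_converter(df, column_types={"Timestamp": np.datetime64, "Power_kW": float})
        # errors == []; columns that fail the conversion are returned in the list instead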
"""
errors = []
for column, new_type in column_types.items():
if new_type in (np.datetime64, pd.DatetimeIndex):
try:
df[column] = pd.DatetimeIndex(df[column])
except Exception:  # noqa: disable=E722
errors.append(column)
continue
try:
df[column] = df[column].astype(new_type)
except: # noqa: disable=E722
errors.append(column)
return errors
@logged_method_call
def load_to_pandas(data: str | Path | pd.DataFrame) -> pd.DataFrame | None:
"""Loads the input data or filepath to apandas DataFrame.
Args:
data (str | Path | pd.DataFrame): The input data.
Raises:
ValueError: Raised if an invalid data type was passed.
Returns:
pd.DataFrame | None: The passed ``None`` or the converted pandas DataFrame object.
"""
if data is None:
return data
elif isinstance(data, (str, Path)):
logger.info(f"Loading {data} to a pandas DataFrame")
return pd.read_csv(data)
elif isinstance(data, pd.DataFrame):
return data
else:
raise ValueError("Input data could not be converted to pandas")
def load_to_pandas_dict(
data: dict[str, str | Path | pd.DataFrame],
) -> dict[str, pd.DataFrame] | None:
"""Converts a dictionary of data or data locations to a dictionary of ``pd.DataFrame``s
by iterating over the dictionary and passing each value to ``load_to_pandas``.
Args:
data (dict[str, str | Path | pd.DataFrame]): The dictionary of input data or file paths.
Returns:
dict[str, pd.DataFrame] | None: The passed ``None`` or the dictionary of converted
``pd.DataFrame`` objects.
"""
if data is None:
return data
for key, val in data.items():
data[key] = load_to_pandas(val)
return data
@logged_method_call
def rename_columns(df: pd.DataFrame, col_map: dict, reverse: bool = True) -> pd.DataFrame:
"""Renames the pandas DataFrame columns using col_map. Intended to be used in
conjunction with the a data objects meta data column mapping (``reverse=True``).
Args:
df (pd.DataFrame): The DataFrame to have its columns remapped.
col_map (dict): Dictionary of existing column names and new column names.
reverse (bool, optional): True, if the new column names are the keys (using the
xxMetaData.col_map as input), or False, if the current column names are the
values (original column names). Defaults to True.
Returns:
pd.DataFrame: Input DataFrame with remapped column names.
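Example:
    A minimal sketch, assuming a hypothetical mapping of OpenOA names (keys) to original
    names (values)::

        col_map = {"time": "Timestamp", "WTUR_W": "Power_kW"}
        df = rename_columns(df, col_map, reverse=True)   # "Timestamp" -> "time", "Power_kW" -> "WTUR_W"
        df = rename_columns(df, col_map, reverse=False)  # maps the OpenOA names back to the originals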
"""
if reverse:
col_map = {v: k for k, v in col_map.items()}
return df.rename(columns=col_map)
############################
# Define the PlantData class
############################
@define(auto_attribs=True)
class PlantData:
"""Overarching data object used for storing, accessing, and acting on the primary
operational analysis data types, including: SCADA, meter, tower, status, curtailment,
asset, and reanalysis data. As of version 3.0, this class provides an automated
validation scheme through the use of `analysis_type` as well as a secondary scheme
that can be run after further manipulations are performed. Additionally, version 3.0
incorporates a metadata scheme `PlantMetaData` to map between user column naming
conventions and the internal column naming conventions for both usability and code
consistency.
Args:
metadata (`PlantMetaData`): A nested dictionary of the schema definition
for each of the data types that will be input, and some additional plant
parameters. See ``PlantMetaData``, ``SCADAMetaData``, ``MeterMetaData``,
``TowerMetaData``, ``StatusMetaData``, ``CurtailMetaData``, ``AssetMetaData``,
and/or ``ReanalysisMetaData`` for more information.
analysis_type (`list[str]`): A single analysis type, or list of analysis types, that
will be run, as configured in ``ANALYSIS_REQUIREMENTS``. See
:py:attr:`openoa.schema.metadata.ANALYSIS_REQUIREMENTS` for requirements details.
- None: Don't raise any errors for errors found in the data. This is intended
for loading in messy data, but :py:meth:`validate` should be run later
if planning on running any analyses.
- "all": This is to check that all columns specified in the metadata schema
align with the data provided, as well as data types and frequencies (where
applicable).
- "MonteCarloAEP": Checks the data components that are relevant to a Monte
Carlo AEP analysis.
- "MonteCarloAEP-temp": Checks the data components that are relevant to a
Monte Carlo AEP analysis with ambient temperature data.
- "MonteCarloAEP-wd": Checks the data components that are relevant to a
Monte Carlo AEP analysis using an additional wind direction data point.
- "MonteCarloAEP-temp-wd": Checks the data components that are relevant to a
Monte Carlo AEP analysis with ambient temperature and wind direction data.
- "TurbineLongTermGrossEnergy": Checks the data components that are relevant
to a turbine long term gross energy analysis.
- "ElectricalLosses": Checks the data components that are relevant to an
electrical losses analysis.
- "WakeLosses-scada": Checks the data components that are relevant to a
wake losses analysis that uses the SCADA-based wind speed and direction
data.
- "WakeLosses-tower": Checks the data components that are relevant to a
wake losses analysis that uses the met tower-based wind speed and
direction data.
scada (``pd.DataFrame``): Either the SCADA data that's been pre-loaded to a
pandas `DataFrame`, or a path to the location of the data to be imported.
See :py:class:`SCADAMetaData` for column data specifications.
meter (``pd.DataFrame``): Either the meter data that's been pre-loaded to a
pandas `DataFrame`, or a path to the location of the data to be imported.
See :py:class:`MeterMetaData` for column data specifications.
tower (``pd.DataFrame``): Either the met tower data that's been pre-loaded
to a pandas `DataFrame`, or a path to the location of the data to be
imported. See :py:class:`TowerMetaData` for column data specifications.
status (``pd.DataFrame``): Either the status data that's been pre-loaded to
a pandas `DataFrame`, or a path to the location of the data to be imported.
See :py:class:`StatusMetaData` for column data specifications.
curtail (``pd.DataFrame``): Either the curtailment data that's been
pre-loaded to a pandas ``DataFrame``, or a path to the location of the data to
be imported. See :py:class:`CurtailMetaData` for column data specifications.
asset (``pd.DataFrame``): Either the asset summary data that's been
pre-loaded to a pandas `DataFrame`, or a path to the location of the data to
be imported. See :py:class:`AssetMetaData` for column data specifications.
reanalysis (``dict[str, pd.DataFrame]``): Either the reanalysis data that's
been pre-loaded to a dictionary of pandas ``DataFrame`` with keys indicating
the data source, such as "era5" or "merra2", or a dictionary of paths to the
location of the data to be imported following the same key naming convention.
See :py:class:`ReanalysisMetaData` for column data specifications.
Raises:
ValueError: Raised if any analysis specific validation checks don't pass with an
error message highlighting the appropriate issues.
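Example:
    A minimal construction sketch. The import path, file names, metadata file, and
    reanalysis product keys below are assumptions for illustration only, not packaged
    data::

        from openoa import PlantData

        plant = PlantData(
            metadata="plant_meta.yml",      # dict, file path, or PlantMetaData object
            analysis_type="MonteCarloAEP",  # a single type or a list of types
            scada="scada.csv",
            meter="meter.csv",
            curtail="curtail.csv",
            asset="asset.csv",
            reanalysis={"era5": "era5.csv", "merra2": "merra2.csv"},
        )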
"""
log_level: str = field(default="WARNING", converter=set_log_level)
metadata: PlantMetaData = field(
default={},
converter=PlantMetaData.load,
on_setattr=[attrs.setters.convert, attrs.setters.validate],
repr=False,
)
analysis_type: list[str] | None = field(
default=None,
converter=convert_to_list, # noqa: F821
validator=attrs.validators.deep_iterable(
iterable_validator=attrs.validators.instance_of(list),
member_validator=attrs.validators.in_([*ANALYSIS_REQUIREMENTS] + ["all", None]),
),
on_setattr=[attrs.setters.convert, attrs.setters.validate],
)
scada: pd.DataFrame | None = field(default=None, converter=load_to_pandas) # noqa: F821
meter: pd.DataFrame | None = field(default=None, converter=load_to_pandas) # noqa: F821
tower: pd.DataFrame | None = field(default=None, converter=load_to_pandas) # noqa: F821
status: pd.DataFrame | None = field(default=None, converter=load_to_pandas) # noqa: F821
curtail: pd.DataFrame | None = field(default=None, converter=load_to_pandas) # noqa: F821
asset: pd.DataFrame | None = field(default=None, converter=load_to_pandas) # noqa: F821
reanalysis: dict[str, pd.DataFrame] | None = field(
default=None, converter=load_to_pandas_dict # noqa: F821
)
# No user initialization required for attributes defined below here
# Error catching in validation
_errors: dict[str, list[str]] = field(
default={"missing": {}, "dtype": {}, "frequency": {}, "attributes": []}, init=False
)
eia: dict = field(default={}, init=False)
asset_distance_matrix: pd.DataFrame = field(init=False)
asset_direction_matrix: pd.DataFrame = field(init=False)
def __attrs_post_init__(self):
"""Post-initialization hook."""
self._calculate_reanalysis_columns()
self._set_index_columns()
self._validate_frequency()
# Remove the non-product-specific reanalysis key if it exists
# TODO: Find where this is actually entering the missing/dtype dictionaries
[d.pop("reanalysis") for d in self._errors.values() if "reanalysis" in d]
# Check the errors against the analysis requirements
error_message = _compose_error_message(
self._errors, metadata=self.metadata, analysis_types=self.analysis_type
)
if error_message != "":
raise ValueError(error_message)
# Post-validation data manipulations
self.calculate_asset_geometries()
if self.asset is not None:
self.parse_asset_geometry()
self.calculate_asset_distance_matrix()
self.calculate_asset_direction_matrix()
if self.scada is not None:
self.calculate_turbine_energy()
# Change the column names to the IEC 61400-25 naming convention for easier use in the rest of the code base
self.update_column_names()
@scada.validator
@meter.validator
@tower.validator
@status.validator
@curtail.validator
@asset.validator
@logged_method_call
def data_validator(self, instance: attrs.Attribute, value: pd.DataFrame | None) -> None:
"""Validator function for each of the data buckets in ``PlantData`` that checks
that the appropriate columns exist for each dataframe, each column is of the
right type, and that the timestamp frequencies are appropriate for the given
``analysis_type``.
Args:
instance (:obj:`attrs.Attribute`): The ``attrs.Attribute`` details
value (:obj:`pd.DataFrame | None`): The attribute's user-provided value. A
dictionary of dataframes is expected for reanalysis data only.
"""
name = instance.name
if self.analysis_type == [None]:
logger.info(f"Skipping data validation for {name} because `analysis_type=None`.")
return
if value is None:
columns = list(getattr(self.metadata, name).col_map.values())
self._errors["missing"].update({name: columns})
self._errors["dtype"].update({name: columns})
else:
self._errors["missing"].update(self._validate_column_names(category=name))
self._errors["dtype"].update(self._validate_dtypes(category=name))
@reanalysis.validator
@logged_method_call
def reanalysis_validator(
self, instance: attrs.Attribute, value: dict[str, pd.DataFrame] | None
) -> None:
"""Validator function for the reanalysis data that checks for both matching reanalysis
product keys in the ``PlantMetaData.reanalysis`` metadata definition, and the following:
appropriate columns exist for each dataframe, each column is of the right type,
and that the timestamp frequencies are appropriate for the given
``analysis_type``.
Args:
instance (:obj:`attrs.Attribute`): The :py:attr:`attrs.Attribute` details.
value (:obj:`dict[str, pd.DataFrame]` | None): The attribute's user-provided value. A
dictionary of dataframes is expected for reanalysis data only.
"""
name = instance.name
if value is not None:
meta_products = [*self.metadata.reanalysis]
data_products = [*value]
if missing := set(data_products).difference(meta_products):
raise KeyError(
f"Reanalysis meta data definitions were not provided for the following"
f" reanalysis data products: {missing}"
)
if self.analysis_type == [None]:
logger.info(f"Skipping data validation for {name} because `analysis_type=None`.")
return
if value is None:
for product, metadata in self.metadata.reanalysis.items():
_name = f"{name}-{product}"
columns = list(metadata.col_map.values())
self._errors["missing"].update({_name: columns})
self._errors["dtype"].update({_name: columns})
else:
self._errors["missing"].update(self._validate_column_names(category=name))
self._errors["dtype"].update(self._validate_dtypes(category=name))
def __generate_text_repr(self):
"""Generates a text summary of the core internal data."""
repr = []
for attribute in self.__attrs_attrs__:
if not attribute.repr:
continue
name = attribute.name
value = self.__getattribute__(name)
if name == "analysis_type":
repr.append(f"{name}: {value}")
elif name in ("scada", "meter", "tower", "status", "curtail"):
repr.append(f"\n{name}")
repr.append("-" * len(name))
if value is None:
repr.append("no data")
else:
_repr = value.describe().T
repr.append(
tabulate(_repr, headers=_repr.columns, floatfmt=",.3f", tablefmt="grid")
)
elif name == "reanalysis":
repr.append(f"\n{name}")
repr.append("-" * len(name))
if "product" in value:
repr.append("no data")
else:
for product, df in value.items():
repr.append(f"\n{product}")
_repr = df.describe().T
repr.append(
tabulate(_repr, headers=_repr.columns, floatfmt=",.3f", tablefmt="grid")
)
elif name == "asset":
repr.append(f"\n{name}")
repr.append("-" * len(name))
if value is None:
repr.append("no data")
else:
value = value.drop(columns=["geometry"])
repr.append(
tabulate(value, headers=value.columns, floatfmt=",.3f", tablefmt="grid")
)
return "\n".join(repr)
def __generate_markdown_repr(self):
"""Generates a markdown-friendly summary of the core internal data."""
new_line = "\n"
repr = [
"PlantData",
new_line,
"**analysis_type**",
*[f"- {el}" for el in self.analysis_type],
new_line,
]
data = (
"no data" if self.asset is None else self.asset.drop(columns=["geometry"]).to_markdown()
)
repr.extend(["**asset**", new_line, data, new_line])
data = "no data" if self.scada is None else self.scada.describe().T.to_markdown()
repr.extend(["**scada**", new_line, data, new_line])
data = "no data" if self.meter is None else self.meter.describe().T.to_markdown()
repr.extend(["**meter**", new_line, data, new_line])
data = "no data" if self.tower is None else self.tower.describe().T.to_markdown()
repr.extend(["**tower**", new_line, data, new_line])
data = "no data" if self.status is None else self.status.describe().T.to_markdown()
repr.extend(["**status**", new_line, data, new_line])
data = "no data" if self.curtail is None else self.curtail.describe().T.to_markdown()
repr.extend(["**curtail**", new_line, data, new_line])
repr.extend(["**reanalysis**", new_line])
if "product" in self.reanalysis:
repr.append("no data")
for name, df in self.reanalysis.items():
data = df.describe().T.to_markdown()
repr.extend([f"**{name}**", new_line, data, new_line])
return (new_line).join(repr)
def __str__(self):
"""The string summary."""
return self.__generate_text_repr()
def markdown(self):
"""A markdown-formatted version of the ``__str__``."""
display(Markdown(self.__generate_markdown_repr()))
def __repr__(self):
"""A context-aware summary generator for printing out the objects."""
is_terminal = sys.stderr.isatty()
if is_terminal:
return self.__generate_text_repr()
else:
return repr(display(Markdown(self.__generate_markdown_repr())))
@logged_method_call
def _set_index_columns(self) -> None:
"""Sets the index value for each of the `PlantData` objects that are not `None`."""
with attrs.validators.disabled():
if self.scada is not None:
time_col = self.metadata.scada.col_map["time"]
id_col = self.metadata.scada.col_map["asset_id"]
self.scada[time_col] = pd.DatetimeIndex(self.scada[time_col])
self.scada = self.scada.set_index([time_col, id_col])
self.scada.index.names = ["time", "asset_id"]
if self.meter is not None:
time_col = self.metadata.meter.col_map["time"]
self.meter[time_col] = pd.DatetimeIndex(self.meter[time_col])
self.meter = self.meter.set_index([time_col])
self.meter.index.name = "time"
if self.status is not None:
time_col = self.metadata.status.col_map["time"]
id_col = self.metadata.status.col_map["asset_id"]
self.status[time_col] = pd.DatetimeIndex(self.status[time_col])
self.status = self.status.set_index([time_col, id_col])
self.status.index.names = ["time", "asset_id"]
if self.tower is not None:
time_col = self.metadata.tower.col_map["time"]
id_col = self.metadata.tower.col_map["asset_id"]
self.tower[time_col] = pd.DatetimeIndex(self.tower[time_col])
self.tower = self.tower.set_index([time_col, id_col])
self.tower.index.names = ["time", "asset_id"]
if self.curtail is not None:
time_col = self.metadata.curtail.col_map["time"]
self.curtail[time_col] = pd.DatetimeIndex(self.curtail[time_col])
self.curtail = self.curtail.set_index([time_col])
self.curtail.index.name = "time"
if self.asset is not None:
id_col = self.metadata.asset.col_map["asset_id"]
self.asset = self.asset.set_index([id_col])
self.asset.index.name = "asset_id"
if self.reanalysis is not None:
for name in self.reanalysis:
time_col = self.metadata.reanalysis[name].col_map["time"]
self.reanalysis[name][time_col] = pd.DatetimeIndex(
self.reanalysis[name][time_col]
)
self.reanalysis[name] = self.reanalysis[name].set_index([time_col])
self.reanalysis[name].index.name = "time"
@logged_method_call
def _unset_index_columns(self) -> None:
"""Resets the index for each of the data types. This is intended solely for the use with
the :py:meth:`validate` to ensure the validation methods are able to find the index columns
in the column space
"""
if self.scada is not None:
self.scada.reset_index(drop=False, inplace=True)
if self.meter is not None:
self.meter.reset_index(drop=False, inplace=True)
if self.status is not None:
self.status.reset_index(drop=False, inplace=True)
if self.tower is not None:
self.tower.reset_index(drop=False, inplace=True)
if self.curtail is not None:
self.curtail.reset_index(drop=False, inplace=True)
if self.asset is not None:
self.asset.reset_index(drop=False, inplace=True)
if self.reanalysis is not None:
for name in self.reanalysis:
self.reanalysis[name].reset_index(drop=False, inplace=True)
@property
def data_dict(self) -> dict[str, pd.DataFrame]:
"""Property that returns a dictionary of the data contained in the ``PlantData`` object.
Returns:
(:obj:`dict[str, pd.DataFrame]`): A mapping of the data type's name and the ``DataFrame``.
"""
values = dict(
scada=self.scada,
meter=self.meter,
tower=self.tower,
asset=self.asset,
status=self.status,
curtail=self.curtail,
reanalysis=self.reanalysis,
)
return values
@logged_method_call
def to_csv(
self,
save_path: str | Path,
with_openoa_col_names: bool = True,
metadata: str = "metadata",
scada: str = "scada",
meter: str = "meter",
tower: str = "tower",
asset: str = "asset",
status: str = "status",
curtail: str = "curtail",
reanalysis: str = "reanalysis",
) -> None:
"""Saves all of the dataframe objects to a CSV file in the provided `save_path` directory.
Args:
save_path (str | Path): The folder where all the data should be saved.
with_openoa_col_names (bool, optional): Use the PlantData column names (``True``), or
convert the column names back to the originally provided values. Defaults to True.
metadata (str, optional): File name (without extension) to be used for the metadata.
Defaults to "metadata".
scada (str, optional): File name (without extension) to be used for the SCADA data.
Defaults to "scada".
meter (str, optional): File name (without extension) to be used for the meter data.
Defaults to "meter".
tower (str, optional): File name (without extension) to be used for the tower data.
Defaults to "tower".
asset (str, optional): File name (without extension) to be used for the asset data.
Defaults to "scada".
status (str, optional): File name (without extension) to be used for the status data.
Defaults to "status".
curtail (str, optional): File name (without extension) to be used for the curtailment
data. Defaults to "curtail".
reanalysis (str, optional): Base file name (without extension) to be used for the
reanalysis data, where each dataset will use the name provided to form the following
file name: {save_path}/{reanalysis}_{name}. Defaults to "reanalysis".
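Example:
    A sketch of exporting all loaded data (the output directory name is illustrative)::

        plant.to_csv("exported_plant", with_openoa_col_names=True)
        # writes exported_plant/metadata.yml, exported_plant/scada.csv, etc.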
"""
save_path = Path(save_path).resolve()
if not save_path.exists():
save_path.mkdir()
meta = self.metadata.column_map
if not with_openoa_col_names:
self.update_column_names(to_original=True)
else:
for name, col_map in meta.items():
if name == "reanalysis":
for re_name, re_col_map in col_map.items():
re_col_map = {k: k for k in re_col_map}
re_col_map["frequency"] = self.metadata.reanalysis[re_name].frequency
meta[name][re_name] = re_col_map
continue
col_map = {k: k for k in col_map}
meta_obj = getattr(self.metadata, name)
if hasattr(meta_obj, "frequency"):
col_map["frequency"] = meta_obj.frequency
meta[name] = col_map
with open((save_path / metadata).with_suffix(".yml"), "w") as f:
yaml.safe_dump(meta, f, default_flow_style=False, sort_keys=False)
if self.scada is not None:
scada_fn = (save_path / scada).with_suffix(".csv")
self.scada.reset_index(drop=False).to_csv(scada_fn, index=False)
logger.info(f"SCADA data saved to: {scada_fn}")
if self.status is not None:
status_fn = (save_path / status).with_suffix(".csv")
self.status.reset_index(drop=False).to_csv(status_fn, index=False)
logger.info(f"Status data saved to: {status_fn}")
if self.tower is not None:
tower_fn = (save_path / tower).with_suffix(".csv")
self.tower.reset_index(drop=False).to_csv(tower_fn, index=False)
logger.info(f"Tower data saved to: {tower_fn}")
if self.meter is not None:
meter_fn = (save_path / meter).with_suffix(".csv")
self.meter.reset_index(drop=False).to_csv(meter_fn, index=False)
logger.info(f"Meter data saved to: {meter_fn}")
if self.curtail is not None:
curtail_fn = (save_path / curtail).with_suffix(".csv")
self.curtail.reset_index(drop=False).to_csv(curtail_fn, index=False)
logger.info(f"SCADA data saved to: {curtail_fn}")
if self.asset is not None:
asset_fn = (save_path / asset).with_suffix(".csv")
self.asset.reset_index(drop=False).to_csv(asset_fn, index=False)
logger.info(f"Asset data saved to: {asset_fn}")
if self.reanalysis is not None:
for name, df in self.reanalysis.items():
reanalysis_fn = (save_path / f"{reanalysis}_{name}").with_suffix(".csv")
df.reset_index(drop=False).to_csv(reanalysis_fn, index=False)
logger.info(f"{name} reanalysis data saved to: {reanalysis_fn}")
@logged_method_call
def _validate_column_names(self, category: str = "all") -> dict[str, list[str]]:
"""Validates that the column names in each of the data types matches the mapping
provided in the `metadata` object.
Args:
category (str, optional): The name of the data type to validate, or "all" to validate
all data types. Defaults to "all".
Returns:
dict[str, list[str]]: A dictionary of the data type name and a list of its missing column names.
"""
column_map = self.metadata.column_map
missing_cols = {}
for name, df in self.data_dict.items():
if category != "all" and category != name:
# Skip any irrelevant columns if not processing all data types
continue
if name == "reanalysis":
# If no reanalysis data, get the default key from ReanalysisMetaData
if df is None:
sub_name = [*column_map[name]][0]
missing_cols[f"{name}-{sub_name}"] = column_validator(
df, column_names=column_map[name][sub_name]
)
continue
for sub_name, df in df.items():
logger.info(f"Validating column names in the {sub_name} {name} data")
missing_cols[f"{name}-{sub_name}"] = column_validator(
df, column_names=column_map[name][sub_name]
)
else:
logger.info(f"Validating column names in the {name} data")
missing_cols[name] = column_validator(df, column_names=column_map[name])
return missing_cols
@logged_method_call
def _validate_dtypes(self, category: str = "all") -> dict[str, list[str]]:
"""Validates the dtype for each column for the specified `category`.
Args:
category (`str`, optional): The name of the data that should be
checked, or "all" to validate all of the data types. Defaults to "all".
Returns:
(`dict[str, list[str]]`): A dictionary of each data type and any
columns that don't match the required dtype and can't be converted to
it successfully.
"""
# Create a new mapping of the data's column names to the expected dtype
# TODO: Consider if this should be a encoded in the metadata/plantdata object elsewhere
column_name_map = self.metadata.column_map
column_dtype_map = self.metadata.dtype_map
column_map = {}
for name in column_name_map:
if name == "reanalysis":
column_map[name] = {}
for sub_name in column_name_map[name]:
column_map[name][sub_name] = dict(
zip(
column_name_map[name][sub_name].values(),
column_dtype_map[name][sub_name].values(),
)
)
else:
column_map[name] = dict(
zip(column_name_map[name].values(), column_dtype_map[name].values())
)
error_cols = {}
for name, df in self.data_dict.items():
if category != "all" and category != name:
# Skip irrelevant data types if not checking all data types
continue
if name == "reanalysis":
if df is None:
# If no reanalysis data, get the default key from ReanalysisMetaData
sub_name = [*column_map[name]][0]
error_cols[f"{name}-{sub_name}"] = dtype_converter(
df, column_types=column_map[name][sub_name]
)
continue
for sub_name, df in df.items():
logger.info(f"Validating the data types in the {sub_name} {name} data")
error_cols[f"{name}-{sub_name}"] = dtype_converter(
df, column_types=column_map[name][sub_name]
)
else:
logger.info(f"Validating the data types in the {name} data")
error_cols[name] = dtype_converter(df, column_types=column_map[name])
return error_cols
@logged_method_call
def _validate_frequency(self, category: str = "all") -> list[str]:
"""Internal method to check the actual datetime frequencies against the required
frequencies for the specified analysis types, and produces a list of data types
that do not meet the frequency criteria.
Args:
category (`str`, optional): The data type category. Defaults to "all".
Returns:
list[str]: The list of data types that don't meet the required datetime frequency.
"""
frequency_requirements = self.metadata.frequency_requirements(self.analysis_type)
# Collect all the frequencies for each of the data types
data_dict = self.data_dict
actual_frequencies = {}
for name, df in data_dict.items():
if df is None:
continue
if name in ("scada", "status", "tower"):
actual_frequencies[name] = ts.determine_frequency(df, "time")
elif name in ("meter", "curtail"):
actual_frequencies[name] = ts.determine_frequency(df)
elif name == "reanalysis":
actual_frequencies["reanalysis"] = {}
for sub_name, df in data_dict[name].items():
actual_frequencies["reanalysis"][sub_name] = ts.determine_frequency(df)
invalid_freq = {}
for name, freq in actual_frequencies.items():
if category != "all" and category != name:
# If only checking one data type, then skip all others
continue
if name == "reanalysis":
for sub_name, freq in freq.items():
logger.info(f"Validating the frequency of the {sub_name} {name} data")
is_valid = frequency_validator(freq, frequency_requirements.get(name), True)
is_valid |= frequency_validator(freq, frequency_requirements.get(name), False)
if not is_valid:
invalid_freq.update({f"{name}-{sub_name}": freq})
else:
logger.info(f"Validating the frequency of the {name} data")
is_valid = frequency_validator(freq, frequency_requirements.get(name), True)
is_valid |= frequency_validator(freq, frequency_requirements.get(name), False)
if not is_valid:
invalid_freq.update({name: freq})
return invalid_freq
@logged_method_call
def validate(self, metadata: dict | str | Path | PlantMetaData | None = None) -> None:
"""Secondary method to validate the plant data objects after loading or changing
data with option to provide an updated `metadata` object/file as well
Args:
metadata (dict | str | Path | PlantMetaData, optional): Updated metadata object,
dictionary, or file used to create the updated metadata for data validation,
which should align with the mapped column names during initialization.
Raises:
ValueError: Raised at the end if errors are caught in the validation steps.
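Example:
    A sketch of re-validating after external manipulation of the data (the metadata
    file name is an assumption)::

        plant.validate()                             # re-check against the current metadata
        plant.validate(metadata="updated_meta.yml")  # or re-check against an updated mapping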
"""
logger.info("Post-intialization data validation")
# Put the index columns back into the column space to ensure success of re-validation
self._unset_index_columns()
# Initialization will have converted the column naming convention, but an updated
# metadata should account for the renaming of the columns
if metadata is None:
self.update_column_names(to_original=True)
else:
self.metadata = metadata
# Reset the index columns to be part of the columns space so the validations still work
self._errors = {
"missing": self._validate_column_names(),
"dtype": self._validate_dtypes(),
}
self._set_index_columns()
self._errors["frequency"] = self._validate_frequency()
error_message = _compose_error_message(self._errors, self.metadata, self.analysis_type)
if error_message:
raise ValueError(error_message)
self.update_column_names()
@logged_method_call
def _calculate_reanalysis_columns(self) -> None:
"""Calculates extra variables such as wind direction from the provided
reanalysis data if they don't already exist.
"""
if self.reanalysis is None:
return
logger.info("Calculating extra variables for the reanalysis data")
reanalysis = {}
for name, df in self.reanalysis.items():
col_map = self.metadata.reanalysis[name].col_map
u = col_map["WMETR_HorWdSpdU"]
v = col_map["WMETR_HorWdSpdV"]
has_u_v = (u in df) & (v in df)
ws = col_map["WMETR_HorWdSpd"]
if ws not in df and has_u_v:
df[ws] = np.sqrt(df[u].values ** 2 + df[v].values ** 2)
wd = col_map["WMETR_HorWdDir"]
if wd not in df and has_u_v:
# .values to fix an issue where df[u] and df[v] with ANY NaN values
# would cause df[wd] to be all NaN.
df[wd] = met.compute_wind_direction(df[u], df[v]).values
dens = col_map["WMETR_AirDen"]
sp = col_map["WMETR_EnvPres"]
temp = col_map["WMETR_EnvTmp"]
has_sp_temp = (sp in df) & (temp in df)
if dens not in df and has_sp_temp:
df[dens] = met.compute_air_density(df[temp], df[sp])
reanalysis[name] = df
self.reanalysis = reanalysis
@logged_method_call
def parse_asset_geometry(
self,
reference_system: str | None = None,
utm_zone: int | None = None,
reference_longitude: float | None = None,
) -> None:
"""Calculate UTM coordinates from latitude/longitude.
The UTM system divides the Earth into 60 zones, each 6° of longitude in width. Zone 1
covers longitude 180° to 174°W; zone numbering increases eastward to zone 60, which
covers longitude 174°E to 180°. The polar regions south of 80°S and north of 84°N
are excluded.
Ref: http://geopandas.org/projections.html
Args:
reference_system (:obj:`str`, optional): Used to define the coordinate reference system
(CRS). If None is used, then the `metadata.reference_system` value will be used.
Defaults to the European Petroleum Survey Group (EPSG) code 4326 to be used with
the World Geodetic System reference system, WGS 84.
utm_zone (:obj:`int`, optional): UTM zone. If None is used, then the
`metadata.utm_zone` value will be used. Defaults to the being calculated from
:py:attr:`reference_longitude`.
reference_longitude (:obj:`float`, optional): Reference longitude for calculating the
UTM zone. If None is used, then the `metadata.reference_longitude` value will be
used. Defaults to the mean of `asset.longitude`.
Returns: None
Sets the asset "geometry" column.
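Example:
    When neither ``utm_zone`` nor a metadata value is available, the zone is derived
    from the reference longitude, e.g.::

        utm_zone = int(np.floor((180 + longitude) / 6.0)) + 1
        # longitude = -105.2 (Colorado, USA) -> int(np.floor(74.8 / 6.0)) + 1 = 13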
"""
# Check for metadata inputs
if utm_zone is None:
utm_zone = self.metadata.utm_zone
if reference_longitude is None:
reference_longitude = self.metadata.reference_longitude
if reference_system is None:
reference_system = self.metadata.reference_system
# Calculate the UTM Zone as needed
logger.info("Parsing the geometry of the asset coordinate data")
if utm_zone is None:
if reference_longitude is None:
longitude = self.asset[self.metadata.asset.longitude].mean()
utm_zone = int(np.floor((180 + longitude) / 6.0)) + 1
to_crs = f"+proj=utm +zone={utm_zone} +ellps=WGS84 +datum=WGS84 +units=m +no_defs"
transformer = Transformer.from_crs(reference_system.upper(), to_crs)
lats, lons = transformer.transform(
self.asset[self.metadata.asset.latitude].values,
self.asset[self.metadata.asset.longitude].values,
)
self.asset["geometry"] = [Point(lat, lon) for lat, lon in zip(lats, lons)]
@logged_method_call
def update_column_names(self, to_original: bool = False) -> None:
"""Renames the columns of each dataframe to the be the keys from the
`metadata.xx.col_map` that was passed during initialization.
Args:
to_original (bool, optional): An indicator to map the column names back to
the originally passed values. Defaults to False.
"""
meta = self.metadata
reverse = not to_original # flip the boolean to correctly map between the col_map entries
if to_original:
logger.info("Converting column names back to their original naming convention")
else:
logger.info("Converting column names to OpenOA conventions")
with attrs.validators.disabled():
if self.scada is not None:
self.scada = rename_columns(self.scada, meta.scada.col_map, reverse=reverse)
if self.meter is not None:
self.meter = rename_columns(self.meter, meta.meter.col_map, reverse=reverse)
if self.tower is not None:
self.tower = rename_columns(self.tower, meta.tower.col_map, reverse=reverse)
if self.status is not None:
self.status = rename_columns(self.status, meta.status.col_map, reverse=reverse)
if self.curtail is not None:
self.curtail = rename_columns(self.curtail, meta.curtail.col_map, reverse=reverse)
if self.asset is not None:
self.asset = rename_columns(self.asset, meta.asset.col_map, reverse=reverse)
if self.reanalysis is not None:
reanalysis = {}
for name, df in self.reanalysis.items():
reanalysis[name] = rename_columns(
df, meta.reanalysis[name].col_map, reverse=reverse
)
self.reanalysis = reanalysis
@logged_method_call
def calculate_turbine_energy(self) -> None:
energy_col = self.metadata.scada.WTUR_SupWh
power_col = self.metadata.scada.WTUR_W
frequency = self.metadata.scada.frequency
self.scada[energy_col] = convert_power_to_energy(self.scada[power_col], frequency)
@property
def turbine_ids(self) -> np.ndarray:
"""The 1D array of turbine IDs. This is created from the `asset` data, or unique IDs from the
SCADA data, if `asset` is undefined.
"""
if self.asset is None:
return self.scada.index.get_level_values("asset_id").unique()
return self.asset.loc[self.asset["type"] == "turbine"].index.values
@property
def n_turbines(self) -> int:
"""The number of turbines contained in the data."""
return self.turbine_ids.size
def turbine_df(self, turbine_id: str) -> pd.DataFrame:
"""Filters `scada` on a single `turbine_id` and returns the filtered data frame.
Args:
turbine_id (str): The asset_id of the turbine to retrieve its data.
Returns:
pd.DataFrame: The turbine-specific SCADA data frame.
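Example:
    A sketch of pulling a single turbine's SCADA history (the asset ID is illustrative)::

        df_t01 = plant.turbine_df("T01")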
"""
if self.scada is None:
raise AttributeError("This method can't be used unless `scada` data is provided.")
return self.scada.xs(turbine_id, level=1)
@property
def tower_ids(self) -> np.ndarray:
"""The 1D array of met tower IDs. This is created from the `asset` data, or unique IDs from the
tower data, if `asset` is undefined.
"""
if self.asset is None:
return self.tower.index.get_level_values("asset_id").unique()
return self.asset.loc[self.asset["type"] == "tower"].index.values
@property
def n_towers(self) -> int:
"""The number of met towers contained in the data."""
return self.tower_ids.size
def tower_df(self, tower_id: str) -> pd.DataFrame:
"""Filters `tower` on a single `tower_id` and returns the filtered data frame.
Args:
tower_id (str): The ID of the met tower to retrieve its data.
Returns:
pd.DataFrame: The met tower-specific data frame.
"""
if self.tower is None:
raise AttributeError("This method can't be used unless `tower` data is provided.")
return self.tower.xs(tower_id, level=1)
@property
def asset_ids(self) -> np.ndarray:
"""The ID array of turbine and met tower IDs. This is created from the `asset` data, or unique
IDs from both the SCADA data and tower data, if `asset` is undefined.
"""
if self.asset is None:
return np.concatenate([self.turbine_ids, self.tower_ids])
return self.asset.index.values
# NOTE: v2 AssetData methods
@logged_method_call
def calculate_asset_distance_matrix(self) -> pd.DataFrame:
"""Calculates the distance between all assets on the site with `np.inf` for the distance
between an asset and itself.
Returns:
pd.DataFrame: Dataframe containing distances between each pair of assets
"""
ix = self.asset.index.values
distance = (
pd.DataFrame(
[i, j, self.asset.loc[i, "geometry"].distance(self.asset.loc[j, "geometry"])]
for i, j in itertools.combinations(ix, 2)
)
.pivot(index=0, columns=1, values=2)
.rename_axis(index={0: None}, columns={1: None})
.fillna(0)
.loc[ix[:-1], ix[1:]]
)
# Insert the first column and last row because the self-self combinations are not produced in the above
distance.insert(0, ix[0], 0.0)
distance.loc[ix[-1]] = 0
# Maintain v2 compatibility of np.inf for the diagonal
distance = distance + distance.values.T - np.diag(np.diag(distance.values))
distance_array = distance.values
np.fill_diagonal(distance_array, np.inf)
distance.loc[:, :] = distance_array
self.asset_distance_matrix = distance
def turbine_distance_matrix(self, turbine_id: str = None) -> pd.DataFrame:
"""Returns the distances between all turbines in the plant with `np.inf` for the distance
between a turbine and itself.
Args:
turbine_id (str, optional): Specific turbine ID for which the distances to other turbines
are returned. If None, a matrix containing the distances between all pairs of turbines
is returned. Defaults to None.
Returns:
pd.DataFrame: Dataframe containing distances between each pair of turbines
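Example:
    A sketch of retrieving the full pairwise matrix and then the distances from a
    single turbine (the asset ID is illustrative)::

        all_distances = plant.turbine_distance_matrix()
        t01_distances = plant.turbine_distance_matrix("T01")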
"""
if self.asset_distance_matrix.size == 0:
self.calculate_asset_distance_matrix()
row_ix = self.turbine_ids if turbine_id is None else turbine_id
return self.asset_distance_matrix.loc[row_ix, self.turbine_ids]
def tower_distance_matrix(self, tower_id: str = None) -> pd.DataFrame:
"""Returns the distances between all towers in the plant with `np.inf` for the distance
between a tower and itself.
Args:
tower_id (str, optional): Specific tower ID for which the distances to other towers
are returned. If None, a matrix containing the distances between all pairs of towers
is returned. Defaults to None.
Returns:
pd.DataFrame: Dataframe containing distances between each pair of towers
"""
if self.asset_distance_matrix.size == 0:
self.calculate_asset_distance_matrix()
row_ix = self.tower_ids if tower_id is None else tower_id
return self.asset_distance_matrix.loc[row_ix, self.tower_ids]
@logged_method_call
def calculate_asset_direction_matrix(self) -> pd.DataFrame:
"""Calculates the direction between all assets on the site with `np.inf` for the direction
between an asset and itself, for all assets.
Returns:
pd.DataFrame: Dataframe containing directions between each pair of assets (defined as the direction
from the asset given by the row index to the asset given by the column index, relative to north)
"""
ix = self.asset.index.values
direction = (
pd.DataFrame(
[
i,
j,
np.degrees(
np.arctan2(
self.asset.loc[j, "geometry"].x - self.asset.loc[i, "geometry"].x,
self.asset.loc[j, "geometry"].y - self.asset.loc[i, "geometry"].y,
)
)
% 360.0,
]
for i, j in itertools.combinations(ix, 2)
)
.pivot(index=0, columns=1, values=2)
.rename_axis(index={0: None}, columns={1: None})
.fillna(0)
.loc[ix[:-1], ix[1:]]
)
# Insert the first column and last row because the self-self combinations are not produced in the above
direction.insert(0, ix[0], 0.0)
direction.loc[ix[-1]] = 0
# Maintain v2 compatibility of np.inf for the diagonal
direction = (
direction
+ np.triu((direction.values - 180.0) % 360.0, 1).T
- np.diag(np.diag(direction.values))
)
direction_array = direction.values
np.fill_diagonal(direction_array, np.inf)
direction.loc[:, :] = direction_array
self.asset_direction_matrix = direction
def turbine_direction_matrix(self, turbine_id: str = None) -> pd.DataFrame:
"""Returns the directions between all turbines in the plant with `np.inf` for the direction
between a turbine and itself.
Args:
turbine_id (str, optional): Specific turbine ID for which the directions to other turbines
are returned. If None, a matrix containing the directions between all pairs of turbines
is returned. Defaults to None.
Returns:
pd.DataFrame: Dataframe containing directions between each pair of turbines (defined as the
direction from the turbine given by the row index to the turbine given by the column
index, relative to north)
"""
if self.asset_direction_matrix.size == 0:
self.calculate_asset_direction_matrix()
row_ix = self.turbine_ids if turbine_id is None else turbine_id
return self.asset_direction_matrix.loc[row_ix, self.turbine_ids]
def tower_direction_matrix(self, tower_id: str = None) -> pd.DataFrame:
"""Returns the directions between all towers in the plant with `np.inf` for the direction
between a tower and itself.
Args:
tower_id (str, optional): Specific tower ID for which the directions to other towers
are returned. If None, a matrix containing the directions between all pairs of towers
is returned. Defaults to None.
Returns:
pd.DataFrame: Dataframe containing directions between each pair of towers (defined as the
direction from the tower given by the row index to the tower given by the column
index, relative to north)
"""
if self.asset_direction_matrix.size == 0:
self.calculate_asset_direction_matrix()
row_ix = self.tower_ids if tower_id is None else tower_id
return self.asset_direction_matrix.loc[row_ix, self.tower_ids]
def calculate_asset_geometries(self) -> None:
"""Calculates the asset distances and parses the asset geometries. This is intended for use
during initialization and for when asset data is added after initialization.
"""
if self.asset is not None:
self.parse_asset_geometry()
self.calculate_asset_distance_matrix()
self.calculate_asset_direction_matrix()
def get_freestream_turbines(
self, wd: float, freestream_method: str = "sector", sector_width: float = 90.0
):
"""
Returns a list of freestream (unwaked) turbines for a given wind direction. Freestream turbines can be
identified using different methods ("sector" or "IEC" methods). For the sector method, if there are any
turbines upstream of a turbine within a fixed wind direction sector centered on the wind direction of interest,
defined by the sector_width argument, the turbine is considered waked. The IEC method uses the freestream
definition provided in Annex A of IEC 61400-12-1 (2005).
Args:
wd (float): Wind direction to identify freestream turbines for (degrees)
freestream_method (str, optional): Method used to identify freestream turbines
("sector" or "IEC"). Defaults to "sector".
sector_width (float, optional): Width of wind direction sector centered on the wind direction of
interest used to determine whether a turbine is waked for the "sector" method (degrees). For a given
turbine, if any other upstream turbines are located within the sector, then the turbine is considered
waked. Defaults to 90 degrees.
Returns:
list: List of freestream turbine asset IDs
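Example:
    A sketch of identifying the unwaked turbines for a westerly wind using each method
    (the wind direction is illustrative)::

        free_sector = plant.get_freestream_turbines(270.0, freestream_method="sector", sector_width=90.0)
        free_iec = plant.get_freestream_turbines(270.0, freestream_method="IEC")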
"""
turbine_direction_matrix = self.turbine_direction_matrix()
if freestream_method == "sector":
# find turbines for which no other upstream turbines are within half of the sector width of the specified
# wind direction
freestream_indices = np.all(
(np.abs(met.wrap_180(wd - turbine_direction_matrix.values)) > 0.5 * sector_width)
| np.diag(np.ones(len(turbine_direction_matrix), dtype=bool)),
axis=1,
)
elif freestream_method == "IEC":
# find freestream turbines according to the definition in Annex A of IEC 61400-12-1 (2005)
turbine_distance_matrix = self.turbine_distance_matrix()
# normalize distances by rotor diameters of upstream turbines
rotor_diameters_vector = self.asset.loc[
turbine_direction_matrix.index, "rotor_diameter"
].values
rotor_diameters = np.ones((len(turbine_direction_matrix), 1)) * rotor_diameters_vector
turbine_distance_matrix /= rotor_diameters
freestream_indices = np.all(
(
(turbine_distance_matrix.values > 2)
& (
np.abs(met.wrap_180(wd - turbine_direction_matrix.values))
> 0.5
* (
1.3 * np.degrees(np.arctan(2.5 / turbine_distance_matrix.values + 0.15))
+ 10
)
)
)
| (turbine_distance_matrix.values > 20)
| (turbine_distance_matrix.values < 0),
axis=1,
)
else:
raise ValueError(
'Invalid freestream method. Currently, "sector" and "IEC" are supported.'
)
return list(self.asset.loc[self.asset["type"] == "turbine"].index[freestream_indices])
@logged_method_call
def calculate_nearest_neighbor(
self, turbine_ids: list | np.ndarray = None, tower_ids: list | np.ndarray = None
) -> None:
"""Finds nearest turbine and met tower neighbors all of the available turbines and towers
in `asset` or as defined in `turbine_ids` and `tower_ids`.
Args:
turbine_ids (list | np.ndarray, optional): A list of turbine IDs, if not using all
turbines in the data. Defaults to None.
tower_ids (list | np.ndarray, optional): A list of met tower IDs, if not using all
met towers in the data. Defaults to None.
Returns: None
Creates the "nearest_turbine_id" and "nearest_tower_id" column in `asset`.
"""
# Get the valid IDs for both the turbines and towers
ix_turb = self.turbine_ids if turbine_ids is None else np.array(turbine_ids)
ix_tower = self.tower_ids if tower_ids is None else np.array(tower_ids)
ix = np.concatenate([ix_turb, ix_tower])
distance = self.asset_distance_matrix.loc[ix, ix]
nearest_turbine = distance[ix_turb].values.argsort(axis=1)
nearest_turbine = pd.DataFrame(
distance.columns.values[nearest_turbine], index=distance.index
).loc[ix, 0]
nearest_tower = distance[ix_tower].values.argsort(axis=1)
nearest_tower = pd.DataFrame(
distance.columns.values[nearest_tower], index=distance.index
).loc[ix, 0]
self.asset.loc[ix, "nearest_turbine_id"] = nearest_turbine.values
self.asset.loc[ix, "nearest_tower_id"] = nearest_tower.values
def nearest_turbine(self, asset_id: str) -> str:
"""Finds the nearest turbine to the provided `asset_id`.
Args:
asset_id (str): A valid `asset` `asset_id`.
Returns:
str: The turbine `asset_id` closest to the provided `asset_id`.
"""
if "nearest_turbine_id" not in self.asset.columns:
self.calculate_nearest_neighbor()
return self.asset.loc[asset_id, "nearest_turbine_id"].values[0]
def nearest_tower(self, asset_id: str) -> str:
"""Finds the nearest tower to the provided `asset_id`.
Args:
asset_id (str): A valid `asset` `asset_id`.
Returns:
str: The tower `asset_id` closest to the provided `asset_id`.
"""
if "nearest_tower_id" not in self.asset.columns:
self.calculate_nearest_neighbor()
return self.asset.loc[asset_id, "nearest_tower_id"].values[0]
@classmethod
def from_entr(cls, *args, **kwargs):
try:
from entr.plantdata import from_entr
except ModuleNotFoundError:
raise NotImplementedError(
"The entr python package was not found. Please install py-entr by visiting https://github.com/entralliance/py-entr and following the instructions."
)
return from_entr(*args, **kwargs)
# **********************************************************
# Define additional class methods for custom loading methods
# **********************************************************
# Add the method for fetching and attaching the EIA plant data to the project
setattr(PlantData, "attach_eia_data", attach_eia_data)