"""This module define the basic class encapsulating a variable attributes."""
from __future__ import annotations
import abc
import array
import numpy
import logging
import copy
from collections.abc import MutableSequence
from numbers import Number
from typing import Any, Iterable, Optional, Union, TYPE_CHECKING
from cosapp.ports import units
from cosapp.ports.enum import Scope, Validity, RangeType
from cosapp.utils.distributions import Distribution
from cosapp.utils.naming import NameChecker, CommonPorts
from cosapp.utils.helpers import check_arg, is_numerical, get_typename
from cosapp.utils.json import jsonify
if TYPE_CHECKING:
    # Needed for annotations only; the runtime import is done locally in
    # `BaseVariable.__init__`.
    from cosapp.ports.port import BasePort

logger = logging.getLogger(__name__)

# (lower, upper) pair used for validity ranges and limits.
RangeValue = tuple[Any, Any]
# Accepted ways of designating elements of an array.
ArrayIndices = Optional[Union[int, Iterable[int], Iterable[Iterable[int]], slice]]
# A single type, or a tuple of types (as accepted by `isinstance`).
Types = Union[type, tuple[type, ...]]
# ---- Base class holding variable metadata ----
class BaseVariable(abc.ABC):
    """Base class for variable detail container.

    Only the metadata of the variable is stored here; its value is read
    from, and written to, the owning port (see property `value`).

    Parameters
    ----------
    - name [str]:
        Variable name.
    - port [BasePort]:
        Port to which variable belongs.
    - value [Any]:
        Variable value.
    - unit [str, optional]:
        Variable unit; default empty string (i.e. dimensionless)
    - dtype [type or iterable of type, optional]:
        Variable type; default None (i.e. type of initial value).
    - desc [str, optional]:
        Variable description; default to ''.
    - scope [Scope]: {PRIVATE, PROTECTED, PUBLIC},
        Variable visibility; defaults to PRIVATE.
    """

    # The value cannot be integrated in this object, otherwise the execution
    # time of the value getter is multiplied by 200.
    # Another advantage is that the metadata can be changed/shared without
    # providing a way to change the value without passing through the port
    # (which would be bad for the clean/dirty logic).
    # Disadvantage: the logic storing the details and the value is a bit
    # messy in BasePort.add_variable.

    __slots__ = (
        "__weakref__",
        "_name",
        "_desc",
        "_unit",
        "_port",
        "_scope",
        "_dtype",
    )

    # Name validator shared by all variables; common port names are excluded.
    __name_check = NameChecker(excluded=CommonPorts.names())

    def __init__(
        self,
        name: str,
        port: BasePort,
        value: Any,
        unit: str = "",
        dtype: Optional[Types] = None,
        desc: str = "",
        scope: Scope = Scope.PRIVATE,
    ):
        self._name = self.name_check(name)

        # Runtime import kept local to this method (the module-level import
        # is guarded by TYPE_CHECKING) - presumably to avoid a circular import.
        from cosapp.ports.port import BasePort

        check_arg(port, 'port', BasePort)
        self._port = port
        check_arg(unit, 'unit', str)

        if dtype is not None and not isinstance(dtype, type):
            # `dtype` must then be an iterable containing only types
            failed = False
            try:
                for dtype_ in dtype:
                    failed |= not isinstance(dtype_, type)
            except TypeError:
                failed = True
            if failed:
                raise TypeError(f"Type must be defined by one or several types; got {dtype}.")

        check_arg(desc, 'desc', str)
        check_arg(scope, 'scope', Scope)

        if unit and not is_numerical(value):
            unit = ""
            logger.warning(
                f"A physical unit is defined for non-numerical variable {name!r}; it will be ignored."
            )
        elif not units.is_valid_units(unit) and unit:
            raise units.UnitError(f"Unknown unit {unit}.")

        # Only the resolved type is retained; the value is stored by the port.
        value, dtype = self._process_value(value, dtype)

        self._unit = unit
        self._desc = desc
        self._scope = scope
        self._dtype: Optional[Types] = dtype

    @abc.abstractmethod
    def copy(self, port: BasePort, name: Optional[str] = None) -> BaseVariable:
        """Return a copy of this variable attached to `port`, optionally renamed."""
        pass

    @abc.abstractmethod
    def _repr_markdown_(self) -> str:
        """Returns the representation of this variable in Markdown format.

        Returns
        -------
        str
            Markdown formatted representation
        """
        pass

    def _process_value(self, value, dtype):
        """Resolve the type of the variable and check `value` against it.

        Parameters
        ----------
        value: Any
            Candidate value
        dtype: type, tuple of types or None
            Type requested by the user; inferred from `value` if None

        Returns
        -------
        tuple[Any, Types or None]
            The value (converted into a numpy array if it is a mutable
            sequence or an `array.array` and no type was requested),
            and the resolved type.

        Raises
        ------
        TypeError
            If `dtype` is specified and `value` is not an instance of it.
        """
        if dtype is None:
            if value is None:
                dtype = None  # can't figure out type if both value and dtype are None
            else:
                dtype = type(value)
                if is_numerical(value):
                    # Force generic number type only if user has not specified any type
                    if issubclass(dtype, Number):
                        dtype = (Number, numpy.ndarray)
                    elif isinstance(value, (MutableSequence, array.ArrayType)):
                        # We have a collection => transform Mutable to ndarray
                        dtype = (MutableSequence, array.ArrayType, numpy.ndarray)
                        value = numpy.asarray(value)
        elif value is not None:
            # Test value has the right type
            if not isinstance(value, dtype):
                typename = get_typename(dtype)
                raise TypeError(
                    "Cannot set {} of type {} with a {}.".format(
                        self.full_name, typename, type(value).__qualname__,
                    )
                )
        return value, dtype

    @classmethod
    def name_check(cls, name: str):
        """Check if the variable name is valid."""
        return cls.__name_check(name)

    @property
    def port(self) -> BasePort:
        """BasePort: Port to which variable belongs"""
        return self._port

    @property
    def full_name(self) -> str:
        """str: Full variable name, including port contextual name"""
        return f"{self._port.contextual_name}.{self.name}"

    @property
    def name(self) -> str:
        """str: Variable name"""
        return self._name

    @property
    def value(self) -> Any:
        """Any: Variable value"""
        return getattr(self._port, self._name)

    @value.setter
    def value(self, value) -> None:
        setattr(self._port, self._name, value)

    @property
    def unit(self) -> str:
        """str: Variable unit; empty string means dimensionless"""
        return self._unit

    @property
    def dtype(self) -> Optional[Types]:
        """Type[Any] or tuple of Type[Any] or None: Type of the variable; default None (i.e. type of default value is set)"""
        return self._dtype

    @property
    def description(self) -> str:
        """str: Variable description"""
        return self._desc

    @property
    def scope(self) -> Scope:
        """Scope: Scope of variable visibility"""
        return self._scope

    def __str__(self) -> str:
        return self.name

    def __repr__(self) -> str:
        return self._repr_markdown_()

    def __json__(self) -> dict[str, Any]:
        """Creates a JSONable dictionary representation of the object.

        Returns
        -------
        dict[str, Any]
            The dictionary
        """
        return jsonify(self.to_dict())

    def filter_value(self, value: Any) -> Any:
        """Convert `value` into a numpy array if the variable is array-like.

        Any other variable type leaves `value` untouched.
        """
        if self.dtype == (
            MutableSequence,
            array.ArrayType,
            numpy.ndarray,
        ):
            value = numpy.asarray(value)
        return value

    def _to_raw_dict(self) -> dict[str, Any]:
        """Convert this variable into a dictionary.

        Empty attributes are mapped to `None`.

        Returns
        -------
        dict
            The dictionary representing this variable.
        """
        dtype = self._dtype
        return {
            "value": self.value,
            "unit": self.unit or None,
            # `str(None)` is the truthy string "None", so an unset type
            # must be tested explicitly to be reported as missing.
            "dtype": None if dtype is None else str(dtype),
            "desc": self.description or None,
        }

    def to_dict(self) -> dict:
        """Convert this variable into a dictionary.

        Returns
        -------
        dict[str, Any]:
            Dictionary representing variable. Attributes with `None` value are filtered out.
        """
        data = self._to_raw_dict()
        # The value is always reported, even if it is None
        return {
            key: val
            for key, val in data.items()
            if key == "value" or val is not None
        }
# ---- Concrete variable with validity range, limits and distribution ----
class Variable(BaseVariable):
    """Variable detail container.

    The `valid_range` defines the range of value for which a model is known to behave correctly.
    The `limits` are at least as large as the validity range.

    Parameters
    ----------
    - name [str]:
        Variable name.
    - port [BasePort]:
        Port to which variable belongs.
    - value [Any]:
        Variable value.
    - unit [str, optional]:
        Variable unit; default empty string (i.e. dimensionless)
    - dtype [type or iterable of type, optional]:
        Variable type; default None (i.e. type of initial value).
    - desc [str, optional]:
        Variable description; default to ''.
    - valid_range [tuple[Any, Any] or tuple[tuple], optional]:
        Validity range of the variable; default None (i.e. all values are valid).
        tuple[Any, Any] in case of scalar value, tuple of tuples in case of vector value.
    - invalid_comment [str, optional]:
        Comment to show in case the value is not valid; default ''
    - limits [tuple[Any, Any] or tuple[tuple], optional]:
        Limits over which the use of the model is wrong; default valid_range.
        tuple[Any, Any] in case of scalar value, tuple of tuples in case of vector value.
    - out_of_limits_comment [str, optional]:
        Comment to show in case the value is out of limits; default ''
    - distribution [Distribution, optional]:
        Variable random distribution; default None (no distribution)
    - scope [Scope]: {PRIVATE, PROTECTED, PUBLIC},
        Variable visibility; defaults to PRIVATE.
    """

    __slots__ = (
        "_distribution",
        "_limits",
        "_out_of_limits_comment",
        "_valid_range",
        "_invalid_comment",
    )

    def __init__(
        self,
        name: str,
        port: BasePort,
        value: Any,
        unit: str = "",
        dtype: Optional[Types] = None,
        valid_range: Optional[RangeValue] = None,
        invalid_comment: str = "",
        limits: Optional[RangeValue] = None,
        out_of_limits_comment: str = "",
        desc: str = "",
        distribution: Optional[Distribution] = None,
        scope: Scope = Scope.PRIVATE,
    ):
        super().__init__(name, port, value, unit, dtype, desc, scope)

        # The base constructor discards the processed value; process it again
        # to get the value used for range checks.
        value, dtype = self._process_value(value, dtype)

        # Check validation ranges are compatible and meaningful for this type of data
        limits, valid_range = self._check_range(limits, valid_range, value)

        if valid_range is None and len(invalid_comment) > 0:
            logger.warning(
                f"Invalid comment specified for variable {name!r} without validity range."
            )
        if limits is None and len(out_of_limits_comment) > 0:
            logger.warning(
                f"Out-of-limits comment specified for variable {name!r} without limits."
            )

        # Slots are first initialized, then assigned through their setters
        # in order to benefit from type checks.
        self._valid_range = valid_range
        self._invalid_comment = ""
        self.invalid_comment = invalid_comment
        self._limits = limits
        self._out_of_limits_comment = ""
        self.out_of_limits_comment = out_of_limits_comment
        self._distribution: Optional[Distribution] = None
        self.distribution = distribution

    @staticmethod
    def _get_limits_from_type(variable: Any) -> Optional[RangeValue]:
        """Get default limits for a variable depending of its type.

        Parameters
        ----------
        variable: Any
            Variable of interest

        Returns
        -------
        tuple[float, float] or None
            Default (lower, upper) limits; None for non-numerical variables
        """
        if is_numerical(variable):
            return -numpy.inf, numpy.inf
        return None

    @staticmethod
    def _scalar_bounds(bounds: Any) -> tuple[Any, Any]:
        """Return `bounds` as a (lower, upper) pair of scalars.

        (None, None) is returned if `bounds` is None or is not a pair
        of numbers (e.g. a tuple of (lower, upper) pairs).
        """
        if bounds is None:
            return None, None
        try:
            lower, upper = bounds
        except (TypeError, ValueError):
            return None, None
        if all(b is None or isinstance(b, Number) for b in (lower, upper)):
            return lower, upper
        return None, None

    @staticmethod
    def _stringify_inf(bound: Any) -> Any:
        """Replace infinite values by their string representation.

        Nested (lower, upper) pairs are converted into lists.
        Raises TypeError (from `numpy.isinf`) for unsupported bounds.
        """
        if isinstance(bound, (tuple, list)):
            return [Variable._stringify_inf(b) for b in bound]
        return str(bound) if numpy.isinf(bound) else bound

    def check_range_type(self, value: Iterable) -> RangeType:
        """Get type of valid_range of limits of variable.

        This function checks if `value` is a size 2 tuple of scalar
        or a tuple of (lower, upper) tuples.

        Parameters
        ----------
        value: Any
            value need to be checked

        Returns
        -------
        int (from enum RangeType)
            Type of `value`

        Raises
        ------
        TypeError
            If `value` is a number, or a collection of scalars whose size is not 2.
        ValueError
            If `value` mixes scalars and tuples, or contains unsupported items.
        """
        if isinstance(value, Number):
            raise TypeError(
                "Validity or limit range must be a tuple with format comparable to value"
            )
        if value is None:
            return RangeType.NONE

        tuple_check = True  # remains True if no bound is a scalar
        value_check = True  # remains True if no bound is a tuple/list
        for bound in value:
            if isinstance(bound, (tuple, list)):
                value_check = False
            elif isinstance(bound, Number) or bound is None:
                tuple_check = False
            else:
                value_check = False
                tuple_check = False

        if value_check and not tuple_check:
            if len(value) != 2:
                raise TypeError(
                    "Valid range or limits must be a size 2 tuple with type comparable to value"
                )
            return RangeType.VALUE
        elif tuple_check and not value_check:
            return RangeType.TUPLE
        else:
            raise ValueError(
                f"Mixed values in valid_range {value} of {self.full_name!r}."
                " Valid object can contain only numerical values or only tuples"
            )

    def _check_range(self,
        limits: Optional[RangeValue],
        valid_range: Optional[RangeValue],
        value: Any,
    ) -> tuple[Optional[RangeValue], Optional[RangeValue]]:
        """Correct coherence of limits and validation range depending on value type.

        Parameters
        ----------
        limits: tuple[Any, Any] or tuple[tuple] or None
            (lower, upper) limits
        valid_range: tuple[Any, Any] tuple[tuple] or None
            (lower, upper) validation range
        value: Any
            Variable value; ranges are discarded if it is not numerical

        Returns
        -------
        tuple[tuple[Any, Any] or tuple[tuple] or None, tuple[Any, Any] or tuple[tuple] or None]
            tuple of corrected (limits, validation range)

        Raises
        ------
        ValueError
            If `limits` and `valid_range` have different formats.
        """
        default = Variable._get_limits_from_type(value)
        range_type = self.check_range_type(valid_range)
        limits_type = self.check_range_type(limits)

        if default is not None:

            def get_bounds(lower, upper) -> tuple[float, float]:
                """Fill missing bounds with defaults, and sort them."""
                if lower is None:
                    lower = default[0]
                if upper is None:
                    upper = default[1]
                return (lower, upper) if lower < upper else (upper, lower)

            def raise_inconsistency() -> None:
                name = self.full_name
                raise ValueError(
                    f"valid_range {valid_range} and limits {limits} of variable {name!r} have different formats"
                )

            if limits is None:
                limits = (None, None)
            if valid_range is None:
                valid_range = limits

            if range_type == RangeType.VALUE:
                min_range, max_range = valid_range = get_bounds(*valid_range)
                if limits_type == RangeType.VALUE:
                    # Limits are extended, if necessary, to contain the validity range
                    min_limit, max_limit = get_bounds(*limits)
                    limits = (numpy.minimum(min_range, min_limit), numpy.maximum(max_range, max_limit))
                elif limits_type == RangeType.NONE:
                    limits = tuple(default)
                else:
                    raise_inconsistency()

            elif range_type == RangeType.TUPLE:
                valid_range = tuple(get_bounds(*pair) for pair in valid_range)
                if limits_type == RangeType.TUPLE:
                    limits = tuple(get_bounds(*pair) for pair in limits)
                elif limits_type == RangeType.NONE:
                    # NOTE: limits are here a single (lower, upper) pair
                    limits = tuple(default)
                else:
                    raise_inconsistency()

            elif range_type == RangeType.NONE:
                # No validity range: it is identified with the limits
                if limits_type == RangeType.VALUE:
                    limits = valid_range = get_bounds(*limits)
                elif limits_type == RangeType.TUPLE:
                    limits = valid_range = tuple(get_bounds(*pair) for pair in limits)
                elif limits_type == RangeType.NONE:
                    limits = valid_range = tuple(default)
        else:
            # Ranges are meaningless for non-numerical values
            limits = valid_range = None

        return limits, valid_range

    def _repr_markdown_(self) -> str:
        """Returns the representation of this variable in Markdown format.

        Bounds are displayed only if they are given as a (lower, upper) pair
        of scalars; tuples of pairs are not displayed.

        Returns
        -------
        str
            Markdown formatted representation
        """
        msg = {"name": f"**{self.name}**", "unit": f" {self.unit}" if self.unit else ""}
        value = self.value
        try:
            msg["value"] = f"{value:.5g}"
        except Exception:
            # Best effort: values not supporting float formatting are displayed as is
            msg["value"] = value

        min_valid, max_valid = self._scalar_bounds(self.valid_range)
        min_limit, max_limit = self._scalar_bounds(self.limits)

        left_bracket = "⦗"
        left_arrow = "⟝"
        right_arrow = "⟞"
        right_bracket = "⦘"
        lock_icon = "🔒"

        if min_limit is None or numpy.all(numpy.isinf(min_limit)):
            msg["min_limit"] = ""
        else:
            msg["min_limit"] = f" {left_bracket} {min_limit:.5g} {left_arrow} "

        if max_limit is None or numpy.all(numpy.isinf(max_limit)):
            msg["max_limit"] = ""
        else:
            msg["max_limit"] = f" {right_arrow} {max_limit:.5g} {right_bracket} "

        # Validity bounds are omitted when identical to limits
        if min_valid is None or min_limit == min_valid or numpy.all(numpy.isinf(min_valid)):
            msg["min_valid"] = ""
        else:
            msg["min_valid"] = f"{min_valid:.5g} {left_arrow} "

        if max_valid is None or max_limit == max_valid or numpy.all(numpy.isinf(max_valid)):
            msg["max_valid"] = ""
        else:
            msg["max_valid"] = f" {right_arrow} {max_valid:.5g}"

        if self.description:
            msg["description"] = f" | {self.description}"
        else:
            msg["description"] = " | "

        scope_format = {
            Scope.PRIVATE: f" {lock_icon*2} ",
            Scope.PROTECTED: f" {lock_icon} ",
            Scope.PUBLIC: "",
        }
        msg["scope"] = scope_format[self.scope]

        if len(msg["min_limit"] + msg["min_valid"]) == len(msg["max_limit"] + msg["max_valid"]) == 0:
            msg["range"] = ""
            msg["separator"] = ""
        else:
            msg["range"] = " value "
            msg["separator"] = "; "

        return (
            "{name}{scope}: {value!s}{unit}"
            "{separator}{min_limit}{min_valid}{range}{max_valid}{max_limit}"
            "{description}".format(**msg)
        )

    def __json__(self) -> dict[str, Any]:
        """Creates a JSONable dictionary representation of the object.

        Returns
        -------
        dict[str, Any]
            The dictionary
        """
        data = super().__json__()
        data.update({
            "valid_range": self.valid_range,
            "invalid_comment": self.invalid_comment,
            "limits": self.limits,
            "out_of_limits_comment": self.out_of_limits_comment,
            "distribution": self.distribution.__json__() if self.distribution else None,
        })
        return jsonify(data)

    @property
    def valid_range(self) -> Optional[RangeValue]:
        """tuple[Any, Any] or None: validity range of the variable and optional comment if unvalid"""
        return self._valid_range

    @valid_range.setter
    def valid_range(self, new_range: Optional[RangeValue]):
        # Format checks and consistency with current limits are
        # entirely handled by `_check_range`.
        limits, valid_range = self._check_range(self.limits, new_range, self.value)
        self._valid_range = valid_range
        self._limits = limits

    @property
    def invalid_comment(self) -> str:
        """str: Comment explaining the reasons of the validity range"""
        return self._invalid_comment

    @invalid_comment.setter
    def invalid_comment(self, new_comment: str):
        if not isinstance(new_comment, str):
            raise TypeError(f"invalid_comment must be a string; got {new_comment!s}")
        self._invalid_comment = new_comment

    @property
    def limits(self) -> Optional[RangeValue]:
        """tuple[Any, Any] or None: Variable limits and optional comment if unvalid"""
        return self._limits

    @limits.setter
    def limits(self, new_limits: Optional[RangeValue]):
        limits_type = self.check_range_type(new_limits)
        value = self.value
        default = self._get_limits_from_type(value)

        if default is None:
            # Ranges are meaningless for non-numerical values
            self._limits = self._valid_range = None
            return

        if limits_type == RangeType.NONE:
            # Setting limits to None keeps current limits and validity range
            return

        def fill(pair) -> tuple:
            """Replace missing bounds by default ones."""
            lower, upper = pair
            return (
                default[0] if lower is None else lower,
                default[1] if upper is None else upper,
            )

        current_range = self._valid_range
        current_type = self.check_range_type(current_range)

        if limits_type == RangeType.VALUE:
            limits = fill(new_limits)
            if current_type == RangeType.VALUE:
                # Clip current validity range to new limits
                valid_range = (
                    max(limits[0], current_range[0]),
                    min(limits[1], current_range[1]),
                )
            else:
                # None: validity range will be identified with limits;
                # tuple of pairs: rejected by `_check_range` (format mismatch)
                valid_range = current_range
        else:
            limits = tuple(fill(pair) for pair in new_limits)
            if current_type == RangeType.TUPLE:
                # Clip each validity range to the limits of same index
                valid_range = tuple(
                    (max(limits[index][0], lower), min(limits[index][1], upper))
                    for index, (lower, upper) in enumerate(current_range)
                )
            elif current_range is None or numpy.all(numpy.isinf(current_range)):
                # Unbounded validity range: it will be identified with limits
                valid_range = None
            else:
                # Bounded scalar range: rejected by `_check_range` (format mismatch)
                valid_range = current_range

        limits, valid_range = self._check_range(limits, valid_range, value)
        self._valid_range = valid_range
        self._limits = limits

    @property
    def out_of_limits_comment(self) -> str:
        """str: Comment explaining the reasons of the limits"""
        return self._out_of_limits_comment

    @out_of_limits_comment.setter
    def out_of_limits_comment(self, new_comment: str):
        if not isinstance(new_comment, str):
            raise TypeError(f"out_of_limits_comment must be a string; got {new_comment!s}")
        self._out_of_limits_comment = new_comment

    @property
    def distribution(self) -> Optional[Distribution]:
        """Optional[Distribution]: Random distribution of the variable."""
        return self._distribution

    @distribution.setter
    def distribution(self, new_distribution: Optional[Distribution]):
        ok = new_distribution is None or isinstance(new_distribution, Distribution)
        if not ok:
            typename = type(new_distribution).__qualname__
            raise TypeError(
                f"Random distribution should be of type 'Distribution'; got {typename}."
            )
        self._distribution = new_distribution

    def is_valid(self) -> Validity:
        """Get the variable value validity.

        The status is WARNING if the value is out of the validity range,
        and ERROR if it is out of limits. Values which are neither numbers
        nor numpy arrays are always valid.

        Returns
        -------
        Validity
            Variable value validity

        Raises
        ------
        ValueError
            If the value is a scalar, and the validity range a tuple of pairs.
        """
        status = Validity.OK
        value = self.value

        if not isinstance(value, (Number, numpy.ndarray)):
            return status

        valid_range = self.valid_range
        if valid_range is None:
            return status

        limits = self.limits
        range_type = self.check_range_type(valid_range)

        if isinstance(value, numpy.ndarray):

            def out_of_bounds(bounds) -> bool:
                """Check if any element of the value is out of `bounds`."""
                kind = self.check_range_type(bounds)
                if kind == RangeType.VALUE:
                    # Same bounds for all elements
                    lower, upper = bounds
                    return bool(numpy.any(value > upper) or numpy.any(value < lower))
                if kind == RangeType.TUPLE:
                    # One (lower, upper) pair per element
                    return any(
                        not numpy.all((lower <= v) & (v <= upper))
                        for v, (lower, upper) in zip(value, bounds)
                    )
                return False

            if out_of_bounds(valid_range):
                status = Validity.WARNING
            # Limits may be a single pair even if the validity range is a tuple of pairs
            if limits is not None and out_of_bounds(limits):
                status = Validity.ERROR

        elif range_type == RangeType.VALUE:
            min_range, max_range = valid_range
            if not min_range <= value <= max_range:
                status = Validity.WARNING
            if limits is not None:
                min_limit, max_limit = limits
                if not min_limit <= value <= max_limit:
                    status = Validity.ERROR

        elif range_type == RangeType.TUPLE:
            varname = f"{self._port.contextual_name}.{self.name}"
            raise ValueError(
                f"valid_range {self.valid_range} or limits {self.limits} of variable {varname!r}"
                f" are incompatible with its value {value}")

        return status

    def copy(self, port: BasePort, name: Optional[str] = None) -> Variable:
        """Return a copy of this variable attached to `port`.

        Parameters
        ----------
        port: BasePort
            Port owning the new variable
        name: str, optional
            Name of the new variable; default None (i.e. same name)
        """
        return Variable(
            name or self.name,
            value = copy.copy(self.value),
            port = port,
            unit = self._unit,
            dtype = copy.copy(self._dtype),
            valid_range = copy.deepcopy(self._valid_range),
            invalid_comment = self._invalid_comment,
            limits = copy.deepcopy(self._limits),
            out_of_limits_comment = self._out_of_limits_comment,
            desc = self._desc,
            distribution = copy.deepcopy(self._distribution),
            scope = self._scope,
        )

    def _to_raw_dict(self) -> dict:
        """Convert this variable into a dictionary.

        Infinite bounds are converted into strings; unbounded scalar
        ranges are mapped to None.

        Returns
        -------
        dict
            The dictionary representing this variable.
        """
        data = super()._to_raw_dict()
        data.update({
            "invalid_comment": self.invalid_comment or None,
            "out_of_limits_comment": self.out_of_limits_comment or None,
            "distribution": self.distribution if self.distribution else None,
        })
        for key in ("valid_range", "limits"):
            try:
                bounds = [self._stringify_inf(bound) for bound in getattr(self, key)]
            except TypeError:
                # Undefined range, or bounds which cannot be tested by `numpy.isinf`
                bounds = None
            else:
                if bounds == ["-inf", "inf"]:
                    bounds = None
            data[key] = bounds
        return data