"""
Module handling the execution of code provided as string by the user.
This code is inspired from the OpenMDAO module openmdao.components.exec_comp.
"""
from __future__ import annotations
import re
import numpy
import ast
from enum import Enum
from numbers import Number
from typing import (
Union, Any, Dict, Tuple, FrozenSet,
Iterable, Optional,
TYPE_CHECKING,
)
from cosapp.ports.port import BasePort
if TYPE_CHECKING:
from cosapp.systems import System
[docs]
class AstVisitor(ast.NodeTransformer):
"""Visitor of AST python of the string expression to evaluate.
Convert initial string expression with 'object.attributes' patterns to
'object_attributes' relatives allowing to handle them as unique variables
without exploring the whole object tree during value update.
Parameters
----------
- context : cosapp.systems.System
CoSApp System to which variables in expression can be retrieved.
"""
def __init__(self, context: System):
"""Initialization parameters:
----------
- context : cosapp.systems.System
CoSApp System to which variables in expression can be retrieved.
"""
self._context = context
self._vars = set()
self._expr_vars = {}
[docs]
def map_attributes(self, attr: str, key: str = None) -> None:
"""Generic method to reach attribute getter mapping."""
key = attr if key is None else key
self._expr_vars[key] = (self._context, attr)
self._vars.add(attr)
if attr in self._context.name2variable:
var_ref = self._context.name2variable[attr]
if isinstance(var_ref.mapping, BasePort):
self._expr_vars[key] = (var_ref.mapping, var_ref.key)
[docs]
def visit_Attribute(self, node: ast.Attribute) -> Union[ast.Attribute, ast.Name]:
"""Visit a `Attribute` and return a concatenate ast.Name if possible."""
# Determine the longest object.attributes path
attr_str = ast.unparse(node)
func_attr = ''
max_iter = len(attr_str.split("."))
i = 0
while i < max_iter and not hasattr(self._context, attr_str):
split_attr = attr_str.rsplit('.', maxsplit=1)
attr_str = split_attr[0]
func_attr = ".".join(split_attr[1:])
i += 1
# Collect variables from expression
if attr_str and i != max_iter:
key = attr_str.replace(".", "_")
self.map_attributes(attr_str, key)
# Return custom node
if func_attr:
ast_func = ast.parse(f"{key}.{func_attr}").body[0].value
return ast_func
else:
return ast.Name(id=key, ctx=ast.Load())
return self.generic_visit(node)
[docs]
def visit_Name(self, node: ast.Name) -> ast.Name:
"""Visit a `Name` and return a concatenate ast.Name if possible."""
if hasattr(self._context, node.id):
self.map_attributes(node.id)
return node
[docs]
class EvalString:
"""Create a executable statement using an expression string.
The following functions are available for use in expression:
========================= ====================================
Function Description
========================= ====================================
abs(x) Absolute value of x
acos(x) Inverse cosine of x
acosh(x) Inverse hyperbolic cosine of x
arange(start, stop, step) Array creation
arccos(x) Inverse cosine of x
arccosh(x) Inverse hyperbolic cosine of x
arcsin(x) Inverse sine of x
arcsinh(x) Inverse hyperbolic sine of x
arctan(x) Inverse tangent of x
asin(x) Inverse sine of x
asinh(x) Inverse hyperbolic sine of x
atan(x) Inverse tangent of x
cos(x) Cosine of x
cosh(x) Hyperbolic cosine of x
cross(x, y) Cross product of arrays x and y
dot(x, y) Dot-product of x and y
e Euler's number
erf(x) Error function
erfc(x) Complementary error function
exp(x) Exponential function
expm1(x) exp(x) - 1
factorial(x) Factorial of all numbers in x
fmax(x, y) Element-wise maximum of x and y
fmin(x, y) Element-wise minimum of x and y
inner(x, y) Inner product of arrays x and y
isinf(x) Element-wise detection of numpy.inf
isnan(x) Element-wise detection of numpy.nan
kron(x, y) Kronecker product of arrays x and y
linspace(x, y, N) Numpy linear spaced array creation
log(x) Natural logarithm of x
log10(x) Base-10 logarithm of x
log1p(x) log(1+x)
matmul(x, y) Matrix multiplication of x and y
maximum(x, y) Element-wise maximum of x and y
minimum(x, y) Element-wise minimum of x and y
ones(N) Create an array of ones
outer(x, y) Outer product of x and y
pi Pi
power(x, y) Element-wise x**y
prod(x) The product of all elements in x
sin(x) Sine of x
sinh(x) Hyperbolic sine of x
sum(x) The sum of all elements in x
round(x) Round all elements in x
tan(x) Tangent of x
tanh(x) Hyperbolic tangent of x
tensordot(x, y) Tensor dot product of x and y
zeros(N) Create an array of zeros
========================= ====================================
Full list is returned by `EvalString.available_symbols()`
Parameters
----------
expression : str
Interpretable Python statement. In addition to standard Python operators,
a subset of numpy and scipy functions is supported.
context : cosapp.core.module.Module or cosapp.core.numerics.basics.Residue
System or Residue defining the local context in which the statement will be executed
Notes
-----
The context is used to include the `System` ports, children, inwards and outwards.
If the context is a `Residue`, the reference value from the residue is added as
`residue_reference` variable in the execution context.
"""
# this dict will act as the global scope when we eval our expressions
__globals = {} # type: Dict[str, Any]
[docs]
@classmethod
def available_symbols(cls) -> Dict[str, Any]:
"""
List of available symbols (constants and functions) in current execution context.
Returns
-------
Dict[str, Any]
Mapping of available symbols by their name.
"""
mapping = cls.__globals
if len(mapping) > 0:
return mapping
def add_symbols(
module: object,
names: Optional[Iterable[str]]=None,
) -> None:
"""
Map attribute names from the given module into the global dict.
Parameters
----------
mod : object
Module to check.
names : iter of str, optional
If supplied, only map attrs that match the given names
"""
# nonlocal mapping
if names is None:
names = dir(module)
for name in names:
if isinstance(name, tuple):
name, alias = name
else:
alias = name
if not name.startswith("_"):
mapping[alias] = mapping[name] = getattr(module, name)
add_symbols(numpy,
names=[
# Numpy types
"int8",
"int16",
"int32",
"int64",
"uint8",
"uint16",
"uint32",
"uint64",
"float32",
"float64",
"complex64",
"complex128",
# Array creation
"array",
"asarray",
"arange",
"concatenate",
"ones",
"zeros",
"full",
"full_like",
"linspace",
# Constants
"e",
"pi",
"inf",
"isinf",
"isnan", # Logic
"log",
"log10",
"log1p",
"power",
# Math operations
"abs",
"sqrt",
"cbrt",
"exp",
"expm1",
"fmax",
"fmin",
"maximum",
"minimum",
"round",
"sum",
# Reductions
"prod",
"tensordot",
# Linear algebra
"matmul",
"cross",
"outer",
"inner",
"kron",
"dot",
# Trigo
"sin",
"cos",
"tan",
("arcsin", "asin"),
("arccos", "acos"),
("arctan", "atan"),
("arctan2", "atan2"),
"degrees",
"radians",
# Hyperbolic trigo
"sinh",
"cosh",
"tanh",
("arcsinh", "asinh"),
("arccosh", "acosh"),
("arctanh", "atanh"),
],
)
add_symbols(numpy.linalg, names=["norm"])
# if scipy is available, add few specials functions
try:
import scipy.special
except ImportError:
pass
else:
add_symbols(scipy.special,
names=["factorial", "erf", "erfc"]
)
return mapping
def __init__(self, expression: Any, context: System) -> None:
"""Class constructor.
Compiles an expression, and checks that it is evaluable within a given context.
"""
from cosapp.systems import System
if not isinstance(context, System):
cname = type(context).__name__
raise TypeError(
f"Object of type {cname!r} is not a valid context to evaluate expression '{expression}'."
)
self.__context = context
self.__str = self.string(expression) # type: str
if len(self.__str) == 0:
raise ValueError("Can't evaluate empty expressions")
# Visit expression from its AST
ast_from_str = ast.parse(self.__str).body[0]
if not isinstance(ast_from_str, (ast.Expression, ast.Expr)):
raise SyntaxError(f"Expression {self.__str} must be in a correct format.")
ast_visitor = AstVisitor(context)
ast_visited = ast.fix_missing_locations(ast_visitor.visit(ast_from_str.value))
code = compile(ast.Expression(ast_visited), "<string>", "eval") # type: CodeType
# Look for the requested variables
global_dict = self.available_symbols()
self.__locals = {}
# Modified variable names after passing in the ast visitor
self.__expr_vars = ast_visitor._expr_vars
self.__const_vars = frozenset({key.replace(".", "_"): value for key, value in context.properties.items()})
# Original variables names from the expression
self.__all_vars = frozenset(ast_visitor._vars)
self.__unconst_vars = frozenset(set(self.__all_vars) - set(context.properties))
if isinstance(expression, Enum):
etype = type(expression)
global_dict = global_dict.copy()
global_dict[etype.__name__] = etype
self.__constant = False
if set(self.__expr_vars).issubset(self.__const_vars):
self.__constant = True
else:
required = set(self.__expr_vars)
self.__constant = required and required.issubset(self.__const_vars)
if self.__constant:
value = eval(code, global_dict, self.locals)
# simply return constant value
eval_impl = lambda: value
else:
# By specifying global and local contexts, we limit the user scope.
eval_impl = lambda: eval(code, self.globals, self.locals)
self.__eval = eval_impl # type: Callable[[], Any]
[docs]
@staticmethod
def string(expression: Any) -> str:
"""Converts an expression into a suitably formatted string.
Notes
-----
Necessary step for numpy arrays (and possibly other types), whose plain string
conversion is not readily evaluable (hence the use of 'repr' instead of 'str').
Examples
--------
>>> str(numpy.array([0.1, 0.2]))
[0.1 0.2]
>>> repr(numpy.array([0.1, 0.2]))
array([0.1, 0.2])
"""
if isinstance(expression, str):
# Include a substitution pass to make sure that no spaces are left
# between objects and attributes, that is "foo.bar" instead of "foo . bar"
return re.sub("(?![0-9]) *\. *(?![0-9])", ".", expression.strip())
elif isinstance(expression, (EvalString, Enum)):
return str(expression)
else:
return repr(expression)
def __str__(self) -> str:
return self.__str
def __repr__(self) -> str:
return repr(self.__str)
def __contains__(self, pattern: str):
return (pattern in self.__str)
@property
def locals(self) -> Dict[str, Any]:
"""Dict[str, Any]: Context attributes required to evaluate the string expression."""
# Read attribute values from context or from variable reference mapping of the context
for key, (port, name) in self.__expr_vars.items():
self.__locals[key] = getattr(port, name)
return self.__locals
@property
def globals(self) -> Dict[str, Any]:
"""Dict[str, Any]: Global functions and variables required to evaluate the string expression."""
return EvalString.__globals
@property
def eval_context(self) -> System:
"""cosapp.systems.System: Context of string expression evaluation."""
return self.__context
@property
def constant(self) -> bool:
"""bool: `True` if evaluated expression is constant,
that is independent of its context; `False` otherwise.
"""
return self.__constant
[docs]
def eval(self) -> Any:
"""Evaluate the expression in the system context.
Returns
-------
Any
The result of the expression evaluation.
"""
return self.__eval()
@property
def variables(self) -> FrozenSet[str]:
"""FrozenSet[str]: Variables without system constant properties required for the evaluation of the expression."""
return self.__unconst_vars
@property
def all_variables(self) -> FrozenSet[str]:
"""FrozenSet[str]: All variables required for the evaluation of the expression."""
return self.__all_vars
@property
def constants(self) -> FrozenSet[str]:
"""FrozenSet[str]: System constant properties required for the evaluation of the expression."""
return self.__all_vars - self.__unconst_vars
[docs]
class AssignString:
"""Create an executable assignment of the kind 'lhs = rhs' from two evaluable expressions lhs and rhs.
"""
def __init__(self, lhs: str, rhs: Any, context: System) -> None:
lhs = EvalString(lhs, context)
if lhs.constant:
raise ValueError(
f"The left-hand side of an assignment expression cannot be constant ({lhs!r})")
# At this point, lhs is a valid expression within given context
self.__sides = None
self.__raw_sides = [str(lhs), str(rhs)] # raw sides lhs and rhs, without reformatting
value = lhs.eval()
if isinstance(value, numpy.ndarray):
self.__shape = value.shape
self.__dtype = value.dtype
else:
self.__shape = None
self.__dtype = type(value)
self.__context = context
self.__lhs_vars = lhs.variables
self.__rhs_vars = frozenset()
self.__locals = lhs.locals.copy()
self.__locals.update({"rhs_value": value, context.name: context})
assignment = f"{context.name}.{lhs!s} = rhs_value"
self.__code = compile(assignment, "<string>", "single") # assignment bytecode
self.rhs = rhs
@property
def eval_context(self) -> System:
"""cosapp.systems.System: Evaluation context of the assignment."""
return self.__context
@property
def lhs(self) -> str:
"""str: Left-hand side of the assignment."""
return self.__raw_sides[0]
@property
def lhs_variables(self) -> FrozenSet[str]:
"""FrozenSet[str]: set of variable names in left-hand side."""
return self.__lhs_vars
@property
def rhs_variables(self) -> FrozenSet[str]:
"""FrozenSet[str]: set of variable names in right-hand side."""
return self.__rhs_vars
@property
def contextual_lhs(self) -> str:
"""str: Contextual name of assignment left-hand side."""
return f"{self.__context.name}.{self.lhs}"
@property
def rhs(self) -> str:
"""str: Right-hand side of the assignment."""
return self.__raw_sides[1]
@rhs.setter
def rhs(self, rhs: Any) -> None:
context = self.eval_context
lhs = self.lhs
erhs = EvalString(rhs, context)
raw_rhs = str(erhs)
value = erhs.eval()
if erhs.constant:
rhs = EvalString.string(value)
else:
rhs = raw_rhs
sides = EvalString(f"({lhs}, {rhs})", context)
if self.__shape:
if value is None:
value = sides.eval()[1]
if isinstance(value, Number):
sides = EvalString(f"({lhs}, full_like({lhs}, {rhs}))", context)
else: # tentatively copy rhs into a numpy array
self.__check_size(value)
sides = EvalString(f"({lhs}, array({rhs}, dtype={self.__dtype}))", context)
self.__sides = sides
self.__constant = erhs.constant
self.__rhs_vars = erhs.variables
self.__raw_sides[1] = raw_rhs
@property
def constant(self) -> bool:
"""bool: `True` if assignment right-hand side is constant, `False` otherwise."""
return self.__constant
[docs]
def exec(self, context=None) -> Tuple[Any, bool]:
"""
Evaluates rhs, and executes assignment lhs <- rhs.
Returns
-------
Tuple[Any, bool]
(rhs, changed), where 'changed' is True if the value of rhs has changed, False otherwise.
"""
sides = self.__sides.eval() # updates context at the same time
changed = not numpy.array_equal(sides[0], sides[1])
if context is None:
context = self.__locals
context['rhs_value'] = sides[1]
exec(self.__code, dict(), context)
return sides[1], changed
@property
def shape(self) -> Union[Tuple[int, int], None]:
"""Union[Tuple[int, int], None]: shape of assigned object (lhs) if it is an array, else None."""
return self.__shape
[docs]
def variables(self) -> FrozenSet[str]:
"""Extracts all variables required for the assignment
Returns
-------
FrozenSet[str]:
Variable names as a set of strings
"""
return self.__lhs_vars.union(self.__rhs_vars)
def __str__(self) -> str:
return " = ".join(self.__raw_sides)
def __repr__(self) -> str:
cls_name = self.__class__.__qualname__
return "{}({!r}, {!r}, {})".format(cls_name, *self.__raw_sides, self.__context.name)
def __check_size(self, array) -> None:
"""Checks if `array` is shape-compatible with lhs."""
shape = self.__shape
if numpy.shape(array) != shape:
raise ValueError(f"Cannot assign {array} to array of shape {shape}")