Source code for cosapp.core.eval_str

"""
Module handling the execution of code provided as string by the user.

This code is inspired by the OpenMDAO module openmdao.components.exec_comp.
"""
from __future__ import annotations
import re
import numpy
import ast
from enum import Enum
from numbers import Number
from typing import (
    Union, Any, Dict, Tuple, FrozenSet,
    Iterable, Optional,
    TYPE_CHECKING,
)
from cosapp.ports.port import BasePort
if TYPE_CHECKING:
    from cosapp.systems import System


class AstVisitor(ast.NodeTransformer):
    """Visitor of the Python AST of a string expression to evaluate.

    Rewrites 'object.attribute' patterns found in the expression into
    'object_attribute' names, so that each of them can be handled as a single
    local variable, without exploring the whole object tree at each value
    update.

    Parameters
    ----------
    context : cosapp.systems.System
        CoSApp System in which the variables of the expression are looked up.
    """

    def __init__(self, context: "System"):
        """Initialize the visitor.

        Parameters
        ----------
        context : cosapp.systems.System
            CoSApp System in which the variables of the expression are looked up.
        """
        self._context = context
        # Names of the context attributes found in the visited expression
        self._vars = set()
        # Maps each local name used in the rewritten expression to the
        # (owner, attribute name) pair from which its value is read
        self._expr_vars = {}

    def map_attributes(self, attr: str, key: Optional[str] = None) -> None:
        """Register context attribute `attr` under local name `key`.

        Parameters
        ----------
        attr : str
            Name of the attribute, as known by the context.
        key : str, optional
            Local name used in the rewritten expression; defaults to `attr`.
        """
        if key is None:
            key = attr
        context = self._context
        self._expr_vars[key] = (context, attr)
        self._vars.add(attr)

        if attr in context.name2variable:
            var_ref = context.name2variable[attr]
            # When the variable reference is held by a port, read the value
            # from the port itself rather than through the context
            if isinstance(var_ref.mapping, BasePort):
                self._expr_vars[key] = (var_ref.mapping, var_ref.key)

    @staticmethod
    def _is_dotted_name(node: ast.AST) -> bool:
        """Return `True` if `node` is a plain chain 'name.attr1.attr2...'."""
        while isinstance(node, ast.Attribute):
            node = node.value
        return isinstance(node, ast.Name)

    def visit_Attribute(self, node: ast.Attribute) -> Union[ast.Attribute, ast.Name]:
        """Visit an `Attribute` node, and collapse the longest leading path
        known by the context into a single `ast.Name`.

        For instance, if the context has an attribute 'a.b', expression
        'a.b.c.d' is rewritten as 'a_b.c.d', and 'a.b' is registered as a
        variable of the expression.

        Returns
        -------
        ast.Name or ast.Attribute
            The rewritten node, or the generically visited node if no leading
            path is known by the context.
        """
        # Chains based on a call, a subscript, etc. (e.g. 'x.conj().real')
        # cannot be split on dots: let the inner nodes be visited individually.
        if not self._is_dotted_name(node):
            return self.generic_visit(node)

        context = self._context
        parts = ast.unparse(node).split(".")

        # Determine the longest object.attributes path known by the context
        for n_kept in range(len(parts), 0, -1):
            attr_str = ".".join(parts[:n_kept])
            if not hasattr(context, attr_str):
                continue
            # Collect variable from expression
            key = attr_str.replace(".", "_")
            self.map_attributes(attr_str, key)
            # Everything stripped from the path must be kept in the new node,
            # not just the attribute removed last
            func_attr = ".".join(parts[n_kept:])
            if func_attr:
                return ast.parse(f"{key}.{func_attr}").body[0].value
            return ast.Name(id=key, ctx=ast.Load())

        return self.generic_visit(node)

    def visit_Name(self, node: ast.Name) -> ast.Name:
        """Visit a `Name` node, and register it as a variable of the
        expression if it is an attribute of the context.

        The node itself is returned unchanged.
        """
        if hasattr(self._context, node.id):
            self.map_attributes(node.id)
        return node
class EvalString:
    """Create an executable statement using an expression string.

    The following functions are available for use in expression:

    ========================= ====================================
    Function                  Description
    ========================= ====================================
    abs(x)                    Absolute value of x
    acos(x)                   Inverse cosine of x
    acosh(x)                  Inverse hyperbolic cosine of x
    arange(start, stop, step) Array creation
    arccos(x)                 Inverse cosine of x
    arccosh(x)                Inverse hyperbolic cosine of x
    arcsin(x)                 Inverse sine of x
    arcsinh(x)                Inverse hyperbolic sine of x
    arctan(x)                 Inverse tangent of x
    asin(x)                   Inverse sine of x
    asinh(x)                  Inverse hyperbolic sine of x
    atan(x)                   Inverse tangent of x
    cos(x)                    Cosine of x
    cosh(x)                   Hyperbolic cosine of x
    cross(x, y)               Cross product of arrays x and y
    dot(x, y)                 Dot-product of x and y
    e                         Euler's number
    erf(x)                    Error function
    erfc(x)                   Complementary error function
    exp(x)                    Exponential function
    expm1(x)                  exp(x) - 1
    factorial(x)              Factorial of all numbers in x
    fmax(x, y)                Element-wise maximum of x and y
    fmin(x, y)                Element-wise minimum of x and y
    inner(x, y)               Inner product of arrays x and y
    isinf(x)                  Element-wise detection of numpy.inf
    isnan(x)                  Element-wise detection of numpy.nan
    kron(x, y)                Kronecker product of arrays x and y
    linspace(x, y, N)         Numpy linear spaced array creation
    log(x)                    Natural logarithm of x
    log10(x)                  Base-10 logarithm of x
    log1p(x)                  log(1+x)
    matmul(x, y)              Matrix multiplication of x and y
    maximum(x, y)             Element-wise maximum of x and y
    minimum(x, y)             Element-wise minimum of x and y
    ones(N)                   Create an array of ones
    outer(x, y)               Outer product of x and y
    pi                        Pi
    power(x, y)               Element-wise x**y
    prod(x)                   The product of all elements in x
    sin(x)                    Sine of x
    sinh(x)                   Hyperbolic sine of x
    sum(x)                    The sum of all elements in x
    round(x)                  Round all elements in x
    tan(x)                    Tangent of x
    tanh(x)                   Hyperbolic tangent of x
    tensordot(x, y)           Tensor dot product of x and y
    zeros(N)                  Create an array of zeros
    ========================= ====================================

    Full list is returned by `EvalString.available_symbols()`

    Parameters
    ----------
    expression : Any
        Interpretable Python statement, or object converted into one by
        `EvalString.string`. In addition to standard Python operators,
        a subset of numpy and scipy functions is supported.
    context : cosapp.systems.System
        System defining the local context in which the statement will be
        executed.

    Raises
    ------
    TypeError
        If `context` is not a `System`.
    ValueError
        If the expression is empty.
    SyntaxError
        If the expression is not a Python expression.

    Notes
    -----
    The context is used to resolve the names found in the expression: any name
    or dotted path which is an attribute of the context is read from it at
    evaluation time.
    """

    # this dict will act as the global scope when we eval our expressions
    __globals = {}  # type: Dict[str, Any]

    @classmethod
    def available_symbols(cls) -> Dict[str, Any]:
        """
        List of available symbols (constants and functions) in current execution context.

        The mapping is built on first call, and shared by all subsequent calls.

        Returns
        -------
        Dict[str, Any]
            Mapping of available symbols by their name.
        """
        mapping = cls.__globals
        if mapping:
            return mapping

        def add_symbols(
            module: object,
            names: Optional[Iterable[str]] = None,
        ) -> None:
            """
            Map attribute names from the given module into the global dict.

            Parameters
            ----------
            module : object
                Module to check.
            names : iter of str, optional
                If supplied, only map attrs that match the given names.
                An item may also be a tuple (name, alias), in which case the
                attribute is mapped under both its name and its alias.
            """
            if names is None:
                names = dir(module)
            for name in names:
                if isinstance(name, tuple):
                    name, alias = name
                else:
                    alias = name
                if not name.startswith("_"):
                    mapping[alias] = mapping[name] = getattr(module, name)

        add_symbols(
            numpy,
            names=[
                # Numpy types
                "int8", "int16", "int32", "int64",
                "uint8", "uint16", "uint32", "uint64",
                "float32", "float64",
                "complex64", "complex128",
                # Array creation
                "array", "asarray", "arange", "concatenate",
                "ones", "zeros", "full", "full_like", "linspace",
                # Constants
                "e", "pi", "inf",
                # Logic
                "isinf", "isnan",
                # Math operations
                "log", "log10", "log1p", "power",
                "abs", "sqrt", "cbrt", "exp", "expm1",
                "fmax", "fmin", "maximum", "minimum", "round",
                # Reductions
                "sum", "prod",
                # Linear algebra
                "tensordot", "matmul", "cross", "outer", "inner", "kron", "dot",
                # Trigo
                "sin", "cos", "tan",
                ("arcsin", "asin"), ("arccos", "acos"),
                ("arctan", "atan"), ("arctan2", "atan2"),
                "degrees", "radians",
                # Hyperbolic trigo
                "sinh", "cosh", "tanh",
                ("arcsinh", "asinh"), ("arccosh", "acosh"), ("arctanh", "atanh"),
            ],
        )
        add_symbols(numpy.linalg, names=["norm"])

        # if scipy is available, add few specials functions
        try:
            import scipy.special
        except ImportError:
            pass
        else:
            add_symbols(scipy.special, names=["factorial", "erf", "erfc"])

        return mapping

    def __init__(self, expression: Any, context: "System") -> None:
        """Class constructor.

        Compiles an expression, and checks that it is evaluable within a given context.
        """
        from cosapp.systems import System

        if not isinstance(context, System):
            cname = type(context).__name__
            raise TypeError(
                f"Object of type {cname!r} is not a valid context to evaluate expression '{expression}'."
            )

        self.__context = context
        self.__str = self.string(expression)  # type: str

        if len(self.__str) == 0:
            raise ValueError("Can't evaluate empty expressions")

        # Visit expression from its AST
        ast_from_str = ast.parse(self.__str).body[0]
        if not isinstance(ast_from_str, (ast.Expression, ast.Expr)):
            raise SyntaxError(f"Expression {self.__str} must be in a correct format.")

        ast_visitor = AstVisitor(context)
        ast_visited = ast.fix_missing_locations(ast_visitor.visit(ast_from_str.value))
        code = compile(ast.Expression(ast_visited), "<string>", "eval")  # type: CodeType

        # Look for the requested variables
        global_dict = self.available_symbols()
        self.__locals = {}

        # Modified variable names after passing in the ast visitor
        self.__expr_vars = ast_visitor._expr_vars
        self.__const_vars = frozenset(
            key.replace(".", "_") for key in context.properties
        )
        # Original variables names from the expression
        self.__all_vars = frozenset(ast_visitor._vars)
        self.__unconst_vars = self.__all_vars.difference(context.properties)

        if isinstance(expression, Enum):
            # Make the enum type known by its name, without altering
            # the symbol dictionary shared at class level
            etype = type(expression)
            global_dict = global_dict.copy()
            global_dict[etype.__name__] = etype

        # The expression is constant if it involves no variable,
        # or constant properties of the context only
        self.__constant = set(self.__expr_vars).issubset(self.__const_vars)

        if self.__constant:
            value = eval(code, global_dict, self.locals)
            # simply return constant value
            eval_impl = lambda: value
        else:
            # By specifying global and local contexts, we limit the user scope.
            eval_impl = lambda: eval(code, self.globals, self.locals)

        self.__eval = eval_impl  # type: Callable[[], Any]

    @staticmethod
    def string(expression: Any) -> str:
        """Converts an expression into a suitably formatted string.

        Notes
        -----
        Necessary step for numpy arrays (and possibly other types), whose plain
        string conversion is not readily evaluable (hence the use of 'repr'
        instead of 'str').

        Examples
        --------
        >>> str(numpy.array([0.1, 0.2]))
        [0.1 0.2]
        >>> repr(numpy.array([0.1, 0.2]))
        array([0.1, 0.2])
        """
        if isinstance(expression, str):
            # Include a substitution pass to make sure that no spaces are left
            # between objects and attributes, that is "foo.bar" instead of "foo . bar"
            # NOTE(review): the leading `(?![0-9])` is a lookahead, hence always
            # satisfied before a space or a dot; a lookbehind may have been
            # intended - confirm before changing the pattern.
            return re.sub(r"(?![0-9]) *\. *(?![0-9])", ".", expression.strip())
        elif isinstance(expression, (EvalString, Enum)):
            return str(expression)
        else:
            return repr(expression)

    def __str__(self) -> str:
        return self.__str

    def __repr__(self) -> str:
        return repr(self.__str)

    def __contains__(self, pattern: str):
        return (pattern in self.__str)

    @property
    def locals(self) -> Dict[str, Any]:
        """Dict[str, Any]: Context attributes required to evaluate the string expression."""
        # Read attribute values from context or from variable reference mapping of the context
        for key, (port, name) in self.__expr_vars.items():
            self.__locals[key] = getattr(port, name)
        return self.__locals

    @property
    def globals(self) -> Dict[str, Any]:
        """Dict[str, Any]: Global functions and variables required to evaluate the string expression."""
        return EvalString.__globals

    @property
    def eval_context(self) -> "System":
        """cosapp.systems.System: Context of string expression evaluation."""
        return self.__context

    @property
    def constant(self) -> bool:
        """bool: `True` if evaluated expression is constant, that is independent
        of its context; `False` otherwise.
        """
        return self.__constant

    def eval(self) -> Any:
        """Evaluate the expression in the system context.

        Returns
        -------
        Any
            The result of the expression evaluation.
        """
        return self.__eval()

    @property
    def variables(self) -> FrozenSet[str]:
        """FrozenSet[str]: Variables without system constant properties required for the evaluation of the expression."""
        return self.__unconst_vars

    @property
    def all_variables(self) -> FrozenSet[str]:
        """FrozenSet[str]: All variables required for the evaluation of the expression."""
        return self.__all_vars

    @property
    def constants(self) -> FrozenSet[str]:
        """FrozenSet[str]: System constant properties required for the evaluation of the expression."""
        return self.__all_vars - self.__unconst_vars

    def __eq__(self, other: "EvalString") -> bool:
        """Two expressions are equal if they share the same context and the same string."""
        try:
            return self.__context is other.__context and self.__str == other.__str
        except AttributeError:
            # `other` is not an `EvalString`
            return False
class AssignString:
    """Executable assignment 'lhs = rhs', built from two evaluable
    expressions `lhs` and `rhs` sharing the same context.
    """

    def __init__(self, lhs: str, rhs: Any, context: "System") -> None:
        lhs = EvalString(lhs, context)
        if lhs.constant:
            raise ValueError(
                f"The left-hand side of an assignment expression cannot be constant ({lhs!r})"
            )

        # From here on, lhs is known to be a valid expression within context
        self.__sides = None
        # Raw sides lhs and rhs, kept without reformatting
        self.__raw_sides = [str(lhs), str(rhs)]

        # Shape and type of the assigned object are deduced from its current value
        value = lhs.eval()
        is_array = isinstance(value, numpy.ndarray)
        self.__shape = value.shape if is_array else None
        self.__dtype = value.dtype if is_array else type(value)

        self.__context = context
        self.__lhs_vars = lhs.variables
        self.__rhs_vars = frozenset()

        # Local scope in which the assignment bytecode is executed
        scope = lhs.locals.copy()
        scope["rhs_value"] = value
        scope[context.name] = context
        self.__locals = scope

        assignment = f"{context.name}.{lhs!s} = rhs_value"
        self.__code = compile(assignment, "<string>", "single")

        # Goes through the property setter, which builds the evaluable sides
        self.rhs = rhs

    @property
    def eval_context(self) -> "System":
        """cosapp.systems.System: Evaluation context of the assignment."""
        return self.__context

    @property
    def lhs(self) -> str:
        """str: Left-hand side of the assignment."""
        return self.__raw_sides[0]

    @property
    def lhs_variables(self) -> FrozenSet[str]:
        """FrozenSet[str]: set of variable names in left-hand side."""
        return self.__lhs_vars

    @property
    def rhs_variables(self) -> FrozenSet[str]:
        """FrozenSet[str]: set of variable names in right-hand side."""
        return self.__rhs_vars

    @property
    def contextual_lhs(self) -> str:
        """str: Contextual name of assignment left-hand side."""
        return f"{self.__context.name}.{self.lhs}"

    @property
    def rhs(self) -> str:
        """str: Right-hand side of the assignment."""
        return self.__raw_sides[1]

    @rhs.setter
    def rhs(self, rhs: Any) -> None:
        context = self.eval_context
        lhs = self.lhs
        erhs = EvalString(rhs, context)
        raw_rhs = str(erhs)
        value = erhs.eval()

        # A constant right-hand side is replaced by the string of its value
        rhs = EvalString.string(value) if erhs.constant else raw_rhs
        sides = EvalString(f"({lhs}, {rhs})", context)

        if self.__shape:
            # lhs is an array: rhs is either broadcast, or converted into an array
            if value is None:
                value = sides.eval()[1]
            if isinstance(value, Number):
                expression = f"({lhs}, full_like({lhs}, {rhs}))"
            else:
                # tentatively copy rhs into a numpy array
                self.__check_size(value)
                expression = f"({lhs}, array({rhs}, dtype={self.__dtype}))"
            sides = EvalString(expression, context)

        self.__sides = sides
        self.__constant = erhs.constant
        self.__rhs_vars = erhs.variables
        self.__raw_sides[1] = raw_rhs

    @property
    def constant(self) -> bool:
        """bool: `True` if assignment right-hand side is constant, `False` otherwise."""
        return self.__constant

    def exec(self, context=None) -> Tuple[Any, bool]:
        """
        Evaluates rhs, and executes assignment lhs <- rhs.

        Parameters
        ----------
        context : dict, optional
            Local scope in which the assignment is executed;
            by default, the scope stored at construction is used.

        Returns
        -------
        Tuple[Any, bool]
            (rhs, changed), where 'changed' is True if the value of rhs has changed, False otherwise.
        """
        # Evaluating both sides updates the context at the same time
        old_value, new_value = self.__sides.eval()[:2]
        changed = not numpy.array_equal(old_value, new_value)
        scope = self.__locals if context is None else context
        scope['rhs_value'] = new_value
        exec(self.__code, {}, scope)
        return new_value, changed

    @property
    def shape(self) -> Union[Tuple[int, int], None]:
        """Union[Tuple[int, int], None]: shape of assigned object (lhs) if it is an array, else None."""
        return self.__shape

    def variables(self) -> FrozenSet[str]:
        """Extracts all variables required for the assignment

        Returns
        -------
        FrozenSet[str]:
            Variable names as a set of strings
        """
        return self.__lhs_vars.union(self.__rhs_vars)

    def __str__(self) -> str:
        return " = ".join(self.__raw_sides)

    def __repr__(self) -> str:
        lhs, rhs = self.__raw_sides
        return "{}({!r}, {!r}, {})".format(
            self.__class__.__qualname__, lhs, rhs, self.__context.name
        )

    def __check_size(self, array) -> None:
        """Checks if `array` is shape-compatible with lhs."""
        shape = self.__shape
        if numpy.shape(array) == shape:
            return
        raise ValueError(f"Cannot assign {array} to array of shape {shape}")