Source code for cosapp.drivers.optimizer

"""
`Driver`s for `System` optimization calculation.
"""
import numpy
import scipy.optimize
import warnings
import copy
from numbers import Number
from typing import (
    Union, Iterable, Callable, Optional,
    Any, Sequence, Dict, List, Set, Tuple,
)
from collections.abc import Collection

from cosapp.core.eval_str import EvalString
from cosapp.core.numerics.basics import MathematicalProblem, SolverResults
from cosapp.core.numerics.boundary import Unknown
from cosapp.drivers.abstractsolver import AbstractSolver
from cosapp.drivers.optionaldriver import OptionalDriver
from cosapp.drivers.utils import UnknownAnalyzer, ConstraintParser
from cosapp.recorders.recorder import BaseRecorder
from cosapp.utils.options_dictionary import OptionsDictionary
from cosapp.utils.helpers import check_arg

import logging
logger = logging.getLogger(__name__)


# TODO
# [ ] Pull unknowns from Systems
[docs]class Optimizer(AbstractSolver): """Driver running an optimization problem on its `System` owner. In general, the optimization problems are of the form:: minimize f(x) subject to g_i(x) >= 0, i = 1,...,m h_j(x) = 0, j = 1,...,p where ``x`` is a vector of one or more variables. ``g_i(x)`` are the inequality constraints. ``h_j(x)`` are the equality constrains. Optionally, the lower and upper bounds for each element in x can also be specified. Parameters ---------- name : str Name of the driver owner : System, optional :py:class:`~cosapp.systems.system.System` to which driver belongs; defaults to `None` **kwargs : Any Keyword arguments will be used to set driver options Attributes ---------- name : str Name of the driver parent : Driver, optional Top driver containing this driver; default None. owner : System :py:class:`~cosapp.systems.system.System` to which this driver belong. children : OrderedDict[Driver] Drivers belonging to this one. options : OptionsDictionary | Options for the current driver | **verbose** : int, {0, 1} | Verbosity level of the driver; default 0 (i.e. minimal information) | **eps** : float, [1.5e-8, 1.] | Step size used for numerical approximation of the Jacobian; default 1.5e-8 | **ftol** : float, [1.5e-8, 1.] | Iteration termination criteria (f^k - f^{k+1})/max{\|f^k\|,\|f^{k+1}\|,1} <= ftol; | default 1e-6 | **max_iter** : int, [1, [ | Maximum number of iterations; default 100 solution : # TODO Notes ----- This optimizer is a wrapper around ``scipy.optimize.minimize`` function. For more details please refer to: https://docs.scipy.org/doc/scipy-1.0.0/reference/generated/scipy.optimize.minimize.html """ __slots__ = ('_constraints', '_raw_constraints', '_initial_state', '_objective') def __init__(self, name: str, owner: Optional["cosapp.systems.System"] = None, **kwargs ) -> None: """Initialize driver Parameters ---------- name: str, optional Name of the `Driver`. owner: System, optional :py:class:`~cosapp.systems.system.System` to which this driver belong; defaults to `None`. **kwargs: Additional keywords arguments forwarded to base class. """ super().__init__(name, owner, **kwargs) # TODO we need to move this in an enhanced MathematicalProblem self._raw_constraints: Set[str] = set() # Human-readable constraints self._constraints: List[Dict] = list() # Non-negativity constraints self._initial_state: Dict[str, Any] = dict() self._objective: EvalString = None self.options.declare( name = 'method', default = None, values = self.available_methods(), desc = ( "Type of solver." " If not given, chosen to be one of 'BFGS', 'L-BFGS-B', 'SLSQP'," " depending if the problem has constraints or bounds." ), allow_none = True, ) self.options.declare( 'eps', 2**(-26), dtype=float, lower=2**(-26), upper=1., desc="Step size used for numerical approximation of the Jacobian.", ) self.options.declare( 'ftol', 1.5e-8, dtype=float, lower=1e-15, upper=1., desc="Iterations stop when (f^k - f^{k+1}) / max(|f^k|, |f^{k+1}|, 1) <= ftol.", ) self.options.declare( 'maxiter', 100, dtype=int, lower=1, desc='Maximum number of iterations.', ) self.options.declare( 'monitor', False, dtype=bool, allow_none=False, desc="Defines if intermediate system state should be recorded.", ) self._filter_options(kwargs, aliases={'tol': 'ftol', 'max_iter': 'maxiter'})
[docs] def set_minimum(self, expression: str) -> None: """Set the scalar objective function to be minimized. Parameters ---------- expression : str The objective expression to be minimized. """ self.__check_owner("objective") check_arg(expression, "expression", str, lambda s: "==" not in s) self._objective = objective = EvalString(expression, self.owner) if objective.constant: warnings.warn(f"Objective is constant {objective.eval()}")
[docs] def set_maximum(self, expression: str) -> None: """Set the scalar objective function to be maximized. Parameters ---------- expression : str The objective expression to be maximized. """ self.set_minimum(f"-({expression})")
[docs] def set_objective(self, expression: str) -> None: """Set the scalar quantity to be minimized. Same as `set_minimum`. Note: ----- This method is deprecated, and should be replaced by either `set_minimum` or `set_maximum`, depending on the objective. Parameters ---------- expression : str The objective expression to be minimized. """ warnings.warn( "method `set_objective` is deprecated; use `set_minimum` or `set_maximum` instead." ) self.set_minimum(expression)
[docs] @staticmethod def available_methods() -> List[str]: """Returns all possible values of option `method`. For more information, please refer to the online documentation of `scipy.optimize.minimize`. """ return [ 'Nelder-Mead', 'Powell', 'CG', 'BFGS', 'Newton-CG', 'L-BFGS-B', 'TNC', 'COBYLA', 'SLSQP', 'dogleg', 'trust-constr', 'trust-ncg', 'trust-exact', 'trust-krylov', ]
@property def objective(self): try: return self._objective.eval() except: return None @property def objective_expr(self) -> str: return str(self._objective) if self._objective else None
[docs] def add_unknown(self, name: Union[str, Iterable[Union[dict, str, Unknown]]], max_abs_step: Number = numpy.inf, max_rel_step: Number = numpy.inf, lower_bound: Number = -numpy.inf, upper_bound: Number = numpy.inf ) -> "MathematicalProblem": """Add unknown variables. You can set variable one by one or provide a list of dictionary to set multiple variable at once. The dictionary key are the arguments of this method. Parameters ---------- name : str or Iterable of dictionary or str Name of the variable or list of variable to add max_rel_step : float, optional Maximal relative step by which the variable can be modified by the numerical solver; default numpy.inf max_abs_step : float, optional Maximal absolute step by which the variable can be modified by the numerical solver; default numpy.inf lower_bound : float, optional Lower bound on which the solver solution is saturated; default -numpy.inf upper_bound : float, optional Upper bound on which the solver solution is saturated; default numpy.inf Returns ------- MathematicalProblem The modified MathematicalSystem """ self.__check_owner("variables") self._raw_problem.add_unknown(name, max_abs_step, max_rel_step, lower_bound, upper_bound)
[docs] def add_constraints(self, expression: Union[str, List[str]]) -> None: """Add constraints to the optimization problem. Parameters ---------- - expression [str or List[str]]: Human-readable equality or inequality constraints, such as 'x >= y**2', '0 < alpha < 1', 'a == b', or a list thereof. Note ---- Expressions are parsed into non-negative constraints in the optimization problem. Strict inequalities are not enforced, and treated as non-strict inequalities. For instance, `x < y` translates into: `y - x >= 0`. """ self.__check_owner("constraints") check_arg(expression, 'expression', (str, Collection)) if isinstance(expression, str): expression = [expression] self._raw_constraints.update(expression) constraints = ConstraintParser.parse(expression) for constraint in constraints: # Test that expression can be evaluated try: evalstr = EvalString(constraint.expression, self.owner) except: raise data = dict( type = 'ineq' if constraint.is_inequality else 'eq', expr = evalstr, ) self._constraints.append(data)
@property def constraints(self) -> Set[str]: """Set[str]: representation of optimization constraints.""" return self._raw_constraints.copy() def __check_owner(self, kind: str) -> None: if self.owner is None: raise AttributeError(f"Owner system is required to define optimization {kind}.")
[docs] def setup_run(self): """Method called once before starting any simulation.""" super().setup_run() # Resolve unknown aliasing and connected unknowns analyzer = UnknownAnalyzer(self.owner) self.problem = analyzer.filter_problem(self._raw_problem)
def _fun_wrapper(self, expression: EvalString) -> Callable[[numpy.ndarray], float]: """Wrapper around objective and constraint expression to propagate the x values in the `System` owner. Parameters ---------- expression : EvalString The expression to be evaluated. Returns ------- Callable[[numpy.ndarray], float] Callable function usable by scipy.optimize.minimize """ def wrapper(x: numpy.ndarray, *args) -> float: """Wrapper around objective and constraint function to propagate the x values in the `System` owner. Parameters ---------- x : 1D-scalar numpy.ndarray Points at which the function needs to be evaluated args : Tuple Any additional fixed parameters needed to completely specify the function Returns ------- Float Value of the function for x """ # Propagate x value in the System owner, if it has changed counter = 0 need_update = False for unknown in self.problem.unknowns.values(): if unknown.mask is None: unknown.set_default_value(x[counter]) counter += 1 else: n = numpy.count_nonzero(unknown.mask) unknown.set_default_value(x[counter:counter + n]) counter += n # Set the variable to the new x if not numpy.array_equal(unknown.value, unknown.default_value): unknown.set_to_default() need_update = True if need_update: for child in self.exec_order: self.children[child].run_once() return expression.eval() return wrapper def _fresidues(self, x: numpy.ndarray) -> float: """ Method used by the solver to take free variables values as input and values of the objective function (after running the System). Parameters ---------- x : numpy.ndarray The list of values to set to the free variables of the `System` Returns ------- float Objective function value """ x = numpy.asarray(x) logger.debug(f"Call fresidues with x = {x!r}") self.set_iteratives(x) if len(self.children) > 0: for subdriver in self.children.values(): subdriver.run_once() else: self.owner.run_children_drivers() objective = self._objective.eval() logger.debug(f"Objective: {objective!r}") return objective
[docs] def set_iteratives(self, x: Sequence[float]) -> None: x = numpy.asarray(x) counter = 0 for unknown in self.problem.unknowns.values(): if unknown.mask is None: unknown.set_default_value(x[counter]) counter += 1 else: n = numpy.count_nonzero(unknown.mask) unknown.set_default_value(x[counter : counter + n]) counter += n # Set variable to new x if not numpy.array_equal(unknown.value, unknown.default_value): unknown.set_to_default()
[docs] def resolution_method(self, fresidues: Callable[[Sequence[float]], float], x0: numpy.ndarray, args: Tuple[Union[float, str], bool] = (), options: Optional[OptionsDictionary] = None, bounds = None, constraints = None, ) -> SolverResults: """Function call to cancel the residues. Parameters ---------- fresidues : Callable[[Sequence[float], Union[float, str]], float] Residues function taking two parameters (evaluation vector, time/ref) and returning the residues x0 : numpy.ndarray The initial values vector to converge to the solution args : Tuple[Union[float, str], bool], optional A tuple of additional argument for fresidues starting with the time/ref parameter options : OptionsDictionary, optional Options for the numerical resolution method Returns ------- SolverResults Solution container """ sub_options = { # Specify both `ftol` and `gtol`, as name varies among scipy solvers 'ftol': options['ftol'], 'gtol': options['ftol'], # `ftol` is not understood by unconstrained solvers 'eps': options['eps'], 'maxiter': options['maxiter'], 'disp': bool(options['verbose']), } def callback(*args, **kwargs) -> bool: self._record_data() return False with warnings.catch_warnings(): # Ignore warnings about `gtol` or `ftol` potentially emitted by scipy solver warnings.filterwarnings("ignore", message="Unknown solver options: [fg]tol") output = scipy.optimize.minimize( fresidues, x0, args=args, tol=options['ftol'], method=options['method'], bounds=bounds, constraints=constraints, options=sub_options, callback=callback, ) if output.fun < self.objective: # Solver solution does not correspond to # last system execution; rerun to synch. self._fresidues(output.x) self._record_data() return output
def _precompute(self): """List unknowns and gather initial values.""" super()._precompute() OptionalDriver.set_inhibited(True) init = numpy.empty(0) for name, unknown in self.problem.unknowns.items(): if not self.force_init and name in self.solution: # We ran successfully at least once and are environmental friendly data = self.solution[name] else: # User wants the init or first simulation or crash try: boundary = self._initial_state[name] except KeyError: data = copy.deepcopy(unknown.value) else: umask = unknown.mask if unknown.mask is not None else numpy.empty(0) bmask = boundary.mask if boundary.mask is not None else numpy.empty(0) if not numpy.array_equal(umask, bmask): raise ValueError( f"Unknown and initial conditions on {unknown.name!r} are not masked equally" ) data = copy.deepcopy(boundary.default_value) # self._initial_state[name] = boundary init = numpy.append(init, data) self.initial_values = init
[docs] def compute(self) -> None: """Execute the optimization.""" self.status = '' self.error_code = '0' # Check that objective is set if self._objective is None: self.status = 'ERROR' self.error_code = '9' raise ArithmeticError( "Optimization objective was not specified." ) # Gather and simplify the bounds limits = self._get_solver_limits() unique_lower = numpy.unique(limits['lower_bound']) unique_upper = numpy.unique(limits['upper_bound']) def get_bounds(lower, upper) -> tuple: return ( None if lower == -numpy.inf else lower, None if upper == numpy.inf else upper ) if unique_lower.size == 1 and unique_upper.size == 1: bounds = get_bounds(unique_lower[0], unique_upper[0]) if bounds == (None, None): bounds = None else: bounds = tuple() if bounds is not None: bounds = [ get_bounds(lower, upper) for lower, upper in zip(limits['lower_bound'], limits['upper_bound']) ] # Create nonlinear constraints # TODO The following is really ugly. Constraints should be merged # from children through MathematicalProblem extension. def format_constraint(constraint): return { 'type': constraint['type'], 'fun': self._fun_wrapper(constraint['expr']) } constraints = tuple(map(format_constraint, self._constraints)) if len(self.initial_values) > 0: monitored = self.options['monitor'] BaseRecorder.paused = not monitored if monitored: self._fresidues(self.initial_values) self._record_data() results = self.resolution_method( self._fresidues, self.initial_values, args = (), bounds = bounds, constraints = constraints, options = self.options, ) if not results.success: self.status = 'ERROR' self.error_code = '9' self.solution = {} logger.error(f"The solver failed: {results.message}") else: self.solution = dict( (key, unknown.default_value) for key, unknown in self.problem.unknowns.items() ) self._print_solution() if not monitored: BaseRecorder.paused = False self._record_data() else: logger.warning('No design variable has been specified for the optimization.')
def _record_data(self) -> None: """Record data into recorder, if any.""" if self._recorder is not None: self._recorder.record_state(self.name) def _postcompute(self) -> None: """Undo pull inputs and reset iteratives sets.""" OptionalDriver.set_inhibited(False) super()._postcompute() def _print_solution(self) -> None: # TODO better returning a string """Print the solution in the log.""" if self.options['verbose']: logger.info(f"Objective function: {self._objective.eval():.5g}") logger.info(f"Parameters [{len(self.solution)}]: ") for name, value in self.solution.items(): logger.info(f" # {name}: {value}") constraints = self._constraints if constraints: logger.info(f"Constraints [{len(constraints)}]: ") for constraint in constraints: expr = constraint['expr'] logger.info(f" # {expr}: {expr.eval()}") def _repr_markdown_(self) -> str: """Mardown representation of optimization problem.""" itemize = lambda item: f"* {item}" def section(header: str, collection: Union[str, List[str]]) -> List[str]: content = [] if isinstance(collection, str): collection = [collection] if collection: content.append(f"#### {header.title()}:\n") content.extend(map(itemize, collection)) content.append("") return content lines = [] def add_section(header, collection) -> None: nonlocal lines lines.extend(section(header, collection)) add_section("Objective", f"Minimize {self._objective}") add_section("Unknowns", self._raw_problem.unknowns) add_section("Constraints", self._raw_constraints) return "\n".join(lines)