"""Export a System as FMU."""
import itertools
import logging
import os
import shutil
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Union
from collections.abc import Collection
import numpy
from cosapp.core import __version__ as cosapp_version
from cosapp.drivers import RunSingleCase
from cosapp.drivers.abstractsolver import AbstractSolver
from cosapp.ports.port import BasePort
from cosapp.systems import System
from cosapp.utils.helpers import check_arg
from cosapp.utils.naming import natural_varname
from cosapp.utils.json import EncodingMetadata
try:
from pythonfmu import Fmi2Causality, Fmi2Variability, FmuBuilder as PyFmuBuilder
except ImportError:
class Fmi2Causality(Enum):
parameter = 0
calculatedParameter = 1
input = 2
output = 3
local = 4
class Fmi2Variability(Enum):
constant = 0
fixed = 1
tunable = 2
discrete = 3
continuous = 4
PyFmuBuilder = None
logger = logging.getLogger(__name__)
DECK_FILE_NAME = "system.json"
THIS_YEAR = datetime.now().year
[docs]
class TimeIntegrator(Enum):
"""Available time integrator for FMU export."""
EulerExplicit = "Euler explicit"
RK2 = "Runge-Kutta 2"
RK3 = "Runge-Kutta 3"
RK4 = "Runge-Kutta 4"
@property
def driver(self) -> str:
if self.name.startswith("RK"):
return "RungeKutta"
else:
return self.name
@property
def options(self) -> Dict[str, str]:
if self.name.startswith("RK"):
return dict(order=int(self.name[2:]))
else:
return dict()
[docs]
class VariableType(Enum):
Boolean = "bool"
Integer = "int"
Real = "float"
String = "bytes"
[docs]
class Variable(NamedTuple):
"""Variable attributes"""
name: str
pytype: str # Python type
fmutype: str # FMU type
causality: Fmi2Causality
variability: Optional[Fmi2Variability] = None
[docs]
class FmuBuilder:
"""CoSApp FMU builder"""
@staticmethod
def _add_variables(
vars: Dict[str, Any],
causality: Fmi2Causality,
variability: Optional[Fmi2Variability] = None,
) -> List[Variable]:
"""Transforms the to-included variable dictionary in
a list of Variable object for injection in the Jinja2 template.
Parameters
----------
vars : Dict[str, Any]
Dictionary of the to-be-included variables
causality : Fmi2Causality
Causality of the variables
variablility : Fmi2Variablity or None
Variability of the variables; default not specified
Returns
-------
List[Variable]
List of Variable object to be injected in the template.
"""
l_vars = list()
for name, value in vars.items():
try:
dtype = FmuBuilder._get_variable_type(value)
except TypeError as e:
raise TypeError(f"{e!s} for variable {name}")
l_vars.append(
Variable(
name,
dtype.value,
dtype.name,
causality.name,
getattr(variability, "name", None),
)
)
return l_vars
@staticmethod
def _get_default_value(names: Iterable[str], system: System) -> Dict[str, Any]:
"""Read the default value of the variable in the system.
Parameters
----------
names: Iterable[str]
Variable to look for
system: System
CoSApp system in from which the values need to be taken
Returns
-------
Dict[str, Any] : Dictionary of the variable name (key) with their value (value)
"""
values = dict()
for name in names:
try:
values[name] = eval(f"master.{name}", {"master": system})
except (AttributeError, IndexError, KeyError):
raise ValueError(
f"Variable {name} not found in the system '{system.name}'."
)
return values
@staticmethod
def _get_default_variables(
ports: Dict[str, BasePort], to_skip: Iterable[str]
) -> Dict[str, Any]:
"""Get the default variable list from a port list.
Variable names will be `port`.`variable_name` except for
inwards and outwards (it will be `variable_name` directly).
Only variables of type supported by FMI are listed.
Parameters
----------
ports : Dict[str, BasePort]
List of port to extract variables from
to_skip : Iterable[str]
List of variable names to ignore
Returns
-------
Dict[str, Any]
Return a dictionary of the variables to be included with their
default value.
"""
vars = dict()
for port_name, port in ports.items():
for name in port:
full_name = natural_varname(f"{port_name}.{name}")
if full_name in to_skip:
continue
value = port[name]
try:
FmuBuilder._get_variable_type(value)
except TypeError:
logger.debug(
f"Variable {full_name!r} has unsupported type for FMI."
)
else:
vars[full_name] = value
return vars
@staticmethod
def _get_documentation_folder(dest: Path) -> Path:
"""Path: The subfolder containing the documentation files."""
doc_folder = dest / "documentation"
doc_folder.mkdir(parents=True, exist_ok=True)
return doc_folder
@staticmethod
def _get_project_folder(dest: Path) -> Path:
"""Path: The subfolder containing the project files."""
project_folder = dest / "project_files"
project_folder.mkdir(parents=True, exist_ok=True)
return project_folder
@staticmethod
def _get_script_file(dest: Path, system: System, suffix: str) -> Path:
"""Path: Facade module file path."""
class_name = type(system).__name__
path = FmuBuilder._get_project_folder(dest)
filename = path / f"{class_name.lower()}{suffix}.py"
filename.parent.mkdir(parents=True, exist_ok=True)
return filename
@staticmethod
def _get_variable_type(value: Any) -> VariableType:
"""Get the FMI variable type of a value.
Parameters
----------
value : Any
Value to test
Returns
-------
VariableType
Type of the variable
Raises
------
TypeError
For unsupported variable types.
"""
if isinstance(value, (bool, numpy.bool_)):
dtype = VariableType.Boolean
elif isinstance(value, (int, numpy.int_)):
dtype = VariableType.Integer
elif isinstance(value, (float, numpy.float64)):
dtype = VariableType.Real
elif isinstance(value, (str, numpy.str_)):
dtype = VariableType.String
elif isinstance(value, numpy.ndarray):
if value.ndim != 0:
raise TypeError("Unsupported numpy array")
if numpy.issubdtype(value.dtype, numpy.dtype(bool)):
dtype = VariableType.Boolean
elif numpy.issubdtype(value.dtype, numpy.dtype(int)):
dtype = VariableType.Integer
elif numpy.issubdtype(value.dtype, numpy.dtype(float)):
dtype = VariableType.Real
elif numpy.issubdtype(value.dtype, numpy.dtype(str)):
dtype = VariableType.String
else:
raise TypeError(f"Unsupported numpy dtype {value.dtype}")
else:
dtype = type(value)
raise TypeError(f"Unsupported type {dtype.__qualname__}")
return dtype
[docs]
@staticmethod
def generate_fmu_facade(
system: System,
inputs: Iterable[str] = None,
parameters: Iterable[str] = None,
outputs: Iterable[str] = None,
locals: Iterable[str] = None,
time_integrator: Union[TimeIntegrator, str] = TimeIntegrator.RK4,
nonlinear_solver: Optional[AbstractSolver] = None,
dest: Union[Path, str] = os.curdir,
python_env: Union[Path, str, None] = None,
version: Optional[str] = None,
author: Optional[str] = None,
description: Optional[str] = None,
copyright: Optional[str] = "",
license: Optional[str] = "",
fmu_name_suffix: str = "",
) -> Path:
"""Export a system as CoSimulation FMU respecting FMI 2.0.
Inputs, parameters, outputs and locals are by default taken from inputs,
inwards, outputs and outwards variables of the CoSApp system, respectively. If you wish
to set an empty list, set the corresponding argument with a empty dictionary.
You can add a mathematical problem to the system by passing the proper
nonlinear_solver. If it contains a RunSingleCase, its equations will be
passed to the FMU.
Parameters
----------
- system : System
System to be exported
- inputs : Iterable[str], optional
List of input variables with initial values; default None
- parameters : Iterable[str], optional
List of parameter variables with initial values; default None
- outputs : Iterable[str], optional
List of output variables with initial values; default None
- locals : Iterable[str], optional
List of local variables with initial values; default None
- time_integrator : TimeIntegrator, optional
Time integrator algorithm; default Runge-Kutta 4th order
- nonlinear_solver : AbstractSolver, optional
Non linear Driver to use for solving the system at a given instant
- dest : str or Path, optional
Destination folder; default current directory
- python_env : str or Path, optional
File listing the python dependency; default None
- version : str, optional
FMU version; default None
- author : str, optional
FMU author; default None
- description : str, optional
System description; default None
- license : str, optional
FMU license; default ""
- copyright : str, optional
FMU copyright; default ""
- fmu_name_suffix : str, optional
FMU name; default ""
Returns
-------
pathlib.Path
Folder path containing FMU facade files
"""
# Optional dependencies
try:
from jinja2 import Environment, PackageLoader
except ImportError:
raise ImportError("jinja2 needs to be installed to export a System as FMU.")
# Check the arguments
nonetype = type(None)
check_arg(system, "system", System)
check_arg(inputs, "inputs", (Collection, nonetype))
check_arg(parameters, "parameters", (Collection, nonetype))
check_arg(outputs, "outputs", (Collection, nonetype))
check_arg(locals, "locals", (Collection, nonetype))
check_arg(time_integrator, "time_integrator", (TimeIntegrator, str))
check_arg(nonlinear_solver, "nonlinear_solver", (AbstractSolver, nonetype))
check_arg(dest, "dest", (Path, str))
check_arg(python_env, "python_env", (Path, str, nonetype))
local_vars = dict()
options = {
"version": version,
"author": author,
"description": description,
"copyright": copyright,
"license": license,
}
for name, value in options.items():
if value is not None:
check_arg(value, name, str)
local_vars[name] = value
# Convert argument
time_integrator = TimeIntegrator(time_integrator)
temp_dest = Path(dest)
# Generate the parameters
system_type = type(system)
params = {
"module_name": system_type.__module__,
"class_name": system_type.__qualname__,
"driver": time_integrator.driver,
"time_options": time_integrator.options,
# TODO 'nl_options': dict(),
"variables": list(),
"class_attrs": dict(),
"system_file": DECK_FILE_NAME,
}
for name, value in local_vars.items():
params["class_attrs"][name] = value
# TODO FMI supports to set for CoSimulation the DefaultExperiment.stepSize
# that defines the preferred communicationStepSize
user_list = set(
itertools.chain(inputs or [], parameters or [], outputs or [], locals or [])
)
if inputs is None:
inward_port = system.inputs.pop(System.INWARDS)
inputs = FmuBuilder._get_default_variables(system.inputs, user_list)
system.inputs[System.INWARDS] = inward_port # Restore popped port
else:
inputs = FmuBuilder._get_default_value(inputs, system)
if parameters is None:
parameters = FmuBuilder._get_default_variables(
{System.INWARDS: system.inputs[System.INWARDS]}, user_list
)
else:
parameters = FmuBuilder._get_default_value(parameters, system)
if outputs is None:
outward_port = system.outputs.pop(System.OUTWARDS)
outputs = FmuBuilder._get_default_variables(system.outputs, user_list)
system.outputs[System.OUTWARDS] = outward_port
else:
outputs = FmuBuilder._get_default_value(outputs, system)
if locals is None:
locals = FmuBuilder._get_default_variables(
{System.OUTWARDS: system.outputs[System.OUTWARDS]}, user_list
)
else:
locals = FmuBuilder._get_default_value(locals, system)
params["variables"].extend(
FmuBuilder._add_variables(inputs, Fmi2Causality.input)
)
params["variables"].extend(
FmuBuilder._add_variables(
parameters, Fmi2Causality.parameter, Fmi2Variability.tunable
)
)
params["variables"].extend(
FmuBuilder._add_variables(outputs, Fmi2Causality.output)
)
params["variables"].extend(
FmuBuilder._add_variables(locals, Fmi2Causality.local)
)
if nonlinear_solver is not None:
subdrivers = list(nonlinear_solver.children.values())
error_msg = "The nonlinear solver may have at most one sub-driver, of type `RunSingleCase`"
if len(subdrivers) > 1:
names = list(map(lambda driver: driver.name, subdrivers))
raise ValueError(f"{error_msg}; found sub-drivers {names}")
# Assemble solver problem
try:
problem = nonlinear_solver.raw_problem.copy()
except:
problem = system.new_problem('design')
try:
driver = subdrivers[0]
except IndexError:
pass
else:
if not isinstance(driver, RunSingleCase):
raise TypeError(f"{error_msg}; got {driver!r}")
# Note:
# Since `RunSingleCase` child is unique, a simple merging
# of the various mathematical problems by extension works.
# It would not be the case in a multi-point design problem,
# for example, where off-design, design and local problems
# must be assembled with care.
problem.extend(driver.offdesign)
problem.extend(driver.design)
params["problem"] = problem.to_dict()
env = Environment(
loader=PackageLoader("cosapp.tools", "templates"),
trim_blocks=True,
lstrip_blocks=True,
)
template = env.get_template("pythonfmu.j2")
rendered_script = template.render(params)
filename = FmuBuilder._get_script_file(temp_dest, system, fmu_name_suffix)
with open(filename, mode="w", encoding="utf-8") as f:
f.write(rendered_script)
project_folder = FmuBuilder._get_project_folder(temp_dest)
deck_file = project_folder / DECK_FILE_NAME
system.save(deck_file, encoding_metadata=EncodingMetadata(with_drivers=False, value_only=True)) # Save the customized system
# Create default environment if it is not provided
if python_env is None:
python_env = project_folder / "requirements.txt"
python_env = Path(python_env)
if not python_env.exists():
logger.info("Create default package list.")
package_list = {"cosapp": cosapp_version}
python_env.write_text(
"\n".join(
f"{name}~={version}" for name, version in package_list.items()
)
)
doc_folder = FmuBuilder._get_documentation_folder(temp_dest)
license_file = None
if license is not None and license.endswith("Proprietary"):
company = license[: -len("Proprietary")].strip()
license_file = doc_folder / "licenses" / "license.txt"
license_file.parent.mkdir(parents=True, exist_ok=True)
license_file.write_text(
f"Copyright © {THIS_YEAR} {company} - All Rights Reserved\n"
f"Unauthorized copying and/or distribution of this source-code, via any medium "
f"is strictly prohibited\nwithout the express permission of {company}.\n"
"Proprietary and confidential"
)
return temp_dest
[docs]
@staticmethod
def to_fmu(
system: System,
inputs: Iterable[str] = None,
parameters: Iterable[str] = None,
outputs: Iterable[str] = None,
locals: Iterable[str] = None,
time_integrator: Union[TimeIntegrator, str] = TimeIntegrator.RK4,
nonlinear_solver: Optional[AbstractSolver] = None,
dest: Union[Path, str] = os.curdir,
python_env: Union[Path, str, None] = None,
project_files: Iterable[Union[Path, str]] = set(),
version: Optional[str] = None,
author: Optional[str] = None,
description: Optional[str] = None,
copyright: Optional[str] = "",
license: Optional[str] = "",
fmu_name_suffix: str = "",
) -> Path:
"""Export a system as CoSimulation FMU respecting FMI 2.0.
Inputs, parameters, outputs and locals are by default taken from inputs,
inwards, outputs and outwards variables of the CoSApp system, respectively. If you wish
to set an empty list, set the corresponding argument with a empty dictionary.
You can add a mathematical problem to the system by passing the proper
nonlinear_solver. If it contains a RunSingleCase, its equations will be
passed to the FMU.
Parameters
----------
system : System
System to be exported
inputs : Iterable[str], optional
List of input variables with initial values; default None
parameters : Iterable[str], optional
List of parameter variables with initial values; default None
outputs : Iterable[str], optional
List of output variables with initial values; default None
locals : Iterable[str], optional
List of local variables with initial values; default None
time_integrator : TimeIntegrator, optional
Time integrator algorithm; default Runge-Kutta 4th order
nonlinear_solver : AbstractSolver, optional
Non linear Driver to use for solving the system at a given instant
dest : str or Path, optional
Destination folder; default current directory
python_env : str or Path, optional
File listing the python dependency; default None
project_files : Iterable of str or Path, optional
List of additional files to be included in the FMU; default no additional files
version : str, optional
FMU version; default None
author : str, optional
FMU author; default None
description : str, optional
System description; default None
license : str, optional
FMU license; default ""
copyright : str, optional
FMU copyright; default ""
fmu_name_suffix : str, optional
FMU name suffix; default ""
Returns
-------
pathlib.Path
FMU file path object
"""
dest = Path(dest)
temp_dest = dest / "tmp"
temp_dest = FmuBuilder.generate_fmu_facade(
system,
inputs,
parameters,
outputs,
locals,
time_integrator,
nonlinear_solver,
temp_dest,
python_env,
version,
author,
description,
copyright,
license,
fmu_name_suffix,
)
if PyFmuBuilder is None:
logger.warning(
"pythonfmu needs to be installed to package the simulation as FMU."
)
return dest
project_folder = FmuBuilder._get_project_folder(temp_dest)
project_files = set(map(Path, project_files))
project_files.update(project_folder.glob("*"))
documentation_folder = FmuBuilder._get_documentation_folder(temp_dest)
fmu_path = PyFmuBuilder.build_FMU(
FmuBuilder._get_script_file(temp_dest, system, fmu_name_suffix),
dest=dest,
project_files=project_files,
documentation_folder=documentation_folder,
needsExecutionTool="false",
canGetAndSetFMUstate="false",
)
shutil.rmtree(temp_dest)
filename = str(fmu_path)
if filename.endswith("FMU.fmu"):
# Suppress rightmost occurrence of "FMU"
new_filename = "".join(filename.rsplit("FMU", 1))
fmu_path = Path(
shutil.move(filename, new_filename)
)
return fmu_path
to_fmu = FmuBuilder.to_fmu