Source code for cosapp.tools.problem_viewer.problem_viewer

import os
import json

from typing import Any, Dict, Union

from collections import OrderedDict
import logging
import base64

from cosapp.systems import System
from cosapp.ports.port import BasePort, ExtensiblePort

from cosapp.utils.helpers import check_arg

logger = logging.getLogger(__name__)


def _get_tree_dict(
    obj: Union[System, BasePort],
    include_orphan_vars=True
) -> Dict[str, Any]:
    """Get a dictionary representation of the system hierarchy.
    """
    def init_tree(obj, subtype):
        return OrderedDict(
            name = obj.name,
            type = 'subsystem',
            subsystem_type = subtype,
        )

    def port_tree(port):
        tree_dict = init_tree(port, 'component')
        tree_dict['children'] = children = list()

        if include_orphan_vars or not isinstance(port, ExtensiblePort):
            dtype = 'param' if port.is_input else 'unknown'
            children.extend(
                map(lambda n: OrderedDict(name=n, type=dtype), iter(port))
            )
        return tree_dict

    def system_tree(system):
        tree_dict = init_tree(system, 'group')
        tree_dict['children'] = children = list()

        def add_ports(ports):
            nonlocal children
            children.extend(port_tree(port)
                for port in filter(lambda p: len(p) > 0, ports)
            )

        add_ports(system.inputs.values())

        children.extend(system_tree(child)
            for child in system.children.values()
        )
        # outputs come after child list for a matter of visualization
        add_ports(system.outputs.values())
        
        return tree_dict

    if isinstance(obj, System):
        return system_tree(obj)
    elif isinstance(obj, BasePort):
        return port_tree(obj)
    else:
        raise TypeError(f"Object must be a port or a system; got {type(obj)}.")


def _get_connections(system: System, include_orphan_vars: bool) -> Dict[str, str]:
    """Get a dictionary representation of the system connections. Structure is {'_in': '_out'}."""
    connections = dict()

    def contextual_name(port: BasePort):
        return port.name if port.owner is system else port.contextual_name

    def add_prefix(name: str, varname: str):
        prefix = f"{name}."
        return varname if varname.startswith(prefix) else f"{prefix}{varname}"

    # Gather connections for current system
    for connector in system.all_connectors():
        if not include_orphan_vars and isinstance(connector.sink, ExtensiblePort):
            continue
        sink_name = contextual_name(connector.sink)
        source_name = contextual_name(connector.source)
        for target, origin in connector.mapping.items():
            connections[f"{sink_name}.{target}"] = f"{source_name}.{origin}"

    # Recursively gather children's connections
    for name, child in system.children.items():
        child_connections = _get_connections(child, include_orphan_vars)
        for k, v in child_connections.items():
            connections[add_prefix(name, k)] = add_prefix(name, v)

    return connections


def _get_viewer_data(system: System, include_orphan_vars: bool) -> Dict[str, str]:
    """Get the data needed by the N2 viewer as a dictionary."""
    if isinstance(system, System):
        head = system
    else:
        raise TypeError('get_model_viewer_data only accepts System')

    if not include_orphan_vars:
        logger.warning('The system may contain inwards or outwards.')

    tree_data =_get_tree_dict(head, include_orphan_vars)
    connections = _get_connections(head, include_orphan_vars)
    data_dict = {
        'tree': tree_data,
        'connections_list' : [
            dict(src=origin, tgt=target)
            for target, origin in connections.items()
            if origin is not None
        ],
    }
    return data_dict


[docs]def view_model( problem_or_filename, outfile='n2.html', show_browser=True, embeddable=False, draw_potential_connections=False, include_orphan_vars=True, ) -> None: """ Generates an HTML file containing a tree viewer. Optionally pops up a web browser to view the file. Parameters ---------- problem_or_filename : A System System : The System for the desired tree. outfile : str, optional The name of the final output file show_browser : bool, optional If True, pop up the system default web browser to view the generated html file. Defaults to True. embeddable : bool, optional If True, gives a single HTML file that doesn't have the <html>, <DOCTYPE>, <body> and <head> tags. If False, gives a single, standalone HTML file for viewing. draw_potential_connections : bool, optional If true, allows connections to be drawn on the N2 that do not currently exist in the model. Defaults to True. include_orphan_vars : bool, optional If True, display inwards and outwards on the N2 diagram. Defaults to True. """ def add_html_tags(content: str) -> str: """Format `content` into standalone HTML content.""" html_begin_tags = """<!DOCTYPE html> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> </head> <body>\n""" html_end_tags = """ </body> </html>""" return html_begin_tags + content + html_end_tags cur_dir = os.path.dirname(os.path.abspath(__file__)) viz_dir = os.path.join(cur_dir, "visualization") lib_dir = os.path.join(viz_dir, "libs") src_dir = os.path.join(viz_dir, "src") sty_dir = os.path.join(viz_dir, "style") def get_content(path: str, filename: str, binary=False) -> str: fullpath = os.path.join(path, filename) if binary: with open(fullpath, "rb") as fp: return str(base64.b64encode(fp.read()).decode("ascii")) else: with open(fullpath, "r") as fp: return fp.read() def get_lib(filename: str, binary=False) -> str: return get_content(lib_dir, filename, binary) def get_src(filename: str, binary=False) -> str: return get_content(src_dir, filename, binary) def get_style(filename: str, binary=False) -> str: return get_content(sty_dir, filename, binary) # Get index.html index = get_content(viz_dir, "index.html") # Get model viewer data data = _get_viewer_data(problem_or_filename, include_orphan_vars) model_viewer_data = f"var modelData = {json.dumps(data)}" # Put all style and JS into index index = index.replace("{{awesomplete_style}}", get_style("awesomplete.css")) index = index.replace("{{partition_tree_style}}", get_style("partition_tree.css")) index = index.replace("{{fontello}}", get_style("fontello.woff", binary=True)) index = index.replace("{{d3_lib}}", get_lib("d3.v4.min.js")) index = index.replace("{{awesomplete_lib}}", get_lib("awesomplete.js")) index = index.replace("{{vk_beautify_lib}}", get_lib("vkBeautify.js")) index = index.replace("{{model_data}}", model_viewer_data) index = index.replace("{{constants_lib}}", get_src("constants.js")) index = index.replace("{{modal_lib}}", get_src("modal.js")) index = index.replace("{{svg_lib}}", get_src("svg.js")) index = index.replace("{{search_lib}}", get_src("search.js")) index = index.replace("{{legend_lib}}", get_src("legend.js")) index = index.replace("{{draw_lib}}", get_src("draw.js")) index = index.replace("{{ptn2_lib}}", get_src("ptN2.js")) index = index.replace( "{{draw_potential_connections}}", 'true' if draw_potential_connections else 'false', ) if not embeddable: index = add_html_tags(index) with open(outfile, 'w') as fp: fp.write(index) # Open in browser, if required if show_browser: from cosapp.tools.problem_viewer.webview import webview webview(outfile)