Source code for cosapp.tools.problem_viewer.problem_viewer

import os
import json

from typing import Any, Dict, Union

from collections import OrderedDict
import logging
import base64

from cosapp.systems import System
from cosapp.ports.port import BasePort, ExtensiblePort

from cosapp.utils.helpers import check_arg

logger = logging.getLogger(__name__)


def _get_tree_dict(
    obj: Union[System, BasePort],
    include_orphan_vars=True
) -> Dict[str, Any]:
    """Build a nested dictionary describing a system or port hierarchy.

    Every node is an ordered mapping with keys `name`, `type` (always
    'subsystem'), `subsystem_type` ('group' for systems, 'component' for
    ports) and `children`. Variables listed under a port are leaves with
    keys `name` and `type` ('param' for input ports, 'unknown' otherwise).

    Parameters
    ----------
    obj : System or BasePort
        Root object of the tree.
    include_orphan_vars : bool, optional
        If False, variables of `ExtensiblePort` instances are left out
        (the port node itself is still created). Defaults to True.

    Returns
    -------
    Dict[str, Any]
        Tree representation of `obj`.

    Raises
    ------
    TypeError
        If `obj` is neither a System nor a BasePort.
    """
    def make_node(item, kind):
        # Keys are inserted in the order name, type, subsystem_type, children
        node = OrderedDict()
        node['name'] = item.name
        node['type'] = 'subsystem'
        node['subsystem_type'] = kind
        node['children'] = []
        return node

    def build_port_node(port):
        node = make_node(port, 'component')
        if not include_orphan_vars and isinstance(port, ExtensiblePort):
            # Extensible port variables are hidden on request
            return node

        var_type = 'param' if port.is_input else 'unknown'
        for varname in port:
            node['children'].append(OrderedDict(name=varname, type=var_type))
        return node

    def build_system_node(system):
        node = make_node(system, 'group')
        entries = node['children']

        def non_empty_port_nodes(ports):
            # Ports of zero length are not displayed
            return [build_port_node(p) for p in ports if len(p) > 0]

        entries += non_empty_port_nodes(system.inputs.values())
        entries += [build_system_node(sub) for sub in system.children.values()]
        # Outputs are listed after sub-systems, for visualization purposes
        entries += non_empty_port_nodes(system.outputs.values())
        return node

    if isinstance(obj, System):
        return build_system_node(obj)
    if isinstance(obj, BasePort):
        return build_port_node(obj)
    raise TypeError(f"Object must be a port or a system; got {type(obj)}.")


def _get_connections(system: System, include_orphan_vars: bool) -> Dict[str, str]:
    """Collect the variable-to-variable connections of a system and its sub-systems.

    Parameters
    ----------
    system : System
        System whose connectors are gathered; its children are visited recursively.
    include_orphan_vars : bool
        If False, connectors whose sink is an `ExtensiblePort` are skipped.

    Returns
    -------
    Dict[str, str]
        Ordered mapping of the kind {sink variable path: source variable path},
        where each path is built from the contextual name of the port and the
        variable name.
    """
    connections = OrderedDict()

    def add_prefix(name, s):
        # Paths already rooted at `name` are kept untouched (presumably those of
        # ports owned by the child itself - to be confirmed against
        # `contextual_name`). The test is made on a whole dotted component, such
        # that child 'a' does not swallow a path starting with 'ab.'.
        if s == name or s.startswith(f"{name}."):
            return s
        return f"{name}.{s}"

    # Gather connections for current system
    for connector in system.all_connectors():
        if not include_orphan_vars and isinstance(connector.sink, ExtensiblePort):
            continue

        sink_name = connector.sink.contextual_name
        source_name = connector.source.contextual_name
        for target, origin in connector.mapping.items():
            connections[f"{sink_name}.{target}"] = f"{source_name}.{origin}"

    # Recursively gather children's connections
    for name, child in system.children.items():
        child_connections = _get_connections(child, include_orphan_vars)
        for sink_path, source_path in child_connections.items():
            connections[add_prefix(name, sink_path)] = add_prefix(name, source_path)

    return connections


def _get_viewer_data(system: System, include_orphan_vars: bool) -> Dict:
    """Assemble the data consumed by the N2 viewer.

    Parameters
    ----------
    system : System
        Root system of the diagram.
    include_orphan_vars : bool
        Forwarded to the tree and connection builders; a warning is
        logged when it is False.

    Returns
    -------
    Dict
        Dictionary with keys 'tree' (system hierarchy) and
        'connections_list' (list of {src, tgt} ordered mappings).

    Raises
    ------
    TypeError
        If `system` is not a System.
    """
    if not isinstance(system, System):
        raise TypeError('get_model_viewer_data only accepts System')

    if not include_orphan_vars:
        logger.warning('The system may contain inwards or outwards.')

    tree = _get_tree_dict(system, include_orphan_vars)
    links = _get_connections(system, include_orphan_vars)

    return {
        'tree': tree,
        # Connections are stored as {sink: source}; entries with no source are dropped
        'connections_list': [
            OrderedDict(src=source, tgt=target)
            for target, source in links.items()
            if source is not None
        ],
    }


def view_model(
    problem_or_filename,
    outfile='n2.html',
    show_browser=True,
    embeddable=False,
    draw_potential_connections=False,
    include_orphan_vars=True,
):
    """
    Generates an HTML file containing a tree viewer.

    Optionally pops up a web browser to view the file.

    Parameters
    ----------
    problem_or_filename : System
        The System for the desired tree.
    outfile : str, optional
        The name of the final output file. Defaults to 'n2.html'.
    show_browser : bool, optional
        If True, pop up the system default web browser to view the generated html file.
        Defaults to True.
    embeddable : bool, optional
        If True, gives a single HTML file that doesn't have the <html>, <DOCTYPE>, <body>
        and <head> tags. If False, gives a single, standalone HTML file for viewing.
        Defaults to False.
    draw_potential_connections : bool, optional
        If True, allows connections to be drawn on the N2 that do not currently exist
        in the model. Defaults to False.
    include_orphan_vars : bool, optional
        If True, display inwards and outwards on the N2 diagram.
        Defaults to True.

    Raises
    ------
    TypeError
        If `problem_or_filename` is not a System.
    """
    html_begin_tags = """<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
</head>
<body>\n"""
    html_end_tags = """
</body>
</html>"""

    code_dir = os.path.dirname(os.path.abspath(__file__))
    vis_dir = os.path.join(code_dir, "visualization")
    libs_dir = os.path.join(vis_dir, "libs")
    src_dir = os.path.join(vis_dir, "src")
    style_dir = os.path.join(vis_dir, "style")

    def read_text(directory, filename):
        # The generated page declares charset=UTF-8, so assets are decoded
        # as UTF-8 rather than with the platform default encoding.
        with open(os.path.join(directory, filename), "r", encoding="utf-8") as f:
            return f.read()

    # Grab the libraries
    awesomplete = read_text(libs_dir, "awesomplete.js")
    d3 = read_text(libs_dir, "d3.v4.min.js")
    vk_beautify = read_text(libs_dir, "vkBeautify.js")

    # Grab the sources
    constants = read_text(src_dir, "constants.js")
    draw = read_text(src_dir, "draw.js")
    legend = read_text(src_dir, "legend.js")
    modal = read_text(src_dir, "modal.js")
    pt_n2 = read_text(src_dir, "ptN2.js")
    search = read_text(src_dir, "search.js")
    svg = read_text(src_dir, "svg.js")

    # Grab the style sheets
    awesomplete_style = read_text(style_dir, "awesomplete.css")
    partition_tree_style = read_text(style_dir, "partition_tree.css")

    # The font is binary; it is inlined in the page as base64 text
    with open(os.path.join(style_dir, "fontello.woff"), "rb") as f:
        encoded_font = base64.b64encode(f.read()).decode("ascii")

    # Grab the index.html template
    index = read_text(vis_dir, "index.html")

    # Grab the model viewer data
    data = _get_viewer_data(problem_or_filename, include_orphan_vars)
    model_viewer_data = f"var modelData = {json.dumps(data)}"

    # Add the necessary HTML tags if we aren't embedding
    if not embeddable:
        index = html_begin_tags + index + html_end_tags

    # Put all style and JS into index. Substitutions are applied sequentially,
    # in this order, on the progressively filled template.
    substitutions = (
        ('{{awesomplete_style}}', awesomplete_style),
        ('{{partition_tree_style}}', partition_tree_style),
        ('{{fontello}}', encoded_font),
        ('{{d3_lib}}', d3),
        ('{{awesomplete_lib}}', awesomplete),
        ('{{vk_beautify_lib}}', vk_beautify),
        ('{{model_data}}', model_viewer_data),
        ('{{constants_lib}}', constants),
        ('{{modal_lib}}', modal),
        ('{{svg_lib}}', svg),
        ('{{search_lib}}', search),
        ('{{legend_lib}}', legend),
        ('{{draw_lib}}', draw),
        ('{{ptn2_lib}}', pt_n2),
        (
            '{{draw_potential_connections}}',
            'true' if draw_potential_connections else 'false',
        ),
    )
    for placeholder, content in substitutions:
        index = index.replace(placeholder, content)

    # Written as UTF-8, consistently with the charset declared in the page
    with open(outfile, 'w', encoding="utf-8") as f:
        f.write(index)

    # Open it up in the browser
    if show_browser:
        from cosapp.tools.problem_viewer.webview import webview
        webview(outfile)