cosapp.systems.externalsystem

Classes

Client(name, port)

Communication(port)

ExternalSystem(name, **kwargs)

Server(port[, failed_connection_attempt_max])

TCPSystem(name, init_variables, port, **kwargs)

class cosapp.systems.externalsystem.Client(name, port)

Bases: cosapp.systems.externalsystem.Communication

close_connection() → None
connect_server(timeout=5.0)
stop() → None

class cosapp.systems.externalsystem.Communication(port)

Bases: object

close_connection()
send_message(message)
wait_for_message(nb_characters=1024)

class cosapp.systems.externalsystem.ExternalSystem(name: str, **kwargs)

Bases: cosapp.systems.system.System

abstract read_outputs() → Any
abstract send_inputs() → None
serialize_data() → Dict[str, Any]

Serialize all input data into a dictionary
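The interface above asks subclasses to implement send_inputs() and read_outputs(), while serialize_data() gathers the inputs into a dictionary. The sketch below illustrates that pattern with a stand-in base class built on Python's abc module. It does not import cosapp. ExternalSystemSketch, InMemoryDoubler, and the inputs, outputs and _mailbox attributes are hypothetical names introduced for illustration only, and they say nothing about how cosapp stores its variables.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class ExternalSystemSketch(ABC):
    """Stand-in for the documented interface; NOT cosapp's ExternalSystem."""

    def __init__(self, name: str, **inputs: Any) -> None:
        self.name = name
        # Hypothetical input/output stores, used only by this sketch.
        self.inputs: Dict[str, Any] = dict(inputs)
        self.outputs: Dict[str, Any] = {}

    def serialize_data(self) -> Dict[str, Any]:
        """Collect all input data into a dictionary."""
        return dict(self.inputs)

    @abstractmethod
    def send_inputs(self) -> None:
        """Hand the serialized inputs over to the external tool."""

    @abstractmethod
    def read_outputs(self) -> Any:
        """Fetch the results produced by the external tool."""


class InMemoryDoubler(ExternalSystemSketch):
    """Hypothetical subclass whose 'external tool' doubles every input."""

    def __init__(self, name: str, **inputs: Any) -> None:
        super().__init__(name, **inputs)
        self._mailbox: Dict[str, Any] = {}

    def send_inputs(self) -> None:
        # A real subclass would write a file or send a message here.
        self._mailbox = self.serialize_data()

    def read_outputs(self) -> Any:
        # A real subclass would parse the tool's response here.
        self.outputs = {key: 2 * value for key, value in self._mailbox.items()}
        return self.outputs
```

Because both methods are abstract, instantiating the base class directly raises TypeError; only a subclass that defines both can be created.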

class cosapp.systems.externalsystem.Server(port, failed_connection_attempt_max=1)

Bases: cosapp.systems.externalsystem.Communication

accept()
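This page lists the method names of Client, Communication and Server but not their behaviour. As a rough illustration of the kind of exchange those names suggest (accept a connection, send a message, wait for a reply of at most 1024 characters), here is a minimal sketch that uses only Python's standard socket and threading modules. It is not cosapp's implementation; run_echo_server and roundtrip are hypothetical helpers, and the upper-casing reply is an arbitrary choice for the demo.

```python
import socket
import threading


def run_echo_server(server_sock: socket.socket) -> None:
    """Accept one connection, read one message, reply with it upper-cased."""
    conn, _addr = server_sock.accept()
    with conn:
        data = conn.recv(1024)  # read at most 1024 bytes
        conn.sendall(data.upper())


def roundtrip(message: str, timeout: float = 5.0) -> str:
    """Send `message` to a local one-shot server and return its reply."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock:
        server_sock.settimeout(timeout)     # do not wait forever in accept()
        server_sock.bind(("127.0.0.1", 0))  # port 0: the OS picks a free port
        server_sock.listen(1)
        port = server_sock.getsockname()[1]

        worker = threading.Thread(target=run_echo_server, args=(server_sock,))
        worker.start()

        # Client side: connect, send the message, wait for the reply.
        with socket.create_connection(("127.0.0.1", port), timeout=timeout) as client:
            client.sendall(message.encode("utf-8"))
            reply = client.recv(1024).decode("utf-8")

        worker.join()
    return reply


if __name__ == "__main__":
    print(roundtrip("ping"))
```

The sketch binds to port 0 so that it never collides with a service already listening; a fixed port such as the documented TCPSystem default of 13000 would need to be free on the machine.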
class cosapp.systems.externalsystem.TCPSystem(name: str, init_variables: Optional[dict] = None, port: Optional[int] = 13000, **kwargs)

Bases: cosapp.systems.externalsystem.ExternalSystem

call_clean_run()

Execute clean_run recursively on all modules.

Parameters

skip_drivers (bool) – Skip calling cosapp.drivers.driver.Driver.clean_run()

call_setup_run()

Execute setup_run recursively on all modules.

Parameters

skip_drivers (bool) – Skip calling cosapp.drivers.driver.Driver.setup_run()

close_service()
compute()

Contains the customized Module calculation, to execute after children.

read_outputs() → Any
send_inputs() → None