cosapp.systems.externalsystem

Classes

Client(name, port)

Communication(port)

ExternalSystem(name, **kwargs)

Server(port[, failed_connection_attempt_max])

TCPSystem(name[, init_variables, port])

class cosapp.systems.externalsystem.Client(name, port)

Bases: Communication

close_connection() → None
connect_server(timeout=5.0)
stop() → None

class cosapp.systems.externalsystem.Communication(port)

Bases: object

close_connection()
send_message(message)
wait_for_message(nb_characters=1024)
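
The listing above gives only the signatures of send_message and wait_for_message. As a rough illustration of the pattern those names suggest, the sketch below exchanges one message over a connected socket pair using only the Python standard library. It does not call cosapp: the helpers send_text and receive_text are hypothetical, and both the UTF-8 encoding and the reading of nb_characters as a receive-buffer size are assumptions, not documented behavior.

```python
import socket


def send_text(sock: socket.socket, message: str) -> None:
    """Hypothetical helper: send a whole message, UTF-8 encoded (assumed encoding)."""
    sock.sendall(message.encode("utf-8"))


def receive_text(sock: socket.socket, nb_characters: int = 1024) -> str:
    """Hypothetical helper: block until data arrives, reading at most nb_characters bytes."""
    return sock.recv(nb_characters).decode("utf-8")


# socketpair() returns two already-connected sockets, so no port is needed here.
left, right = socket.socketpair()
try:
    send_text(left, "ready")
    reply = receive_text(right)
finally:
    left.close()
    right.close()

print(reply)
```

A single recv call is enough for a short message like this one; a longer payload would need a loop or a length prefix, which this sketch leaves out.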

class cosapp.systems.externalsystem.ExternalSystem(name: str, **kwargs)

Bases: System

abstract read_outputs() → Any
abstract send_inputs() → None
serialize_data() → Dict[str, Any]

Serialize all input data into a dictionary
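
Since send_inputs and read_outputs are abstract, a concrete subclass has to supply both. The sketch below mimics that contract with a stand-in base class so that it runs without cosapp installed. ExternalSystemSketch and InMemorySystem are hypothetical names, and storing inputs in a plain dictionary is an assumption made for illustration; the real class derives from System and may manage its variables differently.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class ExternalSystemSketch(ABC):
    """Stand-in for the documented interface; NOT the cosapp class."""

    def __init__(self, name: str, **inputs: float) -> None:
        self.name = name
        self.inputs: Dict[str, Any] = dict(inputs)  # assumed input storage
        self.outputs: Dict[str, Any] = {}

    def serialize_data(self) -> Dict[str, Any]:
        """Collect all input data into a plain dictionary."""
        return dict(self.inputs)

    @abstractmethod
    def send_inputs(self) -> None:
        """Hand the serialized inputs to the external tool."""

    @abstractmethod
    def read_outputs(self) -> Any:
        """Fetch the results produced by the external tool."""


class InMemorySystem(ExternalSystemSketch):
    """Hypothetical subclass whose 'external tool' is a local computation."""

    def send_inputs(self) -> None:
        self._payload = self.serialize_data()

    def read_outputs(self) -> Any:
        # Stand-in for an external computation: sum the inputs that were sent.
        self.outputs = {"total": sum(self._payload.values())}
        return self.outputs


system = InMemorySystem("demo", a=1.0, b=2.5)
system.send_inputs()
result = system.read_outputs()
print(result)
```

Instantiating the stand-in base class directly raises TypeError, which is the usual way an abstract interface forces subclasses to implement both methods.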

class cosapp.systems.externalsystem.Server(port, failed_connection_attempt_max=1)

Bases: Communication

accept()

class cosapp.systems.externalsystem.TCPSystem(name: str, init_variables: dict | None = None, port: int | None = 13000, **kwargs)

Bases: ExternalSystem

call_clean_run()

Execute clean_run recursively on all modules.

Parameters:

skip_drivers (bool) – Skip calling cosapp.drivers.driver.Driver.clean_run()

call_setup_run()

Execute setup_run recursively on all modules.

Parameters:

skip_drivers (bool) – Skip calling cosapp.drivers.driver.Driver.setup_run()

close_service()
compute()

Contains the customized Module calculation, to execute after children.

read_outputs() → Any
send_inputs() → None
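
The method names of TCPSystem suggest a round trip: send the inputs to a service over TCP, then read the outputs back. The sketch below shows one way such an exchange could look with the standard library alone. It is not the cosapp implementation: the JSON wire format, the serve_once helper, and the doubling "computation" are all assumptions, and the port is chosen by the operating system rather than fixed at the documented default of 13000.

```python
import json
import socket
import threading


def serve_once(listener: socket.socket) -> None:
    """Hypothetical external tool: accept one client, double each input, reply."""
    conn, _ = listener.accept()
    with conn:
        inputs = json.loads(conn.recv(1024).decode("utf-8"))
        outputs = {key: 2 * value for key, value in inputs.items()}
        conn.sendall(json.dumps(outputs).encode("utf-8"))


# Bind to port 0 so the OS picks a free port for this demonstration.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]

worker = threading.Thread(target=serve_once, args=(listener,))
worker.start()

# Client side: send the inputs, then read the outputs (assumed JSON format).
with socket.create_connection(("127.0.0.1", port), timeout=5.0) as client:
    client.sendall(json.dumps({"x": 1.5, "y": 4}).encode("utf-8"))
    outputs = json.loads(client.recv(1024).decode("utf-8"))

worker.join()
listener.close()
print(outputs)
```

Each side closes its socket when it is done, a step that close_connection and close_service presumably cover in the real classes.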