Source code for cosapp.systems.processsystem
from multiprocessing import Process, Queue
from typing import Dict, Any
from cosapp.systems.externalsystem import ExternalSystem
class ProcessSystem(ExternalSystem):
    """External system whose code is run in a separate `multiprocessing.Process`.

    The worker process is created and started by `call_setup_run`, using
    `to_execute` as its target. The target is invoked with three positional
    arguments: the input queue, the output queue, and the value of the
    `working_folder` inward.

    Attributes
    ----------
    _queue_in : multiprocessing.Queue
        Queue on which this system posts messages for the worker process.
    _queue_out : multiprocessing.Queue
        Queue read by `read_outputs`.
    to_execute : Callable
        Target of the worker process. It is initialised with a placeholder
        and must be replaced by a callable accepting
        ``(queue_in, queue_out, working_folder)`` before `call_setup_run`.
    """

    __slots__ = ('_queue_in', '_queue_out', 'to_execute')

    def __init__(self, name: str, **kwargs):
        """Create the communication queues, then delegate to the parent class.

        Parameters
        ----------
        name : str
            Name of the system, forwarded to the parent constructor.
        **kwargs
            Additional keyword arguments forwarded to the parent constructor.
        """
        self._queue_in = Queue()
        self._queue_out = Queue()
        # Placeholder kept for backward compatibility of the attribute state;
        # it is not callable, so `call_setup_run` rejects it until the user
        # assigns an actual target.
        self.to_execute = Queue()
        super().__init__(name, **kwargs)

    def _initialize(self, **kwargs):
        """Declare the `working_folder` inward and return `kwargs` unchanged."""
        self.add_inward(
            "working_folder",
            None,
            dtype=str,
            desc="Folder of execution for the process.",
        )
        return kwargs

    def call_setup_run(self):
        """Run the parent setup, then start the worker process.

        Raises
        ------
        TypeError
            If `to_execute` is not callable. Checked up front, before any
            side effect, so the failure is reported in the calling process
            rather than as a crash inside the child.
        """
        target = self.to_execute
        if not callable(target):
            raise TypeError(
                f"{type(self).__name__}.to_execute must be a callable accepting "
                "(queue_in, queue_out, working_folder); "
                f"got {type(target).__name__!r} object"
            )
        super().call_setup_run()
        self._process = Process(
            target=target,
            args=(self._queue_in, self._queue_out, self.working_folder),
        )
        self._process.start()

    def call_clean_run(self):
        """Post an empty dict on the input queue, wait for the worker, then clean up.

        NOTE(review): the empty dict is presumably the stop signal expected by
        the worker target - confirm against `to_execute` implementations.
        """
        self._queue_in.put({})
        # Blocks until the worker process terminates.
        self._process.join()
        super().call_clean_run()

    def read_outputs(self, timeout: float = 1) -> Dict[str, Any]:
        """Return the next item available on the output queue.

        Parameters
        ----------
        timeout : float, optional
            Maximum time, in seconds, to wait for an item; default is 1.

        Returns
        -------
        Dict[str, Any]
            Item retrieved from the output queue.

        Raises
        ------
        queue.Empty
            If no item becomes available within `timeout` seconds.
        """
        return self._queue_out.get(block=True, timeout=timeout)