Source code for cosapp.tools.trigger
"""
Utility classes for executing CoSApp simulation following file events.
Those classes use third-party `watchdog` package to observe file events.
"""
import logging
from threading import Timer
from typing import AnyStr, List, Union
from cosapp.systems import System
logger = logging.getLogger(__name__)
try: # watchdog will be a conditional dependency
from watchdog.events import (
PatternMatchingEventHandler,
FileCreatedEvent,
FileModifiedEvent,
)
from watchdog.observers import Observer
[docs]
class WatchdogHandler(PatternMatchingEventHandler):
"""Define a watchdog class that allows to trigger actions on file events (creation,
modification, etc.).
Parameters
----------
owner: System
Owner of the watchdog. Actions will be triggered on him on events
folder: str
Folder to supervise with the watchdog
timeout: float, optional
Define the max inactivity duration of the watchdog
patterns: Union[List, AnyStr], optional
Define the file patterns to monitor in the chosen path
"""
def __init__(
self,
owner: System,
folder: str,
timeout: float = 30.0,
patterns: Union[List, AnyStr] = "*.*",
):
"""`WatchdogHandler` class constructor
Parameters
----------
owner: System
Owner of the watchdog. Actions will be triggered on him on events
folder: str
Folder to supervise with the watchdog
timeout: float, optional
Define the max inactivity duration of the watchdog
patterns: Union[List, AnyStr], optional
Define the file patterns to monitor in the chosen path
"""
super(WatchdogHandler, self).__init__(patterns=patterns)
self._owner = owner
self.timeout = timeout
self.exit = False
self.folder = folder
self.__observer: Observer = None
self.__timer: Timer = None
self.__is_alive = False
[docs]
def compute(self, file_full_path: str) -> None:
"""Actions to do while an event is detected
Parameters
----------
file_full_path: str
Gives access to the file that triggered an event
"""
self._owner.run_drivers()
[docs]
def is_alive(self) -> bool:
"""Is the watcher alive?"""
return self.__is_alive
[docs]
def reset(self) -> None:
"""Reset the watchdog by stopping the timer countdown and starting a new one"""
if self.__observer is None:
raise RuntimeError("Watcher has never been started.")
self.__timer.cancel()
self.__timer = Timer(self.timeout, self.stop)
self.__timer.start()
[docs]
def stop(self) -> None:
"""Exits the watchdog monitoring"""
if not self.__is_alive:
logger.debug("Watcher not alive.")
return
self.__is_alive = False
self.__timer.cancel()
self.__observer.stop()
self.__observer.join()
logger.info(f"..trigger on {self._owner.name!r} is timeout")
[docs]
def start(self, time_step: float = 1.0) -> None:
"""Starts the watchdog monitoring
Parameters
----------
time_step: float, optional
Gives the time step at which the watchdog will check events in the supervised folder
"""
self.__observer = Observer()
self.__observer.schedule(self, path=self.folder)
if time_step > self.timeout:
time_step = self.timeout
logger.debug(
f"> trigger created on {self._owner.name!r} with {self.timeout} seconds timeout.."
)
self.__observer.start()
self.__timer = Timer(self.timeout, self.stop)
self.__timer.start()
self.__is_alive = True
[docs]
class FileCreationHandler(WatchdogHandler):
"""Specific watchdog class to monitor file creation events
Parameters
----------
owner: System
Owner of the watchdog. Actions will be triggered on him on events
folder: str
Folder to supervise with the watchdog
timeout: float, optional
Define the max inactivity duration of the watchdog
patterns: Union[List, AnyStr], optional
Define the file patterns to monitor in the chosen path
"""
[docs]
def on_created(self, event: FileCreatedEvent) -> None: # when file is created
"""Actions to complete when a creation event is triggered
Parameters
----------
event
Event that triggered the watchdog
"""
logger.info(f" #creation of file {event.src_path!r} detected")
self.compute(event.src_path)
self.reset()
logger.debug(
f"> trigger restart on {self._owner.name!r} for another {self.timeout} seconds.."
)
[docs]
class FileModificationHandler(WatchdogHandler):
"""Specific watchdog class to monitor file modification events.
Parameters
----------
owner: System
Owner of the watchdog. Actions will be triggered on him on events
folder: str
Folder to supervise with the watchdog
timeout: float, optional
Define the max inactivity duration of the watchdog
patterns: Union[List, AnyStr], optional
Define the file patterns to monitor in the chosen path
"""
[docs]
def on_modified(
self, event: FileModifiedEvent
) -> None: # when file is modified
"""Actions to complete when a modification event is triggered
Parameters
----------
event
Event that triggered the watchdog
"""
logger.info(f" #modification of file {event.src_path!r} detected")
self.compute(event.src_path)
self.reset()
logger.debug(
f"> trigger restart on {self._owner.name!r} for another {self.timeout} seconds.."
)
except ImportError:
logger.warning(
"'watchdog' package is not installed. Files modification and creation trackers are not available."
)