"""Recorder to Delimited Separated Value format file."""
import os
import numpy
import pandas
from typing import Any, List, Optional
from collections.abc import Collection
from numbers import Integral
from cosapp.recorders.recorder import BaseRecorder, SearchPattern
from cosapp.core.execution import ExecutionType
from cosapp.utils.helpers import is_numerical, check_arg
class DSVRecorder(BaseRecorder):
    r"""Record data into a Delimiter Separated Value (DSV) file.

    Matching patterns are case sensitive and support the following special patterns:

    ======== ================================
    Pattern  Meaning
    ======== ================================
    `*`      matches everything
    `?`      matches any single character
    `[seq]`  matches any character in seq
    `[!seq]` matches any character not in seq
    ======== ================================

    Excluding patterns shadow including ones; e.g. if `includes='*port_in.*'` and `excludes='*.Pt'`,
    for a port having variables named `Tt` and `Pt`, only `Tt` will be recorded.

    Parameters
    ----------
    filepath : str
        Filepath to save data into.
    delimiter : `','`, `';'` or `'\t'`, optional
        Delimiter of data in the file; default `','`.
    use_buffer : bool, optional
        If `True`, records are kept in memory and only written to the file when `exit` is called;
        if `False`, every record is appended to the file as soon as it is available;
        default `False`.
    includes : str or list of str, optional
        Variables matching these patterns will be included; default `'*'` (i.e. all variables).
    excludes : str or list of str or None, optional
        Variables matching these patterns will be excluded; default `None` (i.e. nothing is excluded).
    numerical_only : bool, optional
        Keep only numerical variables (i.e. number or numerical vector); default False.
    section : str, optional
        Current section name; default `''`.
    precision : int, optional
        Precision digits when writing floating point number; default 9 (i.e. 10 figures will be written).
    hold : bool, optional
        Append the new data or not; default `False`.
    raw_output : bool, optional
        Raw output; default `False`.

    Raises
    ------
    ValueError
        If `delimiter` is not one of the supported delimiters.

    .. note::
        Do not mention `inwards` or `outwards` in `includes` or `excludes` list. Otherwise you may not
        record the wanted variables.
    """

    def __init__(
        self,
        filepath: str,
        includes: SearchPattern = "*",
        excludes: Optional[SearchPattern] = None,
        numerical_only: bool = False,
        section: str = "",
        precision: int = 9,
        hold: bool = False,
        delimiter: str = ",",
        raw_output: bool = False,
        use_buffer: bool = False,
    ):
        check_arg(filepath, "filepath", str)
        check_arg(delimiter, "delimiter", str)
        check_arg(use_buffer, "use_buffer", bool)

        supported_delimiters = (",", ";", "\t")
        if delimiter not in supported_delimiters:
            raise ValueError(
                f"Supported delimiters are {supported_delimiters}; got {delimiter!r}"
            )

        super().__init__(
            includes, excludes, numerical_only, section, precision, hold, raw_output
        )
        self.__filepath = filepath  # type: str
        self.__delimiter = delimiter  # type: str
        # `None` means "no buffering": records go straight to the file.
        self.__buffer = [] if use_buffer else None  # type: Optional[List[List[Any]]]

    @property
    def filepath(self) -> str:
        """str: path of the DSV file"""
        return self.__filepath

    @property
    def delimiter(self) -> str:
        """str: column delimiter used in DSV file"""
        return self.__delimiter

    def __to_row(self, line: List[Any]) -> str:
        """Serialize one record as a delimiter-joined text row ending with a newline.

        Fields are passed through `str`, which leaves strings untouched and
        prevents a `TypeError` if a field is not already a string.
        """
        return self.__delimiter.join(map(str, line)) + "\n"

    def export_data(self) -> pandas.DataFrame:
        """Export recorded results into a pandas.DataFrame object.

        Returns
        -------
        pandas.DataFrame
            An empty frame if the file does not exist; the buffered records
            (with the recorder headers as columns) when buffering is enabled;
            otherwise the content of the file, its first line being the header.
        """
        if not os.path.exists(self.__filepath):
            return pandas.DataFrame()

        if self.__buffer is not None:
            return pandas.DataFrame(self.__buffer, columns=self.get_headers())

        return pandas.read_csv(self.__filepath, delimiter=self.__delimiter, header=0)

    @property
    def _raw_data(self) -> List[List[Any]]:
        """Return a raw/unformatted version of records.

        Returns
        -------
        List[List[Any]]
            Records of `watched_object` for variables given by method `field_names()`.
            Empty if the file does not exist. When buffering is enabled, the
            buffer itself is returned; otherwise rows are read back from the
            file as lists of strings.
        """
        if not os.path.exists(self.__filepath):
            return list()

        if self.__buffer is not None:
            return self.__buffer

        delimiter = self.__delimiter
        with open(self.__filepath, "r") as fd:
            next(fd, None)  # The header line is skipped
            # Materialize a real list (as annotated) and drop the line terminator,
            # which would otherwise stay glued to the last field of each row.
            return [row.rstrip("\n").split(delimiter) for row in fd]

    def _enable_parallel_execution(self, exec_type: ExecutionType, chunk_id: int) -> None:
        """Enables the use of this `Recorder` in parallel execution.

        This method must perform the necessary changes to allow parallel
        execution in a multithreading or multiprocessing context.

        For multiprocessing without buffer, `_{chunk_id}` is appended to the
        file path. Multithreading is not implemented yet.

        Parameters
        ----------
        exec_type : ExecutionType
            Type of parallel execution
        chunk_id : int
            Identifier of the chunk to be handled by this recorder

        Raises
        ------
        NotImplementedError
            If `exec_type` is `ExecutionType.MULTI_THREADING`.
        """
        if exec_type == ExecutionType.MULTI_THREADING:
            raise NotImplementedError("Multithreading is not implemented yet")

        if exec_type == ExecutionType.MULTI_PROCESSING and self.__buffer is None:
            self.__filepath += f"_{chunk_id}"

    def _disable_parallel_execution(self, exec_type: ExecutionType, chunk_id: int) -> None:
        """Disables the use of this `Recorder` in parallel execution.

        This method rollbacks the changes made to the `Recorder` to handle parallel
        execution.

        Parameters
        ----------
        exec_type : ExecutionType
            Type of parallel execution
        chunk_id : int
            Identifier of the chunk to be handled by this recorder
        """
        if exec_type == ExecutionType.MULTI_PROCESSING and self.__buffer is None:
            # Drops everything after the last underscore, i.e. the `_{chunk_id}`
            # suffix added by `_enable_parallel_execution`.
            # NOTE(review): assumes the enabling call was made beforehand;
            # otherwise a path containing an underscore gets truncated.
            self.__filepath = self.__filepath.rsplit("_", maxsplit=1)[0]

    def start(self):
        """Initialize recording support.

        Unless `hold` is set, the watched object is run once, the buffer (if
        any) is emptied and the file is overwritten with the header line.
        """
        super().start()

        if self.hold:
            return

        # Run system to ensure data are up-to-date
        # TODO could we use clean/dirty here?
        self.watched_object.run_once()

        if self.__buffer is not None:
            self.__buffer.clear()

        # Write header
        with open(self.__filepath, "w") as fd:
            fd.write(self.__to_row(self.get_headers()))

    def _record(self, line: List[Any]) -> None:
        """Store one record: in the buffer if enabled, else appended to the file."""
        if self.__buffer is not None:
            self.__buffer.append(line)
            return

        with open(self.__filepath, "a") as fd:
            fd.write(self.__to_row(line))

    def _batch_record(self, lines: List[List[Any]]) -> None:
        """Records multiple lines at a time.

        Internal API allowing efficient concatenation of recorders.
        """
        if self.__buffer is not None:
            self.__buffer.extend(lines)
            return

        with open(self.__filepath, "a") as fd:
            fd.writelines(self.__to_row(line) for line in lines)

    def clear(self):
        """Clear all previously stored data."""
        if self.__buffer is not None:
            self.__buffer.clear()
        super().clear()

    def exit(self):
        """Close recording session.

        When buffering is enabled, the buffered records are appended to the file.
        """
        if self.__buffer is None:
            return

        with open(self.__filepath, "a") as fd:
            fd.writelines(self.__to_row(line) for line in self.__buffer)