Convert `can.Logger` and `can.LogReader` into functions by zariiii9003 · Pull Request #1703 · hardbyte/python-can · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Convert can.Logger and can.LogReader into functions #1703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions can/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
"CSVWriter",
"Logger",
"LogReader",
"MESSAGE_READERS",
"MESSAGE_WRITERS",
"MessageSync",
"MF4Reader",
"MF4Writer",
Expand All @@ -39,8 +41,8 @@
]

# Generic
from .logger import BaseRotatingLogger, Logger, SizedRotatingLogger
from .player import LogReader, MessageSync
from .logger import MESSAGE_WRITERS, BaseRotatingLogger, Logger, SizedRotatingLogger
from .player import MESSAGE_READERS, LogReader, MessageSync

# isort: split

Expand Down
197 changes: 98 additions & 99 deletions can/io/logger.py
</tr>
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Callable,
ClassVar,
Dict,
Final,
Literal,
Optional,
Set,
Expand All @@ -24,15 +25,13 @@
from typing_extensions import Self

from .._entry_points import read_entry_points
from ..listener import Listener
from ..message import Message
from ..typechecking import AcceptedIOType, FileLike, StringPathLike
from .asc import ASCWriter
from .blf import BLFWriter
from .canutils import CanutilsLogWriter
from .csv import CSVWriter
from .generic import (
BaseIOHandler,
BinaryIOMessageWriter,
FileIOMessageWriter,
MessageWriter,
Expand All @@ -42,20 +41,85 @@
from .sqlite import SqliteWriter
from .trc import TRCWriter

#: A map of file suffixes to their corresponding
#: :class:`can.io.generic.MessageWriter` class.
#: Keys include the leading dot (e.g. ``".asc"``). Entries for further
#: suffixes are added at runtime from the ``can.io.message_writer``
#: entry point group.
MESSAGE_WRITERS: Final[Dict[str, Type[MessageWriter]]] = {
".asc": ASCWriter,
".blf": BLFWriter,
".csv": CSVWriter,
".db": SqliteWriter,
".log": CanutilsLogWriter,
".mf4": MF4Writer,
".trc": TRCWriter,
".txt": Printer,
}


def _update_writer_plugins() -> None:
    """Register message writer plugins from the ``can.io.message_writer``
    entry point group.

    Each entry point is keyed by a file suffix. Suffixes that are already
    present in ``MESSAGE_WRITERS`` are skipped without loading the entry
    point, so existing registrations are never overridden. A loaded object
    is only registered if it is a class deriving from ``MessageWriter``;
    anything else is ignored.
    """
    for entry_point in read_entry_points("can.io.message_writer"):
        if entry_point.key in MESSAGE_WRITERS:
            continue

        writer_class = entry_point.load()
        # ``issubclass`` raises TypeError when its first argument is not a
        # class (e.g. a plugin exposing a factory function); check first so
        # one malformed plugin cannot make this update raise.
        if isinstance(writer_class, type) and issubclass(
            writer_class, MessageWriter
        ):
            MESSAGE_WRITERS[entry_point.key] = writer_class


def _get_logger_for_suffix(suffix: str) -> Type[MessageWriter]:
    """Look up the writer class registered for a file suffix.

    :param suffix:
        the file suffix to look up in ``MESSAGE_WRITERS``
    :return:
        the writer class registered for ``suffix``
    :raises ValueError:
        if no writer is registered for ``suffix``
    """
    if suffix not in MESSAGE_WRITERS:
        raise ValueError(
            f'No write support for unknown log format "{suffix}"'
        ) from None
    return MESSAGE_WRITERS[suffix]


class Logger(MessageWriter):
def _compress(
filename: StringPathLike, **kwargs: Any
) -> Tuple[Type[MessageWriter], FileLike]:
"""
Logs CAN messages to a file.
Return the suffix and io object of the decompressed file.
File will automatically recompress upon close.
"""
suffixes = pathlib.Path(filename).suffixes
if len(suffixes) != 2:
raise ValueError(
f"No write support for unknown log format \"{''.join(suffixes)}\""
) from None

real_suffix = suffixes[-2].lower()
if real_suffix in (".blf", ".db"):
raise ValueError(
f"The file type {real_suffix} is currently incompatible with gzip."
)
logger_type = _get_logger_for_suffix(real_suffix)
append = kwargs.get("append", False)

if issubclass(logger_type, BinaryIOMessageWriter):
mode = "ab" if append else "wb"
else:
mode = "at" if append else "wt"

return logger_type, gzip.open(filename, mode)


def Logger( # noqa: N802
filename: Optional[StringPathLike], **kwargs: Any
) -> MessageWriter:
"""Find and return the appropriate :class:`~can.io.generic.MessageWriter` instance
for a given file suffix.

The format is determined from the file suffix which can be one of:
* .asc: :class:`can.ASCWriter`
* .asc :class:`can.ASCWriter`
* .blf :class:`can.BLFWriter`
* .csv: :class:`can.CSVWriter`
* .db: :class:`can.SqliteWriter`
* .db :class:`can.SqliteWriter`
* .log :class:`can.CanutilsLogWriter`
* .mf4 :class:`can.MF4Writer`
(optional, depends on `asammdf <https://github.com/danielhrisca/asammdf>`_)
* .trc :class:`can.TRCWriter`
* .txt :class:`can.Printer`
* .mf4 :class:`can.MF4Writer` (optional, depends on asammdf)

Any of these formats can be used with gzip compression by appending
the suffix .gz (e.g. filename.asc.gz). However, third-party tools might not
Expand All @@ -65,97 +129,33 @@ class Logger(MessageWriter):

The log files may be incomplete until `stop()` is called due to buffering.

:param filename:
the filename/path of the file to write to,
may be a path-like object or None to
instantiate a :class:`~can.Printer`
:raises ValueError:
if the filename's suffix is of an unknown file type

.. note::
This class itself is just a dispatcher, and any positional and keyword
This function itself is just a dispatcher, and any positional and keyword
arguments are passed on to the returned instance.
"""

fetched_plugins = False
message_writers: ClassVar[Dict[str, Type[MessageWriter]]] = {
".asc": ASCWriter,
".blf": BLFWriter,
".csv": CSVWriter,
".db": SqliteWriter,
".log": CanutilsLogWriter,
".mf4": MF4Writer,
".trc": TRCWriter,
".txt": Printer,
}

@staticmethod
def __new__( # type: ignore[misc]
cls: Any, filename: Optional[StringPathLike], **kwargs: Any
) -> MessageWriter:
"""
:param filename:
the filename/path of the file to write to,
may be a path-like object or None to
instantiate a :class:`~can.Printer`
:raises ValueError:
if the filename's suffix is of an unknown file type
"""
if filename is None:
return Printer(**kwargs)

if not Logger.fetched_plugins:
Logger.message_writers.update(
{
writer.key: cast(Type[MessageWriter], writer.load())
for writer in read_entry_points("can.io.message_writer")
}
)
Logger.fetched_plugins = True

suffix = pathlib.PurePath(filename).suffix.lower()

file_or_filename: AcceptedIOType = filename
if suffix == ".gz":
logger_type, file_or_filename = Logger.compress(filename, **kwargs)
else:
logger_type = cls._get_logger_for_suffix(suffix)

return logger_type(file=file_or_filename, **kwargs)

@classmethod
def _get_logger_for_suffix(cls, suffix: str) -> Type[MessageWriter]:
try:
logger_type = Logger.message_writers[suffix]
if logger_type is None:
raise ValueError(f'failed to import logger for extension "{suffix}"')
return logger_type
except KeyError:
raise ValueError(
f'No write support for this unknown log format "{suffix}"'
) from None

@classmethod
def compress(
cls, filename: StringPathLike, **kwargs: Any
) -> Tuple[Type[MessageWriter], FileLike]:
"""
Return the suffix and io object of the decompressed file.
File will automatically recompress upon close.
"""
real_suffix = pathlib.Path(filename).suffixes[-2].lower()
if real_suffix in (".blf", ".db"):
raise ValueError(
f"The file type {real_suffix} is currently incompatible with gzip."
)
logger_type = cls._get_logger_for_suffix(real_suffix)
append = kwargs.get("append", False)

if issubclass(logger_type, BinaryIOMessageWriter):
mode = "ab" if append else "wb"
else:
mode = "at" if append else "wt"
if filename is None:
return Printer(**kwargs)

return logger_type, gzip.open(filename, mode)
_update_writer_plugins()

def on_message_received(self, msg: Message) -> None:
pass
suffix = pathlib.PurePath(filename).suffix.lower()
file_or_filename: AcceptedIOType = filename
if suffix == ".gz":
logger_type, file_or_filename = _compress(filename, **kwargs)
else:
logger_type = _get_logger_for_suffix(suffix)
return logger_type(file=file_or_filename, **kwargs)


class BaseRotatingLogger(Listener, BaseIOHandler, ABC):
class BaseRotatingLogger(MessageWriter, ABC):
"""
Base class for rotating CAN loggers. This class is not meant to be
instantiated directly. Subclasses must implement the :meth:`should_rollover`
Expand Down Expand Up @@ -187,20 +187,15 @@ class BaseRotatingLogger(Listener, BaseIOHandler, ABC):
rollover_count: int = 0

def __init__(self, **kwargs: Any) -> None:
Listener.__init__(self)
BaseIOHandler.__init__(self, file=None)
super().__init__(**{**kwargs, "file": None})

self.writer_kwargs = kwargs

# Expected to be set by the subclass
self._writer: Optional[FileIOMessageWriter] = None

@property
@abstractmethod
def writer(self) -> FileIOMessageWriter:
"""This attribute holds an instance of a writer class which manages the actual file IO."""
if self._writer is not None:
return self._writer
raise ValueError(f"{self.__class__.__name__}.writer is None.")
raise NotImplementedError

def rotation_filename(self, default_name: StringPathLike) -> StringPathLike:
"""Modify the filename of a log file when rotating.
Expand Down Expand Up @@ -270,7 +265,7 @@ def _get_new_writer(self, filename: StringPathLike) -> FileIOMessageWriter:
logger = Logger(filename=filename, **self.writer_kwargs)
if isinstance(logger, FileIOMessageWriter):
return logger
if isinstance(logger, Printer) and logger.file is not None:
elif isinstance(logger, Printer) and logger.file is not None:
return cast(FileIOMessageWriter, logger)

raise ValueError(
Expand Down Expand Up @@ -373,6 +368,10 @@ def __init__(

self._writer = self._get_new_writer(self.base_filename)

@property
def writer(self) -> FileIOMessageWriter:
return self._writer

def should_rollover(self, msg: Message) -> bool:
if self.max_bytes <= 0:
return False
Expand Down
Loading
0