Add auto-modifying cyclic tasks by bessman · Pull Request #703 · hardbyte/python-can · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add auto-modifying cyclic tasks #703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def stop(self) -> None:
"""


class CyclicSendTaskABC(CyclicTask):
class CyclicSendTaskABC(CyclicTask, abc.ABC):
"""
Message send task with defined period
"""
Expand Down Expand Up @@ -114,7 +114,7 @@ def _check_and_convert_messages(
return messages


class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC):
class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC):
def __init__(
self,
messages: Union[Sequence[Message], Message],
Expand All @@ -136,17 +136,15 @@ def __init__(
self.duration = duration


class RestartableCyclicTaskABC(CyclicSendTaskABC):
class RestartableCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
"""Adds support for restarting a stopped cyclic task"""

@abc.abstractmethod
def start(self) -> None:
"""Restart a stopped periodic task."""


class ModifiableCyclicTaskABC(CyclicSendTaskABC):
"""Adds support for modifying a periodic message"""

class ModifiableCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
def _check_modified_messages(self, messages: Tuple[Message, ...]) -> None:
"""Helper function to perform error checking when modifying the data in
the cyclic task.
Expand Down Expand Up @@ -190,7 +188,7 @@ def modify_data(self, messages: Union[Sequence[Message], Message]) -> None:
self.messages = messages


class MultiRateCyclicSendTaskABC(CyclicSendTaskABC):
class MultiRateCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC):
"""A Cyclic send task that supports switches send frequency after a set time."""

def __init__(
Expand Down Expand Up @@ -218,7 +216,7 @@ def __init__(


class ThreadBasedCyclicSendTask(
ModifiableCyclicTaskABC, LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC
LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC
):
"""Fallback cyclic send task using daemon thread."""

Expand All @@ -230,6 +228,7 @@ def __init__(
period: float,
duration: Optional[float] = None,
on_error: Optional[Callable[[Exception], bool]] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> None:
"""Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`.

Expand All @@ -255,6 +254,7 @@ def __init__(
time.perf_counter() + duration if duration else None
)
self.on_error = on_error
self.modifier_callback = modifier_callback

if USE_WINDOWS_EVENTS:
self.period_ms = int(round(period * 1000, 0))
Expand Down Expand Up @@ -301,14 +301,22 @@ def _run(self) -> None:
# Prevent calling bus.send from multiple threads
with self.send_lock:
try:
if self.modifier_callback is not None:
self.modifier_callback(self.messages[msg_index])
self.bus.send(self.messages[msg_index])
except Exception as exc: # pylint: disable=broad-except
log.exception(exc)
if self.on_error:
if not self.on_error(exc):
break
else:

# stop if `on_error` callback was not given
if self.on_error is None:
self.stop()
raise exc

# stop if `on_error` returns False
if not self.on_error(exc):
self.stop()
break

msg_due_time_ns += self.period_ns
if self.end_time is not None and time.perf_counter() >= self.end_time:
break
Expand Down
20 changes: 15 additions & 5 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from abc import ABC, ABCMeta, abstractmethod
from enum import Enum, auto
from time import time
from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union, cast
from typing import Any, Callable, Iterator, List, Optional, Sequence, Tuple, Union, cast

import can
import can.typechecking
Expand Down Expand Up @@ -195,6 +195,7 @@ def send_periodic(
period: float,
duration: Optional[float] = None,
store_task: bool = True,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Start sending messages at a given period on this bus.

Expand All @@ -216,6 +217,10 @@ def send_periodic(
:param store_task:
If True (the default) the task will be attached to this Bus instance.
Disable to instead manage tasks manually.
:param modifier_callback:
Function which should be used to modify each message's data before
sending. The callback modifies the :attr:`~can.Message.data` of the
message and returns ``None``.
:return:
A started task instance. Note the task can be stopped (and depending on
the backend modified) by calling the task's
Expand All @@ -230,7 +235,7 @@ def send_periodic(

.. note::

For extremely long running Bus instances with many short lived
For extremely long-running Bus instances with many short-lived
tasks the default api with ``store_task==True`` may not be
appropriate as the stopped tasks are still taking up memory as they
are associated with the Bus instance.
Expand All @@ -247,9 +252,8 @@ def send_periodic(
# Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later
task = cast(
_SelfRemovingCyclicTask,
self._send_periodic_internal(msgs, period, duration),
self._send_periodic_internal(msgs, period, duration, modifier_callback),
)

# we wrap the task's stop method to also remove it from the Bus's list of tasks
periodic_tasks = self._periodic_tasks
original_stop_method = task.stop
Expand All @@ -275,6 +279,7 @@ def _send_periodic_internal(
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Default implementation of periodic message sending using threading.

Expand All @@ -298,7 +303,12 @@ def _send_periodic_internal(
threading.Lock()
)
task = ThreadBasedCyclicSendTask(
self, self._lock_send_periodic, msgs, period, duration
bus=self,
lock=self._lock_send_periodic,
messages=msgs,
period=period,
duration=duration,
modifier_callback=modifier_callback,
)
return task

Expand Down
22 changes: 17 additions & 5 deletions can/interfaces/ixxat/canlib.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from typing import Optional
from typing import Callable, Optional, Sequence, Union

import can.interfaces.ixxat.canlib_vcinpl as vcinpl
import can.interfaces.ixxat.canlib_vcinpl2 as vcinpl2
from can import BusABC, Message
from can.bus import BusState
from can import (
BusABC,
BusState,
CyclicSendTaskABC,
Message,
)


class IXXATBus(BusABC):
Expand Down Expand Up @@ -145,8 +149,16 @@ def _recv_internal(self, timeout):
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
return self.bus.send(msg, timeout)

def _send_periodic_internal(self, msgs, period, duration=None):
return self.bus._send_periodic_internal(msgs, period, duration)
def _send_periodic_internal(
self,
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> CyclicSendTaskABC:
return self.bus._send_periodic_internal(
msgs, period, duration, modifier_callback
)

def shutdown(self) -> None:
super().shutdown()
Expand Down
56 changes: 41 additions & 15 deletions can/interfaces/ixxat/canlib_vcinpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
import functools
import logging
import sys
from typing import Callable, Optional, Tuple

from can import BusABC, CanProtocol, Message
from can.broadcastmanager import (
import warnings
from typing import Callable, Optional, Sequence, Tuple, Union

from can import (
BusABC,
BusState,
CanProtocol,
CyclicSendTaskABC,
LimitedDurationCyclicSendTaskABC,
Message,
RestartableCyclicTaskABC,
)
from can.bus import BusState
from can.ctypesutil import HANDLE, PHANDLE, CLibrary
from can.ctypesutil import HRESULT as ctypes_HRESULT
from can.exceptions import CanInitializationError, CanInterfaceNotImplementedError
Expand Down Expand Up @@ -785,17 +789,39 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:
# Want to log outgoing messages?
# log.log(self.RECV_LOGGING_LEVEL, "Sent: %s", message)

def _send_periodic_internal(self, msgs, period, duration=None):
def _send_periodic_internal(
self,
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> CyclicSendTaskABC:
"""Send a message using built-in cyclic transmit list functionality."""
if self._scheduler is None:
self._scheduler = HANDLE()
_canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler)
caps = structures.CANCAPABILITIES()
_canlib.canSchedulerGetCaps(self._scheduler, caps)
self._scheduler_resolution = caps.dwClockFreq / caps.dwCmsDivisor
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
return CyclicSendTask(
self._scheduler, msgs, period, duration, self._scheduler_resolution
if modifier_callback is None:
if self._scheduler is None:
self._scheduler = HANDLE()
_canlib.canSchedulerOpen(
self._device_handle, self.channel, self._scheduler
)
caps = structures.CANCAPABILITIES()
_canlib.canSchedulerGetCaps(self._scheduler, caps)
self._scheduler_resolution = caps.dwClockFreq / caps.dwCmsDivisor
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
return CyclicSendTask(
self._scheduler, msgs, period, duration, self._scheduler_resolution
)

# fallback to thread based cyclic task
warnings.warn(
f"{self.__class__.__name__} falls back to a thread-based cyclic task, "
"when the `modifier_callback` argument is given."
)
return BusABC._send_periodic_internal(
self,
msgs=msgs,
period=period,
duration=duration,
modifier_callback=modifier_callback,
)

def shutdown(self):
Expand Down
56 changes: 41 additions & 15 deletions can/interfaces/ixxat/canlib_vcinpl2.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
import functools
import logging
import sys
from typing import Callable, Optional, Tuple
import warnings
from typing import Callable, Optional, Sequence, Tuple, Union

from can import BusABC, CanProtocol, Message
from can.broadcastmanager import (
from can import (
BusABC,
CanProtocol,
CyclicSendTaskABC,
LimitedDurationCyclicSendTaskABC,
Message,
RestartableCyclicTaskABC,
)
from can.ctypesutil import HANDLE, PHANDLE, CLibrary
Expand Down Expand Up @@ -931,19 +935,41 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:
else:
_canlib.canChannelPostMessage(self._channel_handle, message)

def _send_periodic_internal(self, msgs, period, duration=None):
def _send_periodic_internal(
self,
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> CyclicSendTaskABC:
"""Send a message using built-in cyclic transmit list functionality."""
if self._scheduler is None:
self._scheduler = HANDLE()
_canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler)
caps = structures.CANCAPABILITIES2()
_canlib.canSchedulerGetCaps(self._scheduler, caps)
self._scheduler_resolution = (
caps.dwCmsClkFreq / caps.dwCmsDivisor
) # TODO: confirm
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
return CyclicSendTask(
self._scheduler, msgs, period, duration, self._scheduler_resolution
if modifier_callback is None:
if self._scheduler is None:
self._scheduler = HANDLE()
_canlib.canSchedulerOpen(
self._device_handle, self.channel, self._scheduler
)
caps = structures.CANCAPABILITIES2()
_canlib.canSchedulerGetCaps(self._scheduler, caps)
self._scheduler_resolution = (
caps.dwCmsClkFreq / caps.dwCmsDivisor
) # TODO: confirm
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
return CyclicSendTask(
self._scheduler, msgs, period, duration, self._scheduler_resolution
)

# fallback to thread based cyclic task
warnings.warn(
f"{self.__class__.__name__} falls back to a thread-based cyclic task, "
"when the `modifier_callback` argument is given."
)
return BusABC._send_periodic_internal(
self,
msgs=msgs,
period=period,
duration=duration,
modifier_callback=modifier_callback,
)

def shutdown(self):
Expand Down
Loading