"""
* Copyright(c) 2021 to 2022 ZettaScale Technology and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""
from argparse import ArgumentError
import uuid
import asyncio
import concurrent
import ctypes as ct
from weakref import WeakValueDictionary
from typing import Any, Callable, Dict, Optional, List, TYPE_CHECKING
from .internal import c_call, c_callable, dds_infinity, dds_c_t, DDS
from .qos import Qos, Policy, _CQos
if TYPE_CHECKING:
import cyclonedds
class DDSException(Exception):
    """Raised when a return code from the underlying C API indicates invalid use of the API.

    Print the exception directly or convert it to a string for a detailed description.

    Attributes
    ----------
    code: int
        One of the ``DDS_RETCODE_`` constants that indicates the type of error.
    msg: str
        A human readable description of where the error occurred.
    """

    DDS_RETCODE_OK = 0  # Success
    DDS_RETCODE_ERROR = -1  # Non specific error
    DDS_RETCODE_UNSUPPORTED = -2  # Feature unsupported
    DDS_RETCODE_BAD_PARAMETER = -3  # Bad parameter value
    DDS_RETCODE_PRECONDITION_NOT_MET = -4  # Precondition for operation not met
    DDS_RETCODE_OUT_OF_RESOURCES = -5  # When an operation fails because of a lack of resources
    DDS_RETCODE_NOT_ENABLED = -6  # When a configurable feature is not enabled
    DDS_RETCODE_IMMUTABLE_POLICY = -7  # When an attempt is made to modify an immutable policy
    DDS_RETCODE_INCONSISTENT_POLICY = -8  # When a policy is used with inconsistent values
    DDS_RETCODE_ALREADY_DELETED = -9  # When an attempt is made to delete something more than once
    DDS_RETCODE_TIMEOUT = -10  # When a timeout has occurred
    DDS_RETCODE_NO_DATA = -11  # When expected data is not provided
    DDS_RETCODE_ILLEGAL_OPERATION = -12  # When a function is called when it should not be
    DDS_RETCODE_NOT_ALLOWED_BY_SECURITY = -13  # When credentials are not enough to use the function

    # Maps every known return code to (symbolic name, human readable description).
    error_message_mapping = {
        DDS_RETCODE_OK: ("DDS_RETCODE_OK", "Success"),
        DDS_RETCODE_ERROR: ("DDS_RETCODE_ERROR", "Non specific error"),
        DDS_RETCODE_UNSUPPORTED: ("DDS_RETCODE_UNSUPPORTED", "Feature unsupported"),
        DDS_RETCODE_BAD_PARAMETER: ("DDS_RETCODE_BAD_PARAMETER", "Bad parameter value"),
        DDS_RETCODE_PRECONDITION_NOT_MET: (
            "DDS_RETCODE_PRECONDITION_NOT_MET",
            "Precondition for operation not met",
        ),
        DDS_RETCODE_OUT_OF_RESOURCES: (
            "DDS_RETCODE_OUT_OF_RESOURCES",
            "Operation failed because of a lack of resources",
        ),
        DDS_RETCODE_NOT_ENABLED: (
            "DDS_RETCODE_NOT_ENABLED",
            "A configurable feature is not enabled",
        ),
        DDS_RETCODE_IMMUTABLE_POLICY: (
            "DDS_RETCODE_IMMUTABLE_POLICY",
            "An attempt was made to modify an immutable policy",
        ),
        DDS_RETCODE_INCONSISTENT_POLICY: (
            "DDS_RETCODE_INCONSISTENT_POLICY",
            "A policy with inconsistent values was used",
        ),
        DDS_RETCODE_ALREADY_DELETED: (
            "DDS_RETCODE_ALREADY_DELETED",
            "An attempt was made to delete something more than once",
        ),
        DDS_RETCODE_TIMEOUT: ("DDS_RETCODE_TIMEOUT", "A timeout has occurred"),
        DDS_RETCODE_NO_DATA: ("DDS_RETCODE_NO_DATA", "Expected data is not provided"),
        DDS_RETCODE_ILLEGAL_OPERATION: (
            "DDS_RETCODE_ILLEGAL_OPERATION",
            "A function was called when it should not be",
        ),
        DDS_RETCODE_NOT_ALLOWED_BY_SECURITY: (
            "DDS_RETCODE_NOT_ALLOWED_BY_SECURITY",
            "Insufficient credentials supplied to use the function",
        ),
    }

    def __init__(self, code: int, msg: Optional[str] = None, **kwargs) -> None:
        """Initialize a DDSException.

        Parameters
        ----------
        code: int
            Should be one of the ``DDS_RETCODE_*`` constants.
        msg: str, optional
            Description of where the error occurred; ``None`` is stored as ``""``.
        **kwargs
            Forwarded unchanged to the parent initializer.
        """
        self.code = code
        self.msg = msg or ""
        # Hand code and msg to Exception so that ``args`` is populated. With empty
        # ``args`` the exception could not be rebuilt by pickle/copy, which re-invoke
        # the class with ``*args`` and would miss the required ``code``.
        super().__init__(code, self.msg, **kwargs)

    def __str__(self) -> str:
        """Return ``"[NAME] description. msg"``, or a generic text for an unknown code."""
        entry = self.error_message_mapping.get(self.code)
        if entry is None:
            return f"[DDSException] Got an unexpected error code '{self.code}'. {self.msg}"
        name, description = entry
        return f"[{name}] {description}. {self.msg}"

    def __repr__(self) -> str:
        """Same text as ``str(self)``."""
        return str(self)
[docs]class Entity(DDS):
"""
Base class for all entities in the DDS API. The lifetime of the underlying
DDS API object is linked to the lifetime of the Python entity object.
Attributes
----------
subscriber
If this entity is associated with a Subscriber retrieve it.
It is read-only. This is a proxy for get_subscriber().
publisher
If this entity is associated with a Publisher retrieve it.
It is read-only. This is a proxy for get_publisher().
datareader
If this entity is associated with a DataReader retrieve it.
It is read-only. This is a proxy for get_datareader().
instance_handle: int
The instance handle of this entity.
It is read-only. This is a proxy for get_instance_handle().
guid: uuid.UUID
Return the globally unique identifier for this entity.
It is read-only. This is a proxy for get_guid().
status_mask
The status mask for this entity. It is a set of bits formed
from ``DDSStatus``. This is a proxy for get/set_status_mask().
parent
The entity that is this entity's parent. For example: the subscriber for a
datareader, the participant for a topic.
It is read-only. This is a proxy for get_parent().
participant
Get the participant for any entity, will only fail for a ``Domain``.
It is read-only. This is a proxy for get_participant().
children
Get a list of children belonging to this entity. It is the opposite of ``parent``.
It is read-only. This is a proxy for get_children().
domain_id
Get the id of the domain this entity belongs to.
It is read-only. This is a proxy for get_domain_id().
"""
# Registry from C entity handle to its Python wrapper, filled in __init__ and read by
# get_entity(). Values are weakly referenced, so the registry alone does not keep an
# entity alive.
_entities: Dict[dds_c_t.entity, "Entity"] = WeakValueDictionary()
def __init__(self, ref: int, listener: "Listener" = None) -> None:
    """Wrap an existing DDS entity handle. You should never need to do this manually.

    Parameters
    ----------
    ref: int
        The reference id as returned by the DDS API. A negative value is treated as an
        error code.
    listener: Listener
        Listener for this entity. The Python object is stored on the entity so it is not
        garbage collected while the entity is still alive; otherwise C could call into
        freed memory and segfault.

    Raises
    ------
    DDSException
        If ``ref`` is negative, which means creating the underlying object failed.
    """
    if ref >= 0:
        super().__init__(ref)
        # Register the wrapper so C handles can be mapped back to Python objects.
        self._entities[self._ref] = self
        self._listener = listener
        return
    raise DDSException(
        ref,
        f"Occurred upon initialisation of a {self.__class__.__module__}.{self.__class__.__name__}",
    )
def __del__(self) -> None:
    """Delete the underlying DDS entity when this wrapper is finalised.

    Nothing happens when the wrapper never received a ``_ref`` (initialisation failed)
    or when its handle is no longer present in the entity registry.
    """
    if hasattr(self, "_ref") and self._ref in self._entities:
        del self._entities[self._ref]
        self._delete(self._ref)
def get_subscriber(self) -> Optional["cyclonedds.sub.Subscriber"]:
    """Look up the subscriber this entity belongs to.

    Returns
    -------
    Optional[Subscriber]
        The Python object registered for the returned handle, or None when no Python
        object is registered for it.

    Raises
    ------
    DDSException
        If the C call returns a negative code.
    """
    handle = self._get_subscriber(self._ref)
    if handle < 0:
        raise DDSException(
            handle, f"Occurred when getting the subscriber for {repr(self)}"
        )
    return self.get_entity(handle)


subscriber: Optional["cyclonedds.sub.Subscriber"] = property(get_subscriber)
def get_publisher(self) -> Optional["cyclonedds.pub.Publisher"]:
    """Look up the publisher this entity belongs to.

    Returns
    -------
    Optional[Publisher]
        The Python object registered for the returned handle, or None when no Python
        object is registered for it.

    Raises
    ------
    DDSException
        If the C call returns a negative code.
    """
    handle = self._get_publisher(self._ref)
    if handle < 0:
        raise DDSException(handle, f"Occurred when getting the publisher for {repr(self)}")
    return self.get_entity(handle)


publisher: Optional["cyclonedds.pub.Publisher"] = property(get_publisher)
def get_datareader(self) -> Optional["cyclonedds.sub.DataReader"]:
    """Look up the datareader this entity is associated with.

    Returns
    -------
    Optional[DataReader]
        The Python object registered for the returned handle, or None when no Python
        object is registered for it.

    Raises
    ------
    DDSException
        If the C call returns a negative code.
    """
    handle = self._get_datareader(self._ref)
    if handle < 0:
        raise DDSException(
            handle, f"Occurred when getting the datareader for {repr(self)}"
        )
    return self.get_entity(handle)


datareader: Optional["cyclonedds.sub.DataReader"] = property(get_datareader)
def get_instance_handle(self) -> int:
    """Retrieve the instance handle associated with this entity.

    Returns
    -------
    int
        The integer handle is just a number you can use in writer/reader calls.

    Raises
    ------
    DDSException
        If the C call does not return 0.
    """
    handle = dds_c_t.instance_handle()
    ret = self._get_instance_handle(self._ref, ct.byref(handle))
    if ret != 0:
        raise DDSException(
            ret, f"Occurred when getting the instance handle for {repr(self)}"
        )
    # ctypes integer objects do not implement __int__, so int(handle) raises TypeError.
    # Read the wrapped number through .value, as the other out-parameters in this
    # class do. Assumes dds_c_t.instance_handle is a ctypes integer type.
    return int(handle.value)


instance_handle: int = property(get_instance_handle)
def get_guid(self) -> uuid.UUID:
    """Fetch the globally unique identifier of this entity.

    Returns
    -------
    uuid.UUID
        View the python documentation for this class for detailed usage.

    Raises
    ------
    DDSException
        If the C call does not return 0.
    """
    raw_guid = dds_c_t.guid()
    ret = self._get_guid(self._ref, ct.byref(raw_guid))
    if ret != 0:
        raise DDSException(ret, f"Occurred when getting the GUID for {repr(self)}")
    return raw_guid.as_python_guid()


guid: uuid.UUID = property(get_guid)
def read_status(self, mask: int = None) -> int:
    """Read the status bits set on this Entity. You can build a mask by using :class:`DDSStatus`.

    Parameters
    ----------
    mask
        The :class:`DDSStatus` mask. When it is omitted (or falsy, e.g. ``0``) the mask
        returned by get_status_mask() is used instead.

    Returns
    -------
    int
        The :class:`DDSStatus` bits that were set.

    Raises
    ------
    DDSException
        If the C call does not return 0.
    """
    status = ct.c_uint32()
    if mask:
        effective_mask = ct.c_uint32(mask)
    else:
        effective_mask = self.get_status_mask()
    ret = self._read_status(self._ref, ct.byref(status), effective_mask)
    if ret != 0:
        raise DDSException(ret, f"Occurred when reading the status for {repr(self)}")
    return status.value
def take_status(self, mask: int = None) -> int:
    """Take the status bits set on this Entity, after which they will be set to 0 again.

    You can build a mask by using :class:`DDSStatus`.

    Parameters
    ----------
    mask
        The :class:`DDSStatus` mask. When it is omitted (or falsy, e.g. ``0``) the mask
        returned by get_status_mask() is used instead.

    Returns
    -------
    int
        The :class:`DDSStatus` bits that were set.

    Raises
    ------
    DDSException
        If the C call does not return 0.
    """
    status = ct.c_uint32()
    if mask:
        effective_mask = ct.c_uint32(mask)
    else:
        effective_mask = self.get_status_mask()
    ret = self._take_status(self._ref, ct.byref(status), effective_mask)
    if ret != 0:
        raise DDSException(ret, f"Occurred when taking the status for {repr(self)}")
    return status.value
def get_status_changes(self) -> int:
    """Get all status changes since the last read_status() or take_status().

    Returns
    -------
    int
        The :class:`DDSStatus` bits that were set.

    Raises
    ------
    DDSException
        If the C call does not return 0.
    """
    changes = ct.c_uint32()
    ret = self._get_status_changes(self._ref, ct.byref(changes))
    if ret != 0:
        raise DDSException(
            ret, f"Occurred when getting the status changes for {repr(self)}"
        )
    return changes.value
def get_status_mask(self) -> int:
    """Return the status mask currently configured on this Entity.

    Returns
    -------
    int
        The :class:`DDSStatus` bits that are enabled.

    Raises
    ------
    DDSException
        If the C call does not return 0.
    """
    enabled = ct.c_uint32()
    ret = self._get_status_mask(self._ref, ct.byref(enabled))
    if ret != 0:
        raise DDSException(
            ret, f"Occurred when getting the status mask for {repr(self)}"
        )
    return enabled.value
def set_status_mask(self, mask: int) -> None:
    """Set the status mask for this Entity.

    By default the mask is 0. Only the status changes for the bits in this mask are
    tracked on this entity.

    Parameters
    ----------
    mask : int
        The :class:`DDSStatus` bits to track.

    Raises
    ------
    DDSException
        If the C call does not return 0.
    """
    ret = self._set_status_mask(self._ref, ct.c_uint32(mask))
    if ret != 0:
        raise DDSException(
            ret, f"Occurred when setting the status mask for {repr(self)}"
        )


status_mask: int = property(get_status_mask, set_status_mask)
def get_qos(self) -> Qos:
    """Get the :class:`Qos` associated with this entity.

    The object returned is not the same Python object that was used to set the
    :class:`Qos` on this entity. Modifying the returned :class:`Qos` does **not**
    modify the Qos of the Entity.

    Returns
    -------
    Qos
        The :class:`Qos` object associated with this entity.

    Raises
    ------
    DDSException
        If the C call does not return 0.
    """
    cqos = _CQos.cqos_create()
    try:
        ret = self._get_qos(self._ref, cqos)
        if ret == 0:
            return _CQos.cqos_to_qos(cqos)
    finally:
        # Always release the temporary C QoS, including when the conversion raises.
        _CQos.cqos_destroy(cqos)
    raise DDSException(
        ret, f"Occurred when getting the Qos Policies for {repr(self)}"
    )
def set_qos(self, qos: Qos) -> None:
    """Set :class:`Qos` policies on this entity.

    Only a limited number of :class:`Qos` policies can be set after the object is
    created (:class:`Policy.LatencyBudget` and :class:`Policy.OwnershipStrength`). Any
    policies not set explicitly in the supplied :class:`Qos` remain unchanged.

    Parameters
    ----------
    qos : Qos
        The :class:`Qos` to apply to this entity.

    Raises
    ------
    DDSException
        If you pass an immutable policy or cause the total collection of qos policies to
        become inconsistent an exception will be raised.
    """
    cqos = _CQos.qos_to_cqos(qos)
    try:
        ret = self._set_qos(self._ref, cqos)
    finally:
        # Release the temporary C QoS even if the call itself raises.
        _CQos.cqos_destroy(cqos)
    if ret != 0:
        raise DDSException(
            ret, f"Occurred when setting the Qos Policies for {repr(self)}"
        )
def get_listener(self) -> "Listener":
    """Return a copy of this entity's listener, or a fresh Listener if none is set.

    Changing the returned object does not change this entity; pass the changed object
    to set_listener() to apply it.

    Returns
    -------
    Listener
        A listener with which you can add additional callbacks.
    """
    current = self._listener
    if not current:
        return Listener()
    return current.copy()
def set_listener(self, listener: Optional["Listener"]) -> None:
    """Update the listener for this object.

    If a listener already exists for this object, only the fields explicitly set on the
    new listener are overwritten. Passing None removes this entity's Listener. Future
    changes to the passed Listener object will not affect the Listener associated with
    this Entity.

    Parameters
    ----------
    listener :
        The listener object to use, or None to remove the current listener from this Entity.

    Raises
    ------
    DDSException
        If the C call does not return 0. When removal fails, the previously stored
        listener is kept.
    """
    if listener is None:
        # Detach on the C side first and only then drop the Python listener. Releasing
        # it earlier could free the Python callbacks while C may still call them.
        ret = self._set_listener(self._ref, None)
        if ret == 0:
            self._listener = None
            return
    else:
        if self._listener is None:
            self._listener = listener.copy()
        elif self._listener != listener:
            # Merge the configured callbacks into the listener we already own.
            listener.copy_to(self._listener)
        ret = self._set_listener(self._ref, self._listener._ref)
        if ret == 0:
            return
    raise DDSException(ret, f"Occurred when setting the Listener for {repr(self)}")
def get_parent(self) -> Optional["Entity"]:
    """Get the parent entity associated with this entity.

    A ``Domain`` object is the only object without parent, but if the domain is not
    created through the Python API this call won't find it and return None from the
    DomainParticipant.

    Returns
    -------
    Optional[Entity]
        The parent of this entity. This would be the Subscriber for a DataReader,
        DomainParticipant for a Topic etc.

    Raises
    ------
    DDSException
        If the C call returns a negative code.
    """
    ret = self._get_parent(self._ref)
    # Test the "no parent" cases before any ordering comparison: `None > 0` would raise
    # TypeError, which made the None guard unreachable in its old position.
    if ret is None or ret == 0:
        return None
    if ret > 0:
        return self.get_entity(ret)
    raise DDSException(ret, f"Occurred when getting the parent of {repr(self)}")


parent: Optional["Entity"] = property(get_parent)
def get_participant(self) -> Optional["cyclonedds.domain.DomainParticipant"]:
    """Get the domain participant for this entity.

    This should work on all valid Entity objects except a Domain.

    Returns
    -------
    Optional[cyclonedds.domain.DomainParticipant]
        Only fails for a Domain object.

    Raises
    ------
    DDSException
        If the C call returns a negative code.
    """
    ret = self._get_participant(self._ref)
    # Test the "no participant" cases before any ordering comparison: `None > 0` would
    # raise TypeError, which made the None guard unreachable in its old position.
    if ret is None or ret == 0:
        return None
    if ret > 0:
        return self.get_entity(ret)
    raise DDSException(
        ret, f"Occurred when getting the participant of {repr(self)}"
    )


participant: Optional["cyclonedds.domain.DomainParticipant"] = property(get_participant)
def get_children(self) -> List["Entity"]:
    """Get the list of children of this entity.

    For example, the list of datareaders belonging to a subscriber. Opposite of parent.

    Returns
    -------
    List[Entity]
        All the entities considered children of this entity. An entry is None when no
        Python object is registered for that child handle.

    Raises
    ------
    DDSException
        If either C call returns a negative code.
    """
    # First call with an empty buffer only asks how many children there are.
    num_children = self._get_children(self._ref, None, 0)
    if num_children < 0:
        raise DDSException(
            num_children,
            f"Occurred when getting the number of children of {repr(self)}",
        )
    if num_children == 0:
        return []

    capacity = int(num_children)
    children_list = (dds_c_t.entity * capacity)()
    children_list_pt = ct.cast(children_list, ct.POINTER(dds_c_t.entity))
    ret = self._get_children(self._ref, children_list_pt, capacity)
    if ret < 0:
        raise DDSException(ret, f"Occurred when getting the children of {repr(self)}")

    # Per the Cyclone DDS C API documentation the return value is the total number of
    # children, which can exceed the buffer if children were created between the two
    # calls. Only `capacity` slots were written, so never index past them.
    filled = min(ret, capacity)
    return [self.get_entity(children_list[i]) for i in range(filled)]


children: List["Entity"] = property(get_children)
def get_domain_id(self) -> int:
    """Return the id of the domain this entity resides in.

    Returns
    -------
    int
        The domain id.

    Raises
    ------
    DDSException
        If the C call does not return 0.
    """
    out_id = dds_c_t.domainid()
    ret = self._get_domainid(self._ref, ct.byref(out_id))
    if ret != 0:
        raise DDSException(ret, f"Occurred when getting the domainid of {repr(self)}")
    return out_id.value


domain_id: int = property(get_domain_id)
def begin_coherent(self) -> None:
    """Begin coherent publishing or begin accessing a coherent set in a Subscriber.

    This can only be invoked on Publishers, Subscribers, DataWriters and DataReaders.
    Invoking on a DataWriter or DataReader behaves as if it was invoked on its parent
    Publisher or Subscriber respectively.

    Raises
    ------
    DDSException
        If the C call returns a negative code.
    """
    ret = self._begin_coherent(self._ref)
    if ret >= 0:
        return
    raise DDSException(ret, f"Occurred when beginning coherent on {repr(self)}")
def end_coherent(self) -> None:
    """End coherent publishing or end accessing a coherent set in a Subscriber.

    This can only be invoked on Publishers, Subscribers, DataWriters and DataReaders.
    Invoking on a DataWriter or DataReader behaves as if it was invoked on its parent
    Publisher or Subscriber respectively.

    Raises
    ------
    DDSException
        If the C call returns a negative code.
    """
    ret = self._end_coherent(self._ref)
    if ret >= 0:
        return
    raise DDSException(ret, f"Occurred when ending coherent on {repr(self)}")
@classmethod
def get_entity(cls, entity_id) -> Optional["Entity"]:
    """Map a CycloneDDS C entity id to its registered Python object, or None if unknown.

    You shouldn't need this.
    """
    registry = cls._entities
    return registry.get(entity_id)
@classmethod
def _init_from_retcode(cls, code):
    """Create a ``cls`` instance initialised only through ``Entity.__init__``.

    ``cls.__init__`` is bypassed. ``Entity.__init__`` raises DDSException when ``code``
    is negative.
    """
    instance = cls.__new__(cls)
    Entity.__init__(instance, code)
    return instance
# C bindings. Each stub is decorated with c_call("<C function name>"); the Python body
# is only a placeholder.
# NOTE(review): c_call presumably builds the ctypes signature from the parameter and
# return annotations - confirm in .internal before changing any annotation.

# Called from __del__ to delete the underlying entity.
@c_call("dds_delete")
def _delete(self, entity: dds_c_t.entity) -> None:
pass
# Backs get_subscriber(); a negative return is reported as an error.
@c_call("dds_get_subscriber")
def _get_subscriber(self, entity: dds_c_t.entity) -> dds_c_t.entity:
pass
# Backs get_datareader(); a negative return is reported as an error.
@c_call("dds_get_datareader")
def _get_datareader(self, entity: dds_c_t.entity) -> dds_c_t.entity:
pass
# Backs get_publisher(); a negative return is reported as an error.
@c_call("dds_get_publisher")
def _get_publisher(self, entity: dds_c_t.entity) -> dds_c_t.entity:
pass
# Backs get_instance_handle(); the handle is written through the pointer argument.
@c_call("dds_get_instance_handle")
def _get_instance_handle(
self, entity: dds_c_t.entity, handle: ct.POINTER(dds_c_t.instance_handle)
) -> dds_c_t.returnv:
pass
# Backs get_guid(); the GUID is written through the pointer argument.
@c_call("dds_get_guid")
def _get_guid(
self, entity: dds_c_t.entity, guid: ct.POINTER(dds_c_t.guid)
) -> dds_c_t.returnv:
pass
# Backs read_status(); the status bits are written through the pointer argument.
@c_call("dds_read_status")
def _read_status(
self, entity: dds_c_t.entity, status: ct.POINTER(ct.c_uint32), mask: ct.c_uint32
) -> dds_c_t.returnv:
pass
# Backs take_status(); the status bits are written through the pointer argument.
@c_call("dds_take_status")
def _take_status(
self, entity: dds_c_t.entity, status: ct.POINTER(ct.c_uint32), mask: ct.c_uint32
) -> dds_c_t.returnv:
pass
# Backs get_status_changes().
@c_call("dds_get_status_changes")
def _get_status_changes(
self, entity: dds_c_t.entity, status: ct.POINTER(ct.c_uint32)
) -> dds_c_t.returnv:
pass
# Backs get_status_mask().
@c_call("dds_get_status_mask")
def _get_status_mask(
self, entity: dds_c_t.entity, mask: ct.POINTER(ct.c_uint32)
) -> dds_c_t.returnv:
pass
# Backs set_status_mask().
@c_call("dds_set_status_mask")
def _set_status_mask(
self, entity: dds_c_t.entity, mask: ct.c_uint32
) -> dds_c_t.returnv:
pass
# Backs get_qos(); fills the C QoS object passed in.
@c_call("dds_get_qos")
def _get_qos(self, entity: dds_c_t.entity, qos: dds_c_t.qos_p) -> dds_c_t.returnv:
pass
# Backs set_qos().
@c_call("dds_set_qos")
def _set_qos(self, entity: dds_c_t.entity, qos: dds_c_t.qos_p) -> dds_c_t.returnv:
pass
# Not called by any method visible in this class; get_listener() works on the Python
# side listener instead.
@c_call("dds_get_listener")
def _get_listener(
self, entity: dds_c_t.entity, listener: dds_c_t.listener_p
) -> dds_c_t.returnv:
pass
# Backs set_listener(); called with None to remove the listener.
@c_call("dds_set_listener")
def _set_listener(
self, entity: dds_c_t.entity, listener: dds_c_t.listener_p
) -> dds_c_t.returnv:
pass
# Backs get_parent(); 0 is treated as "no parent", negative as an error.
@c_call("dds_get_parent")
def _get_parent(self, entity: dds_c_t.entity) -> dds_c_t.entity:
pass
# Backs get_participant(); 0 is treated as "no participant", negative as an error.
@c_call("dds_get_participant")
def _get_participant(self, entity: dds_c_t.entity) -> dds_c_t.entity:
pass
# Backs get_children(): called once with (None, 0) to obtain the count and once with a
# buffer to receive the child handles.
@c_call("dds_get_children")
def _get_children(
    self,
    entity: dds_c_t.entity,
    # The buffer receives entity handles (get_children() passes a
    # POINTER(dds_c_t.entity)), so it is declared as such rather than as a pointer
    # to return codes.
    children_list: ct.POINTER(dds_c_t.entity),
    size: ct.c_size_t,
) -> dds_c_t.returnv:
    pass
# Backs get_domain_id(); the domain id is written through the pointer argument.
@c_call("dds_get_domainid")
def _get_domainid(
self, entity: dds_c_t.entity, domainid: ct.POINTER(dds_c_t.domainid)
) -> dds_c_t.returnv:
pass
# Backs begin_coherent(); a negative return is reported as an error.
@c_call("dds_begin_coherent")
def _begin_coherent(self, entity: dds_c_t.entity) -> dds_c_t.returnv:
pass
# Backs end_coherent(); a negative return is reported as an error.
@c_call("dds_end_coherent")
def _end_coherent(self, entity: dds_c_t.entity) -> dds_c_t.returnv:
pass
def __repr__(self) -> str:
    """Describe the entity by concrete type, Python object address and C handle.

    The handle is shown as ``None`` when reading ``_ref`` fails, e.g. on an object
    whose initialisation did not complete.
    """
    try:
        ref = self._ref
    except Exception:
        ref = None
    return f"<Entity, type={self.__class__.__module__}.{self.__class__.__name__}, addr={hex(id(self))}, id={ref}>"
# Callback prototypes for the listener hooks. Each prototype is later called with a
# Python function (see the Listener.set_on_* methods) and the result is handed to the
# matching C setter.
# NOTE(review): c_callable(None, [...]) presumably declares a C callback returning void
# with the listed argument types - confirm in .internal.
# Every prototype takes the entity handle first and a void pointer last; the status
# callbacks carry the matching status structure in between.
_inconsistent_topic_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.inconsistent_topic_status, ct.c_void_p]
)
_data_available_fn = c_callable(None, [dds_c_t.entity, ct.c_void_p])
_liveliness_lost_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.liveliness_lost_status, ct.c_void_p]
)
_liveliness_changed_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.liveliness_changed_status, ct.c_void_p]
)
_offered_deadline_missed_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.offered_deadline_missed_status, ct.c_void_p]
)
_offered_incompatible_qos_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.offered_incompatible_qos_status, ct.c_void_p]
)
_data_on_readers_fn = c_callable(None, [dds_c_t.entity, ct.c_void_p])
_on_sample_lost_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.sample_lost_status, ct.c_void_p]
)
_on_sample_rejected_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.sample_rejected_status, ct.c_void_p]
)
_on_requested_deadline_missed_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.requested_deadline_missed_status, ct.c_void_p]
)
_on_requested_incompatible_qos_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.requested_incompatible_qos_status, ct.c_void_p]
)
_on_publication_matched_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.publication_matched_status, ct.c_void_p]
)
_on_subscription_matched_fn = c_callable(
None, [dds_c_t.entity, dds_c_t.subscription_matched_status, ct.c_void_p]
)
def _is_override(func):
    """Tell whether the bound method ``func`` replaces the default defined on Listener.

    Parameters
    ----------
    func
        A bound method of a Listener (or subclass) instance.

    Returns
    -------
    bool
        False for a plain ``Listener`` instance or when the method is the one defined
        on ``Listener`` itself; True when any class in the hierarchy overrides it.
    """
    obj = func.__self__
    if type(obj) is Listener:
        return False
    # Compare against Listener's own implementation, not the immediate parent's. With
    # `super(type(obj), obj)` an override inherited through an intermediate subclass
    # (class A(Listener) overrides, class B(A) does not) compared equal to its parent
    # and was wrongly reported as "not overridden".
    base_impl = getattr(Listener, func.__name__)
    return func.__func__ is not base_impl
[docs]class Listener(DDS):
"""Listeners are callback containers for entities."""
def __init__(self, **kwargs):
    """Create a Listener object. The initializer takes override function lambdas.

    Please note that all listener callbacks are dispatched synchronously from DDS
    receive thread(s). You can get away with doing tiny amounts of processing in these
    callback methods, but acquiring the Python GIL from the DDS receive thread will
    severely hurt your DDS performance. Furthermore, deleting entities or writing data
    inside listener callbacks can get you into deadlocks.

    Parameters
    ----------
    on_data_available : Callable
        Set on_data_available callback.
    on_inconsistent_topic : Callable
        Set on_inconsistent_topic callback.
    on_liveliness_lost : Callable
        Set on_liveliness_lost callback.
    on_liveliness_changed : Callable
        Set on_liveliness_changed callback.
    on_offered_deadline_missed : Callable
        Set on_offered_deadline_missed callback.
    on_offered_incompatible_qos : Callable
        Set on_offered_incompatible_qos callback.
    on_data_on_readers : Callable
        Set on_data_on_readers callback.
    on_sample_lost : Callable
        Set on_sample_lost callback.
    on_sample_rejected : Callable
        Set on_sample_rejected callback.
    on_requested_deadline_missed : Callable
        Set on_requested_deadline_missed callback.
    on_requested_incompatible_qos : Callable
        Set on_requested_incompatible_qos callback.
    on_publication_matched : Callable
        Set on_publication_matched callback.
    on_subscription_matched : Callable
        Set on_subscription_matched callback.

    Raises
    ------
    TypeError
        If a keyword argument is not one of the callback names listed above.
    """
    super().__init__(self._create_listener(None))
    # Callbacks that are explicitly configured; copy() and copy_to() replay these.
    self._set_functors = {}
    # Callback name -> setter that installs it on the Python and the C side.
    self.setters = {
        "on_data_available": self.set_on_data_available,
        "on_inconsistent_topic": self.set_on_inconsistent_topic,
        "on_liveliness_lost": self.set_on_liveliness_lost,
        "on_liveliness_changed": self.set_on_liveliness_changed,
        "on_offered_deadline_missed": self.set_on_offered_deadline_missed,
        "on_offered_incompatible_qos": self.set_on_offered_incompatible_qos,
        "on_data_on_readers": self.set_on_data_on_readers,
        "on_sample_lost": self.set_on_sample_lost,
        "on_sample_rejected": self.set_on_sample_rejected,
        "on_requested_deadline_missed": self.set_on_requested_deadline_missed,
        "on_requested_incompatible_qos": self.set_on_requested_incompatible_qos,
        "on_publication_matched": self.set_on_publication_matched,
        "on_subscription_matched": self.set_on_subscription_matched,
    }

    # Install every callback that a subclass overrides as a method. The setter also
    # records it in _set_functors.
    for name, setter in self.setters.items():
        handler = getattr(self, name)
        if _is_override(handler):
            setter(handler)

    # Callbacks passed as keyword arguments are applied last.
    for name, value in kwargs.items():
        setter = self.setters.get(name)
        if setter is None:
            # argparse.ArgumentError needs (argument, message); building it from a
            # message alone failed inside its constructor with an unrelated TypeError.
            # Raise the TypeError callers have always observed, now with the intended
            # message.
            raise TypeError(f"Invalid listener attribute '{name}'")
        setter(value)
def __del__(self):
    """Delete the underlying C listener when this object is finalised.

    Does nothing when the object never received a ``_ref``, e.g. because creating the
    C listener failed in ``__init__``. This mirrors the guard in ``Entity.__del__`` and
    avoids an AttributeError during finalisation.
    """
    if not hasattr(self, "_ref"):
        return
    self._delete_listener(self._ref)
def reset(self) -> None:
    """Invoke the C-level listener reset on the wrapped listener handle.

    NOTE(review): the Python-side record of configured callbacks (``_set_functors``) is
    not touched here - confirm that is intended.
    """
    handle = self._ref
    self._reset_listener(handle)
def copy(self) -> "Listener":
    """Return a new ``Listener`` configured with the callbacks recorded on this one."""
    return Listener(**self._set_functors)
def copy_to(self, listener: "Listener") -> None:
    """Install every callback recorded on this listener onto ``listener``.

    Callbacks on ``listener`` that this listener does not configure are left untouched.
    """
    for hook_name, handler in self._set_functors.items():
        install = listener.setters[hook_name]
        install(handler)
def merge(self, listener: "Listener") -> None:
    """Take over the configured (non-default) callbacks of ``listener``.

    Callbacks already configured on this listener are replaced when ``listener``
    configures the same ones; all others are kept.
    """
    source = listener
    source.copy_to(self)
def on_inconsistent_topic(
    self,
    reader: "cyclonedds.sub.DataReader",
    status: dds_c_t.inconsistent_topic_status,
) -> None:
    """Default handler: does nothing.

    Override it in a subclass or install a callable with set_on_inconsistent_topic().
    """
    return None
def set_on_inconsistent_topic(
    self,
    callable: Optional[
        Callable[["cyclonedds.sub.DataReader", dds_c_t.inconsistent_topic_status], None]
    ],
):
    """Install, replace or remove the ``on_inconsistent_topic`` callback.

    Parameters
    ----------
    callable : Callable or None
        Called with the Python entity for the reported handle and the status. Pass None
        to remove the callback.
    """
    self.on_inconsistent_topic = callable
    if callable is None:
        self._set_inconsistent_topic(self._ref, None)
        # pop() rather than del: removing a callback that was never set must not raise.
        self._set_functors.pop("on_inconsistent_topic", None)
        return

    self._set_functors["on_inconsistent_topic"] = self.on_inconsistent_topic

    def call(topic, status, arg):
        self.on_inconsistent_topic(Entity.get_entity(topic), status)

    # The wrapper is stored on self so it stays referenced after this method returns.
    self._on_inconsistent_topic = _inconsistent_topic_fn(call)
    self._set_inconsistent_topic(self._ref, self._on_inconsistent_topic)
def on_data_available(self, reader: "cyclonedds.sub.DataReader") -> None:
    """Default handler: does nothing.

    Override it in a subclass or install a callable with set_on_data_available().
    """
    return None
def set_on_data_available(
    self, callable: Optional[Callable[["cyclonedds.sub.DataReader"], None]]
):
    """Install, replace or remove the ``on_data_available`` callback.

    Parameters
    ----------
    callable : Callable or None
        Called with the Python entity for the reported reader handle. Pass None to
        remove the callback.
    """
    self.on_data_available = callable
    if callable is None:
        self._set_data_available(self._ref, None)
        # pop() rather than del: removing a callback that was never set must not raise.
        self._set_functors.pop("on_data_available", None)
        return

    self._set_functors["on_data_available"] = self.on_data_available

    def call(reader, arg):
        self.on_data_available(Entity.get_entity(reader))

    # The wrapper is stored on self so it stays referenced after this method returns.
    self._on_data_available = _data_available_fn(call)
    self._set_data_available(self._ref, self._on_data_available)
def on_liveliness_lost(
    self,
    writer: "cyclonedds.pub.DataWriter",
    status: dds_c_t.liveliness_lost_status,
) -> None:
    """Default handler: does nothing.

    Override it in a subclass or install a callable with set_on_liveliness_lost().
    """
    return None
def set_on_liveliness_lost(
    self,
    callable: Optional[
        Callable[["cyclonedds.pub.DataWriter", dds_c_t.liveliness_lost_status], None]
    ],
):
    """Install, replace or remove the ``on_liveliness_lost`` callback.

    Parameters
    ----------
    callable : Callable or None
        Called with the Python entity for the reported writer handle and the status.
        Pass None to remove the callback.
    """
    self.on_liveliness_lost = callable
    if callable is None:
        self._set_liveliness_lost(self._ref, None)
        # pop() rather than del: removing a callback that was never set must not raise.
        self._set_functors.pop("on_liveliness_lost", None)
        return

    self._set_functors["on_liveliness_lost"] = self.on_liveliness_lost

    def call(writer, status, arg):
        self.on_liveliness_lost(Entity.get_entity(writer), status)

    # The wrapper is stored on self so it stays referenced after this method returns.
    self._on_liveliness_lost = _liveliness_lost_fn(call)
    self._set_liveliness_lost(self._ref, self._on_liveliness_lost)
def on_liveliness_changed(
    self,
    reader: "cyclonedds.sub.DataReader",
    status: dds_c_t.liveliness_changed_status,
) -> None:
    """Default handler: does nothing.

    Override it in a subclass or install a callable with set_on_liveliness_changed().
    """
    return None
def set_on_liveliness_changed(
    self,
    callable: Optional[
        Callable[["cyclonedds.sub.DataReader", dds_c_t.liveliness_changed_status], None]
    ],
):
    """Install, replace or remove the ``on_liveliness_changed`` callback.

    Parameters
    ----------
    callable : Callable or None
        Called with the Python entity for the reported reader handle and the status.
        Pass None to remove the callback.
    """
    self.on_liveliness_changed = callable
    if callable is None:
        self._set_liveliness_changed(self._ref, None)
        # pop() rather than del: removing a callback that was never set must not raise.
        self._set_functors.pop("on_liveliness_changed", None)
        return

    self._set_functors["on_liveliness_changed"] = self.on_liveliness_changed

    def call(reader, status, arg):
        self.on_liveliness_changed(Entity.get_entity(reader), status)

    # The wrapper is stored on self so it stays referenced after this method returns.
    self._on_liveliness_changed = _liveliness_changed_fn(call)
    self._set_liveliness_changed(self._ref, self._on_liveliness_changed)
def on_offered_deadline_missed(
    self,
    writer: "cyclonedds.pub.DataWriter",
    status: dds_c_t.offered_deadline_missed_status,
) -> None:
    """Default handler: does nothing.

    Override it in a subclass or install a callable with
    set_on_offered_deadline_missed().
    """
    return None
def set_on_offered_deadline_missed(
    self,
    callable: Optional[
        Callable[
            ["cyclonedds.pub.DataWriter", dds_c_t.offered_deadline_missed_status], None
        ]
    ],
):
    """Install, replace or remove the ``on_offered_deadline_missed`` callback.

    Parameters
    ----------
    callable : Callable or None
        Called with the Python entity for the reported writer handle and the status.
        Pass None to remove the callback.
    """
    self.on_offered_deadline_missed = callable
    if callable is None:
        self._set_on_offered_deadline_missed(self._ref, None)
        # pop() rather than del: removing a callback that was never set must not raise.
        self._set_functors.pop("on_offered_deadline_missed", None)
        return

    self._set_functors["on_offered_deadline_missed"] = self.on_offered_deadline_missed

    def call(writer, status, arg):
        self.on_offered_deadline_missed(Entity.get_entity(writer), status)

    # The wrapper is stored on self so it stays referenced after this method returns.
    self._on_offered_deadline_missed = _offered_deadline_missed_fn(call)
    self._set_on_offered_deadline_missed(
        self._ref, self._on_offered_deadline_missed
    )
def on_offered_incompatible_qos(
    self,
    writer: "cyclonedds.pub.DataWriter",
    status: dds_c_t.offered_incompatible_qos_status,
) -> None:
    """Default handler: does nothing.

    Override it in a subclass or install a callable with
    set_on_offered_incompatible_qos().
    """
    return None
def set_on_offered_incompatible_qos(
    self,
    callable: Optional[
        Callable[
            ["cyclonedds.pub.DataWriter", dds_c_t.offered_incompatible_qos_status], None
        ]
    ],
):
    """Install, replace or remove the ``on_offered_incompatible_qos`` callback.

    Parameters
    ----------
    callable : Callable or None
        Called with the Python entity for the reported writer handle and the status.
        Pass None to remove the callback.
    """
    self.on_offered_incompatible_qos = callable
    if callable is None:
        self._set_on_offered_incompatible_qos(self._ref, None)
        # pop() rather than del: removing a callback that was never set must not raise.
        self._set_functors.pop("on_offered_incompatible_qos", None)
        return

    self._set_functors["on_offered_incompatible_qos"] = self.on_offered_incompatible_qos

    def call(writer, status, arg):
        self.on_offered_incompatible_qos(Entity.get_entity(writer), status)

    # The wrapper is stored on self so it stays referenced after this method returns.
    self._on_offered_incompatible_qos = _offered_incompatible_qos_fn(call)
    self._set_on_offered_incompatible_qos(
        self._ref, self._on_offered_incompatible_qos
    )
def on_data_on_readers(self, subscriber: "cyclonedds.sub.Subscriber") -> None:
    """Default data-on-readers handler; intentionally does nothing.

    Parameters
    ----------
    subscriber: Subscriber
        Argument supplied by the listener trampoline.
    """
    return None
def set_on_data_on_readers(
    self, callable: Optional[Callable[["cyclonedds.sub.Subscriber"], None]]
):
    """Install, replace or clear the data-on-readers callback.

    Parameters
    ----------
    callable: Callable, Optional
        Called with the entity that ``Entity.get_entity`` returns for the
        subscriber handle received from C. Pass ``None`` to clear the
        callback on the underlying listener.
    """
    self.on_data_on_readers = callable
    if callable is None:
        # Clear the data-on-readers slot itself. The previous code called
        # _set_data_available here, which wiped the unrelated data-available
        # callback and left this one installed.
        self._set_on_data_on_readers(self._ref, None)
        # pop() instead of del: clearing a callback that was never installed
        # must not raise KeyError.
        self._set_functors.pop("on_data_on_readers", None)
        return

    self._set_functors["on_data_on_readers"] = self.on_data_on_readers

    def call(subscriber, arg):
        # Attribute lookup happens at call time, so reassignment is honoured.
        self.on_data_on_readers(Entity.get_entity(subscriber))

    # Kept on self; presumably so the wrapper object stays alive while the
    # C side may still invoke it.
    self._on_data_on_readers = _data_on_readers_fn(call)
    self._set_on_data_on_readers(self._ref, self._on_data_on_readers)
def on_sample_lost(
    self, writer: "cyclonedds.pub.DataWriter", status: dds_c_t.sample_lost_status
) -> None:
    """Default sample-lost handler; intentionally does nothing.

    Parameters
    ----------
    writer: DataWriter
        First argument supplied by the listener trampoline.
        NOTE(review): the DDS specification describes SAMPLE_LOST as a
        reader-side status; confirm whether this should be a DataReader.
    status: dds_c_t.sample_lost_status
        Status value supplied by the listener trampoline.
    """
    return None
def set_on_sample_lost(
    self,
    callable: Optional[
        Callable[["cyclonedds.pub.DataWriter", dds_c_t.sample_lost_status], None]
    ],
):
    """Install, replace or clear the sample-lost callback.

    Parameters
    ----------
    callable: Callable, Optional
        Called with the entity that ``Entity.get_entity`` returns for the
        handle received from C, followed by the status. Pass ``None`` to
        clear the callback on the underlying listener.
    """
    self.on_sample_lost = callable
    if callable is None:
        self._set_on_sample_lost(self._ref, None)
        # pop() instead of del: clearing a callback that was never installed
        # must not raise KeyError.
        self._set_functors.pop("on_sample_lost", None)
        return

    self._set_functors["on_sample_lost"] = self.on_sample_lost

    def call(writer, status, arg):
        # Attribute lookup happens at call time, so reassignment is honoured.
        self.on_sample_lost(Entity.get_entity(writer), status)

    # Kept on self; presumably so the wrapper object stays alive while the
    # C side may still invoke it.
    self._on_sample_lost = _on_sample_lost_fn(call)
    self._set_on_sample_lost(self._ref, self._on_sample_lost)
def on_sample_rejected(
    self,
    reader: "cyclonedds.sub.DataReader",
    status: dds_c_t.sample_rejected_status,
) -> None:
    """Default sample-rejected handler; intentionally does nothing.

    Parameters
    ----------
    reader: DataReader
        First argument supplied by the listener trampoline.
    status: dds_c_t.sample_rejected_status
        Status value supplied by the listener trampoline.
    """
    return None
def set_on_sample_rejected(
    self,
    callable: Optional[
        Callable[["cyclonedds.sub.DataReader", dds_c_t.sample_rejected_status], None]
    ],
):
    """Install, replace or clear the sample-rejected callback.

    Parameters
    ----------
    callable: Callable, Optional
        Called with the entity that ``Entity.get_entity`` returns for the
        handle received from C, followed by the status. Pass ``None`` to
        clear the callback on the underlying listener.
    """
    self.on_sample_rejected = callable
    if callable is None:
        self._set_on_sample_rejected(self._ref, None)
        # pop() instead of del: clearing a callback that was never installed
        # must not raise KeyError.
        self._set_functors.pop("on_sample_rejected", None)
        return

    self._set_functors["on_sample_rejected"] = self.on_sample_rejected

    # The handle is named `reader` to match the public callback signature
    # (the trampoline previously called it `writer`).
    def call(reader, status, arg):
        # Attribute lookup happens at call time, so reassignment is honoured.
        self.on_sample_rejected(Entity.get_entity(reader), status)

    # Kept on self; presumably so the wrapper object stays alive while the
    # C side may still invoke it.
    self._on_sample_rejected = _on_sample_rejected_fn(call)
    self._set_on_sample_rejected(self._ref, self._on_sample_rejected)
def on_requested_deadline_missed(
    self,
    reader: "cyclonedds.sub.DataReader",
    status: dds_c_t.requested_deadline_missed_status,
) -> None:
    """Default requested-deadline-missed handler; intentionally does nothing.

    Parameters
    ----------
    reader: DataReader
        First argument supplied by the listener trampoline.
    status: dds_c_t.requested_deadline_missed_status
        Status value supplied by the listener trampoline.
    """
    return None
def set_on_requested_deadline_missed(
    self,
    callable: Optional[
        Callable[
            ["cyclonedds.sub.DataReader", dds_c_t.requested_deadline_missed_status],
            None,
        ]
    ],
):
    """Install, replace or clear the requested-deadline-missed callback.

    Parameters
    ----------
    callable: Callable, Optional
        Called with the entity that ``Entity.get_entity`` returns for the
        handle received from C, followed by the status. Pass ``None`` to
        clear the callback on the underlying listener.
    """
    self.on_requested_deadline_missed = callable
    if callable is None:
        self._set_on_requested_deadline_missed(self._ref, None)
        # pop() instead of del: clearing a callback that was never installed
        # must not raise KeyError.
        self._set_functors.pop("on_requested_deadline_missed", None)
        return

    self._set_functors[
        "on_requested_deadline_missed"
    ] = self.on_requested_deadline_missed

    def call(reader, status, arg):
        # Attribute lookup happens at call time, so reassignment is honoured.
        self.on_requested_deadline_missed(Entity.get_entity(reader), status)

    # Kept on self; presumably so the wrapper object stays alive while the
    # C side may still invoke it.
    self._on_requested_deadline_missed = _on_requested_deadline_missed_fn(call)
    self._set_on_requested_deadline_missed(
        self._ref, self._on_requested_deadline_missed
    )
def on_requested_incompatible_qos(
    self,
    reader: "cyclonedds.sub.DataReader",
    status: dds_c_t.requested_incompatible_qos_status,
) -> None:
    """Default requested-incompatible-qos handler; intentionally does nothing.

    Parameters
    ----------
    reader: DataReader
        First argument supplied by the listener trampoline.
    status: dds_c_t.requested_incompatible_qos_status
        Status value supplied by the listener trampoline.
    """
    return None
def set_on_requested_incompatible_qos(
    self,
    callable: Optional[
        Callable[
            ["cyclonedds.sub.DataReader", dds_c_t.requested_incompatible_qos_status],
            None,
        ]
    ],
):
    """Install, replace or clear the requested-incompatible-qos callback.

    Parameters
    ----------
    callable: Callable, Optional
        Called with the entity that ``Entity.get_entity`` returns for the
        handle received from C, followed by the status. Pass ``None`` to
        clear the callback on the underlying listener.
    """
    self.on_requested_incompatible_qos = callable
    if callable is None:
        self._set_on_requested_incompatible_qos(self._ref, None)
        # pop() instead of del: clearing a callback that was never installed
        # must not raise KeyError.
        self._set_functors.pop("on_requested_incompatible_qos", None)
        return

    self._set_functors[
        "on_requested_incompatible_qos"
    ] = self.on_requested_incompatible_qos

    def call(reader, status, arg):
        # Attribute lookup happens at call time, so reassignment is honoured.
        self.on_requested_incompatible_qos(Entity.get_entity(reader), status)

    # Kept on self; presumably so the wrapper object stays alive while the
    # C side may still invoke it.
    self._on_requested_incompatible_qos = _on_requested_incompatible_qos_fn(call)
    self._set_on_requested_incompatible_qos(
        self._ref, self._on_requested_incompatible_qos
    )
def on_publication_matched(
    self,
    writer: "cyclonedds.pub.DataWriter",
    status: dds_c_t.publication_matched_status,
) -> None:
    """Default publication-matched handler; intentionally does nothing.

    Parameters
    ----------
    writer: DataWriter
        First argument supplied by the listener trampoline.
    status: dds_c_t.publication_matched_status
        Status value supplied by the listener trampoline.
    """
    return None
def set_on_publication_matched(
    self,
    callable: Optional[
        Callable[
            ["cyclonedds.pub.DataWriter", dds_c_t.publication_matched_status], None
        ]
    ],
):
    """Install, replace or clear the publication-matched callback.

    Parameters
    ----------
    callable: Callable, Optional
        Called with the entity that ``Entity.get_entity`` returns for the
        handle received from C, followed by the status. Pass ``None`` to
        clear the callback on the underlying listener.
    """
    self.on_publication_matched = callable
    if callable is None:
        self._set_on_publication_matched(self._ref, None)
        # pop() instead of del: clearing a callback that was never installed
        # must not raise KeyError.
        self._set_functors.pop("on_publication_matched", None)
        return

    self._set_functors["on_publication_matched"] = self.on_publication_matched

    def call(writer, status, arg):
        # Attribute lookup happens at call time, so reassignment is honoured.
        self.on_publication_matched(Entity.get_entity(writer), status)

    # Kept on self; presumably so the wrapper object stays alive while the
    # C side may still invoke it.
    self._on_publication_matched = _on_publication_matched_fn(call)
    self._set_on_publication_matched(self._ref, self._on_publication_matched)
def on_subscription_matched(
    self,
    reader: "cyclonedds.sub.DataReader",
    status: dds_c_t.subscription_matched_status,
) -> None:
    """Default subscription-matched handler; intentionally does nothing.

    Parameters
    ----------
    reader: DataReader
        First argument supplied by the listener trampoline.
    status: dds_c_t.subscription_matched_status
        Status value supplied by the listener trampoline.
    """
    return None
def set_on_subscription_matched(
    self,
    callable: Optional[
        Callable[
            ["cyclonedds.sub.DataReader", dds_c_t.subscription_matched_status], None
        ]
    ],
):
    """Install, replace or clear the subscription-matched callback.

    Parameters
    ----------
    callable: Callable, Optional
        Called with the entity that ``Entity.get_entity`` returns for the
        handle received from C, followed by the status. Pass ``None`` to
        clear the callback on the underlying listener.
    """
    self.on_subscription_matched = callable
    if callable is None:
        self._set_on_subscription_matched(self._ref, None)
        # pop() instead of del: clearing a callback that was never installed
        # must not raise KeyError.
        self._set_functors.pop("on_subscription_matched", None)
        return

    self._set_functors["on_subscription_matched"] = self.on_subscription_matched

    def call(reader, status, arg):
        # Attribute lookup happens at call time, so reassignment is honoured.
        self.on_subscription_matched(Entity.get_entity(reader), status)

    # Kept on self; presumably so the wrapper object stays alive while the
    # C side may still invoke it.
    self._on_subscription_matched = _on_subscription_matched_fn(call)
    self._set_on_subscription_matched(self._ref, self._on_subscription_matched)
@c_call("dds_create_listener")
def _create_listener(self, arg: ct.c_void_p) -> dds_c_t.listener_p:
    # Binding stub for the C function `dds_create_listener`. The empty body is
    # intentional; the `c_call` decorator presumably supplies the actual call
    # (confirm in `.internal`).
    pass
@c_call("dds_reset_listener")
def _reset_listener(self, listener: dds_c_t.listener_p) -> None:
    # Binding stub for the C function `dds_reset_listener`. The empty body is
    # intentional; the `c_call` decorator presumably supplies the actual call
    # (confirm in `.internal`).
    pass
@c_call("dds_copy_listener")
def _copy_listener(self, dst: dds_c_t.listener_p, src: dds_c_t.listener_p) -> None:
    # Binding stub for the C function `dds_copy_listener`. The empty body is
    # intentional; the `c_call` decorator presumably supplies the actual call
    # (confirm in `.internal`).
    pass
@c_call("dds_merge_listener")
def _merge_listener(self, dst: dds_c_t.listener_p, src: dds_c_t.listener_p) -> None:
    # Binding stub for the C function `dds_merge_listener`. The empty body is
    # intentional; the `c_call` decorator presumably supplies the actual call
    # (confirm in `.internal`).
    pass
@c_call("dds_lset_inconsistent_topic")
def _set_inconsistent_topic(
    self, listener: dds_c_t.listener_p, callback: _inconsistent_topic_fn
) -> None:
    # Binding stub for the C function `dds_lset_inconsistent_topic`. The empty
    # body is intentional; the `c_call` decorator presumably supplies the
    # actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_data_available")
def _set_data_available(
    self, listener: dds_c_t.listener_p, callback: _data_available_fn
) -> None:
    # Binding stub for the C function `dds_lset_data_available`. The empty
    # body is intentional; the `c_call` decorator presumably supplies the
    # actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_liveliness_lost")
def _set_liveliness_lost(
    self, listener: dds_c_t.listener_p, callback: _liveliness_lost_fn
) -> None:
    # Binding stub for the C function `dds_lset_liveliness_lost`. The empty
    # body is intentional; the `c_call` decorator presumably supplies the
    # actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_liveliness_changed")
def _set_liveliness_changed(
    self, listener: dds_c_t.listener_p, callback: _liveliness_changed_fn
) -> None:
    # Binding stub for the C function `dds_lset_liveliness_changed`. The empty
    # body is intentional; the `c_call` decorator presumably supplies the
    # actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_offered_deadline_missed")
def _set_on_offered_deadline_missed(
    self, listener: dds_c_t.listener_p, callback: _offered_deadline_missed_fn
) -> None:
    # Binding stub for the C function `dds_lset_offered_deadline_missed`. The
    # empty body is intentional; the `c_call` decorator presumably supplies
    # the actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_offered_incompatible_qos")
def _set_on_offered_incompatible_qos(
    self, listener: dds_c_t.listener_p, callback: _offered_incompatible_qos_fn
) -> None:
    # Binding stub for the C function `dds_lset_offered_incompatible_qos`. The
    # empty body is intentional; the `c_call` decorator presumably supplies
    # the actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_data_on_readers")
def _set_on_data_on_readers(
    self, listener: dds_c_t.listener_p, callback: _data_on_readers_fn
) -> None:
    # Binding stub for the C function `dds_lset_data_on_readers`. The empty
    # body is intentional; the `c_call` decorator presumably supplies the
    # actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_sample_lost")
def _set_on_sample_lost(
    self, listener: dds_c_t.listener_p, callback: _on_sample_lost_fn
) -> None:
    # Binding stub for the C function `dds_lset_sample_lost`. The empty body
    # is intentional; the `c_call` decorator presumably supplies the actual
    # call (confirm in `.internal`).
    pass
@c_call("dds_lset_sample_rejected")
def _set_on_sample_rejected(
    self, listener: dds_c_t.listener_p, callback: _on_sample_rejected_fn
) -> None:
    # Binding stub for the C function `dds_lset_sample_rejected`. The empty
    # body is intentional; the `c_call` decorator presumably supplies the
    # actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_requested_deadline_missed")
def _set_on_requested_deadline_missed(
    self, listener: dds_c_t.listener_p, callback: _on_requested_deadline_missed_fn
) -> None:
    # Binding stub for the C function `dds_lset_requested_deadline_missed`.
    # The empty body is intentional; the `c_call` decorator presumably
    # supplies the actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_requested_incompatible_qos")
def _set_on_requested_incompatible_qos(
    self, listener: dds_c_t.listener_p, callback: _on_requested_incompatible_qos_fn
) -> None:
    # Binding stub for the C function `dds_lset_requested_incompatible_qos`.
    # The empty body is intentional; the `c_call` decorator presumably
    # supplies the actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_publication_matched")
def _set_on_publication_matched(
    self, listener: dds_c_t.listener_p, callback: _on_publication_matched_fn
) -> None:
    # Binding stub for the C function `dds_lset_publication_matched`. The
    # empty body is intentional; the `c_call` decorator presumably supplies
    # the actual call (confirm in `.internal`).
    pass
@c_call("dds_lset_subscription_matched")
def _set_on_subscription_matched(
    self, listener: dds_c_t.listener_p, callback: _on_subscription_matched_fn
) -> None:
    # Binding stub for the C function `dds_lset_subscription_matched`. The
    # empty body is intentional; the `c_call` decorator presumably supplies
    # the actual call (confirm in `.internal`).
    pass
@c_call("dds_delete_listener")
def _delete_listener(self, listener: dds_c_t.listener_p) -> None:
    # Binding stub for the C function `dds_delete_listener`. The empty body is
    # intentional; the `c_call` decorator presumably supplies the actual call
    # (confirm in `.internal`).
    pass
class SampleState:
    """Bit flags describing the read state of a single sample.

    Used when building condition masks. The class is a plain namespace of
    constants; there should never be a need to instantiate it.

    Attributes
    ----------
    Read: int
        Only consider samples that have already been read.
    NotRead: int
        Only consider unread samples.
    Any: int
        Ignore the read/unread state of samples.
    """

    Read: int = 1 << 0
    NotRead: int = 1 << 1
    # Both flags combined: matches samples regardless of read state.
    Any: int = Read | NotRead
class ViewState:
    """Bit flags describing the view state of an instance.

    Used when building condition masks. The class is a plain namespace of
    constants; there should never be a need to instantiate it.

    Attributes
    ----------
    New: int
        Only consider samples belonging to newly created instances.
    Old: int
        Only consider samples belonging to previously created instances.
    Any: int
        Ignore whether instances are new or not.
    """

    New: int = 1 << 2
    Old: int = 1 << 3
    # Both flags combined: matches new and old instances alike.
    Any: int = New | Old
class InstanceState:
    """Bit flags describing the liveliness state of an instance.

    Used when building condition masks. The class is a plain namespace of
    constants; there should never be a need to instantiate it.

    Attributes
    ----------
    Alive: int
        Only consider samples belonging to an alive instance (it has alive writer(s)).
    NotAliveDisposed: int
        Only consider samples belonging to an instance that is not alive
        because it was actively disposed.
    NotAliveNoWriters: int
        Only consider samples belonging to an instance that is not alive
        because it has no writers.
    Any: int
        Ignore the liveliness status of the instance.
    """

    Alive: int = 1 << 4
    NotAliveDisposed: int = 1 << 5
    NotAliveNoWriters: int = 1 << 6
    # All three flags combined: matches every instance state.
    Any: int = Alive | NotAliveDisposed | NotAliveNoWriters
class DDSStatus:
    """DDSStatus contains constants to build status masks. It is static and should
    never need to be instantiated.

    Each status occupies one bit, so values can be combined with ``|``.

    Attributes
    ----------
    InconsistentTopic: int
    OfferedDeadlineMissed: int
    RequestedDeadlineMissed: int
    OfferedIncompatibleQos: int
    RequestedIncompatibleQos: int
    SampleLost: int
    SampleRejected: int
    DataOnReaders: int
    DataAvailable: int
    LivelinessLost: int
    LivelinessChanged: int
    PublicationMatched: int
    SubscriptionMatched: int
    All: int
        Bitwise OR of the thirteen flags above, i.e. ``(1 << 13) - 1``.
    """

    InconsistentTopic: int = 1 << 0
    OfferedDeadlineMissed: int = 1 << 1
    RequestedDeadlineMissed: int = 1 << 2
    OfferedIncompatibleQos: int = 1 << 3
    RequestedIncompatibleQos: int = 1 << 4
    SampleLost: int = 1 << 5
    SampleRejected: int = 1 << 6
    DataOnReaders: int = 1 << 7
    DataAvailable: int = 1 << 8
    LivelinessLost: int = 1 << 9
    LivelinessChanged: int = 1 << 10
    PublicationMatched: int = 1 << 11
    SubscriptionMatched: int = 1 << 12
    # Bits 0..12 are in use, so the full mask is 13 bits wide.
    All: int = (1 << 13) - 1
class _Condition(Entity):
    """Behaviour shared by ReadCondition and QueryCondition."""

    def get_mask(self) -> int:
        """Return the mask reported by ``dds_get_mask`` for this condition.

        Raises
        ------
        DDSException
            When the C call returns anything other than 0.
        """
        out = ct.c_uint32()
        status = self._get_mask(self._ref, ct.byref(out))
        if status != 0:
            raise DDSException(
                status, f"Occurred when obtaining the mask of {repr(self)}"
            )
        return out.value

    def is_triggered(self) -> bool:
        """Return True when ``dds_triggered`` reports exactly 1 for this condition.

        Raises
        ------
        DDSException
            When the C call returns a negative value.
        """
        status = self._triggered(self._ref)
        if status >= 0:
            return status == 1
        raise DDSException(
            status, f"Occurred when checking if {repr(self)} was triggered"
        )

    # Read-only attribute view of is_triggered().
    triggered: bool = property(is_triggered)

    @c_call("dds_get_mask")
    def _get_mask(
        self, condition: dds_c_t.entity, mask: ct.POINTER(ct.c_uint32)
    ) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_get_mask`.
        pass

    @c_call("dds_triggered")
    def _triggered(self, condition: dds_c_t.entity) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_triggered`.
        pass
class ReadCondition(_Condition):
    """Condition that triggers when new data is available to read according to the mask.

    Build the mask from InstanceState, ViewState and SampleState constants.
    """

    def __init__(self, reader: "cyclonedds.sub.DataReader", mask: int) -> None:
        """Construct a ReadCondition.

        Parameters
        ----------
        reader: DataReader
            Reader whose ``_ref`` handle the condition is created on.
        mask: int
            State mask handed to ``dds_create_readcondition``.
        """
        self.reader = reader
        self.mask = mask
        handle = self._create_readcondition(reader._ref, mask)
        super().__init__(handle)

    @c_call("dds_create_readcondition")
    def _create_readcondition(
        self, reader: dds_c_t.entity, mask: ct.c_uint32
    ) -> dds_c_t.entity:
        # Binding stub for the C function `dds_create_readcondition`.
        pass
# Callback type used to wrap QueryCondition filters before they are handed to
# dds_create_querycondition. Built from ct.c_bool and a single ct.c_void_p
# (presumably result type first, then argument types; confirm in `.internal`).
_querycondition_filter_fn = c_callable(ct.c_bool, [ct.c_void_p])
class QueryCondition(_Condition):
    """Condition that triggers when new data is available to read according to the mask.

    Build the mask from InstanceState, ViewState and SampleState constants and
    supply a filter function that receives the deserialized sample and returns
    whether to accept it.
    """

    def __init__(
        self,
        reader: "cyclonedds.sub.DataReader",
        mask: int,
        filter: Callable[[Any], bool],
    ) -> None:
        """Construct a QueryCondition.

        Parameters
        ----------
        reader: DataReader
            Reader whose ``_ref`` handle the condition is created on; its
            topic's data type is used to deserialize samples for the filter.
        mask: int
            State mask handed to ``dds_create_querycondition``.
        filter: Callable[[Any], bool]
            Predicate applied to each deserialized sample.
        """
        self.reader = reader
        self.mask = mask
        self.filter = filter

        def call(sample_pt):
            # Any Python exception is turned into a rejection so it cannot
            # propagate into C.
            try:
                buffer = ct.cast(sample_pt, ct.POINTER(dds_c_t.sample_buffer))[0]
                byte_array_t = ct.c_ubyte * buffer.len
                raw = ct.cast(buffer.buf, ct.POINTER(byte_array_t)).contents[:]
                sample = self.reader._topic.data_type.deserialize(bytes(raw))
                return self.filter(sample)
            except Exception:  # Block any python exception from going into C
                return False

        # The wrapped callback is stored on self; presumably to keep it alive
        # for as long as the condition exists.
        self._filter = _querycondition_filter_fn(call)
        handle = self._create_querycondition(reader._ref, mask, self._filter)
        super().__init__(handle)

    @c_call("dds_create_querycondition")
    def _create_querycondition(
        self,
        reader: dds_c_t.entity,
        mask: ct.c_uint32,
        filter: _querycondition_filter_fn,
    ) -> dds_c_t.entity:
        # Binding stub for the C function `dds_create_querycondition`.
        pass
class GuardCondition(Entity):
    """A GuardCondition is a manually triggered condition that can be added to a :class:`WaitSet<cyclonedds.core.WaitSet>`."""

    def __init__(self, domain_participant: "cyclonedds.domain.DomainParticipant"):
        """Initialize a GuardCondition.

        Parameters
        ----------
        domain_participant: DomainParticipant
            The domain in which the GuardCondition should be active.
        """
        super().__init__(self._create_guardcondition(domain_participant._ref))

    def set(self, triggered: bool) -> None:
        """Set the status of the GuardCondition to triggered or untriggered.

        Parameters
        ----------
        triggered: bool
            Whether to trigger this condition.

        Returns
        -------
        None

        Raises
        ------
        DDSException
            When the C call returns a negative value.
        """
        ret = self._set_guardcondition(self._ref, triggered)
        if ret < 0:
            raise DDSException(ret, f"Occurred when calling set on {repr(self)}")

    def read(self) -> bool:
        """Read the status of the GuardCondition.

        Returns
        -------
        bool
            Whether this condition is triggered.

        Raises
        ------
        DDSException
            When the C call returns a negative value.
        """
        triggered = ct.c_bool()
        ret = self._read_guardcondition(self._ref, ct.byref(triggered))
        if ret < 0:
            raise DDSException(ret, f"Occurred when calling read on {repr(self)}")
        return bool(triggered.value)

    def take(self) -> bool:
        """Take the status of the GuardCondition. If it is True it will be False on the next call.

        Returns
        -------
        bool
            Whether this condition is triggered.

        Raises
        ------
        DDSException
            When the C call returns a negative value.
        """
        triggered = ct.c_bool()
        ret = self._take_guardcondition(self._ref, ct.byref(triggered))
        if ret < 0:
            # The message names `take`; it previously said "read" (copy-paste).
            raise DDSException(ret, f"Occurred when calling take on {repr(self)}")
        return bool(triggered.value)

    @c_call("dds_create_guardcondition")
    def _create_guardcondition(self, participant: dds_c_t.entity) -> dds_c_t.entity:
        # Binding stub for the C function `dds_create_guardcondition`.
        pass

    @c_call("dds_set_guardcondition")
    def _set_guardcondition(
        self, guardcond: dds_c_t.entity, triggered: ct.c_bool
    ) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_set_guardcondition`.
        pass

    @c_call("dds_read_guardcondition")
    def _read_guardcondition(
        self, guardcond: dds_c_t.entity, triggered: ct.POINTER(ct.c_bool)
    ) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_read_guardcondition`.
        pass

    @c_call("dds_take_guardcondition")
    def _take_guardcondition(
        self, guardcond: dds_c_t.entity, triggered: ct.POINTER(ct.c_bool)
    ) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_take_guardcondition`.
        pass
class WaitSet(Entity):
    """A WaitSet is a way to provide synchronous access to events happening in the DDS system. You can attach almost any kind
    of entity to a WaitSet and then perform a blocking wait on the waitset. When one or more of the entities in the waitset
    trigger the wait is unblocked. What a 'trigger' is depends on the type of entity, you can find out more in
    ``todo(DDS) triggers``.
    """

    def __init__(
        self, domain_participant: "cyclonedds.domain.DomainParticipant"
    ) -> None:
        """Make a new WaitSet. It starts off empty. An empty waitset will never trigger.

        Parameters
        ----------
        domain_participant: DomainParticipant
            The domain in which you want to make a WaitSet
        """
        super().__init__(self._create_waitset(domain_participant._ref))
        # List of (entity, attach-argument) tuples, in attachment order.
        self.attached = []

    def __del__(self) -> None:
        # `attached` is missing when __init__ raised before assigning it;
        # finalization must not fail with AttributeError in that case.
        for v in getattr(self, "attached", ()):
            self._waitset_detach(self._ref, v[0]._ref)
        super().__del__()

    def attach(self, entity: Entity) -> None:
        """Attach an entity to this WaitSet. This is a no-op if the entity was already attached.

        Parameters
        ----------
        entity: Entity
            The entity you wish to attach.

        Raises
        ------
        DDSException: When you try to attach a non-triggerable entity.
        """
        if self.is_attached(entity):
            return
        value_pt = ct.c_int()
        ret = self._waitset_attach(self._ref, entity._ref, ct.byref(value_pt))
        if ret < 0:
            raise DDSException(
                ret, f"Occurred when trying to attach {repr(entity)} to {repr(self)}"
            )
        # The attach argument is stored next to the entity; presumably so it
        # stays alive for as long as the attachment exists.
        self.attached.append((entity, value_pt))

    def detach(self, entity: Entity) -> None:
        """Detach an entity from this WaitSet. If it was not attached this is a no-op.

        Note that this operation is not atomic, a trigger for the detached entity could still occur right
        after detaching it.

        Parameters
        ----------
        entity: Entity
            The entity you wish to detach

        Raises
        ------
        DDSException
            When the C call returns a negative value.
        """
        for i, v in enumerate(self.attached):
            if v[0] == entity:
                ret = self._waitset_detach(self._ref, entity._ref)
                if ret < 0:
                    # The message describes the detach; it previously said "attach".
                    raise DDSException(
                        ret,
                        f"Occurred when trying to detach {repr(entity)} from {repr(self)}",
                    )
                del self.attached[i]
                break

    def is_attached(self, entity: Entity) -> bool:
        """Check whether an entity is attached.

        Parameters
        ----------
        entity: Entity
            Check the attachment of this entity.
        """
        return any(v[0] == entity for v in self.attached)

    def get_entities(self) -> List[Entity]:
        """Get all entities attached"""
        # Note: should spend some time on synchronisation. What if the waitset is used across threads?
        # That is probably a bad idea in python, but who is going to stop the user from doing it anyway...
        return [v[0] for v in self.attached]

    def wait(self, timeout: int) -> int:
        """Block execution and wait for one of the entities in this waitset to trigger.

        Parameters
        ----------
        timeout: int
            The maximum number of nanoseconds to block. Use the function :func:`duration<cyclonedds.util.duration>`
            to write that in a human readable format.

        Returns
        -------
        int
            The number of triggered entities. This will be 0 when a timeout occurred.

        Raises
        ------
        DDSException
            When the C call returns a negative value.
        """
        ret = self._waitset_wait(self._ref, None, 0, timeout)
        if ret >= 0:
            return ret
        raise DDSException(ret, f"Occurred while waiting in {repr(self)}")

    def wait_until(self, abstime: int) -> int:
        """Block execution and wait for one of the entities in this waitset to trigger.

        Parameters
        ----------
        abstime: int
            The absolute time in nanoseconds at which to stop blocking.
            NOTE(review): the time base is not established by this module and
            was left "to be confirmed" by the original author.

        Returns
        -------
        int
            The number of triggered entities. This will be 0 when a timeout occurred.

        Raises
        ------
        DDSException
            When the C call returns a negative value.
        """
        # Pass no output array, exactly like wait(): the attach values are not
        # used here. The previous code passed byref() of a single pointer-sized
        # temporary together with nxs = len(attached), which does not describe
        # an array of nxs elements (non-NULL with nxs == 0 for an empty
        # waitset, out-of-bounds writes for more than one attached entity).
        ret = self._waitset_wait_until(self._ref, None, 0, abstime)
        if ret >= 0:
            return ret
        raise DDSException(ret, f"Occurred while waiting in {repr(self)}")

    def set_trigger(self, value: bool) -> None:
        """Manually trigger a WaitSet. It is unlikely you would need this.

        Parameters
        ----------
        value: bool
            The trigger value.

        Raises
        ------
        DDSException
            When the C call returns a negative value.
        """
        ret = self._waitset_set_trigger(self._ref, value)
        if ret < 0:
            raise DDSException(ret, f"Occurred when setting trigger in {repr(self)}")

    async def wait_async(self, timeout: Optional[int] = None) -> int:
        """Asynchronously wait for a WaitSet to trigger. Use in event-loop based applications.

        The blocking :meth:`wait` runs in a worker thread so the event loop stays responsive.

        Parameters
        ----------
        timeout: int, Optional = None
            Maximum number of nanoseconds to wait before returning. By default this is infinity.

        Returns
        -------
        int
            The value returned by :meth:`wait`.
        """
        # Only None selects the infinite default. `timeout or dds_infinity`
        # also replaced an explicit 0 with an infinite wait.
        if timeout is None:
            timeout = dds_infinity
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as pool:
            return await loop.run_in_executor(pool, self.wait, timeout)

    @c_call("dds_create_waitset")
    def _create_waitset(self, domain_participant: dds_c_t.entity) -> dds_c_t.entity:
        # Binding stub for the C function `dds_create_waitset`.
        pass

    @c_call("dds_waitset_attach")
    def _waitset_attach(
        self, waitset: dds_c_t.entity, entity: dds_c_t.entity, x: dds_c_t.attach
    ) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_waitset_attach`.
        pass

    @c_call("dds_waitset_detach")
    def _waitset_detach(
        self, waitset: dds_c_t.entity, entity: dds_c_t.entity
    ) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_waitset_detach`.
        pass

    @c_call("dds_waitset_wait")
    def _waitset_wait(
        self,
        waitset: dds_c_t.entity,
        xs: ct.POINTER(dds_c_t.attach),
        nxs: ct.c_size_t,
        reltimeout: dds_c_t.duration,
    ) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_waitset_wait`.
        pass

    @c_call("dds_waitset_wait_until")
    def _waitset_wait_until(
        self,
        waitset: dds_c_t.entity,
        xs: ct.POINTER(dds_c_t.attach),
        nxs: ct.c_size_t,
        abstimeout: dds_c_t.duration,
    ) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_waitset_wait_until`.
        pass

    @c_call("dds_waitset_set_trigger")
    def _waitset_set_trigger(
        self, waitset: dds_c_t.entity, value: ct.c_bool
    ) -> dds_c_t.returnv:
        # Binding stub for the C function `dds_waitset_set_trigger`.
        pass
# Names exported by `from <module> import *`. Qos and Policy are re-exported
# from `.qos`; the private `_Condition` base class is deliberately not listed.
__all__ = [
    "DDSException",
    "Entity",
    "Qos",
    "Policy",
    "Listener",
    "DDSStatus",
    "ViewState",
    "InstanceState",
    "SampleState",
    "ReadCondition",
    "QueryCondition",
    "GuardCondition",
    "WaitSet",
]