Source code for cyclonedds.pub

"""
 * Copyright(c) 2021 to 2022 ZettaScale Technology and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""

from typing import Optional, Union, Generic, TypeVar, TYPE_CHECKING

from .internal import c_call, dds_c_t
from .core import Entity, DDSException, Listener
from .domain import DomainParticipant
from .topic import Topic
from .qos import _CQos, Qos, LimitedScopeQos, PublisherQos, DataWriterQos

from cyclonedds._clayer import ddspy_write, ddspy_write_ts, ddspy_dispose, ddspy_writedispose, ddspy_writedispose_ts, \
    ddspy_dispose_handle, ddspy_dispose_handle_ts, ddspy_register_instance, ddspy_unregister_instance,   \
    ddspy_unregister_instance_handle, ddspy_unregister_instance_ts, ddspy_unregister_instance_handle_ts, \
    ddspy_lookup_instance, ddspy_dispose_ts


if TYPE_CHECKING:
    import cyclonedds


[docs]class Publisher(Entity): def __init__( self, domain_participant: DomainParticipant, qos: Optional[Qos] = None, listener: Optional[Listener] = None): if not isinstance(domain_participant, DomainParticipant): raise TypeError(f"{domain_participant} is not a cyclonedds.domain.DomainParticipant.") if qos is not None: if isinstance(qos, LimitedScopeQos) and not isinstance(qos, PublisherQos): raise TypeError(f"{qos} is not appropriate for a Publisher") elif not isinstance(qos, Qos): raise TypeError(f"{qos} is not a valid qos object") if listener is not None: if not isinstance(listener, Listener): raise TypeError(f"{listener} is not a valid listener object.") cqos = _CQos.qos_to_cqos(qos) if qos else None try: super().__init__( self._create_publisher( domain_participant._ref, cqos, listener._ref if listener else None ), listener=listener ) finally: if cqos: _CQos.cqos_destroy(cqos) self._keepalive_entities = [self.participant]
[docs] def suspend(self): ret = self._suspend(self._ref) if ret == 0: return raise DDSException(ret, f"Occurred while suspending {repr(self)}")
[docs] def resume(self): ret = self._resume(self._ref) if ret == 0: return raise DDSException(ret, f"Occurred while resuming {repr(self)}")
[docs] def wait_for_acks(self, timeout: int): """ This operation blocks the calling thread until either all data written by the publisher or writer is acknowledged by all matched reliable reader entities, or else the duration specified by the timeout parameter elapses, whichever happens first. Parameters ---------- timeout The maximum number of nanoseconds to wait. Use the function :func:`duration<cyclonedds.util.duration>` to write that in a human readable format. """ ret = self._wait_for_acks(self._ref, timeout) if ret == 0: return True elif ret == DDSException.DDS_RETCODE_TIMEOUT: return False raise DDSException(ret, f"Occurred while waiting for acks from {repr(self)}")
@c_call("dds_create_publisher") def _create_publisher(self, domain_participant: dds_c_t.entity, qos: dds_c_t.qos_p, listener: dds_c_t.listener_p) -> dds_c_t.entity: pass @c_call("dds_suspend") def _suspend(self, publisher: dds_c_t.entity) -> dds_c_t.returnv: pass @c_call("dds_resume") def _resume(self, publisher: dds_c_t.entity) -> dds_c_t.returnv: pass @c_call("dds_wait_for_acks") def _wait_for_acks(self, publisher: dds_c_t.entity, timeout: dds_c_t.duration) -> dds_c_t.returnv: pass
_T = TypeVar('_T')
[docs]class DataWriter(Entity, Generic[_T]): def __init__(self, publisher_or_participant: Union[DomainParticipant, Publisher], topic: Topic[_T], qos: Optional[Qos] = None, listener: Optional[Listener] = None): if not isinstance(publisher_or_participant, (DomainParticipant, Publisher)): raise TypeError(f"{publisher_or_participant} is not a cyclonedds.domain.DomainParticipant" " or cyclonedds.pub.Publisher.") if not isinstance(topic, Topic): raise TypeError(f"{topic} is not a cyclonedds.topic.Topic.") if qos is not None: if isinstance(qos, LimitedScopeQos) and not isinstance(qos, DataWriterQos): raise TypeError(f"{qos} is not appropriate for a DataWriter") elif not isinstance(qos, Qos): raise TypeError(f"{qos} is not a valid qos object") cqos = _CQos.qos_to_cqos(qos) if qos else None try: super().__init__( self._create_writer( publisher_or_participant._ref, topic._ref, cqos, listener._ref if listener else None ), listener=listener ) finally: if cqos: _CQos.cqos_destroy(cqos) self._topic = topic self.data_type = topic.data_type self._keepalive_entities = [self.publisher, self.topic] cqos = _CQos.cqos_create() ret = self._get_qos(self._ref, cqos) if ret == 0: data_repr_policy = _CQos._get_p_datarepresentation(cqos) if data_repr_policy is None or (data_repr_policy.use_cdrv0_representation and data_repr_policy.use_xcdrv2_representation): self._use_version_2 = None # Use whatever is native to the datatype elif data_repr_policy.use_xcdrv2_representation: self._use_version_2 = True else: self._use_version_2 = False _CQos.cqos_destroy(cqos) @property def topic(self) -> Topic[_T]: return self._topic
[docs] def write(self, sample: _T, timestamp: Optional[int] = None): """ Parameters ---------- sample The sample to write timestamp The sample's source_timestamp (in nanoseconds since the UNIX Epoch) """ if not isinstance(sample, self.data_type): raise TypeError(f"{sample} is not of type {self.data_type}") ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') if timestamp is not None: ret = ddspy_write_ts(self._ref, ser, timestamp) else: ret = ddspy_write(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while writing sample in {repr(self)}")
[docs] def write_dispose(self, sample: _T, timestamp: Optional[int] = None): """ Similar to :func:`write` but also marks the sample for disposal by setting its :class:`InstanceState<cyclonedds.core.InstanceState>` to `NotAliveDisposed`. Parameters ---------- sample The sample to dispose timestamp The sample's source_timestamp (in nanoseconds since the UNIX Epoch) """ ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') if timestamp is not None: ret = ddspy_writedispose_ts(self._ref, ser, timestamp) else: ret = ddspy_writedispose(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while writedisposing sample in {repr(self)}")
[docs] def dispose(self, sample: _T, timestamp: Optional[int] = None): """ Marks the sample for disposal by setting its :class:`InstanceState<cyclonedds.core.InstanceState>` to `NotAliveDisposed`. Parameters ---------- sample The sample to dispose timestamp The sample's source_timestamp (in nanoseconds since the UNIX Epoch) """ ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') if timestamp is not None: ret = ddspy_dispose_ts(self._ref, ser, timestamp) else: ret = ddspy_dispose(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while disposing in {repr(self)}")
[docs] def dispose_instance_handle(self, handle: int, timestamp: Optional[int] = None): """ Marks the instance and all samples associated wiht the given handle for disposal by setting their :class:`InstanceState<cyclonedds.core.InstanceState>` to `NotAliveDisposed`. Parameters ---------- handle An instance handle received from :func:`register_instance` or :func:`lookup_instance`. timestamp The instance's source_timestamp (in nanoseconds since the UNIX Epoch) """ if timestamp is not None: ret = ddspy_dispose_handle_ts(self._ref, handle, timestamp) else: ret = ddspy_dispose_handle(self._ref, handle) if ret < 0: raise DDSException(ret, f"Occurred while disposing in {repr(self)}")
[docs] def register_instance(self, sample: _T) -> int: ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') ret = ddspy_register_instance(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while registering instance in {repr(self)}") return ret
[docs] def unregister_instance(self, sample: _T, timestamp: Optional[int] = None): """ Parameters ---------- sample The sample to unregister timestamp The timestamp used at registration (in nanoseconds since the UNIX Epoch) """ ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') if timestamp is not None: ret = ddspy_unregister_instance_ts(self._ref, ser, timestamp) else: ret = ddspy_unregister_instance(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while unregistering instance in {repr(self)}")
[docs] def unregister_instance_handle(self, handle: int, timestamp: Optional[int] = None): """ Parameters ---------- handle An instance handle received from :func:`register_instance` or :func:`lookup_instance`. timestamp The timestamp used at registration (in nanoseconds since the UNIX Epoch) """ if timestamp is not None: ret = ddspy_unregister_instance_handle_ts(self._ref, handle, timestamp) else: ret = ddspy_unregister_instance_handle(self._ref, handle) if ret < 0: raise DDSException(ret, f"Occurred while unregistering instance handle n {repr(self)}")
[docs] def wait_for_acks(self, timeout: int) -> bool: """ This operation blocks the calling thread until either all data written by the publisher or writer is acknowledged by all matched reliable reader entities, or else the duration specified by the timeout parameter elapses, whichever happens first. Parameters ---------- timeout The maximum number of nanoseconds to wait. Use the function :func:`duration<cyclonedds.util.duration>` to write that in a human readable format. """ ret = self._wait_for_acks(self._ref, timeout) if ret == 0: return True elif ret == Exception.DDS_RETCODE_TIMEOUT: return False raise DDSException(ret, f"Occurred while waiting for acks from {repr(self)}")
[docs] def lookup_instance(self, sample: _T) -> Optional[int]: """ This operation takes a sample and returns an instance handle to be used for subsequent operations. """ ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') ret = ddspy_lookup_instance(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while lookup up instance from {repr(self)}") if ret == 0: return None return ret
@c_call("dds_create_writer") def _create_writer(self, publisher: dds_c_t.entity, topic: dds_c_t.entity, qos: dds_c_t.qos_p, listener: dds_c_t.listener_p) -> dds_c_t.entity: pass @c_call("dds_wait_for_acks") def _wait_for_acks(self, publisher: dds_c_t.entity, timeout: dds_c_t.duration) -> dds_c_t.returnv: pass