Source code for cyclonedds.pub

"""
 * Copyright(c) 2021 ADLINK Technology Limited and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""

from typing import Optional, Union, Generic, TypeVar, TYPE_CHECKING

from .internal import c_call, dds_c_t
from .core import Entity, DDSException, Listener
from .domain import DomainParticipant
from .topic import Topic
from .qos import _CQos, Qos, LimitedScopeQos, PublisherQos, DataWriterQos

from cyclonedds._clayer import ddspy_write, ddspy_write_ts, ddspy_dispose, ddspy_writedispose, ddspy_writedispose_ts, \
    ddspy_dispose_handle, ddspy_dispose_handle_ts, ddspy_register_instance, ddspy_unregister_instance,   \
    ddspy_unregister_instance_handle, ddspy_unregister_instance_ts, ddspy_unregister_instance_handle_ts, \
    ddspy_lookup_instance, ddspy_dispose_ts


if TYPE_CHECKING:
    import cyclonedds


[docs]class Publisher(Entity): def __init__( self, domain_participant: DomainParticipant, qos: Optional[Qos] = None, listener: Optional[Listener] = None): if not isinstance(domain_participant, DomainParticipant): raise TypeError(f"{domain_participant} is not a cyclonedds.domain.DomainParticipant.") if qos is not None: if isinstance(qos, LimitedScopeQos) and not isinstance(qos, PublisherQos): raise TypeError(f"{qos} is not appropriate for a Publisher") elif not isinstance(qos, Qos): raise TypeError(f"{qos} is not a valid qos object") if listener is not None: if not isinstance(listener, Listener): raise TypeError(f"{listener} is not a valid listener object.") cqos = _CQos.qos_to_cqos(qos) if qos else None try: super().__init__( self._create_publisher( domain_participant._ref, cqos, listener._ref if listener else None ), listener=listener ) finally: if cqos: _CQos.cqos_destroy(cqos) self._keepalive_entities = [self.participant]
[docs] def suspend(self): ret = self._suspend(self._ref) if ret == 0: return raise DDSException(ret, f"Occurred while suspending {repr(self)}")
[docs] def resume(self): ret = self._resume(self._ref) if ret == 0: return raise DDSException(ret, f"Occurred while resuming {repr(self)}")
[docs] def wait_for_acks(self, timeout: int): ret = self._wait_for_acks(self._ref, timeout) if ret == 0: return True elif ret == DDSException.DDS_RETCODE_TIMEOUT: return False raise DDSException(ret, f"Occurred while waiting for acks from {repr(self)}")
@c_call("dds_create_publisher") def _create_publisher(self, domain_participant: dds_c_t.entity, qos: dds_c_t.qos_p, listener: dds_c_t.listener_p) -> dds_c_t.entity: pass @c_call("dds_suspend") def _suspend(self, publisher: dds_c_t.entity) -> dds_c_t.returnv: pass @c_call("dds_resume") def _resume(self, publisher: dds_c_t.entity) -> dds_c_t.returnv: pass @c_call("dds_wait_for_acks") def _wait_for_acks(self, publisher: dds_c_t.entity, timeout: dds_c_t.duration) -> dds_c_t.returnv: pass
_T = TypeVar('_T')
[docs]class DataWriter(Entity, Generic[_T]): def __init__(self, publisher_or_participant: Union[DomainParticipant, Publisher], topic: Topic[_T], qos: Optional[Qos] = None, listener: Optional[Listener] = None): if not isinstance(publisher_or_participant, (DomainParticipant, Publisher)): raise TypeError(f"{publisher_or_participant} is not a cyclonedds.domain.DomainParticipant" " or cyclonedds.pub.Publisher.") if not isinstance(topic, Topic): raise TypeError(f"{topic} is not a cyclonedds.topic.Topic.") if qos is not None: if isinstance(qos, LimitedScopeQos) and not isinstance(qos, DataWriterQos): raise TypeError(f"{qos} is not appropriate for a DataWriter") elif not isinstance(qos, Qos): raise TypeError(f"{qos} is not a valid qos object") cqos = _CQos.qos_to_cqos(qos) if qos else None try: super().__init__( self._create_writer( publisher_or_participant._ref, topic._ref, cqos, listener._ref if listener else None ), listener=listener ) finally: if cqos: _CQos.cqos_destroy(cqos) self._topic = topic self.data_type = topic.data_type self._keepalive_entities = [self.publisher, self.topic] cqos = _CQos.cqos_create() ret = self._get_qos(self._ref, cqos) if ret == 0: data_repr_policy = _CQos._get_p_datarepresentation(cqos) if data_repr_policy is None or (data_repr_policy.use_cdrv0_representation and data_repr_policy.use_xcdrv2_representation): self._use_version_2 = None # Use whatever is native to the datatype elif data_repr_policy.use_xcdrv2_representation: self._use_version_2 = True else: self._use_version_2 = False _CQos.cqos_destroy(cqos) @property def topic(self) -> Topic[_T]: return self._topic
[docs] def write(self, sample: _T, timestamp: Optional[int] = None): if not isinstance(sample, self.data_type): raise TypeError(f"{sample} is not of type {self.data_type}") ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') if timestamp is not None: ret = ddspy_write_ts(self._ref, ser, timestamp) else: ret = ddspy_write(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while writing sample in {repr(self)}")
[docs] def write_dispose(self, sample: _T, timestamp: Optional[int] = None): ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') if timestamp is not None: ret = ddspy_writedispose_ts(self._ref, ser, timestamp) else: ret = ddspy_writedispose(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while writedisposing sample in {repr(self)}")
[docs] def dispose(self, sample: _T, timestamp: Optional[int] = None): ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') if timestamp is not None: ret = ddspy_dispose_ts(self._ref, ser, timestamp) else: ret = ddspy_dispose(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while disposing in {repr(self)}")
[docs] def dispose_instance_handle(self, handle: int, timestamp: Optional[int] = None): if timestamp is not None: ret = ddspy_dispose_handle_ts(self._ref, handle, timestamp) else: ret = ddspy_dispose_handle(self._ref, handle) if ret < 0: raise DDSException(ret, f"Occurred while disposing in {repr(self)}")
[docs] def register_instance(self, sample: _T) -> int: ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') ret = ddspy_register_instance(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while registering instance in {repr(self)}") return ret
[docs] def unregister_instance(self, sample: _T, timestamp: Optional[int] = None): ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') if timestamp is not None: ret = ddspy_unregister_instance_ts(self._ref, ser, timestamp) else: ret = ddspy_unregister_instance(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while unregistering instance in {repr(self)}")
[docs] def unregister_instance_handle(self, handle: int, timestamp: Optional[int] = None): if timestamp is not None: ret = ddspy_unregister_instance_handle_ts(self._ref, handle, timestamp) else: ret = ddspy_unregister_instance_handle(self._ref, handle) if ret < 0: raise DDSException(ret, f"Occurred while unregistering instance handle n {repr(self)}")
[docs] def wait_for_acks(self, timeout: int) -> bool: ret = self._wait_for_acks(self._ref, timeout) if ret == 0: return True elif ret == Exception.DDS_RETCODE_TIMEOUT: return False raise DDSException(ret, f"Occurred while waiting for acks from {repr(self)}")
[docs] def lookup_instance(self, sample: _T) -> Optional[int]: ser = sample.serialize(use_version_2=self._use_version_2) ser = ser.ljust((len(ser) + 4 - 1) & ~(4 - 1), b'\0') ret = ddspy_lookup_instance(self._ref, ser) if ret < 0: raise DDSException(ret, f"Occurred while lookup up instance from {repr(self)}") if ret == 0: return None return ret
@c_call("dds_create_writer") def _create_writer(self, publisher: dds_c_t.entity, topic: dds_c_t.entity, qos: dds_c_t.qos_p, listener: dds_c_t.listener_p) -> dds_c_t.entity: pass @c_call("dds_wait_for_acks") def _wait_for_acks(self, publisher: dds_c_t.entity, timeout: dds_c_t.duration) -> dds_c_t.returnv: pass