Source code for cyclonedds.builtin

"""
 * Copyright(c) 2021 to 2022 ZettaScale Technology and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""

import uuid
import ctypes as ct
from dataclasses import dataclass
from typing import Optional, Union, TYPE_CHECKING

from .core import Entity, DDSException, Qos, ReadCondition, ViewState, InstanceState, SampleState
from .topic import Topic
from .sub import DataReader
from .internal import dds_c_t
from .qos import _CQos
from .builtin_types import DcpsParticipant, DcpsTopic, DcpsEndpoint, endpoint_constructor, participant_constructor, topic_constructor, cqos_to_qos

from cyclonedds._clayer import ddspy_read_participant, ddspy_take_participant, ddspy_read_endpoint, ddspy_take_endpoint, ddspy_read_topic, ddspy_take_topic
from cyclonedds.idl._typesupport.DDS.XTypes import TypeIdentifier


if TYPE_CHECKING:
    import cyclonedds


[docs]class BuiltinTopic(Topic): """Represent a built-in CycloneDDS Topic by magic reference number.""" def __init__(self, _ref, data_type): self._ref = _ref self.data_type = data_type def __del__(self): pass
[docs]class BuiltinDataReader(DataReader): """ Builtin topics have sligtly different behaviour than normal topics, so you should use this BuiltinDataReader instead of the normal DataReader. They are identical in the rest of their functionality. """
[docs] def __init__(self, subscriber_or_participant: Union['cyclonedds.sub.Subscriber', 'cyclonedds.domain.DomainParticipant'], builtin_topic: 'cyclonedds.builtin.BuiltinTopic', qos: Optional['cyclonedds.core.Qos'] = None, listener: Optional['cyclonedds.core.Listener'] = None) -> None: """Initialize the BuiltinDataReader Parameters ---------- subscriber_or_participant: cyclonedds.sub.Subscriber, cyclonedds.domain.DomainParticipant The subscriber to which this reader will be added. If you supply a DomainParticipant a subscriber will be created for you. builtin_topic: cyclonedds.builtin.BuiltinTopic Which Builtin Topic to subscribe to. This can be one of BuiltinTopicDcpsParticipant, BuiltinTopicDcpsTopic, BuiltinTopicDcpsPublication or BuiltinTopicDcpsSubscription. Please note that BuiltinTopicDcpsTopic will fail if you built CycloneDDS without Topic Discovery. qos: cyclonedds.core.Qos, optional = None Optionally supply a Qos. listener: cyclonedds.core.Listener = None Optionally supply a Listener. """ self._topic = builtin_topic cqos = _CQos.qos_to_cqos(qos) if qos else None Entity.__init__( self, self._create_reader( subscriber_or_participant._ref, builtin_topic._ref, cqos, listener._ref if listener else None ), listener=listener ) self._next_condition = ReadCondition(self, ViewState.Any | SampleState.NotRead | InstanceState.Any) if cqos: _CQos.cqos_destroy(cqos) self._make_constructors() self._keepalive_entities = [self.subscriber]
def _make_constructors(self): if self._topic == BuiltinTopicDcpsParticipant: self._readfn = ddspy_read_participant self._takefn = ddspy_take_participant self._constructor = participant_constructor elif self._topic == BuiltinTopicDcpsTopic: self._readfn = ddspy_read_topic self._takefn = ddspy_take_topic self._constructor = topic_constructor else: self._readfn = ddspy_read_endpoint self._takefn = ddspy_take_endpoint self._constructor = endpoint_constructor self._cqos_conv = cqos_to_qos
[docs] def read(self, N: int = 1, condition: Union['cyclonedds.core.ReadCondition', 'cyclonedds.core.QueryCondition'] = None): """Read a maximum of N samples, non-blocking. Optionally use a read/query-condition to select which samples you are interested in. Parameters ---------- N: int The maximum number of samples to read. condition: cyclonedds.core.ReadCondition, cyclonedds.core.QueryCondition, optional Only read samples that satisfy the supplied condition. Raises ------ DDSException If any error code is returned by the DDS API it is converted into an exception. """ ref = condition._ref if condition else self._ref ret = self._readfn(ref, N, self._constructor, self._cqos_conv) if type(ret) == int: raise DDSException(ret, f"Occurred when calling read() in {repr(self)}") return ret
[docs] def take(self, N: int = 1, condition=None): """Take a maximum of N samples, non-blocking. Optionally use a read/query-condition to select which samples you are interested in. Parameters ---------- N: int The maximum number of samples to read. condition: cyclonedds.core.ReadCondition, cyclonedds.core.QueryCondition, optional Only take samples that satisfy the supplied condition. Raises ------ DDSException If any error code is returned by the DDS API it is converted into an exception. """ ref = condition._ref if condition else self._ref ret = self._takefn(ref, N, self._constructor, self._cqos_conv) if type(ret) == int: raise DDSException(ret, f"Occurred when calling read() in {repr(self)}") return ret
_pseudo_handle = 0x7fff0000 BuiltinTopicDcpsParticipant = BuiltinTopic(_pseudo_handle + 1, DcpsParticipant) """Built-in topic, is published to when a new participants appear on the network.""" BuiltinTopicDcpsTopic = BuiltinTopic(_pseudo_handle + 2, DcpsEndpoint) """ Built-in topic, is published to when a new topic appear on the network. Make sure cyclonedds has been built with `-DENABLE_TOPIC_DISCOVERY=ON` and `//CycloneDDS/Domain/Discovery/EnableTopicDiscoveryEndpoints` is set in the config. """ BuiltinTopicDcpsPublication = BuiltinTopic(_pseudo_handle + 3, DcpsEndpoint) """Built-in topic, is published to when a publication happens.""" BuiltinTopicDcpsSubscription = BuiltinTopic(_pseudo_handle + 4, DcpsEndpoint) """Built-in topic, is published to when a subscription happens.""" __all__ = [ "DcpsParticipant", "DcpsEndpoint", "BuiltinDataReader", "BuiltinTopicDcpsParticipant", "BuiltinTopicDcpsTopic", "BuiltinTopicDcpsPublication", "BuiltinTopicDcpsSubscription" ]