Source code for cyclonedds.builtin

"""
 * Copyright(c) 2021 to 2022 ZettaScale Technology and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""

import uuid
import ctypes as ct
from dataclasses import dataclass
from typing import Optional, Union, TYPE_CHECKING

from .core import Entity, DDSException, Qos, ReadCondition, ViewState, InstanceState, SampleState
from .topic import Topic
from .sub import DataReader
from .internal import dds_c_t
from .qos import _CQos

from cyclonedds._clayer import ddspy_read_participant, ddspy_take_participant, ddspy_read_endpoint, ddspy_take_endpoint, ddspy_read_topic, ddspy_take_topic
from cyclonedds.idl._typesupport.DDS.XTypes import TypeIdentifier


if TYPE_CHECKING:
    import cyclonedds


[docs]class BuiltinTopic(Topic): """Represent a built-in CycloneDDS Topic by magic reference number.""" def __init__(self, _ref, data_type): self._ref = _ref self.data_type = data_type def __del__(self): pass
[docs]@dataclass class DcpsParticipant: """ Data sample as returned when you subscribe to the BuiltinTopicDcpsParticipant topic. Attributes ---------- key: uuid.UUID Unique participant identifier qos: Qos Qos policies associated with the participant. """ key: uuid.UUID qos: Qos
@dataclass class DcpsTopic: """ Data sample as returned when you subscribe to the BuiltinTopicDcpsTopic topic. Attributes ---------- key: Unique identifier for the topic, publication or subscription endpoint. topic_name: Name of the associated topic. type_name: Name of the type. qos: Qos policies associated with the endpoint. typeid: Complete XTypes TypeIdentifier of the type, can be None. """ key: uuid.UUID topic_name: str type_name: str qos: Qos type_id: Optional[TypeIdentifier]
[docs]@dataclass class DcpsEndpoint: """ Data sample as returned when you subscribe to the BuiltinTopicDcpsPublication or BuiltinTopicDcpsSubscription topic. Attributes ---------- key: uuid.UUID Unique identifier for the topic, publication or subscription endpoint. participant_key: uuid.UUID Unique identifier of the participant the endpoint belongs to. participant_instance_handle: int Instance handle topic_name: str Name of the associated topic. type_name: str Name of the type. qos: Qos Qos policies associated with the endpoint. typeid: TypeIdentifier, optional Complete XTypes TypeIdentifier of the type, can be None. """ key: uuid.UUID participant_key: uuid.UUID participant_instance_handle: int topic_name: str type_name: str qos: Qos type_id: Optional[TypeIdentifier]
[docs]class BuiltinDataReader(DataReader): """ Builtin topics have sligtly different behaviour than normal topics, so you should use this BuiltinDataReader instead of the normal DataReader. They are identical in the rest of their functionality. """
[docs] def __init__(self, subscriber_or_participant: Union['cyclonedds.sub.Subscriber', 'cyclonedds.domain.DomainParticipant'], builtin_topic: 'cyclonedds.builtin.BuiltinTopic', qos: Optional['cyclonedds.core.Qos'] = None, listener: Optional['cyclonedds.core.Listener'] = None) -> None: """Initialize the BuiltinDataReader Parameters ---------- subscriber_or_participant: cyclonedds.sub.Subscriber, cyclonedds.domain.DomainParticipant The subscriber to which this reader will be added. If you supply a DomainParticipant a subscriber will be created for you. builtin_topic: cyclonedds.builtin.BuiltinTopic Which Builtin Topic to subscribe to. This can be one of BuiltinTopicDcpsParticipant, BuiltinTopicDcpsTopic, BuiltinTopicDcpsPublication or BuiltinTopicDcpsSubscription. Please note that BuiltinTopicDcpsTopic will fail if you built CycloneDDS without Topic Discovery. qos: cyclonedds.core.Qos, optional = None Optionally supply a Qos. listener: cyclonedds.core.Listener = None Optionally supply a Listener. """ self._topic = builtin_topic cqos = _CQos.qos_to_cqos(qos) if qos else None Entity.__init__( self, self._create_reader( subscriber_or_participant._ref, builtin_topic._ref, cqos, listener._ref if listener else None ), listener=listener ) self._next_condition = ReadCondition(self, ViewState.Any | SampleState.NotRead | InstanceState.Any) if cqos: _CQos.cqos_destroy(cqos) self._make_constructors() self._keepalive_entities = [self.subscriber]
def _make_constructors(self): def participant_constructor(keybytes, qosobject, sampleinfo): s = DcpsParticipant(uuid.UUID(bytes=keybytes), qos=qosobject) s.sample_info = sampleinfo return s def endpoint_constructor(keybytes, participant_keybytes, p_instance_handle, topic_name, type_name, qosobject, sampleinfo, typeid_bytes): ident = None if typeid_bytes is not None: try: ident = TypeIdentifier.deserialize(typeid_bytes, has_header=False, use_version_2=True) except Exception: pass s = DcpsEndpoint( uuid.UUID(bytes=keybytes), uuid.UUID(bytes=participant_keybytes), p_instance_handle, topic_name, type_name, qosobject, ident ) s.sample_info = sampleinfo return s def topic_constructor(keybytes, topic_name, type_name, qosobject, sampleinfo, typeid_bytes): ident = None if typeid_bytes is not None: try: ident = TypeIdentifier.deserialize(typeid_bytes, has_header=False, use_version_2=True) except Exception: pass s = DcpsTopic( uuid.UUID(bytes=keybytes), topic_name, type_name, qosobject, ident ) s.sample_info = sampleinfo return s def cqos_to_qos(pointer): p = ct.cast(pointer, dds_c_t.qos_p) return _CQos.cqos_to_qos(p) if self._topic == BuiltinTopicDcpsParticipant: self._readfn = ddspy_read_participant self._takefn = ddspy_take_participant self._constructor = participant_constructor elif self._topic == BuiltinTopicDcpsTopic: self._readfn = ddspy_read_topic self._takefn = ddspy_take_topic self._constructor = topic_constructor else: self._readfn = ddspy_read_endpoint self._takefn = ddspy_take_endpoint self._constructor = endpoint_constructor self._cqos_conv = cqos_to_qos
[docs] def read(self, N: int = 1, condition: Union['cyclonedds.core.ReadCondition', 'cyclonedds.core.QueryCondition'] = None): """Read a maximum of N samples, non-blocking. Optionally use a read/query-condition to select which samples you are interested in. Parameters ---------- N: int The maximum number of samples to read. condition: cyclonedds.core.ReadCondition, cyclonedds.core.QueryCondition, optional Only read samples that satisfy the supplied condition. Raises ------ DDSException If any error code is returned by the DDS API it is converted into an exception. """ ref = condition._ref if condition else self._ref ret = self._readfn(ref, N, self._constructor, self._cqos_conv) if type(ret) == int: raise DDSException(ret, f"Occurred when calling read() in {repr(self)}") return ret
[docs] def take(self, N: int = 1, condition=None): """Take a maximum of N samples, non-blocking. Optionally use a read/query-condition to select which samples you are interested in. Parameters ---------- N: int The maximum number of samples to read. condition: cyclonedds.core.ReadCondition, cyclonedds.core.QueryCondition, optional Only take samples that satisfy the supplied condition. Raises ------ DDSException If any error code is returned by the DDS API it is converted into an exception. """ ref = condition._ref if condition else self._ref ret = self._takefn(ref, N, self._constructor, self._cqos_conv) if type(ret) == int: raise DDSException(ret, f"Occurred when calling read() in {repr(self)}") return ret
_pseudo_handle = 0x7fff0000 BuiltinTopicDcpsParticipant = BuiltinTopic(_pseudo_handle + 1, DcpsParticipant) """Built-in topic, is published to when a new participants appear on the network.""" BuiltinTopicDcpsTopic = BuiltinTopic(_pseudo_handle + 2, DcpsEndpoint) """ Built-in topic, is published to when a new topic appear on the network. Make sure cyclonedds has been built with `-DENABLE_TOPIC_DISCOVERY=ON` and `//CycloneDDS/Domain/Discovery/EnableTopicDiscoveryEndpoints` is set in the config. """ BuiltinTopicDcpsPublication = BuiltinTopic(_pseudo_handle + 3, DcpsEndpoint) """Built-in topic, is published to when a publication happens.""" BuiltinTopicDcpsSubscription = BuiltinTopic(_pseudo_handle + 4, DcpsEndpoint) """Built-in topic, is published to when a subscription happens.""" __all__ = [ "DcpsParticipant", "DcpsEndpoint", "BuiltinDataReader", "BuiltinTopicDcpsParticipant", "BuiltinTopicDcpsTopic", "BuiltinTopicDcpsPublication", "BuiltinTopicDcpsSubscription" ]