Source code for cyclonedds.builtin_types

"""
 * Copyright(c) 2021 to 2022 ZettaScale Technology and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""

import ctypes as ct
import logging
import uuid
from dataclasses import dataclass
from typing import Optional, Union, TYPE_CHECKING

from .core import Qos
from .internal import dds_c_t
from .qos import _CQos

from cyclonedds.idl._typesupport.DDS.XTypes import TypeIdentifier

@dataclass
class DcpsParticipant:
    """
    Sample type delivered when reading the BuiltinTopicDcpsParticipant topic.

    Attributes
    ----------
    key: uuid.UUID
        Identifier that uniquely designates the participant.
    qos: Qos
        The Qos policies that belong to the participant.
    """

    key: uuid.UUID
    qos: Qos
@dataclass
class DcpsTopic:
    """
    Sample type delivered when reading the BuiltinTopicDcpsTopic topic.

    Attributes
    ----------
    key: uuid.UUID
        Unique identifier for the topic, publication or subscription endpoint.
    topic_name: str
        Name of the associated topic.
    type_name: str
        Name of the type.
    qos: Qos
        The Qos policies associated with the endpoint.
    type_id: TypeIdentifier, optional
        Complete XTypes TypeIdentifier of the type; may be None.
    """

    key: uuid.UUID
    topic_name: str
    type_name: str
    qos: Qos
    type_id: Optional[TypeIdentifier]
@dataclass
class DcpsEndpoint:
    """
    Sample type delivered when reading the BuiltinTopicDcpsPublication or
    BuiltinTopicDcpsSubscription topic.

    Attributes
    ----------
    key: uuid.UUID
        Unique identifier for the topic, publication or subscription endpoint.
    participant_key: uuid.UUID
        Unique identifier of the participant that owns the endpoint.
    participant_instance_handle: int
        Instance handle.
    topic_name: str
        Name of the associated topic.
    type_name: str
        Name of the type.
    qos: Qos
        The Qos policies associated with the endpoint.
    type_id: TypeIdentifier, optional
        Complete XTypes TypeIdentifier of the type; may be None.
    """

    key: uuid.UUID
    participant_key: uuid.UUID
    participant_instance_handle: int
    topic_name: str
    type_name: str
    qos: Qos
    type_id: Optional[TypeIdentifier]
_logger = logging.getLogger(__name__)


def cqos_to_qos(pointer):
    """
    Cast ``pointer`` to ``dds_c_t.qos_p`` and pass it to ``_CQos.cqos_to_qos``.

    Parameters
    ----------
    pointer:
        Value accepted by ``ctypes.cast`` that refers to a C qos structure.

    Returns
    -------
    Whatever ``_CQos.cqos_to_qos`` returns for the cast pointer
    (presumably a ``Qos`` instance -- confirm against ``cyclonedds.qos``).
    """
    return _CQos.cqos_to_qos(ct.cast(pointer, dds_c_t.qos_p))


def _deserialize_typeid(typeid_bytes):
    """
    Best-effort decoding of a serialized XTypes TypeIdentifier.

    Parameters
    ----------
    typeid_bytes:
        Serialized TypeIdentifier, or None when no type information exists.

    Returns
    -------
    The deserialized ``TypeIdentifier``, or None when ``typeid_bytes`` is None
    or cannot be deserialized.
    """
    if typeid_bytes is None:
        return None

    try:
        return TypeIdentifier.deserialize(
            typeid_bytes, has_header=False, use_version_2=True
        )
    except Exception:
        # Type information is optional on the sample, so a failed decode must
        # not prevent the sample from being built. Leave a trace for debugging
        # instead of discarding the error silently.
        _logger.debug("Could not deserialize TypeIdentifier", exc_info=True)
        return None


def participant_constructor(keybytes: bytes, qosobject: "Qos", sampleinfo) -> "DcpsParticipant":
    """
    Build a ``DcpsParticipant`` sample from its raw parts.

    Parameters
    ----------
    keybytes: bytes
        16-byte participant key, converted with ``uuid.UUID(bytes=...)``.
    qosobject: Qos
        Stored as the sample's ``qos``.
    sampleinfo:
        Attached to the returned object as its ``sample_info`` attribute.

    Returns
    -------
    DcpsParticipant
        The sample, with an extra ``sample_info`` attribute set.

    Raises
    ------
    ValueError
        Raised by ``uuid.UUID`` when ``keybytes`` is not exactly 16 bytes long.
    """
    sample = DcpsParticipant(uuid.UUID(bytes=keybytes), qos=qosobject)
    # sample_info is not a dataclass field; it is attached per instance.
    sample.sample_info = sampleinfo
    return sample


def endpoint_constructor(
    keybytes: bytes,
    participant_keybytes: bytes,
    p_instance_handle: int,
    topic_name: str,
    type_name: str,
    qosobject: "Qos",
    sampleinfo,
    typeid_bytes,
) -> "DcpsEndpoint":
    """
    Build a ``DcpsEndpoint`` sample from its raw parts.

    Parameters
    ----------
    keybytes: bytes
        16-byte endpoint key, converted with ``uuid.UUID(bytes=...)``.
    participant_keybytes: bytes
        16-byte key of the owning participant, converted the same way.
    p_instance_handle: int
        Stored as ``participant_instance_handle``.
    topic_name: str
        Name of the associated topic.
    type_name: str
        Name of the type.
    qosobject: Qos
        Stored as the sample's ``qos``.
    sampleinfo:
        Attached to the returned object as its ``sample_info`` attribute.
    typeid_bytes:
        Serialized TypeIdentifier or None; ``type_id`` is None when this is
        None or cannot be deserialized.

    Returns
    -------
    DcpsEndpoint
        The sample, with an extra ``sample_info`` attribute set.

    Raises
    ------
    ValueError
        Raised by ``uuid.UUID`` when a key is not exactly 16 bytes long.
    """
    sample = DcpsEndpoint(
        uuid.UUID(bytes=keybytes),
        uuid.UUID(bytes=participant_keybytes),
        p_instance_handle,
        topic_name,
        type_name,
        qosobject,
        _deserialize_typeid(typeid_bytes),
    )
    # sample_info is not a dataclass field; it is attached per instance.
    sample.sample_info = sampleinfo
    return sample


def topic_constructor(
    keybytes: bytes,
    topic_name: str,
    type_name: str,
    qosobject: "Qos",
    sampleinfo,
    typeid_bytes,
) -> "DcpsTopic":
    """
    Build a ``DcpsTopic`` sample from its raw parts.

    Parameters
    ----------
    keybytes: bytes
        16-byte topic key, converted with ``uuid.UUID(bytes=...)``.
    topic_name: str
        Name of the associated topic.
    type_name: str
        Name of the type.
    qosobject: Qos
        Stored as the sample's ``qos``.
    sampleinfo:
        Attached to the returned object as its ``sample_info`` attribute.
    typeid_bytes:
        Serialized TypeIdentifier or None; ``type_id`` is None when this is
        None or cannot be deserialized.

    Returns
    -------
    DcpsTopic
        The sample, with an extra ``sample_info`` attribute set.

    Raises
    ------
    ValueError
        Raised by ``uuid.UUID`` when ``keybytes`` is not exactly 16 bytes long.
    """
    sample = DcpsTopic(
        uuid.UUID(bytes=keybytes),
        topic_name,
        type_name,
        qosobject,
        _deserialize_typeid(typeid_bytes),
    )
    # sample_info is not a dataclass field; it is attached per instance.
    sample.sample_info = sampleinfo
    return sample