Source code for cyclonedds.domain

"""
 * Copyright(c) 2021 to 2022 ZettaScale Technology and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""

import ctypes as ct
from typing import List, Optional

from .internal import c_call, dds_c_t
from .core import Entity, DDSException, Listener
from .topic import Topic
from .qos import _CQos, LimitedScopeQos, DomainParticipantQos, Qos


[docs]class Domain(Entity): """A Domain represents a DDS domain with a set configuration. On the network a Domain is nothing more than an integer id. DDS domains are guaranteed to never mix, allowing logical separation of parts of your application. """ def __init__(self, domain_id: int, config: Optional[str] = None): """Initialize a domain with domain id and configuration. The configuration is either a xml string or an url to a xml file. """ self._id = domain_id if config is not None: super().__init__(self._create_domain(dds_c_t.domainid(domain_id), config.encode("ascii"))) else: super().__init__(self._create_domain(dds_c_t.domainid(domain_id), None))
[docs] def get_participants(self) -> List[Entity]: """Get all local participants of a domain.""" num_participants = self._lookup_participant(self._id, None, 0) if num_participants < 0: raise DDSException(num_participants, f"Occurred when getting the number of participants of domain {self._id}") elif num_participants == 0: return [] participants_list = (dds_c_t.entity * num_participants)() ret = self._lookup_participant( self._id, ct.cast(ct.byref(participants_list), ct.POINTER(dds_c_t.entity)), num_participants ) if ret >= 0: return [Entity.get_entity(participants_list[i]) for i in range(ret)] raise DDSException(ret, f"Occurred when getting the participants of domain {self._id}")
@c_call("dds_create_domain") def _create_domain(self, id: dds_c_t.domainid, config: ct.c_char_p) -> dds_c_t.entity: pass @c_call("dds_lookup_participant") def _lookup_participant(self, id: dds_c_t.domainid, participants: ct.POINTER(dds_c_t.entity), size: ct.c_size_t) \ -> dds_c_t.entity: pass
[docs]class DomainParticipant(Entity): """The DomainParticipant is the central entrypoint for any DDS Application. It serves as root entity for all other entities. """ def __init__(self, domain_id: int = 0, qos: Optional[Qos] = None, listener: Optional[Listener] = None): """Initialize a DomainParticipant. Parameters ---------- domain_id: int, optional, default 0 The DDS Domain to use qos: cyclonedds.qos.Qos, optional, default None Apply DomainParticipant Qos. listener: cyclonedds.core.Listener, optional, default None Attach a Listener to the participant """ if qos is not None: if isinstance(qos, LimitedScopeQos) and not isinstance(qos, DomainParticipantQos): raise TypeError(f"{qos} is not appropriate for a DomainParticipant") elif not isinstance(qos, Qos): raise TypeError(f"{qos} is not a valid qos object") if listener is not None: if not isinstance(listener, Listener): raise TypeError(f"{listener} is not a valid listener object.") cqos = _CQos.qos_to_cqos(qos) if qos else None try: super().__init__(self._create_participant(domain_id, cqos, listener._ref if listener else None), listener=listener) finally: if cqos: _CQos.cqos_destroy(cqos) @c_call("dds_create_participant") def _create_participant(self, domain_id: dds_c_t.domainid, qos: dds_c_t.qos_p, listener: dds_c_t.listener_p) \ -> dds_c_t.entity: pass