Source code for cyclonedds.qos

"""
 * Copyright(c) 2021 to 2022 ZettaScale Technology and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""

from dataclasses import dataclass, make_dataclass, asdict, field
from inspect import isclass
from base64 import b64encode, b64decode
from typing import Sequence, Union, Set, Optional, ClassVar
import ctypes as ct

from .internal import static_c_call, dds_c_t, DDS


class BasePolicy:
    """Common base class of every concrete Qos policy.

    Subclasses are expected to define a ``__scope__`` class attribute. When a
    subclass is created and its scope differs from its own name, the class is
    renamed to ``"<scope>.<name>"``. Classes named ``Property`` and
    ``BinaryProperty`` are exempt and are left untouched.
    """

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        plain_name = cls.__name__

        # These two are exempt: their __scope__ is not consulted here at all.
        if plain_name in ('Property', 'BinaryProperty'):
            return

        # A policy whose scope equals its own name keeps the plain name.
        if cls.__scope__ == plain_name:
            return

        cls.__name__ = f"{cls.__scope__}.{plain_name}"


def _no_init(*args, **kwargs):
    raise NotImplementedError("This Qos object cannot be initialized like this.")


def _policy_singleton(scope, name):
    """Create the single instance of a field-less, frozen policy dataclass.

    Parameters
    ----------
    scope : str
        Stored as the ``__scope__`` attribute of the generated class.
    name : str
        Name handed to ``make_dataclass`` for the generated class.

    Returns
    -------
    BasePolicy
        An instance of the freshly generated class whose ``repr`` is
        ``Policy.<scope>.<name>``.
    """
    class_namespace = {
        '__scope__': scope,
        '__repr__': lambda s: f"Policy.{scope}.{name}",
    }
    singleton_type = make_dataclass(
        name,
        [],
        bases=(BasePolicy,),
        namespace=class_namespace,
        frozen=True,
    )
    return singleton_type()


class Policy:
    """The Policy class is fully static and should never need to be instantiated.

    It only serves as a namespace: every Qos policy is reachable as an attribute
    of this class, either as a ready-made singleton (``Policy.History.KeepAll``)
    or as a frozen dataclass to be constructed (``Policy.History.KeepLast(10)``).

    See Also
    --------
    qoshowto: How to work with Qos and Policy, TODO.
    """
    __init__ = _no_init

    class Reliability:
        """The Reliability Qos Policy

        Examples
        --------
        >>> Policy.Reliability.BestEffort
        >>> Policy.Reliability.Reliable(max_blocking_time=duration(seconds=1))
        """
        __scope__ = "Reliability"
        __init__ = _no_init

        BestEffort: 'Policy.Reliability.BestEffort' = _policy_singleton("Reliability", "BestEffort")

        @dataclass(frozen=True)
        class Reliable(BasePolicy):
            """Use Reliable reliability

            Parameters
            ----------
            max_blocking_time : int
                The number of nanoseconds the writer will block when its history is full.
                Use the :func:`duration<cyclonedds.util.duration>` function to avoid time
                calculation headaches.
            """
            __scope__: ClassVar[str] = "Reliability"
            max_blocking_time: int

    class Durability:
        """The Durability Qos Policy

        Examples
        --------
        >>> Policy.Durability.Volatile
        >>> Policy.Durability.TransientLocal
        >>> Policy.Durability.Transient
        >>> Policy.Durability.Persistent
        """
        __init__ = _no_init
        __scope__ = "Durability"

        Volatile: 'Policy.Durability.Volatile' = _policy_singleton("Durability", "Volatile")
        TransientLocal: 'Policy.Durability.TransientLocal' = _policy_singleton("Durability", "TransientLocal")
        Transient: 'Policy.Durability.Transient' = _policy_singleton("Durability", "Transient")
        Persistent: 'Policy.Durability.Persistent' = _policy_singleton("Durability", "Persistent")

    class History:
        """The History Qos Policy

        Examples
        --------
        >>> Policy.History.KeepAll
        >>> Policy.History.KeepLast(depth=10)

        Attributes
        ----------
        KeepAll: Policy.History.KeepAll
            The type of this entity is not publicly specified.
        """
        __init__ = _no_init
        __scope__ = "History"

        KeepAll: 'Policy.History.KeepAll' = _policy_singleton("History", "KeepAll")

        @dataclass(frozen=True)
        class KeepLast(BasePolicy):
            """Use KeepLast history

            Parameters
            ----------
            depth : int
                The depth of samples to keep in the history.
            """
            __scope__: ClassVar[str] = "History"
            depth: int

    @dataclass(frozen=True)
    class ResourceLimits(BasePolicy):
        """The ResourceLimits Qos Policy

        Examples
        --------
        >>> Policy.ResourceLimits(
        >>>     max_samples=10,
        >>>     max_instances=10,
        >>>     max_samples_per_instance=2
        >>> )

        Attributes
        ----------
        max_samples : int
            Max number of samples total. Defaults to -1.
        max_instances : int
            Max number of instances total. Defaults to -1.
        max_samples_per_instance : int
            Max number of samples per instance. Defaults to -1.
        """
        __scope__: ClassVar[str] = "ResourceLimits"
        max_samples: int = -1
        max_instances: int = -1
        max_samples_per_instance: int = -1

    class PresentationAccessScope:
        """The Presentation Access Scope Qos Policy

        Examples
        --------
        >>> Policy.PresentationAccessScope.Instance(coherent_access=True, ordered_access=False)
        >>> Policy.PresentationAccessScope.Topic(coherent_access=True, ordered_access=False)
        >>> Policy.PresentationAccessScope.Group(coherent_access=True, ordered_access=False)
        """
        __init__ = _no_init
        __scope__ = "PresentationAccessScope"

        @dataclass(frozen=True)
        class Instance(BasePolicy):
            """Use Instance Presentation Access Scope

            Attributes
            ----------
            coherent_access : bool
                Enable coherent access
            ordered_access : bool
                Enable ordered access
            """
            __scope__: ClassVar[str] = "PresentationAccessScope"
            coherent_access: bool
            ordered_access: bool

        @dataclass(frozen=True)
        class Topic(BasePolicy):
            """Use Topic Presentation Access Scope

            Attributes
            ----------
            coherent_access : bool
                Enable coherent access
            ordered_access : bool
                Enable ordered access
            """
            __scope__: ClassVar[str] = "PresentationAccessScope"
            coherent_access: bool
            ordered_access: bool

        @dataclass(frozen=True)
        class Group(BasePolicy):
            """Use Group Presentation Access Scope

            Attributes
            ----------
            coherent_access : bool
                Enable coherent access
            ordered_access : bool
                Enable ordered access
            """
            __scope__: ClassVar[str] = "PresentationAccessScope"
            coherent_access: bool
            ordered_access: bool

    @dataclass(frozen=True)
    class Lifespan(BasePolicy):
        """The Lifespan Qos Policy

        Examples
        --------
        >>> Policy.Lifespan(duration(seconds=2))

        Attributes
        ----------
        lifespan : int
            Expiration time relative to the source timestamp of a sample in nanoseconds.
        """
        __scope__: ClassVar[str] = "Lifespan"
        lifespan: int

    @dataclass(frozen=True)
    class Deadline(BasePolicy):
        """The Deadline Qos Policy

        Examples
        --------
        >>> Policy.Deadline(deadline=duration(seconds=2))

        Attributes
        ----------
        deadline : int
            Deadline of a sample in nanoseconds.
        """
        __scope__: ClassVar[str] = "Deadline"
        deadline: int

    @dataclass(frozen=True)
    class LatencyBudget(BasePolicy):
        """The Latency Budget Qos Policy

        Examples
        --------
        >>> Policy.LatencyBudget(duration(seconds=2))

        Parameters
        ----------
        budget : int
            Latency budget in nanoseconds.
        """
        __scope__: ClassVar[str] = "LatencyBudget"
        budget: int

    class Ownership:
        """The Ownership Qos Policy

        Examples
        --------
        >>> Policy.Ownership.Shared
        >>> Policy.Ownership.Exclusive

        Attributes
        ----------
        Shared: Policy.Ownership.Shared
        Exclusive: Policy.Ownership.Exclusive
        """
        __init__ = _no_init
        __scope__ = "Ownership"

        Shared: 'Policy.Ownership.Shared' = _policy_singleton("Ownership", "Shared")
        Exclusive: 'Policy.Ownership.Exclusive' = _policy_singleton("Ownership", "Exclusive")

    @dataclass(frozen=True)
    class OwnershipStrength(BasePolicy):
        """The Ownership Strength Qos Policy

        Examples
        --------
        >>> Policy.OwnershipStrength(strength=2)

        Parameters
        ----------
        strength : int
            Ownership strength as integer.
        """
        __scope__: ClassVar[str] = "OwnershipStrength"
        strength: int

    class Liveliness:
        """The Liveliness Qos Policy

        Examples
        --------
        >>> Policy.Liveliness.Automatic(lease_duration=duration(seconds=10))
        >>> Policy.Liveliness.ManualByParticipant(lease_duration=duration(seconds=10))
        >>> Policy.Liveliness.ManualByTopic(lease_duration=duration(seconds=10))
        """
        __init__ = _no_init
        __scope__ = "Liveliness"

        @dataclass(frozen=True)
        class Automatic(BasePolicy):
            """Use Automatic Liveliness

            Attributes
            ----------
            lease_duration: int
                The lease duration in nanoseconds. Use the helper function
                :func:`duration<cyclonedds.util.duration>` to write the duration
                in a human readable format.
            """
            __scope__: ClassVar[str] = "Liveliness"
            lease_duration: int

        @dataclass(frozen=True)
        class ManualByParticipant(BasePolicy):
            """Use ManualByParticipant Liveliness

            Attributes
            ----------
            lease_duration: int
                The lease duration in nanoseconds. Use the helper function
                :func:`duration<cyclonedds.util.duration>` to write the duration
                in a human readable format.
            """
            __scope__: ClassVar[str] = "Liveliness"
            lease_duration: int

        @dataclass(frozen=True)
        class ManualByTopic(BasePolicy):
            """Use ManualByTopic Liveliness

            Attributes
            ----------
            lease_duration: int
                The lease duration in nanoseconds. Use the helper function
                :func:`duration<cyclonedds.util.duration>` to write the duration
                in a human readable format.
            """
            __scope__: ClassVar[str] = "Liveliness"
            lease_duration: int

    @dataclass(frozen=True)
    class TimeBasedFilter(BasePolicy):
        """The TimeBasedFilter Qos Policy

        Examples
        --------
        >>> Policy.TimeBasedFilter(filter_time=duration(seconds=2))

        Attributes
        ----------
        filter_time: int
            Minimum time between samples in nanoseconds. Use the helper function
            :func:`duration<cyclonedds.util.duration>` to write the duration
            in a human readable format.
        """
        __scope__: ClassVar[str] = "TimeBasedFilter"
        filter_time: int

    @dataclass(frozen=True)
    class Partition(BasePolicy):
        """The Partition Qos Policy

        A single string is accepted as shorthand for a one-element sequence.
        Whatever is passed in is stored as a tuple.

        Examples
        --------
        >>> Policy.Partition(partitions=["partition_a", "partition_b", "partition_c"])
        >>> Policy.Partition(partitions=[f"partition_{i}" for i in range(100)])

        Attributes
        ----------
        partitions : Sequence[str]
        """
        __scope__: ClassVar[str] = "Partition"
        partitions: Sequence[str]

        def __post_init__(self):
            # Tuple-fy partitions to ensure immutability. A bare string must be
            # wrapped first, otherwise tuple() would split it into characters.
            partitions = [self.partitions] if isinstance(self.partitions, str) else self.partitions
            # The class is frozen, so the regular attribute assignment is blocked;
            # object.__setattr__ bypasses the generated frozen __setattr__.
            object.__setattr__(self, 'partitions', tuple(partitions))

    @dataclass(frozen=True)
    class TransportPriority(BasePolicy):
        """The TransportPriority Qos Policy

        Examples
        --------
        >>> Policy.TransportPriority(priority=10)

        Attributes
        ----------
        priority: int
        """
        __scope__: ClassVar[str] = "TransportPriority"
        priority: int

    class DestinationOrder:
        """The DestinationOrder Qos Policy

        Examples
        --------
        >>> Policy.DestinationOrder.ByReceptionTimestamp
        >>> Policy.DestinationOrder.BySourceTimestamp
        """
        # Like every other container of singletons, this is a namespace only.
        __init__ = _no_init
        __scope__: ClassVar[str] = "DestinationOrder"

        ByReceptionTimestamp: 'Policy.DestinationOrder.ByReceptionTimestamp' = \
            _policy_singleton("DestinationOrder", "ByReceptionTimestamp")
        BySourceTimestamp: 'Policy.DestinationOrder.BySourceTimestamp' = \
            _policy_singleton("DestinationOrder", "BySourceTimestamp")

    @dataclass(frozen=True)
    class WriterDataLifecycle(BasePolicy):
        """The WriterDataLifecycle Qos Policy

        Examples
        --------
        >>> Policy.WriterDataLifecycle(autodispose=False)

        Attributes
        ----------
        autodispose: bool
        """
        __scope__: ClassVar[str] = "WriterDataLifecycle"
        autodispose: bool

    @dataclass(frozen=True)
    class ReaderDataLifecycle(BasePolicy):
        """The ReaderDataLifecycle Qos Policy

        Examples
        --------
        >>> Policy.ReaderDataLifecycle(
        >>>     autopurge_nowriter_samples_delay=duration(minutes=2),
        >>>     autopurge_disposed_samples_delay=duration(minutes=5)
        >>> )

        Attributes
        ----------
        autopurge_nowriter_samples_delay: int
        autopurge_disposed_samples_delay: int
        """
        __scope__: ClassVar[str] = "ReaderDataLifecycle"
        autopurge_nowriter_samples_delay: int
        autopurge_disposed_samples_delay: int

    @dataclass(frozen=True)
    class DurabilityService(BasePolicy):
        """The DurabilityService Qos Policy

        Examples
        --------
        >>> Policy.DurabilityService(
        >>>     cleanup_delay=duration(minutes=2.5),
        >>>     history=Policy.History.KeepLast(20),
        >>>     max_samples=2000,
        >>>     max_instances=200,
        >>>     max_samples_per_instance=25
        >>> )

        Attributes
        ----------
        cleanup_delay: int
        history: Policy.History.KeepAll, Policy.History.KeepLast
        max_samples: int
        max_instances: int
        max_samples_per_instance: int
        """
        __scope__: ClassVar[str] = "DurabilityService"
        cleanup_delay: int
        history: Union['Policy.History.KeepAll', 'Policy.History.KeepLast']
        max_samples: int
        max_instances: int
        max_samples_per_instance: int

    class IgnoreLocal:
        """The IgnoreLocal Qos Policy

        Examples
        --------
        >>> Policy.IgnoreLocal.Nothing
        >>> Policy.IgnoreLocal.Participant
        >>> Policy.IgnoreLocal.Process
        """
        __init__ = _no_init
        __scope__ = "IgnoreLocal"

        Nothing: 'Policy.IgnoreLocal.Nothing' = _policy_singleton("IgnoreLocal", "Nothing")
        Participant: 'Policy.IgnoreLocal.Participant' = _policy_singleton("IgnoreLocal", "Participant")
        Process: 'Policy.IgnoreLocal.Process' = _policy_singleton("IgnoreLocal", "Process")

    @dataclass(frozen=True)
    class Userdata(BasePolicy):
        """The Userdata Qos Policy

        Examples
        --------
        >>> Policy.Userdata(data=b"Hello, World!")

        Raises
        ------
        ValueError
            If ``data`` is not a bytes object.
        """
        __scope__: ClassVar[str] = "Userdata"
        data: bytes

        def __post_init__(self):
            if not isinstance(self.data, bytes):
                raise ValueError("Userdata needs to be bytes.")

    @dataclass(frozen=True)
    class Topicdata(BasePolicy):
        """The Topicdata Qos Policy

        Examples
        --------
        >>> Policy.Topicdata(data=b"Hello, World!")

        Raises
        ------
        ValueError
            If ``data`` is not a bytes object.
        """
        __scope__: ClassVar[str] = "Topicdata"
        data: bytes

        def __post_init__(self):
            if not isinstance(self.data, bytes):
                raise ValueError("Topicdata needs to be bytes.")

    @dataclass(frozen=True)
    class Groupdata(BasePolicy):
        """The Groupdata Qos Policy

        Examples
        --------
        >>> Policy.Groupdata(data=b"Hello, World!")

        Raises
        ------
        ValueError
            If ``data`` is not a bytes object.
        """
        __scope__: ClassVar[str] = "Groupdata"
        data: bytes

        def __post_init__(self):
            if not isinstance(self.data, bytes):
                raise ValueError("Groupdata needs to be bytes.")

    @dataclass(frozen=True)
    class Property(BasePolicy):
        """The Property Qos Policy

        The scope of an instance is ``Property<key>``, so properties with
        different keys do not share a scope.

        Examples
        --------
        >>> Policy.Property(key="host", value="central")

        Raises
        ------
        ValueError
            If ``value`` is not a string.
        """
        key: str
        value: str
        # Per-instance scope, derived from the key in __post_init__; excluded
        # from the constructor, the repr and comparisons.
        __scope__: str = field(init=False, repr=False, compare=False)

        def __post_init__(self):
            if not isinstance(self.value, str):
                raise ValueError("Property value should be string.")
            # The class is frozen, so the regular attribute assignment is blocked;
            # object.__setattr__ bypasses the generated frozen __setattr__.
            object.__setattr__(self, '__scope__', f"Property<{self.key}>")

        def __repr__(self):
            return f"Property(key=\"{self.key}\", value=\"{self.value}\")"

    @dataclass(frozen=True)
    class BinaryProperty(BasePolicy):
        """The BinaryProperty Qos Policy

        The scope of an instance is ``BinaryProperty<key>``, so binary
        properties with different keys do not share a scope.

        Examples
        --------
        >>> Policy.BinaryProperty(key="host", value=b"central")

        Raises
        ------
        ValueError
            If ``value`` is not a bytes object.
        """
        key: str
        value: bytes
        # Per-instance scope, derived from the key in __post_init__; excluded
        # from the constructor, the repr and comparisons.
        __scope__: str = field(init=False, repr=False, compare=False)

        def __post_init__(self):
            if not isinstance(self.value, bytes):
                raise ValueError("BinaryProperty value should be bytes.")
            # The class is frozen, so the regular attribute assignment is blocked;
            # object.__setattr__ bypasses the generated frozen __setattr__.
            object.__setattr__(self, '__scope__', f"BinaryProperty<{self.key}>")

        def __repr__(self):
            # Let bytes render themselves; wrapping the formatted value in b"..."
            # would yield the nonsensical value=b"b'...'".
            return f"BinaryProperty(key=\"{self.key}\", value={self.value!r})"

    class TypeConsistency:
        """The TypeConsistency Qos Policy

        Examples
        --------
        >>> Policy.TypeConsistency.DisallowTypeCoercion
        >>> Policy.TypeConsistency.AllowTypeCoercion
        """
        __init__ = _no_init
        __scope__ = "TypeConsistency"

        @dataclass(frozen=True)
        class DisallowTypeCoercion(BasePolicy):
            """Use DisallowTypeCoercion type consistency

            Attributes
            ----------
            force_type_validation : bool
                Defaults to False.
            """
            __scope__: ClassVar[str] = "TypeConsistency"
            force_type_validation: bool = False

        @dataclass(frozen=True)
        class AllowTypeCoercion(BasePolicy):
            """Use AllowTypeCoercion type consistency

            Attributes
            ----------
            ignore_sequence_bounds : bool
                Defaults to True.
            ignore_string_bounds : bool
                Defaults to True.
            ignore_member_names : bool
                Defaults to True.
            prevent_type_widening : bool
                Defaults to False.
            force_type_validation : bool
                Defaults to False.
            """
            __scope__: ClassVar[str] = "TypeConsistency"
            ignore_sequence_bounds: bool = True
            ignore_string_bounds: bool = True
            ignore_member_names: bool = True
            prevent_type_widening: bool = False
            force_type_validation: bool = False

    @dataclass(frozen=True)
    class DataRepresentation(BasePolicy):
        """The DataRepresentation Qos Policy

        Attributes
        ----------
        use_cdrv0_representation : bool
            Defaults to False.
        use_xcdrv2_representation : bool
            Defaults to False.
        """
        __scope__: ClassVar[str] = "DataRepresentation"
        use_cdrv0_representation: bool = False
        use_xcdrv2_representation: bool = False

    @dataclass(frozen=True)
    class EntityName(BasePolicy):
        """The EntityName Qos Policy

        Attributes
        ----------
        name : str
        """
        __scope__: ClassVar[str] = "EntityName"
        name: str
class Qos:
    """This class represents a collection of policies. It allows for easy inspection of this set.
    When you retrieve a Qos object from an entity, modifying that object does not change the Qos
    of the entity. To reflect this, Qos objects are immutable.

    .. container:: operations

        .. describe:: x == y

            Checks if two Qos objects contain the same policies. This is a full comparison, not a match.

        .. describe:: x != y

            Checks if two Qos objects do not contain the same policies.

        .. describe:: p in qos

            Check if a Policy p is contained in Qos object qos. You can use all levels of
            generalization, for example: ``Policy.History in qos``, ``Policy.History.KeepLast in qos``
            and ``Policy.History.KeepLast(1) in qos``.

        .. describe:: qos[p]

            Obtain the Policy matched with p from the Qos object, for example:
            ``qos[Policy.History] -> Policy.History.KeepAll``

        .. describe:: iter(x)

            The Qos object supports iteration over its contents.

        .. describe:: len(x)

            Return the number of Policies in the Qos object.

        .. describe:: str(x)

            Human-readable description of the contained Qos policies.

        .. describe:: hash(x)

            Qos objects that compare equal hash equal.

    Attributes
    ----------
    policies: Tuple[BasePolicy]
        A sorted tuple of the Policies contained in this Qos object
    """

    # Maps the dotted names used by asdict()/fromdict() to either a policy
    # singleton instance or a policy dataclass to be constructed.
    _policy_mapper = {
        "Policy.Reliability.BestEffort": Policy.Reliability.BestEffort,
        "Policy.Reliability.Reliable": Policy.Reliability.Reliable,
        "Policy.Durability.Volatile": Policy.Durability.Volatile,
        "Policy.Durability.TransientLocal": Policy.Durability.TransientLocal,
        "Policy.Durability.Transient": Policy.Durability.Transient,
        "Policy.Durability.Persistent": Policy.Durability.Persistent,
        "Policy.History.KeepAll": Policy.History.KeepAll,
        "Policy.History.KeepLast": Policy.History.KeepLast,
        "Policy.ResourceLimits": Policy.ResourceLimits,
        "Policy.PresentationAccessScope.Instance": Policy.PresentationAccessScope.Instance,
        "Policy.PresentationAccessScope.Topic": Policy.PresentationAccessScope.Topic,
        "Policy.PresentationAccessScope.Group": Policy.PresentationAccessScope.Group,
        "Policy.Lifespan": Policy.Lifespan,
        "Policy.Deadline": Policy.Deadline,
        "Policy.LatencyBudget": Policy.LatencyBudget,
        "Policy.Ownership.Shared": Policy.Ownership.Shared,
        "Policy.Ownership.Exclusive": Policy.Ownership.Exclusive,
        "Policy.OwnershipStrength": Policy.OwnershipStrength,
        "Policy.Liveliness.Automatic": Policy.Liveliness.Automatic,
        "Policy.Liveliness.ManualByParticipant": Policy.Liveliness.ManualByParticipant,
        "Policy.Liveliness.ManualByTopic": Policy.Liveliness.ManualByTopic,
        "Policy.TimeBasedFilter": Policy.TimeBasedFilter,
        "Policy.Partition": Policy.Partition,
        "Policy.TransportPriority": Policy.TransportPriority,
        "Policy.DestinationOrder.ByReceptionTimestamp": Policy.DestinationOrder.ByReceptionTimestamp,
        "Policy.DestinationOrder.BySourceTimestamp": Policy.DestinationOrder.BySourceTimestamp,
        "Policy.WriterDataLifecycle": Policy.WriterDataLifecycle,
        "Policy.ReaderDataLifecycle": Policy.ReaderDataLifecycle,
        "Policy.DurabilityService": Policy.DurabilityService,
        "Policy.IgnoreLocal.Nothing": Policy.IgnoreLocal.Nothing,
        "Policy.IgnoreLocal.Participant": Policy.IgnoreLocal.Participant,
        "Policy.IgnoreLocal.Process": Policy.IgnoreLocal.Process,
        "Policy.Userdata": Policy.Userdata,
        "Policy.Groupdata": Policy.Groupdata,
        "Policy.Topicdata": Policy.Topicdata,
        "Policy.Property": Policy.Property,
        "Policy.BinaryProperty": Policy.BinaryProperty,
        "Policy.TypeConsistency.DisallowTypeCoercion": Policy.TypeConsistency.DisallowTypeCoercion,
        "Policy.TypeConsistency.AllowTypeCoercion": Policy.TypeConsistency.AllowTypeCoercion,
        "Policy.DataRepresentation": Policy.DataRepresentation,
        "Policy.EntityName": Policy.EntityName
    }

    def __init__(self, *policies, base: Optional['Qos'] = None):
        """Initialize a Qos object

        Parameters
        ----------
        *policies: BasePolicy
            Pass in any number of constructed Policies.
        base : Qos, optional
            Optionally inherit policies from another Qos object. Inherited policies
            are overwritten by those newly set.

        Raises
        ------
        TypeError
            If you pass something that is not a Policy or use a base that is not a Qos
            object this will be treated as a TypeError.
        ValueError
            If you pass two overlapping Policies, for example ``Policy.History.KeepLast(10)``
            and ``Policy.History.KeepAll`` this will be treated as a ValueError.
        """
        policies = list(policies)
        for p in policies:
            if not isinstance(p, BasePolicy):
                raise TypeError(f"{repr(p)} is not a Policy.")

        if base is not None:
            if not isinstance(base, Qos):
                raise TypeError("base takes a Qos as argument.")
            # Only inherit what the explicitly passed policies do not override.
            # A base Qos never holds two policies of the same scope, so checking
            # against the explicit scopes alone is sufficient.
            overridden = {p.__scope__ for p in policies}
            policies.extend(policy for policy in base.policies if policy.__scope__ not in overridden)

        # Sorted by scope: lookups rely on this order to stop scanning early and
        # the consistency check relies on duplicates being adjacent.
        self.__policies = tuple(sorted(policies, key=lambda x: x.__scope__))
        self._assert_consistency()

    def _assert_consistency(self):
        """Validate the stored policies.

        Raises
        ------
        TypeError
            If an entry is not a BasePolicy instance.
        ValueError
            If two entries share the same scope.
        """
        for policy in self.__policies:
            if not isinstance(policy, BasePolicy):
                raise TypeError(f"{policy} is not a Policy.")

        # The tuple is sorted by scope, so equal scopes are always neighbours.
        for previous, current in zip(self.__policies, self.__policies[1:]):
            if previous.__scope__ == current.__scope__:
                raise ValueError("Multiple Qos policies of type {}.".format(current.__scope__))

    @property
    def policies(self):
        """The contained policies as a tuple sorted by scope."""
        return self.__policies

    def __iter__(self):
        return iter(self.policies)

    def __getitem__(self, key):
        """Return the contained policy sharing ``key``'s scope, or None if absent.

        Raises
        ------
        ValueError
            If ``key`` has no ``__scope__`` attribute.
        """
        if not hasattr(key, "__scope__"):
            raise ValueError(f"{key} is not a valid policy to look up in the qos")

        scope = key.__scope__
        for p in self.__policies:
            if p.__scope__ > scope:
                # Sorted by scope: nothing further on can match.
                break
            if p.__scope__ == scope:
                return p
        return None

    def __contains__(self, key):
        """Check whether ``key`` is contained.

        A class key matches any contained policy of the same scope; an instance
        key has to compare equal to a contained policy.

        Raises
        ------
        ValueError
            If ``key`` has no ``__scope__`` attribute.
        """
        if not hasattr(key, "__scope__"):
            raise ValueError(f"{key} is not a valid policy to look up in the qos")

        scope = key.__scope__
        match_on_scope = isclass(key)
        for p in self.__policies:
            if p.__scope__ > scope:
                # Sorted by scope: nothing further on can match.
                break
            if match_on_scope:
                if p.__scope__ == scope:
                    return True
            elif p == key:
                return True
        return False

    def __len__(self):
        return len(self.__policies)

    def __eq__(self, other):
        if not isinstance(other, Qos):
            return False
        return self.policies == other.policies

    def __hash__(self):
        # Defining __eq__ alone would make the class unhashable. Equality is
        # decided by the policies tuple, so hashing it keeps both consistent.
        return hash(self.__policies)

    def __repr__(self):
        return f"{self.__class__.__name__}({', '.join(repr(p) for p in self.policies)})"

    __str__ = __repr__

    def asdict(self):
        """Convert a Qos object to a python dictionary.

        Returns
        -------
        dict
            Fully describe the Qos object using a python dictionary with only built-in types
            (dict, list, string, int, boolean). This format is not guaranteed to stay consistent
            between cyclonedds versions but can be useful for debugging or use within an application.
        """
        ret = {}
        for p in self.policies:
            # Class names look like "Scope.Kind" or just "Scope".
            path = p.__class__.__name__.split(".")
            data = asdict(p)

            if "__scope__" in data:
                # Property & BinaryProperty: the scope is a field carrying the key
                path[0] = data.pop("__scope__")

            data.pop('kind', None)

            # bytes are stored as base64 text; only existing keys are reassigned,
            # so updating the dict while iterating it is safe.
            for k, v in data.items():
                if isinstance(v, bytes):
                    data[k] = b64encode(v).decode()

            if len(path) == 1:
                ret[path[0]] = data
            else:
                ret[path[0]] = {"kind": path[1], **data}
        return ret

    @classmethod
    def fromdict(cls, data: dict):
        """Convert a python dictionary as generated by ``asdict()`` to a Qos object.

        The passed dictionary is not modified.

        Returns
        -------
        Qos
            Note that the format of the python dictionary is not guaranteed between cyclonedds
            versions thus storing these dictionaries to disk and loading them again is not
            recommended.

        Raises
        ------
        ValueError
            If an entry does not map to a known policy.
        """
        mapper = cls._policy_mapper
        policies = []
        for k, v in data.items():
            # The entry is rewritten below (decoded values, dropped "kind"), so
            # work on a shallow copy and leave the caller's dictionary intact.
            if isinstance(v, dict):
                v = dict(v)

            if k == "DurabilityService":
                # Special case for subqos: an empty history dict stands for KeepAll
                if not v["history"]:
                    v["history"] = Policy.History.KeepAll
                else:
                    v["history"] = Policy.History.KeepLast(v["history"]["depth"])
            elif k in ('Userdata', 'Topicdata', 'Groupdata'):
                # Special case for UserData/TopicData/GroupData: base64 text back to bytes
                v["data"] = b64decode(v["data"].encode())
            elif k.startswith("Property"):
                # Keys look like "Property<key>"
                k = "Property"
            elif k.startswith("BinaryProperty"):
                # Keys look like "BinaryProperty<key>"
                k = "BinaryProperty"
                v["value"] = b64decode(v["value"].encode())

            name = f"Policy.{k}"
            if name not in mapper and "kind" in v:
                # Not a top-level policy: the "kind" entry names the nested one.
                name += f".{v['kind']}"
                del v["kind"]

            if name not in mapper:
                raise ValueError("Not a valid Qos dictionary.")

            # Remaining entries are constructor arguments; without any, the
            # mapped object is used as-is.
            policies.append(mapper[name](**v) if v else mapper[name])

        return cls(*policies)

    def __add__(self, other) -> 'Qos':
        return Qos(*other.policies, base=self)

    def __sub__(self, other) -> 'Qos':
        for pol in other:
            if pol not in self:
                raise ValueError(f"Cannot remove {pol} because that is not contained within this Qos object")
        return Qos(*[pol for pol in self.policies if pol not in other])

    def _restricted_to(self, qos_type):
        """Build a ``qos_type`` from the policies whose scope ``qos_type`` supports."""
        supported = qos_type.supported_scopes
        # Property scopes look like "Property<key>"; only the part before "<" counts.
        return qos_type(*[policy for policy in self if policy.__scope__.split("<")[0] in supported])

    def domain_participant(self) -> 'DomainParticipantQos':
        """Return the subset of policies supported by DomainParticipantQos."""
        return self._restricted_to(DomainParticipantQos)

    def topic(self) -> 'TopicQos':
        """Return the subset of policies supported by TopicQos."""
        return self._restricted_to(TopicQos)

    def publisher(self) -> 'PublisherQos':
        """Return the subset of policies supported by PublisherQos."""
        return self._restricted_to(PublisherQos)

    def subscriber(self) -> 'SubscriberQos':
        """Return the subset of policies supported by SubscriberQos."""
        return self._restricted_to(SubscriberQos)

    def datareader(self) -> 'DataReaderQos':
        """Return the subset of policies supported by DataReaderQos."""
        return self._restricted_to(DataReaderQos)

    def datawriter(self) -> 'DataWriterQos':
        """Return the subset of policies supported by DataWriterQos."""
        return self._restricted_to(DataWriterQos)
class LimitedScopeQos(Qos):
    """A Qos that only accepts policies whose scope is listed in ``supported_scopes``."""

    # Entity name used in the error message raised by ``_assert_consistency``.
    for_entity: str
    # Scope names (the part of ``policy.__scope__`` before any "<") that are accepted.
    supported_scopes: Set[str]

    def _assert_consistency(self):
        """Run the base consistency check, then reject policies of unsupported scopes.

        Raises
        ------
        ValueError
            If a contained policy has a scope outside ``supported_scopes``.
        """
        super()._assert_consistency()
        for policy in self.policies:
            if policy.__scope__.split("<")[0] not in self.supported_scopes:
                raise ValueError(f"{self.for_entity} Qos does not support {policy}.")


class DomainParticipantQos(LimitedScopeQos):
    for_entity: str = "DomainParticipant"
    supported_scopes: Set[str] = {"EntityName", "BinaryProperty", "Property", "Userdata", "IgnoreLocal"}


class TopicQos(LimitedScopeQos):
    for_entity: str = "Topic"
    supported_scopes: Set[str] = {
        "EntityName",
        "BinaryProperty",
        "Deadline",
        "DestinationOrder",
        "Durability",
        "DurabilityService",
        "History",
        "IgnoreLocal",
        "LatencyBudget",
        "Lifespan",
        "Liveliness",
        "Ownership",
        "Property",
        "Reliability",
        "ResourceLimits",
        "Topicdata",
        "TransportPriority",
        "TypeConsistency",
        "DataRepresentation"
    }


class PublisherQos(LimitedScopeQos):
    for_entity: str = "Publisher"
    supported_scopes: Set[str] = {
        "EntityName",
        "BinaryProperty",
        "Groupdata",
        "IgnoreLocal",
        "Partition",
        "PresentationAccessScope",
        "Property"
    }


class SubscriberQos(LimitedScopeQos):
    for_entity: str = "Subscriber"
    supported_scopes: Set[str] = {
        "EntityName",
        "BinaryProperty",
        "Groupdata",
        "IgnoreLocal",
        "Partition",
        "PresentationAccessScope",
        "Property"
    }


class DataWriterQos(LimitedScopeQos):
    for_entity: str = "DataWriter"
    supported_scopes: Set[str] = {
        "EntityName",
        "BinaryProperty",
        "Deadline",
        "DestinationOrder",
        "Durability",
        "DurabilityService",
        "History",
        "IgnoreLocal",
        "LatencyBudget",
        "Lifespan",
        "Liveliness",
        "Ownership",
        "OwnershipStrength",
        "Property",
        "Reliability",
        "ResourceLimits",
        "TransportPriority",
        "Userdata",
        "WriterDataLifecycle",
        "TypeConsistency",
        "DataRepresentation"
    }


class DataReaderQos(LimitedScopeQos):
    for_entity: str = "DataReader"
    supported_scopes: Set[str] = {
        "EntityName",
        "BinaryProperty",
        "Deadline",
        "DestinationOrder",
        "Durability",
        "History",
        "IgnoreLocal",
        "LatencyBudget",
        "Liveliness",
        "Ownership",
        "Property",
        "ReaderDataLifecycle",
        "Reliability",
        "ResourceLimits",
        "TimeBasedFilter",
        "Userdata",
        "TypeConsistency",
        "DataRepresentation"
    }


class _CQos(DDS):
    """The _CQos object represents a qos pointer into DDS. Because they are somewhat
    annoying to deal with these are intended to be short-lived objects, used just
    to convert between the handy Qos object and the CycloneDDS C layer.

    Layout of this class:

    * ``_set_p_<scope>(qos, policy)`` translates one Policy object into the
      arguments of the matching ``dds_qset_*`` call.
    * ``_get_p_<scope>(qos)`` reads one policy back through ``dds_qget_*`` and
      returns a Policy object, a list of them (Property / BinaryProperty), or
      ``None``.
    * Methods decorated with ``static_c_call`` only declare a signature and have
      a ``pass`` body; presumably the decorator binds them to the named C
      function (see ``.internal``).

    The integer "kind" codes used below are the values this module passes to,
    and compares against, the C layer.
    """

    # Every scope that ``cqos_to_qos`` queries, in query order. The lowercased
    # name selects the ``_get_p_*`` / ``_set_p_*`` method.
    _all_scopes = (
        "Reliability", "Durability", "History", "ResourceLimits", "PresentationAccessScope",
        "Lifespan", "Deadline", "LatencyBudget", "Ownership", "OwnershipStrength",
        "Liveliness", "TimeBasedFilter", "Partition", "TransportPriority", "DestinationOrder",
        "WriterDataLifecycle", "ReaderDataLifecycle", "DurabilityService", "IgnoreLocal",
        "Userdata", "Groupdata", "Topicdata", "Property", "BinaryProperty", "TypeConsistency",
        "DataRepresentation", "EntityName"
    )

    @classmethod
    def cqos_create(cls):
        """Return a new C qos object from ``dds_create_qos``."""
        return cls._create_qos()

    @classmethod
    def qos_to_cqos(cls, qos: Qos):
        """Create a C qos object and apply every policy of ``qos`` to it.

        The caller gets the C qos back; ``cqos_destroy`` is the matching
        release call in this class.
        """
        cqos = cls._create_qos()
        for policy in qos:
            # "Scope<...>" and "Scope" both dispatch to _set_p_scope.
            getattr(cls, "_set_p_" + policy.__scope__.split("<")[0].lower())(cqos, policy)
        return cqos

    @classmethod
    def cqos_to_qos(cls, cqos):
        """Build a ``Qos`` from every policy that can be read out of ``cqos``."""
        policies = []
        for scope in cls._all_scopes:
            p = getattr(cls, "_get_p_" + scope.lower())(cqos)
            if p is None:
                continue
            # Property getters return a list of policies, all others a single one.
            if isinstance(p, list):
                policies.extend(p)
            else:
                policies.append(p)
        return Qos(*policies)

    @classmethod
    def cqos_destroy(cls, cqos):
        """Release a C qos object through ``dds_delete_qos``."""
        cls.delete_cqos(cqos)

    @static_c_call("dds_create_qos")
    def _create_qos(self) -> dds_c_t.qos_p:
        pass

    @static_c_call("dds_delete_qos")
    def delete_cqos(self, qos: dds_c_t.qos_p) -> None:
        pass

    @static_c_call("dds_free")
    def free(self, ptr: ct.c_void_p) -> None:
        pass

    # Reliability (kind: 0 = BestEffort, 1 = Reliable)

    @classmethod
    def _set_p_reliability(cls, qos, policy):
        if policy == Policy.Reliability.BestEffort:
            return cls._set_reliability(qos, 0, 0)
        return cls._set_reliability(qos, 1, policy.max_blocking_time)

    @static_c_call("dds_qset_reliability")
    def _set_reliability(self, qos: dds_c_t.qos_p, reliability_kind: dds_c_t.reliability,
                         blocking_time: dds_c_t.duration) -> None:
        pass

    # Durability (kind: 0 = Volatile, 1 = TransientLocal, 2 = Transient, 3 = anything else)

    @classmethod
    def _set_p_durability(cls, qos, policy):
        if policy == Policy.Durability.Volatile:
            return cls._set_durability(qos, 0)
        elif policy == Policy.Durability.TransientLocal:
            return cls._set_durability(qos, 1)
        elif policy == Policy.Durability.Transient:
            return cls._set_durability(qos, 2)
        return cls._set_durability(qos, 3)

    @static_c_call("dds_qset_durability")
    def _set_durability(self, qos: dds_c_t.qos_p, durability_kind: dds_c_t.durability) -> None:
        pass

    # History (kind: 0 = KeepLast with a depth, 1 = KeepAll)

    @classmethod
    def _set_p_history(cls, qos, policy):
        if policy == Policy.History.KeepAll:
            return cls._set_history(qos, 1, 0)
        return cls._set_history(qos, 0, policy.depth)

    @static_c_call("dds_qset_history")
    def _set_history(self, qos: dds_c_t.qos_p, history_kind: dds_c_t.history, depth: ct.c_int32) -> None:
        pass

    # Resource Limits

    @classmethod
    def _set_p_resourcelimits(cls, qos, policy):
        return cls._set_resource_limits(qos, policy.max_samples, policy.max_instances,
                                        policy.max_samples_per_instance)

    @static_c_call("dds_qset_resource_limits")
    def _set_resource_limits(self, qos: dds_c_t.qos_p, max_samples: ct.c_int32, max_instances: ct.c_int32,
                             max_samples_per_instance: ct.c_int32) -> None:
        pass

    # Presentation access scope (kind: 0 = Instance, 1 = Topic, 2 = anything else)

    @classmethod
    def _set_p_presentationaccessscope(cls, qos, policy):
        if type(policy) is Policy.PresentationAccessScope.Instance:
            return cls._set_presentation_access_scope(qos, 0, policy.coherent_access, policy.ordered_access)
        elif type(policy) is Policy.PresentationAccessScope.Topic:
            return cls._set_presentation_access_scope(qos, 1, policy.coherent_access, policy.ordered_access)
        return cls._set_presentation_access_scope(qos, 2, policy.coherent_access, policy.ordered_access)

    @static_c_call("dds_qset_presentation")
    def _set_presentation_access_scope(self, qos: dds_c_t.qos_p, access_scope: dds_c_t.presentation_access_scope,
                                       coherent_access: ct.c_bool, ordered_access: ct.c_bool) -> None:
        pass

    # Lifespan

    @classmethod
    def _set_p_lifespan(cls, qos, policy):
        return cls._set_lifespan(qos, policy.lifespan)

    @static_c_call("dds_qset_lifespan")
    def _set_lifespan(self, qos: dds_c_t.qos_p, lifespan: dds_c_t.duration) -> None:
        pass

    # Deadline

    @classmethod
    def _set_p_deadline(cls, qos, policy):
        return cls._set_deadline(qos, policy.deadline)

    @static_c_call("dds_qset_deadline")
    def _set_deadline(self, qos: dds_c_t.qos_p, deadline: dds_c_t.duration) -> None:
        pass

    # Latency budget

    @classmethod
    def _set_p_latencybudget(cls, qos, policy):
        return cls._set_latency_budget(qos, policy.budget)

    @static_c_call("dds_qset_latency_budget")
    def _set_latency_budget(self, qos: dds_c_t.qos_p, latency_budget: dds_c_t.duration) -> None:
        pass

    # Ownership (kind: 0 = Shared, 1 = anything else)

    @classmethod
    def _set_p_ownership(cls, qos, policy):
        if policy == Policy.Ownership.Shared:
            return cls._set_ownership(qos, 0)
        return cls._set_ownership(qos, 1)

    @static_c_call("dds_qset_ownership")
    def _set_ownership(self, qos: dds_c_t.qos_p, ownership_kind: dds_c_t.ownership) -> None:
        pass

    # Ownership Strength

    @classmethod
    def _set_p_ownershipstrength(cls, qos, policy):
        return cls._set_ownership_strength(qos, policy.strength)

    @static_c_call("dds_qset_ownership_strength")
    def _set_ownership_strength(self, qos: dds_c_t.qos_p, ownership_strength: ct.c_int32) -> None:
        pass

    # Liveliness (kind: 0 = Automatic, 1 = ManualByParticipant, 2 = anything else)

    @classmethod
    def _set_p_liveliness(cls, qos, policy):
        if type(policy) is Policy.Liveliness.Automatic:
            return cls._set_liveliness(qos, 0, policy.lease_duration)
        elif type(policy) is Policy.Liveliness.ManualByParticipant:
            return cls._set_liveliness(qos, 1, policy.lease_duration)
        return cls._set_liveliness(qos, 2, policy.lease_duration)

    @static_c_call("dds_qset_liveliness")
    def _set_liveliness(self, qos: dds_c_t.qos_p, liveliness_kind: dds_c_t.liveliness,
                        lease_duration: dds_c_t.duration) -> None:
        pass

    # Time based filter

    @classmethod
    def _set_p_timebasedfilter(cls, qos, policy):
        return cls._set_time_based_filter(qos, policy.filter_time)

    @static_c_call("dds_qset_time_based_filter")
    def _set_time_based_filter(self, qos: dds_c_t.qos_p, minimum_separation: dds_c_t.duration) -> None:
        pass

    # Partition

    @classmethod
    def _set_p_partition(cls, qos, policy):
        # Pack the encoded partition names into a C array of char pointers.
        ps = [p.encode() for p in policy.partitions]
        p_pt = (ct.c_char_p * len(ps))()
        for i, p in enumerate(ps):
            p_pt[i] = p
        cls._set_partition(qos, len(ps), p_pt)

    @static_c_call("dds_qset_partition")
    def _set_partition(self, qos: dds_c_t.qos_p, n: ct.c_uint32, ps: ct.POINTER(ct.c_char_p)) -> None:
        pass

    # Transport priority

    @classmethod
    def _set_p_transportpriority(cls, qos, policy):
        return cls._set_transport_priority(qos, policy.priority)

    @static_c_call("dds_qset_transport_priority")
    def _set_transport_priority(self, qos: dds_c_t.qos_p, value: ct.c_int32) -> None:
        pass

    # Destination order (kind: 0 = ByReceptionTimestamp, 1 = anything else)

    @classmethod
    def _set_p_destinationorder(cls, qos, policy):
        if policy == Policy.DestinationOrder.ByReceptionTimestamp:
            return cls._set_destination_order(qos, 0)
        return cls._set_destination_order(qos, 1)

    @static_c_call("dds_qset_destination_order")
    def _set_destination_order(self, qos: dds_c_t.qos_p,
                               destination_order_kind: dds_c_t.destination_order) -> None:
        pass

    # Writer Data Lifecycle

    @classmethod
    def _set_p_writerdatalifecycle(cls, qos, policy):
        return cls._set_writer_data_lifecycle(qos, policy.autodispose)

    @static_c_call("dds_qset_writer_data_lifecycle")
    def _set_writer_data_lifecycle(self, qos: dds_c_t.qos_p, autodispose: ct.c_bool) -> None:
        pass

    # Reader Data Lifecycle

    @classmethod
    def _set_p_readerdatalifecycle(cls, qos, policy):
        return cls._set_reader_data_lifecycle(
            qos,
            policy.autopurge_nowriter_samples_delay,
            policy.autopurge_disposed_samples_delay
        )

    @static_c_call("dds_qset_reader_data_lifecycle")
    def _set_reader_data_lifecycle(self, qos: dds_c_t.qos_p, autopurge_nowriter_samples_delay: dds_c_t.duration,
                                   autopurge_disposed_samples_delay: dds_c_t.duration) -> None:
        pass

    # Durability Service (embedded history uses the History kind codes: 0 = KeepLast, 1 = KeepAll)

    @classmethod
    def _set_p_durabilityservice(cls, qos, policy):
        if policy.history == Policy.History.KeepAll:
            history_kind = 1
            history_depth = 0
        else:
            history_kind = 0
            history_depth = policy.history.depth

        return cls._set_durability_service(
            qos,
            policy.cleanup_delay,
            history_kind,
            history_depth,
            policy.max_samples,
            policy.max_instances,
            policy.max_samples_per_instance
        )

    @static_c_call("dds_qset_durability_service")
    def _set_durability_service(self, qos: dds_c_t.qos_p, service_cleanup_delay: dds_c_t.duration,
                                history_kind: dds_c_t.history, history_depth: ct.c_int32,
                                max_samples: ct.c_int32, max_instances: ct.c_int32,
                                max_samples_per_instance: ct.c_int32) -> None:
        pass

    # Ignore local (kind: 0 = Nothing, 1 = Participant, 2 = anything else)

    @classmethod
    def _set_p_ignorelocal(cls, qos, policy):
        if policy == Policy.IgnoreLocal.Nothing:
            return cls._set_ignore_local(qos, 0)
        elif policy == Policy.IgnoreLocal.Participant:
            return cls._set_ignore_local(qos, 1)
        return cls._set_ignore_local(qos, 2)

    @static_c_call("dds_qset_ignorelocal")
    def _set_ignore_local(self, qos: dds_c_t.qos_p, ingorelocal_kind: dds_c_t.ingnorelocal) -> None:
        pass

    # Userdata

    @classmethod
    def _set_p_userdata(cls, qos, policy):
        cls._set_userdata(qos, policy.data, len(policy.data))

    @static_c_call("dds_qset_userdata")
    def _set_userdata(self, qos: dds_c_t.qos_p, value: ct.c_void_p, size: ct.c_size_t) -> None:
        pass

    # Topic

    @classmethod
    def _set_p_topicdata(cls, qos, policy):
        cls._set_topicdata(qos, policy.data, len(policy.data))

    @static_c_call("dds_qset_topicdata")
    def _set_topicdata(self, qos: dds_c_t.qos_p, value: ct.c_void_p, size: ct.c_size_t) -> None:
        pass

    # Group

    @classmethod
    def _set_p_groupdata(cls, qos, policy):
        cls._set_groupdata(qos, policy.data, len(policy.data))

    @static_c_call("dds_qset_groupdata")
    def _set_groupdata(self, qos: dds_c_t.qos_p, value: ct.c_void_p, size: ct.c_size_t) -> None:
        pass

    # Property

    @classmethod
    def _set_p_property(cls, qos, policy):
        cls._set_property(qos, policy.key.encode('utf8'), policy.value.encode('utf8'))

    @static_c_call("dds_qset_prop")
    def _set_property(self, qos: dds_c_t.qos_p, key: ct.c_char_p, value: ct.c_char_p) -> None:
        pass

    # Binary property

    @classmethod
    def _set_p_binaryproperty(cls, qos, policy):
        cls._set_binaryproperty(qos, policy.key.encode('utf8'), policy.value, len(policy.value))

    @static_c_call("dds_qset_bprop")
    def _set_binaryproperty(self, qos: dds_c_t.qos_p, key: ct.c_char_p, value: ct.c_void_p,
                            size: ct.c_size_t) -> None:
        pass

    # Type Consistency (kind: 0 = DisallowTypeCoercion, 1 = anything else)

    @classmethod
    def _set_p_typeconsistency(cls, qos, policy):
        if isinstance(policy, Policy.TypeConsistency.DisallowTypeCoercion):
            # Only force_type_validation is taken from the policy in this case.
            return cls._set_type_consistency(qos, 0, False, False, False, False, policy.force_type_validation)

        return cls._set_type_consistency(
            qos,
            1,
            policy.ignore_sequence_bounds,
            policy.ignore_string_bounds,
            policy.ignore_member_names,
            policy.prevent_type_widening,
            policy.force_type_validation
        )

    @static_c_call("dds_qset_type_consistency")
    def _set_type_consistency(self, qos: dds_c_t.qos_p, type_consistency_kind: dds_c_t.type_consistency,
                              ignore_sequence_bounds: ct.c_bool, ignore_string_bounds: ct.c_bool,
                              ignore_member_names: ct.c_bool, prevent_type_widening: ct.c_bool,
                              force_type_validation: ct.c_bool) -> None:
        pass

    # Data Representation (id 0 for use_cdrv0_representation, id 2 for use_xcdrv2_representation)

    @classmethod
    def _set_p_datarepresentation(cls, qos, policy: Policy.DataRepresentation):
        # When neither flag is set nothing is written to the qos.
        if policy.use_cdrv0_representation and policy.use_xcdrv2_representation:
            representations = (dds_c_t.data_representation_id * 2)()
            representations[0] = 0
            representations[1] = 2
            return cls._set_data_representation(qos, 2, representations)
        if policy.use_cdrv0_representation:
            representations = dds_c_t.data_representation_id(0)
            return cls._set_data_representation(qos, 1, ct.byref(representations))
        if policy.use_xcdrv2_representation:
            representations = dds_c_t.data_representation_id(2)
            return cls._set_data_representation(qos, 1, ct.byref(representations))

    @static_c_call("dds_qset_data_representation")
    def _set_data_representation(self, qos: dds_c_t.qos_p, n: ct.c_uint32,
                                 values: ct.POINTER(dds_c_t.data_representation_id)) -> None:
        pass

    # Entity Name

    @classmethod
    def _set_p_entityname(cls, qos, policy: Policy.EntityName):
        return cls._set_entity_name(qos, policy.name.encode('utf8'))

    @static_c_call("dds_qset_entity_name")
    def _set_entity_name(self, qos: dds_c_t.qos_p, name: ct.c_char_p) -> None:
        pass

    # END OF SETTERS, START OF GETTERS #

    # Out-parameter buffers handed to the dds_qget_* calls by reference. They are
    # class attributes, so every getter call reuses (and overwrites) the same objects.
    _gc_data_size = ct.c_size_t()
    _gc_data_value = ct.c_void_p()
    _gc_durability = dds_c_t.durability()
    _gc_history = dds_c_t.history()
    _gc_history_depth = ct.c_int32()
    _gc_max_samples = ct.c_int32()
    _gc_max_instances = ct.c_int32()
    _gc_max_samples_per_instance = ct.c_int32()
    _gc_access_scope = dds_c_t.presentation_access_scope()
    _gc_coherent_access = ct.c_bool()
    _gc_ordered_access = ct.c_bool()
    _gc_lifespan = dds_c_t.duration()
    _gc_deadline = dds_c_t.duration()
    _gc_latency_budget = dds_c_t.duration()
    _gc_ownership = dds_c_t.ownership()
    _gc_ownership_strength = ct.c_int32()
    _gc_liveliness = dds_c_t.liveliness()
    _gc_lease_duration = dds_c_t.duration()
    _gc_time_based_filter = dds_c_t.duration()
    _gc_partition_num = ct.c_uint32()
    _gc_partition_names = (ct.POINTER(ct.c_char_p))()
    _gc_reliability = dds_c_t.reliability()
    _gc_max_blocking_time = dds_c_t.duration()
    _gc_transport_priority = ct.c_int32()
    _gc_destination_order = dds_c_t.destination_order()
    _gc_writer_autodispose = ct.c_bool()
    _gc_autopurge_nowriter_samples_delay = dds_c_t.duration()
    _gc_autopurge_disposed_samples_delay = dds_c_t.duration()
    _gc_durservice_service_cleanup_delay = dds_c_t.duration()
    _gc_durservice_history_kind = dds_c_t.history()
    _gc_durservice_history_depth = ct.c_int32()
    _gc_durservice_max_samples = ct.c_int32()
    _gc_durservice_max_instances = ct.c_int32()
    _gc_durservice_max_samples_per_instance = ct.c_int32()
    _gc_ignorelocal = dds_c_t.ingnorelocal()
    _gc_propnames_num = ct.c_uint32()
    _gc_propnames_names = (ct.POINTER(ct.c_char_p))()
    _gc_prop_get_value = ct.c_char_p()
    _gc_bpropnames_num = ct.c_uint32()
    _gc_bpropnames_names = (ct.POINTER(ct.c_char_p))()
    _gc_bprop_get_value = ct.c_char_p()
    _gc_typecons_kind = dds_c_t.type_consistency()
    _gc_typecons_iseqbounds = ct.c_bool()
    _gc_typecons_istrbounds = ct.c_bool()
    _gc_typecons_imemnames = ct.c_bool()
    _gc_typecons_itypewide = ct.c_bool()
    _gc_typecons_forceval = ct.c_bool()

    # Every _get_p_* below returns None when its dds_qget_* call returns a falsy
    # value; additional None cases are noted on the individual getter.

    # Reliability

    @classmethod
    def _get_p_reliability(cls, qos):
        if not cls._get_reliability(qos, ct.byref(cls._gc_reliability), ct.byref(cls._gc_max_blocking_time)):
            return None

        if cls._gc_reliability.value == 0:
            return Policy.Reliability.BestEffort
        return Policy.Reliability.Reliable(max_blocking_time=cls._gc_max_blocking_time.value)

    @static_c_call("dds_qget_reliability")
    def _get_reliability(self, qos: dds_c_t.qos_p, reliability_kind: ct.POINTER(dds_c_t.reliability),
                         blocking_time: ct.POINTER(dds_c_t.duration)) -> ct.c_bool:
        pass

    # Durability

    @classmethod
    def _get_p_durability(cls, qos):
        if not cls._get_durability(qos, ct.byref(cls._gc_durability)):
            return None

        if cls._gc_durability.value == 0:
            return Policy.Durability.Volatile
        elif cls._gc_durability.value == 1:
            return Policy.Durability.TransientLocal
        elif cls._gc_durability.value == 2:
            return Policy.Durability.Transient
        return Policy.Durability.Persistent

    @static_c_call("dds_qget_durability")
    def _get_durability(self, qos: dds_c_t.qos_p, durability_kind: ct.POINTER(dds_c_t.durability)) -> ct.c_bool:
        pass

    # History

    @classmethod
    def _get_p_history(cls, qos):
        if not cls._get_history(qos, ct.byref(cls._gc_history), ct.byref(cls._gc_history_depth)):
            return None

        if cls._gc_history.value == 1:
            return Policy.History.KeepAll
        return Policy.History.KeepLast(depth=cls._gc_history_depth.value)

    @static_c_call("dds_qget_history")
    def _get_history(self, qos: dds_c_t.qos_p, history_kind: ct.POINTER(dds_c_t.history),
                     depth: ct.POINTER(ct.c_int32)) -> ct.c_bool:
        pass

    # Resource limits

    @classmethod
    def _get_p_resourcelimits(cls, qos):
        if not cls._get_resource_limits(
                qos, ct.byref(cls._gc_max_samples), ct.byref(cls._gc_max_instances),
                ct.byref(cls._gc_max_samples_per_instance)):
            return None

        return Policy.ResourceLimits(
            max_samples=cls._gc_max_samples.value,
            max_instances=cls._gc_max_instances.value,
            max_samples_per_instance=cls._gc_max_samples_per_instance.value
        )

    @static_c_call("dds_qget_resource_limits")
    def _get_resource_limits(self, qos: dds_c_t.qos_p, max_samples: ct.POINTER(ct.c_int32),
                             max_instances: ct.POINTER(ct.c_int32),
                             max_samples_per_instance: ct.POINTER(ct.c_int32)) -> ct.c_bool:
        pass

    # Presentation access scope

    @classmethod
    def _get_p_presentationaccessscope(cls, qos):
        if not cls._get_presentation(
                qos, ct.byref(cls._gc_access_scope), ct.byref(cls._gc_coherent_access),
                ct.byref(cls._gc_ordered_access)):
            return None

        if cls._gc_access_scope.value == 0:
            return Policy.PresentationAccessScope.Instance(
                coherent_access=cls._gc_coherent_access.value,
                ordered_access=cls._gc_ordered_access.value
            )
        elif cls._gc_access_scope.value == 1:
            return Policy.PresentationAccessScope.Topic(
                coherent_access=cls._gc_coherent_access.value,
                ordered_access=cls._gc_ordered_access.value
            )
        return Policy.PresentationAccessScope.Group(
            coherent_access=cls._gc_coherent_access.value,
            ordered_access=cls._gc_ordered_access.value
        )

    @static_c_call("dds_qget_presentation")
    def _get_presentation(self, qos: dds_c_t.qos_p, access_scope: ct.POINTER(dds_c_t.presentation_access_scope),
                          coherent_access: ct.POINTER(ct.c_bool),
                          ordered_access: ct.POINTER(ct.c_bool)) -> ct.c_bool:
        pass

    # Lifespan

    @classmethod
    def _get_p_lifespan(cls, qos):
        if not cls._get_lifespan(qos, ct.byref(cls._gc_lifespan)):
            return None

        return Policy.Lifespan(lifespan=cls._gc_lifespan.value)

    @static_c_call("dds_qget_lifespan")
    def _get_lifespan(self, qos: dds_c_t.qos_p, lifespan: ct.POINTER(dds_c_t.duration)) -> ct.c_bool:
        pass

    # Deadline

    @classmethod
    def _get_p_deadline(cls, qos):
        if not cls._get_deadline(qos, ct.byref(cls._gc_deadline)):
            return None

        return Policy.Deadline(deadline=cls._gc_deadline.value)

    @static_c_call("dds_qget_deadline")
    def _get_deadline(self, qos: dds_c_t.qos_p, deadline: ct.POINTER(dds_c_t.duration)) -> ct.c_bool:
        pass

    # Latency Budget

    @classmethod
    def _get_p_latencybudget(cls, qos):
        if not cls._get_latency_budget(qos, ct.byref(cls._gc_latency_budget)):
            return None

        return Policy.LatencyBudget(budget=cls._gc_latency_budget.value)

    @static_c_call("dds_qget_latency_budget")
    def _get_latency_budget(self, qos: dds_c_t.qos_p, latency_budget: ct.POINTER(dds_c_t.duration)) -> ct.c_bool:
        pass

    # Ownership

    @classmethod
    def _get_p_ownership(cls, qos):
        if not cls._get_ownership(qos, ct.byref(cls._gc_ownership)):
            return None

        if cls._gc_ownership.value == 0:
            return Policy.Ownership.Shared
        return Policy.Ownership.Exclusive

    @static_c_call("dds_qget_ownership")
    def _get_ownership(self, qos: dds_c_t.qos_p, ownership_kind: ct.POINTER(dds_c_t.ownership)) -> ct.c_bool:
        pass

    # Ownership strength

    @classmethod
    def _get_p_ownershipstrength(cls, qos):
        if not cls._get_ownership_strength(qos, ct.byref(cls._gc_ownership_strength)):
            return None

        return Policy.OwnershipStrength(strength=cls._gc_ownership_strength.value)

    @static_c_call("dds_qget_ownership_strength")
    def _get_ownership_strength(self, qos: dds_c_t.qos_p, strength: ct.POINTER(ct.c_int32)) -> ct.c_bool:
        pass

    # Liveliness

    @classmethod
    def _get_p_liveliness(cls, qos):
        if not cls._get_liveliness(qos, ct.byref(cls._gc_liveliness), ct.byref(cls._gc_lease_duration)):
            return None

        if cls._gc_liveliness.value == 0:
            return Policy.Liveliness.Automatic(lease_duration=cls._gc_lease_duration.value)
        if cls._gc_liveliness.value == 1:
            return Policy.Liveliness.ManualByParticipant(lease_duration=cls._gc_lease_duration.value)
        return Policy.Liveliness.ManualByTopic(lease_duration=cls._gc_lease_duration.value)

    @static_c_call("dds_qget_liveliness")
    def _get_liveliness(self, qos: dds_c_t.qos_p, liveliness_kind: ct.POINTER(dds_c_t.liveliness),
                        lease_duration: ct.POINTER(dds_c_t.duration)) -> ct.c_bool:
        pass

    # Time based filter

    @classmethod
    def _get_p_timebasedfilter(cls, qos):
        if not cls._get_time_based_filter(qos, ct.byref(cls._gc_time_based_filter)):
            return None

        return Policy.TimeBasedFilter(filter_time=cls._gc_time_based_filter.value)

    @static_c_call("dds_qget_time_based_filter")
    def _get_time_based_filter(self, qos: dds_c_t.qos_p,
                               minimum_separation: ct.POINTER(dds_c_t.duration)) -> ct.c_bool:
        pass

    # Partition

    @classmethod
    def _get_p_partition(cls, qos):
        """Return the Partition policy, or ``None`` when there are zero partitions."""
        if not cls._get_partition(qos, ct.byref(cls._gc_partition_num), ct.byref(cls._gc_partition_names)):
            return None

        if cls._gc_partition_num.value == 0:
            return None

        # NOTE(review): the name array written by dds_qget_partition is not freed
        # here - confirm ownership against the C API documentation.
        names = [None] * cls._gc_partition_num.value
        for i in range(cls._gc_partition_num.value):
            names[i] = bytes(cls._gc_partition_names[i]).decode()

        return Policy.Partition(partitions=names)

    @static_c_call("dds_qget_partition")
    def _get_partition(self, qos: dds_c_t.qos_p, n: ct.POINTER(ct.c_uint32),
                       ps: ct.POINTER(ct.POINTER(ct.c_char_p))) -> ct.c_bool:
        pass

    # Transport priority

    @classmethod
    def _get_p_transportpriority(cls, qos):
        if not cls._get_transport_priority(qos, ct.byref(cls._gc_transport_priority)):
            return None

        return Policy.TransportPriority(priority=cls._gc_transport_priority.value)

    @static_c_call("dds_qget_transport_priority")
    def _get_transport_priority(self, qos: dds_c_t.qos_p, value: ct.POINTER(ct.c_int32)) -> ct.c_bool:
        pass

    # Destination order

    @classmethod
    def _get_p_destinationorder(cls, qos):
        if not cls._get_destination_order(qos, ct.byref(cls._gc_destination_order)):
            return None

        if cls._gc_destination_order.value == 0:
            return Policy.DestinationOrder.ByReceptionTimestamp
        return Policy.DestinationOrder.BySourceTimestamp

    @static_c_call("dds_qget_destination_order")
    def _get_destination_order(self, qos: dds_c_t.qos_p,
                               destination_order_kind: ct.POINTER(dds_c_t.destination_order)) -> ct.c_bool:
        pass

    # Writer data lifecycle

    @classmethod
    def _get_p_writerdatalifecycle(cls, qos):
        if not cls._get_writer_data_lifecycle(qos, ct.byref(cls._gc_writer_autodispose)):
            return None

        return Policy.WriterDataLifecycle(autodispose=cls._gc_writer_autodispose.value)

    @static_c_call("dds_qget_writer_data_lifecycle")
    def _get_writer_data_lifecycle(self, qos: dds_c_t.qos_p, autodispose: ct.POINTER(ct.c_bool)) -> ct.c_bool:
        pass

    # Reader data lifecycle

    @classmethod
    def _get_p_readerdatalifecycle(cls, qos):
        if not cls._get_reader_data_lifecycle(
                qos, ct.byref(cls._gc_autopurge_nowriter_samples_delay),
                ct.byref(cls._gc_autopurge_disposed_samples_delay)):
            return None

        return Policy.ReaderDataLifecycle(
            autopurge_nowriter_samples_delay=cls._gc_autopurge_nowriter_samples_delay.value,
            autopurge_disposed_samples_delay=cls._gc_autopurge_disposed_samples_delay.value
        )

    @static_c_call("dds_qget_reader_data_lifecycle")
    def _get_reader_data_lifecycle(self, qos: dds_c_t.qos_p,
                                   autopurge_nowriter_samples_delay: ct.POINTER(dds_c_t.duration),
                                   autopurge_disposed_samples_delay: ct.POINTER(dds_c_t.duration)) -> ct.c_bool:
        pass

    # Durability service

    @classmethod
    def _get_p_durabilityservice(cls, qos):
        if not cls._get_durability_service(
                qos,
                ct.byref(cls._gc_durservice_service_cleanup_delay),
                ct.byref(cls._gc_durservice_history_kind),
                ct.byref(cls._gc_durservice_history_depth),
                ct.byref(cls._gc_durservice_max_samples),
                ct.byref(cls._gc_durservice_max_instances),
                ct.byref(cls._gc_durservice_max_samples_per_instance)):
            return None

        if cls._gc_durservice_history_kind.value == 0:
            history = Policy.History.KeepLast(depth=cls._gc_durservice_history_depth.value)
        else:
            history = Policy.History.KeepAll

        return Policy.DurabilityService(
            cleanup_delay=cls._gc_durservice_service_cleanup_delay.value,
            history=history,
            max_samples=cls._gc_durservice_max_samples.value,
            max_instances=cls._gc_durservice_max_instances.value,
            max_samples_per_instance=cls._gc_durservice_max_samples_per_instance.value
        )

    @static_c_call("dds_qget_durability_service")
    def _get_durability_service(self, qos: dds_c_t.qos_p, service_cleanup_delay: ct.POINTER(dds_c_t.duration),
                                history_kind: ct.POINTER(dds_c_t.history), history_depth: ct.POINTER(ct.c_int32),
                                max_samples: ct.POINTER(ct.c_int32), max_instances: ct.POINTER(ct.c_int32),
                                max_samples_per_instance: ct.POINTER(ct.c_int32)) -> ct.c_bool:
        pass

    # Ignore local

    @classmethod
    def _get_p_ignorelocal(cls, qos):
        if not cls._get_ignorelocal(qos, ct.byref(cls._gc_ignorelocal)):
            return None

        if cls._gc_ignorelocal.value == 0:
            return Policy.IgnoreLocal.Nothing
        if cls._gc_ignorelocal.value == 1:
            return Policy.IgnoreLocal.Participant
        return Policy.IgnoreLocal.Process

    @static_c_call("dds_qget_ignorelocal")
    def _get_ignorelocal(self, qos: dds_c_t.qos_p, ingorelocal_kind: ct.POINTER(dds_c_t.ingnorelocal)) -> ct.c_bool:
        pass

    # Userdata

    @classmethod
    def _get_p_userdata(cls, qos):
        """Return the Userdata policy, or ``None`` for zero size or a NULL data pointer."""
        if not cls._get_userdata(qos, ct.byref(cls._gc_data_value), ct.byref(cls._gc_data_size)):
            return None

        if cls._gc_data_size.value == 0 or not bool(cls._gc_data_value):
            return None

        return Policy.Userdata(data=ct.string_at(cls._gc_data_value, cls._gc_data_size.value))

    @static_c_call("dds_qget_userdata")
    def _get_userdata(self, qos: dds_c_t.qos_p, value: ct.POINTER(ct.c_void_p),
                      size: ct.POINTER(ct.c_size_t)) -> ct.c_bool:
        pass

    # Topicdata

    @classmethod
    def _get_p_topicdata(cls, qos):
        """Return the Topicdata policy, or ``None`` for zero size or a NULL data pointer."""
        if not cls._get_topicdata(qos, ct.byref(cls._gc_data_value), ct.byref(cls._gc_data_size)):
            return None

        if cls._gc_data_size.value == 0 or not bool(cls._gc_data_value):
            return None

        # View the buffer as a fixed-size byte array and copy it out.
        byte_type = ct.c_byte * cls._gc_data_size.value
        mybytes = bytes(ct.cast(cls._gc_data_value, ct.POINTER(byte_type))[0])

        return Policy.Topicdata(data=mybytes)

    @static_c_call("dds_qget_topicdata")
    def _get_topicdata(self, qos: dds_c_t.qos_p, value: ct.POINTER(ct.c_void_p),
                       size: ct.POINTER(ct.c_size_t)) -> ct.c_bool:
        pass

    # Groupdata

    @classmethod
    def _get_p_groupdata(cls, qos):
        """Return the Groupdata policy, or ``None`` for zero size or a NULL data pointer."""
        if not cls._get_groupdata(qos, ct.byref(cls._gc_data_value), ct.byref(cls._gc_data_size)):
            return None

        # Compare the integer payload, not the ctypes object itself: a ctypes
        # instance never compares equal to 0, so the size check would never fire.
        if cls._gc_data_size.value == 0 or not bool(cls._gc_data_value):
            return None

        byte_type = ct.c_byte * cls._gc_data_size.value
        mybytes = bytes(ct.cast(cls._gc_data_value, ct.POINTER(byte_type))[0])

        return Policy.Groupdata(data=mybytes)

    @static_c_call("dds_qget_groupdata")
    def _get_groupdata(self, qos: dds_c_t.qos_p, value: ct.POINTER(ct.c_void_p),
                       size: ct.POINTER(ct.c_size_t)) -> ct.c_bool:
        pass

    # Properties

    @classmethod
    def _get_p_property(cls, qos):
        """Return a list with one ``Policy.Property`` per property name found on ``qos``."""
        num = ct.c_size_t()
        names = ct.POINTER(ct.POINTER(ct.c_char))()

        if not cls._get_property_names(qos, ct.byref(num), ct.byref(names)):
            return None

        ret = []
        try:
            for i in range(num.value):
                name = ct.cast(names[i], ct.c_char_p)
                value = ct.c_char_p()
                if not cls._get_property_value(qos, name, ct.byref(value)):
                    raise Exception("Internal QOS property structure is corrupt!")
                ret.append(Policy.Property(name.value.decode("utf8"), value.value.decode("utf8")))
                cls.free(value)
        finally:
            # The name strings and the array holding them are released even on error.
            for i in range(num.value):
                cls.free(names[i])
            cls.free(names)

        return ret

    @static_c_call("dds_qget_propnames")
    def _get_property_names(self, qos: dds_c_t.qos_p, num: ct.POINTER(ct.c_size_t),
                            names: ct.POINTER(ct.POINTER(ct.POINTER(ct.c_char)))) -> ct.c_bool:
        pass

    @static_c_call("dds_qget_prop")
    def _get_property_value(self, qos: dds_c_t.qos_p, name: ct.c_char_p,
                            value: ct.POINTER(ct.c_char_p)) -> ct.c_bool:
        pass

    # Binary properties

    @classmethod
    def _get_p_binaryproperty(cls, qos):
        """Return a list with one ``Policy.BinaryProperty`` per binary property name on ``qos``."""
        num = ct.c_size_t()
        names = ct.POINTER(ct.POINTER(ct.c_char))()

        if not cls._get_binaryproperty_names(qos, ct.byref(num), ct.byref(names)):
            return None

        ret = []
        try:
            for i in range(num.value):
                name = ct.cast(names[i], ct.c_char_p)
                value = ct.c_void_p()
                size = ct.c_size_t()
                if not cls._get_binaryproperty_value(qos, name, ct.byref(value), ct.byref(size)):
                    raise Exception("Internal QOS property structure is corrupt!")
                ret.append(Policy.BinaryProperty(name.value.decode("utf8"), ct.string_at(value, size.value)))
                cls.free(value)
        finally:
            # The name strings and the array holding them are released even on error.
            for i in range(num.value):
                cls.free(names[i])
            cls.free(names)

        return ret

    @static_c_call("dds_qget_bpropnames")
    def _get_binaryproperty_names(self, qos: dds_c_t.qos_p, num: ct.POINTER(ct.c_size_t),
                                  names: ct.POINTER(ct.POINTER(ct.POINTER(ct.c_char)))) -> ct.c_bool:
        pass

    @static_c_call("dds_qget_bprop")
    def _get_binaryproperty_value(self, qos: dds_c_t.qos_p, name: ct.c_char_p, value: ct.POINTER(ct.c_void_p),
                                  size: ct.POINTER(ct.c_size_t)) -> ct.c_bool:
        pass

    # Type Consistency

    @classmethod
    def _get_p_typeconsistency(cls, qos):
        if not cls._get_type_consistency(
                qos,
                ct.byref(cls._gc_typecons_kind),
                ct.byref(cls._gc_typecons_iseqbounds),
                ct.byref(cls._gc_typecons_istrbounds),
                ct.byref(cls._gc_typecons_imemnames),
                ct.byref(cls._gc_typecons_itypewide),
                ct.byref(cls._gc_typecons_forceval)):
            return None

        if cls._gc_typecons_kind.value == 0:
            return Policy.TypeConsistency.DisallowTypeCoercion(
                force_type_validation=cls._gc_typecons_forceval.value
            )
        return Policy.TypeConsistency.AllowTypeCoercion(
            ignore_sequence_bounds=cls._gc_typecons_iseqbounds.value,
            ignore_string_bounds=cls._gc_typecons_istrbounds.value,
            ignore_member_names=cls._gc_typecons_imemnames.value,
            prevent_type_widening=cls._gc_typecons_itypewide.value,
            force_type_validation=cls._gc_typecons_forceval.value
        )

    @static_c_call("dds_qget_type_consistency")
    def _get_type_consistency(self, qos: dds_c_t.qos_p, type_consistency_kind: ct.POINTER(dds_c_t.type_consistency),
                              ignore_sequence_bounds: ct.POINTER(ct.c_bool),
                              ignore_string_bounds: ct.POINTER(ct.c_bool),
                              ignore_member_names: ct.POINTER(ct.c_bool),
                              prevent_type_widening: ct.POINTER(ct.c_bool),
                              force_type_validation: ct.POINTER(ct.c_bool)) -> ct.c_bool:
        pass

    # Data Representation

    @classmethod
    def _get_p_datarepresentation(cls, qos):
        n = ct.c_uint32(0)
        values = ct.POINTER(dds_c_t.data_representation_id)()

        if not cls._get_data_representation(qos, ct.byref(n), ct.byref(values)):
            return None

        use_cdrv0 = False
        use_xcdrv2 = False
        for i in range(n.value):
            if values[i] == 0:
                use_cdrv0 = True
            elif values[i] == 2:
                use_xcdrv2 = True

        # Function docs of dds_qget_data_representation say the caller is responsible
        # for free'ing the buffer.
        cls.free(values)

        return Policy.DataRepresentation(use_cdrv0_representation=use_cdrv0, use_xcdrv2_representation=use_xcdrv2)

    @static_c_call("dds_qget_data_representation")
    def _get_data_representation(self, qos: dds_c_t.qos_p, n: ct.POINTER(ct.c_uint32),
                                 values: ct.POINTER(ct.POINTER(dds_c_t.data_representation_id))) -> ct.c_bool:
        pass

    # Entity Name

    @classmethod
    def _get_p_entityname(cls, qos):
        """Return the EntityName policy, or ``None`` when no name bytes were returned."""
        if not cls._get_entity_name(qos, ct.byref(cls._gc_prop_get_value)):
            return None

        if cls._gc_prop_get_value is None or cls._gc_prop_get_value.value is None:
            return None

        if not isinstance(cls._gc_prop_get_value.value, bytes):
            return None

        # Decode first, then release the C string the getter handed back.
        name = cls._gc_prop_get_value.value.decode('utf8')
        cls.free(cls._gc_prop_get_value)

        return Policy.EntityName(name=name)

    @static_c_call("dds_qget_entity_name")
    def _get_entity_name(self, qos: dds_c_t.qos_p, name: ct.POINTER(ct.c_char_p)) -> ct.c_bool:
        pass