Source code for cyclonedds.internal

"""
 * Copyright(c) 2021 to 2022 ZettaScale Technology and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""

import os
import uuid
import inspect
import platform
import ctypes as ct
from ctypes.util import find_library
from functools import wraps
from dataclasses import dataclass


if 'CYCLONEDDS_PYTHON_NO_IMPORT_LIBS' not in os.environ:
    from .__library__ import library_path, in_wheel


class CycloneDDSLoaderException(Exception):
    pass


def _load(path):
    """Most basic loader, return the loaded DLL on path"""
    try:
        library = ct.CDLL(path)
    except OSError:
        raise CycloneDDSLoaderException(f"Failed to load CycloneDDS library from {path}")
    if not library:
        raise CycloneDDSLoaderException(f"Failed to load CycloneDDS library from {path}")
    return library


def _loader_wheel_gen(rel_path, ext):
    def _loader_wheel():
        if in_wheel:
            return _load(str(library_path))
        return None
    return _loader_wheel


def _loader_cyclonedds_home_gen(name):
    """
        If CYCLONEDDS_HOME is set it is required to be valid and must be used to load.
    """
    def _loader_cyclonedds_home():
        if "CYCLONEDDS_HOME" not in os.environ:
            return None

        return _load(os.path.join(os.environ["CYCLONEDDS_HOME"], name))
    return _loader_cyclonedds_home


def _loader_on_path_gen(name):
    """
        Attempt to load the library without specifying any path at all
        the system might find it with LD_LIBRARY_PATH or the likes.
    """
    def _loader_on_path():
        try:
            lib = find_library("ddsc")
            if lib:
                return _load(lib)
        except CycloneDDSLoaderException:
            pass

        try:
            return _load(name)
        except CycloneDDSLoaderException:
            pass

        return None
    return _loader_on_path



def _loader_install_path():
        try:
            return _load(str(library_path))
        except CycloneDDSLoaderException:
            pass
        return None

_loaders_per_system = {
    "Linux": [
        _loader_wheel_gen(["..", "cyclonedds.libs"], ".so"),
        _loader_cyclonedds_home_gen(f"lib{os.sep}libddsc.so"),
        _loader_on_path_gen("libddsc.so"),
        _loader_install_path
    ],
    "Windows": [
        _loader_wheel_gen(["..", "cyclonedds.libs"], ".dll"),
        _loader_cyclonedds_home_gen(f"bin{os.sep}ddsc.dll"),
        _loader_on_path_gen("ddsc.dll"),
        _loader_install_path
    ],
    "Darwin": [
        _loader_wheel_gen([".dylibs"], ".dylib"),
        _loader_cyclonedds_home_gen(f"lib{os.sep}libddsc.dylib"),
        _loader_on_path_gen("libddsc.dylib"),
        _loader_install_path
    ]
}


[docs]def load_cyclonedds() -> ct.CDLL: """ Internal method to load the Cyclone DDS Dynamic Library. Handles platform specific naming/configuration. """ if 'CYCLONEDDS_PYTHON_NO_IMPORT_LIBS' in os.environ: return system = platform.system() if system not in _loaders_per_system: raise CycloneDDSLoaderException( f"You are running on an unknown system configuration {system}, unable to determine the CycloneDDS load path." ) for loader in _loaders_per_system[system]: if not loader: continue lib = loader() if lib: return lib raise CycloneDDSLoaderException( "The CycloneDDS library could not be located. " "Try setting the CYCLONEDDS_HOME variable to what you used as CMAKE_INSTALL_PREFIX." )
[docs]def c_call(cname): """ Decorator. Convert a function into call into the class associated dll. """ class DllCall: def __init__(self, function): self.function = function # This gets called when the class is finalized def __set_name__(self, cls, name): if 'CYCLONEDDS_PYTHON_NO_IMPORT_LIBS' in os.environ: return s = inspect.signature(self.function) # Set c function types based on python type annotations cfunc = getattr(cls._dll_handle, cname, None) # Sometimes the c function does not exist, unset attr if cfunc is None: delattr(cls, name) return # Note: in python 3.10 we get NoneType for voids instead of None # This confuses ctypes a lot, so we explicitly test for it # We also add the ignore for the error that flake8 generates cfunc.restype = s.return_annotation if s.return_annotation != type(None) else None # noqa: E721 # Note: ignoring the 'self' argument cfunc.argtypes = [p.annotation for i, p in enumerate(s.parameters.values()) if i > 0] # Need to rebuild this function to ignore the 'self' attribute @wraps(self.function) def final_func(self_, *args): return cfunc(*args) # replace class named method with c call setattr(cls, name, final_func) return DllCall
def static_c_call(cname): """ Decorator. Convert a function into call into the class associated dll. """ class DllCall: def __init__(self, function): self.function = function # This gets called when the class is finalized def __set_name__(self, cls, name): if 'CYCLONEDDS_PYTHON_NO_IMPORT_LIBS' in os.environ: return s = inspect.signature(self.function) # Set c function types based on python type annotations cfunc = getattr(cls._dll_handle, cname, None) # Sometimes the c function does not exist, unset attr if cfunc is None: delattr(cls, name) return # Note: in python 3.10 we get NoneType for voids instead of None # This confuses ctypes a lot, so we explicitly test for it # We also add the ignore for the error that flake8 generates cfunc.restype = s.return_annotation if s.return_annotation != type(None) else None # noqa: E721 # Note: ignoring the 'self' argument cfunc.argtypes = [p.annotation for i, p in enumerate(s.parameters.values()) if i > 0] @wraps(self.function) def final_func(*args): return cfunc(*args) # replace class named method with c call setattr(cls, name, final_func) return DllCall
[docs]def c_callable(return_type, argument_types) -> ct.CFUNCTYPE: """ Decorator. Make a C function type based on python type annotations. """ return ct.CFUNCTYPE(return_type, *argument_types)
[docs]class DDS: """ Common class for all DDS related classes. This class enables the c_call magic. """ _dll_handle = load_cyclonedds() def __init__(self, reference: int) -> None: self._ref = reference
[docs]@dataclass class SampleInfo: """ Contains information about the associated data value Attributes ---------- sample_state: Possible values: :class:`SampleState<cyclonedds.core.SampleState>` view_state: Possible values: :class:`ViewState<cyclonedds.core.ViewState>` instance_state: Possible values: :class:`InstanceState<cyclonedds.core.InstanceState>` source_timestamp: The time (in unix nanoseconds) that the associated sample was written. instance_handle: Handle to the data instance (if this is a keyed topic) """ sample_state: int view_state: int instance_state: int valid_data: bool source_timestamp: int instance_handle: int publication_handle: int disposed_generation_count: int no_writers_generation_count: int sample_rank: int generation_rank: int absolute_generation_rank: int
@dataclass class InvalidSample: key_sample: object sample_info: SampleInfo
[docs]class dds_c_t: # noqa N801 entity = ct.c_int32 time = ct.c_int64 duration = ct.c_int64 instance_handle = ct.c_int64 domainid = ct.c_uint32 sample_state = ct.c_int view_state = ct.c_int instance_state = ct.c_int reliability = ct.c_int durability = ct.c_int history = ct.c_int presentation_access_scope = ct.c_int type_consistency = ct.c_int ingnorelocal = ct.c_int ownership = ct.c_int liveliness = ct.c_int destination_order = ct.c_int data_representation_id = ct.c_int16 qos_p = ct.c_void_p attach = ct.c_void_p listener_p = ct.c_void_p topic_descriptor_p = ct.c_void_p returnv = ct.c_int32
[docs] class inconsistent_topic_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32)]
[docs] class liveliness_lost_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32)]
[docs] class liveliness_changed_status(ct.Structure): # noqa N801 _fields_ = [('alive_count', ct.c_uint32), ('not_alive_count', ct.c_uint32), ('alive_count_change', ct.c_int32), ('not_alive_count_change', ct.c_int32), ('last_publication_handle', ct.c_int64)]
[docs] class offered_deadline_missed_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32), ('last_instance_handle', ct.c_int64)]
[docs] class offered_incompatible_qos_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32), ('last_policy_id', ct.c_uint32)]
[docs] class sample_lost_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32)]
[docs] class sample_rejected_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32), ('last_reason', ct.c_int), ('last_instance_handle', ct.c_int64)]
[docs] class requested_deadline_missed_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32), ('last_instance_handle', ct.c_int64)]
[docs] class requested_incompatible_qos_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32), ('last_policy_id', ct.c_uint32)]
[docs] class publication_matched_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32), ('current_count', ct.c_uint32), ('current_count_change', ct.c_int32), ('last_subscription_handle', ct.c_int64)]
[docs] class subscription_matched_status(ct.Structure): # noqa N801 _fields_ = [('total_count', ct.c_uint32), ('total_count_change', ct.c_int32), ('current_count', ct.c_uint32), ('current_count_change', ct.c_int32), ('last_publication_handle', ct.c_int64)]
[docs] class guid(ct.Structure): # noqa N801 _fields_ = [('v', ct.c_uint8 * 16)]
[docs] def as_python_guid(self) -> uuid.UUID: return uuid.UUID(bytes=bytes(self.v))
[docs] class sample_info(ct.Structure): # noqa N801 _fields_ = [ ('sample_state', ct.c_uint), ('view_state', ct.c_uint), ('instance_state', ct.c_uint), ('valid_data', ct.c_bool), ('source_timestamp', ct.c_int64), ('instance_handle', ct.c_uint64), ('publication_handle', ct.c_uint64), ('disposed_generation_count', ct.c_uint32), ('no_writers_generation_count', ct.c_uint32), ('sample_rank', ct.c_uint32), ('generation_rank', ct.c_uint32), ('absolute_generation_rank', ct.c_uint32) ]
[docs] class sample_buffer(ct.Structure): # noqa N801 _fields_ = [ ('buf', ct.c_void_p), ('len', ct.c_size_t) ]
try: import cyclonedds._clayer as _clayer # noqa E402 except ImportError as e: raise ImportError(f"Error importing Cyclone DDS C library: is Cyclone built using the ENABLE_TYPELIB build option? -- {e}") from e dds_infinity: int = _clayer.DDS_INFINITY uint32_max: int = _clayer.UINT32_MAX feature_typelib = _clayer.HAS_TYPELIB feature_type_discovery = _clayer.HAS_TYPE_DISCOVERY feature_topic_discovery = _clayer.HAS_TOPIC_DISCOVERY dds_domain_default: int = _clayer.DDS_DOMAIN_DEFAULT