"""
* Copyright(c) 2021 to 2022 ZettaScale Technology and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
"""
import asyncio
import concurrent.futures
from typing import AsyncGenerator, List, Optional, TypeVar, Union, Generator, Generic, TYPE_CHECKING
from .core import Entity, Listener, DDSException, WaitSet, ReadCondition, SampleState, InstanceState, ViewState
from .domain import DomainParticipant
from .topic import Topic
from .internal import c_call, dds_c_t, InvalidSample
from .qos import _CQos, Qos, LimitedScopeQos, SubscriberQos, DataReaderQos
from .util import duration
from cyclonedds._clayer import ddspy_read, ddspy_take, ddspy_read_handle, ddspy_take_handle, ddspy_lookup_instance
if TYPE_CHECKING:
import cyclonedds
class Subscriber(Entity):
    """Subscriber entity created on a :class:`~cyclonedds.domain.DomainParticipant`.

    The underlying entity is created with the C function ``dds_create_subscriber``.
    A Subscriber can be handed to :class:`DataReader` as the parent of a reader.
    """

    def __init__(
            self,
            domain_participant: 'cyclonedds.domain.DomainParticipant',
            qos: Optional[Qos] = None,
            listener: Optional[Listener] = None):
        """
        Parameters
        ----------
        domain_participant: cyclonedds.domain.DomainParticipant
            The participant on which this subscriber is created.
        qos: cyclonedds.core.Qos, optional = None
            Optionally supply a Qos. A limited-scope Qos must be a SubscriberQos.
        listener: cyclonedds.core.Listener = None
            Optionally supply a Listener.

        Raises
        ------
        TypeError
            If any argument is not of the expected type.
        """
        if not isinstance(domain_participant, DomainParticipant):
            raise TypeError(f"{domain_participant} is not a cyclonedds.domain.DomainParticipant.")

        if qos is not None:
            if isinstance(qos, LimitedScopeQos) and not isinstance(qos, SubscriberQos):
                raise TypeError(f"{qos} is not appropriate for a Subscriber")
            elif not isinstance(qos, Qos):
                raise TypeError(f"{qos} is not a valid qos object")

        if listener is not None and not isinstance(listener, Listener):
            raise TypeError(f"{listener} is not a valid listener object.")

        # The C-level qos object only has to live for the duration of the create
        # call; the finally clause destroys it even when entity creation raises.
        cqos = _CQos.qos_to_cqos(qos) if qos else None
        try:
            super().__init__(
                self._create_subscriber(
                    domain_participant._ref,
                    cqos,
                    listener._ref if listener else None
                ),
                listener=listener
            )
        finally:
            if cqos:
                _CQos.cqos_destroy(cqos)

        # Hold a Python reference to the parent participant for as long as this
        # subscriber object exists.
        self._keepalive_entities = [self.participant]

    def notify_readers(self):
        """Call ``dds_notify_readers`` on this subscriber.

        Raises
        ------
        DDSException
            If the DDS API returns a negative return code.
        """
        ret = self._notify_readers(self._ref)
        if ret < 0:
            # The original message said "while reading data", which was copied
            # from DataReader.read and misdescribed the failing operation.
            raise DDSException(ret, f"Occurred while notifying readers in {repr(self)}")

    # The two stubs below are declared through ``c_call`` with the name of the C
    # function they stand for; their Python bodies are intentionally empty.
    @c_call("dds_create_subscriber")
    def _create_subscriber(self, domain_participant: dds_c_t.entity, qos: dds_c_t.qos_p,
                           listener: dds_c_t.listener_p) -> dds_c_t.entity:
        pass

    @c_call("dds_notify_readers")
    def _notify_readers(self, subscriber: dds_c_t.entity) -> dds_c_t.returnv:
        pass
# Type variable for the sample type; it parametrises DataReader and the Topic it reads from.
_T = TypeVar('_T')
class DataReader(Entity, Generic[_T]):
    """Subscribe to a topic and read/take the data published to it.

    All returned samples are annotated with the :class:`sample.sample_info<cyclonedds.internal.SampleInfo>` attribute.
    """

    def __init__(
            self,
            subscriber_or_participant: Union['cyclonedds.sub.Subscriber', 'cyclonedds.domain.DomainParticipant'],
            topic: Topic[_T],
            qos: Optional[Qos] = None,
            listener: Optional[Listener] = None):
        """
        Parameters
        ----------
        subscriber_or_participant: cyclonedds.sub.Subscriber, cyclonedds.domain.DomainParticipant
            The subscriber to which this reader will be added. If you supply a DomainParticipant a subscriber
            will be created for you.
        topic: cyclonedds.topic.Topic
            The topic to subscribe to. Its data type is used to deserialize received samples.
        qos: cyclonedds.core.Qos, optional = None
            Optionally supply a Qos. A limited-scope Qos must be a DataReaderQos.
        listener: cyclonedds.core.Listener = None
            Optionally supply a Listener.

        Raises
        ------
        TypeError
            If any argument is not of the expected type.
        """
        if not isinstance(subscriber_or_participant, (Subscriber, DomainParticipant)):
            raise TypeError(f"{subscriber_or_participant} is not a cyclonedds.domain.DomainParticipant"
                            " or cyclonedds.sub.Subscriber.")

        if not isinstance(topic, Topic):
            raise TypeError(f"{topic} is not a cyclonedds.topic.Topic.")

        if qos is not None:
            if isinstance(qos, LimitedScopeQos) and not isinstance(qos, DataReaderQos):
                raise TypeError(f"{qos} is not appropriate for a DataReader")
            elif not isinstance(qos, Qos):
                raise TypeError(f"{qos} is not a valid qos object")

        if listener is not None and not isinstance(listener, Listener):
            raise TypeError(f"{listener} is not a valid listener object.")

        # The C-level qos object only has to live for the duration of the create
        # call; the finally clause destroys it even when entity creation raises.
        cqos = _CQos.qos_to_cqos(qos) if qos else None
        try:
            super().__init__(
                self._create_reader(
                    subscriber_or_participant._ref,
                    topic._ref,
                    cqos,
                    listener._ref if listener else None
                ),
                listener=listener
            )
        finally:
            if cqos:
                _CQos.cqos_destroy(cqos)

        self._topic = topic
        self._topic_ref = topic._ref
        # Created lazily and shared by read_next() and take_next().
        self._next_condition = None
        # Hold Python references to the parent subscriber and the topic for as
        # long as this reader object exists.
        self._keepalive_entities = [self.subscriber, topic]

    @property
    def topic(self) -> Topic[_T]:
        """The topic this reader was created with."""
        return self._topic

    def _unpack_samples(self, raw) -> List[_T]:
        """Turn the ``(data, info)`` pairs returned by the C layer into sample objects.

        Pairs whose info has ``valid_data`` set are deserialized with the topic's
        data type and get ``info`` attached as ``sample_info``. All other pairs
        become an :class:`InvalidSample` built from the deserialized key and ``info``.
        """
        data_type = self._topic.data_type
        samples = []
        for data, info in raw:
            if info.valid_data:
                sample = data_type.deserialize(data)
                sample.sample_info = info
            else:
                sample = InvalidSample(data_type.deserialize_key(data), info)
            samples.append(sample)
        return samples

    def read(self, N: int = 1, condition: Optional[Entity] = None,
             instance_handle: Optional[int] = None) -> List[_T]:
        """Read a maximum of N samples, non-blocking. Optionally use a read/query-condition to select which samples
        you are interested in.

        Reading samples does not remove them from the :class:`DataReader's<DataReader>` receive queue. So read
        methods may return the same sample in multiple calls.

        Parameters
        ----------
        N: int
            The maximum number of samples to read.
        condition: cyclonedds.core.ReadCondition, cyclonedds.core.QueryCondition, optional
            Only read samples that satisfy the supplied condition.
        instance_handle: int, optional
            When given, the handle is passed on to the handle-specific read call of the C layer.

        Raises
        ------
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        ref = condition._ref if condition else self._ref
        if instance_handle is not None:
            ret = ddspy_read_handle(ref, N, instance_handle)
        else:
            ret = ddspy_read(ref, N)

        # The C layer hands back an integer return code instead of a sample list on failure.
        if isinstance(ret, int):
            raise DDSException(ret, f"Occurred while reading data in {repr(self)}")

        return self._unpack_samples(ret)

    def take(self, N: int = 1, condition: Optional[Entity] = None,
             instance_handle: Optional[int] = None) -> List[_T]:
        """Take a maximum of N samples, non-blocking. Optionally use a read/query-condition to select which samples
        you are interested in.

        Taking samples removes them from the :class:`DataReader's<DataReader>` receive queue. So take methods will
        not return the same sample more than once.

        Parameters
        ----------
        N: int
            The maximum number of samples to take.
        condition: cyclonedds.core.ReadCondition, cyclonedds.core.QueryCondition, optional
            Only take samples that satisfy the supplied condition.
        instance_handle: int, optional
            When given, the handle is passed on to the handle-specific take call of the C layer.

        Raises
        ------
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        ref = condition._ref if condition else self._ref
        if instance_handle is not None:
            ret = ddspy_take_handle(ref, N, instance_handle)
        else:
            ret = ddspy_take(ref, N)

        # The C layer hands back an integer return code instead of a sample list on failure.
        if isinstance(ret, int):
            raise DDSException(ret, f"Occurred while taking data in {repr(self)}")

        return self._unpack_samples(ret)

    def read_next(self) -> Optional[_T]:
        """Shortcut method to read exactly one sample or return None.

        Uses a cached ReadCondition selecting not-yet-read samples of alive instances
        in any view state; the same condition object is shared with :meth:`take_next`.

        Raises
        ------
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        self._next_condition = self._next_condition or \
            ReadCondition(self, ViewState.Any | SampleState.NotRead | InstanceState.Alive)
        samples = self.read(condition=self._next_condition)
        return samples[0] if samples else None

    def take_next(self) -> Optional[_T]:
        """Shortcut method to take exactly one sample or return None.

        Uses a cached ReadCondition selecting not-yet-read samples of alive instances
        in any view state; the same condition object is shared with :meth:`read_next`.

        Raises
        ------
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        self._next_condition = self._next_condition or \
            ReadCondition(self, ViewState.Any | SampleState.NotRead | InstanceState.Alive)
        samples = self.take(condition=self._next_condition)
        return samples[0] if samples else None

    def _prepare_wait(self, condition, timeout):
        """Build the ``(waitset, condition, timeout)`` triple shared by the iterator methods.

        A WaitSet is created on this reader's participant and the condition is attached
        to it. When no condition is supplied, one selecting not-yet-read samples of alive
        instances is created. When ``timeout`` is None a very long duration is substituted.
        """
        waitset = WaitSet(self.participant)
        condition = condition or ReadCondition(self, ViewState.Any | InstanceState.Alive | SampleState.NotRead)
        waitset.attach(condition)
        # Only substitute the default when no timeout was given: an explicit 0 must
        # stay 0 (``timeout or ...`` used to turn it into the very long default).
        if timeout is None:
            timeout = duration(weeks=99999)
        return waitset, condition, timeout

    def _iterate(self, fetch, condition, timeout) -> Generator[_T, None, None]:
        """Yield samples obtained through ``fetch`` (``self.read`` or ``self.take``).

        Available samples are drained one at a time; then the waitset is waited on.
        Iteration stops as soon as a wait returns 0.
        """
        waitset, condition, timeout = self._prepare_wait(condition, timeout)
        while True:
            # Drain everything that is currently available before blocking again.
            while True:
                batch = fetch(condition=condition)
                if not batch:
                    break
                yield batch[0]
            if waitset.wait(timeout) == 0:
                break

    def read_iter(self, condition=None, timeout: Optional[int] = None) -> Generator[_T, None, None]:
        """Shortcut method to iterate reading samples. Iteration will stop once the timeout you supply expires.
        Every time a sample is received the timeout is reset.

        Parameters
        ----------
        condition: cyclonedds.core.ReadCondition, cyclonedds.core.QueryCondition, optional
            Condition to wait on and read with. Defaults to not-yet-read samples of alive instances.
        timeout: int, optional
            Passed to ``WaitSet.wait``. When omitted a very long duration is used.

        Raises
        ------
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        yield from self._iterate(self.read, condition, timeout)

    def read_one(self, condition=None, timeout: Optional[int] = None) -> _T:
        """Shortcut method to block and read exactly one sample or raise a timeout.

        Raises
        ------
        TimeoutError
            If the iteration ends (the wait timed out) before a sample was received.
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        # The default argument matters: without it an exhausted iterator raises
        # StopIteration and the TimeoutError below could never be reached.
        sample = next(self.read_iter(condition=condition, timeout=timeout), None)
        if sample is None:
            raise TimeoutError()
        return sample

    def take_iter(self, condition=None, timeout: Optional[int] = None) -> Generator[_T, None, None]:
        """Shortcut method to iterate taking samples. Iteration will stop once the timeout you supply expires.
        Every time a sample is received the timeout is reset.

        Parameters
        ----------
        condition: cyclonedds.core.ReadCondition, cyclonedds.core.QueryCondition, optional
            Condition to wait on and take with. Defaults to not-yet-read samples of alive instances.
        timeout: int, optional
            Passed to ``WaitSet.wait``. When omitted a very long duration is used.

        Raises
        ------
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        yield from self._iterate(self.take, condition, timeout)

    def take_one(self, condition=None, timeout: Optional[int] = None) -> _T:
        """Shortcut method to block and take exactly one sample or raise a timeout.

        Raises
        ------
        TimeoutError
            If the iteration ends (the wait timed out) before a sample was received.
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        # The default argument matters: without it an exhausted iterator raises
        # StopIteration and the TimeoutError below could never be reached.
        sample = next(self.take_iter(condition=condition, timeout=timeout), None)
        if sample is None:
            raise TimeoutError()
        return sample

    async def read_aiter(self, condition=None, timeout: Optional[int] = None) -> AsyncGenerator[_T, None]:
        """Shortcut method to async iterate reading samples. Iteration will stop once the timeout you supply expires.
        Every time a sample is received the timeout is reset.

        The blocking waitset wait is run in a thread pool executor and awaited from the running event loop.

        Raises
        ------
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        waitset, condition, timeout = self._prepare_wait(condition, timeout)
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as pool:
            while True:
                # Drain everything that is currently available before waiting again.
                while True:
                    batch = self.read(condition=condition)
                    if not batch:
                        break
                    yield batch[0]
                result = await loop.run_in_executor(pool, waitset.wait, timeout)
                if result == 0:
                    break

    async def take_aiter(self, condition=None, timeout: Optional[int] = None) -> AsyncGenerator[_T, None]:
        """Shortcut method to async iterate taking samples. Iteration will stop once the timeout you supply expires.
        Every time a sample is received the timeout is reset.

        The blocking waitset wait is run in a thread pool executor and awaited from the running event loop.

        Raises
        ------
        DDSException
            If any error code is returned by the DDS API it is converted into an exception.
        """
        waitset, condition, timeout = self._prepare_wait(condition, timeout)
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as pool:
            while True:
                # Drain everything that is currently available before waiting again.
                while True:
                    batch = self.take(condition=condition)
                    if not batch:
                        break
                    yield batch[0]
                result = await loop.run_in_executor(pool, waitset.wait, timeout)
                if result == 0:
                    break

    def wait_for_historical_data(self, timeout: int) -> bool:
        """Call ``dds_reader_wait_for_historical_data`` with the given maximum wait.

        Returns
        -------
        bool
            True when the call returns 0, False when it returns the timeout return code.

        Raises
        ------
        DDSException
            For any other return code.
        """
        ret = self._wait_for_historical_data(self._ref, timeout)

        if ret == 0:
            return True
        elif ret == DDSException.DDS_RETCODE_TIMEOUT:
            return False
        raise DDSException(ret, f"Occured while waiting for historical data in {repr(self)}")

    def lookup_instance(self, sample: _T) -> Optional[int]:
        """Look up the instance handle for ``sample`` on this reader.

        The sample is serialized and handed to the C layer.

        Returns
        -------
        int, optional
            The value returned by the lookup, or None when that value is 0.

        Raises
        ------
        DDSException
            If the lookup returns a negative value.
        """
        ret = ddspy_lookup_instance(self._ref, sample.serialize())
        if ret < 0:
            raise DDSException(ret, f"Occurred while lookup up instance from {repr(self)}")
        if ret == 0:
            return None
        return ret

    # The two stubs below are declared through ``c_call`` with the name of the C
    # function they stand for; their Python bodies are intentionally empty.
    @c_call("dds_create_reader")
    def _create_reader(self, subscriber: dds_c_t.entity, topic: dds_c_t.entity, qos: dds_c_t.qos_p,
                       listener: dds_c_t.listener_p) -> dds_c_t.entity:
        pass

    @c_call("dds_reader_wait_for_historical_data")
    def _wait_for_historical_data(self, reader: dds_c_t.entity, max_wait: dds_c_t.duration) -> dds_c_t.returnv:
        pass
# Names exported by a star-import of this module.
__all__ = ["Subscriber", "DataReader"]