core

class cyclonedds.core.Listener(**kwargs)[source]

Bases: DDS

Listeners are callback containers for entities.

__init__(**kwargs)[source]

Create a Listener object. The initializer takes override function lambdas.

Please note that all listener callbacks are dispatched synchronously from DDS receive thread(s). You can get away with doing tiny amounts of processing in these callback methods, but aquiring the Python GIL from the DDS receive thread will severely hurt your DDS performance. Furthermore, deleting entities or writing data inside listener callbacks can get you into deadlocks.

Parameters:
  • on_data_available (Callable) – Set on_data_available callback.

  • on_inconsistent_topic (Callable) – Set on_inconsistent_topic callback.

  • on_liveliness_lost (Callable) – Set on_liveliness_lost callback.

  • on_liveliness_changed (Callable) – Set on_liveliness_changed callback.

  • on_offered_deadline_missed (Callable) – Set on_offered_deadline_missed callback.

  • on_offered_incompatible_qos (Callable) – Set on_offered_incompatible_qos callback.

  • on_data_on_readers (Callable) – Set on_data_on_readers callback.

  • on_sample_lost (Callable) – Set on_sample_lost callback.

  • on_sample_rejected (Callable) – Set on_sample_rejected callback.

  • on_requested_deadline_missed (Callable) – Set on_requested_deadline_missed callback.

  • on_requested_incompatible_qos (Callable) – Set on_requested_incompatible_qos callback.

  • on_publication_matched (Callable) – Set on_publication_matched callback.

  • on_subscription_matched (Callable) – Set on_subscription_matched callback.

reset()[source]
Return type:

None

copy()[source]
Return type:

Listener

copy_to(listener)[source]
Parameters:

listener (Listener) –

Return type:

None

merge(listener)[source]

Copies any configured (non-default) callbacks from the given listener to self, replacing existing callbacks already configured on this listener.

Parameters:

listener (Listener) –

Return type:

None

on_inconsistent_topic(reader, status)[source]
Parameters:
Return type:

None

set_on_inconsistent_topic(callable)[source]
Parameters:

callable (Callable[[cyclonedds.sub.DataReader], None]) –

on_data_available(reader)[source]
Parameters:

reader (cyclonedds.sub.DataReader) –

Return type:

None

set_on_data_available(callable)[source]
Parameters:

callable (Callable[[cyclonedds.sub.DataReader], None]) –

on_liveliness_lost(writer, status)[source]
Parameters:
Return type:

None

set_on_liveliness_lost(callable)[source]
Parameters:

callable (Callable[[cyclonedds.pub.DataWriter, liveliness_lost_status], None]) –

on_liveliness_changed(reader, status)[source]
Parameters:
Return type:

None

set_on_liveliness_changed(callable)[source]
Parameters:

callable (Callable[[cyclonedds.sub.DataReader, liveliness_changed_status], None]) –

on_offered_deadline_missed(writer, status)[source]
Parameters:
Return type:

None

set_on_offered_deadline_missed(callable)[source]
Parameters:

callable (Callable[[cyclonedds.pub.DataWriter, offered_deadline_missed_status], None]) –

on_offered_incompatible_qos(writer, status)[source]
Parameters:
Return type:

None

set_on_offered_incompatible_qos(callable)[source]
Parameters:

callable (Callable[[cyclonedds.pub.DataWriter, offered_incompatible_qos_status], None]) –

on_data_on_readers(subscriber)[source]
Parameters:

subscriber (cyclonedds.sub.Subscriber) –

Return type:

None

set_on_data_on_readers(callable)[source]
Parameters:

callable (Callable[[cyclonedds.sub.Subscriber], None]) –

on_sample_lost(writer, status)[source]
Parameters:
Return type:

None

set_on_sample_lost(callable)[source]
Parameters:

callable (Callable[[cyclonedds.pub.DataWriter, sample_lost_status], None]) –

on_sample_rejected(reader, status)[source]
Parameters:
Return type:

None

set_on_sample_rejected(callable)[source]
Parameters:

callable (Callable[[cyclonedds.sub.DataReader, sample_rejected_status], None]) –

on_requested_deadline_missed(reader, status)[source]
Parameters:
Return type:

None

set_on_requested_deadline_missed(callable)[source]
Parameters:

callable (Callable[[cyclonedds.sub.DataReader, requested_deadline_missed_status], None]) –

on_requested_incompatible_qos(reader, status)[source]
Parameters:
Return type:

None

set_on_requested_incompatible_qos(callable)[source]
Parameters:

callable (Callable[[cyclonedds.sub.DataReader, requested_incompatible_qos_status], None]) –

on_publication_matched(writer, status)[source]
Parameters:
Return type:

None

set_on_publication_matched(callable)[source]
Parameters:

callable (Callable[[cyclonedds.pub.DataWriter, publication_matched_status], None]) –

on_subscription_matched(reader, status)[source]
Parameters:
Return type:

None

set_on_subscription_matched(callable)[source]
Parameters:

callable (Callable[[cyclonedds.sub.DataReader, subscription_matched_status], None]) –

class cyclonedds.core.GuardCondition(domain_participant)[source]

Bases: Entity

A GuardCondition is a manually triggered condition that can be added to a WaitSet.

Parameters:

domain_participant (cyclonedds.domain.DomainParticipant) –

__init__(domain_participant)[source]

Initialize a GuardCondition

Parameters:

domain_participant (DomainParticipant) – The domain in which the GuardCondition should be active.

set(triggered)[source]

Set the status of the GuardCondition to triggered or untriggered.

Parameters:

triggered (bool) – Wether to trigger this condition.

Return type:

None

Raises:

DDSException

read()[source]

Read the status of the GuardCondition.

Returns:

Wether this condition is triggered.

Return type:

bool

Raises:

DDSException

take()[source]

Take the status of the GuardCondition. If it is True it will be False on the next call.

Returns:

Whether this condition is triggered.

Return type:

bool

Raises:

DDSException

class cyclonedds.core.ReadCondition(reader, mask)[source]

Bases: _Condition

Condition that triggers when new data is available to read according to the mask. Construct a mask using InstanceState, ViewState and SampleState.

Parameters:
__init__(reader, mask)[source]

Construct a ReadCondition.

Parameters:
Return type:

None

class cyclonedds.core.QueryCondition(reader, mask, filter)[source]

Bases: _Condition

Condition that triggers when new data is available to read according to the mask. Construct a mask using InstanceState, ViewState and SampleState. Add a filter function that receives the sample and returns a boolean whether to accept or reject the sample.

Parameters:
__init__(reader, mask, filter)[source]

Construct a QueryCondition.

Parameters:
Return type:

None

class cyclonedds.core.WaitSet(domain_participant)[source]

Bases: Entity

A WaitSet is a way to provide synchronous access to events happening in the DDS system. You can attach almost any kind of entity to a WaitSet and then perform a blocking wait on the waitset. When one or more of the entities in the waitset trigger the wait is unblocked. What a ‘trigger’ is depends on the type of entity, you can find out more in todo(DDS) triggers.

Parameters:

domain_participant (cyclonedds.domain.DomainParticipant) –

__init__(domain_participant)[source]

Make a new WaitSet. It starts of empty. An empty waitset will never trigger.

Parameters:

domain_participant (DomainParticipant) – The domain in which you want to make a WaitSet

Return type:

None

attach(entity)[source]

Attach an entity to this WaitSet. This is a no-op if the entity was already attached.

Parameters:

entity (Entity) – The entity you wish to attach.

Raises:

DDSException – When you try to attach a non-triggerable entity.:

Return type:

None

detach(entity)[source]

Detach an entity from this WaitSet. If it was not attach this is a no-op. Note that this operation is not atomic, a trigger for the detached entity could still occurr right after detaching it.

Parameters:

entity (Entity) – The entity you wish to attach

Return type:

None

is_attached(entity)[source]

Check whether an entity is attached.

Parameters:

entity (Entity) – Check the attachment of this entity.

Return type:

bool

get_entities()[source]

Get all entities attached

Return type:

List[Entity]

wait(timeout)[source]

Block execution and wait for one of the entities in this waitset to trigger.

Parameters:

timeout (int) – The maximum number of nanoseconds to block. Use the function duration to write that in a human readable format.

Returns:

The number of triggered entities. This will be 0 when a timeout occurred.

Return type:

int

wait_until(abstime)[source]

Block execution and wait for one of the entities in this waitset to trigger.

Parameters:

abstime (int) – The absolute time in nanoseconds since the start of the program (TODO CONFIRM THIS) to block. Use the function duration to write that in a human readable format.

Returns:

The number of triggered entities. This will be 0 when a timeout occurred.

Return type:

int

set_trigger(value)[source]

Manually trigger a WaitSet. It is unlikely you would need this.

Parameters:

value (bool) – The trigger value.

Return type:

None

async wait_async(timeout=None)[source]

Asynchronously wait for a WaitSet to trigger. Use in event-loop based applications.

Parameters:

timeout (int, Optional = None) – Maximum number of nanoseconds to wait before returning. By default this is infinity.

Return type:

int

class cyclonedds.core.InstanceState[source]

InstanceState constants for building condition masks. This class is static and there should never be a need to instantiate it. It operates on the state of an instance.

Alive

Only consider samples belonging to an alive instance (it has alive writer(s))

Type:

int

NotAliveDisposed

Only consider samples belonging to an instance that is not alive because it was actively disposed.

Type:

int

NotAliveNoWriters

Only consider samples belonging to an instance that is not alive because it has no writers.

Type:

int

Any

Ignore the liveliness status of the instance.

Type:

int

class cyclonedds.core.ViewState[source]

ViewState constants for building condition masks. This class is static and there should never be a need to instantiate it. It operates on the state of an instance.

New

Only consider samples belonging to newly created instances.

Type:

int

Old

Only consider samples belonging to previously created instances.

Type:

int

Any

Ignore the fact whether instances are new or not.

Type:

int

class cyclonedds.core.SampleState[source]

SampleState constants for building condition masks. This class is static and there should never be a need to instantiate it. It operates on the state of a single sample.

Read

Only consider samples that have already been read.

Type:

int

NotRead

Only consider unread samples.

Type:

int

Any

Ignore the read/unread state of samples.

Type:

int

class cyclonedds.core.DDSStatus[source]

DDSStatus contains constants to build status masks. It is static and should never need to be instantiated.

InconsistentTopic
Type:

int

OfferedDeadlineMissed
Type:

int

RequestedDeadlineMissed
Type:

int

OfferedIncompatibleQos
Type:

int

RequestedIncompatibleQos
Type:

int

SampleLost
Type:

int

SampleRejected
Type:

int

DataOnReaders
Type:

int

DataAvailable
Type:

int

LivelinessLost
Type:

int

LivelinessChanged
Type:

int

PublicationMatched
Type:

int

SubscriptionMatched
Type:

int

All = (1 << 14) - 1
class cyclonedds.core.Entity(ref, listener=None)[source]

Bases: DDS

Base class for all entities in the DDS API. The lifetime of the underlying DDS API object is linked to the lifetime of the Python entity object.

Parameters:
subscriber

If this entity is associated with a DataReader retrieve it. It is read-only. This is a proxy for get_subscriber().

Type:

Optional[cyclonedds.sub.Subscriber]

publisher

If this entity is associated with a Publisher retrieve it. It is read-only. This is a proxy for get_publisher().

Type:

Optional[cyclonedds.pub.Publisher]

datareader

If this entity is associated with a DataReader retrieve it. It is read-only. This is a proxy for get_datareader().

Type:

Optional[cyclonedds.sub.DataReader]

guid

Return the globally unique identifier for this entity. It is read-only. This is a proxy for get_guid().

Type:

uuid.UUID

status_mask

The status mask for this entity. It is a set of bits formed from DDSStatus. This is a proxy for get/set_status_mask().

Type:

int

parent

The entity that is this entities parent. For example: the subscriber for a datareader, the participant for a topic. It is read-only. This is a proxy for get_parent().

Type:

Optional[Entity]

participant

Get the participant for any entity, will only fail for a Domain. It is read-only. This is a proxy for get_participant().

Type:

Optional[cyclonedds.domain.DomainParticipant]

children

Get a list of children belonging to this entity. It is the opposite as parent. It is read-only. This is a proxy for get_children().

Type:

List[Entity]

domain_id

Get the id of the domain this entity belongs to.

Type:

int

__init__(ref, listener=None)[source]

Initialize an Entity. You should never need to initialize an Entity manually.

Parameters:
  • ref (int) – The reference id as returned by the DDS API.

  • listener (Listener) – Listener for this entity. We retain the python object to avoid it being garbage collected if the listener goes out of scope but the entity doesn’t. If we don’t the python function will be freed, causing C to call into freed memory -> segfault.

Raises:

DDSException – If an invalid reference id is passed to this function this means instantiation of some other object failed.

Return type:

None

get_subscriber()[source]

Retrieve the subscriber associated with this entity.

Returns:

Not all entities are associated with a subscriber, so this method may return None.

Return type:

Optional[Subscriber]

Raises:

DDSException

get_publisher()[source]

Retrieve the publisher associated with this entity.

Returns:

Not all entities are associated with a publisher, so this method may return None.

Return type:

Optional[Publisher]

Raises:

DDSException

get_datareader()[source]

Retrieve the datareader associated with this entity.

Returns:

Not all entities are associated with a datareader, so this method may return None.

Return type:

Optional[DataReader]

Raises:

DDSException

get_instance_handle()[source]

Retrieve the instance associated with this entity.

Returns:

The integer handle is just a number you can use in writer/reader calls.

Return type:

int

Raises:

DDSException

property instance_handle: int

Retrieve the instance associated with this entity.

Returns:

The integer handle is just a number you can use in writer/reader calls.

Return type:

int

Raises:

DDSException

get_guid()[source]

Get a globally unique identifier for this entity.

Returns:

View the python documentation for this class for detailed usage.

Return type:

uuid.UUID

Raises:

DDSException

read_status(mask=None)[source]

Read the status bits set on this Entity. You can build a mask by using DDSStatus.

Parameters:

mask (int) – The DDSStatus mask. If not supplied the mask is used that was set on this Entity using set_status_mask.

Returns:

The DDSStatus bits that were set.

Return type:

int

Raises:

DDSException

take_status(mask=None)[source]

Take the status bits set on this Entity, after which they will be set to 0 again. You can build a mask by using DDSStatus.

Parameters:

mask (int) – The DDSStatus mask. If not supplied the mask is used that was set on this Entity using set_status_mask.

Returns:

The DDSStatus bits that were set.

Return type:

int

Raises:

DDSException

get_status_changes()[source]

Get all status changes since the last read_status() or take_status().

Returns:

The DDSStatus bits that were set.

Return type:

int

Raises:

DDSException

get_status_mask()[source]

Get the status mask for this Entity.

Returns:

The DDSStatus bits that are enabled.

Return type:

int

Raises:

DDSException

set_status_mask(mask)[source]
Set the status mask for this Entity. By default the mask is 0. Only the status changes

for the bits in this mask are tracked on this entity.

Parameters:

mask (int) – The DDSStatus bits to track.

Raises:

DDSException

Return type:

None

get_qos()[source]

Get the Qos associated with this entity. Note that the object returned is not the same python object that you used to set the Qos on this object. Modifications to the Qos object that is returned does not modify the Qos of the Entity.

Returns:

The Qos object associated with this entity.

Return type:

Qos

Raises:

DDSException

set_qos(qos)[source]

Set Qos policies on this entity. Note, only a limited number of Qos policies can be set after the object is created (Policy.LatencyBudget and Policy.OwnershipStrength). Any policies not set explicitly in the supplied Qos remain unchanged.

Parameters:

qos (Qos) – The Qos to apply to this entity.

Raises:

DDSException – If you pass an immutable policy or cause the total collection of qos policies to become inconsistent an exception will be raised.

Return type:

None

get_listener()[source]

Return a listener with the right methods set. Modifying the returned listener object does not modify this entity, you will have to call set_listener() with the changed object.

Returns:

A listener with which you can add additional callbacks.

Return type:

Listener

set_listener(listener)[source]

Update the listener for this object. If a listener already exist for this object only the fields you explicitly have set on your new listener are overwritten. Passing None will remove this entity’s Listener.

Future changes to the passed Listener object will not affect the Listener associated with this Entity.

Parameters:

listener (Optional[Listener]) – The listener object to use, or None to remove the current listener from this Entity.

Raises:

DDSException

Return type:

None

get_parent()[source]

Get the parent entity associated with this entity. A Domain object is the only object without parent, but if the domain is not created through the Python API this call won’t find it and return None from the DomainParticipant.

Returns:

The parent of this entity. This would be the Subscriber for a DataReader, DomainParticipant for a Topic etc.

Return type:

Optional[Entity]

Raises:

DDSException

get_participant()[source]

Get the domain participant for this entity. This should work on all valid Entity objects except a Domain.

Returns:

Only fails for a Domain object.

Return type:

Optional[cyclonedds.domain.DomainParticipant]

Raises:

DDSException

get_children()[source]

Get the list of children of this entity. For example, the list of datareaders belonging to a subscriber. Opposite of parent.

Returns:

All the entities considered children of this entity.

Return type:

List[Entity]

Raises:

DDSException

get_domain_id()[source]

Get the id of the domain this entity resides in.

Returns:

The domain id.

Return type:

int

Raises:

DDSException

begin_coherent()[source]

Begin coherent publishing or begin accessing a coherent set in a Subscriber.

This can only be invoked on Publishers, Subscribers, DataWriters and DataReaders. Invoking on a DataWriter or DataReader behaves as if it was invoked on its parent Publisher or Subscriber respectively.

Return type:

None

end_coherent()[source]

End coherent publishing or end accessing a coherent set in a Subscriber.

This can only be invoked on Publishers, Subscribers, DataWriters and DataReaders. Invoking on a DataWriter or DataReader behaves as if it was invoked on its parent Publisher or Subscriber respectively.

Return type:

None

classmethod get_entity(entity_id)[source]

Turn a CycloneDDS C Entity id into a Python object. You shouldn’t need this.

Return type:

Optional[Entity]

class cyclonedds.core.DDSException(code, msg=None, **kwargs)[source]

Bases: Exception

This exception is thrown when a return code from the underlying C api indicates non-valid use of the API. Print the exception directly or convert it to string for a detailed description.

Parameters:
  • code (int) –

  • msg (str) –

code

One of the DDS_RETCODE_ constants that indicates the type of error.

Type:

int

msg

A human readable description of where the error occurred

Type:

str

DDS_RETCODE_OK = 0
DDS_RETCODE_ERROR = -1
DDS_RETCODE_UNSUPPORTED = -2
DDS_RETCODE_BAD_PARAMETER = -3
DDS_RETCODE_PRECONDITION_NOT_MET = -4
DDS_RETCODE_OUT_OF_RESOURCES = -5
DDS_RETCODE_NOT_ENABLED = -6
DDS_RETCODE_IMMUTABLE_POLICY = -7
DDS_RETCODE_INCONSISTENT_POLICY = -8
DDS_RETCODE_ALREADY_DELETED = -9
DDS_RETCODE_TIMEOUT = -10
DDS_RETCODE_NO_DATA = -11
DDS_RETCODE_ILLEGAL_OPERATION = -12
DDS_RETCODE_NOT_ALLOWED_BY_SECURITY = -13
error_message_mapping = {-13: ('DDS_RETCODE_NOT_ALLOWED_BY_SECURITY', 'Insufficient credentials supplied to use the function'), -12: ('DDS_RETCODE_ILLEGAL_OPERATION', 'A function was called when it should not be'), -11: ('DDS_RETCODE_NO_DATA', 'Expected data is not provided'), -10: ('DDS_RETCODE_TIMEOUT', 'A timeout has occurred'), -9: ('DDS_RETCODE_ALREADY_DELETED', 'An attempt was made to delete something more than once'), -8: ('DDS_RETCODE_INCONSISTENT_POLICY', 'A policy with inconsistent values was used'), -7: ('DDS_RETCODE_IMMUTABLE_POLICY', 'An attempt was made to modify an immutable policy'), -6: ('DDS_RETCODE_NOT_ENABLED', 'A configurable feature is not enabled'), -5: ('DDS_RETCODE_OUT_OF_RESOURCES', 'Operation failed because of a lack of resources'), -4: ('DDS_RETCODE_PRECONDITION_NOT_MET', 'Precondition for operation not met'), -3: ('DDS_RETCODE_BAD_PARAMETER', 'Bad parameter value'), -2: ('DDS_RETCODE_UNSUPPORTED', 'Feature unsupported'), -1: ('DDS_RETCODE_ERROR', 'Non specific error'), 0: ('DDS_RETCODE_OK', 'Success')}
class cyclonedds.core.Statistics(entity)[source]

Statistics object for entity.

Parameters:

entity (Entity) –

entity

The handle of entity to which this set of statistics applies.

Type:

Entity

opaque

The internal data.

Type:

int

time

Time stamp of lastest call to Statistics(entity).refresh() in nanoseconds since epoch.

Type:

datatime

count

Number of key-value pairs.

Type:

int

data

Data.

Type:

dict

Examples

>>> Statistics(datawriter)
>>> Statistics(datawriter).refresh()
refresh()[source]

Update a previously created statistics structure with current values.

Only the time stamp and the values (and “opaque”) may change. The set of keys and the types of the values do not change.