Python Tutorial

Let’s enter the world of DDS by making our presence known. We will not worry about configuration or what DDS does under the hood but write a single message. To publish anything to DDS we need to define the type of message first. Suppose you are worried about talking to other applications that are not necessarily running Python. In that case, you will use the IDL compiler, but for now, we will manually define our message type directly in Python using the cyclonedds.idl tools:

 1from dataclasses import dataclass
 2from cyclonedds.idl import IdlStruct
 3
 4@dataclass
 5class Message(IdlStruct):
 6    text: str
 7
 8
 9name = input("What is your name? ")
10message = Message(text=f"{name} has started his first DDS Python application!")

With cyclonedds.idl write typed classes with the standard library module dataclasses <python:dataclasses>. For this simple application, the data being transmitted only contains a piece of text, but this system has the same expressive power as the OMG IDL specification, allowing you to use almost any complex datastructure.

To send your message over a DDS domain, carry out the following steps:

  1. Join the DDS network using a DomainParticipant

  2. Define which data type and under what name you will publish your message as a Topic

  3. Create the DataWriter that publishes that Topic

  4. And finally, publish the message.

1from cyclonedds.topic import Topic
2from cyclonedds.pub import DataWriter
3
4participant = DomainParticipant()
5topic = Topic(participant, "Announcements", Message)
6writer = DataWriter(participant, topic)
7
8writer.write(message)

You have now published your first message successfully! However, it is hard to tell if that did anything since we don’t have anything set up to listen for incoming messages. Let’s make a second script that takes messages from DDS and prints them to the terminal:

 1from dataclasses import dataclass
 2from cyclonedds.domain import DomainParticipant
 3from cyclonedds.topic import Topic
 4from cyclonedds.sub import DataReader
 5from cyclonedds.util import duration
 6from cyclonedds.idl import IdlStruct
 7
 8@dataclass
 9class Message(IdlStruct):
10    text: str
11
12participant = DomainParticipant()
13topic = Topic(participant, "Announcements", Message)
14reader = DataReader(participant, topic)
15
16# If we don't receive a single announcement for five minutes, we want the script to exit.
17for msg in reader.take_iter(timeout=duration(minutes=5)):
18    print(msg.text)

Now with this script running in a second terminal, you should see the message pop up when you rerun the first script.