Python Tutorial¶
Let’s enter the world of DDS by making our presence known. We will not worry about configuration or what DDS does under the hood but write a single message.
To publish anything to DDS we need to define the type of message first. Suppose you are worried about talking to other applications that are not necessarily running Python. In that case, you will use the IDL compiler, but for now, we will manually define our message type directly in Python using the cyclonedds.idl
tools:
1from dataclasses import dataclass
2from cyclonedds.idl import IdlStruct
3
4@dataclass
5class Message(IdlStruct):
6 text: str
7
8
9name = input("What is your name? ")
10message = Message(text=f"{name} has started his first DDS Python application!")
With cyclonedds.idl
write typed classes with the standard library module dataclasses <python:dataclasses>. For this simple application, the data being transmitted only contains a piece of text, but this system has the same expressive power as the OMG IDL specification, allowing you to use almost any complex datastructure.
To send your message over a DDS domain, carry out the following steps:
Join the DDS network using a DomainParticipant
Define which data type and under what name you will publish your message as a Topic
Create the
DataWriter
that publishes that TopicAnd finally, publish the message.
1from cyclonedds.topic import Topic
2from cyclonedds.pub import DataWriter
3
4participant = DomainParticipant()
5topic = Topic(participant, "Announcements", Message)
6writer = DataWriter(participant, topic)
7
8writer.write(message)
You have now published your first message successfully! However, it is hard to tell if that did anything since we don’t have anything set up to listen for incoming messages. Let’s make a second script that takes messages from DDS and prints them to the terminal:
1from dataclasses import dataclass
2from cyclonedds.domain import DomainParticipant
3from cyclonedds.topic import Topic
4from cyclonedds.sub import DataReader
5from cyclonedds.util import duration
6from cyclonedds.idl import IdlStruct
7
8@dataclass
9class Message(IdlStruct):
10 text: str
11
12participant = DomainParticipant()
13topic = Topic(participant, "Announcements", Message)
14reader = DataReader(participant, topic)
15
16# If we don't receive a single announcement for five minutes, we want the script to exit.
17for msg in reader.take_iter(timeout=duration(minutes=5)):
18 print(msg.text)
Now with this script running in a second terminal, you should see the message pop up when you rerun the first script.