dynsub¶
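The dynsub example is a proof of concept (PoC) of a C-based JSON printer for arbitrary data. It prefers topic discovery to be enabled, but does not require it: without topic discovery it falls back to the writer’s QoS as an approximation of the topic QoS.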
Running the example¶
Pass the name of a topic to dynsub and it waits for a writer of that topic to show up. When it finds one in the DCPSPublication topic, it tries to subscribe and print the received samples as JSON.
For example, start the HelloworldPublisher in one shell:
# bin/HelloworldPublisher
=== [Publisher] Waiting for a reader to be discovered ...
In another shell, start dynsub:
# bin/dynsub HelloWorldData_Msg
{"userID":1,"message":"Hello World"}
{"userID":1}
The second line is the “invalid sample” generated when the publisher terminates. For an invalid sample in Cyclone DDS, only the key fields are valid, so only those are printed.
As an alternative to HelloWorldData_Msg, the small publisher program “variouspub” can publish a number of different types. Pass it the name of the type to publish. For example:
# bin/variouspub B
This publishes samples at 1 Hz until it is interrupted or killed.
Source code¶
1// Copyright(c) 2022 to 2023 ZettaScale Technology and others
2//
3// This program and the accompanying materials are made available under the
4// terms of the Eclipse Public License v. 2.0 which is available at
5// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
6// v. 1.0 which is available at
7// http://www.eclipse.org/org/documents/edl-v10.php.
8//
9// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
10
11#include <stdio.h>
12#include <string.h>
13#include <stdlib.h>
14#include <assert.h>
15#include <locale.h>
16
17#include "dds/dds.h"
18#include "dynsub.h"
19
20// Interpreting the data of an arbitrary topic requires interpreting the type object that describes the data.
21// The type object type is defined by the XTypes specification (https://www.omg.org/spec/DDS-XTypes/) and it
22// comes in two forms: MinimalTypeObject and CompleteTypeObject. Only the latter includes field names, so
23// that's what we need.
24//
25// Cyclone DDS includes a copy of the IDL as well as the corresponding type definitions in C as generated by
26// IDLC. So instead of including yet another copy, we simply refer to those. These files are not (yet?)
27// part of the stable API of Cyclone DDS and so the updates to Cyclone may change the locations or the names
28// of the relevant header files.
29//
30// The API uses `dds_typeobj_t` and `dds_typeinfo_t` that are opaque types but really amount to the
31// corresponding XTypes objects: DDS_XTypes_TypeObject and DDS_XTypes_TypeInformation. Rather than casting
32// pointers like we do here, they should be defined in a slightly different way so that they are not really
33// opaque. For now, this'll have to do.
34#include "dds/ddsi/ddsi_xt_typeinfo.h"
35
36// For convenience, the DDS participant is global: main creates it and get_topic_and_typeobj uses it
37static dds_entity_t participant;
38
39
40// Helper function to wait for a DCPSPublication/DCPSSubscription to show up with the desired topic name,
41// then calls dds_find_topic to create a topic for that data writer's/reader's type up the retrieves the
42// type object.
43static dds_return_t get_topic_and_typeobj (const char *topic_name, dds_duration_t timeout, dds_entity_t *topic, DDS_XTypes_TypeObject **xtypeobj)
44{
45 const dds_entity_t waitset = dds_create_waitset (participant);
46 const dds_entity_t dcpspublication_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL);
47 const dds_entity_t dcpspublication_readcond = dds_create_readcondition (dcpspublication_reader, DDS_ANY_STATE);
48 const dds_entity_t dcpssubscription_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, NULL);
49 const dds_entity_t dcpssubscription_readcond = dds_create_readcondition (dcpssubscription_reader, DDS_ANY_STATE);
50 (void) dds_waitset_attach (waitset, dcpspublication_readcond, dcpspublication_reader);
51 (void) dds_waitset_attach (waitset, dcpssubscription_readcond, dcpssubscription_reader);
52 const dds_time_t abstimeout = (timeout == DDS_INFINITY) ? DDS_NEVER : dds_time () + timeout;
53 dds_return_t ret = DDS_RETCODE_OK;
54 *xtypeobj = NULL;
55 struct ppc ppc;
56 ppc_init (&ppc);
57 dds_attach_t triggered_reader_x;
58 while (*xtypeobj == NULL && dds_waitset_wait_until (waitset, &triggered_reader_x, 1, abstimeout) > 0)
59 {
60 void *epraw = NULL;
61 dds_sample_info_t si;
62 dds_entity_t triggered_reader = (dds_entity_t) triggered_reader_x;
63 if (dds_take (triggered_reader, &epraw, &si, 1, 1) <= 0)
64 continue;
65 dds_builtintopic_endpoint_t *ep = epraw;
66 const dds_typeinfo_t *typeinfo = NULL;
67 // We are only interested in DCPSPublications where the topic name matches and that carry type information
68 // (a non-XTypes capable DDS would not provide type information) because without that information there is
69 // no way we can do anything interesting with it.
70 if (strcmp (ep->topic_name, topic_name) == 0 && dds_builtintopic_get_endpoint_type_info (ep, &typeinfo) == 0 && typeinfo)
71 {
72 // Using dds_find_topic allows us to "clone" the topic definition including the topic QoS, but it does
73 // require that topic discovery is enabled in the configuration. The advantage of using dds_find_topic
74 // is that it creates a topic with the same name, type *and QoS*. That distinction only matters if
75 // topic is discovery is enabled and/or if the topic has a durability kind of of transient or persistent:
76 // - using a different topic QoS might result in an incompatible QoS notification if topic discovery is
77 // enabled (everything would still work).
78 // - transient/persistent data behaviour is defined in terms of the topic QoS actually really matters
79 //
80 // So we try to use dds_find_topic, and if that fails, try to go the other route using the writer's QoS
81 // as an approximation of the topic QoS.
82 if ((*topic = dds_find_topic (DDS_FIND_SCOPE_GLOBAL, participant, ep->topic_name, typeinfo, DDS_SECS (2))) < 0)
83 {
84 fprintf (stderr, "dds_find_topic: %s ... continuing on the assumption that topic discovery is disabled\n", dds_strretcode (*topic));
85 dds_topic_descriptor_t *descriptor;
86 if ((ret = dds_create_topic_descriptor(DDS_FIND_SCOPE_GLOBAL, participant, typeinfo, DDS_SECS (10), &descriptor)) < 0)
87 {
88 fprintf (stderr, "dds_create_topic_descriptor: %s\n", dds_strretcode (ret));
89 dds_return_loan (triggered_reader, &epraw, 1);
90 goto error;
91 }
92 if ((*topic = dds_create_topic (participant, descriptor, ep->topic_name, ep->qos, NULL)) < 0)
93 {
94 fprintf (stderr, "dds_create_topic_descriptor: %s (be sure to enable topic discovery in the configuration)\n", dds_strretcode (*topic));
95 dds_delete_topic_descriptor (descriptor);
96 dds_return_loan (triggered_reader, &epraw, 1);
97 goto error;
98 }
99 dds_delete_topic_descriptor (descriptor);
100 }
101 // The topic suffices for creating a reader, but we also need the TypeObject to make sense of the data
102 if ((*xtypeobj = load_type_with_deps (participant, typeinfo, &ppc)) == NULL)
103 {
104 fprintf (stderr, "loading type with all dependencies failed\n");
105 dds_return_loan (triggered_reader, &epraw, 1);
106 goto error;
107 }
108 if (load_type_with_deps_min (participant, typeinfo, &ppc) == NULL)
109 {
110 fprintf (stderr, "loading minimal type with all dependencies failed\n");
111 dds_return_loan (triggered_reader, &epraw, 1);
112 goto error;
113 }
114 }
115 dds_return_loan (triggered_reader, &epraw, 1);
116 }
117 if (*xtypeobj)
118 {
119 // If we got the type object, populate the type cache
120 size_t align, size;
121 build_typecache_to (&(*xtypeobj)->_u.complete, &align, &size);
122 fflush (stdout);
123 struct typeinfo templ = { .key = { .key = (uintptr_t) *xtypeobj } } , *info;
124 if ((info = type_cache_lookup (&templ)) != NULL)
125 {
126 assert (info->release == NULL);
127 info->release = *xtypeobj;
128 }
129 else
130 {
131 // not sure whether this is at all possible
132 info = malloc (sizeof (*info));
133 assert (info);
134 *info = (struct typeinfo){ .key = { .key = (uintptr_t) *xtypeobj }, .typeobj = &(*xtypeobj)->_u.complete, .release = *xtypeobj, .align = align, .size = size };
135 type_cache_add (info);
136 }
137 }
138error:
139 dds_delete (dcpspublication_reader);
140 dds_delete (dcpssubscription_reader);
141 dds_delete (waitset);
142 return (*xtypeobj != NULL) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT;
143}
144
145int main (int argc, char **argv)
146{
147 dds_return_t ret = 0;
148 dds_entity_t topic = 0;
149
150 // for printf("%ls")
151 setlocale (LC_CTYPE, "");
152
153 if (argc != 2)
154 {
155 fprintf (stderr, "usage: %s topicname\n", argv[0]);
156 return 2;
157 }
158
159 participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
160 if (participant < 0)
161 {
162 fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
163 return 1;
164 }
165
166 // The one magic step: get a topic and type object ...
167 DDS_XTypes_TypeObject *xtypeobj;
168 type_cache_init ();
169 if ((ret = get_topic_and_typeobj (argv[1], DDS_SECS (10), &topic, &xtypeobj)) < 0)
170 {
171 fprintf (stderr, "get_topic_and_typeobj: %s\n", dds_strretcode (ret));
172 goto error;
173 }
174 // ... given those, we can create a reader just like we do normally ...
175 const dds_entity_t reader = dds_create_reader (participant, topic, NULL, NULL);
176 // ... and create a waitset that allows us to wait for any incoming data ...
177 const dds_entity_t waitset = dds_create_waitset (participant);
178 const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
179 (void) dds_waitset_attach (waitset, readcond, 0);
180 while (1)
181 {
182 (void) dds_waitset_wait (waitset, NULL, 0, DDS_INFINITY);
183 void *raw = NULL;
184 dds_sample_info_t si;
185 if ((ret = dds_take (reader, &raw, &si, 1, 1)) < 0)
186 goto error;
187 else if (ret != 0)
188 {
189 // ... that we then print
190 print_sample (si.valid_data, raw, &xtypeobj->_u.complete);
191 if ((ret = dds_return_loan (reader, &raw, 1)) < 0)
192 goto error;
193 }
194 }
195
196 error:
197 type_cache_free ();
198 dds_delete (participant);
199 return ret < 0;
200}
// A: keyed on name; count is the member that variouspub overwrites with a running counter
1struct A {
2 @key
3 string name;
4 string message;
5 unsigned long count;
6};
7
// T: plain pair of integers, used as the element type of the sequence in B
8struct T {
9 short s;
10 long l;
11};
12
// B: nests an A and adds a sequence of T
13struct B {
14 A a;
15 sequence<T> ts;
16};
17
// C: nests a B and adds its own key member k
18struct C {
19 B b;
20 @key
21 short k;
22};
23
// M1::O: appendable struct inside a module, with a single optional member
24module M1 {
25 @appendable
26 struct O {
27 @optional long x;
28 };
29};
30
// D: wide string and wide character members, plus a count member like the one in A
31struct D {
32 wstring ws;
33 wchar wc;
34 unsigned long count;
35};
1#include <stdio.h>
2#include <string.h>
3#include <stdlib.h>
4#include <signal.h>
5#include <assert.h>
6
7#include "dds/dds.h"
8#include "variouspub_types.h"
9
// Samples of type A as a NULL-terminated list; main cycles through the list and overwrites the
// count member (initialised to 0 here) before each write.
10static void *samples_a[] = {
11 &(A){ "Mariken", "Wie sidi, vrient?", 0 },
12 &(A){ "Die duvel", "Een meester vol consten,", 0 },
13 &(A){ "Die duvel", "Nieuwers af falende, wes ic besta.", 0 },
14 &(A){ "Mariken", "'t Comt mi alleleens met wien dat ick ga,", 0 },
15 &(A){ "Mariken", "Also lief gae ic metten quaetsten als metten besten.", 0 },
16 NULL
17};
18
// Samples of type B as a NULL-terminated list: each has an embedded A and a sequence of T whose
// buffer is a compound literal, with _length equal to _maximum and _release set to false.
19static void *samples_b[] = {
20 &(B){ {"Die duvel", "Wildi u liefde te mi werts vesten,", 0},
21 { ._length = 2, ._maximum = 2, ._release = false,
22 ._buffer = (T[]){ {2,3},{5,7} } } },
23 &(B){ {"Die duvel", "Ick sal u consten leeren sonder ghelijcke,", 0},
24 { ._length = 3, ._maximum = 3, ._release = false,
25 ._buffer = (T[]){ {11,13},{17,19},{23,29} } } },
26 &(B){ {"Die duvel", "Die seven vrie consten: rethorijcke, musijcke,", 0},
27 { ._length = 5, ._maximum = 5, ._release = false,
28 ._buffer = (T[]){ {31,37},{41,43},{47,52},{59,61},{67,71} } } },
29 &(B){ {"Die duvel", "Logica, gramatica ende geometrie,", 0},
30 { ._length = 7, ._maximum = 7, ._release = false,
31 ._buffer = (T[]){ {73,79},{83,89},{97,101},{103,107},{109,113},
32 {127,131},{137,139} } } },
33 &(B){ {"Die duvel", "Aristmatica ende alkenie,", 0},
34 { ._length = 11, ._maximum = 11, ._release = false,
35 ._buffer = (T[]){ {149,151},{157,163},{167,173},{179,181},
36 {191,193},{197, 199},{211,223},{227,229},
37 {233,239},{241,251},{257,263} } } },
38 NULL
39};
40
// Samples of type C as a NULL-terminated list: each has an embedded B (an A plus a sequence of T)
// followed by the value of the key member k.
41static void *samples_c[] = {
42 &(C){ { {"Die duvel", "Dwelc al consten sijn seer curable.", 0},
43 { ._length = 13, ._maximum = 13, ._release = false,
44 ._buffer = (T[]){ {269,271},{277,281},{283,293},{307,311},
45 {313,317},{331,337},{347,349},{353,359},
46 {367,373},{379,383},{389,397},{401,409},
47 {419,421} } } },
48 8936 },
49 &(C){ { {"Die duvel", "Noyt vrouwe en leefde op eerde so able", 0},
50 { ._length = 17, ._maximum = 17, ._release = false,
51 ._buffer = (T[]){ {431,433},{439,443},{449,457},{461,463},
52 {467,479},{487,491},{499,503},{509,521},
53 {523,541},{547,557},{563,569},{571,577},
54 {587,593},{599,601},{607,613},{617,619},
55 {631,641} } } },
56 18088 },
57 &(C){ { {"Die duvel", "Als ic u maken sal.", 0},
58 { ._length = 19, ._maximum = 19, ._release = false,
59 ._buffer = (T[]){ {643,647},{653,659},{661,673},{677,683},
60 {691,701},{709,719},{727,733},{739,743},
61 {751,757},{761,769},{773,787},{797,809},
62 {811,821},{823,827},{829,839},{853,857},
63 {859,863},{877,881},{883,887} } } },
64 29172 },
65 &(C){ { {"Mariken", "So moetti wel zijn een constich man.", 0},
66 { ._length = 23, ._maximum = 23, ._release = false,
67 ._buffer = (T[]){ {907,911},{919,929},{937,941},{947,953},
68 {967,971},{977,983},{991,997},{1009,1013},
69 {1019,1021},{1031,1033},{1039,1049},{1051,1061},
70 {1063,1069},{1087,1091},{1093,1097},{1103,1109},
71 {1117,1123},{1129,1151},{1153,1163},{1171,1181},
72 {1187,1193},{1201,1213},{1217,1223} } } },
73 16022 },
74 &(C){ { {"Mariken", "Wie sidi dan?", 0},
75 { ._length = 29, ._maximum = 29, ._release = false,
76 ._buffer = (T[]){ {1229,1231},{1237,1249},{1259,1277},{1279,1283},
77 {1289,1291},{1297,1301},{1303,1307},{1319,1321},
78 {1327,1361},{1367,1373},{1381,1399},{1409,1423},
79 {1427,1429},{1433,1439},{1447,1451},{1453,1459},
80 {1471,1481},{1483,1487},{1489,1493},{1499,1511},
81 {1523,1531},{1543,1549},{1553,1559},{1567,1571},
82 {1579,1583},{1597,1601},{1607,1609},{1613,1619},
83 {1621,1627} } } },
84 17880 },
85 NULL
86};
87
// Value that the optional member x points at in the second M1_O sample
88static int32_t long_4 = 4;
// Samples of type M1_O as a NULL-terminated list: x is a pointer here, NULL in the first sample and
// pointing at long_4 in the second.
89static void *samples_M1_O[] = {
90 &(M1_O){ .x = NULL },
91 &(M1_O){ .x = &long_4 },
92 NULL
93};
94
// Samples of type D as a NULL-terminated list: a wide string literal, a wide character given as a
// numeric code, and the count member that main overwrites before each write.
95static void *samples_d[] = {
96 &(D){ L"😀 Een Kruyck gaat soo langh te water tot datse barst.", 0x2206, 0 },
97 &(D){ L"🙃 Men treckt een Boogh soo lang tot datse stucken knarst.", 0x2207, 0 },
98 &(D){ L"😊 De Steel-kunst doet zyn Meester de dood vaak verwerven.", 0x22a5, 0 },
99 NULL
100};
101
102static struct tpentry {
103 const char *name;
104 const dds_topic_descriptor_t *descr;
105 void **samples;
106 size_t count_offset;
107} tptab[] = {
108 { "A", &A_desc, samples_a, offsetof (A, count) },
109 { "B", &B_desc, samples_b, offsetof (B, a.count) },
110 { "C", &C_desc, samples_c, offsetof (C, b.a.count) },
111 { "M1::O", &M1_O_desc, samples_M1_O, SIZE_MAX },
112 { "D", &D_desc, samples_d, offsetof (D, count) },
113 { NULL, NULL, NULL, 0 }
114};
115
116static void usage (const char *argv0)
117{
118 fprintf (stderr, "usage: %s {", argv0);
119 const char *sep = "";
120 for (struct tpentry *tpentry = &tptab[0]; tpentry->name; tpentry++)
121 {
122 fprintf (stderr, "%s%s", sep, tpentry->name);
123 sep = "|";
124 }
125 fprintf (stderr, "}\n");
126 exit (2);
127}
128
// Set to 1 by the signal handler below; the publish loop in main runs until it becomes non-zero
static volatile sig_atomic_t interrupted;

// Signal handler: records that the signal arrived and does nothing else
static void sigint (int sig)
{
  interrupted = 1;
  (void) sig; // the signal number itself is not needed
}
136
137int main (int argc, char **argv)
138{
139 if (argc != 2)
140 usage (argv[0]);
141 struct tpentry *tpentry;
142 for (tpentry = &tptab[0]; tpentry->name; tpentry++)
143 if (strcmp (tpentry->name, argv[1]) == 0)
144 break;
145 if (tpentry->name == NULL)
146 usage (argv[0]);
147
148 const dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
149 if (participant < 0)
150 {
151 fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
152 return 1;
153 }
154
155 const dds_entity_t topic = dds_create_topic (participant, tpentry->descr, tpentry->name, NULL, NULL);
156 const dds_entity_t writer = dds_create_writer (participant, topic, NULL, NULL);
157 uint32_t sample_idx = 0;
158 uint32_t count = 0;
159 signal (SIGINT, sigint);
160 while (!interrupted)
161 {
162 dds_return_t ret = 0;
163 void *sample = tpentry->samples[sample_idx];
164 uint32_t * const countp =
165 (tpentry->count_offset != SIZE_MAX)
166 ? (uint32_t *) ((unsigned char *) sample + tpentry->count_offset)
167 : 0;
168 if (countp)
169 *countp = count++;
170 if ((ret = dds_write (writer, sample)) < 0)
171 {
172 fprintf (stderr, "dds_write: %s\n", dds_strretcode (ret));
173 dds_delete (participant);
174 return 1;
175 }
176 if (tpentry->samples[++sample_idx] == NULL)
177 {
178 sample_idx = 0;
179 }
180 dds_sleepfor (DDS_SECS (1));
181 }
182
183 dds_delete (participant);
184 return 0;
185}