dynsub¶
The dynsub example is a PoC for a C-based JSON printer of arbitrary data. It assumes that topic discovery is enabled, but doesn’t require it.
Running the example¶
Pass the name of a topic to dynsub and it waits for a writer of that topic to show up. When it finds one in the DCPSPublication topic, it tries to subscribe and print the received samples as JSON.
For example: Start the HelloworldPublisher in one shell:
# bin/HelloworldPublisher
=== [Publisher] Waiting for a reader to be discovered ...
In another shell start dynsub:
# bin/dynsub HelloWorldData_Msg
{"userID":1,"message":"Hello World"}
{"userID":1}
The second line is the “invalid sample” generated when the publisher terminates. For an invalid sample, Cyclone DDS guarantees only that the key fields are valid, so only those are printed.
As an alternative to the HelloWorldData_Msg publisher, the small publisher program “variouspub” can publish a number of different types. Pass it the name of the type to publish. For example:
# bin/variouspub B
This publishes samples at 1Hz until killed.
Source code¶
1// Copyright(c) 2022 to 2023 ZettaScale Technology and others
2//
3// This program and the accompanying materials are made available under the
4// terms of the Eclipse Public License v. 2.0 which is available at
5// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
6// v. 1.0 which is available at
7// http://www.eclipse.org/org/documents/edl-v10.php.
8//
9// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
10
11#include <stdio.h>
12#include <string.h>
13#include <stdlib.h>
14#include <assert.h>
15#include <locale.h>
16#include <signal.h>
17#if !defined _WIN32 && !DDSRT_WITH_FREERTOS && !__ZEPHYR__
18#include <unistd.h>
19#endif
20
21#include "dds/dds.h"
22#include "dynsub.h"
23
24// Interpreting the data of an arbitrary topic requires interpreting the type object that describes the data.
25// The type object type is defined by the XTypes specification (https://www.omg.org/spec/DDS-XTypes/) and it
26// comes in two forms: MinimalTypeObject and CompleteTypeObject. Only the latter includes field names, so
27// that's what we need.
28//
29// Cyclone DDS includes a copy of the IDL as well as the corresponding type definitions in C as generated by
30// IDLC. So instead of including yet another copy, we simply refer to those. These files are not (yet?)
31// part of the stable API of Cyclone DDS and so the updates to Cyclone may change the locations or the names
32// of the relevant header files.
33//
34// The API uses `dds_typeobj_t` and `dds_typeinfo_t` that are opaque types but really amount to the
35// corresponding XTypes objects: DDS_XTypes_TypeObject and DDS_XTypes_TypeInformation. Rather than casting
36// pointers like we do here, they should be defined in a slightly different way so that they are not really
37// opaque. For now, this'll have to do.
38#include "dds/ddsc/dds_public_alloc.h"
39#include "dds/ddsi/ddsi_sertype.h"
40#include "dds/ddsi/ddsi_xt_typeinfo.h"
41
42#include "dds/ddsrt/threads.h"
43#include "dds/ddsi/ddsi_serdata.h"
44
45// For convenience, the DDS participant is global
46static dds_entity_t participant;
47
48static dds_entity_t termcond;
49
50// Helper function to wait for a DCPSPublication/DCPSSubscription to show up with the desired topic name,
51// then calls dds_find_topic to create a topic for that data writer's/reader's type up the retrieves the
52// type object.
53static dds_return_t get_topic_and_typeobj (const char *topic_name, dds_duration_t timeout, dds_entity_t *topic, DDS_XTypes_TypeObject **xtypeobj)
54{
55 const dds_entity_t waitset = dds_create_waitset (participant);
56 const dds_entity_t dcpspublication_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL);
57 const dds_entity_t dcpspublication_readcond = dds_create_readcondition (dcpspublication_reader, DDS_ANY_STATE);
58 const dds_entity_t dcpssubscription_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, NULL);
59 const dds_entity_t dcpssubscription_readcond = dds_create_readcondition (dcpssubscription_reader, DDS_ANY_STATE);
60 (void) dds_waitset_attach (waitset, dcpspublication_readcond, dcpspublication_reader);
61 (void) dds_waitset_attach (waitset, dcpssubscription_readcond, dcpssubscription_reader);
62 const dds_time_t abstimeout = (timeout == DDS_INFINITY) ? DDS_NEVER : dds_time () + timeout;
63 dds_return_t ret = DDS_RETCODE_OK;
64 *xtypeobj = NULL;
65 struct ppc ppc;
66 ppc_init (&ppc);
67 dds_attach_t triggered_reader_x;
68 while (*xtypeobj == NULL && dds_waitset_wait_until (waitset, &triggered_reader_x, 1, abstimeout) > 0)
69 {
70 void *epraw = NULL;
71 dds_sample_info_t si;
72 dds_entity_t triggered_reader = (dds_entity_t) triggered_reader_x;
73 if (dds_take (triggered_reader, &epraw, &si, 1, 1) <= 0)
74 continue;
75 dds_builtintopic_endpoint_t *ep = epraw;
76 const dds_typeinfo_t *typeinfo = NULL;
77 // We are only interested in DCPSPublications where the topic name matches and that carry type information
78 // (a non-XTypes capable DDS would not provide type information) because without that information there is
79 // no way we can do anything interesting with it.
80 if (strcmp (ep->topic_name, topic_name) == 0 && dds_builtintopic_get_endpoint_type_info (ep, &typeinfo) == 0 && typeinfo)
81 {
82 // Using dds_find_topic allows us to "clone" the topic definition including the topic QoS, but it does
83 // require that topic discovery is enabled in the configuration. The advantage of using dds_find_topic
84 // is that it creates a topic with the same name, type *and QoS*. That distinction only matters if
85 // topic is discovery is enabled and/or if the topic has a durability kind of of transient or persistent:
86 // - using a different topic QoS might result in an incompatible QoS notification if topic discovery is
87 // enabled (everything would still work).
88 // - transient/persistent data behaviour is defined in terms of the topic QoS actually really matters
89 //
90 // So we try to use dds_find_topic, and if that fails, try to go the other route using the writer's QoS
91 // as an approximation of the topic QoS.
92 if ((*topic = dds_find_topic (DDS_FIND_SCOPE_GLOBAL, participant, ep->topic_name, typeinfo, DDS_SECS (2))) < 0)
93 {
94 fprintf (stderr, "dds_find_topic: %s ... continuing on the assumption that topic discovery is disabled\n", dds_strretcode (*topic));
95 dds_topic_descriptor_t *descriptor;
96 if ((ret = dds_create_topic_descriptor(DDS_FIND_SCOPE_GLOBAL, participant, typeinfo, DDS_SECS (10), &descriptor)) < 0)
97 {
98 fprintf (stderr, "dds_create_topic_descriptor: %s\n", dds_strretcode (ret));
99 dds_return_loan (triggered_reader, &epraw, 1);
100 goto error;
101 }
102 dds_qset_data_representation (ep->qos, 0, NULL);
103 if ((*topic = dds_create_topic (participant, descriptor, ep->topic_name, ep->qos, NULL)) < 0)
104 {
105 fprintf (stderr, "dds_create_topic_descriptor: %s (be sure to enable topic discovery in the configuration)\n", dds_strretcode (*topic));
106 dds_delete_topic_descriptor (descriptor);
107 dds_return_loan (triggered_reader, &epraw, 1);
108 goto error;
109 }
110 dds_delete_topic_descriptor (descriptor);
111 }
112 // The topic suffices for creating a reader, but we also need the TypeObject to make sense of the data
113 if ((*xtypeobj = load_type_with_deps (participant, typeinfo, &ppc)) == NULL)
114 {
115 fprintf (stderr, "loading type with all dependencies failed\n");
116 dds_return_loan (triggered_reader, &epraw, 1);
117 goto error;
118 }
119 if (load_type_with_deps_min (participant, typeinfo, &ppc) == NULL)
120 {
121 fprintf (stderr, "loading minimal type with all dependencies failed\n");
122 dds_return_loan (triggered_reader, &epraw, 1);
123 goto error;
124 }
125 }
126 dds_return_loan (triggered_reader, &epraw, 1);
127 }
128 if (*xtypeobj)
129 {
130 // If we got the type object, populate the type cache
131 size_t align, size;
132 build_typecache_to (&(*xtypeobj)->_u.complete, &align, &size);
133 fflush (stdout);
134 struct typeinfo templ = { .key = { .key = (uintptr_t) *xtypeobj } } , *info;
135 if ((info = type_cache_lookup (&templ)) != NULL)
136 {
137 assert (info->release == NULL);
138 info->release = *xtypeobj;
139 }
140 else
141 {
142 // not sure whether this is at all possible
143 info = malloc (sizeof (*info));
144 assert (info);
145 *info = (struct typeinfo){ .key = { .key = (uintptr_t) *xtypeobj }, .typeobj = &(*xtypeobj)->_u.complete, .release = *xtypeobj, .align = align, .size = size };
146 type_cache_add (info);
147 }
148 }
149error:
150 dds_delete (dcpspublication_reader);
151 dds_delete (dcpssubscription_reader);
152 dds_delete (waitset);
153 return (*xtypeobj != NULL) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT;
154}
155
156static bool print_sample_normal (dds_entity_t reader, const DDS_XTypes_TypeObject *xtypeobj)
157{
158 void *raw = NULL;
159 dds_sample_info_t si;
160 dds_return_t ret;
161 if ((ret = dds_take (reader, &raw, &si, 1, 1)) < 0)
162 return false;
163 else if (ret != 0)
164 {
165 // ... that we then print
166 print_sample (si.valid_data, raw, &xtypeobj->_u.complete);
167 if (dds_return_loan (reader, &raw, 1) < 0)
168 return false;
169 }
170 return true;
171}
172
173static void hexdump (const unsigned char *msg, const size_t len)
174{
175 for (size_t off16 = 0; off16 < len; off16 += 16)
176 {
177 printf ("%04" PRIxSIZE " ", off16);
178 size_t off1;
179 for (off1 = 0; off1 < 16 && off16 + off1 < len; off1++)
180 printf ("%s %02x", (off1 == 8) ? " " : "", msg[off16 + off1]);
181 for (; off1 < 16; off1++)
182 printf ("%s ", (off1 == 8) ? " " : "");
183 printf (" |");
184 for (off1 = 0; off1 < 16 && off16 + off1 < len; off1++)
185 {
186 unsigned char c = msg[off16 + off1];
187 printf ("%c", (c >= 32 && c < 127) ? c : '.');
188 }
189 printf ("|\n");
190 }
191 fflush (stdout);
192}
193
194static const char *encodingstr (const struct ddsi_serdata *sd)
195{
196 uint16_t encoding;
197 ddsi_serdata_to_ser (sd, 0, 2, &encoding);
198 switch (encoding)
199 {
200 case DDSI_RTPS_CDR_BE:
201 case DDSI_RTPS_CDR_LE:
202 return "CDR";
203 case DDSI_RTPS_PL_CDR_BE:
204 case DDSI_RTPS_PL_CDR_LE:
205 return "PL_CDR";
206 case DDSI_RTPS_CDR2_BE:
207 case DDSI_RTPS_CDR2_LE:
208 return "CDR2";
209 case DDSI_RTPS_D_CDR2_BE:
210 case DDSI_RTPS_D_CDR2_LE:
211 return "D_CDR2";
212 case DDSI_RTPS_PL_CDR2_BE:
213 case DDSI_RTPS_PL_CDR2_LE:
214 return "PL_CDR2";
215 default:
216 return "unknown";
217 }
218}
219
220static bool print_sample_cdr (dds_entity_t reader, const DDS_XTypes_TypeObject *xtypeobj)
221{
222 // Note: doesn't print the exact CDR received, but the "normalised" one where byteswapping
223 // has been performed, booleans have been mapped to 0 or 1, and perhaps some other similar
224 // changes have been made.
225 struct ddsi_serdata *sd = NULL;
226 dds_sample_info_t si;
227 dds_return_t ret;
228 if ((ret = dds_takecdr (reader, &sd, 1, &si, 0)) < 0)
229 return false;
230 else if (ret != 0)
231 {
232 printf ("encoding: %s", encodingstr (sd));
233 if (!si.valid_data)
234 printf (" (expect XCDR2 because it is an invalid sample)");
235 printf ("\n");
236 if (ddsi_serdata_size (sd) == 4)
237 printf ("(no payload)\n");
238 else
239 {
240 ddsrt_iovec_t iov;
241 struct ddsi_serdata *refsd;
242 refsd = ddsi_serdata_to_ser_ref (sd, 4, ddsi_serdata_size (sd) - 4, &iov);
243 hexdump (iov.iov_base, iov.iov_len);
244 ddsi_serdata_to_ser_unref (refsd, &iov);
245 }
246
247 void *raw = calloc (1, sd->type->sizeof_type);
248 if (raw == NULL)
249 abort ();
250
251 bool ok;
252 if (si.valid_data)
253 ok = ddsi_serdata_to_sample (sd, raw, NULL, NULL);
254 else
255 {
256 const struct ddsi_sertype *st;
257 dds_get_entity_sertype (reader, &st);
258 ok = ddsi_serdata_untyped_to_sample (st, sd, raw, NULL, NULL);
259 }
260 if (ok)
261 print_sample (si.valid_data, raw, &xtypeobj->_u.complete);
262 else
263 printf ("(conversion to sample failed)\n");
264 ddsi_sertype_free_sample (sd->type, raw, DDS_FREE_CONTENTS);
265 free (raw);
266
267 ddsi_serdata_unref (sd);
268 }
269 return true;
270}
271
272#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
273static void signal_handler (int sig)
274{
275 (void) sig;
276 dds_set_guardcondition (termcond, true);
277}
278#endif
279
280#if !_WIN32 && !DDSRT_WITH_FREERTOS && !__ZEPHYR__
281static uint32_t sigthread (void *varg)
282{
283 sigset_t *set = varg;
284 int sig;
285 if (sigwait (set, &sig) == 0)
286 signal_handler (sig);
287 return 0;
288}
289#endif
290
291int main (int argc, char **argv)
292{
293 dds_return_t ret = 0;
294 dds_entity_t topic = 0;
295 bool raw_mode;
296 const char *topic_name;
297
298 // for printf("%ls")
299 setlocale (LC_CTYPE, "");
300
301 if (argc == 2)
302 {
303 raw_mode = false;
304 topic_name = argv[1];
305 }
306 else if (argc == 3 && strcmp (argv[1], "-r") == 0)
307 {
308 raw_mode = true;
309 topic_name = argv[2];
310 }
311 else
312 {
313 fprintf (stderr, "usage: %s [-r] topicname\n", argv[0]);
314 return 2;
315 }
316
317 participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
318 if (participant < 0)
319 {
320 fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
321 return 1;
322 }
323
324 // The one magic step: get a topic and type object ...
325 DDS_XTypes_TypeObject *xtypeobj;
326 type_cache_init ();
327 if ((ret = get_topic_and_typeobj (topic_name, DDS_SECS (10), &topic, &xtypeobj)) < 0)
328 {
329 fprintf (stderr, "get_topic_and_typeobj: %s\n", dds_strretcode (ret));
330 goto error;
331 }
332 // ... given those, we can create a reader just like we do normally ...
333 const dds_entity_t reader = dds_create_reader (participant, topic, NULL, NULL);
334 // ... and create a waitset that allows us to wait for any incoming data ...
335 const dds_entity_t waitset = dds_create_waitset (participant);
336 const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
337 (void) dds_waitset_attach (waitset, readcond, 0);
338
339 termcond = dds_create_guardcondition (participant);
340 (void) dds_waitset_attach (waitset, termcond, 0);
341
342#ifdef _WIN32
343 signal (SIGINT, signal_handler);
344#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
345 ddsrt_thread_t sigtid;
346 sigset_t sigset, osigset;
347 sigemptyset (&sigset);
348#ifdef __APPLE__
349 DDSRT_WARNING_GNUC_OFF(sign-conversion)
350#endif
351 sigaddset (&sigset, SIGHUP);
352 sigaddset (&sigset, SIGINT);
353 sigaddset (&sigset, SIGTERM);
354#ifdef __APPLE__
355 DDSRT_WARNING_GNUC_ON(sign-conversion)
356#endif
357 sigprocmask (SIG_BLOCK, &sigset, &osigset);
358 {
359 ddsrt_threadattr_t tattr;
360 ddsrt_threadattr_init (&tattr);
361 ddsrt_thread_create (&sigtid, "sigthread", &tattr, sigthread, &sigset);
362 }
363#endif
364
365 bool termflag = false;
366 while (!termflag)
367 {
368 (void) dds_waitset_wait (waitset, NULL, 0, DDS_INFINITY);
369 dds_read_guardcondition (termcond, &termflag);
370
371 bool ok = raw_mode ? print_sample_cdr (reader, xtypeobj) : print_sample_normal (reader, xtypeobj);
372 if (!ok)
373 break;
374 }
375
376#if _WIN32
377 signal_handler (SIGINT);
378#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
379 {
380 /* get the attention of the signal handler thread */
381 void (*osigint) (int);
382 void (*osigterm) (int);
383 kill (getpid (), SIGTERM);
384 ddsrt_thread_join (sigtid, NULL);
385 osigint = signal (SIGINT, SIG_IGN);
386 osigterm = signal (SIGTERM, SIG_IGN);
387 sigprocmask (SIG_SETMASK, &osigset, NULL);
388 signal (SIGINT, osigint);
389 signal (SIGINT, osigterm);
390 }
391#endif
392
393error:
394 type_cache_free ();
395 dds_delete (participant);
396 return ret < 0;
397}
// A named message with a counter; instances are keyed by `name`.
struct A {
  @key string name;
  string message;
  unsigned long count;
};
7
// Pair of a 16-bit and a 32-bit signed integer; element type of the sequence in B.
struct T {
  short s;
  long  l;
};
12
// Nests an A and adds an unbounded sequence of T.
struct B {
  A           a;
  sequence<T> ts;
};
17
// Nests a B and adds a key field of its own.
struct C {
  B b;
  @key short k;
};
23
// A type inside a module: appendable struct with a single optional member.
module M1 {
  @appendable struct O {
    @optional
    long x;
  };
};
30
// Wide string and wide character members, plus a counter.
struct D {
  wstring       ws;
  wchar         wc;
  unsigned long count;
};
36
// Mixes key members (x, z) with non-key members (w, y); element type of the sequences in E.
struct U {
  uint32 w;
  @key
  string x;
  string y;
  @key
  uint32 z;
};
43
// Key consisting of an array of two sequences of U and an integer.
struct E {
  uint32 a;
  @key
  sequence<U> b[2];
  @key
  uint32 c;
};
1#include <stdio.h>
2#include <string.h>
3#include <stdlib.h>
4#include <signal.h>
5#include <assert.h>
6
7#include "dds/dds.h"
8#include "variouspub_types.h"
9
10static void *samples_a[] = {
11 &(A){ "Mariken", "Wie sidi, vrient?", 0 },
12 &(A){ "Die duvel", "Een meester vol consten,", 0 },
13 &(A){ "Die duvel", "Nieuwers af falende, wes ic besta.", 0 },
14 &(A){ "Mariken", "'t Comt mi alleleens met wien dat ick ga,", 0 },
15 &(A){ "Mariken", "Also lief gae ic metten quaetsten als metten besten.", 0 },
16 NULL
17};
18
19static void *samples_b[] = {
20 &(B){ {"Die duvel", "Wildi u liefde te mi werts vesten,", 0},
21 { ._length = 2, ._maximum = 2, ._release = false,
22 ._buffer = (T[]){ {2,3},{5,7} } } },
23 &(B){ {"Die duvel", "Ick sal u consten leeren sonder ghelijcke,", 0},
24 { ._length = 3, ._maximum = 3, ._release = false,
25 ._buffer = (T[]){ {11,13},{17,19},{23,29} } } },
26 &(B){ {"Die duvel", "Die seven vrie consten: rethorijcke, musijcke,", 0},
27 { ._length = 5, ._maximum = 5, ._release = false,
28 ._buffer = (T[]){ {31,37},{41,43},{47,52},{59,61},{67,71} } } },
29 &(B){ {"Die duvel", "Logica, gramatica ende geometrie,", 0},
30 { ._length = 7, ._maximum = 7, ._release = false,
31 ._buffer = (T[]){ {73,79},{83,89},{97,101},{103,107},{109,113},
32 {127,131},{137,139} } } },
33 &(B){ {"Die duvel", "Aristmatica ende alkenie,", 0},
34 { ._length = 11, ._maximum = 11, ._release = false,
35 ._buffer = (T[]){ {149,151},{157,163},{167,173},{179,181},
36 {191,193},{197, 199},{211,223},{227,229},
37 {233,239},{241,251},{257,263} } } },
38 NULL
39};
40
41static void *samples_c[] = {
42 &(C){ { {"Die duvel", "Dwelc al consten sijn seer curable.", 0},
43 { ._length = 13, ._maximum = 13, ._release = false,
44 ._buffer = (T[]){ {269,271},{277,281},{283,293},{307,311},
45 {313,317},{331,337},{347,349},{353,359},
46 {367,373},{379,383},{389,397},{401,409},
47 {419,421} } } },
48 8936 },
49 &(C){ { {"Die duvel", "Noyt vrouwe en leefde op eerde so able", 0},
50 { ._length = 17, ._maximum = 17, ._release = false,
51 ._buffer = (T[]){ {431,433},{439,443},{449,457},{461,463},
52 {467,479},{487,491},{499,503},{509,521},
53 {523,541},{547,557},{563,569},{571,577},
54 {587,593},{599,601},{607,613},{617,619},
55 {631,641} } } },
56 18088 },
57 &(C){ { {"Die duvel", "Als ic u maken sal.", 0},
58 { ._length = 19, ._maximum = 19, ._release = false,
59 ._buffer = (T[]){ {643,647},{653,659},{661,673},{677,683},
60 {691,701},{709,719},{727,733},{739,743},
61 {751,757},{761,769},{773,787},{797,809},
62 {811,821},{823,827},{829,839},{853,857},
63 {859,863},{877,881},{883,887} } } },
64 29172 },
65 &(C){ { {"Mariken", "So moetti wel zijn een constich man.", 0},
66 { ._length = 23, ._maximum = 23, ._release = false,
67 ._buffer = (T[]){ {907,911},{919,929},{937,941},{947,953},
68 {967,971},{977,983},{991,997},{1009,1013},
69 {1019,1021},{1031,1033},{1039,1049},{1051,1061},
70 {1063,1069},{1087,1091},{1093,1097},{1103,1109},
71 {1117,1123},{1129,1151},{1153,1163},{1171,1181},
72 {1187,1193},{1201,1213},{1217,1223} } } },
73 16022 },
74 &(C){ { {"Mariken", "Wie sidi dan?", 0},
75 { ._length = 29, ._maximum = 29, ._release = false,
76 ._buffer = (T[]){ {1229,1231},{1237,1249},{1259,1277},{1279,1283},
77 {1289,1291},{1297,1301},{1303,1307},{1319,1321},
78 {1327,1361},{1367,1373},{1381,1399},{1409,1423},
79 {1427,1429},{1433,1439},{1447,1451},{1453,1459},
80 {1471,1481},{1483,1487},{1489,1493},{1499,1511},
81 {1523,1531},{1543,1549},{1553,1559},{1567,1571},
82 {1579,1583},{1597,1601},{1607,1609},{1613,1619},
83 {1621,1627} } } },
84 17880 },
85 NULL
86};
87
88static int32_t long_4 = 4;
89static void *samples_M1_O[] = {
90 &(M1_O){ .x = NULL },
91 &(M1_O){ .x = &long_4 },
92 NULL
93};
94
95static void *samples_d[] = {
96 &(D){ L"😀 Een Kruyck gaat soo langh te water tot datse barst.", 0x2206, 0 },
97 &(D){ L"🙃 Men treckt een Boogh soo lang tot datse stucken knarst.", 0x2207, 0 },
98 &(D){ L"😊 De Steel-kunst doet zyn Meester de dood vaak verwerven.", 0x22a5, 0 },
99 NULL
100};
101
102static void *samples_e[] = {
103 &(E){ 0, {{0},{0}}, 456 },
104 &(E){ 0, {{0},{._length=1, ._maximum=1, ._buffer=&(U){34,"aap","noot",56}, ._release = false}}, 456},
105 &(E){ 0, {{0},{._length=1, ._maximum=1, ._buffer=&(U){78,"aap","zus",56}, ._release = false}}, 456},
106 &(E){ 0, {{0},{._length=1, ._maximum=1, ._buffer=&(U){78,"wim","zus",90}, ._release = false}}, 456},
107 NULL
108};
109
110static struct tpentry {
111 const char *name;
112 const dds_topic_descriptor_t *descr;
113 void **samples;
114 size_t count_offset;
115} tptab[] = {
116 { "A", &A_desc, samples_a, offsetof (A, count) },
117 { "B", &B_desc, samples_b, offsetof (B, a.count) },
118 { "C", &C_desc, samples_c, offsetof (C, b.a.count) },
119 { "M1::O", &M1_O_desc, samples_M1_O, SIZE_MAX },
120 { "D", &D_desc, samples_d, offsetof (D, count) },
121 { "E", &E_desc, samples_e, offsetof (E, a) },
122 { NULL, NULL, NULL, 0 }
123};
124
125static void usage (const char *argv0)
126{
127 fprintf (stderr, "usage: %s {", argv0);
128 const char *sep = "";
129 for (struct tpentry *tpentry = &tptab[0]; tpentry->name; tpentry++)
130 {
131 fprintf (stderr, "%s%s", sep, tpentry->name);
132 sep = "|";
133 }
134 fprintf (stderr, "}\n");
135 exit (2);
136}
137
// Set to 1 by the signal handler; polled by the publishing loop in main
static volatile sig_atomic_t interrupted;

// Signal handler: only records that a signal arrived, the signal number itself is unused.
static void sigint (int sig)
{
  interrupted = 1;
  (void) sig;
}
145
146int main (int argc, char **argv)
147{
148 if (argc != 2)
149 usage (argv[0]);
150 struct tpentry *tpentry;
151 for (tpentry = &tptab[0]; tpentry->name; tpentry++)
152 if (strcmp (tpentry->name, argv[1]) == 0)
153 break;
154 if (tpentry->name == NULL)
155 usage (argv[0]);
156
157 const dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
158 if (participant < 0)
159 {
160 fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
161 return 1;
162 }
163
164 const dds_entity_t topic = dds_create_topic (participant, tpentry->descr, tpentry->name, NULL, NULL);
165 const dds_entity_t writer = dds_create_writer (participant, topic, NULL, NULL);
166 uint32_t sample_idx = 0;
167 uint32_t count = 0;
168 signal (SIGINT, sigint);
169 while (!interrupted)
170 {
171 dds_return_t ret = 0;
172 void *sample = tpentry->samples[sample_idx];
173 uint32_t * const countp =
174 (tpentry->count_offset != SIZE_MAX)
175 ? (uint32_t *) ((unsigned char *) sample + tpentry->count_offset)
176 : 0;
177 if (countp)
178 *countp = count++;
179 if ((ret = dds_write (writer, sample)) < 0)
180 {
181 fprintf (stderr, "dds_write: %s\n", dds_strretcode (ret));
182 dds_delete (participant);
183 return 1;
184 }
185 if (tpentry->samples[++sample_idx] == NULL)
186 {
187 sample_idx = 0;
188 }
189 dds_sleepfor (DDS_SECS (1));
190 }
191
192 dds_delete (participant);
193 return 0;
194}