dynsub

The dynsub example is a PoC for a C-based JSON printer of arbitrary data. It assumes that topic discovery is enabled, but doesn’t require it.

Running the example

Pass the name of a topic to dynsub and it waits for a writer or reader of that topic to show up. When it finds one in the DCPSPublication or DCPSSubscription built-in topic, it tries to subscribe and print the received samples as JSON.

For example, start the HelloworldPublisher in one shell:

# bin/HelloworldPublisher
=== [Publisher]  Waiting for a reader to be discovered ...

In another shell start dynsub:

# bin/dynsub HelloWorldData_Msg
{"userID":1,"message":"Hello World"}
{"userID":1}

The second line is the “invalid sample” that is generated when the publisher terminates. For an invalid sample, Cyclone DDS only guarantees that the key fields are valid, so only those are printed.

Instead of the HelloworldPublisher with its HelloWorldData_Msg topic, you can use the small publisher program “variouspub”, which can publish a number of different types. Pass it the name of the type to publish. For example:

# bin/variouspub B

This publishes samples at 1Hz until killed.

Source code

dynsub.c
  1// Copyright(c) 2022 to 2023 ZettaScale Technology and others
  2//
  3// This program and the accompanying materials are made available under the
  4// terms of the Eclipse Public License v. 2.0 which is available at
  5// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
  6// v. 1.0 which is available at
  7// http://www.eclipse.org/org/documents/edl-v10.php.
  8//
  9// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
 10
 11#include <stdio.h>
 12#include <string.h>
 13#include <stdlib.h>
 14#include <assert.h>
 15#include <locale.h>
 16#include <signal.h>
 17#if !defined _WIN32 && !DDSRT_WITH_FREERTOS && !__ZEPHYR__
 18#include <unistd.h>
 19#endif
 20
 21#include "dds/dds.h"
 22#include "dynsub.h"
 23
 24// Interpreting the data of an arbitrary topic requires interpreting the type object that describes the data.
 25// The type object type is defined by the XTypes specification (https://www.omg.org/spec/DDS-XTypes/) and it
 26// comes in two forms: MinimalTypeObject and CompleteTypeObject.  Only the latter includes field names, so
 27// that's what we need.
 28//
 29// Cyclone DDS includes a copy of the IDL as well as the corresponding type definitions in C as generated by
 30// IDLC.  So instead of including yet another copy, we simply refer to those.  These files are not (yet?)
 31// part of the stable API of Cyclone DDS, so updates to Cyclone may change the locations or the names
 32// of the relevant header files.
 33//
 34// The API uses `dds_typeobj_t` and `dds_typeinfo_t` that are opaque types but really amount to the
 35// corresponding XTypes objects: DDS_XTypes_TypeObject and DDS_XTypes_TypeInformation.  Rather than casting
 36// pointers like we do here, they should be defined in a slightly different way so that they are not really
 37// opaque.  For now, this'll have to do.
 38#include "dds/ddsc/dds_public_alloc.h"
 39#include "dds/ddsi/ddsi_sertype.h"
 40#include "dds/ddsi/ddsi_xt_typeinfo.h"
 41
 42#include "dds/ddsrt/threads.h"
 43#include "dds/ddsi/ddsi_serdata.h"
 44
 45// For convenience, the DDS participant is global
 46static dds_entity_t participant;
 47
 48static dds_entity_t termcond;
 49
 50// Helper function to wait for a DCPSPublication/DCPSSubscription to show up with the desired topic name,
 51// then calls dds_find_topic to create a topic for that data writer's/reader's type up the retrieves the
 52// type object.
 53static dds_return_t get_topic_and_typeobj (const char *topic_name, dds_duration_t timeout, dds_entity_t *topic, DDS_XTypes_TypeObject **xtypeobj)
 54{
 55  const dds_entity_t waitset = dds_create_waitset (participant);
 56  const dds_entity_t dcpspublication_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL);
 57  const dds_entity_t dcpspublication_readcond = dds_create_readcondition (dcpspublication_reader, DDS_ANY_STATE);
 58  const dds_entity_t dcpssubscription_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, NULL);
 59  const dds_entity_t dcpssubscription_readcond = dds_create_readcondition (dcpssubscription_reader, DDS_ANY_STATE);
 60  (void) dds_waitset_attach (waitset, dcpspublication_readcond, dcpspublication_reader);
 61  (void) dds_waitset_attach (waitset, dcpssubscription_readcond, dcpssubscription_reader);
 62  const dds_time_t abstimeout = (timeout == DDS_INFINITY) ? DDS_NEVER : dds_time () + timeout;
 63  dds_return_t ret = DDS_RETCODE_OK;
 64  *xtypeobj = NULL;
 65  struct ppc ppc;
 66  ppc_init (&ppc);
 67  dds_attach_t triggered_reader_x;
 68  while (*xtypeobj == NULL && dds_waitset_wait_until (waitset, &triggered_reader_x, 1, abstimeout) > 0)
 69  {
 70    void *epraw = NULL;
 71    dds_sample_info_t si;
 72    dds_entity_t triggered_reader = (dds_entity_t) triggered_reader_x;
 73    if (dds_take (triggered_reader, &epraw, &si, 1, 1) <= 0)
 74      continue;
 75    dds_builtintopic_endpoint_t *ep = epraw;
 76    const dds_typeinfo_t *typeinfo = NULL;
 77    // We are only interested in DCPSPublications where the topic name matches and that carry type information
 78    // (a non-XTypes capable DDS would not provide type information) because without that information there is
 79    // no way we can do anything interesting with it.
 80    if (strcmp (ep->topic_name, topic_name) == 0 && dds_builtintopic_get_endpoint_type_info (ep, &typeinfo) == 0 && typeinfo)
 81    {
 82      // Using dds_find_topic allows us to "clone" the topic definition including the topic QoS, but it does
 83      // require that topic discovery is enabled in the configuration.  The advantage of using dds_find_topic
 84      // is that it creates a topic with the same name, type *and QoS*.  That distinction only matters if
 85      // topic is discovery is enabled and/or if the topic has a durability kind of of transient or persistent:
 86      // - using a different topic QoS might result in an incompatible QoS notification if topic discovery is
 87      //   enabled (everything would still work).
 88      // - transient/persistent data behaviour is defined in terms of the topic QoS actually really matters
 89      //
 90      // So we try to use dds_find_topic, and if that fails, try to go the other route using the writer's QoS
 91      // as an approximation of the topic QoS.
 92      if ((*topic = dds_find_topic (DDS_FIND_SCOPE_GLOBAL, participant, ep->topic_name, typeinfo, DDS_SECS (2))) < 0)
 93      {
 94        fprintf (stderr, "dds_find_topic: %s ... continuing on the assumption that topic discovery is disabled\n", dds_strretcode (*topic));
 95        dds_topic_descriptor_t *descriptor;
 96        if ((ret = dds_create_topic_descriptor(DDS_FIND_SCOPE_GLOBAL, participant, typeinfo, DDS_SECS (10), &descriptor)) < 0)
 97        {
 98          fprintf (stderr, "dds_create_topic_descriptor: %s\n", dds_strretcode (ret));
 99          dds_return_loan (triggered_reader, &epraw, 1);
100          goto error;
101        }
102        dds_qset_data_representation (ep->qos, 0, NULL);
103        if ((*topic = dds_create_topic (participant, descriptor, ep->topic_name, ep->qos, NULL)) < 0)
104        {
105          fprintf (stderr, "dds_create_topic_descriptor: %s (be sure to enable topic discovery in the configuration)\n", dds_strretcode (*topic));
106          dds_delete_topic_descriptor (descriptor);
107          dds_return_loan (triggered_reader, &epraw, 1);
108          goto error;
109        }
110        dds_delete_topic_descriptor (descriptor);
111      }
112      // The topic suffices for creating a reader, but we also need the TypeObject to make sense of the data
113      if ((*xtypeobj = load_type_with_deps (participant, typeinfo, &ppc)) == NULL)
114      {
115        fprintf (stderr, "loading type with all dependencies failed\n");
116        dds_return_loan (triggered_reader, &epraw, 1);
117        goto error;
118      }
119      if (load_type_with_deps_min (participant, typeinfo, &ppc) == NULL)
120      {
121        fprintf (stderr, "loading minimal type with all dependencies failed\n");
122        dds_return_loan (triggered_reader, &epraw, 1);
123        goto error;
124      }
125    }
126    dds_return_loan (triggered_reader, &epraw, 1);
127  }
128  if (*xtypeobj)
129  {
130    // If we got the type object, populate the type cache
131    size_t align, size;
132    build_typecache_to (&(*xtypeobj)->_u.complete, &align, &size);
133    fflush (stdout);
134    struct typeinfo templ = { .key = { .key = (uintptr_t) *xtypeobj } } , *info;
135    if ((info = type_cache_lookup (&templ)) != NULL)
136    {
137      assert (info->release == NULL);
138      info->release = *xtypeobj;
139    }
140    else
141    {
142      // not sure whether this is at all possible
143      info = malloc (sizeof (*info));
144      assert (info);
145      *info = (struct typeinfo){ .key = { .key = (uintptr_t) *xtypeobj }, .typeobj = &(*xtypeobj)->_u.complete, .release = *xtypeobj, .align = align, .size = size };
146      type_cache_add (info);
147    }
148  }
149error:
150  dds_delete (dcpspublication_reader);
151  dds_delete (dcpssubscription_reader);
152  dds_delete (waitset);
153  return (*xtypeobj != NULL) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT;
154}
155
156static bool print_sample_normal (dds_entity_t reader, const DDS_XTypes_TypeObject *xtypeobj)
157{
158  void *raw = NULL;
159  dds_sample_info_t si;
160  dds_return_t ret;
161  if ((ret = dds_take (reader, &raw, &si, 1, 1)) < 0)
162    return false;
163  else if (ret != 0)
164  {
165    // ... that we then print
166    print_sample (si.valid_data, raw, &xtypeobj->_u.complete);
167    if (dds_return_loan (reader, &raw, 1) < 0)
168      return false;
169  }
170  return true;
171}
172
173static void hexdump (const unsigned char *msg, const size_t len)
174{
175  for (size_t off16 = 0; off16 < len; off16 += 16)
176  {
177    printf ("%04" PRIxSIZE " ", off16);
178    size_t off1;
179    for (off1 = 0; off1 < 16 && off16 + off1 < len; off1++)
180      printf ("%s %02x", (off1 == 8) ? " " : "", msg[off16 + off1]);
181    for (; off1 < 16; off1++)
182      printf ("%s   ", (off1 == 8) ? " " : "");
183    printf ("  |");
184    for (off1 = 0; off1 < 16 && off16 + off1 < len; off1++)
185    {
186      unsigned char c = msg[off16 + off1];
187      printf ("%c", (c >= 32 && c < 127) ? c : '.');
188    }
189    printf ("|\n");
190  }
191  fflush (stdout);
192}
193
194static const char *encodingstr (const struct ddsi_serdata *sd)
195{
196  uint16_t encoding;
197  ddsi_serdata_to_ser (sd, 0, 2, &encoding);
198  switch (encoding)
199  {
200    case DDSI_RTPS_CDR_BE:
201    case DDSI_RTPS_CDR_LE:
202      return "CDR";
203    case DDSI_RTPS_PL_CDR_BE:
204    case DDSI_RTPS_PL_CDR_LE:
205      return "PL_CDR";
206    case DDSI_RTPS_CDR2_BE:
207    case DDSI_RTPS_CDR2_LE:
208      return "CDR2";
209    case DDSI_RTPS_D_CDR2_BE:
210    case DDSI_RTPS_D_CDR2_LE:
211      return "D_CDR2";
212    case DDSI_RTPS_PL_CDR2_BE:
213    case DDSI_RTPS_PL_CDR2_LE:
214      return "PL_CDR2";
215    default:
216      return "unknown";
217  }
218}
219
220static bool print_sample_cdr (dds_entity_t reader, const DDS_XTypes_TypeObject *xtypeobj)
221{
222  // Note: doesn't print the exact CDR received, but the "normalised" one where byteswapping
223  // has been performed, booleans have been mapped to 0 or 1, and perhaps some other similar
224  // changes have been made.
225  struct ddsi_serdata *sd = NULL;
226  dds_sample_info_t si;
227  dds_return_t ret;
228  if ((ret = dds_takecdr (reader, &sd, 1, &si, 0)) < 0)
229    return false;
230  else if (ret != 0)
231  {
232    printf ("encoding: %s", encodingstr (sd));
233    if (!si.valid_data)
234      printf (" (expect XCDR2 because it is an invalid sample)");
235    printf ("\n");
236    if (ddsi_serdata_size (sd) == 4)
237      printf ("(no payload)\n");
238    else
239    {
240      ddsrt_iovec_t iov;
241      struct ddsi_serdata *refsd;
242      refsd = ddsi_serdata_to_ser_ref (sd, 4, ddsi_serdata_size (sd) - 4, &iov);
243      hexdump (iov.iov_base, iov.iov_len);
244      ddsi_serdata_to_ser_unref (refsd, &iov);
245    }
246
247    void *raw = calloc (1, sd->type->sizeof_type);
248    if (raw == NULL)
249      abort ();
250
251    bool ok;
252    if (si.valid_data)
253      ok = ddsi_serdata_to_sample (sd, raw, NULL, NULL);
254    else
255    {
256      const struct ddsi_sertype *st;
257      dds_get_entity_sertype (reader, &st);
258      ok = ddsi_serdata_untyped_to_sample (st, sd, raw, NULL, NULL);
259    }
260    if (ok)
261      print_sample (si.valid_data, raw, &xtypeobj->_u.complete);
262    else
263      printf ("(conversion to sample failed)\n");
264    ddsi_sertype_free_sample (sd->type, raw, DDS_FREE_CONTENTS);
265    free (raw);
266
267    ddsi_serdata_unref (sd);
268  }
269  return true;
270}
271
272#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
273static void signal_handler (int sig)
274{
275  (void) sig;
276  dds_set_guardcondition (termcond, true);
277}
278#endif
279
280#if !_WIN32 && !DDSRT_WITH_FREERTOS && !__ZEPHYR__
281static uint32_t sigthread (void *varg)
282{
283  sigset_t *set = varg;
284  int sig;
285  if (sigwait (set, &sig) == 0)
286    signal_handler (sig);
287  return 0;
288}
289#endif
290
291int main (int argc, char **argv)
292{
293  dds_return_t ret = 0;
294  dds_entity_t topic = 0;
295  bool raw_mode;
296  const char *topic_name;
297
298  // for printf("%ls")
299  setlocale (LC_CTYPE, "");
300
301  if (argc == 2)
302  {
303    raw_mode = false;
304    topic_name = argv[1];
305  }
306  else if (argc == 3 && strcmp (argv[1], "-r") == 0)
307  {
308    raw_mode = true;
309    topic_name = argv[2];
310  }
311  else
312  {
313    fprintf (stderr, "usage: %s [-r] topicname\n", argv[0]);
314    return 2;
315  }
316
317  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
318  if (participant < 0)
319  {
320    fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
321    return 1;
322  }
323
324  // The one magic step: get a topic and type object ...
325  DDS_XTypes_TypeObject *xtypeobj;
326  type_cache_init ();
327  if ((ret = get_topic_and_typeobj (topic_name, DDS_SECS (10), &topic, &xtypeobj)) < 0)
328  {
329    fprintf (stderr, "get_topic_and_typeobj: %s\n", dds_strretcode (ret));
330    goto error;
331  }
332  // ... given those, we can create a reader just like we do normally ...
333  const dds_entity_t reader = dds_create_reader (participant, topic, NULL, NULL);
334  // ... and create a waitset that allows us to wait for any incoming data ...
335  const dds_entity_t waitset = dds_create_waitset (participant);
336  const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
337  (void) dds_waitset_attach (waitset, readcond, 0);
338
339  termcond = dds_create_guardcondition (participant);
340  (void) dds_waitset_attach (waitset, termcond, 0);
341
342#ifdef _WIN32
343  signal (SIGINT, signal_handler);
344#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
345  ddsrt_thread_t sigtid;
346  sigset_t sigset, osigset;
347  sigemptyset (&sigset);
348#ifdef __APPLE__
349  DDSRT_WARNING_GNUC_OFF(sign-conversion)
350#endif
351  sigaddset (&sigset, SIGHUP);
352  sigaddset (&sigset, SIGINT);
353  sigaddset (&sigset, SIGTERM);
354#ifdef __APPLE__
355  DDSRT_WARNING_GNUC_ON(sign-conversion)
356#endif
357  sigprocmask (SIG_BLOCK, &sigset, &osigset);
358  {
359    ddsrt_threadattr_t tattr;
360    ddsrt_threadattr_init (&tattr);
361    ddsrt_thread_create (&sigtid, "sigthread", &tattr, sigthread, &sigset);
362  }
363#endif
364
365  bool termflag = false;
366  while (!termflag)
367  {
368    (void) dds_waitset_wait (waitset, NULL, 0, DDS_INFINITY);
369    dds_read_guardcondition (termcond, &termflag);
370
371    bool ok = raw_mode ? print_sample_cdr (reader, xtypeobj) : print_sample_normal (reader, xtypeobj);
372    if (!ok)
373      break;
374  }
375
376#if _WIN32
377  signal_handler (SIGINT);
378#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
379  {
380    /* get the attention of the signal handler thread */
381    void (*osigint) (int);
382    void (*osigterm) (int);
383    kill (getpid (), SIGTERM);
384    ddsrt_thread_join (sigtid, NULL);
385    osigint = signal (SIGINT, SIG_IGN);
386    osigterm = signal (SIGTERM, SIG_IGN);
387    sigprocmask (SIG_SETMASK, &osigset, NULL);
388    signal (SIGINT, osigint);
389    signal (SIGINT, osigterm);
390  }
391#endif
392
393error:
394  type_cache_free ();
395  dds_delete (participant);
396  return ret < 0;
397}
variouspub_types.idl
 1struct A {
 2  @key
 3  string name;
 4  string message;
 5  unsigned long count;
 6};
 7
 8struct T {
 9  short s;
10  long l;
11};
12
13struct B {
14  A a;
15  sequence<T> ts;
16};
17
18struct C {
19  B b;
20  @key
21  short k;
22};
23
24module M1 {
25  @appendable
26  struct O {
27    @optional long x;
28  };
29};
30
31struct D {
32  wstring ws;
33  wchar wc;
34  unsigned long count;
35};
36
37struct U {
38  uint32 w;
39  @key string x;
40  string y;
41  @key uint32 z;
42};
43
44struct E {
45  uint32 a;
46  @key sequence<U> b[2];
47  @key uint32 c;
48};
variouspub.c
  1#include <stdio.h>
  2#include <string.h>
  3#include <stdlib.h>
  4#include <signal.h>
  5#include <assert.h>
  6
  7#include "dds/dds.h"
  8#include "variouspub_types.h"
  9
 10static void *samples_a[] = {
 11  &(A){ "Mariken",   "Wie sidi, vrient?", 0 },
 12  &(A){ "Die duvel", "Een meester vol consten,", 0 },
 13  &(A){ "Die duvel", "Nieuwers af falende, wes ic besta.", 0 },
 14  &(A){ "Mariken",   "'t Comt mi alleleens met wien dat ick ga,", 0 },
 15  &(A){ "Mariken",   "Also lief gae ic metten quaetsten als metten besten.", 0 },
 16  NULL
 17};
 18
 19static void *samples_b[] = {
 20  &(B){ {"Die duvel", "Wildi u liefde te mi werts vesten,", 0},
 21        { ._length = 2, ._maximum = 2, ._release = false,
 22          ._buffer = (T[]){ {2,3},{5,7} } } },
 23  &(B){ {"Die duvel", "Ick sal u consten leeren sonder ghelijcke,", 0},
 24        { ._length = 3, ._maximum = 3, ._release = false,
 25          ._buffer = (T[]){ {11,13},{17,19},{23,29} } } },
 26  &(B){ {"Die duvel", "Die seven vrie consten: rethorijcke, musijcke,", 0},
 27        { ._length = 5, ._maximum = 5, ._release = false,
 28          ._buffer = (T[]){ {31,37},{41,43},{47,52},{59,61},{67,71} } } },
 29  &(B){ {"Die duvel", "Logica, gramatica ende geometrie,", 0},
 30        { ._length = 7, ._maximum = 7, ._release = false,
 31          ._buffer = (T[]){ {73,79},{83,89},{97,101},{103,107},{109,113},
 32                            {127,131},{137,139} } } },
 33  &(B){ {"Die duvel", "Aristmatica ende alkenie,", 0},
 34        { ._length = 11, ._maximum = 11, ._release = false,
 35          ._buffer = (T[]){ {149,151},{157,163},{167,173},{179,181},
 36                            {191,193},{197, 199},{211,223},{227,229},
 37                            {233,239},{241,251},{257,263} } } },
 38  NULL
 39};
 40
 41static void *samples_c[] = {
 42  &(C){ { {"Die duvel", "Dwelc al consten sijn seer curable.", 0},
 43          { ._length = 13, ._maximum = 13, ._release = false,
 44            ._buffer = (T[]){ {269,271},{277,281},{283,293},{307,311},
 45                              {313,317},{331,337},{347,349},{353,359},
 46                              {367,373},{379,383},{389,397},{401,409},
 47                              {419,421} } } },
 48        8936 },
 49  &(C){ { {"Die duvel", "Noyt vrouwe en leefde op eerde so able", 0},
 50          { ._length = 17, ._maximum = 17, ._release = false,
 51            ._buffer = (T[]){ {431,433},{439,443},{449,457},{461,463},
 52                              {467,479},{487,491},{499,503},{509,521},
 53                              {523,541},{547,557},{563,569},{571,577},
 54                              {587,593},{599,601},{607,613},{617,619},
 55                              {631,641} } } },
 56        18088 },
 57  &(C){ { {"Die duvel", "Als ic u maken sal.", 0},
 58          { ._length = 19, ._maximum = 19, ._release = false,
 59            ._buffer = (T[]){ {643,647},{653,659},{661,673},{677,683},
 60                              {691,701},{709,719},{727,733},{739,743},
 61                              {751,757},{761,769},{773,787},{797,809},
 62                              {811,821},{823,827},{829,839},{853,857},
 63                              {859,863},{877,881},{883,887} } } },
 64        29172 },
 65  &(C){ { {"Mariken",  "So moetti wel zijn een constich man.", 0},
 66          { ._length = 23, ._maximum = 23, ._release = false,
 67            ._buffer = (T[]){ {907,911},{919,929},{937,941},{947,953},
 68                              {967,971},{977,983},{991,997},{1009,1013},
 69                              {1019,1021},{1031,1033},{1039,1049},{1051,1061},
 70                              {1063,1069},{1087,1091},{1093,1097},{1103,1109},
 71                              {1117,1123},{1129,1151},{1153,1163},{1171,1181},
 72                              {1187,1193},{1201,1213},{1217,1223} } } },
 73        16022 },
 74  &(C){ { {"Mariken",  "Wie sidi dan?", 0},
 75          { ._length = 29, ._maximum = 29, ._release = false,
 76            ._buffer = (T[]){ {1229,1231},{1237,1249},{1259,1277},{1279,1283},
 77                              {1289,1291},{1297,1301},{1303,1307},{1319,1321},
 78                              {1327,1361},{1367,1373},{1381,1399},{1409,1423},
 79                              {1427,1429},{1433,1439},{1447,1451},{1453,1459},
 80                              {1471,1481},{1483,1487},{1489,1493},{1499,1511},
 81                              {1523,1531},{1543,1549},{1553,1559},{1567,1571},
 82                              {1579,1583},{1597,1601},{1607,1609},{1613,1619},
 83                              {1621,1627} } } },
 84        17880 },
 85  NULL
 86};
 87
 88static int32_t long_4 = 4;
 89static void *samples_M1_O[] = {
 90  &(M1_O){ .x = NULL },
 91  &(M1_O){ .x = &long_4 },
 92  NULL
 93};
 94
 95static void *samples_d[] = {
 96  &(D){ L"😀 Een Kruyck gaat soo langh te water tot datse barst.", 0x2206, 0 },
 97  &(D){ L"🙃 Men treckt een Boogh soo lang tot datse stucken knarst.", 0x2207, 0 },
 98  &(D){ L"😊 De Steel-kunst doet zyn Meester de dood vaak verwerven.", 0x22a5, 0 },
 99  NULL
100};
101
102static void *samples_e[] = {
103  &(E){ 0, {{0},{0}}, 456 },
104  &(E){ 0, {{0},{._length=1, ._maximum=1, ._buffer=&(U){34,"aap","noot",56}, ._release = false}}, 456},
105  &(E){ 0, {{0},{._length=1, ._maximum=1, ._buffer=&(U){78,"aap","zus",56}, ._release = false}}, 456},
106  &(E){ 0, {{0},{._length=1, ._maximum=1, ._buffer=&(U){78,"wim","zus",90}, ._release = false}}, 456},
107  NULL
108};
109
110static struct tpentry {
111  const char *name;
112  const dds_topic_descriptor_t *descr;
113  void **samples;
114  size_t count_offset;
115} tptab[] = {
116  { "A", &A_desc, samples_a, offsetof (A, count) },
117  { "B", &B_desc, samples_b, offsetof (B, a.count) },
118  { "C", &C_desc, samples_c, offsetof (C, b.a.count) },
119  { "M1::O", &M1_O_desc, samples_M1_O, SIZE_MAX },
120  { "D", &D_desc, samples_d, offsetof (D, count) },
121  { "E", &E_desc, samples_e, offsetof (E, a) },
122  { NULL, NULL, NULL, 0 }
123};
124
125static void usage (const char *argv0)
126{
127  fprintf (stderr, "usage: %s {", argv0);
128  const char *sep = "";
129  for (struct tpentry *tpentry = &tptab[0]; tpentry->name; tpentry++)
130  {
131    fprintf (stderr, "%s%s", sep, tpentry->name);
132    sep = "|";
133  }
134  fprintf (stderr, "}\n");
135  exit (2);
136}
137
// Set to 1 by the SIGINT handler; polled by the publishing loop in main.
static volatile sig_atomic_t interrupted;

// Signal handler: only records that the signal arrived.
static void sigint (int signo)
{
  interrupted = 1;
  (void) signo;
}
145
146int main (int argc, char **argv)
147{
148  if (argc != 2)
149    usage (argv[0]);
150  struct tpentry *tpentry;
151  for (tpentry = &tptab[0]; tpentry->name; tpentry++)
152    if (strcmp (tpentry->name, argv[1]) == 0)
153      break;
154  if (tpentry->name == NULL)
155    usage (argv[0]);
156
157  const dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
158  if (participant < 0)
159  {
160    fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
161    return 1;
162  }
163
164  const dds_entity_t topic = dds_create_topic (participant, tpentry->descr, tpentry->name, NULL, NULL);
165  const dds_entity_t writer = dds_create_writer (participant, topic, NULL, NULL);
166  uint32_t sample_idx = 0;
167  uint32_t count = 0;
168  signal (SIGINT, sigint);
169  while (!interrupted)
170  {
171    dds_return_t ret = 0;
172    void *sample = tpentry->samples[sample_idx];
173    uint32_t * const countp =
174      (tpentry->count_offset != SIZE_MAX)
175      ? (uint32_t *) ((unsigned char *) sample + tpentry->count_offset)
176      : 0;
177    if (countp)
178      *countp = count++;
179    if ((ret = dds_write (writer, sample)) < 0)
180    {
181      fprintf (stderr, "dds_write: %s\n", dds_strretcode (ret));
182      dds_delete (participant);
183      return 1;
184    }
185    if (tpentry->samples[++sample_idx] == NULL)
186    {
187      sample_idx = 0;
188    }
189    dds_sleepfor (DDS_SECS (1));
190  }
191
192  dds_delete (participant);
193  return 0;
194}