dynsub

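The dynsub example is a proof of concept (PoC) of a C-based JSON printer for arbitrary data. It works best when topic discovery is enabled, but does not require it: if the topic cannot be found through discovery, dynsub creates it from the type information and QoS of the discovered writer or reader.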

Running the example

Pass the name of a topic to dynsub and it waits (for up to 10 seconds) for a writer or reader of that topic to show up. When it finds one in the DCPSPublication or DCPSSubscription built-in topic, it tries to subscribe and prints the received samples as JSON.

For example: Start the HelloworldPublisher in one shell:

# bin/HelloworldPublisher
=== [Publisher]  Waiting for a reader to be discovered ...

In another shell start dynsub:

# bin/dynsub HelloWorldData_Msg
{"userID":1,"message":"Hello World"}
{"userID":1}

The second line is the “invalid sample” that is generated when the publisher terminates. For such a sample, Cyclone DDS guarantees only that the key fields are valid, so only those are printed.

Instead of HelloworldPublisher, you can use the small publisher program “variouspub”, which can publish a number of different types. Pass it the name of the type to publish (A, B, C or M1::O). For example:

# bin/variouspub B

This publishes samples at 1Hz until killed.

Source code

dynsub.c
  1// Copyright(c) 2022 to 2023 ZettaScale Technology and others
  2//
  3// This program and the accompanying materials are made available under the
  4// terms of the Eclipse Public License v. 2.0 which is available at
  5// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
  6// v. 1.0 which is available at
  7// http://www.eclipse.org/org/documents/edl-v10.php.
  8//
  9// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
 10
 11#include <stdio.h>
 12#include <string.h>
 13#include <stdlib.h>
 14#include <assert.h>
 15
 16#include "dds/dds.h"
 17#include "dynsub.h"
 18
 19// Interpreting the data of an arbitrary topic requires interpreting the type object that describes the data.
 20// The type object type is defined by the XTypes specification (https://www.omg.org/spec/DDS-XTypes/) and it
 21// comes in two forms: MinimalTypeObject and CompleteTypeObject.  Only the latter includes field names, so
 22// that's what we need.
 23//
 24// Cyclone DDS includes a copy of the IDL as well as the corresponding type definitions in C as generated by
 25// IDLC.  So instead of including yet another copy, we simply refer to those.  These files are not (yet?)
 26// part of the stable API of Cyclone DDS, so updates to Cyclone may change the locations or the names
 27// of the relevant header files.
 28//
 29// The API uses `dds_typeobj_t` and `dds_typeinfo_t` that are opaque types but really amount to the
 30// corresponding XTypes objects: DDS_XTypes_TypeObject and DDS_XTypes_TypeInformation.  Rather than casting
 31// pointers like we do here, they should be defined in a slightly different way so that they are not really
 32// opaque.  For now, this'll have to do.
 33#include "dds/ddsi/ddsi_xt_typeinfo.h"
 34
 35// For convenience, the DDS participant is global
 36static dds_entity_t participant;
 37
 38
 39// Helper function to wait for a DCPSPublication/DCPSSubscription to show up with the desired topic name,
 40// then calls dds_find_topic to create a topic for that data writer's/reader's type up the retrieves the
 41// type object.
 42static dds_return_t get_topic_and_typeobj (const char *topic_name, dds_duration_t timeout, dds_entity_t *topic, DDS_XTypes_TypeObject **xtypeobj)
 43{
 44  const dds_entity_t waitset = dds_create_waitset (participant);
 45  const dds_entity_t dcpspublication_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL);
 46  const dds_entity_t dcpspublication_readcond = dds_create_readcondition (dcpspublication_reader, DDS_ANY_STATE);
 47  const dds_entity_t dcpssubscription_reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, NULL);
 48  const dds_entity_t dcpssubscription_readcond = dds_create_readcondition (dcpssubscription_reader, DDS_ANY_STATE);
 49  (void) dds_waitset_attach (waitset, dcpspublication_readcond, dcpspublication_reader);
 50  (void) dds_waitset_attach (waitset, dcpssubscription_readcond, dcpssubscription_reader);
 51  const dds_time_t abstimeout = (timeout == DDS_INFINITY) ? DDS_NEVER : dds_time () + timeout;
 52  dds_return_t ret = DDS_RETCODE_OK;
 53  *xtypeobj = NULL;
 54  struct ppc ppc;
 55  ppc_init (&ppc);
 56  dds_attach_t triggered_reader_x;
 57  while (*xtypeobj == NULL && dds_waitset_wait_until (waitset, &triggered_reader_x, 1, abstimeout) > 0)
 58  {
 59    void *epraw = NULL;
 60    dds_sample_info_t si;
 61    dds_entity_t triggered_reader = (dds_entity_t) triggered_reader_x;
 62    if (dds_take (triggered_reader, &epraw, &si, 1, 1) <= 0)
 63      continue;
 64    dds_builtintopic_endpoint_t *ep = epraw;
 65    const dds_typeinfo_t *typeinfo = NULL;
 66    // We are only interested in DCPSPublications where the topic name matches and that carry type information
 67    // (a non-XTypes capable DDS would not provide type information) because without that information there is
 68    // no way we can do anything interesting with it.
 69    if (strcmp (ep->topic_name, topic_name) == 0 && dds_builtintopic_get_endpoint_type_info (ep, &typeinfo) == 0 && typeinfo)
 70    {
 71      // Using dds_find_topic allows us to "clone" the topic definition including the topic QoS, but it does
 72      // require that topic discovery is enabled in the configuration.  The advantage of using dds_find_topic
 73      // is that it creates a topic with the same name, type *and QoS*.  That distinction only matters if
 74      // topic is discovery is enabled and/or if the topic has a durability kind of of transient or persistent:
 75      // - using a different topic QoS might result in an incompatible QoS notification if topic discovery is
 76      //   enabled (everything would still work).
 77      // - transient/persistent data behaviour is defined in terms of the topic QoS actually really matters
 78      //
 79      // So we try to use dds_find_topic, and if that fails, try to go the other route using the writer's QoS
 80      // as an approximation of the topic QoS.
 81      if ((*topic = dds_find_topic (DDS_FIND_SCOPE_GLOBAL, participant, ep->topic_name, typeinfo, DDS_SECS (2))) < 0)
 82      {
 83        fprintf (stderr, "dds_find_topic: %s ... continuing on the assumption that topic discovery is disabled\n", dds_strretcode (*topic));
 84        dds_topic_descriptor_t *descriptor;
 85        if ((ret = dds_create_topic_descriptor(DDS_FIND_SCOPE_GLOBAL, participant, typeinfo, DDS_SECS (10), &descriptor)) < 0)
 86        {
 87          fprintf (stderr, "dds_create_topic_descriptor: %s\n", dds_strretcode (ret));
 88          dds_return_loan (triggered_reader, &epraw, 1);
 89          goto error;
 90        }
 91        if ((*topic = dds_create_topic (participant, descriptor, ep->topic_name, ep->qos, NULL)) < 0)
 92        {
 93          fprintf (stderr, "dds_create_topic_descriptor: %s (be sure to enable topic discovery in the configuration)\n", dds_strretcode (*topic));
 94          dds_delete_topic_descriptor (descriptor);
 95          dds_return_loan (triggered_reader, &epraw, 1);
 96          goto error;
 97        }
 98        dds_delete_topic_descriptor (descriptor);
 99      }
100      // The topic suffices for creating a reader, but we also need the TypeObject to make sense of the data
101      if ((*xtypeobj = load_type_with_deps (participant, typeinfo, &ppc)) == NULL)
102      {
103        fprintf (stderr, "loading type with all dependencies failed\n");
104        dds_return_loan (triggered_reader, &epraw, 1);
105        goto error;
106      }
107      if (load_type_with_deps_min (participant, typeinfo, &ppc) == NULL)
108      {
109        fprintf (stderr, "loading minimal type with all dependencies failed\n");
110        dds_return_loan (triggered_reader, &epraw, 1);
111        goto error;
112      }
113    }
114    dds_return_loan (triggered_reader, &epraw, 1);
115  }
116  if (*xtypeobj)
117  {
118    // If we got the type object, populate the type cache
119    size_t align, size;
120    build_typecache_to (&(*xtypeobj)->_u.complete, &align, &size);
121    fflush (stdout);
122    struct typeinfo templ = { .key = { .key = (uintptr_t) *xtypeobj } } , *info;
123    if ((info = type_cache_lookup (&templ)) != NULL)
124    {
125      assert (info->release == NULL);
126      info->release = *xtypeobj;
127    }
128    else
129    {
130      // not sure whether this is at all possible
131      info = malloc (sizeof (*info));
132      assert (info);
133      *info = (struct typeinfo){ .key = { .key = (uintptr_t) *xtypeobj }, .typeobj = &(*xtypeobj)->_u.complete, .release = *xtypeobj, .align = align, .size = size };
134      type_cache_add (info);
135    }
136  }
137error:
138  dds_delete (dcpspublication_reader);
139  dds_delete (dcpssubscription_reader);
140  dds_delete (waitset);
141  return (*xtypeobj != NULL) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT;
142}
143
144int main (int argc, char **argv)
145{
146  dds_return_t ret = 0;
147  dds_entity_t topic = 0;
148
149  if (argc != 2)
150  {
151    fprintf (stderr, "usage: %s topicname\n", argv[0]);
152    return 2;
153  }
154
155  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
156  if (participant < 0)
157  {
158    fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
159    return 1;
160  }
161
162  // The one magic step: get a topic and type object ...
163  DDS_XTypes_TypeObject *xtypeobj;
164  type_cache_init ();
165  if ((ret = get_topic_and_typeobj (argv[1], DDS_SECS (10), &topic, &xtypeobj)) < 0)
166  {
167    fprintf (stderr, "get_topic_and_typeobj: %s\n", dds_strretcode (ret));
168    goto error;
169  }
170  // ... given those, we can create a reader just like we do normally ...
171  const dds_entity_t reader = dds_create_reader (participant, topic, NULL, NULL);
172  // ... and create a waitset that allows us to wait for any incoming data ...
173  const dds_entity_t waitset = dds_create_waitset (participant);
174  const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
175  (void) dds_waitset_attach (waitset, readcond, 0);
176  while (1)
177  {
178    (void) dds_waitset_wait (waitset, NULL, 0, DDS_INFINITY);
179    void *raw = NULL;
180    dds_sample_info_t si;
181    if ((ret = dds_take (reader, &raw, &si, 1, 1)) < 0)
182      goto error;
183    else if (ret != 0)
184    {
185      // ... that we then print
186      print_sample (si.valid_data, raw, &xtypeobj->_u.complete);
187      if ((ret = dds_return_loan (reader, &raw, 1)) < 0)
188        goto error;
189    }
190  }
191
192 error:
193  type_cache_free ();
194  dds_delete (participant);
195  return ret < 0;
196}
variouspub_types.idl
 1struct A {
 2  @key
 3  string name;
 4  string message;
 5  unsigned long count;
 6};
 7
 8struct T {
 9  short s;
10  long l;
11};
12
13struct B {
14  A a;
15  sequence<T> ts;
16};
17
18struct C {
19  B b;
20  @key
21  short k;
22};
23
24module M1 {
25  @appendable
26  struct O {
27    @optional long x;
28  };
29};
variouspub.c
  1#include <stdio.h>
  2#include <string.h>
  3#include <stdlib.h>
  4#include <signal.h>
  5#include <assert.h>
  6
  7#include "dds/dds.h"
  8#include "variouspub_types.h"
  9
 10static void *samples_a[] = {
 11  &(A){ "Mariken",   "Wie sidi, vrient?", 0 },
 12  &(A){ "Die duvel", "Een meester vol consten,", 0 },
 13  &(A){ "Die duvel", "Nieuwers af falende, wes ic besta.", 0 },
 14  &(A){ "Mariken",   "'t Comt mi alleleens met wien dat ick ga,", 0 },
 15  &(A){ "Mariken",   "Also lief gae ic metten quaetsten als metten besten.", 0 },
 16  NULL
 17};
 18
 19static void *samples_b[] = {
 20  &(B){ {"Die duvel", "Wildi u liefde te mi werts vesten,", 0},
 21        { ._length = 2, ._maximum = 2, ._release = false,
 22          ._buffer = (T[]){ {2,3},{5,7} } } },
 23  &(B){ {"Die duvel", "Ick sal u consten leeren sonder ghelijcke,", 0},
 24        { ._length = 3, ._maximum = 3, ._release = false,
 25          ._buffer = (T[]){ {11,13},{17,19},{23,29} } } },
 26  &(B){ {"Die duvel", "Die seven vrie consten: rethorijcke, musijcke,", 0},
 27        { ._length = 5, ._maximum = 5, ._release = false,
 28          ._buffer = (T[]){ {31,37},{41,43},{47,52},{59,61},{67,71} } } },
 29  &(B){ {"Die duvel", "Logica, gramatica ende geometrie,", 0},
 30        { ._length = 7, ._maximum = 7, ._release = false,
 31          ._buffer = (T[]){ {73,79},{83,89},{97,101},{103,107},{109,113},
 32                            {127,131},{137,139} } } },
 33  &(B){ {"Die duvel", "Aristmatica ende alkenie,", 0},
 34        { ._length = 11, ._maximum = 11, ._release = false,
 35          ._buffer = (T[]){ {149,151},{157,163},{167,173},{179,181},
 36                            {191,193},{197, 199},{211,223},{227,229},
 37                            {233,239},{241,251},{257,263} } } },
 38  NULL
 39};
 40
 41static void *samples_c[] = {
 42  &(C){ { {"Die duvel", "Dwelc al consten sijn seer curable.", 0},
 43          { ._length = 13, ._maximum = 13, ._release = false,
 44            ._buffer = (T[]){ {269,271},{277,281},{283,293},{307,311},
 45                              {313,317},{331,337},{347,349},{353,359},
 46                              {367,373},{379,383},{389,397},{401,409},
 47                              {419,421} } } },
 48        8936 },
 49  &(C){ { {"Die duvel", "Noyt vrouwe en leefde op eerde so able", 0},
 50          { ._length = 17, ._maximum = 17, ._release = false,
 51            ._buffer = (T[]){ {431,433},{439,443},{449,457},{461,463},
 52                              {467,479},{487,491},{499,503},{509,521},
 53                              {523,541},{547,557},{563,569},{571,577},
 54                              {587,593},{599,601},{607,613},{617,619},
 55                              {631,641} } } },
 56        18088 },
 57  &(C){ { {"Die duvel", "Als ic u maken sal.", 0},
 58          { ._length = 19, ._maximum = 19, ._release = false,
 59            ._buffer = (T[]){ {643,647},{653,659},{661,673},{677,683},
 60                              {691,701},{709,719},{727,733},{739,743},
 61                              {751,757},{761,769},{773,787},{797,809},
 62                              {811,821},{823,827},{829,839},{853,857},
 63                              {859,863},{877,881},{883,887} } } },
 64        29172 },
 65  &(C){ { {"Mariken",  "So moetti wel zijn een constich man.", 0},
 66          { ._length = 23, ._maximum = 23, ._release = false,
 67            ._buffer = (T[]){ {907,911},{919,929},{937,941},{947,953},
 68                              {967,971},{977,983},{991,997},{1009,1013},
 69                              {1019,1021},{1031,1033},{1039,1049},{1051,1061},
 70                              {1063,1069},{1087,1091},{1093,1097},{1103,1109},
 71                              {1117,1123},{1129,1151},{1153,1163},{1171,1181},
 72                              {1187,1193},{1201,1213},{1217,1223} } } },
 73        16022 },
 74  &(C){ { {"Mariken",  "Wie sidi dan?", 0},
 75          { ._length = 29, ._maximum = 29, ._release = false,
 76            ._buffer = (T[]){ {1229,1231},{1237,1249},{1259,1277},{1279,1283},
 77                              {1289,1291},{1297,1301},{1303,1307},{1319,1321},
 78                              {1327,1361},{1367,1373},{1381,1399},{1409,1423},
 79                              {1427,1429},{1433,1439},{1447,1451},{1453,1459},
 80                              {1471,1481},{1483,1487},{1489,1493},{1499,1511},
 81                              {1523,1531},{1543,1549},{1553,1559},{1567,1571},
 82                              {1579,1583},{1597,1601},{1607,1609},{1613,1619},
 83                              {1621,1627} } } },
 84        17880 },
 85  NULL
 86};
 87
 88static int32_t long_4 = 4;
 89static void *samples_M1_O[] = {
 90  &(M1_O){ .x = NULL },
 91  &(M1_O){ .x = &long_4 },
 92  NULL
 93};
 94
 95static struct tpentry {
 96  const char *name;
 97  const dds_topic_descriptor_t *descr;
 98  void **samples;
 99  size_t count_offset;
100} tptab[] = {
101  { "A", &A_desc, samples_a, offsetof (A, count) },
102  { "B", &B_desc, samples_b, offsetof (B, a.count) },
103  { "C", &C_desc, samples_c, offsetof (C, b.a.count) },
104  { "M1::O", &M1_O_desc, samples_M1_O, SIZE_MAX },
105  { NULL, NULL, NULL, 0 }
106};
107
108static void usage (const char *argv0)
109{
110  fprintf (stderr, "usage: %s {", argv0);
111  const char *sep = "";
112  for (struct tpentry *tpentry = &tptab[0]; tpentry->name; tpentry++)
113  {
114    fprintf (stderr, "%s%s", sep, tpentry->name);
115    sep = "|";
116  }
117  fprintf (stderr, "}\n");
118  exit (2);
119}
120
121static volatile sig_atomic_t interrupted;
122
123static void sigint (int sig)
124{
125  (void) sig;
126  interrupted = 1;
127}
128
129int main (int argc, char **argv)
130{
131  if (argc != 2)
132    usage (argv[0]);
133  struct tpentry *tpentry;
134  for (tpentry = &tptab[0]; tpentry->name; tpentry++)
135    if (strcmp (tpentry->name, argv[1]) == 0)
136      break;
137  if (tpentry->name == NULL)
138    usage (argv[0]);
139
140  const dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
141  if (participant < 0)
142  {
143    fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
144    return 1;
145  }
146
147  const dds_entity_t topic = dds_create_topic (participant, tpentry->descr, tpentry->name, NULL, NULL);
148  const dds_entity_t writer = dds_create_writer (participant, topic, NULL, NULL);
149  uint32_t sample_idx = 0;
150  uint32_t count = 0;
151  signal (SIGINT, sigint);
152  while (!interrupted)
153  {
154    dds_return_t ret = 0;
155    void *sample = tpentry->samples[sample_idx];
156    uint32_t * const countp =
157      (tpentry->count_offset != SIZE_MAX)
158      ? (uint32_t *) ((unsigned char *) sample + tpentry->count_offset)
159      : 0;
160    if (countp)
161      *countp = count++;
162    if ((ret = dds_write (writer, sample)) < 0)
163    {
164      fprintf (stderr, "dds_write: %s\n", dds_strretcode (ret));
165      dds_delete (participant);
166      return 1;
167    }
168    if (tpentry->samples[++sample_idx] == NULL)
169    {
170      sample_idx = 0;
171    }
172    dds_sleepfor (DDS_SECS (1));
173  }
174
175  dds_delete (participant);
176  return 0;
177}