listtopics

The listtopics example shows how to monitor topics that have been defined in the system.

Running the example

Discovering remote topics is only possible when topic discovery is enabled. To configure topic discovery set:

<Discovery>
  <EnableTopicDiscoveryEndpoints>true</EnableTopicDiscoveryEndpoints>
</Discovery>

Most applications create topics, which are always visible. The listtopics example creates no topics and therefore, if there is no discovery of remote topics, shows no output. To mitigate this, the listtopics example also monitors the discovery of other participants. If any show up but no topics are discovered, it prints a warning.

Note

Running two copies of the listtopics example (and nothing else) always triggers a warning.

Source code

listtopics.c
  1#include <stdio.h>
  2#include <string.h>
  3#include <stdlib.h>
  4#include <assert.h>
  5#include "dds/dds.h"
  6
  7/* Compile time constants representing the (DCPSPublication and DCPSSubscription) built-in
  8   topics that used for monitoring whether we should have discovered some topics on the
  9   DCPSTopic built-in topic. */
 10static const dds_entity_t ep_topics[] = {
 11  DDS_BUILTIN_TOPIC_DCPSPUBLICATION,
 12  DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION
 13};
 14
 15struct keystr {
 16  char v[36]; /* (8 hex digits) : (8 h d) : (8 h d) : (8 h d) \0 */
 17};
 18
 19static char *keystr (struct keystr *gs, const dds_builtintopic_topic_key_t *g)
 20{
 21  (void) snprintf (gs->v, sizeof (gs->v),
 22                   "%02x%02x%02x%02x:%02x%02x%02x%02x:%02x%02x%02x%02x:%02x%02x%02x%02x",
 23                   g->d[0], g->d[1], g->d[2], g->d[3], g->d[4], g->d[5], g->d[6], g->d[7],
 24                   g->d[8], g->d[9], g->d[10], g->d[11], g->d[12], g->d[13], g->d[14], g->d[15]);
 25  return gs->v;
 26}
 27
 28static const char *instance_state_str (dds_instance_state_t s)
 29{
 30  switch (s)
 31  {
 32    case DDS_ALIVE_INSTANCE_STATE: return "alive";
 33    case DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE: return "nowriters";
 34    case DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE: return "disposed";
 35  }
 36  assert (0);
 37  return "";
 38}
 39
 40static bool process_topic (dds_entity_t readcond)
 41{
 42#define MAXCOUNT 10
 43  /* dds_take allocates memory for the data if samples[0] is a null pointer, and reuses
 44     it otherwise, so it must be initialized properly.  The "10" is arbitrary. */
 45  void *samples[MAXCOUNT] = { NULL };
 46  dds_sample_info_t infos[MAXCOUNT];
 47  samples[0] = NULL;
 48  /* Using the condition that was attached to the waitset means one never accidentally
 49     filters out some samples that the waitset triggers on.  Because waitsets are
 50     level-triggered, that would result in a spinning thread. */
 51  int32_t n = dds_take (readcond, samples, infos, MAXCOUNT, MAXCOUNT);
 52  bool topics_seen = false;
 53  for (int32_t i = 0; i < n; i++)
 54  {
 55    dds_builtintopic_topic_t const * const sample = samples[i];
 56    struct keystr gs;
 57    printf ("%s: %s", instance_state_str (infos[i].instance_state), keystr (&gs, &sample->key));
 58    if (infos[i].valid_data)
 59    {
 60      printf (" %s %s", sample->topic_name, sample->type_name);
 61      if (strncmp (sample->topic_name, "DCPS", 4) != 0)
 62      {
 63        /* Topic names starting with DCPS are guaranteed to be built-in topics, so we
 64           have discovered an application topic if the name doesn't start with DCPS */
 65        topics_seen = true;
 66      }
 67    }
 68    printf ("\n");
 69  }
 70  /* Release memory allocated by dds_take */
 71  (void) dds_return_loan (readcond, samples, n);
 72#undef MAXCOUNT
 73  return topics_seen;
 74}
 75
 76static bool process_pubsub (dds_entity_t ep_readconds[])
 77{
 78  bool endpoints_exist = false;
 79  for (size_t k = 0; k < sizeof (ep_topics) / sizeof (ep_topics[0]) && !endpoints_exist; k++)
 80  {
 81    /* Reuse samples/infos arrays when checking for readers/writers, using a single sample
 82       is just as arbitrary as using MAXCOUNT samples in process_topic */
 83    void *sampleptr = NULL;
 84    dds_sample_info_t info;
 85    int32_t n = dds_take (ep_readconds[k], &sampleptr, &info, 1, 1);
 86    if (n > 0)
 87    {
 88      dds_builtintopic_endpoint_t const * const sample = sampleptr;
 89      if (info.valid_data && strncmp (sample->topic_name, "DCPS", 4) != 0)
 90        endpoints_exist = true;
 91    }
 92    (void) dds_return_loan (ep_readconds[k], &sampleptr, n);
 93  }
 94  return endpoints_exist;
 95}
 96
 97int main (int argc, char **argv)
 98{
 99  (void)argc;
100  (void)argv;
101
102  const dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
103  if (participant < 0)
104  {
105    fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
106    return 1;
107  }
108
109  const dds_entity_t waitset = dds_create_waitset (participant);
110
111  /* Create a reader for the DCPSTopic built-in topic.  The built-in topics are identified
112     using compile-time constants rather than ordinary, dynamically allocated handles.  In
113     Cyclone's C API, any QoS settings left unspecified in the reader and writer QoS are
114     inherited from the topic QoS, and so specifying no QoS object results in a
115     transient-local, reliable reader. */
116  const dds_entity_t reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSTOPIC, NULL, NULL);
117  if (reader < 0)
118  {
119    if (reader == DDS_RETCODE_UNSUPPORTED)
120      fprintf (stderr, "Topic discovery is not included in the build, rebuild with ENABLE_TOPIC_DISCOVERY=ON\n");
121    else
122      fprintf (stderr, "dds_create_reader(DCPSTopic): %s\n", dds_strretcode (reader));
123    dds_delete (participant);
124    return 1;
125  }
126
127  /* Create a read condition and attach it to the waitset.  Using a read condition for
128     ANY_STATE is almost, but not quite, equivalent to setting the status mask to
129     DATA_AVAILABLE and attaching the reader directly: the read condition remains in a
130     triggered state until the reader history cache no longer contains any matching
131     samples, but the DATA_AVAILABLE state is reset on a call to read/take and only raised
132     again on the receipt of the next sample.  Reading only a limited number of samples
133     every time DATA_AVAILABLE triggers therefore risks never reading some samples. */
134  const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
135  (void) dds_waitset_attach (waitset, readcond, 0);
136
137  /* Also create readers for the DCPSPublication and DCPSSubscription topics so we can
138     warn if topic discovery is most likely disabled in the configuration. */
139  dds_entity_t ep_readers[sizeof (ep_topics) / sizeof (ep_topics[0])];
140  dds_entity_t ep_readconds[sizeof (ep_topics) / sizeof (ep_topics[0])];
141  for (size_t k = 0; k < sizeof (ep_topics) / sizeof (ep_topics[0]); k++)
142  {
143    ep_readers[k] = dds_create_reader (participant, ep_topics[k], NULL, NULL);
144    ep_readconds[k] = dds_create_readcondition (ep_readers[k], DDS_ANY_STATE);
145    (void) dds_waitset_attach (waitset, ep_readconds[k], 0);
146  }
147
148  /* Keep track of whether (non-built-in) topics were discovered and of whether
149     (non-built-in) endpoints were for generating a warning that the configuration likely
150     has topic discovery disabled if only endpoints got discovered. */
151  bool topics_seen = false;
152  bool endpoints_exist = false;
153
154  /* Monitor topic creation/deletion for 10s.  There is no risk of spurious wakeups in a
155     simple case like this and so a timeout from wait_until really means that tstop has
156     passed. */
157  const dds_time_t tstop = dds_time () + DDS_SECS (10);
158  while (dds_waitset_wait_until (waitset, NULL, 0, tstop) > 0)
159  {
160    if (process_topic (readcond))
161      topics_seen = true;
162
163    /* No point in looking for other readers/writers once we know some exist */
164    if (!endpoints_exist && process_pubsub (ep_readconds))
165    {
166      endpoints_exist = true;
167      /* The readers used for monitoring the existence of readers/writers are no longer
168         useful once some eps have been seen.  Deleting them will also detach the
169         read conditions from the waitset and delete them. */
170      for (size_t k = 0; k < sizeof (ep_readers) / sizeof (ep_readers[0]); k++)
171        (void) dds_delete (ep_readers[k]);
172    }
173  }
174  if (!topics_seen && endpoints_exist)
175  {
176    fprintf (stderr, "No topics discovered but remote readers/writers exist. Is topic discovery enabled in the configuration?\n");
177  }
178
179  dds_delete (participant);
180  return 0;
181}