listtopics¶
The listtopics example shows how to monitor topics that have been defined in the system.
Running the example¶
Discovering remote topics is only possible when topic discovery is enabled. To configure topic discovery set:
<Discovery>
<EnableTopicDiscoveryEndpoints>true</EnableTopicDiscoveryEndpoints>
</Discovery>
Most applications create topics, which are always visible. The listtopics example creates no topics and therefore, if there is no discovery of remote topics, shows no output. To mitigate this, the listtopics example also monitors the discovery of other participants. If any show up but no topics are discovered, it prints a warning.
Note
Running two copies of the listtopics example (and nothing else) always triggers a warning.
Source code¶
1#include <stdio.h>
2#include <string.h>
3#include <stdlib.h>
4#include <assert.h>
5#include "dds/dds.h"
6
7/* Compile time constants representing the (DCPSPublication and DCPSSubscription) built-in
8 topics that used for monitoring whether we should have discovered some topics on the
9 DCPSTopic built-in topic. */
10static const dds_entity_t ep_topics[] = {
11 DDS_BUILTIN_TOPIC_DCPSPUBLICATION,
12 DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION
13};
14
15struct keystr {
16 char v[36]; /* (8 hex digits) : (8 h d) : (8 h d) : (8 h d) \0 */
17};
18
19static char *keystr (struct keystr *gs, const dds_builtintopic_topic_key_t *g)
20{
21 (void) snprintf (gs->v, sizeof (gs->v),
22 "%02x%02x%02x%02x:%02x%02x%02x%02x:%02x%02x%02x%02x:%02x%02x%02x%02x",
23 g->d[0], g->d[1], g->d[2], g->d[3], g->d[4], g->d[5], g->d[6], g->d[7],
24 g->d[8], g->d[9], g->d[10], g->d[11], g->d[12], g->d[13], g->d[14], g->d[15]);
25 return gs->v;
26}
27
28static const char *instance_state_str (dds_instance_state_t s)
29{
30 switch (s)
31 {
32 case DDS_ALIVE_INSTANCE_STATE: return "alive";
33 case DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE: return "nowriters";
34 case DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE: return "disposed";
35 }
36 assert (0);
37 return "";
38}
39
40static bool process_topic (dds_entity_t readcond)
41{
42#define MAXCOUNT 10
43 /* dds_take allocates memory for the data if samples[0] is a null pointer, and reuses
44 it otherwise, so it must be initialized properly. The "10" is arbitrary. */
45 void *samples[MAXCOUNT] = { NULL };
46 dds_sample_info_t infos[MAXCOUNT];
47 samples[0] = NULL;
48 /* Using the condition that was attached to the waitset means one never accidentally
49 filters out some samples that the waitset triggers on. Because waitsets are
50 level-triggered, that would result in a spinning thread. */
51 int32_t n = dds_take (readcond, samples, infos, MAXCOUNT, MAXCOUNT);
52 bool topics_seen = false;
53 for (int32_t i = 0; i < n; i++)
54 {
55 dds_builtintopic_topic_t const * const sample = samples[i];
56 struct keystr gs;
57 printf ("%s: %s", instance_state_str (infos[i].instance_state), keystr (&gs, &sample->key));
58 if (infos[i].valid_data)
59 {
60 printf (" %s %s", sample->topic_name, sample->type_name);
61 if (strncmp (sample->topic_name, "DCPS", 4) != 0)
62 {
63 /* Topic names starting with DCPS are guaranteed to be built-in topics, so we
64 have discovered an application topic if the name doesn't start with DCPS */
65 topics_seen = true;
66 }
67 }
68 printf ("\n");
69 }
70 /* Release memory allocated by dds_take */
71 (void) dds_return_loan (readcond, samples, n);
72#undef MAXCOUNT
73 return topics_seen;
74}
75
76static bool process_pubsub (dds_entity_t ep_readconds[])
77{
78 bool endpoints_exist = false;
79 for (size_t k = 0; k < sizeof (ep_topics) / sizeof (ep_topics[0]) && !endpoints_exist; k++)
80 {
81 /* Reuse samples/infos arrays when checking for readers/writers, using a single sample
82 is just as arbitrary as using MAXCOUNT samples in process_topic */
83 void *sampleptr = NULL;
84 dds_sample_info_t info;
85 int32_t n = dds_take (ep_readconds[k], &sampleptr, &info, 1, 1);
86 if (n > 0)
87 {
88 dds_builtintopic_endpoint_t const * const sample = sampleptr;
89 if (info.valid_data && strncmp (sample->topic_name, "DCPS", 4) != 0)
90 endpoints_exist = true;
91 }
92 (void) dds_return_loan (ep_readconds[k], &sampleptr, n);
93 }
94 return endpoints_exist;
95}
96
97int main (int argc, char **argv)
98{
99 (void)argc;
100 (void)argv;
101
102 const dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
103 if (participant < 0)
104 {
105 fprintf (stderr, "dds_create_participant: %s\n", dds_strretcode (participant));
106 return 1;
107 }
108
109 const dds_entity_t waitset = dds_create_waitset (participant);
110
111 /* Create a reader for the DCPSTopic built-in topic. The built-in topics are identified
112 using compile-time constants rather than ordinary, dynamically allocated handles. In
113 Cyclone's C API, any QoS settings left unspecified in the reader and writer QoS are
114 inherited from the topic QoS, and so specifying no QoS object results in a
115 transient-local, reliable reader. */
116 const dds_entity_t reader = dds_create_reader (participant, DDS_BUILTIN_TOPIC_DCPSTOPIC, NULL, NULL);
117 if (reader < 0)
118 {
119 if (reader == DDS_RETCODE_UNSUPPORTED)
120 fprintf (stderr, "Topic discovery is not included in the build, rebuild with ENABLE_TOPIC_DISCOVERY=ON\n");
121 else
122 fprintf (stderr, "dds_create_reader(DCPSTopic): %s\n", dds_strretcode (reader));
123 dds_delete (participant);
124 return 1;
125 }
126
127 /* Create a read condition and attach it to the waitset. Using a read condition for
128 ANY_STATE is almost, but not quite, equivalent to setting the status mask to
129 DATA_AVAILABLE and attaching the reader directly: the read condition remains in a
130 triggered state until the reader history cache no longer contains any matching
131 samples, but the DATA_AVAILABLE state is reset on a call to read/take and only raised
132 again on the receipt of the next sample. Reading only a limited number of samples
133 every time DATA_AVAILABLE triggers therefore risks never reading some samples. */
134 const dds_entity_t readcond = dds_create_readcondition (reader, DDS_ANY_STATE);
135 (void) dds_waitset_attach (waitset, readcond, 0);
136
137 /* Also create readers for the DCPSPublication and DCPSSubscription topics so we can
138 warn if topic discovery is most likely disabled in the configuration. */
139 dds_entity_t ep_readers[sizeof (ep_topics) / sizeof (ep_topics[0])];
140 dds_entity_t ep_readconds[sizeof (ep_topics) / sizeof (ep_topics[0])];
141 for (size_t k = 0; k < sizeof (ep_topics) / sizeof (ep_topics[0]); k++)
142 {
143 ep_readers[k] = dds_create_reader (participant, ep_topics[k], NULL, NULL);
144 ep_readconds[k] = dds_create_readcondition (ep_readers[k], DDS_ANY_STATE);
145 (void) dds_waitset_attach (waitset, ep_readconds[k], 0);
146 }
147
148 /* Keep track of whether (non-built-in) topics were discovered and of whether
149 (non-built-in) endpoints were for generating a warning that the configuration likely
150 has topic discovery disabled if only endpoints got discovered. */
151 bool topics_seen = false;
152 bool endpoints_exist = false;
153
154 /* Monitor topic creation/deletion for 10s. There is no risk of spurious wakeups in a
155 simple case like this and so a timeout from wait_until really means that tstop has
156 passed. */
157 const dds_time_t tstop = dds_time () + DDS_SECS (10);
158 while (dds_waitset_wait_until (waitset, NULL, 0, tstop) > 0)
159 {
160 if (process_topic (readcond))
161 topics_seen = true;
162
163 /* No point in looking for other readers/writers once we know some exist */
164 if (!endpoints_exist && process_pubsub (ep_readconds))
165 {
166 endpoints_exist = true;
167 /* The readers used for monitoring the existence of readers/writers are no longer
168 useful once some eps have been seen. Deleting them will also detach the
169 read conditions from the waitset and delete them. */
170 for (size_t k = 0; k < sizeof (ep_readers) / sizeof (ep_readers[0]); k++)
171 (void) dds_delete (ep_readers[k]);
172 }
173 }
174 if (!topics_seen && endpoints_exist)
175 {
176 fprintf (stderr, "No topics discovered but remote readers/writers exist. Is topic discovery enabled in the configuration?\n");
177 }
178
179 dds_delete (participant);
180 return 0;
181}