Throughput¶
The Throughput example allows the measurement of data throughput when receiving samples from a publisher.
Design¶
The Throughput example consists of two units:
Publisher: Sends samples at a specified size and rate.
Subscriber: Receives samples and outputs statistics about throughput
Scenario¶
The publisher sends the samples. You can specify a payload size in bytes, and whether to send data in bursts. Unless a time-out is specified, the publisher sends data forever.
Configuration:
payloadSize
: The size of the payload in bytes.burstInterval
: The time interval between each burst in ms.burstSize
: The number of samples to send each burst.timeOut
: The number of seconds the publisher should run for (0=infinite).partitionName
: The name of the partition.
The subscriber receives the data and outputs:
The total amount of data received.
The data-rate in bytes-per-second.
Whether any samples were received out-of-order.
You can specify the maximum number of cycles. When the maximum is reached, the subscriber terminates and outputs the totals and averages.
The subscriber executable measures:
transferred
: the total amount of data transferred in bytes.outOfOrder
: the number of samples that were received out of order.The transfer rate: the data transfer rate in bytes per second.
The subscriber also calculates statistics on these values over a configurable number of cycles.
Configuration:
maxCycles
: The number of times to output statistics before terminating.pollingDelay
partitionName
: The name of the partition.
Running the example¶
To avoid mixing the output, run the publisher and subscriber in separate terminals.
Open 2 terminals.
In the first terminal start publisher by running ThroughputPublisher.
Publisher usage (parameters must be supplied in order):
./ThroughputPublisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]
Defaults:
./ThroughputPublisher 8192 0 1 0 "Throughput example"
In the second terminal start subscriber by running ThroughputSubscriber.
Subscriber usage (parameters must be supplied in order):
./ThroughputSubscriber [maxCycles (0=infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]
Defaults:
./ThroughputSubscriber 0 0 "Throughput example"
To achieve optimal performance, set the CPU affinity so that the publisher and subscriber run on separate CPU cores and use real-time scheduling:
Publisher usage: ``taskset -c 0 chrt -f 80 ./ThroughputSubscriber [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]`` Subscriber usage: ``taskset -c 1 chrt -f 80 ./ThroughputSubscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]``Publisher usage: ``START /affinity 1 /high cmd /k "ThroughputSubscriber.exe" [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]`` Subscriber usage: ``START /affinity 2 /high cmd /k "ThroughputSubscriber.exe" [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]``
Source code¶
1module ThroughputModule
2{
3 @final
4 struct DataType
5 {
6 unsigned long long count;
7 sequence<octet> payload;
8 };
9};
1#include "dds/dds.h"
2#include "Throughput.h"
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <signal.h>
7
8/*
9 * The Throughput example measures data throughput in bytes per second. The publisher
10 * allows you to specify a payload size in bytes as well as allowing you to specify
11 * whether to send data in bursts. The publisher will continue to send data forever
12 * unless a time out is specified. The subscriber will receive data and output the
13 * total amount received and the data rate in bytes per second. It will also indicate
14 * if any samples were received out of order. A maximum number of cycles can be
15 * specified and once this has been reached the subscriber will terminate and output
16 * totals and averages.
17 */
18
19#define MAX_SAMPLES 100
20
21static bool done = false;
22
23/* Forward declarations */
24static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant);
25static void start_writing(dds_entity_t writer, ThroughputModule_DataType *sample,
26 int burstInterval, uint32_t burstSize, int timeOut);
27static int parse_args(int argc, char **argv, uint32_t *payloadSize, int *burstInterval,
28 uint32_t *burstSize, int *timeOut, char **partitionName);
29static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName);
30static void finalize_dds(dds_entity_t participant, dds_entity_t writer, ThroughputModule_DataType sample);
31
32#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
33static void sigint (int sig)
34{
35 (void)sig;
36 done = true;
37}
38#endif
39
40int main (int argc, char **argv)
41{
42 uint32_t payloadSize = 8192;
43 int burstInterval = 0;
44 uint32_t burstSize = 1;
45 int timeOut = 0;
46 char * partitionName = "Throughput example";
47 dds_entity_t participant;
48 dds_entity_t writer;
49 dds_return_t rc;
50 ThroughputModule_DataType sample;
51
52 if (parse_args(argc, argv, &payloadSize, &burstInterval, &burstSize, &timeOut, &partitionName) == EXIT_FAILURE) {
53 return EXIT_FAILURE;
54 }
55
56 participant = prepare_dds(&writer, partitionName);
57
58 /* Wait until have a reader */
59 if (wait_for_reader(writer, participant) == 0) {
60 printf ("=== [Publisher] Did not discover a reader.\n");
61 fflush (stdout);
62 rc = dds_delete (participant);
63 if (rc < 0)
64 DDS_FATAL("dds_delete: %s\n", dds_strretcode(-rc));
65 return EXIT_FAILURE;
66 }
67
68 /* Fill the sample payload with data */
69 sample.count = 0;
70 sample.payload._buffer = dds_alloc (payloadSize);
71 sample.payload._length = payloadSize;
72 sample.payload._release = true;
73 for (uint32_t i = 0; i < payloadSize; i++) {
74 sample.payload._buffer[i] = 'a';
75 }
76
77 /* Register handler for Ctrl-C */
78#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
79 signal (SIGINT, sigint);
80#endif
81
82 /* Register the sample instance and write samples repeatedly or until time out */
83 start_writing(writer, &sample, burstInterval, burstSize, timeOut);
84
85 /* Cleanup */
86 finalize_dds(participant, writer, sample);
87 return EXIT_SUCCESS;
88}
89
90static int parse_args(
91 int argc,
92 char **argv,
93 uint32_t *payloadSize,
94 int *burstInterval,
95 uint32_t *burstSize,
96 int *timeOut,
97 char **partitionName)
98{
99 int result = EXIT_SUCCESS;
100 /*
101 * Get the program parameters
102 * Parameters: publisher [payloadSize] [burstInterval] [burstSize] [timeOut] [partitionName]
103 */
104 if (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))
105 {
106 printf ("Usage (parameters must be supplied in order):\n");
107 printf ("./publisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]\n");
108 printf ("Defaults:\n");
109 printf ("./publisher 8192 0 1 0 \"Throughput example\"\n");
110 return EXIT_FAILURE;
111 }
112 if (argc > 1)
113 {
114 *payloadSize = (uint32_t) atoi (argv[1]); /* The size of the payload in bytes */
115 }
116 if (argc > 2)
117 {
118 *burstInterval = atoi (argv[2]); /* The time interval between each burst in ms */
119 }
120 if (argc > 3)
121 {
122 *burstSize = (uint32_t) atoi (argv[3]); /* The number of samples to send each burst */
123 }
124 if (argc > 4)
125 {
126 *timeOut = atoi (argv[4]); /* The number of seconds the publisher should run for (0 = infinite) */
127 }
128 if (argc > 5)
129 {
130 *partitionName = argv[5]; /* The name of the partition */
131 }
132
133 printf ("payloadSize: %"PRIu32" bytes burstInterval: %d ms burstSize: %"PRIu32" timeOut: %d seconds partitionName: %s\n",
134 *payloadSize, *burstInterval, *burstSize, *timeOut, *partitionName);
135 fflush (stdout);
136
137 return result;
138}
139
140static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName)
141{
142 dds_entity_t participant;
143 dds_entity_t topic;
144 dds_entity_t publisher;
145 const char *pubParts[1];
146 dds_qos_t *pubQos;
147 dds_qos_t *wrQos;
148 dds_qos_t *tQos;
149
150 /* A domain participant is created for the default domain. */
151 participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
152 if (participant < 0)
153 DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
154
155 /* A topic is created for our sample type on the domain participant. */
156 tQos = dds_create_qos ();
157 dds_qset_reliability (tQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
158 dds_qset_history (tQos, DDS_HISTORY_KEEP_ALL, 0);
159 dds_qset_resource_limits (tQos, MAX_SAMPLES, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
160 topic = dds_create_topic (participant, &ThroughputModule_DataType_desc, "Throughput", tQos, NULL);
161 if (topic < 0)
162 DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
163 dds_delete_qos (tQos);
164
165 /* A publisher is created on the domain participant. */
166 pubQos = dds_create_qos ();
167 pubParts[0] = partitionName;
168 dds_qset_partition (pubQos, 1, pubParts);
169 publisher = dds_create_publisher (participant, pubQos, NULL);
170 if (publisher < 0)
171 DDS_FATAL("dds_create_publisher: %s\n", dds_strretcode(-publisher));
172 dds_delete_qos (pubQos);
173
174 /* A DataWriter is created on the publisher. */
175 wrQos = dds_create_qos ();
176 dds_qset_writer_batching (wrQos, true);
177 *writer = dds_create_writer (publisher, topic, wrQos, NULL);
178 if (*writer < 0)
179 DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-*writer));
180 dds_delete_qos (wrQos);
181
182 return participant;
183}
184
185static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant)
186{
187 printf ("\n=== [Publisher] Waiting for a reader ...\n");
188 fflush (stdout);
189
190 dds_return_t rc;
191 dds_entity_t waitset;
192
193 rc = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
194 if (rc < 0)
195 DDS_FATAL("dds_set_status_mask: %s\n", dds_strretcode(-rc));
196
197 waitset = dds_create_waitset(participant);
198 if (waitset < 0)
199 DDS_FATAL("dds_create_waitset: %s\n", dds_strretcode(-waitset));
200
201 rc = dds_waitset_attach(waitset, writer, (dds_attach_t)NULL);
202 if (rc < 0)
203 DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-rc));
204
205 rc = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(30));
206 if (rc < 0)
207 DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-rc));
208
209 return rc;
210}
211
212static void start_writing(
213 dds_entity_t writer,
214 ThroughputModule_DataType *sample,
215 int burstInterval,
216 uint32_t burstSize,
217 int timeOut)
218{
219 bool timedOut = false;
220 dds_time_t pubStart = dds_time ();
221 dds_time_t now;
222 dds_time_t deltaTv;
223 dds_return_t status;
224
225 if (!done)
226 {
227 dds_time_t burstStart = pubStart;
228 unsigned int burstCount = 0;
229
230 printf ("=== [Publisher] Writing samples...\n");
231 fflush (stdout);
232
233 while (!done && !timedOut)
234 {
235 /* Write data until burst size has been reached */
236
237 if (burstCount < burstSize)
238 {
239 status = dds_write (writer, sample);
240 if (status == DDS_RETCODE_TIMEOUT)
241 {
242 timedOut = true;
243 }
244 else if (status < 0)
245 {
246 DDS_FATAL("dds_write: %s\n", dds_strretcode(-status));
247 }
248 else
249 {
250 sample->count++;
251 burstCount++;
252 }
253 }
254 else if (burstInterval)
255 {
256 /* Sleep until burst interval has passed */
257
258 dds_time_t time = dds_time ();
259 deltaTv = time - burstStart;
260 if (deltaTv < DDS_MSECS (burstInterval))
261 {
262 dds_write_flush (writer);
263 dds_sleepfor (DDS_MSECS (burstInterval) - deltaTv);
264 }
265 burstStart = dds_time ();
266 burstCount = 0;
267 }
268 else
269 {
270 burstCount = 0;
271 }
272
273 if (timeOut)
274 {
275 now = dds_time ();
276 deltaTv = now - pubStart;
277 if ((deltaTv) > DDS_SECS (timeOut))
278 {
279 timedOut = true;
280 }
281 }
282 }
283 dds_write_flush (writer);
284
285 printf ("=== [Publisher] %s, %llu samples written.\n", done ? "Terminated" : "Timed out", (unsigned long long) sample->count);
286 fflush (stdout);
287 }
288}
289
290static void finalize_dds(dds_entity_t participant, dds_entity_t writer, ThroughputModule_DataType sample)
291{
292 dds_return_t status = dds_dispose (writer, &sample);
293 if (status != DDS_RETCODE_TIMEOUT && status < 0)
294 DDS_FATAL("dds_dispose: %s\n", dds_strretcode(-status));
295
296 dds_free (sample.payload._buffer);
297 status = dds_delete (participant);
298 if (status < 0)
299 DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
300}
1#include "dds/dds.h"
2#include "Throughput.h"
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <signal.h>
7#include <assert.h>
8/*
9 * The Throughput example measures data throughput in bytes per second. The publisher
10 * allows you to specify a payload size in bytes as well as allowing you to specify
11 * whether to send data in bursts. The publisher will continue to send data forever
12 * unless a time out is specified. The subscriber will receive data and output the
13 * total amount received and the data rate in bytes per second. It will also indicate
14 * if any samples were received out of order. A maximum number of cycles can be
15 * specified and once this has been reached the subscriber will terminate and output
16 * totals and averages.
17 */
18
19#define BYTES_PER_SEC_TO_MEGABITS_PER_SEC 125000
20#define MAX_SAMPLES 1000
21
22typedef struct HandleEntry
23{
24 dds_instance_handle_t handle;
25 unsigned long long count;
26 struct HandleEntry * next;
27} HandleEntry;
28
29typedef struct HandleMap
30{
31 HandleEntry *entries;
32} HandleMap;
33
34static long pollingDelay = -1; /* i.e. use a listener */
35
36static HandleMap * imap;
37static unsigned long long outOfOrder = 0;
38static unsigned long long total_bytes = 0;
39static unsigned long long total_samples = 0;
40
41static dds_time_t startTime = 0;
42
43static unsigned long payloadSize = 0;
44
45static ThroughputModule_DataType data [MAX_SAMPLES];
46static void * samples[MAX_SAMPLES];
47static dds_sample_info_t info[MAX_SAMPLES];
48
49static dds_entity_t waitSet;
50
51#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
52static volatile sig_atomic_t done = false;
53#else
54static bool done = false;
55#endif
56
57/* Forward declarations */
58static HandleMap * HandleMap__alloc (void);
59static void HandleMap__free (HandleMap *map);
60static HandleEntry * store_handle (HandleMap *map, dds_instance_handle_t key);
61static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key);
62
63static void data_available_handler (dds_entity_t reader, void *arg);
64static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName);
65static void process_samples(dds_entity_t reader, unsigned long long maxCycles);
66static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName);
67static void finalize_dds(dds_entity_t participant);
68
69#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
70static void sigint (int sig)
71{
72 (void) sig;
73 done = true;
74}
75#endif
76
77int main (int argc, char **argv)
78{
79 unsigned long long maxCycles = 0;
80 char *partitionName = "Throughput example";
81
82 dds_entity_t participant;
83 dds_entity_t reader;
84
85 if (parse_args(argc, argv, &maxCycles, &partitionName) == EXIT_FAILURE)
86 {
87 return EXIT_FAILURE;
88 }
89
90 printf ("Cycles: %llu | PollingDelay: %ld | Partition: %s\n", maxCycles, pollingDelay, partitionName);
91 fflush (stdout);
92
93 participant = prepare_dds(&reader, partitionName);
94
95 printf ("=== [Subscriber] Waiting for samples...\n");
96 fflush (stdout);
97
98 /* Process samples until Ctrl-C is pressed or until maxCycles */
99 /* has been reached (0 = infinite) */
100#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
101 signal (SIGINT, sigint);
102#endif
103 process_samples(reader, maxCycles);
104
105 (void) dds_set_status_mask (reader, 0);
106 HandleMap__free (imap);
107 finalize_dds (participant);
108 return EXIT_SUCCESS;
109}
110
111/*
112 * This struct contains all of the entities used in the publisher and subscriber.
113 */
114static HandleMap * HandleMap__alloc (void)
115{
116 HandleMap * map = malloc (sizeof (*map));
117 assert(map);
118 memset (map, 0, sizeof (*map));
119 return map;
120}
121
122static void HandleMap__free (HandleMap *map)
123{
124 HandleEntry * entry;
125
126 while (map->entries)
127 {
128 entry = map->entries;
129 map->entries = entry->next;
130 free (entry);
131 }
132 free (map);
133}
134
135static HandleEntry * store_handle (HandleMap *map, dds_instance_handle_t key)
136{
137 HandleEntry * entry = malloc (sizeof (*entry));
138 assert(entry);
139 memset (entry, 0, sizeof (*entry));
140
141 entry->handle = key;
142 entry->next = map->entries;
143 map->entries = entry;
144
145 return entry;
146}
147
148static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key)
149{
150 HandleEntry * entry = map->entries;
151
152 while (entry)
153 {
154 if (entry->handle == key)
155 {
156 break;
157 }
158 entry = entry->next;
159 }
160 return entry;
161}
162
163static int do_take (dds_entity_t reader)
164{
165 int samples_received;
166 dds_instance_handle_t ph = 0;
167 HandleEntry * current = NULL;
168
169 if (startTime == 0)
170 {
171 startTime = dds_time ();
172 }
173
174 /* Take samples and iterate through them */
175
176 samples_received = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
177 if (samples_received < 0)
178 {
179 DDS_FATAL("dds_take: %s\n", dds_strretcode(-samples_received));
180 }
181
182 for (int i = 0; !done && i < samples_received; i++)
183 {
184 if (info[i].valid_data)
185 {
186 ph = info[i].publication_handle;
187 current = retrieve_handle (imap, ph);
188 ThroughputModule_DataType * this_sample = &data[i];
189
190 if (current == NULL)
191 {
192 current = store_handle (imap, ph);
193 current->count = this_sample->count;
194 }
195
196 if (this_sample->count != current->count)
197 {
198 outOfOrder++;
199 }
200 current->count = this_sample->count + 1;
201
202 /* Add the sample payload size to the total received */
203
204 payloadSize = this_sample->payload._length;
205 total_bytes += payloadSize + 8;
206 total_samples++;
207 }
208 }
209 return samples_received;
210}
211
212static void data_available_handler (dds_entity_t reader, void *arg)
213{
214 (void)arg;
215 (void) do_take (reader);
216}
217
218static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName)
219{
220 /*
221 * Get the program parameters
222 * Parameters: subscriber [maxCycles] [pollingDelay] [partitionName]
223 */
224 if (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))
225 {
226 printf ("Usage (parameters must be supplied in order):\n");
227 printf ("./subscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = waitset, -1 = listener)] [partitionName]\n");
228 printf ("Defaults:\n");
229 printf ("./subscriber 0 0 \"Throughput example\"\n");
230 return EXIT_FAILURE;
231 }
232
233 if (argc > 1)
234 {
235 *maxCycles = (unsigned long long) atoi (argv[1]); /* The number of times to output statistics before terminating */
236 }
237 if (argc > 2)
238 {
239 pollingDelay = atoi (argv[2]); /* The number of ms to wait between reads (0 = waitset, -1 = listener) */
240 }
241 if (argc > 3)
242 {
243 *partitionName = argv[3]; /* The name of the partition */
244 }
245 return EXIT_SUCCESS;
246}
247
248static void process_samples(dds_entity_t reader, unsigned long long maxCycles)
249{
250 dds_return_t status;
251 unsigned long long prev_bytes = 0;
252 unsigned long long prev_samples = 0;
253 dds_attach_t wsresults[2];
254 dds_time_t deltaTv;
255 bool first_batch = true;
256 unsigned long cycles = 0;
257 double deltaTime = 0;
258 dds_time_t prev_time = 0;
259 dds_time_t time_now = 0;
260
261 while (!done && (maxCycles == 0 || cycles < maxCycles))
262 {
263 if (pollingDelay > 0)
264 dds_sleepfor (DDS_MSECS (pollingDelay));
265 else
266 {
267 status = dds_waitset_wait (waitSet, wsresults, sizeof(wsresults)/sizeof(wsresults[0]), DDS_MSECS(100));
268 if (status < 0)
269 DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-status));
270 }
271
272 if (pollingDelay >= 0)
273 {
274 while (do_take (reader))
275 ;
276 }
277
278 time_now = dds_time();
279 if (!first_batch)
280 {
281 deltaTv = time_now - prev_time;
282 deltaTime = (double) deltaTv / DDS_NSECS_IN_SEC;
283
284 if (deltaTime >= 1.0 && total_samples != prev_samples)
285 {
286 printf ("=== [Subscriber] %5.3f Payload size: %lu | Total received: %llu samples, %llu bytes | Out of order: %llu samples "
287 "Transfer rate: %.2lf samples/s, %.2lf Mbit/s\n",
288 deltaTime, payloadSize, total_samples, total_bytes, outOfOrder,
289 (deltaTime != 0.0) ? ((double)(total_samples - prev_samples) / deltaTime) : 0,
290 (deltaTime != 0.0) ? ((double)((total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0);
291 fflush (stdout);
292 cycles++;
293 prev_time = time_now;
294 prev_bytes = total_bytes;
295 prev_samples = total_samples;
296 }
297 }
298 else
299 {
300 prev_time = time_now;
301 first_batch = false;
302 }
303 }
304
305 /* Output totals and averages */
306 deltaTv = time_now - startTime;
307 deltaTime = (double) (deltaTv / DDS_NSECS_IN_SEC);
308 printf ("\nTotal received: %llu samples, %llu bytes\n", total_samples, total_bytes);
309 printf ("Out of order: %llu samples\n", outOfOrder);
310 printf ("Average transfer rate: %.2lf samples/s, ", (double)total_samples / deltaTime);
311 printf ("%.2lf Mbit/s\n", (double)(total_bytes / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime);
312 fflush (stdout);
313}
314
315static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName)
316{
317 dds_return_t status;
318 dds_entity_t topic;
319 dds_entity_t subscriber;
320 dds_listener_t *rd_listener;
321 dds_entity_t participant;
322
323 int32_t maxSamples = 4000;
324 const char *subParts[1];
325 dds_qos_t *subQos = dds_create_qos ();
326 dds_qos_t *tQos = dds_create_qos ();
327
328 /* A Participant is created for the default domain. */
329
330 participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
331 if (participant < 0)
332 DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
333
334 /* A Topic is created for our sample type on the domain participant. */
335
336 dds_qset_reliability (tQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
337 dds_qset_history (tQos, DDS_HISTORY_KEEP_ALL, 0);
338 dds_qset_resource_limits (tQos, maxSamples, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
339 topic = dds_create_topic (participant, &ThroughputModule_DataType_desc, "Throughput", tQos, NULL);
340 if (topic < 0)
341 DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
342
343 /* A Subscriber is created on the domain participant. */
344
345 subParts[0] = partitionName;
346 dds_qset_partition (subQos, 1, subParts);
347 subscriber = dds_create_subscriber (participant, subQos, NULL);
348 if (subscriber < 0)
349 DDS_FATAL("dds_create_subscriber: %s\n", dds_strretcode(-subscriber));
350 dds_delete_qos (subQos);
351
352 /* A Listener is created which is triggered when data is available to read */
353
354 rd_listener = dds_create_listener(NULL);
355 dds_lset_data_available(rd_listener, data_available_handler);
356
357 /* A Waitset is created which is triggered when data is available to read */
358
359 waitSet = dds_create_waitset (participant);
360 if (waitSet < 0)
361 DDS_FATAL("dds_create_waitset: %s\n", dds_strretcode(-waitSet));
362
363 status = dds_waitset_attach (waitSet, waitSet, waitSet);
364 if (status < 0)
365 DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
366
367 imap = HandleMap__alloc ();
368
369 memset (data, 0, sizeof (data));
370 for (unsigned int i = 0; i < MAX_SAMPLES; i++)
371 {
372 samples[i] = &data[i];
373 }
374
375 /* A Reader is created on the Subscriber & Topic and attached to Waitset */
376
377 *reader = dds_create_reader (subscriber, topic, NULL, pollingDelay < 0 ? rd_listener : NULL);
378 if (*reader < 0)
379 DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-*reader));
380
381 if (pollingDelay == 0)
382 {
383 status = dds_waitset_attach (waitSet, *reader, *reader);
384 if (status < 0)
385 DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
386 }
387
388 dds_delete_qos (tQos);
389 dds_delete_listener(rd_listener);
390
391 return participant;
392}
393
394static void finalize_dds(dds_entity_t participant)
395{
396 dds_return_t status;
397
398 for (unsigned int i = 0; i < MAX_SAMPLES; i++)
399 {
400 ThroughputModule_DataType_free (&data[i], DDS_FREE_CONTENTS);
401 }
402
403 status = dds_waitset_detach (waitSet, waitSet);
404 if (status < 0)
405 DDS_FATAL("dds_waitset_detach: %s\n", dds_strretcode(-status));
406 status = dds_delete (waitSet);
407 if (status < 0)
408 DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
409 status = dds_delete (participant);
410 if (status < 0)
411 DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
412}
Note
There is also a Shared Memory variant of this example available.