Roundtrip¶
Measures the roundtrip duration when sending and receiving a single message.
Design¶
The Roundtrip example consists of two units:
ping: Sends a message to pong and waits for its return.
pong: Waits for messages from ping and sends the same message back.
Scenario¶
A message is sent by the ping executable on the “ping” partition, which the pong executable is waiting for. The pong executable sends the same message back on the “pong” partition, which the ping executable is waiting for. This sequence is repeated a configurable number of times.
The ping executable measures:
writeAccess time: time the write() method took.
readAccess time: time the take() method took.
roundTrip time: time between the call to the write() method and the return of the take() method.
ping also calculates min/max/average statistics on these values over a configurable number of samples and/or timeout period.
Configurable:
payloadSize: the size of the payload in bytes.
numSamples: the number of samples to send.
timeOut: the number of seconds ping should run for.
Running the example¶
To avoid mixing the output, run the ping and pong in separate terminals.
Open two terminals.
In the first terminal start Pong by running pong.
- pong usage:
./pong
In the second terminal start Ping by running ping.
- ping usage (parameters must be supplied in order):
./ping [-l] [payloadSize (bytes, 0 - 100M)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]
./ping quit - ping sends a quit signal to pong.
- defaults:
./ping 0 0 0
To achieve optimal performance, set the CPU affinity so that ping and pong run on separate CPU cores and use real-time scheduling:
Linux:
Pong usage: ``taskset -c 0 chrt -f 80 ./pong``
Ping usage: ``taskset -c 1 chrt -f 80 ./ping [payloadSize (bytes, 0 - 100M)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]``
Windows:
Pong usage: ``START /affinity 1 /high cmd /k "pong.exe"``
Ping usage: ``START /affinity 2 /high cmd /k "ping.exe" [payloadSize (bytes, 0 - 100M)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]``
Source code¶
// Data type used for the "RoundTrip" topic exchanged between ping and pong.
1module RoundTripModule
2{
3 @final
4 struct DataType
5 {
// Variable-length byte sequence; ping sets its length to the payloadSize argument.
6 sequence<octet> payload;
7 };
8};
1#include "dds/dds.h"
2#include "dds/ddsrt/misc.h"
3#include "RoundTrip.h"
4#include <stdio.h>
5#include <stdlib.h>
6#include <string.h>
7#include <signal.h>
8#include <inttypes.h>
9
10#define TIME_STATS_SIZE_INCREMENT 50000
11#define MAX_SAMPLES 100
12#define US_IN_ONE_SEC 1000000LL
13
14/* Forward declaration */
15
16static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener);
17static void finalize_dds(dds_entity_t participant);
18
/* Accumulator for one timing series: keeps the individual samples (for the
 * median and 99th-percentile calculations) plus running min/max/average. */
19typedef struct ExampleTimeStats
20{
/* Heap buffer holding the recorded samples. */
21 dds_time_t * values;
/* Number of samples currently stored in values. */
22 unsigned long valuesSize;
/* Number of samples values has room for. */
23 unsigned long valuesMax;
/* Running mean over all samples added since the last init/reset. */
24 double average;
/* Smallest and largest sample added since the last init/reset. */
25 dds_time_t min;
26 dds_time_t max;
/* Number of samples added since the last init/reset, stored or not. */
27 unsigned long count;
28} ExampleTimeStats;
29
30static void exampleInitTimeStats (ExampleTimeStats *stats)
31{
32 stats->values = (dds_time_t*) malloc (TIME_STATS_SIZE_INCREMENT * sizeof (dds_time_t));
33 stats->valuesSize = 0;
34 stats->valuesMax = TIME_STATS_SIZE_INCREMENT;
35 stats->average = 0;
36 stats->min = 0;
37 stats->max = 0;
38 stats->count = 0;
39}
40
41static void exampleResetTimeStats (ExampleTimeStats *stats)
42{
43 memset (stats->values, 0, stats->valuesMax * sizeof (dds_time_t));
44 stats->valuesSize = 0;
45 stats->average = 0;
46 stats->min = 0;
47 stats->max = 0;
48 stats->count = 0;
49}
50
51static void exampleDeleteTimeStats (ExampleTimeStats *stats)
52{
53 free (stats->values);
54}
55
56static void exampleAddTimingToTimeStats (ExampleTimeStats *stats, dds_time_t timing)
57{
58 if (stats->valuesSize > stats->valuesMax)
59 {
60 dds_time_t * temp = (dds_time_t*) realloc (stats->values, (stats->valuesMax + TIME_STATS_SIZE_INCREMENT) * sizeof (dds_time_t));
61 stats->values = temp;
62 stats->valuesMax += TIME_STATS_SIZE_INCREMENT;
63 }
64 if (stats->values != NULL && stats->valuesSize < stats->valuesMax)
65 {
66 stats->values[stats->valuesSize++] = timing;
67 }
68 stats->average = ((double)stats->count * stats->average + (double)timing) / (double)(stats->count + 1);
69 stats->min = (stats->count == 0 || timing < stats->min) ? timing : stats->min;
70 stats->max = (stats->count == 0 || timing > stats->max) ? timing : stats->max;
71 stats->count++;
72}
73
74static int exampleCompareul (const void* a, const void* b)
75{
76 dds_time_t ul_a = *((dds_time_t*)a);
77 dds_time_t ul_b = *((dds_time_t*)b);
78
79 if (ul_a < ul_b) return -1;
80 if (ul_a > ul_b) return 1;
81 return 0;
82}
83
84static double exampleGetMedianFromTimeStats (ExampleTimeStats *stats)
85{
86 double median = 0.0;
87
88 qsort (stats->values, stats->valuesSize, sizeof (dds_time_t), exampleCompareul);
89
90 if (stats->valuesSize % 2 == 0)
91 {
92 median = (double)(stats->values[stats->valuesSize / 2 - 1] + stats->values[stats->valuesSize / 2]) / 2;
93 }
94 else
95 {
96 median = (double)stats->values[stats->valuesSize / 2];
97 }
98
99 return median;
100}
101
102static dds_time_t exampleGet99PercentileFromTimeStats (ExampleTimeStats *stats)
103{
104 qsort (stats->values, stats->valuesSize, sizeof (dds_time_t), exampleCompareul);
105 return stats->values[stats->valuesSize - stats->valuesSize / 100];
106}
107
/* Waitset shared by the Ctrl-C handler and main(): the handler triggers it,
 * and the loops in main() stop once dds_triggered (waitSet) is true. */
108static dds_entity_t waitSet;
109
/* Ctrl-C handling. Both variants only set the waitset trigger. */
110#ifdef _WIN32
111#include <Windows.h>
/* Console control handler registered via SetConsoleCtrlHandler in main();
 * the event type is ignored. */
112static bool CtrlHandler (DWORD fdwCtrlType)
113{
114 (void)fdwCtrlType;
115 dds_waitset_set_trigger (waitSet, true);
116 return true; //Don't let other handlers handle this key
117}
118#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
/* Signal handler installed for SIGINT in main(); the signal number is ignored. */
119static void CtrlHandler (int sig)
120{
121 (void)sig;
122 dds_waitset_set_trigger (waitSet, true);
123}
124#endif
125
/* DDS entities: filled in by main() and prepare_dds(). */
126static dds_entity_t writer;
127static dds_entity_t reader;
128static dds_entity_t participant;
129static dds_entity_t readCond;
130
/* Per-interval statistics: reset after each one-second report. */
131static ExampleTimeStats roundTrip;
132static ExampleTimeStats writeAccess;
133static ExampleTimeStats readAccess;
/* Whole-run statistics: printed once at the end of main(). */
134static ExampleTimeStats roundTripOverall;
135static ExampleTimeStats writeAccessOverall;
136static ExampleTimeStats readAccessOverall;
137
/* Sample that ping writes, and the buffers dds_take fills: samples[i] points at sub_data[i]. */
138static RoundTripModule_DataType pub_data;
139static RoundTripModule_DataType sub_data[MAX_SAMPLES];
140static void *samples[MAX_SAMPLES];
141static dds_sample_info_t info[MAX_SAMPLES];
142
/* Timestamps (from dds_time) taken around the most recent write and take. */
143static dds_time_t startTime;
144static dds_time_t preWriteTime;
145static dds_time_t postWriteTime;
146static dds_time_t preTakeTime;
147static dds_time_t postTakeTime;
/* Number of one-second report intervals completed so far. */
148static dds_time_t elapsed = 0;
149
/* True until the warm-up phase in main() has completed; suppresses the reports. */
150static bool warmUp = true;
151
152static void data_available(dds_entity_t rd, void *arg)
153{
154 dds_time_t difference = 0;
155 int status;
156 (void)arg;
157 /* Take sample and check that it is valid */
158 preTakeTime = dds_time ();
159 status = dds_take (rd, samples, info, MAX_SAMPLES, MAX_SAMPLES);
160 if (status < 0)
161 DDS_FATAL("dds_take: %s\n", dds_strretcode(-status));
162 postTakeTime = dds_time ();
163
164 /* Update stats */
165 difference = (postWriteTime - preWriteTime)/DDS_NSECS_IN_USEC;
166 exampleAddTimingToTimeStats (&writeAccess, difference);
167 exampleAddTimingToTimeStats (&writeAccessOverall, difference);
168
169 difference = (postTakeTime - preTakeTime)/DDS_NSECS_IN_USEC;
170 exampleAddTimingToTimeStats (&readAccess, difference);
171 exampleAddTimingToTimeStats (&readAccessOverall, difference);
172
173 difference = (postTakeTime - info[0].source_timestamp)/DDS_NSECS_IN_USEC;
174 exampleAddTimingToTimeStats (&roundTrip, difference);
175 exampleAddTimingToTimeStats (&roundTripOverall, difference);
176
177 if (!warmUp) {
178 /* Print stats each second */
179 difference = (postTakeTime - startTime)/DDS_NSECS_IN_USEC;
180 if (difference > US_IN_ONE_SEC)
181 {
182 printf("%9" PRIi64 " %9lu %8.0f %8" PRIi64 " %8" PRIi64 " %8" PRIi64 " %10lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 "\n",
183 elapsed + 1,
184 roundTrip.count,
185 exampleGetMedianFromTimeStats (&roundTrip) / 2,
186 roundTrip.min / 2,
187 exampleGet99PercentileFromTimeStats (&roundTrip) / 2,
188 roundTrip.max / 2,
189 writeAccess.count,
190 exampleGetMedianFromTimeStats (&writeAccess),
191 writeAccess.min,
192 readAccess.count,
193 exampleGetMedianFromTimeStats (&readAccess),
194 readAccess.min);
195 fflush (stdout);
196
197 exampleResetTimeStats (&roundTrip);
198 exampleResetTimeStats (&writeAccess);
199 exampleResetTimeStats (&readAccess);
200 startTime = dds_time ();
201 elapsed++;
202 }
203 }
204
205 preWriteTime = dds_time();
206 status = dds_write_ts (writer, &pub_data, preWriteTime);
207 if (status < 0)
208 DDS_FATAL("dds_write_ts: %s\n", dds_strretcode(-status));
209 postWriteTime = dds_time();
210}
211
/* Print the command-line help on stdout and terminate the process with
 * EXIT_FAILURE. Does not return. */
static void usage(void)
{
  static const char helpText[] =
    "Usage (parameters must be supplied in order):\n"
    "./ping [-l] [payloadSize (bytes, 0 - 100M)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]\n"
    "./ping quit - ping sends a quit signal to pong.\n"
    "Defaults:\n"
    "./ping 0 0 0\n";

  /* The text holds no conversion specifiers, so it is emitted verbatim. */
  fputs (helpText, stdout);
  exit(EXIT_FAILURE);
}
221
222int main (int argc, char *argv[])
223{
224 uint32_t payloadSize = 0;
225 uint64_t numSamples = 0;
226 bool invalidargs = false;
227 dds_time_t timeOut = 0;
228 dds_time_t time;
229 dds_time_t difference = 0;
230
231 dds_attach_t wsresults[1];
232 size_t wsresultsize = 1U;
233 dds_time_t waitTimeout = DDS_SECS (1);
234 unsigned long i;
235 int status;
236
237 dds_listener_t *listener = NULL;
238 bool use_listener = false;
239 int argidx = 1;
240
241 /* poor man's getopt works even on Windows */
242 if (argc > argidx && strcmp(argv[argidx], "-l") == 0)
243 {
244 argidx++;
245 use_listener = true;
246 }
247
248 /* Register handler for Ctrl-C */
249#ifdef _WIN32
250 DDSRT_WARNING_GNUC_OFF(cast-function-type)
251 SetConsoleCtrlHandler ((PHANDLER_ROUTINE)CtrlHandler, TRUE);
252 DDSRT_WARNING_GNUC_ON(cast-function-type)
253#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
254 struct sigaction sat, oldAction;
255 sat.sa_handler = CtrlHandler;
256 sigemptyset (&sat.sa_mask);
257 sat.sa_flags = 0;
258 sigaction (SIGINT, &sat, &oldAction);
259#endif
260
261 exampleInitTimeStats (&roundTrip);
262 exampleInitTimeStats (&writeAccess);
263 exampleInitTimeStats (&readAccess);
264 exampleInitTimeStats (&roundTripOverall);
265 exampleInitTimeStats (&writeAccessOverall);
266 exampleInitTimeStats (&readAccessOverall);
267
268 memset (&sub_data, 0, sizeof (sub_data));
269 memset (&pub_data, 0, sizeof (pub_data));
270
271 for (i = 0; i < MAX_SAMPLES; i++)
272 {
273 samples[i] = &sub_data[i];
274 }
275
276 participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
277 if (participant < 0)
278 DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
279
280 if (use_listener)
281 {
282 listener = dds_create_listener(NULL);
283 dds_lset_data_available(listener, data_available);
284 }
285 prepare_dds(&writer, &reader, &readCond, listener);
286
287 if (argc - argidx == 1 && strcmp (argv[argidx], "quit") == 0)
288 {
289 printf ("Sending termination request.\n");
290 fflush (stdout);
291 /* pong uses a waitset which is triggered by instance disposal, and
292 quits when it fires. */
293 dds_sleepfor (DDS_SECS (1));
294 pub_data.payload._length = 0;
295 pub_data.payload._buffer = NULL;
296 pub_data.payload._release = true;
297 pub_data.payload._maximum = 0;
298 status = dds_writedispose (writer, &pub_data);
299 if (status < 0)
300 DDS_FATAL("dds_writedispose: %s\n", dds_strretcode(-status));
301 dds_sleepfor (DDS_SECS (1));
302 goto done;
303 }
304
305 if (argc - argidx == 0)
306 {
307 invalidargs = true;
308 }
309 if (argc - argidx >= 1)
310 {
311 payloadSize = (uint32_t) atol (argv[argidx]);
312
313 if (payloadSize > 100 * 1048576)
314 {
315 invalidargs = true;
316 }
317 }
318 if (argc - argidx >= 2)
319 {
320 numSamples = (uint64_t) atol (argv[argidx+1]);
321 }
322 if (argc - argidx >= 3)
323 {
324 timeOut = atol (argv[argidx+2]);
325 }
326 if (invalidargs || (argc - argidx == 1 && (strcmp (argv[argidx], "-h") == 0 || strcmp (argv[argidx], "--help") == 0)))
327 usage();
328 printf ("# payloadSize: %" PRIu32 " | numSamples: %" PRIu64 " | timeOut: %" PRIi64 "\n\n", payloadSize, numSamples, timeOut);
329 fflush (stdout);
330
331 pub_data.payload._length = payloadSize;
332 pub_data.payload._buffer = payloadSize ? dds_alloc (payloadSize) : NULL;
333 pub_data.payload._release = true;
334 pub_data.payload._maximum = 0;
335 for (i = 0; i < payloadSize; i++)
336 {
337 pub_data.payload._buffer[i] = 'a';
338 }
339
340 startTime = dds_time ();
341 printf ("# Waiting for startup jitter to stabilise\n");
342 fflush (stdout);
343 /* Write a sample that pong can send back */
344 while (!dds_triggered (waitSet) && difference < DDS_SECS(5))
345 {
346 status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout);
347 if (status < 0)
348 DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-status));
349
350 if (status > 0 && listener == NULL) /* data */
351 {
352 status = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
353 if (status < 0)
354 DDS_FATAL("dds_take: %s\n", dds_strretcode(-status));
355 }
356
357 time = dds_time ();
358 difference = time - startTime;
359 }
360 if (!dds_triggered (waitSet))
361 {
362 warmUp = false;
363 printf("# Warm up complete.\n\n");
364 printf("# Latency measurements (in us)\n");
365 printf("# Latency [us] Write-access time [us] Read-access time [us]\n");
366 printf("# Seconds Count median min 99%% max Count median min Count median min\n");
367 fflush (stdout);
368 }
369
370 exampleResetTimeStats (&roundTrip);
371 exampleResetTimeStats (&writeAccess);
372 exampleResetTimeStats (&readAccess);
373 startTime = dds_time ();
374 /* Write a sample that pong can send back */
375 preWriteTime = dds_time ();
376 status = dds_write_ts (writer, &pub_data, preWriteTime);
377 if (status < 0)
378 DDS_FATAL("dds_write_ts: %s\n", dds_strretcode(-status));
379 postWriteTime = dds_time ();
380 for (i = 0; !dds_triggered (waitSet) && (!numSamples || i < numSamples) && !(timeOut && elapsed >= timeOut); i++)
381 {
382 status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout);
383 if (status < 0)
384 DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-status));
385 if (status != 0 && listener == NULL) {
386 data_available(reader, NULL);
387 }
388 }
389
390 if (!warmUp)
391 {
392 printf
393 (
394 "\n%9s %9lu %8.0f %8" PRIi64 " %8" PRIi64 " %8" PRIi64 " %10lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 "\n",
395 "# Overall",
396 roundTripOverall.count,
397 exampleGetMedianFromTimeStats (&roundTripOverall) / 2,
398 roundTripOverall.min / 2,
399 exampleGet99PercentileFromTimeStats (&roundTripOverall) / 2,
400 roundTripOverall.max / 2,
401 writeAccessOverall.count,
402 exampleGetMedianFromTimeStats (&writeAccessOverall),
403 writeAccessOverall.min,
404 readAccessOverall.count,
405 exampleGetMedianFromTimeStats (&readAccessOverall),
406 readAccessOverall.min
407 );
408 fflush (stdout);
409 }
410
411done:
412
413#ifdef _WIN32
414 SetConsoleCtrlHandler (0, FALSE);
415#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
416 sigaction (SIGINT, &oldAction, 0);
417#endif
418
419 finalize_dds(participant);
420
421 /* Clean up */
422 exampleDeleteTimeStats (&roundTrip);
423 exampleDeleteTimeStats (&writeAccess);
424 exampleDeleteTimeStats (&readAccess);
425 exampleDeleteTimeStats (&roundTripOverall);
426 exampleDeleteTimeStats (&writeAccessOverall);
427 exampleDeleteTimeStats (&readAccessOverall);
428
429 for (i = 0; i < MAX_SAMPLES; i++)
430 {
431 RoundTripModule_DataType_free (&sub_data[i], DDS_FREE_CONTENTS);
432 }
433 RoundTripModule_DataType_free (&pub_data, DDS_FREE_CONTENTS);
434
435 return EXIT_SUCCESS;
436}
437
438static dds_entity_t prepare_dds(dds_entity_t *wr, dds_entity_t *rd, dds_entity_t *rdcond, dds_listener_t *listener)
439{
440 dds_return_t status;
441 dds_entity_t topic;
442 dds_entity_t publisher;
443 dds_entity_t subscriber;
444
445 const char *pubPartitions[] = { "ping" };
446 const char *subPartitions[] = { "pong" };
447 dds_qos_t *pubQos;
448 dds_qos_t *subQos;
449 dds_qos_t *tQos;
450 dds_qos_t *wQos;
451
452 /* A DDS_Topic is created for our sample type on the domain participant. */
453 tQos = dds_create_qos ();
454 dds_qset_reliability (tQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
455 topic = dds_create_topic (participant, &RoundTripModule_DataType_desc, "RoundTrip", tQos, NULL);
456 if (topic < 0)
457 DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
458 dds_delete_qos (tQos);
459
460 /* A DDS_Publisher is created on the domain participant. */
461 pubQos = dds_create_qos ();
462 dds_qset_partition (pubQos, 1, pubPartitions);
463
464 publisher = dds_create_publisher (participant, pubQos, NULL);
465 if (publisher < 0)
466 DDS_FATAL("dds_create_publisher: %s\n", dds_strretcode(-publisher));
467 dds_delete_qos (pubQos);
468
469 /* A DDS_DataWriter is created on the Publisher & Topic with a modified Qos. */
470 wQos = dds_create_qos ();
471 dds_qset_writer_data_lifecycle (wQos, false);
472 *wr = dds_create_writer (publisher, topic, wQos, NULL);
473 if (*wr < 0)
474 DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-*wr));
475 dds_delete_qos (wQos);
476
477 /* A DDS_Subscriber is created on the domain participant. */
478 subQos = dds_create_qos ();
479
480 dds_qset_partition (subQos, 1, subPartitions);
481
482 subscriber = dds_create_subscriber (participant, subQos, NULL);
483 if (subscriber < 0)
484 DDS_FATAL("dds_create_subscriber: %s\n", dds_strretcode(-subscriber));
485 dds_delete_qos (subQos);
486 /* A DDS_DataReader is created on the Subscriber & Topic with a modified QoS. */
487 *rd = dds_create_reader (subscriber, topic, NULL, listener);
488 if (*rd < 0)
489 DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-*rd));
490
491 waitSet = dds_create_waitset (participant);
492 if (listener == NULL) {
493 *rdcond = dds_create_readcondition (*rd, DDS_ANY_STATE);
494 status = dds_waitset_attach (waitSet, *rdcond, *rd);
495 if (status < 0)
496 DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
497 } else {
498 *rdcond = 0;
499 }
500 status = dds_waitset_attach (waitSet, waitSet, waitSet);
501 if (status < 0)
502 DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
503
504 return participant;
505}
506
507static void finalize_dds(dds_entity_t ppant)
508{
509 dds_return_t status;
510 status = dds_delete (ppant);
511 if (status < 0)
512 DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
513}
1#include "dds/dds.h"
2#include "dds/ddsrt/misc.h"
3#include "RoundTrip.h"
4#include <stdio.h>
5#include <stdlib.h>
6#include <string.h>
7#include <signal.h>
8#include <stdlib.h>
9
10static dds_entity_t waitSet;
11#define MAX_SAMPLES 10
12
13/* Forward declarations */
14static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener);
15static void finalize_dds(dds_entity_t participant, RoundTripModule_DataType data[MAX_SAMPLES]);
16
/* Ctrl-C handling. Both variants only set the trigger on the file-scope
 * waitSet, which ends the wait loop in main(). */
17#ifdef _WIN32
18#include <Windows.h>
/* Console control handler registered via SetConsoleCtrlHandler in main();
 * the event type is ignored. */
19static bool CtrlHandler (DWORD fdwCtrlType)
20{
21 (void)fdwCtrlType;
22 dds_waitset_set_trigger (waitSet, true);
23 return true; //Don't let other handlers handle this key
24}
25#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
/* Signal handler installed for SIGINT in main(); the signal number is ignored. */
26static void CtrlHandler (int sig)
27{
28 (void)sig;
29 dds_waitset_set_trigger (waitSet, true);
30}
31#endif
32
/* Buffers filled by dds_take: main() points samples[i] at data[i]. */
33static RoundTripModule_DataType data[MAX_SAMPLES];
34static void * samples[MAX_SAMPLES];
35static dds_sample_info_t info[MAX_SAMPLES];
36
/* DDS entities: filled in by main() and prepare_dds(). */
37static dds_entity_t participant;
38static dds_entity_t reader;
39static dds_entity_t writer;
40static dds_entity_t readCond;
41
42static void data_available(dds_entity_t rd, void *arg)
43{
44 int status, samplecount;
45 (void)arg;
46 samplecount = dds_take (rd, samples, info, MAX_SAMPLES, MAX_SAMPLES);
47 if (samplecount < 0)
48 DDS_FATAL("dds_take: %s\n", dds_strretcode(-samplecount));
49 for (int j = 0; !dds_triggered (waitSet) && j < samplecount; j++)
50 {
51 /* If writer has been disposed terminate pong */
52
53 if (info[j].instance_state == DDS_IST_NOT_ALIVE_DISPOSED)
54 {
55 printf ("Received termination request. Terminating.\n");
56 dds_waitset_set_trigger (waitSet, true);
57 break;
58 }
59 else if (info[j].valid_data)
60 {
61 /* If sample is valid, send it back to ping */
62 RoundTripModule_DataType * valid_sample = &data[j];
63 status = dds_write_ts (writer, valid_sample, info[j].source_timestamp);
64 if (status < 0)
65 DDS_FATAL("dds_write_ts: %d\n", -status);
66 }
67 }
68}
69
70int main (int argc, char *argv[])
71{
72 dds_duration_t waitTimeout = DDS_INFINITY;
73 unsigned int i;
74 int status;
75 dds_attach_t wsresults[1];
76 size_t wsresultsize = 1U;
77
78 dds_listener_t *listener = NULL;
79 bool use_listener = false;
80 int argidx = 1;
81
82 if (argc > argidx && strcmp(argv[argidx], "-l") == 0)
83 {
84 argidx++;
85 use_listener = true;
86 }
87
88 /* Register handler for Ctrl-C */
89
90#ifdef _WIN32
91 DDSRT_WARNING_GNUC_OFF(cast-function-type)
92 SetConsoleCtrlHandler ((PHANDLER_ROUTINE)CtrlHandler, TRUE);
93 DDSRT_WARNING_GNUC_ON(cast-function-type)
94#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
95 struct sigaction sat, oldAction;
96 sat.sa_handler = CtrlHandler;
97 sigemptyset (&sat.sa_mask);
98 sat.sa_flags = 0;
99 sigaction (SIGINT, &sat, &oldAction);
100#endif
101
102 /* Initialize sample data */
103 memset (data, 0, sizeof (data));
104 for (i = 0; i < MAX_SAMPLES; i++)
105 {
106 samples[i] = &data[i];
107 }
108
109 participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
110 if (participant < 0)
111 DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
112
113 if (use_listener)
114 {
115 listener = dds_create_listener(NULL);
116 dds_lset_data_available(listener, data_available);
117 }
118
119 (void)prepare_dds(&writer, &reader, &readCond, listener);
120
121 while (!dds_triggered (waitSet))
122 {
123 /* Wait for a sample from ping */
124
125 status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout);
126 if (status < 0)
127 DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-status));
128
129 /* Take samples */
130 if (listener == NULL) {
131 data_available (reader, 0);
132 }
133 }
134
135#ifdef _WIN32
136 SetConsoleCtrlHandler (0, FALSE);
137#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
138 sigaction (SIGINT, &oldAction, 0);
139#endif
140
141 /* Clean up */
142 finalize_dds(participant, data);
143
144 return EXIT_SUCCESS;
145}
146
147static void finalize_dds(dds_entity_t pp, RoundTripModule_DataType xs[MAX_SAMPLES])
148{
149 dds_return_t status;
150 status = dds_delete (pp);
151 if (status < 0)
152 DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
153 for (unsigned int i = 0; i < MAX_SAMPLES; i++)
154 {
155 RoundTripModule_DataType_free (&xs[i], DDS_FREE_CONTENTS);
156 }
157}
158
159static dds_entity_t prepare_dds(dds_entity_t *wr, dds_entity_t *rd, dds_entity_t *rdcond, dds_listener_t *rdlist)
160{
161 const char *pubPartitions[] = { "pong" };
162 const char *subPartitions[] = { "ping" };
163 dds_qos_t *qos;
164 dds_entity_t subscriber;
165 dds_entity_t publisher;
166 dds_entity_t topic;
167 dds_return_t status;
168
169 /* A DDS Topic is created for our sample type on the domain participant. */
170
171 qos = dds_create_qos ();
172 dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
173 topic = dds_create_topic (participant, &RoundTripModule_DataType_desc, "RoundTrip", qos, NULL);
174 if (topic < 0)
175 DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
176 dds_delete_qos (qos);
177
178 /* A DDS Publisher is created on the domain participant. */
179
180 qos = dds_create_qos ();
181 dds_qset_partition (qos, 1, pubPartitions);
182
183 publisher = dds_create_publisher (participant, qos, NULL);
184 if (publisher < 0)
185 DDS_FATAL("dds_create_publisher: %s\n", dds_strretcode(-publisher));
186 dds_delete_qos (qos);
187
188 /* A DDS DataWriter is created on the Publisher & Topic with a modififed Qos. */
189
190 qos = dds_create_qos ();
191 dds_qset_writer_data_lifecycle (qos, false);
192 *wr = dds_create_writer (publisher, topic, qos, NULL);
193 if (*wr < 0)
194 DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-*wr));
195 dds_delete_qos (qos);
196
197 /* A DDS Subscriber is created on the domain participant. */
198
199 qos = dds_create_qos ();
200 dds_qset_partition (qos, 1, subPartitions);
201
202 subscriber = dds_create_subscriber (participant, qos, NULL);
203 if (subscriber < 0)
204 DDS_FATAL("dds_create_subscriber: %s\n", dds_strretcode(-subscriber));
205 dds_delete_qos (qos);
206
207 /* A DDS DataReader is created on the Subscriber & Topic with a modified QoS. */
208
209 *rd = dds_create_reader (subscriber, topic, NULL, rdlist);
210 if (*rd < 0)
211 DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-*rd));
212
213 waitSet = dds_create_waitset (participant);
214 if (rdlist == NULL) {
215 *rdcond = dds_create_readcondition (*rd, DDS_ANY_STATE);
216 status = dds_waitset_attach (waitSet, *rdcond, *rd);
217 if (status < 0)
218 DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
219 } else {
220 *rdcond = 0;
221 }
222 status = dds_waitset_attach (waitSet, waitSet, waitSet);
223 if (status < 0)
224 DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
225
226 printf ("Waiting for samples from ping to send back...\n");
227 fflush (stdout);
228
229 return participant;
230}