Roundtrip

Measures the roundtrip duration when sending and receiving a single message.

Design

The Roundtrip example consists of two units:

  • ping: Sends a message to pong and waits for its return.

  • pong: Waits for messages from ping and sends the same message back.

Scenario

A message is sent by the ping executable on the “ping” partition, which the pong executable is waiting for. The pong executable sends the same message back on the “pong” partition, which the ping executable is waiting for. This sequence is repeated a configurable number of times.

The ping executable measures:

  • writeAccess time: time the write() method took.

  • readAccess time: time the take() method took.

  • roundTrip time: time between the call to the write() method and the return of the take() method.

  • ping also calculates min/max/average statistics on these values over a configurable number of samples and/or time out period.

Configurable:

  • payloadSize: the size of the payload in bytes.

  • numSamples: the number of samples to send.

  • timeOut: the number of seconds ping should run for.

Running the example

To avoid mixing the output, run ping and pong in separate terminals.

  1. Open two terminals.

  2. In the first terminal start Pong by running pong.

    pong usage:

    ./pong

  3. In the second terminal start Ping by running ping.

    ping usage (parameters must be supplied in order):

    ./ping [-l] [payloadSize (bytes, 0 - 100M)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]

    ./ping quit - ping sends a quit signal to pong.

    defaults:

    ./ping 0 0 0

  4. To achieve optimal performance, set the CPU affinity so that ping and pong run on separate CPU cores and use real-time scheduling:

Pong usage:
  ``taskset -c 0 chrt -f 80 ./pong``
Ping usage:
  ``taskset -c 1 chrt -f 80 ./ping [payloadSize (bytes, 0 - 100M)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]``

Source code

RoundTrip.idl
1module RoundTripModule
2{
3  @final
4  struct DataType
5  {
6    sequence<octet> payload;
7  };
8};
ping.c
  1#include "dds/dds.h"
  2#include "dds/ddsrt/misc.h"
  3#include "RoundTrip.h"
  4#include <stdio.h>
  5#include <stdlib.h>
  6#include <string.h>
  7#include <signal.h>
  8#include <inttypes.h>
  9
 10#define TIME_STATS_SIZE_INCREMENT 50000
 11#define MAX_SAMPLES 100
 12#define US_IN_ONE_SEC 1000000LL
 13
 14/* Forward declaration */
 15
 16static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener);
 17static void finalize_dds(dds_entity_t participant);
 18
 19typedef struct ExampleTimeStats
 20{
 21  dds_time_t * values;
 22  unsigned long valuesSize;
 23  unsigned long valuesMax;
 24  double average;
 25  dds_time_t min;
 26  dds_time_t max;
 27  unsigned long count;
 28} ExampleTimeStats;
 29
 30static void exampleInitTimeStats (ExampleTimeStats *stats)
 31{
 32  stats->values = (dds_time_t*) malloc (TIME_STATS_SIZE_INCREMENT * sizeof (dds_time_t));
 33  stats->valuesSize = 0;
 34  stats->valuesMax = TIME_STATS_SIZE_INCREMENT;
 35  stats->average = 0;
 36  stats->min = 0;
 37  stats->max = 0;
 38  stats->count = 0;
 39}
 40
 41static void exampleResetTimeStats (ExampleTimeStats *stats)
 42{
 43  memset (stats->values, 0, stats->valuesMax * sizeof (dds_time_t));
 44  stats->valuesSize = 0;
 45  stats->average = 0;
 46  stats->min = 0;
 47  stats->max = 0;
 48  stats->count = 0;
 49}
 50
 51static void exampleDeleteTimeStats (ExampleTimeStats *stats)
 52{
 53  free (stats->values);
 54}
 55
 56static void exampleAddTimingToTimeStats (ExampleTimeStats *stats, dds_time_t timing)
 57{
 58  if (stats->valuesSize > stats->valuesMax)
 59  {
 60    dds_time_t * temp = (dds_time_t*) realloc (stats->values, (stats->valuesMax + TIME_STATS_SIZE_INCREMENT) * sizeof (dds_time_t));
 61    stats->values = temp;
 62    stats->valuesMax += TIME_STATS_SIZE_INCREMENT;
 63  }
 64  if (stats->values != NULL && stats->valuesSize < stats->valuesMax)
 65  {
 66    stats->values[stats->valuesSize++] = timing;
 67  }
 68  stats->average = ((double)stats->count * stats->average + (double)timing) / (double)(stats->count + 1);
 69  stats->min = (stats->count == 0 || timing < stats->min) ? timing : stats->min;
 70  stats->max = (stats->count == 0 || timing > stats->max) ? timing : stats->max;
 71  stats->count++;
 72}
 73
 74static int exampleCompareul (const void* a, const void* b)
 75{
 76  dds_time_t ul_a = *((dds_time_t*)a);
 77  dds_time_t ul_b = *((dds_time_t*)b);
 78
 79  if (ul_a < ul_b) return -1;
 80  if (ul_a > ul_b) return 1;
 81  return 0;
 82}
 83
 84static double exampleGetMedianFromTimeStats (ExampleTimeStats *stats)
 85{
 86  double median = 0.0;
 87
 88  qsort (stats->values, stats->valuesSize, sizeof (dds_time_t), exampleCompareul);
 89
 90  if (stats->valuesSize % 2 == 0)
 91  {
 92    median = (double)(stats->values[stats->valuesSize / 2 - 1] + stats->values[stats->valuesSize / 2]) / 2;
 93  }
 94  else
 95  {
 96    median = (double)stats->values[stats->valuesSize / 2];
 97  }
 98
 99  return median;
100}
101
102static dds_time_t exampleGet99PercentileFromTimeStats (ExampleTimeStats *stats)
103{
104  qsort (stats->values, stats->valuesSize, sizeof (dds_time_t), exampleCompareul);
105  return stats->values[stats->valuesSize - stats->valuesSize / 100];
106}
107
108static dds_entity_t waitSet;
109
110#ifdef _WIN32
111#include <Windows.h>
112static bool CtrlHandler (DWORD fdwCtrlType)
113{
114  (void)fdwCtrlType;
115  dds_waitset_set_trigger (waitSet, true);
116  return true; //Don't let other handlers handle this key
117}
118#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
119static void CtrlHandler (int sig)
120{
121  (void)sig;
122  dds_waitset_set_trigger (waitSet, true);
123}
124#endif
125
126static dds_entity_t writer;
127static dds_entity_t reader;
128static dds_entity_t participant;
129static dds_entity_t readCond;
130
131static ExampleTimeStats roundTrip;
132static ExampleTimeStats writeAccess;
133static ExampleTimeStats readAccess;
134static ExampleTimeStats roundTripOverall;
135static ExampleTimeStats writeAccessOverall;
136static ExampleTimeStats readAccessOverall;
137
138static RoundTripModule_DataType pub_data;
139static RoundTripModule_DataType sub_data[MAX_SAMPLES];
140static void *samples[MAX_SAMPLES];
141static dds_sample_info_t info[MAX_SAMPLES];
142
143static dds_time_t startTime;
144static dds_time_t preWriteTime;
145static dds_time_t postWriteTime;
146static dds_time_t preTakeTime;
147static dds_time_t postTakeTime;
148static dds_time_t elapsed = 0;
149
150static bool warmUp = true;
151
152static void data_available(dds_entity_t rd, void *arg)
153{
154  dds_time_t difference = 0;
155  int status;
156  (void)arg;
157  /* Take sample and check that it is valid */
158  preTakeTime = dds_time ();
159  status = dds_take (rd, samples, info, MAX_SAMPLES, MAX_SAMPLES);
160  if (status < 0)
161    DDS_FATAL("dds_take: %s\n", dds_strretcode(-status));
162  postTakeTime = dds_time ();
163
164  /* Update stats */
165  difference = (postWriteTime - preWriteTime)/DDS_NSECS_IN_USEC;
166  exampleAddTimingToTimeStats (&writeAccess, difference);
167  exampleAddTimingToTimeStats (&writeAccessOverall, difference);
168
169  difference = (postTakeTime - preTakeTime)/DDS_NSECS_IN_USEC;
170  exampleAddTimingToTimeStats (&readAccess, difference);
171  exampleAddTimingToTimeStats (&readAccessOverall, difference);
172
173  difference = (postTakeTime - info[0].source_timestamp)/DDS_NSECS_IN_USEC;
174  exampleAddTimingToTimeStats (&roundTrip, difference);
175  exampleAddTimingToTimeStats (&roundTripOverall, difference);
176
177  if (!warmUp) {
178    /* Print stats each second */
179    difference = (postTakeTime - startTime)/DDS_NSECS_IN_USEC;
180    if (difference > US_IN_ONE_SEC)
181    {
182      printf("%9" PRIi64 " %9lu %8.0f %8" PRIi64 " %8" PRIi64 " %8" PRIi64 " %10lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 "\n",
183             elapsed + 1,
184             roundTrip.count,
185             exampleGetMedianFromTimeStats (&roundTrip) / 2,
186             roundTrip.min / 2,
187             exampleGet99PercentileFromTimeStats (&roundTrip) / 2,
188             roundTrip.max / 2,
189             writeAccess.count,
190             exampleGetMedianFromTimeStats (&writeAccess),
191             writeAccess.min,
192             readAccess.count,
193             exampleGetMedianFromTimeStats (&readAccess),
194             readAccess.min);
195      fflush (stdout);
196
197      exampleResetTimeStats (&roundTrip);
198      exampleResetTimeStats (&writeAccess);
199      exampleResetTimeStats (&readAccess);
200      startTime = dds_time ();
201      elapsed++;
202    }
203  }
204
205  preWriteTime = dds_time();
206  status = dds_write_ts (writer, &pub_data, preWriteTime);
207  if (status < 0)
208    DDS_FATAL("dds_write_ts: %s\n", dds_strretcode(-status));
209  postWriteTime = dds_time();
210}
211
/* Print the command-line help to stdout and terminate with EXIT_FAILURE. */
static void usage(void)
{
  static const char helpText[] =
    "Usage (parameters must be supplied in order):\n"
    "./ping [-l] [payloadSize (bytes, 0 - 100M)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]\n"
    "./ping quit - ping sends a quit signal to pong.\n"
    "Defaults:\n"
    "./ping 0 0 0\n";

  fputs (helpText, stdout);
  exit(EXIT_FAILURE);
}
221
222int main (int argc, char *argv[])
223{
224  uint32_t payloadSize = 0;
225  uint64_t numSamples = 0;
226  bool invalidargs = false;
227  dds_time_t timeOut = 0;
228  dds_time_t time;
229  dds_time_t difference = 0;
230
231  dds_attach_t wsresults[1];
232  size_t wsresultsize = 1U;
233  dds_time_t waitTimeout = DDS_SECS (1);
234  unsigned long i;
235  int status;
236
237  dds_listener_t *listener = NULL;
238  bool use_listener = false;
239  int argidx = 1;
240
241  /* poor man's getopt works even on Windows */
242  if (argc > argidx && strcmp(argv[argidx], "-l") == 0)
243  {
244    argidx++;
245    use_listener = true;
246  }
247
248  /* Register handler for Ctrl-C */
249#ifdef _WIN32
250  DDSRT_WARNING_GNUC_OFF(cast-function-type)
251  SetConsoleCtrlHandler ((PHANDLER_ROUTINE)CtrlHandler, TRUE);
252  DDSRT_WARNING_GNUC_ON(cast-function-type)
253#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
254  struct sigaction sat, oldAction;
255  sat.sa_handler = CtrlHandler;
256  sigemptyset (&sat.sa_mask);
257  sat.sa_flags = 0;
258  sigaction (SIGINT, &sat, &oldAction);
259#endif
260
261  exampleInitTimeStats (&roundTrip);
262  exampleInitTimeStats (&writeAccess);
263  exampleInitTimeStats (&readAccess);
264  exampleInitTimeStats (&roundTripOverall);
265  exampleInitTimeStats (&writeAccessOverall);
266  exampleInitTimeStats (&readAccessOverall);
267
268  memset (&sub_data, 0, sizeof (sub_data));
269  memset (&pub_data, 0, sizeof (pub_data));
270
271  for (i = 0; i < MAX_SAMPLES; i++)
272  {
273    samples[i] = &sub_data[i];
274  }
275
276  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
277  if (participant < 0)
278    DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
279
280  if (use_listener)
281  {
282    listener = dds_create_listener(NULL);
283    dds_lset_data_available(listener, data_available);
284  }
285  prepare_dds(&writer, &reader, &readCond, listener);
286
287  if (argc - argidx == 1 && strcmp (argv[argidx], "quit") == 0)
288  {
289    printf ("Sending termination request.\n");
290    fflush (stdout);
291    /* pong uses a waitset which is triggered by instance disposal, and
292      quits when it fires. */
293    dds_sleepfor (DDS_SECS (1));
294    pub_data.payload._length = 0;
295    pub_data.payload._buffer = NULL;
296    pub_data.payload._release = true;
297    pub_data.payload._maximum = 0;
298    status = dds_writedispose (writer, &pub_data);
299    if (status < 0)
300      DDS_FATAL("dds_writedispose: %s\n", dds_strretcode(-status));
301    dds_sleepfor (DDS_SECS (1));
302    goto done;
303  }
304
305  if (argc - argidx == 0)
306  {
307    invalidargs = true;
308  }
309  if (argc - argidx >= 1)
310  {
311    payloadSize = (uint32_t) atol (argv[argidx]);
312
313    if (payloadSize > 100 * 1048576)
314    {
315      invalidargs = true;
316    }
317  }
318  if (argc - argidx >= 2)
319  {
320    numSamples = (uint64_t) atol (argv[argidx+1]);
321  }
322  if (argc - argidx >= 3)
323  {
324    timeOut = atol (argv[argidx+2]);
325  }
326  if (invalidargs || (argc - argidx == 1 && (strcmp (argv[argidx], "-h") == 0 || strcmp (argv[argidx], "--help") == 0)))
327    usage();
328  printf ("# payloadSize: %" PRIu32 " | numSamples: %" PRIu64 " | timeOut: %" PRIi64 "\n\n", payloadSize, numSamples, timeOut);
329  fflush (stdout);
330
331  pub_data.payload._length = payloadSize;
332  pub_data.payload._buffer = payloadSize ? dds_alloc (payloadSize) : NULL;
333  pub_data.payload._release = true;
334  pub_data.payload._maximum = 0;
335  for (i = 0; i < payloadSize; i++)
336  {
337    pub_data.payload._buffer[i] = 'a';
338  }
339
340  startTime = dds_time ();
341  printf ("# Waiting for startup jitter to stabilise\n");
342  fflush (stdout);
343  /* Write a sample that pong can send back */
344  while (!dds_triggered (waitSet) && difference < DDS_SECS(5))
345  {
346    status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout);
347    if (status < 0)
348      DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-status));
349
350    if (status > 0 && listener == NULL) /* data */
351    {
352      status = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
353      if (status < 0)
354        DDS_FATAL("dds_take: %s\n", dds_strretcode(-status));
355    }
356
357    time = dds_time ();
358    difference = time - startTime;
359  }
360  if (!dds_triggered (waitSet))
361  {
362    warmUp = false;
363    printf("# Warm up complete.\n\n");
364    printf("# Latency measurements (in us)\n");
365    printf("#             Latency [us]                                   Write-access time [us]       Read-access time [us]\n");
366    printf("# Seconds     Count   median      min      99%%      max      Count   median      min      Count   median      min\n");
367    fflush (stdout);
368  }
369
370  exampleResetTimeStats (&roundTrip);
371  exampleResetTimeStats (&writeAccess);
372  exampleResetTimeStats (&readAccess);
373  startTime = dds_time ();
374  /* Write a sample that pong can send back */
375  preWriteTime = dds_time ();
376  status = dds_write_ts (writer, &pub_data, preWriteTime);
377  if (status < 0)
378    DDS_FATAL("dds_write_ts: %s\n", dds_strretcode(-status));
379  postWriteTime = dds_time ();
380  for (i = 0; !dds_triggered (waitSet) && (!numSamples || i < numSamples) && !(timeOut && elapsed >= timeOut); i++)
381  {
382    status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout);
383    if (status < 0)
384      DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-status));
385    if (status != 0 && listener == NULL) {
386      data_available(reader, NULL);
387    }
388  }
389
390  if (!warmUp)
391  {
392    printf
393    (
394      "\n%9s %9lu %8.0f %8" PRIi64 " %8" PRIi64 " %8" PRIi64 " %10lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 "\n",
395      "# Overall",
396      roundTripOverall.count,
397      exampleGetMedianFromTimeStats (&roundTripOverall) / 2,
398      roundTripOverall.min / 2,
399      exampleGet99PercentileFromTimeStats (&roundTripOverall) / 2,
400      roundTripOverall.max / 2,
401      writeAccessOverall.count,
402      exampleGetMedianFromTimeStats (&writeAccessOverall),
403      writeAccessOverall.min,
404      readAccessOverall.count,
405      exampleGetMedianFromTimeStats (&readAccessOverall),
406      readAccessOverall.min
407    );
408    fflush (stdout);
409  }
410
411done:
412
413#ifdef _WIN32
414  SetConsoleCtrlHandler (0, FALSE);
415#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
416  sigaction (SIGINT, &oldAction, 0);
417#endif
418
419  finalize_dds(participant);
420
421  /* Clean up */
422  exampleDeleteTimeStats (&roundTrip);
423  exampleDeleteTimeStats (&writeAccess);
424  exampleDeleteTimeStats (&readAccess);
425  exampleDeleteTimeStats (&roundTripOverall);
426  exampleDeleteTimeStats (&writeAccessOverall);
427  exampleDeleteTimeStats (&readAccessOverall);
428
429  for (i = 0; i < MAX_SAMPLES; i++)
430  {
431    RoundTripModule_DataType_free (&sub_data[i], DDS_FREE_CONTENTS);
432  }
433  RoundTripModule_DataType_free (&pub_data, DDS_FREE_CONTENTS);
434
435  return EXIT_SUCCESS;
436}
437
438static dds_entity_t prepare_dds(dds_entity_t *wr, dds_entity_t *rd, dds_entity_t *rdcond, dds_listener_t *listener)
439{
440  dds_return_t status;
441  dds_entity_t topic;
442  dds_entity_t publisher;
443  dds_entity_t subscriber;
444
445  const char *pubPartitions[] = { "ping" };
446  const char *subPartitions[] = { "pong" };
447  dds_qos_t *pubQos;
448  dds_qos_t *subQos;
449  dds_qos_t *tQos;
450  dds_qos_t *wQos;
451
452  /* A DDS_Topic is created for our sample type on the domain participant. */
453  tQos = dds_create_qos ();
454  dds_qset_reliability (tQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
455  topic = dds_create_topic (participant, &RoundTripModule_DataType_desc, "RoundTrip", tQos, NULL);
456  if (topic < 0)
457    DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
458  dds_delete_qos (tQos);
459
460  /* A DDS_Publisher is created on the domain participant. */
461  pubQos = dds_create_qos ();
462  dds_qset_partition (pubQos, 1, pubPartitions);
463
464  publisher = dds_create_publisher (participant, pubQos, NULL);
465  if (publisher < 0)
466    DDS_FATAL("dds_create_publisher: %s\n", dds_strretcode(-publisher));
467  dds_delete_qos (pubQos);
468
469  /* A DDS_DataWriter is created on the Publisher & Topic with a modified Qos. */
470  wQos = dds_create_qos ();
471  dds_qset_writer_data_lifecycle (wQos, false);
472  *wr = dds_create_writer (publisher, topic, wQos, NULL);
473  if (*wr < 0)
474    DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-*wr));
475  dds_delete_qos (wQos);
476
477  /* A DDS_Subscriber is created on the domain participant. */
478  subQos = dds_create_qos ();
479
480  dds_qset_partition (subQos, 1, subPartitions);
481
482  subscriber = dds_create_subscriber (participant, subQos, NULL);
483  if (subscriber < 0)
484    DDS_FATAL("dds_create_subscriber: %s\n", dds_strretcode(-subscriber));
485  dds_delete_qos (subQos);
486  /* A DDS_DataReader is created on the Subscriber & Topic with a modified QoS. */
487  *rd = dds_create_reader (subscriber, topic, NULL, listener);
488  if (*rd < 0)
489    DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-*rd));
490
491  waitSet = dds_create_waitset (participant);
492  if (listener == NULL) {
493    *rdcond = dds_create_readcondition (*rd, DDS_ANY_STATE);
494    status = dds_waitset_attach (waitSet, *rdcond, *rd);
495    if (status < 0)
496      DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
497  } else {
498    *rdcond = 0;
499  }
500  status = dds_waitset_attach (waitSet, waitSet, waitSet);
501  if (status < 0)
502    DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
503
504  return participant;
505}
506
507static void finalize_dds(dds_entity_t ppant)
508{
509  dds_return_t status;
510  status = dds_delete (ppant);
511  if (status < 0)
512    DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
513}
pong.c
  1#include "dds/dds.h"
  2#include "dds/ddsrt/misc.h"
  3#include "RoundTrip.h"
  4#include <stdio.h>
  5#include <stdlib.h>
  6#include <string.h>
  7#include <signal.h>
  8#include <stdlib.h>
  9
 10static dds_entity_t waitSet;
 11#define MAX_SAMPLES 10
 12
 13/* Forward declarations */
 14static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener);
 15static void finalize_dds(dds_entity_t participant, RoundTripModule_DataType data[MAX_SAMPLES]);
 16
 17#ifdef _WIN32
 18#include <Windows.h>
 19static bool CtrlHandler (DWORD fdwCtrlType)
 20{
 21  (void)fdwCtrlType;
 22  dds_waitset_set_trigger (waitSet, true);
 23  return true; //Don't let other handlers handle this key
 24}
 25#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
 26static void CtrlHandler (int sig)
 27{
 28  (void)sig;
 29  dds_waitset_set_trigger (waitSet, true);
 30}
 31#endif
 32
 33static RoundTripModule_DataType data[MAX_SAMPLES];
 34static void * samples[MAX_SAMPLES];
 35static dds_sample_info_t info[MAX_SAMPLES];
 36
 37static dds_entity_t participant;
 38static dds_entity_t reader;
 39static dds_entity_t writer;
 40static dds_entity_t readCond;
 41
 42static void data_available(dds_entity_t rd, void *arg)
 43{
 44  int status, samplecount;
 45  (void)arg;
 46  samplecount = dds_take (rd, samples, info, MAX_SAMPLES, MAX_SAMPLES);
 47  if (samplecount < 0)
 48    DDS_FATAL("dds_take: %s\n", dds_strretcode(-samplecount));
 49  for (int j = 0; !dds_triggered (waitSet) && j < samplecount; j++)
 50  {
 51    /* If writer has been disposed terminate pong */
 52
 53    if (info[j].instance_state == DDS_IST_NOT_ALIVE_DISPOSED)
 54    {
 55      printf ("Received termination request. Terminating.\n");
 56      dds_waitset_set_trigger (waitSet, true);
 57      break;
 58    }
 59    else if (info[j].valid_data)
 60    {
 61      /* If sample is valid, send it back to ping */
 62      RoundTripModule_DataType * valid_sample = &data[j];
 63      status = dds_write_ts (writer, valid_sample, info[j].source_timestamp);
 64      if (status < 0)
 65        DDS_FATAL("dds_write_ts: %d\n", -status);
 66    }
 67  }
 68}
 69
 70int main (int argc, char *argv[])
 71{
 72  dds_duration_t waitTimeout = DDS_INFINITY;
 73  unsigned int i;
 74  int status;
 75  dds_attach_t wsresults[1];
 76  size_t wsresultsize = 1U;
 77
 78  dds_listener_t *listener = NULL;
 79  bool use_listener = false;
 80  int argidx = 1;
 81
 82  if (argc > argidx && strcmp(argv[argidx], "-l") == 0)
 83  {
 84    argidx++;
 85    use_listener = true;
 86  }
 87
 88  /* Register handler for Ctrl-C */
 89
 90#ifdef _WIN32
 91  DDSRT_WARNING_GNUC_OFF(cast-function-type)
 92  SetConsoleCtrlHandler ((PHANDLER_ROUTINE)CtrlHandler, TRUE);
 93  DDSRT_WARNING_GNUC_ON(cast-function-type)
 94#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
 95  struct sigaction sat, oldAction;
 96  sat.sa_handler = CtrlHandler;
 97  sigemptyset (&sat.sa_mask);
 98  sat.sa_flags = 0;
 99  sigaction (SIGINT, &sat, &oldAction);
100#endif
101
102  /* Initialize sample data */
103  memset (data, 0, sizeof (data));
104  for (i = 0; i < MAX_SAMPLES; i++)
105  {
106    samples[i] = &data[i];
107  }
108
109  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
110  if (participant < 0)
111    DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
112
113  if (use_listener)
114  {
115    listener = dds_create_listener(NULL);
116    dds_lset_data_available(listener, data_available);
117  }
118
119  (void)prepare_dds(&writer, &reader, &readCond, listener);
120
121  while (!dds_triggered (waitSet))
122  {
123    /* Wait for a sample from ping */
124
125    status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout);
126    if (status < 0)
127      DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-status));
128
129    /* Take samples */
130    if (listener == NULL) {
131      data_available (reader, 0);
132    }
133  }
134
135#ifdef _WIN32
136  SetConsoleCtrlHandler (0, FALSE);
137#elif !DDSRT_WITH_FREERTOS && !__ZEPHYR__
138  sigaction (SIGINT, &oldAction, 0);
139#endif
140
141  /* Clean up */
142  finalize_dds(participant, data);
143
144  return EXIT_SUCCESS;
145}
146
147static void finalize_dds(dds_entity_t pp, RoundTripModule_DataType xs[MAX_SAMPLES])
148{
149  dds_return_t status;
150  status = dds_delete (pp);
151  if (status < 0)
152    DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
153  for (unsigned int i = 0; i < MAX_SAMPLES; i++)
154  {
155    RoundTripModule_DataType_free (&xs[i], DDS_FREE_CONTENTS);
156  }
157}
158
159static dds_entity_t prepare_dds(dds_entity_t *wr, dds_entity_t *rd, dds_entity_t *rdcond, dds_listener_t *rdlist)
160{
161  const char *pubPartitions[] = { "pong" };
162  const char *subPartitions[] = { "ping" };
163  dds_qos_t *qos;
164  dds_entity_t subscriber;
165  dds_entity_t publisher;
166  dds_entity_t topic;
167  dds_return_t status;
168
169  /* A DDS Topic is created for our sample type on the domain participant. */
170
171  qos = dds_create_qos ();
172  dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
173  topic = dds_create_topic (participant, &RoundTripModule_DataType_desc, "RoundTrip", qos, NULL);
174  if (topic < 0)
175    DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
176  dds_delete_qos (qos);
177
178  /* A DDS Publisher is created on the domain participant. */
179
180  qos = dds_create_qos ();
181  dds_qset_partition (qos, 1, pubPartitions);
182
183  publisher = dds_create_publisher (participant, qos, NULL);
184  if (publisher < 0)
185    DDS_FATAL("dds_create_publisher: %s\n", dds_strretcode(-publisher));
186  dds_delete_qos (qos);
187
188  /* A DDS DataWriter is created on the Publisher & Topic with a modififed Qos. */
189
190  qos = dds_create_qos ();
191  dds_qset_writer_data_lifecycle (qos, false);
192  *wr = dds_create_writer (publisher, topic, qos, NULL);
193  if (*wr < 0)
194    DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-*wr));
195  dds_delete_qos (qos);
196
197  /* A DDS Subscriber is created on the domain participant. */
198
199  qos = dds_create_qos ();
200  dds_qset_partition (qos, 1, subPartitions);
201
202  subscriber = dds_create_subscriber (participant, qos, NULL);
203  if (subscriber < 0)
204    DDS_FATAL("dds_create_subscriber: %s\n", dds_strretcode(-subscriber));
205  dds_delete_qos (qos);
206
207  /* A DDS DataReader is created on the Subscriber & Topic with a modified QoS. */
208
209  *rd = dds_create_reader (subscriber, topic, NULL, rdlist);
210  if (*rd < 0)
211    DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-*rd));
212
213  waitSet = dds_create_waitset (participant);
214  if (rdlist == NULL) {
215    *rdcond = dds_create_readcondition (*rd, DDS_ANY_STATE);
216    status = dds_waitset_attach (waitSet, *rdcond, *rd);
217    if (status < 0)
218      DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
219  } else {
220    *rdcond = 0;
221  }
222  status = dds_waitset_attach (waitSet, waitSet, waitSet);
223  if (status < 0)
224    DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
225
226  printf ("Waiting for samples from ping to send back...\n");
227  fflush (stdout);
228
229  return participant;
230}