Throughput

The Throughput example allows the measurement of data throughput when receiving samples from a publisher.

Design

The Throughput example consists of two units:

  • Publisher: Sends samples at a specified size and rate.

  • Subscriber: Receives samples and outputs statistics about throughput

Scenario

The publisher sends the samples. You can specify a payload size in bytes, and whether to send data in bursts. Unless a time-out is specified, the publisher sends data forever.

Configuration:

  • payloadSize: The size of the payload in bytes.

  • burstInterval: The time interval between each burst in ms.

  • burstSize: The number of samples to send each burst.

  • timeOut: The number of seconds the publisher should run for (0=infinite).

  • partitionName: The name of the partition.

The subscriber receives the data and outputs:

  • The total amount of data received.

  • The data-rate in bytes-per-second.

  • Whether any samples were received out-of-order.

You can specify the maximum number of cycles. When the maximum is reached, the subscriber terminates and outputs the totals and averages.

The subscriber executable measures:

  • transferred: the total amount of data transferred in bytes.

  • outOfOrder: the number of samples that were received out of order.

  • The transfer rate: the data transfer rate in bytes per second.

  • The subscriber also calculates statistics on these values over a configurable number of cycles.

Configuration:

  • maxCycles: The number of times to output statistics before terminating.

  • pollingDelay

  • partitionName: The name of the partition.

Running the example

To avoid mixing the output, run the publisher and subscriber in separate terminals.

  1. Open 2 terminals.

  2. In the first terminal start publisher by running ThroughputPublisher.

    Publisher usage (parameters must be supplied in order):

    ./ThroughputPublisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]

    Defaults:

    ./ThroughputPublisher 8192 0 1 0 "Throughput example"

  3. In the second terminal start subscriber by running ThroughputSubscriber.

    Subscriber usage (parameters must be supplied in order):

    ./ThroughputSubscriber [maxCycles (0=infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]

    Defaults:

    ./ThroughputSubscriber 0 0 "Throughput example"

  4. To achieve optimal performance, set the CPU affinity so that the publisher and subscriber run on separate CPU cores and use real-time scheduling:

Publisher usage:
  ``taskset -c 0 chrt -f 80 ./ThroughputSubscriber [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]``
Subscriber usage:
  ``taskset -c 1 chrt -f 80 ./ThroughputSubscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]``

Source code

Throughput.idl
1module ThroughputModule
2{
3  @final
4  struct DataType
5  {
6    unsigned long long count;
7    sequence<octet> payload;
8  };
9};
publisher.c
  1#include "dds/dds.h"
  2#include "Throughput.h"
  3#include <stdio.h>
  4#include <stdlib.h>
  5#include <string.h>
  6#include <signal.h>
  7
  8/*
  9 * The Throughput example measures data throughput in bytes per second. The publisher
 10 * allows you to specify a payload size in bytes as well as allowing you to specify
 11 * whether to send data in bursts. The publisher will continue to send data forever
 12 * unless a time out is specified. The subscriber will receive data and output the
 13 * total amount received and the data rate in bytes per second. It will also indicate
 14 * if any samples were received out of order. A maximum number of cycles can be
 15 * specified and once this has been reached the subscriber will terminate and output
 16 * totals and averages.
 17 */
 18
 19#define MAX_SAMPLES 100
 20
 21static bool done = false;
 22
 23/* Forward declarations */
 24static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant);
 25static void start_writing(dds_entity_t writer, ThroughputModule_DataType *sample,
 26    int burstInterval, uint32_t burstSize, int timeOut);
 27static int parse_args(int argc, char **argv, uint32_t *payloadSize, int *burstInterval,
 28    uint32_t *burstSize, int *timeOut, char **partitionName);
 29static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName);
 30static void finalize_dds(dds_entity_t participant, dds_entity_t writer, ThroughputModule_DataType sample);
 31
 32#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
 33static void sigint (int sig)
 34{
 35  (void)sig;
 36  done = true;
 37}
 38#endif
 39
 40int main (int argc, char **argv)
 41{
 42  uint32_t payloadSize = 8192;
 43  int burstInterval = 0;
 44  uint32_t burstSize = 1;
 45  int timeOut = 0;
 46  char * partitionName = "Throughput example";
 47  dds_entity_t participant;
 48  dds_entity_t writer;
 49  dds_return_t rc;
 50  ThroughputModule_DataType sample;
 51
 52  if (parse_args(argc, argv, &payloadSize, &burstInterval, &burstSize, &timeOut, &partitionName) == EXIT_FAILURE) {
 53    return EXIT_FAILURE;
 54  }
 55
 56  participant = prepare_dds(&writer, partitionName);
 57
 58  /* Wait until have a reader */
 59  if (wait_for_reader(writer, participant) == 0) {
 60    printf ("=== [Publisher]  Did not discover a reader.\n");
 61    fflush (stdout);
 62    rc = dds_delete (participant);
 63    if (rc < 0)
 64      DDS_FATAL("dds_delete: %s\n", dds_strretcode(-rc));
 65    return EXIT_FAILURE;
 66  }
 67
 68  /* Fill the sample payload with data */
 69  sample.count = 0;
 70  sample.payload._buffer = dds_alloc (payloadSize);
 71  sample.payload._length = payloadSize;
 72  sample.payload._release = true;
 73  for (uint32_t i = 0; i < payloadSize; i++) {
 74    sample.payload._buffer[i] = 'a';
 75  }
 76
 77  /* Register handler for Ctrl-C */
 78#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
 79  signal (SIGINT, sigint);
 80#endif
 81
 82  /* Register the sample instance and write samples repeatedly or until time out */
 83  start_writing(writer, &sample, burstInterval, burstSize, timeOut);
 84
 85  /* Cleanup */
 86  finalize_dds(participant, writer, sample);
 87  return EXIT_SUCCESS;
 88}
 89
 90static int parse_args(
 91    int argc,
 92    char **argv,
 93    uint32_t *payloadSize,
 94    int *burstInterval,
 95    uint32_t *burstSize,
 96    int *timeOut,
 97    char **partitionName)
 98{
 99  int result = EXIT_SUCCESS;
100  /*
101   * Get the program parameters
102   * Parameters: publisher [payloadSize] [burstInterval] [burstSize] [timeOut] [partitionName]
103   */
104  if (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))
105  {
106    printf ("Usage (parameters must be supplied in order):\n");
107    printf ("./publisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]\n");
108    printf ("Defaults:\n");
109    printf ("./publisher 8192 0 1 0 \"Throughput example\"\n");
110    return EXIT_FAILURE;
111  }
112  if (argc > 1)
113  {
114    *payloadSize = (uint32_t) atoi (argv[1]); /* The size of the payload in bytes */
115  }
116  if (argc > 2)
117  {
118    *burstInterval = atoi (argv[2]); /* The time interval between each burst in ms */
119  }
120  if (argc > 3)
121  {
122    *burstSize = (uint32_t) atoi (argv[3]); /* The number of samples to send each burst */
123  }
124  if (argc > 4)
125  {
126    *timeOut = atoi (argv[4]); /* The number of seconds the publisher should run for (0 = infinite) */
127  }
128  if (argc > 5)
129  {
130    *partitionName = argv[5]; /* The name of the partition */
131  }
132
133  printf ("payloadSize: %"PRIu32" bytes burstInterval: %d ms burstSize: %"PRIu32" timeOut: %d seconds partitionName: %s\n",
134    *payloadSize, *burstInterval, *burstSize, *timeOut, *partitionName);
135  fflush (stdout);
136
137  return result;
138}
139
140static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName)
141{
142  dds_entity_t participant;
143  dds_entity_t topic;
144  dds_entity_t publisher;
145  const char *pubParts[1];
146  dds_qos_t *pubQos;
147  dds_qos_t *wrQos;
148  dds_qos_t *tQos;
149
150  /* A domain participant is created for the default domain. */
151  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
152  if (participant < 0)
153    DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
154
155  /* A topic is created for our sample type on the domain participant. */
156  tQos = dds_create_qos ();
157  dds_qset_reliability (tQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
158  dds_qset_history (tQos, DDS_HISTORY_KEEP_ALL, 0);
159  dds_qset_resource_limits (tQos, MAX_SAMPLES, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
160  topic = dds_create_topic (participant, &ThroughputModule_DataType_desc, "Throughput", tQos, NULL);
161  if (topic < 0)
162    DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
163  dds_delete_qos (tQos);
164
165  /* A publisher is created on the domain participant. */
166  pubQos = dds_create_qos ();
167  pubParts[0] = partitionName;
168  dds_qset_partition (pubQos, 1, pubParts);
169  publisher = dds_create_publisher (participant, pubQos, NULL);
170  if (publisher < 0)
171    DDS_FATAL("dds_create_publisher: %s\n", dds_strretcode(-publisher));
172  dds_delete_qos (pubQos);
173
174  /* A DataWriter is created on the publisher. */
175  wrQos = dds_create_qos ();
176  dds_qset_writer_batching (wrQos, true);
177  *writer = dds_create_writer (publisher, topic, wrQos, NULL);
178  if (*writer < 0)
179    DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-*writer));
180  dds_delete_qos (wrQos);
181
182  return participant;
183}
184
185static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant)
186{
187  printf ("\n=== [Publisher]  Waiting for a reader ...\n");
188  fflush (stdout);
189
190  dds_return_t rc;
191  dds_entity_t waitset;
192
193  rc = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
194  if (rc < 0)
195    DDS_FATAL("dds_set_status_mask: %s\n", dds_strretcode(-rc));
196
197  waitset = dds_create_waitset(participant);
198  if (waitset < 0)
199    DDS_FATAL("dds_create_waitset: %s\n", dds_strretcode(-waitset));
200
201  rc = dds_waitset_attach(waitset, writer, (dds_attach_t)NULL);
202  if (rc < 0)
203    DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-rc));
204
205  rc = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(30));
206  if (rc < 0)
207    DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-rc));
208
209  return rc;
210}
211
212static void start_writing(
213    dds_entity_t writer,
214    ThroughputModule_DataType *sample,
215    int burstInterval,
216    uint32_t burstSize,
217    int timeOut)
218{
219  bool timedOut = false;
220  dds_time_t pubStart = dds_time ();
221  dds_time_t now;
222  dds_time_t deltaTv;
223  dds_return_t status;
224
225  if (!done)
226  {
227    dds_time_t burstStart = pubStart;
228    unsigned int burstCount = 0;
229
230    printf ("=== [Publisher]  Writing samples...\n");
231    fflush (stdout);
232
233    while (!done && !timedOut)
234    {
235      /* Write data until burst size has been reached */
236
237      if (burstCount < burstSize)
238      {
239        status = dds_write (writer, sample);
240        if (status == DDS_RETCODE_TIMEOUT)
241        {
242          timedOut = true;
243        }
244        else if (status < 0)
245        {
246          DDS_FATAL("dds_write: %s\n", dds_strretcode(-status));
247        }
248        else
249        {
250          sample->count++;
251          burstCount++;
252        }
253      }
254      else if (burstInterval)
255      {
256        /* Sleep until burst interval has passed */
257
258        dds_time_t time = dds_time ();
259        deltaTv = time - burstStart;
260        if (deltaTv < DDS_MSECS (burstInterval))
261        {
262          dds_write_flush (writer);
263          dds_sleepfor (DDS_MSECS (burstInterval) - deltaTv);
264        }
265        burstStart = dds_time ();
266        burstCount = 0;
267      }
268      else
269      {
270        burstCount = 0;
271      }
272
273      if (timeOut)
274      {
275        now = dds_time ();
276        deltaTv = now - pubStart;
277        if ((deltaTv) > DDS_SECS (timeOut))
278        {
279          timedOut = true;
280        }
281      }
282    }
283    dds_write_flush (writer);
284
285    printf ("=== [Publisher]  %s, %llu samples written.\n", done ? "Terminated" : "Timed out", (unsigned long long) sample->count);
286    fflush (stdout);
287  }
288}
289
290static void finalize_dds(dds_entity_t participant, dds_entity_t writer, ThroughputModule_DataType sample)
291{
292  dds_return_t status = dds_dispose (writer, &sample);
293  if (status != DDS_RETCODE_TIMEOUT && status < 0)
294    DDS_FATAL("dds_dispose: %s\n", dds_strretcode(-status));
295
296  dds_free (sample.payload._buffer);
297  status = dds_delete (participant);
298  if (status < 0)
299    DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
300}
subscriber.c
  1#include "dds/dds.h"
  2#include "Throughput.h"
  3#include <stdio.h>
  4#include <stdlib.h>
  5#include <string.h>
  6#include <signal.h>
  7#include <assert.h>
  8/*
  9 * The Throughput example measures data throughput in bytes per second. The publisher
 10 * allows you to specify a payload size in bytes as well as allowing you to specify
 11 * whether to send data in bursts. The publisher will continue to send data forever
 12 * unless a time out is specified. The subscriber will receive data and output the
 13 * total amount received and the data rate in bytes per second. It will also indicate
 14 * if any samples were received out of order. A maximum number of cycles can be
 15 * specified and once this has been reached the subscriber will terminate and output
 16 * totals and averages.
 17 */
 18
 19#define BYTES_PER_SEC_TO_MEGABITS_PER_SEC 125000
 20#define MAX_SAMPLES 1000
 21
 22typedef struct HandleEntry
 23{
 24  dds_instance_handle_t handle;
 25  unsigned long long count;
 26  struct HandleEntry * next;
 27} HandleEntry;
 28
 29typedef struct HandleMap
 30{
 31  HandleEntry *entries;
 32} HandleMap;
 33
 34static long pollingDelay = -1; /* i.e. use a listener */
 35
 36static HandleMap * imap;
 37static unsigned long long outOfOrder = 0;
 38static unsigned long long total_bytes = 0;
 39static unsigned long long total_samples = 0;
 40
 41static dds_time_t startTime = 0;
 42
 43static unsigned long payloadSize = 0;
 44
 45static ThroughputModule_DataType data [MAX_SAMPLES];
 46static void * samples[MAX_SAMPLES];
 47static dds_sample_info_t info[MAX_SAMPLES];
 48
 49static dds_entity_t waitSet;
 50
 51#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
 52static volatile sig_atomic_t done = false;
 53#else
 54static bool done = false;
 55#endif
 56
 57/* Forward declarations */
 58static HandleMap * HandleMap__alloc (void);
 59static void HandleMap__free (HandleMap *map);
 60static HandleEntry * store_handle (HandleMap *map, dds_instance_handle_t key);
 61static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key);
 62
 63static void data_available_handler (dds_entity_t reader, void *arg);
 64static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName);
 65static void process_samples(dds_entity_t reader, unsigned long long maxCycles);
 66static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName);
 67static void finalize_dds(dds_entity_t participant);
 68
 69#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
 70static void sigint (int sig)
 71{
 72  (void) sig;
 73  done = true;
 74}
 75#endif
 76
 77int main (int argc, char **argv)
 78{
 79  unsigned long long maxCycles = 0;
 80  char *partitionName = "Throughput example";
 81
 82  dds_entity_t participant;
 83  dds_entity_t reader;
 84
 85  if (parse_args(argc, argv, &maxCycles, &partitionName) == EXIT_FAILURE)
 86  {
 87    return EXIT_FAILURE;
 88  }
 89
 90  printf ("Cycles: %llu | PollingDelay: %ld | Partition: %s\n", maxCycles, pollingDelay, partitionName);
 91  fflush (stdout);
 92
 93  participant = prepare_dds(&reader, partitionName);
 94
 95  printf ("=== [Subscriber] Waiting for samples...\n");
 96  fflush (stdout);
 97
 98  /* Process samples until Ctrl-C is pressed or until maxCycles */
 99  /* has been reached (0 = infinite) */
100#if !DDSRT_WITH_FREERTOS && !__ZEPHYR__
101  signal (SIGINT, sigint);
102#endif
103  process_samples(reader, maxCycles);
104
105  (void) dds_set_status_mask (reader, 0);
106  HandleMap__free (imap);
107  finalize_dds (participant);
108  return EXIT_SUCCESS;
109}
110
111/*
112 * This struct contains all of the entities used in the publisher and subscriber.
113 */
114static HandleMap * HandleMap__alloc (void)
115{
116  HandleMap * map = malloc (sizeof (*map));
117  assert(map);
118  memset (map, 0, sizeof (*map));
119  return map;
120}
121
122static void HandleMap__free (HandleMap *map)
123{
124  HandleEntry * entry;
125
126  while (map->entries)
127  {
128    entry = map->entries;
129    map->entries = entry->next;
130    free (entry);
131  }
132  free (map);
133}
134
135static HandleEntry * store_handle (HandleMap *map, dds_instance_handle_t key)
136{
137  HandleEntry * entry = malloc (sizeof (*entry));
138  assert(entry);
139  memset (entry, 0, sizeof (*entry));
140
141  entry->handle = key;
142  entry->next = map->entries;
143  map->entries = entry;
144
145  return entry;
146}
147
148static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key)
149{
150  HandleEntry * entry = map->entries;
151
152  while (entry)
153  {
154    if (entry->handle == key)
155    {
156      break;
157    }
158    entry = entry->next;
159  }
160  return entry;
161}
162
163static int do_take (dds_entity_t reader)
164{
165  int samples_received;
166  dds_instance_handle_t ph = 0;
167  HandleEntry * current = NULL;
168
169  if (startTime == 0)
170  {
171    startTime = dds_time ();
172  }
173
174  /* Take samples and iterate through them */
175
176  samples_received = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
177  if (samples_received < 0)
178  {
179    DDS_FATAL("dds_take: %s\n", dds_strretcode(-samples_received));
180  }
181
182  for (int i = 0; !done && i < samples_received; i++)
183  {
184    if (info[i].valid_data)
185    {
186      ph = info[i].publication_handle;
187      current = retrieve_handle (imap, ph);
188      ThroughputModule_DataType * this_sample = &data[i];
189
190      if (current == NULL)
191      {
192        current = store_handle (imap, ph);
193        current->count = this_sample->count;
194      }
195
196      if (this_sample->count != current->count)
197      {
198        outOfOrder++;
199      }
200      current->count = this_sample->count + 1;
201
202      /* Add the sample payload size to the total received */
203
204      payloadSize = this_sample->payload._length;
205      total_bytes += payloadSize + 8;
206      total_samples++;
207    }
208  }
209  return samples_received;
210}
211
212static void data_available_handler (dds_entity_t reader, void *arg)
213{
214  (void)arg;
215  (void) do_take (reader);
216}
217
218static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName)
219{
220  /*
221   * Get the program parameters
222   * Parameters: subscriber [maxCycles] [pollingDelay] [partitionName]
223   */
224  if (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))
225  {
226    printf ("Usage (parameters must be supplied in order):\n");
227    printf ("./subscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = waitset, -1 = listener)] [partitionName]\n");
228    printf ("Defaults:\n");
229    printf ("./subscriber 0 0 \"Throughput example\"\n");
230    return EXIT_FAILURE;
231  }
232
233  if (argc > 1)
234  {
235    *maxCycles = (unsigned long long) atoi (argv[1]); /* The number of times to output statistics before terminating */
236  }
237  if (argc > 2)
238  {
239    pollingDelay = atoi (argv[2]); /* The number of ms to wait between reads (0 = waitset, -1 = listener) */
240  }
241  if (argc > 3)
242  {
243    *partitionName = argv[3]; /* The name of the partition */
244  }
245  return EXIT_SUCCESS;
246}
247
248static void process_samples(dds_entity_t reader, unsigned long long maxCycles)
249{
250  dds_return_t status;
251  unsigned long long prev_bytes = 0;
252  unsigned long long prev_samples = 0;
253  dds_attach_t wsresults[2];
254  dds_time_t deltaTv;
255  bool first_batch = true;
256  unsigned long cycles = 0;
257  double deltaTime = 0;
258  dds_time_t prev_time = 0;
259  dds_time_t time_now = 0;
260
261  while (!done && (maxCycles == 0 || cycles < maxCycles))
262  {
263    if (pollingDelay > 0)
264      dds_sleepfor (DDS_MSECS (pollingDelay));
265    else
266    {
267      status = dds_waitset_wait (waitSet, wsresults, sizeof(wsresults)/sizeof(wsresults[0]), DDS_MSECS(100));
268      if (status < 0)
269        DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-status));
270    }
271
272    if (pollingDelay >= 0)
273    {
274      while (do_take (reader))
275        ;
276    }
277
278    time_now = dds_time();
279    if (!first_batch)
280    {
281      deltaTv = time_now - prev_time;
282      deltaTime = (double) deltaTv / DDS_NSECS_IN_SEC;
283
284      if (deltaTime >= 1.0 && total_samples != prev_samples)
285      {
286        printf ("=== [Subscriber] %5.3f Payload size: %lu | Total received: %llu samples, %llu bytes | Out of order: %llu samples "
287                "Transfer rate: %.2lf samples/s, %.2lf Mbit/s\n",
288                deltaTime, payloadSize, total_samples, total_bytes, outOfOrder,
289                (deltaTime != 0.0) ? ((double)(total_samples - prev_samples) / deltaTime) : 0,
290                (deltaTime != 0.0) ? ((double)((total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0);
291        fflush (stdout);
292        cycles++;
293        prev_time = time_now;
294        prev_bytes = total_bytes;
295        prev_samples = total_samples;
296      }
297    }
298    else
299    {
300      prev_time = time_now;
301      first_batch = false;
302    }
303  }
304
305  /* Output totals and averages */
306  deltaTv = time_now - startTime;
307  deltaTime = (double) (deltaTv / DDS_NSECS_IN_SEC);
308  printf ("\nTotal received: %llu samples, %llu bytes\n", total_samples, total_bytes);
309  printf ("Out of order: %llu samples\n", outOfOrder);
310  printf ("Average transfer rate: %.2lf samples/s, ", (double)total_samples / deltaTime);
311  printf ("%.2lf Mbit/s\n", (double)(total_bytes / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime);
312  fflush (stdout);
313}
314
315static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName)
316{
317  dds_return_t status;
318  dds_entity_t topic;
319  dds_entity_t subscriber;
320  dds_listener_t *rd_listener;
321  dds_entity_t participant;
322
323  int32_t maxSamples = 4000;
324  const char *subParts[1];
325  dds_qos_t *subQos = dds_create_qos ();
326  dds_qos_t *tQos = dds_create_qos ();
327
328  /* A Participant is created for the default domain. */
329
330  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
331  if (participant < 0)
332    DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
333
334  /* A Topic is created for our sample type on the domain participant. */
335
336  dds_qset_reliability (tQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
337  dds_qset_history (tQos, DDS_HISTORY_KEEP_ALL, 0);
338  dds_qset_resource_limits (tQos, maxSamples, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
339  topic = dds_create_topic (participant, &ThroughputModule_DataType_desc, "Throughput", tQos, NULL);
340  if (topic < 0)
341    DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
342
343  /* A Subscriber is created on the domain participant. */
344
345  subParts[0] = partitionName;
346  dds_qset_partition (subQos, 1, subParts);
347  subscriber = dds_create_subscriber (participant, subQos, NULL);
348  if (subscriber < 0)
349    DDS_FATAL("dds_create_subscriber: %s\n", dds_strretcode(-subscriber));
350  dds_delete_qos (subQos);
351
352  /* A Listener is created which is triggered when data is available to read */
353
354  rd_listener = dds_create_listener(NULL);
355  dds_lset_data_available(rd_listener, data_available_handler);
356
357  /* A Waitset is created which is triggered when data is available to read */
358
359  waitSet = dds_create_waitset (participant);
360  if (waitSet < 0)
361    DDS_FATAL("dds_create_waitset: %s\n", dds_strretcode(-waitSet));
362
363  status = dds_waitset_attach (waitSet, waitSet, waitSet);
364  if (status < 0)
365    DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
366
367  imap = HandleMap__alloc ();
368
369  memset (data, 0, sizeof (data));
370  for (unsigned int i = 0; i < MAX_SAMPLES; i++)
371  {
372    samples[i] = &data[i];
373  }
374
375  /* A Reader is created on the Subscriber & Topic and attached to Waitset */
376
377  *reader = dds_create_reader (subscriber, topic, NULL, pollingDelay < 0 ? rd_listener : NULL);
378  if (*reader < 0)
379    DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-*reader));
380
381  if (pollingDelay == 0)
382  {
383    status = dds_waitset_attach (waitSet, *reader, *reader);
384    if (status < 0)
385      DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
386  }
387
388  dds_delete_qos (tQos);
389  dds_delete_listener(rd_listener);
390
391  return participant;
392}
393
394static void finalize_dds(dds_entity_t participant)
395{
396  dds_return_t status;
397
398  for (unsigned int i = 0; i < MAX_SAMPLES; i++)
399  {
400    ThroughputModule_DataType_free (&data[i], DDS_FREE_CONTENTS);
401  }
402
403  status = dds_waitset_detach (waitSet, waitSet);
404  if (status < 0)
405    DDS_FATAL("dds_waitset_detach: %s\n", dds_strretcode(-status));
406  status = dds_delete (waitSet);
407  if (status < 0)
408    DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
409  status = dds_delete (participant);
410  if (status < 0)
411    DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
412}

Note

There is also a Shared Memory variant of this example available.