aboutsummaryrefslogtreecommitdiff
path: root/src/fragmentation/defragmentation.c
blob: aabf9a6219665ef3b5fb79d78bfd08def2b13ca8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
/*
     This file is part of GNUnet
     Copyright (C) 2009, 2011 GNUnet e.V.

     GNUnet is free software: you can redistribute it and/or modify it
     under the terms of the GNU General Public License as published
     by the Free Software Foundation, either version 3 of the License,
     or (at your option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     Affero General Public License for more details.
*/
/**
 * @file src/fragmentation/defragmentation.c
 * @brief library to help defragment messages
 * @author Christian Grothoff
 */
#include "platform.h"
#include "gnunet_fragmentation_lib.h"
#include "fragmentation.h"

/**
 * Arrival record for a single fragment; one entry is appended to
 * `frag_times` of the owning `struct MessageContext` for every
 * non-duplicate fragment that is accepted.
 */
struct FragTimes
{
  /**
   * The (absolute) time at which the fragment was received.
   */
  struct GNUNET_TIME_Absolute time;

  /**
   * Index of the fragment within its message, i.e. the number of the
   * bit that represents it in the 64-bit receive mask (in [0,..,63]).
   */
  unsigned int bit;
};


/**
 * Information we keep for one message that is being assembled.  Note
 * that we keep the context around even after the assembly is done to
 * handle 'stray' fragments that are received 'late'.  Apart from the
 * destruction of the whole defragmentation context, a message context
 * is ONLY discarded when the queue gets too big.
 */
struct MessageContext
{
  /**
   * This is a DLL (next entry).
   */
  struct MessageContext *next;

  /**
   * This is a DLL (previous entry).
   */
  struct MessageContext *prev;

  /**
   * Associated defragmentation context.
   */
  struct GNUNET_DEFRAGMENT_Context *dc;

  /**
   * Pointer to the reassembly buffer for the message.  The buffer is
   * allocated together with this struct and starts directly behind it
   * (at `&mc[1]`); it is `total_size` bytes long.
   */
  const struct GNUNET_MessageHeader *msg;

  /**
   * Last time we accepted a new (non-duplicate) fragment for this
   * message (the least-recently updated message will be discarded
   * if we hit the queue size).
   */
  struct GNUNET_TIME_Absolute last_update;

  /**
   * Task scheduled for transmitting the next ACK to the
   * other peer; NULL if no ACK is currently pending.
   */
  struct GNUNET_SCHEDULER_Task * ack_task;

  /**
   * When did we receive which fragment? Used to calculate
   * the time we should send the ACK.  One entry is written per
   * non-duplicate fragment.
   */
  struct FragTimes frag_times[64];

  /**
   * Which fragments have we gotten yet? Bits that are 1
   * indicate missing fragments; 0 means the message is complete.
   */
  uint64_t bits;

  /**
   * Unique ID for this message.
   */
  uint32_t fragment_id;

  /**
   * Which 'bit' did the last (non-duplicate) fragment we received
   * correspond to?
   */
  unsigned int last_bit;

  /**
   * For the current ACK round, which is the first relevant
   * offset in @e frag_times?  Moved up to the write offset whenever a
   * fragment with a lower bit number than its predecessor arrives.
   */
  unsigned int frag_times_start_offset;

  /**
   * Which offset should we write the next frag value into
   * in the @e frag_times array? All smaller entries are valid.
   */
  unsigned int frag_times_write_offset;

  /**
   * Total size of the message that we are assembling.
   */
  uint16_t total_size;

  /**
   * Was the last fragment we got a duplicate?  Set to #GNUNET_YES when
   * a duplicate is seen and cleared again when the ACK is sent.
   */
  int16_t last_duplicate;

};


/**
 * Defragmentation context (one per connection).
 */
struct GNUNET_DEFRAGMENT_Context
{

  /**
   * For statistics.
   */
  struct GNUNET_STATISTICS_Handle *stats;

  /**
   * Head of list of messages we're defragmenting.
   */
  struct MessageContext *head;

  /**
   * Tail of list of messages we're defragmenting.
   */
  struct MessageContext *tail;

  /**
   * Closure for @e proc and @e ackp.
   */
  void *cls;

  /**
   * Function to call with defragmented messages.
   */
  GNUNET_FRAGMENT_MessageProcessor proc;

  /**
   * Function to call with acknowledgements.
   */
  GNUNET_DEFRAGMENT_AckProcessor ackp;

  /**
   * Current estimate of the latency (delay between fragments) for this
   * connection.  Starts out at one second and is replaced by the most
   * recent least-squares estimate whenever at least two fragment
   * time stamps are available for a message.
   */
  struct GNUNET_TIME_Relative latency;

  /**
   * How many fragmented messages do we defragment at most at the
   * same time?
   */
  unsigned int num_msgs;

  /**
   * Current number of messages in the 'struct MessageContext'
   * DLL (smaller or equal to 'num_msgs').
   */
  unsigned int list_size;

  /**
   * Maximum size of each fragment, including the
   * `struct FragmentHeader` that precedes the payload.
   */
  uint16_t mtu;

};


/**
 * Allocate and set up a new defragmentation context.
 *
 * Only the fields derived from the arguments and the initial latency
 * estimate are written here; everything else keeps whatever value
 * GNUNET_new() hands out (presumably zero -- confirm against the
 * allocator).
 *
 * @param stats statistics context
 * @param mtu the maximum size of each fragment
 * @param num_msgs how many fragmented messages do we defragment
 *                 at most at the same time?
 * @param cls closure for @a proc and @a ackp
 * @param proc function to call with defragmented messages
 * @param ackp function to call with acknowledgements (to send
 *             back to the other side)
 * @return the defragmentation context
 */
struct GNUNET_DEFRAGMENT_Context *
GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
                                  uint16_t mtu, unsigned int num_msgs,
                                  void *cls,
                                  GNUNET_FRAGMENT_MessageProcessor proc,
                                  GNUNET_DEFRAGMENT_AckProcessor ackp)
{
  struct GNUNET_DEFRAGMENT_Context *ctx;

  ctx = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);

  /* limits */
  ctx->mtu = mtu;
  ctx->num_msgs = num_msgs;

  /* callbacks and their closure */
  ctx->proc = proc;
  ctx->ackp = ackp;
  ctx->cls = cls;

  ctx->stats = stats;

  /* begin with a value that is likely an overestimate */
  ctx->latency = GNUNET_TIME_UNIT_SECONDS;
  return ctx;
}


/**
 * Release a defragmentation context together with every message
 * context still queued in it.  Pending ACK tasks are cancelled, so no
 * acknowledgement is sent for the discarded messages.
 *
 * @param dc defragmentation context
 */
void
GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
{
  struct MessageContext *cur;

  /* always take the current head until the list has drained */
  for (cur = dc->head; NULL != cur; cur = dc->head)
  {
    if (NULL != cur->ack_task)
    {
      GNUNET_SCHEDULER_cancel (cur->ack_task);
      cur->ack_task = NULL;
    }
    GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, cur);
    dc->list_size--;
    GNUNET_free (cur);
  }
  /* the counter must agree with the (now empty) list */
  GNUNET_assert (0 == dc->list_size);
  GNUNET_free (dc);
}


/**
 * Send acknowledgement to the other peer now.
 *
 * @param cls the message context
 */
static void
send_ack (void *cls)
{
  struct MessageContext *mc = cls;
  struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
  struct FragmentAcknowledgement fa;

  mc->ack_task = NULL;
  fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
  fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
  fa.fragment_id = htonl (mc->fragment_id);
  fa.bits = GNUNET_htonll (mc->bits);
  GNUNET_STATISTICS_update (mc->dc->stats,
                            _("# acknowledgements sent for fragment"),
                            1,
                            GNUNET_NO);
  mc->last_duplicate = GNUNET_NO; /* clear flag */
  dc->ackp (dc->cls,
            mc->fragment_id,
            &fa.header);
}


/**
 * Least-squares fit of the model y = c1 * x (a line through the
 * origin) to @a n data points.
 *
 * This function is from the GNU Scientific Library, linear/fit.c,
 * Copyright (C) 2000 Brian Gough
 *
 * @param x x values, element i is read from x[i * xstride]
 * @param xstride stride between consecutive x values
 * @param y y values, element i is read from y[i * ystride]
 * @param ystride stride between consecutive y values
 * @param n number of data points; @a cov_11 divides by (n - 1) and
 *          by n, so fewer than two points give a non-finite value
 * @param c1 set to the fitted slope
 * @param cov_11 set to the variance estimate of the slope
 * @param sumsq set to the sum of squared residuals
 */
static void
gsl_fit_mul (const double *x, const size_t xstride, const double *y,
             const size_t ystride, const size_t n, double *c1, double *cov_11,
             double *sumsq)
{
  double mean_x = 0;
  double mean_y = 0;
  double mean_dx2 = 0;
  double mean_dxdy = 0;
  double mean_x2;
  double slope;
  double chi2 = 0;
  size_t k;

  /* first pass: running means of x and y */
  for (k = 0; k < n; k++)
  {
    const double weight = k + 1.0;

    mean_x += (x[k * xstride] - mean_x) / weight;
    mean_y += (y[k * ystride] - mean_y) / weight;
  }

  /* second pass: running means of the centred second moments */
  for (k = 0; k < n; k++)
  {
    const double dx = x[k * xstride] - mean_x;
    const double dy = y[k * ystride] - mean_y;
    const double weight = k + 1.0;

    mean_dx2 += (dx * dx - mean_dx2) / weight;
    mean_dxdy += (dx * dy - mean_dxdy) / weight;
  }

  /* in terms of y = b x: b = E[x y] / E[x^2] */
  mean_x2 = mean_x * mean_x + mean_dx2;
  slope = (mean_x * mean_y + mean_dxdy) / mean_x2;
  *c1 = slope;

  /* chi^2 = \sum (y_i - b * x_i)^2 */
  for (k = 0; k < n; k++)
  {
    const double dx = x[k * xstride] - mean_x;
    const double dy = y[k * ystride] - mean_y;
    const double resid = (mean_y - slope * mean_x) + dy - slope * dx;

    chi2 += resid * resid;
  }

  /* chi^2 per degree of freedom, scaled by 1 / \sum x_i^2 */
  *cov_11 = (chi2 / (n - 1.0)) / (n * mean_x2);
  *sumsq = chi2;
}


/**
 * Estimate the latency between messages based on the most recent
 * message time stamps.
 *
 * @param mc context with time stamps
 * @return average delay between time stamps (based on least-squares fit)
 */
static struct GNUNET_TIME_Relative
estimate_latency (struct MessageContext *mc)
{
  struct FragTimes *first;
  size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
  double x[total];
  double y[total];
  size_t i;
  double c1;
  double cov11;
  double sumsq;
  struct GNUNET_TIME_Relative ret;

  first = &mc->frag_times[mc->frag_times_start_offset];
  GNUNET_assert (total > 1);
  for (i = 0; i < total; i++)
  {
    x[i] = (double) i;
    y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
  }
  gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
  c1 += sqrt (sumsq);           /* add 1 std dev */
  ret.rel_value_us = (uint64_t) c1;
  if (0 == ret.rel_value_us)
    ret = GNUNET_TIME_UNIT_MICROSECONDS;        /* always at least 1 */
  return ret;
}


/**
 * Evict the message context whose last update lies furthest in the
 * past.  Among equally old entries the one closest to the head of the
 * list is chosen.  A pending ACK task of the victim is cancelled.
 *
 * @param dc defragmentation context; its list must not be empty
 */
static void
discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
{
  struct MessageContext *victim = dc->head;
  struct MessageContext *cur;

  GNUNET_assert (NULL != victim);
  /* only a strictly older entry replaces the current candidate */
  for (cur = victim->next; NULL != cur; cur = cur->next)
  {
    if (cur->last_update.abs_value_us < victim->last_update.abs_value_us)
      victim = cur;
  }
  if (NULL != victim->ack_task)
  {
    GNUNET_SCHEDULER_cancel (victim->ack_task);
    victim->ack_task = NULL;
  }
  GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, victim);
  dc->list_size--;
  GNUNET_free (victim);
}


/**
 * We have received a fragment.  Process it.
 *
 * @param dc the context
 * @param msg the message that was received
 * @return #GNUNET_OK on success,
 *         #GNUNET_NO if this was a duplicate,
 *         #GNUNET_SYSERR on error
 */
int
GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
                                    const struct GNUNET_MessageHeader *msg)
{
  struct MessageContext *mc;
  const struct FragmentHeader *fh;
  uint16_t msize;
  uint16_t foff;
  uint32_t fid;
  char *mbuf;
  unsigned int bit;
  struct GNUNET_TIME_Absolute now;
  struct GNUNET_TIME_Relative delay;
  unsigned int bc;
  unsigned int b;
  unsigned int n;
  unsigned int num_fragments;
  int duplicate;
  int last;

  if (ntohs (msg->size) < sizeof (struct FragmentHeader))
  {
    GNUNET_break_op (0);
    return GNUNET_SYSERR;
  }
  if (ntohs (msg->size) > dc->mtu)
  {
    GNUNET_break_op (0);
    return GNUNET_SYSERR;
  }
  fh = (const struct FragmentHeader *) msg;
  msize = ntohs (fh->total_size);
  if (msize < sizeof (struct GNUNET_MessageHeader))
  {
    GNUNET_break_op (0);
    return GNUNET_SYSERR;
  }
  fid = ntohl (fh->fragment_id);
  foff = ntohs (fh->offset);
  if (foff >= msize)
  {
    GNUNET_break_op (0);
    return GNUNET_SYSERR;
  }
  if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
  {
    GNUNET_break_op (0);
    return GNUNET_SYSERR;
  }
  GNUNET_STATISTICS_update (dc->stats,
                            _("# fragments received"),
                            1,
                            GNUNET_NO);
  num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
  last = 0;
  for (mc = dc->head; NULL != mc; mc = mc->next)
    if (mc->fragment_id > fid)
      last++;

  mc = dc->head;
  while ((NULL != mc) && (fid != mc->fragment_id))
    mc = mc->next;
  bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
  if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
      sizeof (struct FragmentHeader) > msize)
  {
    /* payload extends past total message size */
    GNUNET_break_op (0);
    return GNUNET_SYSERR;
  }
  if ((NULL != mc) && (msize != mc->total_size))
  {
    /* inconsistent message size */
    GNUNET_break_op (0);
    return GNUNET_SYSERR;
  }
  now = GNUNET_TIME_absolute_get ();
  if (NULL == mc)
  {
    mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
    mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
    mc->dc = dc;
    mc->total_size = msize;
    mc->fragment_id = fid;
    mc->last_update = now;
    n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
                                                                  sizeof (struct
                                                                          FragmentHeader));
    if (n == 64)
      mc->bits = UINT64_MAX;    /* set all 64 bit */
    else
      mc->bits = (1LLU << n) - 1;        /* set lowest 'bits' bit */
    if (dc->list_size >= dc->num_msgs)
      discard_oldest_mc (dc);
    GNUNET_CONTAINER_DLL_insert (dc->head,
                                 dc->tail,
                                 mc);
    dc->list_size++;
  }

  /* copy data to 'mc' */
  if (0 != (mc->bits & (1LLU << bit)))
  {
    mc->bits -= 1LLU << bit;
    mbuf = (char *) &mc[1];
    GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
            ntohs (msg->size) - sizeof (struct FragmentHeader));
    mc->last_update = now;
    if (bit < mc->last_bit)
      mc->frag_times_start_offset = mc->frag_times_write_offset;
    mc->last_bit = bit;
    mc->frag_times[mc->frag_times_write_offset].time = now;
    mc->frag_times[mc->frag_times_write_offset].bit = bit;
    mc->frag_times_write_offset++;
    duplicate = GNUNET_NO;
  }
  else
  {
    duplicate = GNUNET_YES;
    GNUNET_STATISTICS_update (dc->stats,
                              _("# duplicate fragments received"),
                              1,
                              GNUNET_NO);
  }

  /* count number of missing fragments after the current one */
  bc = 0;
  for (b = bit; b < 64; b++)
    if (0 != (mc->bits & (1LLU << b)))
      bc++;
    else
      bc = 0;

  /* notify about complete message */
  if ( (GNUNET_NO == duplicate) &&
       (0 == mc->bits) )
  {
    GNUNET_STATISTICS_update (dc->stats,
                              _("# messages defragmented"),
                              1,
                              GNUNET_NO);
    /* message complete, notify! */
    dc->proc (dc->cls, mc->msg);
  }
  /* send ACK */
  if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
  {
    dc->latency = estimate_latency (mc);
  }
  delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
                                                    bc + 1);
  if ( (last + fid == num_fragments) ||
       (0 == mc->bits) ||
       (GNUNET_YES == duplicate) )
  {
    /* message complete or duplicate or last missing fragment in
       linear sequence; ACK now! */
    delay = GNUNET_TIME_UNIT_ZERO;
  }
  if (NULL != mc->ack_task)
    GNUNET_SCHEDULER_cancel (mc->ack_task);
  mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
                                               &send_ack,
                                               mc);
  if (GNUNET_YES == duplicate)
  {
    mc->last_duplicate = GNUNET_YES;
    return GNUNET_NO;
  }
  return GNUNET_YES;
}

/* end of defragmentation.c */