diff options
Diffstat (limited to 'src/fragmentation/defragmentation.c')
-rw-r--r-- | src/fragmentation/defragmentation.c | 547 |
1 files changed, 547 insertions, 0 deletions
diff --git a/src/fragmentation/defragmentation.c b/src/fragmentation/defragmentation.c new file mode 100644 index 0000000..b07f204 --- /dev/null +++ b/src/fragmentation/defragmentation.c @@ -0,0 +1,547 @@ +/* + This file is part of GNUnet + (C) 2009, 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ +/** + * @file src/fragmentation/defragmentation.c + * @brief library to help defragment messages + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_fragmentation_lib.h" +#include "fragmentation.h" + +/** + * Timestamps for fragments. + */ +struct FragTimes +{ + /** + * The time the fragment was received. + */ + struct GNUNET_TIME_Absolute time; + + /** + * Number of the bit for the fragment (in [0,..,63]). + */ + unsigned int bit; +}; + + +/** + * Information we keep for one message that is being assembled. Note + * that we keep the context around even after the assembly is done to + * handle 'stray' messages that are received 'late'. A message + * context is ONLY discarded when the queue gets too big. + */ +struct MessageContext +{ + /** + * This is a DLL. + */ + struct MessageContext *next; + + /** + * This is a DLL. + */ + struct MessageContext *prev; + + /** + * Associated defragmentation context. + */ + struct GNUNET_DEFRAGMENT_Context *dc; + + /** + * Pointer to the assembled message, allocated at the + * end of this struct. + */ + const struct GNUNET_MessageHeader *msg; + + /** + * Last time we received any update for this message + * (least-recently updated message will be discarded + * if we hit the queue size). + */ + struct GNUNET_TIME_Absolute last_update; + + /** + * Task scheduled for transmitting the next ACK to the + * other peer. + */ + GNUNET_SCHEDULER_TaskIdentifier ack_task; + + /** + * When did we receive which fragment? Used to calculate + * the time we should send the ACK. + */ + struct FragTimes frag_times[64]; + + /** + * Which fragments have we gotten yet? bits that are 1 + * indicate missing fragments. + */ + uint64_t bits; + + /** + * Unique ID for this message. + */ + uint32_t fragment_id; + + /** + * Which 'bit' did the last fragment we received correspond to? + */ + unsigned int last_bit; + + /** + * For the current ACK round, which is the first relevant + * offset in 'frag_times'? + */ + unsigned int frag_times_start_offset; + + /** + * Which offset whould we write the next frag value into + * in the 'frag_times' array? All smaller entries are valid. + */ + unsigned int frag_times_write_offset; + + /** + * Total size of the message that we are assembling. + */ + uint16_t total_size; + +}; + + +/** + * Defragmentation context (one per connection). + */ +struct GNUNET_DEFRAGMENT_Context +{ + + /** + * For statistics. + */ + struct GNUNET_STATISTICS_Handle *stats; + + /** + * Head of list of messages we're defragmenting. + */ + struct MessageContext *head; + + /** + * Tail of list of messages we're defragmenting. + */ + struct MessageContext *tail; + + /** + * Closure for 'proc' and 'ackp'. + */ + void *cls; + + /** + * Function to call with defragmented messages. + */ + GNUNET_FRAGMENT_MessageProcessor proc; + + /** + * Function to call with acknowledgements. + */ + GNUNET_DEFRAGMENT_AckProcessor ackp; + + /** + * Running average of the latency (delay between messages) for this + * connection. + */ + struct GNUNET_TIME_Relative latency; + + /** + * num_msgs how many fragmented messages + * to we defragment at most at the same time? + */ + unsigned int num_msgs; + + /** + * Current number of messages in the 'struct MessageContext' + * DLL (smaller or equal to 'num_msgs'). + */ + unsigned int list_size; + + /** + * Maximum message size for each fragment. + */ + uint16_t mtu; +}; + + +/** + * Create a defragmentation context. + * + * @param stats statistics context + * @param mtu the maximum message size for each fragment + * @param num_msgs how many fragmented messages + * to we defragment at most at the same time? + * @param cls closure for proc and ackp + * @param proc function to call with defragmented messages + * @param ackp function to call with acknowledgements (to send + * back to the other side) + * @return the defragmentation context + */ +struct GNUNET_DEFRAGMENT_Context * +GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, + uint16_t mtu, unsigned int num_msgs, + void *cls, + GNUNET_FRAGMENT_MessageProcessor proc, + GNUNET_DEFRAGMENT_AckProcessor ackp) +{ + struct GNUNET_DEFRAGMENT_Context *dc; + + dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context)); + dc->stats = stats; + dc->cls = cls; + dc->proc = proc; + dc->ackp = ackp; + dc->num_msgs = num_msgs; + dc->mtu = mtu; + dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */ + return dc; +} + + +/** + * Destroy the given defragmentation context. + * + * @param dc defragmentation context + */ +void +GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc) +{ + struct MessageContext *mc; + + while (NULL != (mc = dc->head)) + { + GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc); + dc->list_size--; + if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) + { + GNUNET_SCHEDULER_cancel (mc->ack_task); + mc->ack_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_free (mc); + } + GNUNET_assert (0 == dc->list_size); + GNUNET_free (dc); +} + + +/** + * Send acknowledgement to the other peer now. + * + * @param cls the message context + * @param tc the scheduler context + */ +static void +send_ack (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct MessageContext *mc = cls; + struct GNUNET_DEFRAGMENT_Context *dc = mc->dc; + struct FragmentAcknowledgement fa; + + mc->ack_task = GNUNET_SCHEDULER_NO_TASK; + fa.header.size = htons (sizeof (struct FragmentAcknowledgement)); + fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK); + fa.fragment_id = htonl (mc->fragment_id); + fa.bits = GNUNET_htonll (mc->bits); + GNUNET_STATISTICS_update (mc->dc->stats, + _("# acknowledgements sent for fragment"), 1, + GNUNET_NO); + dc->ackp (dc->cls, mc->fragment_id, &fa.header); +} + + +/** + * This function is from the GNU Scientific Library, linear/fit.c, + * (C) 2000 Brian Gough + */ +static void +gsl_fit_mul (const double *x, const size_t xstride, const double *y, + const size_t ystride, const size_t n, double *c1, double *cov_11, + double *sumsq) +{ + double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0; + + size_t i; + + for (i = 0; i < n; i++) + { + m_x += (x[i * xstride] - m_x) / (i + 1.0); + m_y += (y[i * ystride] - m_y) / (i + 1.0); + } + + for (i = 0; i < n; i++) + { + const double dx = x[i * xstride] - m_x; + const double dy = y[i * ystride] - m_y; + + m_dx2 += (dx * dx - m_dx2) / (i + 1.0); + m_dxdy += (dx * dy - m_dxdy) / (i + 1.0); + } + + /* In terms of y = b x */ + + { + double s2 = 0, d2 = 0; + double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2); + + *c1 = b; + + /* Compute chi^2 = \sum (y_i - b * x_i)^2 */ + + for (i = 0; i < n; i++) + { + const double dx = x[i * xstride] - m_x; + const double dy = y[i * ystride] - m_y; + const double d = (m_y - b * m_x) + dy - b * dx; + + d2 += d * d; + } + + s2 = d2 / (n - 1.0); /* chisq per degree of freedom */ + + *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2)); + + *sumsq = d2; + } +} + + +/** + * Estimate the latency between messages based on the most recent + * message time stamps. + * + * @param mc context with time stamps + * @return average delay between time stamps (based on least-squares fit) + */ +static struct GNUNET_TIME_Relative +estimate_latency (struct MessageContext *mc) +{ + struct FragTimes *first; + size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset; + double x[total]; + double y[total]; + size_t i; + double c1; + double cov11; + double sumsq; + struct GNUNET_TIME_Relative ret; + + first = &mc->frag_times[mc->frag_times_start_offset]; + GNUNET_assert (total > 1); + for (i = 0; i < total; i++) + { + x[i] = (double) i; + y[i] = (double) (first[i].time.abs_value - first[0].time.abs_value); + } + gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq); + c1 += sqrt (sumsq); /* add 1 std dev */ + ret.rel_value = (uint64_t) c1; + if (ret.rel_value == 0) + ret = GNUNET_TIME_UNIT_MILLISECONDS; /* always at least 1 */ + return ret; +}; + + +/** + * Discard the message context that was inactive for the longest time. + * + * @param dc defragmentation context + */ +static void +discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc) +{ + struct MessageContext *old; + struct MessageContext *pos; + + old = NULL; + pos = dc->head; + while (NULL != pos) + { + if ((old == NULL) || + (old->last_update.abs_value > pos->last_update.abs_value)) + old = pos; + pos = pos->next; + } + GNUNET_assert (NULL != old); + GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old); + dc->list_size--; + if (GNUNET_SCHEDULER_NO_TASK != old->ack_task) + { + GNUNET_SCHEDULER_cancel (old->ack_task); + old->ack_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_free (old); +} + + +/** + * We have received a fragment. Process it. + * + * @param dc the context + * @param msg the message that was received + * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error + */ +int +GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, + const struct GNUNET_MessageHeader *msg) +{ + struct MessageContext *mc; + const struct FragmentHeader *fh; + uint16_t msize; + uint16_t foff; + uint32_t fid; + char *mbuf; + unsigned int bit; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Relative delay; + unsigned int bc; + unsigned int b; + unsigned int n; + int duplicate; + + if (ntohs (msg->size) < sizeof (struct FragmentHeader)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (ntohs (msg->size) > dc->mtu) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + fh = (const struct FragmentHeader *) msg; + msize = ntohs (fh->total_size); + if (msize < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + fid = ntohl (fh->fragment_id); + foff = ntohs (fh->offset); + if (foff >= msize) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader)))) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + GNUNET_STATISTICS_update (dc->stats, _("# fragments received"), 1, GNUNET_NO); + mc = dc->head; + while ((NULL != mc) && (fid != mc->fragment_id)) + mc = mc->next; + bit = foff / (dc->mtu - sizeof (struct FragmentHeader)); + if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) - + sizeof (struct FragmentHeader) > msize) + { + /* payload extends past total message size */ + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if ((NULL != mc) && (msize != mc->total_size)) + { + /* inconsistent message size */ + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + now = GNUNET_TIME_absolute_get (); + if (NULL == mc) + { + mc = GNUNET_malloc (sizeof (struct MessageContext) + msize); + mc->msg = (const struct GNUNET_MessageHeader *) &mc[1]; + mc->dc = dc; + mc->total_size = msize; + mc->fragment_id = fid; + mc->last_update = now; + n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - + sizeof (struct + FragmentHeader)); + if (n == 64) + mc->bits = UINT64_MAX; /* set all 64 bit */ + else + mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */ + if (dc->list_size >= dc->num_msgs) + discard_oldest_mc (dc); + GNUNET_CONTAINER_DLL_insert (dc->head, dc->tail, mc); + dc->list_size++; + } + + /* copy data to 'mc' */ + if (0 != (mc->bits & (1LL << bit))) + { + mc->bits -= 1LL << bit; + mbuf = (char *) &mc[1]; + memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1], + ntohs (msg->size) - sizeof (struct FragmentHeader)); + mc->last_update = now; + if (bit < mc->last_bit) + mc->frag_times_start_offset = mc->frag_times_write_offset; + mc->last_bit = bit; + mc->frag_times[mc->frag_times_write_offset].time = now; + mc->frag_times[mc->frag_times_write_offset].bit = bit; + mc->frag_times_write_offset++; + duplicate = GNUNET_NO; + } + else + { + duplicate = GNUNET_YES; + GNUNET_STATISTICS_update (dc->stats, _("# duplicate fragments received"), 1, + GNUNET_NO); + } + + /* count number of missing fragments */ + bc = 0; + for (b = 0; b < 64; b++) + if (0 != (mc->bits & (1LL << b))) + bc++; + + /* notify about complete message */ + if ((duplicate == GNUNET_NO) && (0 == mc->bits)) + { + GNUNET_STATISTICS_update (dc->stats, _("# messages defragmented"), 1, + GNUNET_NO); + /* message complete, notify! */ + dc->proc (dc->cls, mc->msg); + } + /* send ACK */ + if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1) + dc->latency = estimate_latency (mc); + delay = GNUNET_TIME_relative_multiply (dc->latency, bc + 1); + if ((0 == mc->bits) || (GNUNET_YES == duplicate)) /* message complete or duplicate, ACK now! */ + { + delay = GNUNET_TIME_UNIT_ZERO; + } + if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) + GNUNET_SCHEDULER_cancel (mc->ack_task); + mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, &send_ack, mc); + if (duplicate == GNUNET_YES) + return GNUNET_NO; + return GNUNET_YES; +} + +/* end of defragmentation.c */ |