diff options
Diffstat (limited to 'src/util/server_mst.c')
-rw-r--r-- | src/util/server_mst.c | 297 |
1 files changed, 143 insertions, 154 deletions
diff --git a/src/util/server_mst.c b/src/util/server_mst.c index 835d8eebaa..f48ef5475f 100644 --- a/src/util/server_mst.c +++ b/src/util/server_mst.c @@ -50,7 +50,7 @@ struct GNUNET_SERVER_MessageStreamTokenizer * Function to call on completed messages. */ GNUNET_SERVER_MessageTokenizerCallback cb; - + /** * Closure for cb. */ @@ -89,12 +89,12 @@ struct GNUNET_SERVER_MessageStreamTokenizer */ struct GNUNET_SERVER_MessageStreamTokenizer * GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, - void *cb_cls) + void *cb_cls) { struct GNUNET_SERVER_MessageStreamTokenizer *ret; ret = GNUNET_malloc (sizeof (struct GNUNET_SERVER_MessageStreamTokenizer)); - ret->hdr = GNUNET_malloc(GNUNET_SERVER_MIN_BUFFER_SIZE); + ret->hdr = GNUNET_malloc (GNUNET_SERVER_MIN_BUFFER_SIZE); ret->curr_buf = GNUNET_SERVER_MIN_BUFFER_SIZE; ret->cb = cb; ret->cb_cls = cb_cls; @@ -119,11 +119,9 @@ GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, */ int GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, - void *client_identity, - const char *buf, - size_t size, - int purge, - int one_shot) + void *client_identity, + const char *buf, + size_t size, int purge, int one_shot) { const struct GNUNET_MessageHeader *hdr; size_t delta; @@ -135,169 +133,160 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, #if DEBUG_SERVER_MST GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Server-mst receives %u bytes with %u bytes already in private buffer\n", - (unsigned int) size, - (unsigned int) (mst->pos - mst->off)); + "Server-mst receives %u bytes with %u bytes already in private buffer\n", + (unsigned int) size, (unsigned int) (mst->pos - mst->off)); #endif ret = GNUNET_OK; - ibuf = (char*)mst->hdr; + ibuf = (char *) mst->hdr; while (mst->pos > 0) + { +do_align: + if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || + (0 != (mst->off % ALIGN_FACTOR))) + { + /* need to align or need more space */ + mst->pos -= mst->off; + memmove (ibuf, &ibuf[mst->off], mst->pos); + mst->off = 0; + } + if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) + { + delta = + GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - + (mst->pos - mst->off), size); + memcpy (&ibuf[mst->pos], buf, delta); + mst->pos += delta; + buf += delta; + size -= delta; + } + if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) + { + if (purge) + { + mst->off = 0; + mst->pos = 0; + } + return GNUNET_OK; + } + hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; + want = ntohs (hdr->size); + if (want < sizeof (struct GNUNET_MessageHeader)) { - do_align: - if ( (mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || - (0 != (mst->off % ALIGN_FACTOR)) ) - { - /* need to align or need more space */ - mst->pos -= mst->off; - memmove (ibuf, - &ibuf[mst->off], - mst->pos); - mst->off = 0; - } - if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) - { - delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - (mst->pos - mst->off), - size); - memcpy (&ibuf[mst->pos], - buf, - delta); - mst->pos += delta; - buf += delta; - size -= delta; - } - if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) - { - if (purge) - { - mst->off = 0; - mst->pos = 0; - } - return GNUNET_OK; - } - hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off]; + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (mst->curr_buf - mst->off < want) + { + /* need more space */ + mst->pos -= mst->off; + memmove (ibuf, &ibuf[mst->off], mst->pos); + mst->off = 0; + } + if (want > mst->curr_buf) + { + mst->hdr = GNUNET_realloc (mst->hdr, want); + ibuf = (char *) mst->hdr; + mst->curr_buf = want; + } + hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; + if (mst->pos - mst->off < want) + { + delta = GNUNET_MIN (want - (mst->pos - mst->off), size); + memcpy (&ibuf[mst->pos], buf, delta); + mst->pos += delta; + buf += delta; + size -= delta; + } + if (mst->pos - mst->off < want) + { + if (purge) + { + mst->off = 0; + mst->pos = 0; + } + return GNUNET_OK; + } + if (one_shot == GNUNET_SYSERR) + { + /* cannot call callback again, but return value saying that + * we have another full message in the buffer */ + ret = GNUNET_NO; + goto copy; + } + if (one_shot == GNUNET_YES) + one_shot = GNUNET_SYSERR; + mst->cb (mst->cb_cls, client_identity, hdr); + mst->off += want; + if (mst->off == mst->pos) + { + /* reset to beginning of buffer, it's free right now! */ + mst->off = 0; + mst->pos = 0; + } + } + while (size > 0) + { +#if DEBUG_SERVER_MST + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Server-mst has %u bytes left in inbound buffer\n", + (unsigned int) size); +#endif + if (size < sizeof (struct GNUNET_MessageHeader)) + break; + offset = (unsigned long) buf; + need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; + if (GNUNET_NO == need_align) + { + /* can try to do zero-copy and process directly from original buffer */ + hdr = (const struct GNUNET_MessageHeader *) buf; want = ntohs (hdr->size); if (want < sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (mst->curr_buf - mst->off < want) - { - /* need more space */ - mst->pos -= mst->off; - memmove (ibuf, - &ibuf[mst->off], - mst->pos); - mst->off = 0; - } - if (want > mst->curr_buf) - { - mst->hdr = GNUNET_realloc(mst->hdr, want); - ibuf = (char*)mst->hdr; - mst->curr_buf = want; - } - hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off]; - if (mst->pos - mst->off < want) - { - delta = GNUNET_MIN (want - (mst->pos - mst->off), - size); - memcpy (&ibuf[mst->pos], - buf, - delta); - mst->pos += delta; - buf += delta; - size -= delta; - } - if (mst->pos - mst->off < want) - { - if (purge) - { - mst->off = 0; - mst->pos = 0; - } - return GNUNET_OK; - } + { + GNUNET_break_op (0); + mst->off = 0; + return GNUNET_SYSERR; + } + if (size < want) + break; /* or not, buffer incomplete, so copy to private buffer... */ if (one_shot == GNUNET_SYSERR) - { - /* cannot call callback again, but return value saying that - we have another full message in the buffer */ - ret = GNUNET_NO; - goto copy; - } + { + /* cannot call callback again, but return value saying that + * we have another full message in the buffer */ + ret = GNUNET_NO; + goto copy; + } if (one_shot == GNUNET_YES) - one_shot = GNUNET_SYSERR; + one_shot = GNUNET_SYSERR; mst->cb (mst->cb_cls, client_identity, hdr); - mst->off += want; - if (mst->off == mst->pos) - { - /* reset to beginning of buffer, it's free right now! */ - mst->off = 0; - mst->pos = 0; - } + buf += want; + size -= want; } - while (size > 0) + else { -#if DEBUG_SERVER_MST - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Server-mst has %u bytes left in inbound buffer\n", - (unsigned int) size); -#endif - if (size < sizeof (struct GNUNET_MessageHeader)) - break; - offset = (unsigned long) buf; - need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; - if (GNUNET_NO == need_align) - { - /* can try to do zero-copy and process directly from original buffer */ - hdr = (const struct GNUNET_MessageHeader *) buf; - want = ntohs (hdr->size); - if (want < sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break_op (0); - mst->off = 0; - return GNUNET_SYSERR; - } - if (size < want) - break; /* or not, buffer incomplete, so copy to private buffer... */ - if (one_shot == GNUNET_SYSERR) - { - /* cannot call callback again, but return value saying that - we have another full message in the buffer */ - ret = GNUNET_NO; - goto copy; - } - if (one_shot == GNUNET_YES) - one_shot = GNUNET_SYSERR; - mst->cb (mst->cb_cls, client_identity, hdr); - buf += want; - size -= want; - } - else - { - /* need to copy to private buffer to align; - yes, we go a bit more spagetti than usual here */ - goto do_align; - } + /* need to copy to private buffer to align; + * yes, we go a bit more spagetti than usual here */ + goto do_align; } - copy: - if ( (size > 0) && (! purge) ) + } +copy: + if ((size > 0) && (!purge)) + { + if (size + mst->pos > mst->curr_buf) { - if (size + mst->pos > mst->curr_buf) - { - mst->hdr = GNUNET_realloc(mst->hdr, size + mst->pos); - ibuf = (char*)mst->hdr; - mst->curr_buf = size + mst->pos; - } - GNUNET_assert (mst->pos + size <= mst->curr_buf); - memcpy (&ibuf[mst->pos], buf, size); - mst->pos += size; + mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); + ibuf = (char *) mst->hdr; + mst->curr_buf = size + mst->pos; } + GNUNET_assert (mst->pos + size <= mst->curr_buf); + memcpy (&ibuf[mst->pos], buf, size); + mst->pos += size; + } if (purge) - mst->off = 0; + mst->off = 0; #if DEBUG_SERVER_MST GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Server-mst leaves %u bytes in private buffer\n", - (unsigned int) (mst->pos - mst->off)); + "Server-mst leaves %u bytes in private buffer\n", + (unsigned int) (mst->pos - mst->off)); #endif return ret; } |