aboutsummaryrefslogtreecommitdiff
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c1328
1 files changed, 901 insertions, 427 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 5ccf87ed8d6..1948d592aa5 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -9,17 +9,22 @@
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
+#ifdef CONFIG_BLOCK
#include <linux/bio.h>
-#include <linux/blkdev.h>
+#endif /* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>
+#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/export.h>
+#define list_entry_next(pos, member) \
+ list_entry(pos->member.next, typeof(*pos), member)
+
/*
* Ceph uses the messenger to exchange ceph_msg messages with other
* hosts in the system. The messenger provides ordered and reliable
@@ -97,6 +102,62 @@
#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */
#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */
+static bool con_flag_valid(unsigned long con_flag)
+{
+ switch (con_flag) {
+ case CON_FLAG_LOSSYTX:
+ case CON_FLAG_KEEPALIVE_PENDING:
+ case CON_FLAG_WRITE_PENDING:
+ case CON_FLAG_SOCK_CLOSED:
+ case CON_FLAG_BACKOFF:
+ return true;
+ default:
+ return false;
+ }
+}
+
+static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ clear_bit(con_flag, &con->flags);
+}
+
+static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ set_bit(con_flag, &con->flags);
+}
+
+static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ return test_bit(con_flag, &con->flags);
+}
+
+static bool con_flag_test_and_clear(struct ceph_connection *con,
+ unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ return test_and_clear_bit(con_flag, &con->flags);
+}
+
+static bool con_flag_test_and_set(struct ceph_connection *con,
+ unsigned long con_flag)
+{
+ BUG_ON(!con_flag_valid(con_flag));
+
+ return test_and_set_bit(con_flag, &con->flags);
+}
+
+/* Slab caches for frequently-allocated structures */
+
+static struct kmem_cache *ceph_msg_cache;
+static struct kmem_cache *ceph_msg_data_cache;
+
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -114,7 +175,7 @@ static struct lock_class_key socket_class;
static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
-static void ceph_fault(struct ceph_connection *con);
+static void con_fault(struct ceph_connection *con);
/*
* Nicely render a sockaddr as a string. An array of formatted
@@ -171,13 +232,50 @@ static void encode_my_addr(struct ceph_messenger *msgr)
*/
static struct workqueue_struct *ceph_msgr_wq;
-void _ceph_msgr_exit(void)
+static int ceph_msgr_slab_init(void)
+{
+ BUG_ON(ceph_msg_cache);
+ ceph_msg_cache = kmem_cache_create("ceph_msg",
+ sizeof (struct ceph_msg),
+ __alignof__(struct ceph_msg), 0, NULL);
+
+ if (!ceph_msg_cache)
+ return -ENOMEM;
+
+ BUG_ON(ceph_msg_data_cache);
+ ceph_msg_data_cache = kmem_cache_create("ceph_msg_data",
+ sizeof (struct ceph_msg_data),
+ __alignof__(struct ceph_msg_data),
+ 0, NULL);
+ if (ceph_msg_data_cache)
+ return 0;
+
+ kmem_cache_destroy(ceph_msg_cache);
+ ceph_msg_cache = NULL;
+
+ return -ENOMEM;
+}
+
+static void ceph_msgr_slab_exit(void)
+{
+ BUG_ON(!ceph_msg_data_cache);
+ kmem_cache_destroy(ceph_msg_data_cache);
+ ceph_msg_data_cache = NULL;
+
+ BUG_ON(!ceph_msg_cache);
+ kmem_cache_destroy(ceph_msg_cache);
+ ceph_msg_cache = NULL;
+}
+
+static void _ceph_msgr_exit(void)
{
if (ceph_msgr_wq) {
destroy_workqueue(ceph_msgr_wq);
ceph_msgr_wq = NULL;
}
+ ceph_msgr_slab_exit();
+
BUG_ON(zero_page == NULL);
kunmap(zero_page);
page_cache_release(zero_page);
@@ -190,7 +288,10 @@ int ceph_msgr_init(void)
zero_page = ZERO_PAGE(0);
page_cache_get(zero_page);
- ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
+ if (ceph_msgr_slab_init())
+ return -ENOMEM;
+
+ ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0);
if (ceph_msgr_wq)
return 0;
@@ -282,7 +383,7 @@ static void con_sock_state_closed(struct ceph_connection *con)
*/
/* data available on socket, or listen socket received a connect */
-static void ceph_sock_data_ready(struct sock *sk, int count_unused)
+static void ceph_sock_data_ready(struct sock *sk)
{
struct ceph_connection *con = sk->sk_user_data;
if (atomic_read(&con->msgr->stopping)) {
@@ -308,8 +409,8 @@ static void ceph_sock_write_space(struct sock *sk)
* buffer. See net/ipv4/tcp_input.c:tcp_check_space()
* and net/core/stream.c:sk_stream_write_space().
*/
- if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
- if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
+ if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
+ if (sk_stream_is_writeable(sk)) {
dout("%s %p queueing write work\n", __func__, con);
clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
queue_con(con);
@@ -333,7 +434,7 @@ static void ceph_sock_state_change(struct sock *sk)
case TCP_CLOSE_WAIT:
dout("%s TCP_CLOSE_WAIT\n", __func__);
con_sock_state_closing(con);
- set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+ con_flag_set(con, CON_FLAG_SOCK_CLOSED);
queue_con(con);
break;
case TCP_ESTABLISHED:
@@ -419,6 +520,22 @@ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
return r;
}
+static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
+ int page_offset, size_t length)
+{
+ void *kaddr;
+ int ret;
+
+ BUG_ON(page_offset + length > PAGE_SIZE);
+
+ kaddr = kmap(page);
+ BUG_ON(!kaddr);
+ ret = ceph_tcp_recvmsg(sock, kaddr + page_offset, length);
+ kunmap(page);
+
+ return ret;
+}
+
/*
* write something. @more is true if caller will be sending more data
* shortly.
@@ -440,8 +557,8 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
return r;
}
-static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
- int offset, size_t size, int more)
+static int __ceph_tcp_sendpage(struct socket *sock, struct page *page,
+ int offset, size_t size, bool more)
{
int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
int ret;
@@ -453,6 +570,24 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
return ret;
}
+static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
+ int offset, size_t size, bool more)
+{
+ int ret;
+ struct kvec iov;
+
+ /* sendpage cannot properly handle pages with page_count == 0,
+ * we need to fallback to sendmsg if that's the case */
+ if (page_count(page) >= 1)
+ return __ceph_tcp_sendpage(sock, page, offset, size, more);
+
+ iov.iov_base = kmap(page) + offset;
+ iov.iov_len = size;
+ ret = ceph_tcp_sendmsg(sock, &iov, 1, size, more);
+ kunmap(page);
+
+ return ret;
+}
/*
* Shutdown/close the socket for the given connection.
@@ -474,7 +609,7 @@ static int con_close_socket(struct ceph_connection *con)
* received a socket close event before we had the chance to
* shut the socket down.
*/
- clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+ con_flag_clear(con, CON_FLAG_SOCK_CLOSED);
con_sock_state_closed(con);
return rc;
@@ -538,11 +673,10 @@ void ceph_con_close(struct ceph_connection *con)
ceph_pr_addr(&con->peer_addr.in_addr));
con->state = CON_STATE_CLOSED;
- clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
- clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
- clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
- clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
- clear_bit(CON_FLAG_BACKOFF, &con->flags);
+ con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */
+ con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
+ con_flag_clear(con, CON_FLAG_WRITE_PENDING);
+ con_flag_clear(con, CON_FLAG_BACKOFF);
reset_connection(con);
con->peer_global_seq = 0;
@@ -646,50 +780,398 @@ static void con_out_kvec_add(struct ceph_connection *con,
}
#ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+
+/*
+ * For a bio data item, a piece is whatever remains of the next
+ * entry in the current bio iovec, or the first entry in the next
+ * bio in the list.
+ */
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
{
- if (!bio) {
- *iter = NULL;
- *seg = 0;
- return;
+ struct ceph_msg_data *data = cursor->data;
+ struct bio *bio;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = data->bio;
+ BUG_ON(!bio);
+
+ cursor->resid = min(length, data->bio_length);
+ cursor->bio = bio;
+ cursor->bvec_iter = bio->bi_iter;
+ cursor->last_piece =
+ cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);
+}
+
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset,
+ size_t *length)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct bio *bio;
+ struct bio_vec bio_vec;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+ bio = cursor->bio;
+ BUG_ON(!bio);
+
+ bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
+
+ *page_offset = (size_t) bio_vec.bv_offset;
+ BUG_ON(*page_offset >= PAGE_SIZE);
+ if (cursor->last_piece) /* pagelist offset is always 0 */
+ *length = cursor->resid;
+ else
+ *length = (size_t) bio_vec.bv_len;
+ BUG_ON(*length > cursor->resid);
+ BUG_ON(*page_offset + *length > PAGE_SIZE);
+
+ return bio_vec.bv_page;
+}
+
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ struct bio *bio;
+ struct bio_vec bio_vec;
+
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
+
+ bio = cursor->bio;
+ BUG_ON(!bio);
+
+ bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
+
+ /* Advance the cursor offset */
+
+ BUG_ON(cursor->resid < bytes);
+ cursor->resid -= bytes;
+
+ bio_advance_iter(bio, &cursor->bvec_iter, bytes);
+
+ if (bytes < bio_vec.bv_len)
+ return false; /* more bytes to process in this segment */
+
+ /* Move on to the next segment, and possibly the next bio */
+
+ if (!cursor->bvec_iter.bi_size) {
+ bio = bio->bi_next;
+ cursor->bio = bio;
+ if (bio)
+ cursor->bvec_iter = bio->bi_iter;
+ else
+ memset(&cursor->bvec_iter, 0,
+ sizeof(cursor->bvec_iter));
}
- *iter = bio;
- *seg = bio->bi_idx;
+
+ if (!cursor->last_piece) {
+ BUG_ON(!cursor->resid);
+ BUG_ON(!bio);
+ /* A short read is OK, so use <= rather than == */
+ if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))
+ cursor->last_piece = true;
+ }
+
+ return true;
}
+#endif /* CONFIG_BLOCK */
-static void iter_bio_next(struct bio **bio_iter, int *seg)
+/*
+ * For a page array, a piece comes from the first page in the array
+ * that has not already been fully consumed.
+ */
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
{
- if (*bio_iter == NULL)
- return;
+ struct ceph_msg_data *data = cursor->data;
+ int page_count;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
- BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+ BUG_ON(!data->pages);
+ BUG_ON(!data->length);
- (*seg)++;
- if (*seg == (*bio_iter)->bi_vcnt)
- init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+ cursor->resid = min(length, data->length);
+ page_count = calc_pages_for(data->alignment, (u64)data->length);
+ cursor->page_offset = data->alignment & ~PAGE_MASK;
+ cursor->page_index = 0;
+ BUG_ON(page_count > (int)USHRT_MAX);
+ cursor->page_count = (unsigned short)page_count;
+ BUG_ON(length > SIZE_MAX - cursor->page_offset);
+ cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
}
-#endif
-static void prepare_write_message_data(struct ceph_connection *con)
+static struct page *
+ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
{
- struct ceph_msg *msg = con->out_msg;
+ struct ceph_msg_data *data = cursor->data;
- BUG_ON(!msg);
- BUG_ON(!msg->hdr.data_len);
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
- /* initialize page iterator */
- con->out_msg_pos.page = 0;
- if (msg->pages)
- con->out_msg_pos.page_pos = msg->page_alignment;
+ BUG_ON(cursor->page_index >= cursor->page_count);
+ BUG_ON(cursor->page_offset >= PAGE_SIZE);
+
+ *page_offset = cursor->page_offset;
+ if (cursor->last_piece)
+ *length = cursor->resid;
else
- con->out_msg_pos.page_pos = 0;
+ *length = PAGE_SIZE - *page_offset;
+
+ return data->pages[cursor->page_index];
+}
+
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
+
+ BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
+
+ /* Advance the cursor page offset */
+
+ cursor->resid -= bytes;
+ cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
+ if (!bytes || cursor->page_offset)
+ return false; /* more bytes to process in the current page */
+
+ if (!cursor->resid)
+ return false; /* no more data */
+
+ /* Move on to the next page; offset is already at 0 */
+
+ BUG_ON(cursor->page_index >= cursor->page_count);
+ cursor->page_index++;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
+
+ return true;
+}
+
+/*
+ * For a pagelist, a piece is whatever remains to be consumed in the
+ * first page in the list, or the front of the next page.
+ */
+static void
+ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct ceph_pagelist *pagelist;
+ struct page *page;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+ pagelist = data->pagelist;
+ BUG_ON(!pagelist);
+
+ if (!length)
+ return; /* pagelist can be assigned but empty */
+
+ BUG_ON(list_empty(&pagelist->head));
+ page = list_first_entry(&pagelist->head, struct page, lru);
+
+ cursor->resid = min(length, pagelist->length);
+ cursor->page = page;
+ cursor->offset = 0;
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
+}
+
+static struct page *
+ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct ceph_pagelist *pagelist;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+ pagelist = data->pagelist;
+ BUG_ON(!pagelist);
+
+ BUG_ON(!cursor->page);
+ BUG_ON(cursor->offset + cursor->resid != pagelist->length);
+
+ /* offset of first page in pagelist is always 0 */
+ *page_offset = cursor->offset & ~PAGE_MASK;
+ if (cursor->last_piece)
+ *length = cursor->resid;
+ else
+ *length = PAGE_SIZE - *page_offset;
+
+ return cursor->page;
+}
+
+static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct ceph_pagelist *pagelist;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+ pagelist = data->pagelist;
+ BUG_ON(!pagelist);
+
+ BUG_ON(cursor->offset + cursor->resid != pagelist->length);
+ BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
+
+ /* Advance the cursor offset */
+
+ cursor->resid -= bytes;
+ cursor->offset += bytes;
+ /* offset of first page in pagelist is always 0 */
+ if (!bytes || cursor->offset & ~PAGE_MASK)
+ return false; /* more bytes to process in the current page */
+
+ if (!cursor->resid)
+ return false; /* no more data */
+
+ /* Move on to the next page */
+
+ BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
+ cursor->page = list_entry_next(cursor->page, lru);
+ cursor->last_piece = cursor->resid <= PAGE_SIZE;
+
+ return true;
+}
+
+/*
+ * Message data is handled (sent or received) in pieces, where each
+ * piece resides on a single page. The network layer might not
+ * consume an entire piece at once. A data item's cursor keeps
+ * track of which piece is next to process and how much remains to
+ * be processed in that piece. It also tracks whether the current
+ * piece is the last one in the data item.
+ */
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
+{
+ size_t length = cursor->total_resid;
+
+ switch (cursor->data->type) {
+ case CEPH_MSG_DATA_PAGELIST:
+ ceph_msg_data_pagelist_cursor_init(cursor, length);
+ break;
+ case CEPH_MSG_DATA_PAGES:
+ ceph_msg_data_pages_cursor_init(cursor, length);
+ break;
#ifdef CONFIG_BLOCK
- if (msg->bio)
- init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
-#endif
- con->out_msg_pos.data_pos = 0;
- con->out_msg_pos.did_page_crc = false;
- con->out_more = 1; /* data + footer will follow */
+ case CEPH_MSG_DATA_BIO:
+ ceph_msg_data_bio_cursor_init(cursor, length);
+ break;
+#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_NONE:
+ default:
+ /* BUG(); */
+ break;
+ }
+ cursor->need_crc = true;
+}
+
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ struct ceph_msg_data *data;
+
+ BUG_ON(!length);
+ BUG_ON(length > msg->data_length);
+ BUG_ON(list_empty(&msg->data));
+
+ cursor->data_head = &msg->data;
+ cursor->total_resid = length;
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+ cursor->data = data;
+
+ __ceph_msg_data_cursor_init(cursor);
+}
+
+/*
+ * Return the page containing the next piece to process for a given
+ * data item, and supply the page offset and length of that piece.
+ * Indicate whether this is the last piece in this data item.
+ */
+static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length,
+ bool *last_piece)
+{
+ struct page *page;
+
+ switch (cursor->data->type) {
+ case CEPH_MSG_DATA_PAGELIST:
+ page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
+ break;
+ case CEPH_MSG_DATA_PAGES:
+ page = ceph_msg_data_pages_next(cursor, page_offset, length);
+ break;
+#ifdef CONFIG_BLOCK
+ case CEPH_MSG_DATA_BIO:
+ page = ceph_msg_data_bio_next(cursor, page_offset, length);
+ break;
+#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_NONE:
+ default:
+ page = NULL;
+ break;
+ }
+ BUG_ON(!page);
+ BUG_ON(*page_offset + *length > PAGE_SIZE);
+ BUG_ON(!*length);
+ if (last_piece)
+ *last_piece = cursor->last_piece;
+
+ return page;
+}
+
+/*
+ * Returns true if the result moves the cursor on to the next piece
+ * of the data item.
+ */
+static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ bool new_piece;
+
+ BUG_ON(bytes > cursor->resid);
+ switch (cursor->data->type) {
+ case CEPH_MSG_DATA_PAGELIST:
+ new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
+ break;
+ case CEPH_MSG_DATA_PAGES:
+ new_piece = ceph_msg_data_pages_advance(cursor, bytes);
+ break;
+#ifdef CONFIG_BLOCK
+ case CEPH_MSG_DATA_BIO:
+ new_piece = ceph_msg_data_bio_advance(cursor, bytes);
+ break;
+#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_NONE:
+ default:
+ BUG();
+ break;
+ }
+ cursor->total_resid -= bytes;
+
+ if (!cursor->resid && cursor->total_resid) {
+ WARN_ON(!cursor->last_piece);
+ BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+ cursor->data = list_entry_next(cursor->data, links);
+ __ceph_msg_data_cursor_init(cursor);
+ new_piece = true;
+ }
+ cursor->need_crc = new_piece;
+
+ return new_piece;
+}
+
+static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
+{
+ BUG_ON(!msg);
+ BUG_ON(!data_len);
+
+ /* Initialize data cursor */
+
+ ceph_msg_data_cursor_init(msg, (size_t)data_len);
}
/*
@@ -752,16 +1234,12 @@ static void prepare_write_message(struct ceph_connection *con)
m->hdr.seq = cpu_to_le64(++con->out_seq);
m->needs_out_seq = false;
}
-#ifdef CONFIG_BLOCK
- else
- m->bio_iter = NULL;
-#endif
+ WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
- dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
+ dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
m, con->out_seq, le16_to_cpu(m->hdr.type),
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
- le32_to_cpu(m->hdr.data_len),
- m->nr_pages);
+ m->data_length);
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
/* tag + hdr + front + middle */
@@ -792,13 +1270,15 @@ static void prepare_write_message(struct ceph_connection *con)
/* is there a data payload? */
con->out_msg->footer.data_crc = 0;
- if (m->hdr.data_len)
- prepare_write_message_data(con);
- else
+ if (m->data_length) {
+ prepare_message_data(con->out_msg, m->data_length);
+ con->out_more = 1; /* data + footer will follow */
+ } else {
/* no, queue up footer too and be done */
prepare_write_message_footer(con);
+ }
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
/*
@@ -819,7 +1299,25 @@ static void prepare_write_ack(struct ceph_connection *con)
&con->out_temp_ack);
con->out_more = 1; /* more will follow.. eventually.. */
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
+}
+
+/*
+ * Prepare to share the seq during handshake
+ */
+static void prepare_write_seq(struct ceph_connection *con)
+{
+ dout("prepare_write_seq %p %llu -> %llu\n", con,
+ con->in_seq_acked, con->in_seq);
+ con->in_seq_acked = con->in_seq;
+
+ con_out_kvec_reset(con);
+
+ con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
+ con_out_kvec_add(con, sizeof (con->out_temp_ack),
+ &con->out_temp_ack);
+
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
/*
@@ -830,7 +1328,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
dout("prepare_write_keepalive %p\n", con);
con_out_kvec_reset(con);
con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
/*
@@ -873,7 +1371,7 @@ static void prepare_write_banner(struct ceph_connection *con)
&con->msgr->my_enc_addr);
con->out_more = 0;
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
static int prepare_write_connect(struct ceph_connection *con)
@@ -923,7 +1421,7 @@ static int prepare_write_connect(struct ceph_connection *con)
auth->authorizer_buf);
con->out_more = 0;
- set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_set(con, CON_FLAG_WRITE_PENDING);
return 0;
}
@@ -971,35 +1469,19 @@ out:
return ret; /* done! */
}
-static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
- size_t len, size_t sent, bool in_trail)
+static u32 ceph_crc32c_page(u32 crc, struct page *page,
+ unsigned int page_offset,
+ unsigned int length)
{
- struct ceph_msg *msg = con->out_msg;
+ char *kaddr;
- BUG_ON(!msg);
- BUG_ON(!sent);
-
- con->out_msg_pos.data_pos += sent;
- con->out_msg_pos.page_pos += sent;
- if (sent < len)
- return;
+ kaddr = kmap(page);
+ BUG_ON(kaddr == NULL);
+ crc = crc32c(crc, kaddr + page_offset, length);
+ kunmap(page);
- BUG_ON(sent != len);
- con->out_msg_pos.page_pos = 0;
- con->out_msg_pos.page++;
- con->out_msg_pos.did_page_crc = false;
- if (in_trail)
- list_move_tail(&page->lru,
- &msg->trail->head);
- else if (msg->pagelist)
- list_move_tail(&page->lru,
- &msg->pagelist->head);
-#ifdef CONFIG_BLOCK
- else if (msg->bio)
- iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-#endif
+ return crc;
}
-
/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
@@ -1007,21 +1489,17 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
* 0 -> socket full, but more to do
* <0 -> error
*/
-static int write_partial_msg_pages(struct ceph_connection *con)
+static int write_partial_message_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
- unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
- size_t len;
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !con->msgr->nocrc;
- int ret;
- int total_max_write;
- bool in_trail = false;
- const size_t trail_len = (msg->trail ? msg->trail->length : 0);
- const size_t trail_off = data_len - trail_len;
+ u32 crc;
+
+ dout("%s %p msg %p\n", __func__, con, msg);
- dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
- con, msg, con->out_msg_pos.page, msg->nr_pages,
- con->out_msg_pos.page_pos);
+ if (list_empty(&msg->data))
+ return -EINVAL;
/*
* Iterate through each page that contains data to be
@@ -1031,72 +1509,41 @@ static int write_partial_msg_pages(struct ceph_connection *con)
* need to map the page. If we have no pages, they have
* been revoked, so use the zero page.
*/
- while (data_len > con->out_msg_pos.data_pos) {
- struct page *page = NULL;
- int max_write = PAGE_SIZE;
- int bio_offset = 0;
-
- in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
- if (!in_trail)
- total_max_write = trail_off - con->out_msg_pos.data_pos;
-
- if (in_trail) {
- total_max_write = data_len - con->out_msg_pos.data_pos;
-
- page = list_first_entry(&msg->trail->head,
- struct page, lru);
- } else if (msg->pages) {
- page = msg->pages[con->out_msg_pos.page];
- } else if (msg->pagelist) {
- page = list_first_entry(&msg->pagelist->head,
- struct page, lru);
-#ifdef CONFIG_BLOCK
- } else if (msg->bio) {
- struct bio_vec *bv;
+ crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
+ while (cursor->resid) {
+ struct page *page;
+ size_t page_offset;
+ size_t length;
+ bool last_piece;
+ bool need_crc;
+ int ret;
+
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
+ &last_piece);
+ ret = ceph_tcp_sendpage(con->sock, page, page_offset,
+ length, last_piece);
+ if (ret <= 0) {
+ if (do_datacrc)
+ msg->footer.data_crc = cpu_to_le32(crc);
- bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
- page = bv->bv_page;
- bio_offset = bv->bv_offset;
- max_write = bv->bv_len;
-#endif
- } else {
- page = zero_page;
- }
- len = min_t(int, max_write - con->out_msg_pos.page_pos,
- total_max_write);
-
- if (do_datacrc && !con->out_msg_pos.did_page_crc) {
- void *base;
- u32 crc = le32_to_cpu(msg->footer.data_crc);
- char *kaddr;
-
- kaddr = kmap(page);
- BUG_ON(kaddr == NULL);
- base = kaddr + con->out_msg_pos.page_pos + bio_offset;
- crc = crc32c(crc, base, len);
- kunmap(page);
- msg->footer.data_crc = cpu_to_le32(crc);
- con->out_msg_pos.did_page_crc = true;
+ return ret;
}
- ret = ceph_tcp_sendpage(con->sock, page,
- con->out_msg_pos.page_pos + bio_offset,
- len, 1);
- if (ret <= 0)
- goto out;
-
- out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
+ if (do_datacrc && cursor->need_crc)
+ crc = ceph_crc32c_page(crc, page, page_offset, length);
+ need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
- dout("write_partial_msg_pages %p msg %p done\n", con, msg);
+ dout("%s %p msg %p done\n", __func__, con, msg);
/* prepare and queue up footer, too */
- if (!do_datacrc)
+ if (do_datacrc)
+ msg->footer.data_crc = cpu_to_le32(crc);
+ else
msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
con_out_kvec_reset(con);
prepare_write_message_footer(con);
- ret = 1;
-out:
- return ret;
+
+ return 1; /* must return > 0 to indicate success */
}
/*
@@ -1109,7 +1556,7 @@ static int write_partial_skip(struct ceph_connection *con)
while (con->out_skip > 0) {
size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
- ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
+ ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true);
if (ret <= 0)
goto out;
con->out_skip -= ret;
@@ -1140,6 +1587,13 @@ static void prepare_read_ack(struct ceph_connection *con)
con->in_base_pos = 0;
}
+static void prepare_read_seq(struct ceph_connection *con)
+{
+ dout("prepare_read_seq %p\n", con);
+ con->in_base_pos = 0;
+ con->in_tag = CEPH_MSGR_TAG_SEQ;
+}
+
static void prepare_read_tag(struct ceph_connection *con)
{
dout("prepare_read_tag %p\n", con);
@@ -1431,7 +1885,9 @@ int ceph_parse_ips(const char *c, const char *end,
port = (port * 10) + (*p - '0');
p++;
}
- if (port > 65535 || port == 0)
+ if (port == 0)
+ port = CEPH_MON_PORT;
+ else if (port > 65535)
goto bad;
} else {
port = CEPH_MON_PORT;
@@ -1511,7 +1967,8 @@ static int process_connect(struct ceph_connection *con)
{
u64 sup_feat = con->msgr->supported_features;
u64 req_feat = con->msgr->required_features;
- u64 server_feat = le64_to_cpu(con->in_reply.features);
+ u64 server_feat = ceph_sanitize_features(
+ le64_to_cpu(con->in_reply.features));
int ret;
dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1546,7 +2003,6 @@ static int process_connect(struct ceph_connection *con)
con->error_msg = "connect authorization failure";
return -1;
}
- con->auth_retry = 1;
con_out_kvec_reset(con);
ret = prepare_write_connect(con);
if (ret < 0)
@@ -1617,6 +2073,7 @@ static int process_connect(struct ceph_connection *con)
prepare_read_connect(con);
break;
+ case CEPH_MSGR_TAG_SEQ:
case CEPH_MSGR_TAG_READY:
if (req_feat & ~server_feat) {
pr_err("%s%lld %s protocol feature mismatch,"
@@ -1631,7 +2088,7 @@ static int process_connect(struct ceph_connection *con)
WARN_ON(con->state != CON_STATE_NEGOTIATING);
con->state = CON_STATE_OPEN;
-
+ con->auth_retry = 0; /* we authenticated; clear flag */
con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
con->connect_seq++;
con->peer_features = server_feat;
@@ -1643,11 +2100,16 @@ static int process_connect(struct ceph_connection *con)
le32_to_cpu(con->in_reply.connect_seq));
if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
- set_bit(CON_FLAG_LOSSYTX, &con->flags);
+ con_flag_set(con, CON_FLAG_LOSSYTX);
con->delay = 0; /* reset backoff memory */
- prepare_read_tag(con);
+ if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
+ prepare_write_seq(con);
+ prepare_read_seq(con);
+ } else {
+ prepare_read_tag(con);
+ }
break;
case CEPH_MSGR_TAG_WAIT:
@@ -1681,7 +2143,6 @@ static int read_partial_ack(struct ceph_connection *con)
return read_partial(con, end, size, &con->in_temp_ack);
}
-
/*
* We can finally discard anything that's been acked.
*/
@@ -1706,8 +2167,6 @@ static void process_ack(struct ceph_connection *con)
}
-
-
static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section,
unsigned int sec_len, u32 *crc)
@@ -1731,77 +2190,49 @@ static int read_partial_message_section(struct ceph_connection *con,
return 1;
}
-static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
-
-static int read_partial_message_pages(struct ceph_connection *con,
- struct page **pages,
- unsigned int data_len, bool do_datacrc)
+static int read_partial_msg_data(struct ceph_connection *con)
{
- void *p;
+ struct ceph_msg *msg = con->in_msg;
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ const bool do_datacrc = !con->msgr->nocrc;
+ struct page *page;
+ size_t page_offset;
+ size_t length;
+ u32 crc = 0;
int ret;
- int left;
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- /* (page) data */
- BUG_ON(pages == NULL);
- p = kmap(pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0 && do_datacrc)
- con->in_data_crc =
- crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(pages[con->in_msg_pos.page]);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == PAGE_SIZE) {
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.page++;
- }
-
- return ret;
-}
-
-#ifdef CONFIG_BLOCK
-static int read_partial_message_bio(struct ceph_connection *con,
- struct bio **bio_iter, int *bio_seg,
- unsigned int data_len, bool do_datacrc)
-{
- struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
- void *p;
- int ret, left;
+ BUG_ON(!msg);
+ if (list_empty(&msg->data))
+ return -EIO;
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(bv->bv_len - con->in_msg_pos.page_pos));
+ if (do_datacrc)
+ crc = con->in_data_crc;
+ while (cursor->resid) {
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
+ NULL);
+ ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
+ if (ret <= 0) {
+ if (do_datacrc)
+ con->in_data_crc = crc;
- p = kmap(bv->bv_page) + bv->bv_offset;
+ return ret;
+ }
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0 && do_datacrc)
- con->in_data_crc =
- crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(bv->bv_page);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == bv->bv_len) {
- con->in_msg_pos.page_pos = 0;
- iter_bio_next(bio_iter, bio_seg);
+ if (do_datacrc)
+ crc = ceph_crc32c_page(crc, page, page_offset, ret);
+ (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
+ if (do_datacrc)
+ con->in_data_crc = crc;
- return ret;
+ return 1; /* must return > 0 to indicate success */
}
-#endif
/*
* read (part of) a message.
*/
+static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
+
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
@@ -1834,7 +2265,7 @@ static int read_partial_message(struct ceph_connection *con)
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
return -EIO;
middle_len = le32_to_cpu(con->in_hdr.middle_len);
- if (middle_len > CEPH_MSG_MAX_DATA_LEN)
+ if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
return -EIO;
data_len = le32_to_cpu(con->in_hdr.data_len);
if (data_len > CEPH_MSG_MAX_DATA_LEN)
@@ -1863,14 +2294,22 @@ static int read_partial_message(struct ceph_connection *con)
int skip = 0;
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
- con->in_hdr.front_len, con->in_hdr.data_len);
+ front_len, data_len);
ret = ceph_con_in_msg_alloc(con, &skip);
if (ret < 0)
return ret;
+
+ BUG_ON(!con->in_msg ^ skip);
+ if (con->in_msg && data_len > con->in_msg->data_length) {
+ pr_warning("%s skipping long message (%u > %zd)\n",
+ __func__, data_len, con->in_msg->data_length);
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ skip = 1;
+ }
if (skip) {
/* skip this message */
dout("alloc_msg said skip message\n");
- BUG_ON(con->in_msg);
con->in_base_pos = -front_len - middle_len - data_len -
sizeof(m->footer);
con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1885,17 +2324,10 @@ static int read_partial_message(struct ceph_connection *con)
if (m->middle)
m->middle->vec.iov_len = 0;
- con->in_msg_pos.page = 0;
- if (m->pages)
- con->in_msg_pos.page_pos = m->page_alignment;
- else
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.data_pos = 0;
+ /* prepare for data payload, if any */
-#ifdef CONFIG_BLOCK
- if (m->bio)
- init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
-#endif
+ if (data_len)
+ prepare_message_data(con->in_msg, data_len);
}
/* front */
@@ -1914,24 +2346,10 @@ static int read_partial_message(struct ceph_connection *con)
}
/* (page) data */
- while (con->in_msg_pos.data_pos < data_len) {
- if (m->pages) {
- ret = read_partial_message_pages(con, m->pages,
- data_len, do_datacrc);
- if (ret <= 0)
- return ret;
-#ifdef CONFIG_BLOCK
- } else if (m->bio) {
- BUG_ON(!m->bio_iter);
- ret = read_partial_message_bio(con,
- &m->bio_iter, &m->bio_seg,
- data_len, do_datacrc);
- if (ret <= 0)
- return ret;
-#endif
- } else {
- BUG_ON(1);
- }
+ if (data_len) {
+ ret = read_partial_msg_data(con);
+ if (ret <= 0)
+ return ret;
}
/* footer */
@@ -2057,13 +2475,13 @@ more_kvec:
goto do_next;
}
- ret = write_partial_msg_pages(con);
+ ret = write_partial_message_data(con);
if (ret == 1)
goto more_kvec; /* we need to send the footer, too! */
if (ret == 0)
goto out;
if (ret < 0) {
- dout("try_write write_partial_msg_pages err %d\n",
+ dout("try_write write_partial_message_data err %d\n",
ret);
goto out;
}
@@ -2080,15 +2498,14 @@ do_next:
prepare_write_ack(con);
goto more;
}
- if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
- &con->flags)) {
+ if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
prepare_write_keepalive(con);
goto more;
}
}
/* Nothing to do! */
- clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_clear(con, CON_FLAG_WRITE_PENDING);
dout("try_write nothing else to write.\n");
ret = 0;
out:
@@ -2216,7 +2633,12 @@ more:
prepare_read_tag(con);
goto more;
}
- if (con->in_tag == CEPH_MSGR_TAG_ACK) {
+ if (con->in_tag == CEPH_MSGR_TAG_ACK ||
+ con->in_tag == CEPH_MSGR_TAG_SEQ) {
+ /*
+ * the final handshake seq exchange is semantically
+ * equivalent to an ACK
+ */
ret = read_partial_ack(con);
if (ret <= 0)
goto out;
@@ -2268,7 +2690,7 @@ static void queue_con(struct ceph_connection *con)
static bool con_sock_closed(struct ceph_connection *con)
{
- if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
+ if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
return false;
#define CASE(x) \
@@ -2295,6 +2717,41 @@ static bool con_sock_closed(struct ceph_connection *con)
return true;
}
+static bool con_backoff(struct ceph_connection *con)
+{
+ int ret;
+
+ if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
+ return false;
+
+ ret = queue_con_delay(con, round_jiffies_relative(con->delay));
+ if (ret) {
+ dout("%s: con %p FAILED to back off %lu\n", __func__,
+ con, con->delay);
+ BUG_ON(ret == -ENOENT);
+ con_flag_set(con, CON_FLAG_BACKOFF);
+ }
+
+ return true;
+}
+
+/* Finish fault handling; con->mutex must *not* be held here */
+
+static void con_fault_finish(struct ceph_connection *con)
+{
+ /*
+ * in case we faulted due to authentication, invalidate our
+ * current tickets so that we can get new ones.
+ */
+ if (con->auth_retry && con->ops->invalidate_authorizer) {
+ dout("calling invalidate_authorizer()\n");
+ con->ops->invalidate_authorizer(con);
+ }
+
+ if (con->ops->fault)
+ con->ops->fault(con);
+}
+
/*
* Do some work on a connection. Drop a connection ref when we're done.
*/
@@ -2302,73 +2759,68 @@ static void con_work(struct work_struct *work)
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work);
- int ret;
+ bool fault;
mutex_lock(&con->mutex);
-restart:
- if (con_sock_closed(con))
- goto fault;
+ while (true) {
+ int ret;
- if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
- dout("con_work %p backing off\n", con);
- ret = queue_con_delay(con, round_jiffies_relative(con->delay));
- if (ret) {
- dout("con_work %p FAILED to back off %lu\n", con,
- con->delay);
- BUG_ON(ret == -ENOENT);
- set_bit(CON_FLAG_BACKOFF, &con->flags);
+ if ((fault = con_sock_closed(con))) {
+ dout("%s: con %p SOCK_CLOSED\n", __func__, con);
+ break;
+ }
+ if (con_backoff(con)) {
+ dout("%s: con %p BACKOFF\n", __func__, con);
+ break;
+ }
+ if (con->state == CON_STATE_STANDBY) {
+ dout("%s: con %p STANDBY\n", __func__, con);
+ break;
+ }
+ if (con->state == CON_STATE_CLOSED) {
+ dout("%s: con %p CLOSED\n", __func__, con);
+ BUG_ON(con->sock);
+ break;
+ }
+ if (con->state == CON_STATE_PREOPEN) {
+ dout("%s: con %p PREOPEN\n", __func__, con);
+ BUG_ON(con->sock);
}
- goto done;
- }
- if (con->state == CON_STATE_STANDBY) {
- dout("con_work %p STANDBY\n", con);
- goto done;
- }
- if (con->state == CON_STATE_CLOSED) {
- dout("con_work %p CLOSED\n", con);
- BUG_ON(con->sock);
- goto done;
- }
- if (con->state == CON_STATE_PREOPEN) {
- dout("con_work OPENING\n");
- BUG_ON(con->sock);
- }
+ ret = try_read(con);
+ if (ret < 0) {
+ if (ret == -EAGAIN)
+ continue;
+ con->error_msg = "socket error on read";
+ fault = true;
+ break;
+ }
- ret = try_read(con);
- if (ret == -EAGAIN)
- goto restart;
- if (ret < 0) {
- con->error_msg = "socket error on read";
- goto fault;
- }
+ ret = try_write(con);
+ if (ret < 0) {
+ if (ret == -EAGAIN)
+ continue;
+ con->error_msg = "socket error on write";
+ fault = true;
+ }
- ret = try_write(con);
- if (ret == -EAGAIN)
- goto restart;
- if (ret < 0) {
- con->error_msg = "socket error on write";
- goto fault;
+ break; /* If we make it to here, we're done */
}
-
-done:
+ if (fault)
+ con_fault(con);
mutex_unlock(&con->mutex);
-done_unlocked:
- con->ops->put(con);
- return;
-fault:
- ceph_fault(con); /* error/fault path */
- goto done_unlocked;
-}
+ if (fault)
+ con_fault_finish(con);
+ con->ops->put(con);
+}
/*
* Generic error/fault handler. A retry mechanism is used with
* exponential backoff
*/
-static void ceph_fault(struct ceph_connection *con)
- __releases(con->mutex)
+static void con_fault(struct ceph_connection *con)
{
pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
@@ -2381,10 +2833,10 @@ static void ceph_fault(struct ceph_connection *con)
con_close_socket(con);
- if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
+ if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
dout("fault on LOSSYTX channel, marking CLOSED\n");
con->state = CON_STATE_CLOSED;
- goto out_unlock;
+ return;
}
if (con->in_msg) {
@@ -2401,9 +2853,9 @@ static void ceph_fault(struct ceph_connection *con)
/* If there are no messages queued or keepalive pending, place
* the connection in a STANDBY state */
if (list_empty(&con->out_queue) &&
- !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
+ !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
- clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ con_flag_clear(con, CON_FLAG_WRITE_PENDING);
con->state = CON_STATE_STANDBY;
} else {
/* retry after a delay. */
@@ -2412,23 +2864,9 @@ static void ceph_fault(struct ceph_connection *con)
con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL)
con->delay *= 2;
- set_bit(CON_FLAG_BACKOFF, &con->flags);
+ con_flag_set(con, CON_FLAG_BACKOFF);
queue_con(con);
}
-
-out_unlock:
- mutex_unlock(&con->mutex);
- /*
- * in case we faulted due to authentication, invalidate our
- * current tickets so that we can get new ones.
- */
- if (con->auth_retry && con->ops->invalidate_authorizer) {
- dout("calling invalidate_authorizer()\n");
- con->ops->invalidate_authorizer(con);
- }
-
- if (con->ops->fault)
- con->ops->fault(con);
}
@@ -2438,8 +2876,8 @@ out_unlock:
*/
void ceph_messenger_init(struct ceph_messenger *msgr,
struct ceph_entity_addr *myaddr,
- u32 supported_features,
- u32 required_features,
+ u64 supported_features,
+ u64 required_features,
bool nocrc)
{
msgr->supported_features = supported_features;
@@ -2469,8 +2907,8 @@ static void clear_standby(struct ceph_connection *con)
dout("clear_standby %p and ++connect_seq\n", con);
con->state = CON_STATE_PREOPEN;
con->connect_seq++;
- WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
- WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
+ WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING));
+ WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING));
}
}
@@ -2511,7 +2949,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
/* if there wasn't anything waiting to send before, queue
* new work */
- if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
+ if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);
@@ -2600,12 +3038,94 @@ void ceph_con_keepalive(struct ceph_connection *con)
mutex_lock(&con->mutex);
clear_standby(con);
mutex_unlock(&con->mutex);
- if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
- test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
+ if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 &&
+ con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);
+static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
+{
+ struct ceph_msg_data *data;
+
+ if (WARN_ON(!ceph_msg_data_type_valid(type)))
+ return NULL;
+
+ data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
+ if (data)
+ data->type = type;
+ INIT_LIST_HEAD(&data->links);
+
+ return data;
+}
+
+static void ceph_msg_data_destroy(struct ceph_msg_data *data)
+{
+ if (!data)
+ return;
+
+ WARN_ON(!list_empty(&data->links));
+ if (data->type == CEPH_MSG_DATA_PAGELIST) {
+ ceph_pagelist_release(data->pagelist);
+ kfree(data->pagelist);
+ }
+ kmem_cache_free(ceph_msg_data_cache, data);
+}
+
+void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
+ size_t length, size_t alignment)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!pages);
+ BUG_ON(!length);
+
+ data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
+ BUG_ON(!data);
+ data->pages = pages;
+ data->length = length;
+ data->alignment = alignment & ~PAGE_MASK;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_pages);
+
+void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
+ struct ceph_pagelist *pagelist)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!pagelist);
+ BUG_ON(!pagelist->length);
+
+ data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
+ BUG_ON(!data);
+ data->pagelist = pagelist;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += pagelist->length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
+
+#ifdef CONFIG_BLOCK
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
+ size_t length)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!bio);
+
+ data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
+ BUG_ON(!data);
+ data->bio = bio;
+ data->bio_length = length;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_bio);
+#endif /* CONFIG_BLOCK */
/*
* construct a new message with given type, size
@@ -2616,55 +3136,21 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
{
struct ceph_msg *m;
- m = kmalloc(sizeof(*m), flags);
+ m = kmem_cache_zalloc(ceph_msg_cache, flags);
if (m == NULL)
goto out;
- kref_init(&m->kref);
- m->con = NULL;
- INIT_LIST_HEAD(&m->list_head);
-
- m->hdr.tid = 0;
m->hdr.type = cpu_to_le16(type);
m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
- m->hdr.version = 0;
m->hdr.front_len = cpu_to_le32(front_len);
- m->hdr.middle_len = 0;
- m->hdr.data_len = 0;
- m->hdr.data_off = 0;
- m->hdr.reserved = 0;
- m->footer.front_crc = 0;
- m->footer.middle_crc = 0;
- m->footer.data_crc = 0;
- m->footer.flags = 0;
- m->front_max = front_len;
- m->front_is_vmalloc = false;
- m->more_to_follow = false;
- m->ack_stamp = 0;
- m->pool = NULL;
- /* middle */
- m->middle = NULL;
-
- /* data */
- m->nr_pages = 0;
- m->page_alignment = 0;
- m->pages = NULL;
- m->pagelist = NULL;
- m->bio = NULL;
- m->bio_iter = NULL;
- m->bio_seg = 0;
- m->trail = NULL;
+ INIT_LIST_HEAD(&m->list_head);
+ kref_init(&m->kref);
+ INIT_LIST_HEAD(&m->data);
/* front */
if (front_len) {
- if (front_len > PAGE_CACHE_SIZE) {
- m->front.iov_base = __vmalloc(front_len, flags,
- PAGE_KERNEL);
- m->front_is_vmalloc = true;
- } else {
- m->front.iov_base = kmalloc(front_len, flags);
- }
+ m->front.iov_base = ceph_kvmalloc(front_len, flags);
if (m->front.iov_base == NULL) {
dout("ceph_msg_new can't allocate %d bytes\n",
front_len);
@@ -2673,7 +3159,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
} else {
m->front.iov_base = NULL;
}
- m->front.iov_len = front_len;
+ m->front_alloc_len = m->front.iov_len = front_len;
dout("ceph_msg_new %p front %d\n", m, front_len);
return m;
@@ -2734,49 +3220,37 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
{
struct ceph_msg_header *hdr = &con->in_hdr;
- int type = le16_to_cpu(hdr->type);
- int front_len = le32_to_cpu(hdr->front_len);
int middle_len = le32_to_cpu(hdr->middle_len);
+ struct ceph_msg *msg;
int ret = 0;
BUG_ON(con->in_msg != NULL);
+ BUG_ON(!con->ops->alloc_msg);
- if (con->ops->alloc_msg) {
- struct ceph_msg *msg;
-
- mutex_unlock(&con->mutex);
- msg = con->ops->alloc_msg(con, hdr, skip);
- mutex_lock(&con->mutex);
- if (con->state != CON_STATE_OPEN) {
- if (msg)
- ceph_msg_put(msg);
- return -EAGAIN;
- }
- con->in_msg = msg;
- if (con->in_msg) {
- con->in_msg->con = con->ops->get(con);
- BUG_ON(con->in_msg->con == NULL);
- }
- if (*skip) {
- con->in_msg = NULL;
- return 0;
- }
- if (!con->in_msg) {
- con->error_msg =
- "error allocating memory for incoming message";
- return -ENOMEM;
- }
+ mutex_unlock(&con->mutex);
+ msg = con->ops->alloc_msg(con, hdr, skip);
+ mutex_lock(&con->mutex);
+ if (con->state != CON_STATE_OPEN) {
+ if (msg)
+ ceph_msg_put(msg);
+ return -EAGAIN;
}
- if (!con->in_msg) {
- con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
- if (!con->in_msg) {
- pr_err("unable to allocate msg type %d len %d\n",
- type, front_len);
- return -ENOMEM;
- }
+ if (msg) {
+ BUG_ON(*skip);
+ con->in_msg = msg;
con->in_msg->con = con->ops->get(con);
BUG_ON(con->in_msg->con == NULL);
- con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
+ } else {
+ /*
+ * Null message pointer means either we should skip
+ * this message or we couldn't allocate memory. The
+ * former is not an error.
+ */
+ if (*skip)
+ return 0;
+ con->error_msg = "error allocating memory for incoming message";
+
+ return -ENOMEM;
}
memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
@@ -2798,11 +3272,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
void ceph_msg_kfree(struct ceph_msg *m)
{
dout("msg_kfree %p\n", m);
- if (m->front_is_vmalloc)
- vfree(m->front.iov_base);
- else
- kfree(m->front.iov_base);
- kfree(m);
+ ceph_kvfree(m->front.iov_base);
+ kmem_cache_free(ceph_msg_cache, m);
}
/*
@@ -2811,6 +3282,9 @@ void ceph_msg_kfree(struct ceph_msg *m)
void ceph_msg_last_put(struct kref *kref)
{
struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
+ LIST_HEAD(data);
+ struct list_head *links;
+ struct list_head *next;
dout("ceph_msg_put last one on %p\n", m);
WARN_ON(!list_empty(&m->list_head));
@@ -2820,16 +3294,16 @@ void ceph_msg_last_put(struct kref *kref)
ceph_buffer_put(m->middle);
m->middle = NULL;
}
- m->nr_pages = 0;
- m->pages = NULL;
- if (m->pagelist) {
- ceph_pagelist_release(m->pagelist);
- kfree(m->pagelist);
- m->pagelist = NULL;
- }
+ list_splice_init(&m->data, &data);
+ list_for_each_safe(links, next, &data) {
+ struct ceph_msg_data *data;
- m->trail = NULL;
+ data = list_entry(links, struct ceph_msg_data, links);
+ list_del_init(links);
+ ceph_msg_data_destroy(data);
+ }
+ m->data_length = 0;
if (m->pool)
ceph_msgpool_put(m->pool, m);
@@ -2840,8 +3314,8 @@ EXPORT_SYMBOL(ceph_msg_last_put);
void ceph_msg_dump(struct ceph_msg *msg)
{
- pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
- msg->front_max, msg->nr_pages);
+ pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
+ msg->front_alloc_len, msg->data_length);
print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true);