aboutsummaryrefslogtreecommitdiff
path: root/fs/ceph/messenger.c
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2010-04-06 15:01:27 -0700
committerSage Weil <sage@newdream.net>2010-10-20 15:37:18 -0700
commit68b4476b0bc13fef18266b4140309a30e86739d2 (patch)
tree47fab5ea2491c7bc75fe14a3b0d3a091eb6244b7 /fs/ceph/messenger.c
parent3499e8a5d4dbb083324efd942e2c4fb7eb65f27c (diff)
ceph: messenger and osdc changes for rbd
Allow the messenger to send/receive data in a bio. This is added so that we wouldn't need to copy the data into pages or some other buffer when doing IO for an rbd block device. We can now have trailing variable sized data for osd ops. Also osd ops encoding is more modular. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net> Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r--fs/ceph/messenger.c219
1 files changed, 187 insertions, 32 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 2502d76fcec..17a09b32a59 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -9,6 +9,8 @@
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
#include <net/tcp.h>
#include "super.h"
@@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
if (le32_to_cpu(m->hdr.data_len) > 0) {
/* initialize page iterator */
con->out_msg_pos.page = 0;
- con->out_msg_pos.page_pos =
- le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ if (m->pages)
+ con->out_msg_pos.page_pos =
+ le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ else
+ con->out_msg_pos.page_pos = 0;
con->out_msg_pos.data_pos = 0;
con->out_msg_pos.did_page_crc = 0;
con->out_more = 1; /* data + footer will follow */
@@ -712,6 +717,31 @@ out:
return ret; /* done! */
}
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+ if (!bio) {
+ *iter = NULL;
+ *seg = 0;
+ return;
+ }
+ *iter = bio;
+ *seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+ if (*bio_iter == NULL)
+ return;
+
+ BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+ (*seg)++;
+ if (*seg == (*bio_iter)->bi_vcnt)
+ init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
@@ -726,21 +756,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
size_t len;
int crc = con->msgr->nocrc;
int ret;
+ int total_max_write;
+ int in_trail = 0;
+ size_t trail_len = (msg->trail ? msg->trail->length : 0);
dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
con->out_msg_pos.page_pos);
- while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+#ifdef CONFIG_BLOCK
+ if (msg->bio && !msg->bio_iter)
+ init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+
+ while (data_len > con->out_msg_pos.data_pos) {
struct page *page = NULL;
void *kaddr = NULL;
+ int max_write = PAGE_SIZE;
+ int page_shift = 0;
+
+ total_max_write = data_len - trail_len -
+ con->out_msg_pos.data_pos;
/*
* if we are calculating the data crc (the default), we need
* to map the page. if our pages[] has been revoked, use the
* zero page.
*/
- if (msg->pages) {
+
+ /* have we reached the trail part of the data? */
+ if (con->out_msg_pos.data_pos >= data_len - trail_len) {
+ in_trail = 1;
+
+ total_max_write = data_len - con->out_msg_pos.data_pos;
+
+ page = list_first_entry(&msg->trail->head,
+ struct page, lru);
+ if (crc)
+ kaddr = kmap(page);
+ max_write = PAGE_SIZE;
+ } else if (msg->pages) {
page = msg->pages[con->out_msg_pos.page];
if (crc)
kaddr = kmap(page);
@@ -749,13 +804,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
struct page, lru);
if (crc)
kaddr = kmap(page);
+#ifdef CONFIG_BLOCK
+ } else if (msg->bio) {
+ struct bio_vec *bv;
+
+ bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+ page = bv->bv_page;
+ page_shift = bv->bv_offset;
+ if (crc)
+ kaddr = kmap(page) + page_shift;
+ max_write = bv->bv_len;
+#endif
} else {
page = con->msgr->zero_page;
if (crc)
kaddr = page_address(con->msgr->zero_page);
}
- len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
- (int)(data_len - con->out_msg_pos.data_pos));
+ len = min_t(int, max_write - con->out_msg_pos.page_pos,
+ total_max_write);
+
if (crc && !con->out_msg_pos.did_page_crc) {
void *base = kaddr + con->out_msg_pos.page_pos;
u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -765,13 +832,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
cpu_to_le32(crc32c(tmpcrc, base, len));
con->out_msg_pos.did_page_crc = 1;
}
-
ret = kernel_sendpage(con->sock, page,
- con->out_msg_pos.page_pos, len,
+ con->out_msg_pos.page_pos + page_shift,
+ len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);
- if (crc && (msg->pages || msg->pagelist))
+ if (crc &&
+ (msg->pages || msg->pagelist || msg->bio || in_trail))
kunmap(page);
if (ret <= 0)
@@ -783,9 +851,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
con->out_msg_pos.page_pos = 0;
con->out_msg_pos.page++;
con->out_msg_pos.did_page_crc = 0;
- if (msg->pagelist)
+ if (in_trail)
+ list_move_tail(&page->lru,
+ &msg->trail->head);
+ else if (msg->pagelist)
list_move_tail(&page->lru,
&msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+ else if (msg->bio)
+ iter_bio_next(&msg->bio_iter, &msg->bio_seg);
+#endif
}
}
@@ -1305,8 +1380,7 @@ static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section,
unsigned int sec_len, u32 *crc)
{
- int left;
- int ret;
+ int ret, left;
BUG_ON(!section);
@@ -1329,13 +1403,83 @@ static int read_partial_message_section(struct ceph_connection *con,
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
+
+
+static int read_partial_message_pages(struct ceph_connection *con,
+ struct page **pages,
+ unsigned data_len, int datacrc)
+{
+ void *p;
+ int ret;
+ int left;
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+ /* (page) data */
+ BUG_ON(pages == NULL);
+ p = kmap(pages[con->in_msg_pos.page]);
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(pages[con->in_msg_pos.page]);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+ con->in_msg_pos.page_pos = 0;
+ con->in_msg_pos.page++;
+ }
+
+ return ret;
+}
+
+#ifdef CONFIG_BLOCK
+static int read_partial_message_bio(struct ceph_connection *con,
+ struct bio **bio_iter, int *bio_seg,
+ unsigned data_len, int datacrc)
+{
+ struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+ void *p;
+ int ret, left;
+
+ if (IS_ERR(bv))
+ return PTR_ERR(bv);
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(bv->bv_len - con->in_msg_pos.page_pos));
+
+ p = kmap(bv->bv_page) + bv->bv_offset;
+
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(bv->bv_page);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == bv->bv_len) {
+ con->in_msg_pos.page_pos = 0;
+ iter_bio_next(bio_iter, bio_seg);
+ }
+
+ return ret;
+}
+#endif
+
/*
* read (part of) a message.
*/
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
- void *p;
int ret;
int to, left;
unsigned front_len, middle_len, data_len, data_off;
@@ -1422,7 +1566,10 @@ static int read_partial_message(struct ceph_connection *con)
m->middle->vec.iov_len = 0;
con->in_msg_pos.page = 0;
- con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ if (m->pages)
+ con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ else
+ con->in_msg_pos.page_pos = 0;
con->in_msg_pos.data_pos = 0;
}
@@ -1440,27 +1587,29 @@ static int read_partial_message(struct ceph_connection *con)
if (ret <= 0)
return ret;
}
+#ifdef CONFIG_BLOCK
+ if (m->bio && !m->bio_iter)
+ init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
+#endif
/* (page) data */
while (con->in_msg_pos.data_pos < data_len) {
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- BUG_ON(m->pages == NULL);
- p = kmap(m->pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0 && datacrc)
- con->in_data_crc =
- crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(m->pages[con->in_msg_pos.page]);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == PAGE_SIZE) {
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.page++;
+ if (m->pages) {
+ ret = read_partial_message_pages(con, m->pages,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+#ifdef CONFIG_BLOCK
+ } else if (m->bio) {
+
+ ret = read_partial_message_bio(con,
+ &m->bio_iter, &m->bio_seg,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+#endif
+ } else {
+ BUG_ON(1);
}
}
@@ -2136,6 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->nr_pages = 0;
m->pages = NULL;
m->pagelist = NULL;
+ m->bio = NULL;
+ m->bio_iter = NULL;
+ m->bio_seg = 0;
+ m->trail = NULL;
dout("ceph_msg_new %p front %d\n", m, front_len);
return m;
@@ -2250,6 +2403,8 @@ void ceph_msg_last_put(struct kref *kref)
m->pagelist = NULL;
}
+ m->trail = NULL;
+
if (m->pool)
ceph_msgpool_put(m->pool, m);
else