diff options
author | Yehuda Sadeh <yehuda@hq.newdream.net> | 2010-01-13 17:03:23 -0800 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2010-01-25 12:58:08 -0800 |
commit | 0d59ab81c3d3adf466c3fd37d7fb6d46b05d1fd4 (patch) | |
tree | 1cdf338188bc323379ced8d5e1bdec31f500768c /fs/ceph/osd_client.c | |
parent | 0547a9b30a5ac8680325752b61d3ffa9d4971b6e (diff) |
ceph: keep reserved replies on the request structure
This includes treating all the data preallocation and revokation
at the same place, not having to have a special case for
the reserved pages.
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Diffstat (limited to 'fs/ceph/osd_client.c')
-rw-r--r-- | fs/ceph/osd_client.c | 118 |
1 files changed, 83 insertions, 35 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c index 44abe299c69..df210683971 100644 --- a/fs/ceph/osd_client.c +++ b/fs/ceph/osd_client.c @@ -13,6 +13,8 @@ #include "decode.h" #include "auth.h" +#define OSD_REPLY_RESERVE_FRONT_LEN 512 + const static struct ceph_connection_operations osd_con_ops; static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); @@ -73,6 +75,16 @@ static void calc_layout(struct ceph_osd_client *osdc, req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages); } +static void remove_replies(struct ceph_osd_request *req) +{ + int i; + int max = ARRAY_SIZE(req->replies); + + for (i=0; i<max; i++) { + if (req->replies[i]) + ceph_msg_put(req->replies[i]); + } +} /* * requests @@ -87,12 +99,13 @@ void ceph_osdc_release_request(struct kref *kref) ceph_msg_put(req->r_request); if (req->r_reply) ceph_msg_put(req->r_reply); - if (req->r_con_filling_pages) { + remove_replies(req); + if (req->r_con_filling_msg) { dout("release_request revoking pages %p from con %p\n", - req->r_pages, req->r_con_filling_pages); - ceph_con_revoke_pages(req->r_con_filling_pages, - req->r_pages); - ceph_con_put(req->r_con_filling_pages); + req->r_pages, req->r_con_filling_msg); + ceph_con_revoke_message(req->r_con_filling_msg, + req->r_reply); + ceph_con_put(req->r_con_filling_msg); } if (req->r_own_pages) ceph_release_page_vector(req->r_pages, @@ -104,6 +117,60 @@ void ceph_osdc_release_request(struct kref *kref) kfree(req); } +static int alloc_replies(struct ceph_osd_request *req, int num_reply) +{ + int i; + int max = ARRAY_SIZE(req->replies); + + BUG_ON(num_reply > max); + + for (i=0; i<num_reply; i++) { + req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL); + if (IS_ERR(req->replies[i])) { + int j; + int err = PTR_ERR(req->replies[i]); + for (j = 0; j<=i; j++) { + ceph_msg_put(req->replies[j]); + } + return err; + } + } + + for (; i<max; i++) { + req->replies[i] = NULL; + } + + req->cur_reply = 0; + + return 0; +} + +static struct ceph_msg *__get_next_reply(struct ceph_connection *con, + struct ceph_osd_request *req, + int front_len) +{ + struct ceph_msg *reply; + if (req->r_con_filling_msg) { + dout("revoking reply msg %p from old con %p\n", req->r_reply, + req->r_con_filling_msg); + ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); + ceph_con_put(req->r_con_filling_msg); + req->cur_reply = 0; + } + reply = req->replies[req->cur_reply]; + if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) { + /* maybe we can allocate it now? */ + reply = ceph_msg_new(0, front_len, 0, 0, NULL); + if (!reply || IS_ERR(reply)) { + pr_err(" reply alloc failed, front_len=%d\n", front_len); + return ERR_PTR(-ENOMEM); + } + } + req->r_con_filling_msg = ceph_con_get(con); + req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */ + return ceph_msg_get(reply); +} + /* * build new request AND message, calculate layout, and adjust file * extent as needed. @@ -147,7 +214,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (req == NULL) return ERR_PTR(-ENOMEM); - err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply); + err = alloc_replies(req, num_reply); if (err) { ceph_osdc_put_request(req); return ERR_PTR(-ENOMEM); @@ -173,7 +240,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, else msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); if (IS_ERR(msg)) { - ceph_msgpool_resv(&osdc->msgpool_op_reply, -num_reply); ceph_osdc_put_request(req); return ERR_PTR(PTR_ERR(msg)); } @@ -471,8 +537,6 @@ static void __unregister_request(struct ceph_osd_client *osdc, rb_erase(&req->r_node, &osdc->requests); osdc->num_requests--; - ceph_msgpool_resv(&osdc->msgpool_op_reply, -req->r_num_prealloc_reply); - if (req->r_osd) { /* make sure the original request isn't in flight. */ ceph_con_revoke(&req->r_osd->o_con, req->r_request); @@ -724,12 +788,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, flags = le32_to_cpu(rhead->flags); /* - * if this connection filled our pages, drop our reference now, to + * if this connection filled our message, drop our reference now, to * avoid a (safe but slower) revoke later. */ - if (req->r_con_filling_pages == con && req->r_pages == msg->pages) { - dout(" got pages, dropping con_filling_pages ref %p\n", con); - req->r_con_filling_pages = NULL; + if (req->r_con_filling_msg == con && req->r_reply == msg) { + dout(" got pages, dropping con_filling_msg ref %p\n", con); + req->r_con_filling_msg = NULL; ceph_con_put(con); } @@ -998,7 +1062,7 @@ bad: * find those pages. * 0 = success, -1 failure. */ -static int prepare_pages(struct ceph_connection *con, +static int __prepare_pages(struct ceph_connection *con, struct ceph_msg_header *hdr, struct ceph_osd_request *req, u64 tid, @@ -1017,20 +1081,10 @@ static int prepare_pages(struct ceph_connection *con, osdc = osd->o_osdc; - dout("prepare_pages on msg %p want %d\n", m, want); - dout("prepare_pages tid %llu has %d pages, want %d\n", + dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m, tid, req->r_num_pages, want); if (unlikely(req->r_num_pages < want)) goto out; - - if (req->r_con_filling_pages) { - dout("revoking pages %p from old con %p\n", req->r_pages, - req->r_con_filling_pages); - ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages); - ceph_con_put(req->r_con_filling_pages); - } - req->r_con_filling_pages = ceph_con_get(con); - req->r_reply = ceph_msg_get(m); /* for duration of read over socket */ m->pages = req->r_pages; m->nr_pages = req->r_num_pages; ret = 0; /* success */ @@ -1164,13 +1218,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true); if (err < 0) goto out_mempool; - err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false); - if (err < 0) - goto out_msgpool; return 0; -out_msgpool: - ceph_msgpool_destroy(&osdc->msgpool_op); out_mempool: mempool_destroy(osdc->req_mempool); out: @@ -1186,7 +1235,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) } mempool_destroy(osdc->req_mempool); ceph_msgpool_destroy(&osdc->msgpool_op); - ceph_msgpool_destroy(&osdc->msgpool_op_reply); } /* @@ -1323,17 +1371,17 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, if (!req) { *skip = 1; m = NULL; - dout("prepare_pages unknown tid %llu\n", tid); + dout("alloc_msg unknown tid %llu\n", tid); goto out; } - m = ceph_msgpool_get(&osdc->msgpool_op_reply, front); - if (!m) { + m = __get_next_reply(con, req, front); + if (!m || IS_ERR(m)) { *skip = 1; goto out; } if (data_len > 0) { - err = prepare_pages(con, hdr, req, tid, m); + err = __prepare_pages(con, hdr, req, tid, m); if (err < 0) { *skip = 1; ceph_msg_put(m); |