aboutsummaryrefslogtreecommitdiff
path: root/net/ceph
diff options
context:
space:
mode:
authorIlya Dryomov <ilya.dryomov@inktank.com>2014-01-27 17:40:20 +0200
committerIlya Dryomov <ilya.dryomov@inktank.com>2014-01-27 23:57:53 +0200
commit205ee1187a671c3b067d7f1e974903b44036f270 (patch)
tree2a310516bcf7fdfb769c360a9fcfce85f501f57a /net/ceph
parent3c972c95c68f455d80ff185aa440857be046bbe0 (diff)
libceph: follow redirect replies from osds
Follow redirect replies from osds, for details see ceph.git commit fbbe3ad1220799b7bb00ea30fce581c5eadaf034. v1 (current) version of redirect reply consists of oloc and oid, which expands to pool, key, nspace, hash and oid. However, server-side code that would populate anything other than pool doesn't exist yet, and hence this commit adds support for pool redirects only. To make sure that future server-side updates don't break us, we decode all fields and, if any of key, nspace, hash or oid have a non-default value, error out with "corrupt osd_op_reply ..." message. Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com> Reviewed-by: Sage Weil <sage@inktank.com>
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/osd_client.c167
1 files changed, 158 insertions, 9 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3997a87c4f5..010ff3bd58a 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -369,6 +369,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
INIT_LIST_HEAD(&req->r_osd_item);
req->r_base_oloc.pool = -1;
+ req->r_target_oloc.pool = -1;
/* create reply message */
if (use_mempool)
@@ -1256,23 +1257,36 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
struct ceph_osd_request *req,
struct ceph_pg *pg_out)
{
- if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+ bool need_check_tiering;
+
+ need_check_tiering = false;
+ if (req->r_target_oloc.pool == -1) {
+ req->r_target_oloc = req->r_base_oloc; /* struct */
+ need_check_tiering = true;
+ }
+ if (req->r_target_oid.name_len == 0) {
+ ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
+ need_check_tiering = true;
+ }
+
+ if (need_check_tiering &&
+ (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
struct ceph_pg_pool_info *pi;
- pi = ceph_pg_pool_by_id(osdmap, req->r_base_oloc.pool);
+ pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
if (pi) {
if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
pi->read_tier >= 0)
- req->r_base_oloc.pool = pi->read_tier;
+ req->r_target_oloc.pool = pi->read_tier;
if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
pi->write_tier >= 0)
- req->r_base_oloc.pool = pi->write_tier;
+ req->r_target_oloc.pool = pi->write_tier;
}
/* !pi is caught in ceph_oloc_oid_to_pg() */
}
- return ceph_oloc_oid_to_pg(osdmap, &req->r_base_oloc,
- &req->r_base_oid, pg_out);
+ return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
+ &req->r_target_oid, pg_out);
}
/*
@@ -1382,7 +1396,7 @@ static void __send_request(struct ceph_osd_client *osdc,
/* fill in message content that changes each time we send it */
put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
put_unaligned_le32(req->r_flags, req->r_request_flags);
- put_unaligned_le64(req->r_base_oloc.pool, req->r_request_pool);
+ put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
p = req->r_request_pgid;
ceph_encode_64(&p, req->r_pgid.pool);
ceph_encode_32(&p, req->r_pgid.seed);
@@ -1483,6 +1497,109 @@ static void handle_osds_timeout(struct work_struct *work)
round_jiffies_relative(delay));
}
+static int ceph_oloc_decode(void **p, void *end,
+ struct ceph_object_locator *oloc)
+{
+ u8 struct_v, struct_cv;
+ u32 len;
+ void *struct_end;
+ int ret = 0;
+
+ ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+ struct_v = ceph_decode_8(p);
+ struct_cv = ceph_decode_8(p);
+ if (struct_v < 3) {
+ pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
+ struct_v, struct_cv);
+ goto e_inval;
+ }
+ if (struct_cv > 6) {
+ pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
+ struct_v, struct_cv);
+ goto e_inval;
+ }
+ len = ceph_decode_32(p);
+ ceph_decode_need(p, end, len, e_inval);
+ struct_end = *p + len;
+
+ oloc->pool = ceph_decode_64(p);
+ *p += 4; /* skip preferred */
+
+ len = ceph_decode_32(p);
+ if (len > 0) {
+ pr_warn("ceph_object_locator::key is set\n");
+ goto e_inval;
+ }
+
+ if (struct_v >= 5) {
+ len = ceph_decode_32(p);
+ if (len > 0) {
+ pr_warn("ceph_object_locator::nspace is set\n");
+ goto e_inval;
+ }
+ }
+
+ if (struct_v >= 6) {
+ s64 hash = ceph_decode_64(p);
+ if (hash != -1) {
+ pr_warn("ceph_object_locator::hash is set\n");
+ goto e_inval;
+ }
+ }
+
+ /* skip the rest */
+ *p = struct_end;
+out:
+ return ret;
+
+e_inval:
+ ret = -EINVAL;
+ goto out;
+}
+
+static int ceph_redirect_decode(void **p, void *end,
+ struct ceph_request_redirect *redir)
+{
+ u8 struct_v, struct_cv;
+ u32 len;
+ void *struct_end;
+ int ret;
+
+ ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
+ struct_v = ceph_decode_8(p);
+ struct_cv = ceph_decode_8(p);
+ if (struct_cv > 1) {
+ pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
+ struct_v, struct_cv);
+ goto e_inval;
+ }
+ len = ceph_decode_32(p);
+ ceph_decode_need(p, end, len, e_inval);
+ struct_end = *p + len;
+
+ ret = ceph_oloc_decode(p, end, &redir->oloc);
+ if (ret)
+ goto out;
+
+ len = ceph_decode_32(p);
+ if (len > 0) {
+ pr_warn("ceph_request_redirect::object_name is set\n");
+ goto e_inval;
+ }
+
+ len = ceph_decode_32(p);
+ *p += len; /* skip osd_instructions */
+
+ /* skip the rest */
+ *p = struct_end;
+out:
+ return ret;
+
+e_inval:
+ ret = -EINVAL;
+ goto out;
+}
+
static void complete_request(struct ceph_osd_request *req)
{
complete_all(&req->r_safe_completion); /* fsync waiter */
@@ -1497,6 +1614,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
{
void *p, *end;
struct ceph_osd_request *req;
+ struct ceph_request_redirect redir;
u64 tid;
int object_len;
unsigned int numops;
@@ -1576,10 +1694,41 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
for (i = 0; i < numops; i++)
req->r_reply_op_result[i] = ceph_decode_32(&p);
- already_completed = req->r_got_reply;
+ if (le16_to_cpu(msg->hdr.version) >= 6) {
+ p += 8 + 4; /* skip replay_version */
+ p += 8; /* skip user_version */
- if (!req->r_got_reply) {
+ err = ceph_redirect_decode(&p, end, &redir);
+ if (err)
+ goto bad_put;
+ } else {
+ redir.oloc.pool = -1;
+ }
+ if (redir.oloc.pool != -1) {
+ dout("redirect pool %lld\n", redir.oloc.pool);
+
+ __unregister_request(osdc, req);
+ mutex_unlock(&osdc->request_mutex);
+
+ req->r_target_oloc = redir.oloc; /* struct */
+
+ /*
+ * Start redirect requests with nofail=true. If
+ * mapping fails, request will end up on the notarget
+ * list, waiting for the new osdmap (which can take
+ * a while), even though the original request mapped
+ * successfully. In the future we might want to follow
+ * original request's nofail setting here.
+ */
+ err = ceph_osdc_start_request(osdc, req, true);
+ BUG_ON(err);
+
+ goto done;
+ }
+
+ already_completed = req->r_got_reply;
+ if (!req->r_got_reply) {
req->r_result = result;
dout("handle_reply result %d bytes %d\n", req->r_result,
bytes);