diff options
Diffstat (limited to 'drivers/block/rbd.c')
-rw-r--r-- | drivers/block/rbd.c | 538 |
1 files changed, 491 insertions, 47 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e1e38b11f48..9712fad82bc 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -31,6 +31,7 @@ #include <linux/ceph/osd_client.h> #include <linux/ceph/mon_client.h> #include <linux/ceph/decode.h> +#include <linux/parser.h> #include <linux/kernel.h> #include <linux/device.h> @@ -54,6 +55,8 @@ #define DEV_NAME_LEN 32 +#define RBD_NOTIFY_TIMEOUT_DEFAULT 10 + /* * block device image metadata (in-memory version) */ @@ -71,6 +74,12 @@ struct rbd_image_header { char *snap_names; u64 *snap_sizes; + + u64 obj_version; +}; + +struct rbd_options { + int notify_timeout; }; /* @@ -78,10 +87,13 @@ struct rbd_image_header { */ struct rbd_client { struct ceph_client *client; + struct rbd_options *rbd_opts; struct kref kref; struct list_head node; }; +struct rbd_req_coll; + /* * a single io request */ @@ -90,6 +102,24 @@ struct rbd_request { struct bio *bio; /* cloned bio */ struct page **pages; /* list of used pages */ u64 len; + int coll_index; + struct rbd_req_coll *coll; +}; + +struct rbd_req_status { + int done; + int rc; + u64 bytes; +}; + +/* + * a collection of requests + */ +struct rbd_req_coll { + int total; + int num_done; + struct kref kref; + struct rbd_req_status status[0]; }; struct rbd_snap { @@ -124,6 +154,9 @@ struct rbd_device { char pool_name[RBD_MAX_POOL_NAME_LEN]; int poolid; + struct ceph_osd_event *watch_event; + struct ceph_osd_request *watch_request; + char snap_name[RBD_MAX_SNAP_NAME_LEN]; u32 cur_snap; /* index+1 of current snapshot within snap context 0 - for the head */ @@ -177,6 +210,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev) put_device(&rbd_dev->dev); } +static int __rbd_update_snaps(struct rbd_device *rbd_dev); + static int rbd_open(struct block_device *bdev, fmode_t mode) { struct gendisk *disk = bdev->bd_disk; @@ -211,7 +246,8 @@ static const struct block_device_operations rbd_bd_ops = { * Initialize an rbd client instance. * We own *opt. */ -static struct rbd_client *rbd_client_create(struct ceph_options *opt) +static struct rbd_client *rbd_client_create(struct ceph_options *opt, + struct rbd_options *rbd_opts) { struct rbd_client *rbdc; int ret = -ENOMEM; @@ -233,6 +269,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt) if (ret < 0) goto out_err; + rbdc->rbd_opts = rbd_opts; + spin_lock(&node_lock); list_add_tail(&rbdc->node, &rbd_client_list); spin_unlock(&node_lock); @@ -267,6 +305,59 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt) } /* + * mount options + */ +enum { + Opt_notify_timeout, + Opt_last_int, + /* int args above */ + Opt_last_string, + /* string args above */ +}; + +static match_table_t rbdopt_tokens = { + {Opt_notify_timeout, "notify_timeout=%d"}, + /* int args above */ + /* string args above */ + {-1, NULL} +}; + +static int parse_rbd_opts_token(char *c, void *private) +{ + struct rbd_options *rbdopt = private; + substring_t argstr[MAX_OPT_ARGS]; + int token, intval, ret; + + token = match_token((char *)c, rbdopt_tokens, argstr); + if (token < 0) + return -EINVAL; + + if (token < Opt_last_int) { + ret = match_int(&argstr[0], &intval); + if (ret < 0) { + pr_err("bad mount option arg (not int) " + "at '%s'\n", c); + return ret; + } + dout("got int token %d val %d\n", token, intval); + } else if (token > Opt_last_int && token < Opt_last_string) { + dout("got string token %d val %s\n", token, + argstr[0].from); + } else { + dout("got token %d\n", token); + } + + switch (token) { + case Opt_notify_timeout: + rbdopt->notify_timeout = intval; + break; + default: + BUG_ON(token); + } + return 0; +} + +/* * Get a ceph client with specific addr and configuration, if one does * not exist create it. */ @@ -276,11 +367,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, struct rbd_client *rbdc; struct ceph_options *opt; int ret; + struct rbd_options *rbd_opts; + + rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL); + if (!rbd_opts) + return -ENOMEM; + + rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT; ret = ceph_parse_options(&opt, options, mon_addr, - mon_addr + strlen(mon_addr), NULL, NULL); + mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts); if (ret < 0) - return ret; + goto done_err; spin_lock(&node_lock); rbdc = __rbd_client_find(opt); @@ -296,13 +394,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, } spin_unlock(&node_lock); - rbdc = rbd_client_create(opt); - if (IS_ERR(rbdc)) - return PTR_ERR(rbdc); + rbdc = rbd_client_create(opt, rbd_opts); + if (IS_ERR(rbdc)) { + ret = PTR_ERR(rbdc); + goto done_err; + } rbd_dev->rbd_client = rbdc; rbd_dev->client = rbdc->client; return 0; +done_err: + kfree(rbd_opts); + return ret; } /* @@ -318,6 +421,7 @@ static void rbd_client_release(struct kref *kref) spin_unlock(&node_lock); ceph_destroy_client(rbdc->client); + kfree(rbdc->rbd_opts); kfree(rbdc); } @@ -332,6 +436,17 @@ static void rbd_put_client(struct rbd_device *rbd_dev) rbd_dev->client = NULL; } +/* + * Destroy requests collection + */ +static void rbd_coll_release(struct kref *kref) +{ + struct rbd_req_coll *coll = + container_of(kref, struct rbd_req_coll, kref); + + dout("rbd_coll_release %p\n", coll); + kfree(coll); +} /* * Create a new header structure, translate header format from the on-disk @@ -506,6 +621,14 @@ static u64 rbd_get_segment(struct rbd_image_header *header, return len; } +static int rbd_get_num_segments(struct rbd_image_header *header, + u64 ofs, u64 len) +{ + u64 start_seg = ofs >> header->obj_order; + u64 end_seg = (ofs + len - 1) >> header->obj_order; + return end_seg - start_seg + 1; +} + /* * bio helpers */ @@ -651,6 +774,50 @@ static void rbd_destroy_ops(struct ceph_osd_req_op *ops) kfree(ops); } +static void rbd_coll_end_req_index(struct request *rq, + struct rbd_req_coll *coll, + int index, + int ret, u64 len) +{ + struct request_queue *q; + int min, max, i; + + dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n", + coll, index, ret, len); + + if (!rq) + return; + + if (!coll) { + blk_end_request(rq, ret, len); + return; + } + + q = rq->q; + + spin_lock_irq(q->queue_lock); + coll->status[index].done = 1; + coll->status[index].rc = ret; + coll->status[index].bytes = len; + max = min = coll->num_done; + while (max < coll->total && coll->status[max].done) + max++; + + for (i = min; i<max; i++) { + __blk_end_request(rq, coll->status[i].rc, + coll->status[i].bytes); + coll->num_done++; + kref_put(&coll->kref, rbd_coll_release); + } + spin_unlock_irq(q->queue_lock); +} + +static void rbd_coll_end_req(struct rbd_request *req, + int ret, u64 len) +{ + rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len); +} + /* * Send ceph osd request */ @@ -665,8 +832,12 @@ static int rbd_do_request(struct request *rq, int flags, struct ceph_osd_req_op *ops, int num_reply, + struct rbd_req_coll *coll, + int coll_index, void (*rbd_cb)(struct ceph_osd_request *req, - struct ceph_msg *msg)) + struct ceph_msg *msg), + struct ceph_osd_request **linger_req, + u64 *ver) { struct ceph_osd_request *req; struct ceph_file_layout *layout; @@ -677,12 +848,20 @@ static int rbd_do_request(struct request *rq, struct ceph_osd_request_head *reqhead; struct rbd_image_header *header = &dev->header; - ret = -ENOMEM; req_data = kzalloc(sizeof(*req_data), GFP_NOIO); - if (!req_data) - goto done; + if (!req_data) { + if (coll) + rbd_coll_end_req_index(rq, coll, coll_index, + -ENOMEM, len); + return -ENOMEM; + } + + if (coll) { + req_data->coll = coll; + req_data->coll_index = coll_index; + } - dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs); + dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs); down_read(&header->snap_rwsem); @@ -691,9 +870,9 @@ static int rbd_do_request(struct request *rq, ops, false, GFP_NOIO, pages, bio); - if (IS_ERR(req)) { + if (!req) { up_read(&header->snap_rwsem); - ret = PTR_ERR(req); + ret = -ENOMEM; goto done_pages; } @@ -729,12 +908,21 @@ static int rbd_do_request(struct request *rq, req->r_oid, req->r_oid_len); up_read(&header->snap_rwsem); + if (linger_req) { + ceph_osdc_set_request_linger(&dev->client->osdc, req); + *linger_req = req; + } + ret = ceph_osdc_start_request(&dev->client->osdc, req, false); if (ret < 0) goto done_err; if (!rbd_cb) { ret = ceph_osdc_wait_request(&dev->client->osdc, req); + if (ver) + *ver = le64_to_cpu(req->r_reassert_version.version); + dout("reassert_ver=%lld\n", + le64_to_cpu(req->r_reassert_version.version)); ceph_osdc_put_request(req); } return ret; @@ -743,10 +931,8 @@ done_err: bio_chain_put(req_data->bio); ceph_osdc_put_request(req); done_pages: + rbd_coll_end_req(req_data, ret, len); kfree(req_data); -done: - if (rq) - blk_end_request(rq, ret, len); return ret; } @@ -780,7 +966,7 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) bytes = req_data->len; } - blk_end_request(req_data->rq, rc, bytes); + rbd_coll_end_req(req_data, rc, bytes); if (req_data->bio) bio_chain_put(req_data->bio); @@ -789,6 +975,11 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) kfree(req_data); } +static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) +{ + ceph_osdc_put_request(req); +} + /* * Do a synchronous ceph osd operation */ @@ -801,7 +992,9 @@ static int rbd_req_sync_op(struct rbd_device *dev, int num_reply, const char *obj, u64 ofs, u64 len, - char *buf) + char *buf, + struct ceph_osd_request **linger_req, + u64 *ver) { int ret; struct page **pages; @@ -833,7 +1026,9 @@ static int rbd_req_sync_op(struct rbd_device *dev, flags, ops, 2, - NULL); + NULL, 0, + NULL, + linger_req, ver); if (ret < 0) goto done_ops; @@ -857,7 +1052,9 @@ static int rbd_do_op(struct request *rq, u64 snapid, int opcode, int flags, int num_reply, u64 ofs, u64 len, - struct bio *bio) + struct bio *bio, + struct rbd_req_coll *coll, + int coll_index) { char *seg_name; u64 seg_ofs; @@ -893,7 +1090,10 @@ static int rbd_do_op(struct request *rq, flags, ops, num_reply, - rbd_req_cb); + coll, coll_index, + rbd_req_cb, 0, NULL); + + rbd_destroy_ops(ops); done: kfree(seg_name); return ret; @@ -906,13 +1106,15 @@ static int rbd_req_write(struct request *rq, struct rbd_device *rbd_dev, struct ceph_snap_context *snapc, u64 ofs, u64 len, - struct bio *bio) + struct bio *bio, + struct rbd_req_coll *coll, + int coll_index) { return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, 2, - ofs, len, bio); + ofs, len, bio, coll, coll_index); } /* @@ -922,14 +1124,16 @@ static int rbd_req_read(struct request *rq, struct rbd_device *rbd_dev, u64 snapid, u64 ofs, u64 len, - struct bio *bio) + struct bio *bio, + struct rbd_req_coll *coll, + int coll_index) { return rbd_do_op(rq, rbd_dev, NULL, (snapid ? snapid : CEPH_NOSNAP), CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2, - ofs, len, bio); + ofs, len, bio, coll, coll_index); } /* @@ -940,18 +1144,177 @@ static int rbd_req_sync_read(struct rbd_device *dev, u64 snapid, const char *obj, u64 ofs, u64 len, - char *buf) + char *buf, + u64 *ver) { return rbd_req_sync_op(dev, NULL, (snapid ? snapid : CEPH_NOSNAP), CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, - 1, obj, ofs, len, buf); + 1, obj, ofs, len, buf, NULL, ver); } /* - * Request sync osd read + * Request sync osd watch + */ +static int rbd_req_sync_notify_ack(struct rbd_device *dev, + u64 ver, + u64 notify_id, + const char *obj) +{ + struct ceph_osd_req_op *ops; + struct page **pages = NULL; + int ret; + + ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0); + if (ret < 0) + return ret; + + ops[0].watch.ver = cpu_to_le64(dev->header.obj_version); + ops[0].watch.cookie = notify_id; + ops[0].watch.flag = 0; + + ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP, + obj, 0, 0, NULL, + pages, 0, + CEPH_OSD_FLAG_READ, + ops, + 1, + NULL, 0, + rbd_simple_req_cb, 0, NULL); + + rbd_destroy_ops(ops); + return ret; +} + +static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) +{ + struct rbd_device *dev = (struct rbd_device *)data; + if (!dev) + return; + + dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, + notify_id, (int)opcode); + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + __rbd_update_snaps(dev); + mutex_unlock(&ctl_mutex); + + rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name); +} + +/* + * Request sync osd watch + */ +static int rbd_req_sync_watch(struct rbd_device *dev, + const char *obj, + u64 ver) +{ + struct ceph_osd_req_op *ops; + struct ceph_osd_client *osdc = &dev->client->osdc; + + int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); + if (ret < 0) + return ret; + + ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, + (void *)dev, &dev->watch_event); + if (ret < 0) + goto fail; + + ops[0].watch.ver = cpu_to_le64(ver); + ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie); + ops[0].watch.flag = 1; + + ret = rbd_req_sync_op(dev, NULL, + CEPH_NOSNAP, + 0, + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + ops, + 1, obj, 0, 0, NULL, + &dev->watch_request, NULL); + + if (ret < 0) + goto fail_event; + + rbd_destroy_ops(ops); + return 0; + +fail_event: + ceph_osdc_cancel_event(dev->watch_event); + dev->watch_event = NULL; +fail: + rbd_destroy_ops(ops); + return ret; +} + +struct rbd_notify_info { + struct rbd_device *dev; +}; + +static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data) +{ + struct rbd_device *dev = (struct rbd_device *)data; + if (!dev) + return; + + dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, + notify_id, (int)opcode); +} + +/* + * Request sync osd notify + */ +static int rbd_req_sync_notify(struct rbd_device *dev, + const char *obj) +{ + struct ceph_osd_req_op *ops; + struct ceph_osd_client *osdc = &dev->client->osdc; + struct ceph_osd_event *event; + struct rbd_notify_info info; + int payload_len = sizeof(u32) + sizeof(u32); + int ret; + + ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len); + if (ret < 0) + return ret; + + info.dev = dev; + + ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1, + (void *)&info, &event); + if (ret < 0) + goto fail; + + ops[0].watch.ver = 1; + ops[0].watch.flag = 1; + ops[0].watch.cookie = event->cookie; + ops[0].watch.prot_ver = RADOS_NOTIFY_VER; + ops[0].watch.timeout = 12; + + ret = rbd_req_sync_op(dev, NULL, + CEPH_NOSNAP, + 0, + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + ops, + 1, obj, 0, 0, NULL, NULL, NULL); + if (ret < 0) + goto fail_event; + + ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT); + dout("ceph_osdc_wait_event returned %d\n", ret); + rbd_destroy_ops(ops); + return 0; + +fail_event: + ceph_osdc_cancel_event(event); +fail: + rbd_destroy_ops(ops); + return ret; +} + +/* + * Request sync osd rollback */ static int rbd_req_sync_rollback_obj(struct rbd_device *dev, u64 snapid, @@ -969,13 +1332,10 @@ static int rbd_req_sync_rollback_obj(struct rbd_device *dev, 0, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, ops, - 1, obj, 0, 0, NULL); + 1, obj, 0, 0, NULL, NULL, NULL); rbd_destroy_ops(ops); - if (ret < 0) - return ret; - return ret; } @@ -987,7 +1347,8 @@ static int rbd_req_sync_exec(struct rbd_device *dev, const char *cls, const char *method, const char *data, - int len) + int len, + u64 *ver) { struct ceph_osd_req_op *ops; int cls_len = strlen(cls); @@ -1010,7 +1371,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev, 0, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, ops, - 1, obj, 0, 0, NULL); + 1, obj, 0, 0, NULL, NULL, ver); rbd_destroy_ops(ops); @@ -1018,6 +1379,20 @@ static int rbd_req_sync_exec(struct rbd_device *dev, return ret; } +static struct rbd_req_coll *rbd_alloc_coll(int num_reqs) +{ + struct rbd_req_coll *coll = + kzalloc(sizeof(struct rbd_req_coll) + + sizeof(struct rbd_req_status) * num_reqs, + GFP_ATOMIC); + + if (!coll) + return NULL; + coll->total = num_reqs; + kref_init(&coll->kref); + return coll; +} + /* * block device queue callback */ @@ -1035,6 +1410,8 @@ static void rbd_rq_fn(struct request_queue *q) bool do_write; int size, op_size = 0; u64 ofs; + int num_segs, cur_seg = 0; + struct rbd_req_coll *coll; /* peek at request from block layer */ if (!rq) @@ -1065,6 +1442,14 @@ static void rbd_rq_fn(struct request_queue *q) do_write ? "write" : "read", size, blk_rq_pos(rq) * 512ULL); + num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); + coll = rbd_alloc_coll(num_segs); + if (!coll) { + spin_lock_irq(q->queue_lock); + __blk_end_request_all(rq, -ENOMEM); + goto next; + } + do { /* a bio clone to be passed down to OSD req */ dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt); @@ -1072,35 +1457,41 @@ static void rbd_rq_fn(struct request_queue *q) rbd_dev->header.block_name, ofs, size, NULL, NULL); + kref_get(&coll->kref); bio = bio_chain_clone(&rq_bio, &next_bio, &bp, op_size, GFP_ATOMIC); if (!bio) { - spin_lock_irq(q->queue_lock); - __blk_end_request_all(rq, -ENOMEM); - goto next; + rbd_coll_end_req_index(rq, coll, cur_seg, + -ENOMEM, op_size); + goto next_seg; } + /* init OSD command: write or read */ if (do_write) rbd_req_write(rq, rbd_dev, rbd_dev->header.snapc, ofs, - op_size, bio); + op_size, bio, + coll, cur_seg); else rbd_req_read(rq, rbd_dev, cur_snap_id(rbd_dev), ofs, - op_size, bio); + op_size, bio, + coll, cur_seg); +next_seg: size -= op_size; ofs += op_size; + cur_seg++; rq_bio = next_bio; } while (size > 0); + kref_put(&coll->kref, rbd_coll_release); if (bp) bio_pair_release(bp); - spin_lock_irq(q->queue_lock); next: rq = blk_fetch_request(q); @@ -1156,6 +1547,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, struct rbd_image_header_ondisk *dh; int snap_count = 0; u64 snap_names_len = 0; + u64 ver; while (1) { int len = sizeof(*dh) + @@ -1171,7 +1563,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, NULL, CEPH_NOSNAP, rbd_dev->obj_md_name, 0, len, - (char *)dh); + (char *)dh, &ver); if (rc < 0) goto out_dh; @@ -1188,6 +1580,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, } break; } + header->obj_version = ver; out_dh: kfree(dh); @@ -1205,6 +1598,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, u64 new_snapid; int ret; void *data, *data_start, *data_end; + u64 ver; /* we should create a snapshot only if we're pointing at the head */ if (dev->cur_snap) @@ -1227,7 +1621,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, ceph_encode_64_safe(&data, data_end, new_snapid, bad); ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", - data_start, data - data_start); + data_start, data - data_start, &ver); kfree(data_start); @@ -1259,6 +1653,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) int ret; struct rbd_image_header h; u64 snap_seq; + int follow_seq = 0; ret = rbd_read_header(rbd_dev, &h); if (ret < 0) @@ -1267,6 +1662,11 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) down_write(&rbd_dev->header.snap_rwsem); snap_seq = rbd_dev->header.snapc->seq; + if (rbd_dev->header.total_snaps && + rbd_dev->header.snapc->snaps[0] == snap_seq) + /* pointing at the head, will need to follow that + if head moves */ + follow_seq = 1; kfree(rbd_dev->header.snapc); kfree(rbd_dev->header.snap_names); @@ -1277,7 +1677,10 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) rbd_dev->header.snap_names = h.snap_names; rbd_dev->header.snap_names_len = h.snap_names_len; rbd_dev->header.snap_sizes = h.snap_sizes; - rbd_dev->header.snapc->seq = snap_seq; + if (follow_seq) + rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0]; + else + rbd_dev->header.snapc->seq = snap_seq; ret = __rbd_init_snaps_header(rbd_dev); @@ -1699,7 +2102,28 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev) device_unregister(&rbd_dev->dev); } -static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) +static int rbd_init_watch_dev(struct rbd_device *rbd_dev) +{ + int ret, rc; + + do { + ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name, + rbd_dev->header.obj_version); + if (ret == -ERANGE) { + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + rc = __rbd_update_snaps(rbd_dev); + mutex_unlock(&ctl_mutex); + if (rc < 0) + return rc; + } + } while (ret == -ERANGE); + + return ret; +} + +static ssize_t rbd_add(struct bus_type *bus, + const char *buf, + size_t count) { struct ceph_osd_client *osdc; struct rbd_device *rbd_dev; @@ -1797,6 +2221,10 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) if (rc) goto err_out_bus; + rc = rbd_init_watch_dev(rbd_dev); + if (rc) + goto err_out_bus; + return count; err_out_bus: @@ -1849,6 +2277,12 @@ static void rbd_dev_release(struct device *dev) struct rbd_device *rbd_dev = container_of(dev, struct rbd_device, dev); + if (rbd_dev->watch_request) + ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc, + rbd_dev->watch_request); + if (rbd_dev->watch_event) + ceph_osdc_cancel_event(rbd_dev->watch_event); + rbd_put_client(rbd_dev); /* clean up and free blkdev */ @@ -1914,14 +2348,24 @@ static ssize_t rbd_snap_add(struct device *dev, ret = rbd_header_add_snap(rbd_dev, name, GFP_KERNEL); if (ret < 0) - goto done_unlock; + goto err_unlock; ret = __rbd_update_snaps(rbd_dev); if (ret < 0) - goto done_unlock; + goto err_unlock; + + /* shouldn't hold ctl_mutex when notifying.. notify might + trigger a watch callback that would need to get that mutex */ + mutex_unlock(&ctl_mutex); + + /* make a best effort, don't error if failed */ + rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name); ret = count; -done_unlock: + kfree(name); + return ret; + +err_unlock: mutex_unlock(&ctl_mutex); kfree(name); return ret; |