diff options
Diffstat (limited to 'drivers/block/rbd.c')
| -rw-r--r-- | drivers/block/rbd.c | 339 |
1 files changed, 265 insertions, 74 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 34898d53395..b2c98c1bc03 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -541,7 +541,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode) return -ENOENT; (void) get_device(&rbd_dev->dev); - set_device_ro(bdev, rbd_dev->mapping.read_only); return 0; } @@ -559,10 +558,76 @@ static void rbd_release(struct gendisk *disk, fmode_t mode) put_device(&rbd_dev->dev); } +static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg) +{ + int ret = 0; + int val; + bool ro; + bool ro_changed = false; + + /* get_user() may sleep, so call it before taking rbd_dev->lock */ + if (get_user(val, (int __user *)(arg))) + return -EFAULT; + + ro = val ? true : false; + /* Snapshot doesn't allow to write*/ + if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro) + return -EROFS; + + spin_lock_irq(&rbd_dev->lock); + /* prevent others open this device */ + if (rbd_dev->open_count > 1) { + ret = -EBUSY; + goto out; + } + + if (rbd_dev->mapping.read_only != ro) { + rbd_dev->mapping.read_only = ro; + ro_changed = true; + } + +out: + spin_unlock_irq(&rbd_dev->lock); + /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */ + if (ret == 0 && ro_changed) + set_disk_ro(rbd_dev->disk, ro ? 1 : 0); + + return ret; +} + +static int rbd_ioctl(struct block_device *bdev, fmode_t mode, + unsigned int cmd, unsigned long arg) +{ + struct rbd_device *rbd_dev = bdev->bd_disk->private_data; + int ret = 0; + + switch (cmd) { + case BLKROSET: + ret = rbd_ioctl_set_ro(rbd_dev, arg); + break; + default: + ret = -ENOTTY; + } + + return ret; +} + +#ifdef CONFIG_COMPAT +static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode, + unsigned int cmd, unsigned long arg) +{ + return rbd_ioctl(bdev, mode, cmd, arg); +} +#endif /* CONFIG_COMPAT */ + static const struct block_device_operations rbd_bd_ops = { .owner = THIS_MODULE, .open = rbd_open, .release = rbd_release, + .ioctl = rbd_ioctl, +#ifdef CONFIG_COMPAT + .compat_ioctl = rbd_compat_ioctl, +#endif }; /* @@ -1366,6 +1431,14 @@ static bool obj_request_exists_test(struct rbd_obj_request *obj_request) return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; } +static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request) +{ + struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; + + return obj_request->img_offset < + round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header)); +} + static void rbd_obj_request_get(struct rbd_obj_request *obj_request) { dout("%s: obj %p (was %d)\n", __func__, obj_request, @@ -1382,6 +1455,13 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request) kref_put(&obj_request->kref, rbd_obj_request_destroy); } +static void rbd_img_request_get(struct rbd_img_request *img_request) +{ + dout("%s: img %p (was %d)\n", __func__, img_request, + atomic_read(&img_request->kref.refcount)); + kref_get(&img_request->kref); +} + static bool img_request_child_test(struct rbd_img_request *img_request); static void rbd_parent_request_destroy(struct kref *kref); static void rbd_img_request_destroy(struct kref *kref); @@ -1654,7 +1734,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, if (osd_req->r_result < 0) obj_request->result = osd_req->r_result; - BUG_ON(osd_req->r_num_ops > 2); + rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP); /* * We support a 64-bit length, but ultimately it has to be @@ -1662,11 +1742,15 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, */ obj_request->xferred = osd_req->r_reply_op_len[0]; rbd_assert(obj_request->xferred < (u64)UINT_MAX); + opcode = osd_req->r_ops[0].op; switch (opcode) { case CEPH_OSD_OP_READ: rbd_osd_read_callback(obj_request); break; + case CEPH_OSD_OP_SETALLOCHINT: + rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE); + /* fall through */ case CEPH_OSD_OP_WRITE: rbd_osd_write_callback(obj_request); break; @@ -1715,9 +1799,16 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) snapc, CEPH_NOSNAP, &mtime); } +/* + * Create an osd request. A read request has one osd op (read). + * A write request has either one (watch) or two (hint+write) osd ops. + * (All rbd data writes are prefixed with an allocation hint op, but + * technically osd watch is a write request, hence this distinction.) + */ static struct ceph_osd_request *rbd_osd_req_create( struct rbd_device *rbd_dev, bool write_request, + unsigned int num_ops, struct rbd_obj_request *obj_request) { struct ceph_snap_context *snapc = NULL; @@ -1733,10 +1824,13 @@ static struct ceph_osd_request *rbd_osd_req_create( snapc = img_request->snapc; } - /* Allocate and initialize the request, for the single op */ + rbd_assert(num_ops == 1 || (write_request && num_ops == 2)); + + /* Allocate and initialize the request, for the num_ops ops */ osdc = &rbd_dev->rbd_client->client->osdc; - osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC); + osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, + GFP_ATOMIC); if (!osd_req) return NULL; /* ENOMEM */ @@ -1756,8 +1850,8 @@ static struct ceph_osd_request *rbd_osd_req_create( /* * Create a copyup osd request based on the information in the - * object request supplied. A copyup request has two osd ops, - * a copyup method call, and a "normal" write request. + * object request supplied. A copyup request has three osd ops, + * a copyup method call, a hint op, and a write op. */ static struct ceph_osd_request * rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) @@ -1773,12 +1867,12 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) rbd_assert(img_request); rbd_assert(img_request_write_test(img_request)); - /* Allocate and initialize the request, for the two ops */ + /* Allocate and initialize the request, for the three ops */ snapc = img_request->snapc; rbd_dev = img_request->rbd_dev; osdc = &rbd_dev->rbd_client->client->osdc; - osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC); + osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC); if (!osd_req) return NULL; /* ENOMEM */ @@ -2128,6 +2222,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) img_request->next_completion = which; out: spin_unlock_irq(&img_request->completion_lock); + rbd_img_request_put(img_request); if (!more) rbd_img_request_complete(img_request); @@ -2178,6 +2273,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, const char *object_name; u64 offset; u64 length; + unsigned int which = 0; object_name = rbd_segment_name(rbd_dev, img_offset); if (!object_name) @@ -2190,6 +2286,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, rbd_segment_name_free(object_name); if (!obj_request) goto out_unwind; + /* * set obj_request->img_request before creating the * osd_request so that it gets the right snapc @@ -2207,7 +2304,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, clone_size, GFP_ATOMIC); if (!obj_request->bio_list) - goto out_partial; + goto out_unwind; } else { unsigned int page_count; @@ -2220,19 +2317,28 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, } osd_req = rbd_osd_req_create(rbd_dev, write_request, - obj_request); + (write_request ? 2 : 1), + obj_request); if (!osd_req) - goto out_partial; + goto out_unwind; obj_request->osd_req = osd_req; obj_request->callback = rbd_img_obj_callback; + rbd_img_request_get(img_request); - osd_req_op_extent_init(osd_req, 0, opcode, offset, length, - 0, 0); + if (write_request) { + osd_req_op_alloc_hint_init(osd_req, which, + rbd_obj_bytes(&rbd_dev->header), + rbd_obj_bytes(&rbd_dev->header)); + which++; + } + + osd_req_op_extent_init(osd_req, which, opcode, offset, length, + 0, 0); if (type == OBJ_REQUEST_BIO) - osd_req_op_extent_osd_data_bio(osd_req, 0, + osd_req_op_extent_osd_data_bio(osd_req, which, obj_request->bio_list, length); else - osd_req_op_extent_osd_data_pages(osd_req, 0, + osd_req_op_extent_osd_data_pages(osd_req, which, obj_request->pages, length, offset & ~PAGE_MASK, false, false); @@ -2249,11 +2355,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, return 0; -out_partial: - rbd_obj_request_put(obj_request); out_unwind: for_each_obj_request_safe(img_request, obj_request, next_obj_request) - rbd_obj_request_put(obj_request); + rbd_img_obj_request_del(img_request, obj_request); return -ENOMEM; } @@ -2353,7 +2457,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) /* * The original osd request is of no use to use any more. - * We need a new one that can hold the two ops in a copyup + * We need a new one that can hold the three ops in a copyup * request. Allocate the new copyup osd request for the * original request, and release the old one. */ @@ -2372,17 +2476,22 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0, false, false); - /* Then the original write request op */ + /* Then the hint op */ + + osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header), + rbd_obj_bytes(&rbd_dev->header)); + + /* And the original write request op */ offset = orig_request->offset; length = orig_request->length; - osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE, + osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE, offset, length, 0, 0); if (orig_request->type == OBJ_REQUEST_BIO) - osd_req_op_extent_osd_data_bio(osd_req, 1, + osd_req_op_extent_osd_data_bio(osd_req, 2, orig_request->bio_list, length); else - osd_req_op_extent_osd_data_pages(osd_req, 1, + osd_req_op_extent_osd_data_pages(osd_req, 2, orig_request->pages, length, offset & ~PAGE_MASK, false, false); @@ -2603,8 +2712,8 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) rbd_assert(obj_request->img_request); rbd_dev = obj_request->img_request->rbd_dev; - stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, - stat_request); + stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + stat_request); if (!stat_request->osd_req) goto out; stat_request->callback = rbd_img_obj_exists_callback; @@ -2647,7 +2756,7 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) */ if (!img_request_write_test(img_request) || !img_request_layered_test(img_request) || - rbd_dev->parent_overlap <= obj_request->img_offset || + !obj_request_overlaps_parent(obj_request) || ((known = obj_request_known_test(obj_request)) && obj_request_exists_test(obj_request))) { @@ -2807,7 +2916,8 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id) return -ENOMEM; ret = -ENOMEM; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + obj_request); if (!obj_request->osd_req) goto out; @@ -2844,55 +2954,55 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) } /* - * Request sync osd watch/unwatch. The value of "start" determines - * whether a watch request is being initiated or torn down. + * Initiate a watch request, synchronously. */ -static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; int ret; - rbd_assert(start ^ !!rbd_dev->watch_event); - rbd_assert(start ^ !!rbd_dev->watch_request); + rbd_assert(!rbd_dev->watch_event); + rbd_assert(!rbd_dev->watch_request); - if (start) { - ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, - &rbd_dev->watch_event); - if (ret < 0) - return ret; - rbd_assert(rbd_dev->watch_event != NULL); - } + ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, + &rbd_dev->watch_event); + if (ret < 0) + return ret; + + rbd_assert(rbd_dev->watch_event); - ret = -ENOMEM; obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, - OBJ_REQUEST_NODATA); - if (!obj_request) + OBJ_REQUEST_NODATA); + if (!obj_request) { + ret = -ENOMEM; goto out_cancel; + } - obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request); - if (!obj_request->osd_req) - goto out_cancel; + obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1, + obj_request); + if (!obj_request->osd_req) { + ret = -ENOMEM; + goto out_put; + } - if (start) - ceph_osdc_set_request_linger(osdc, obj_request->osd_req); - else - ceph_osdc_unregister_linger_request(osdc, - rbd_dev->watch_request->osd_req); + ceph_osdc_set_request_linger(osdc, obj_request->osd_req); osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, - rbd_dev->watch_event->cookie, 0, start ? 1 : 0); + rbd_dev->watch_event->cookie, 0, 1); rbd_osd_req_format_write(obj_request); ret = rbd_obj_request_submit(osdc, obj_request); if (ret) - goto out_cancel; + goto out_linger; + ret = rbd_obj_request_wait(obj_request); if (ret) - goto out_cancel; + goto out_linger; + ret = obj_request->result; if (ret) - goto out_cancel; + goto out_linger; /* * A watch request is set to linger, so the underlying osd @@ -2902,36 +3012,84 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) * it. We'll drop that reference (below) after we've * unregistered it. */ - if (start) { - rbd_dev->watch_request = obj_request; + rbd_dev->watch_request = obj_request; - return 0; + return 0; + +out_linger: + ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req); +out_put: + rbd_obj_request_put(obj_request); +out_cancel: + ceph_osdc_cancel_event(rbd_dev->watch_event); + rbd_dev->watch_event = NULL; + + return ret; +} + +/* + * Tear down a watch request, synchronously. + */ +static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_obj_request *obj_request; + int ret; + + rbd_assert(rbd_dev->watch_event); + rbd_assert(rbd_dev->watch_request); + + obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, + OBJ_REQUEST_NODATA); + if (!obj_request) { + ret = -ENOMEM; + goto out_cancel; + } + + obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1, + obj_request); + if (!obj_request->osd_req) { + ret = -ENOMEM; + goto out_put; } + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, + rbd_dev->watch_event->cookie, 0, 0); + rbd_osd_req_format_write(obj_request); + + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + goto out_put; + + ret = rbd_obj_request_wait(obj_request); + if (ret) + goto out_put; + + ret = obj_request->result; + if (ret) + goto out_put; + /* We have successfully torn down the watch request */ + ceph_osdc_unregister_linger_request(osdc, + rbd_dev->watch_request->osd_req); rbd_obj_request_put(rbd_dev->watch_request); rbd_dev->watch_request = NULL; + +out_put: + rbd_obj_request_put(obj_request); out_cancel: - /* Cancel the event if we're tearing down, or on error */ ceph_osdc_cancel_event(rbd_dev->watch_event); rbd_dev->watch_event = NULL; - if (obj_request) - rbd_obj_request_put(obj_request); return ret; } -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) -{ - return __rbd_dev_header_watch_sync(rbd_dev, true); -} - static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) { int ret; - ret = __rbd_dev_header_watch_sync(rbd_dev, false); + ret = __rbd_dev_header_unwatch_sync(rbd_dev); if (ret) { rbd_warn(rbd_dev, "unable to tear down watch request: %d\n", ret); @@ -2978,7 +3136,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, obj_request->pages = pages; obj_request->page_count = page_count; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + obj_request); if (!obj_request->osd_req) goto out; @@ -3028,7 +3187,6 @@ static void rbd_request_fn(struct request_queue *q) __releases(q->queue_lock) __acquires(q->queue_lock) { struct rbd_device *rbd_dev = q->queuedata; - bool read_only = rbd_dev->mapping.read_only; struct request *rq; int result; @@ -3064,7 +3222,7 @@ static void rbd_request_fn(struct request_queue *q) if (write_request) { result = -EROFS; - if (read_only) + if (rbd_dev->mapping.read_only) goto end_request; rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); } @@ -3211,7 +3369,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, obj_request->pages = pages; obj_request->page_count = page_count; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + obj_request); if (!obj_request->osd_req) goto out; @@ -4652,6 +4811,38 @@ out_err: } /* + * Return pool id (>= 0) or a negative error code. + */ +static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) +{ + u64 newest_epoch; + unsigned long timeout = rbdc->client->options->mount_timeout * HZ; + int tries = 0; + int ret; + +again: + ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name); + if (ret == -ENOENT && tries++ < 1) { + ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap", + &newest_epoch); + if (ret < 0) + return ret; + + if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { + ceph_monc_request_next_osdmap(&rbdc->client->monc); + (void) ceph_monc_wait_osdmap(&rbdc->client->monc, + newest_epoch, timeout); + goto again; + } else { + /* the osdmap we have is new enough */ + return -ENOENT; + } + } + + return ret; +} + +/* * An rbd format 2 image has a unique identifier, distinct from the * name given to it by the user. Internally, that identifier is * what's used to specify the names of objects related to the image. @@ -4721,7 +4912,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) image_id = ceph_extract_encoded_string(&p, p + ret, NULL, GFP_NOIO); - ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0; + ret = PTR_ERR_OR_ZERO(image_id); if (!ret) rbd_dev->image_format = 2; } else { @@ -4876,6 +5067,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) if (ret) goto err_out_disk; set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); + set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); ret = rbd_bus_add_dev(rbd_dev); if (ret) @@ -5022,7 +5214,6 @@ static ssize_t do_rbd_add(struct bus_type *bus, struct rbd_options *rbd_opts = NULL; struct rbd_spec *spec = NULL; struct rbd_client *rbdc; - struct ceph_osd_client *osdc; bool read_only; int rc = -ENOMEM; @@ -5044,8 +5235,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, } /* pick the pool */ - osdc = &rbdc->client->osdc; - rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name); + rc = rbd_add_get_pool_id(rbdc, spec->pool_name); if (rc < 0) goto err_out_client; spec->pool_id = (u64)rc; @@ -5356,6 +5546,7 @@ err_out_slab: static void __exit rbd_exit(void) { + ida_destroy(&rbd_dev_id_ida); rbd_sysfs_cleanup(); if (single_major) unregister_blkdev(rbd_major, RBD_DRV_NAME); |
