Diffstat (limited to 'drivers/block/rbd.c')
 -rw-r--r--  drivers/block/rbd.c | 6015
 1 file changed, 4481 insertions(+), 1534 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 65665c9c42c..b2c98c1bc03 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1,3 +1,4 @@
+
 /*
    rbd.c -- Export ceph rados objects as a Linux block device
@@ -32,15 +33,20 @@
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
+#include <linux/bsearch.h>
 
 #include <linux/kernel.h>
 #include <linux/device.h>
 #include <linux/module.h>
 #include <linux/fs.h>
 #include <linux/blkdev.h>
+#include <linux/slab.h>
+#include <linux/idr.h>
 
 #include "rbd_types.h"
 
+#define RBD_DEBUG	/* Activate rbd_assert() calls */
+
 /*
  * The basic unit of block I/O is a sector.  It is interpreted in a
  * number of contexts in Linux (blk, bio, genhd), but the default is
@@ -50,18 +56,71 @@
 #define SECTOR_SHIFT	9
 #define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
 
+/*
+ * Increment the given counter and return its updated value.
+ * If the counter is already 0 it will not be incremented.
+ * If the counter is already at its maximum value, -EINVAL is
+ * returned without updating it.
+ */
+static int atomic_inc_return_safe(atomic_t *v)
+{
+	unsigned int counter;
+
+	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
+	if (counter <= (unsigned int)INT_MAX)
+		return (int)counter;
+
+	atomic_dec(v);
+
+	return -EINVAL;
+}
+
+/* Decrement the counter.  Return the resulting value, or -EINVAL */
+static int atomic_dec_return_safe(atomic_t *v)
+{
+	int counter;
+
+	counter = atomic_dec_return(v);
+	if (counter >= 0)
+		return counter;
+
+	atomic_inc(v);
+
+	return -EINVAL;
+}
+
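[Editor's note: the pair above gives rbd a saturating counter — once pinned at 0, or once it would pass INT_MAX, the value sticks and callers see a failure. A minimal usage sketch under those semantics; the caller and names are hypothetical, not part of the patch:]

	/*
	 * Illustrative only: the pattern rbd uses later for
	 * rbd_dev->parent_ref.  atomic_inc_return_safe() returns 0
	 * when the count was already 0 (not incremented) and -EINVAL
	 * when saturated, so "> 0" is the only success case.
	 */
	static atomic_t example_ref = ATOMIC_INIT(1);

	static bool example_get_ref(void)
	{
		return atomic_inc_return_safe(&example_ref) > 0;
	}

	static void example_put_ref(void)
	{
		if (atomic_dec_return_safe(&example_ref) < 0)
			pr_warn("example_ref underflow\n");
	}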
 #define RBD_DRV_NAME "rbd"
-#define RBD_DRV_NAME_LONG "rbd (rados block device)"
 
-#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
+#define RBD_MINORS_PER_MAJOR		256
+#define RBD_SINGLE_MAJOR_PART_SHIFT	4
+
+#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
+#define RBD_MAX_SNAP_NAME_LEN \
+			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
 
-#define RBD_MAX_MD_NAME_LEN	(RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
-#define RBD_MAX_POOL_NAME_LEN	64
-#define RBD_MAX_SNAP_NAME_LEN	32
-#define RBD_MAX_OPT_LEN		1024
+#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
 
 #define RBD_SNAP_HEAD_NAME	"-"
 
+#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */
+
+/* This allows a single page to hold an image name sent by OSD */
+#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
+#define RBD_IMAGE_ID_LEN_MAX	64
+
+#define RBD_OBJ_PREFIX_LEN_MAX	64
+
+/* Feature bits */
+
+#define RBD_FEATURE_LAYERING	(1<<0)
+#define RBD_FEATURE_STRIPINGV2	(1<<1)
+#define RBD_FEATURES_ALL \
+	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
+
+/* Features supported by this (client software) implementation. */
+
+#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
+
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -71,30 +130,62 @@
 #define DEV_NAME_LEN		32
 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
 
-#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
-
 /*
  * block device image metadata (in-memory version)
  */
 struct rbd_image_header {
-	u64 image_size;
-	char block_name[32];
+	/* These six fields never change for a given rbd image */
+	char *object_prefix;
 	__u8 obj_order;
 	__u8 crypt_type;
 	__u8 comp_type;
+	u64 stripe_unit;
+	u64 stripe_count;
+	u64 features;		/* Might be changeable someday? */
+
+	/* The remaining fields need to be updated occasionally */
+	u64 image_size;
 	struct ceph_snap_context *snapc;
-	size_t snap_names_len;
-	u64 snap_seq;
-	u32 total_snaps;
+	char *snap_names;	/* format 1 only */
+	u64 *snap_sizes;	/* format 1 only */
+};
 
-	char *snap_names;
-	u64 *snap_sizes;
+/*
+ * An rbd image specification.
+ *
+ * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
+ * identify an image.  Each rbd_dev structure includes a pointer to
+ * an rbd_spec structure that encapsulates this identity.
+ *
+ * Each of the id's in an rbd_spec has an associated name.  For a
+ * user-mapped image, the names are supplied and the id's associated
+ * with them are looked up.  For a layered image, a parent image is
+ * defined by the tuple, and the names are looked up.
+ *
+ * An rbd_dev structure contains a parent_spec pointer which is
+ * non-null if the image it represents is a child in a layered
+ * image.  This pointer will refer to the rbd_spec structure used
+ * by the parent rbd_dev for its own identity (i.e., the structure
+ * is shared between the parent and child).
+ *
+ * Since these structures are populated once, during the discovery
+ * phase of image construction, they are effectively immutable so
+ * we make no effort to synchronize access to them.
+ *
+ * Note that code herein does not assume the image name is known (it
+ * could be a null pointer).
+ */
+struct rbd_spec {
+	u64		pool_id;
+	const char	*pool_name;
 
-	u64 obj_version;
-};
+	const char	*image_id;
+	const char	*image_name;
 
-struct rbd_options {
-	int	notify_timeout;
+	u64		snap_id;
+	const char	*snap_name;
+
+	struct kref	kref;
 };
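[Editor's note: since a child image's parent identity is just a pointer to the parent's rbd_spec, lifetime is carried by the embedded kref. A hedged sketch of the intended ownership pattern — rbd_spec_get() is assumed here as the counterpart of the rbd_spec_put() declared further down, and the helper names are hypothetical:]

	/*
	 * Illustrative only: parent and child rbd_dev structures end
	 * up pointing at the same immutable rbd_spec; each holds its
	 * own kref reference on it.
	 */
	static void example_share_parent_spec(struct rbd_device *child,
					      struct rbd_spec *parent_spec)
	{
		child->parent_spec = rbd_spec_get(parent_spec);
	}

	static void example_drop_parent_spec(struct rbd_device *child)
	{
		rbd_spec_put(child->parent_spec);
		child->parent_spec = NULL;
	}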
 
 /*
@@ -102,92 +193,188 @@ struct rbd_options {
  */
 struct rbd_client {
 	struct ceph_client	*client;
-	struct rbd_options	*rbd_opts;
 	struct kref		kref;
 	struct list_head	node;
 };
 
-/*
- * a request completion status
- */
-struct rbd_req_status {
-	int done;
-	int rc;
-	u64 bytes;
+struct rbd_img_request;
+typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
+
+#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
+
+struct rbd_obj_request;
+typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
+
+enum obj_request_type {
+	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
 };
 
-/*
- * a collection of requests
- */
-struct rbd_req_coll {
-	int			total;
-	int			num_done;
+enum obj_req_flags {
+	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
+	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
+	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
+	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
+};
+
+struct rbd_obj_request {
+	const char		*object_name;
+	u64			offset;		/* object start byte */
+	u64			length;		/* bytes from offset */
+	unsigned long		flags;
+
+	/*
+	 * An object request associated with an image will have its
+	 * img_data flag set; a standalone object request will not.
+	 *
+	 * A standalone object request will have which == BAD_WHICH
+	 * and a null obj_request pointer.
+	 *
+	 * An object request initiated in support of a layered image
+	 * object (to check for its existence before a write) will
+	 * have which == BAD_WHICH and a non-null obj_request pointer.
+	 *
+	 * Finally, an object request for rbd image data will have
+	 * which != BAD_WHICH, and will have a non-null img_request
+	 * pointer.  The value of which will be in the range
+	 * 0..(img_request->obj_request_count-1).
+	 */
+	union {
+		struct rbd_obj_request	*obj_request;	/* STAT op */
+		struct {
+			struct rbd_img_request	*img_request;
+			u64			img_offset;
+			/* links for img_request->obj_requests list */
+			struct list_head	links;
+		};
+	};
+	u32			which;	/* posn in image request list */
+
+	enum obj_request_type	type;
+	union {
+		struct bio	*bio_list;
+		struct {
+			struct page	**pages;
+			u32		page_count;
+		};
+	};
+	struct page		**copyup_pages;
+	u32			copyup_page_count;
+
+	struct ceph_osd_request	*osd_req;
+
+	u64			xferred;	/* bytes transferred */
+	int			result;
+
+	rbd_obj_callback_t	callback;
+	struct completion	completion;
+
+	struct kref		kref;
-	struct rbd_req_status	status[0];
 };
 
-/*
- * a single io request
- */
-struct rbd_request {
-	struct request		*rq;		/* blk layer request */
-	struct bio		*bio;		/* cloned bio */
-	struct page		**pages;	/* list of used pages */
-	u64			len;
-	int			coll_index;
-	struct rbd_req_coll	*coll;
+enum img_req_flags {
+	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
+	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
+	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
 };
 
-struct rbd_snap {
-	struct device		dev;
-	const char		*name;
-	u64			size;
-	struct list_head	node;
-	u64			id;
+struct rbd_img_request {
+	struct rbd_device	*rbd_dev;
+	u64			offset;	/* starting image byte offset */
+	u64			length;	/* byte count from offset */
+	unsigned long		flags;
+	union {
+		u64			snap_id;	/* for reads */
+		struct ceph_snap_context *snapc;	/* for writes */
+	};
+	union {
+		struct request		*rq;		/* block request */
+		struct rbd_obj_request	*obj_request;	/* obj req initiator */
+	};
+	struct page		**copyup_pages;
+	u32			copyup_page_count;
+	spinlock_t		completion_lock; /* protects next_completion */
+	u32			next_completion;
+	rbd_img_callback_t	callback;
+	u64			xferred;  /* aggregate bytes transferred */
+	int			result;	  /* first nonzero obj_request result */
+
+	u32			obj_request_count;
+	struct list_head	obj_requests;	/* rbd_obj_request structs */
+
+	struct kref		kref;
+};
+
+#define for_each_obj_request(ireq, oreq) \
+	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
+#define for_each_obj_request_from(ireq, oreq) \
+	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
+#define for_each_obj_request_safe(ireq, oreq, n) \
+	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
+
+struct rbd_mapping {
+	u64			size;
+	u64			features;
+	bool			read_only;
 };
 
 /*
  * a single device
  */
 struct rbd_device {
-	int			id;		/* blkdev unique id */
+	int			dev_id;		/* blkdev unique id */
 
 	int			major;		/* blkdev assigned major */
+	int			minor;
 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
-	struct request_queue	*q;
 
+	u32			image_format;	/* Either 1 or 2 */
 	struct rbd_client	*rbd_client;
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
-	spinlock_t		lock;		/* queue lock */
+	spinlock_t		lock;		/* queue, flags, open_count */
 
 	struct rbd_image_header	header;
-	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
-	int			obj_len;
-	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
-	char			pool_name[RBD_MAX_POOL_NAME_LEN];
-	int			poolid;
+	unsigned long		flags;		/* possibly lock protected */
+	struct rbd_spec		*spec;
+
+	char			*header_name;
+
+	struct ceph_file_layout	layout;
 
 	struct ceph_osd_event   *watch_event;
-	struct ceph_osd_request *watch_request;
+	struct rbd_obj_request	*watch_request;
+
+	struct rbd_spec		*parent_spec;
+	u64			parent_overlap;
+	atomic_t		parent_ref;
+	struct rbd_device	*parent;
 
 	/* protects updating the header */
 	struct rw_semaphore     header_rwsem;
-	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
-	u64                     snap_id;	/* current snapshot id */
-	int                     read_only;
 
-	struct list_head	node;
+	struct rbd_mapping	mapping;
 
-	/* list of snapshots */
-	struct list_head	snaps;
+	struct list_head	node;
 
 	/* sysfs related */
 	struct device		dev;
+	unsigned long		open_count;	/* protected by lock */
+};
+
+/*
+ * Flag bits for rbd_dev->flags.  If atomicity is required,
+ * rbd_dev->lock is used to protect access.
+ *
+ * Currently, only the "removing" flag (which is coupled with the
+ * "open_count" field) requires atomic access.
+ */
+enum rbd_dev_flags {
+	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
+	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
 };
 
-static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
+static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */
 
 static LIST_HEAD(rbd_dev_list);    /* devices */
 static DEFINE_SPINLOCK(rbd_dev_list_lock);
 
@@ -195,29 +382,81 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
 static LIST_HEAD(rbd_client_list);		/* clients */
 static DEFINE_SPINLOCK(rbd_client_list_lock);
 
-static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
-static void rbd_dev_release(struct device *dev);
-static ssize_t rbd_snap_add(struct device *dev,
-			    struct device_attribute *attr,
-			    const char *buf,
-			    size_t count);
-static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
-				  struct rbd_snap *snap);
+/* Slab caches for frequently-allocated structures */
+
+static struct kmem_cache	*rbd_img_request_cache;
+static struct kmem_cache	*rbd_obj_request_cache;
+static struct kmem_cache	*rbd_segment_name_cache;
+
+static int rbd_major;
+static DEFINE_IDA(rbd_dev_id_ida);
+
+/*
+ * Default to false for now, as single-major requires >= 0.75 version of
+ * userspace rbd utility.
+ */
+static bool single_major = false;
+module_param(single_major, bool, S_IRUGO);
+MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
+
+static int rbd_img_request_submit(struct rbd_img_request *img_request);
+
+static void rbd_dev_device_release(struct device *dev);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 		       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 			  size_t count);
+static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
+				    size_t count);
+static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
+				       size_t count);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
+static void rbd_spec_put(struct rbd_spec *spec);
+
+static int rbd_dev_id_to_minor(int dev_id)
+{
+	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
+}
 
-static struct bus_attribute rbd_bus_attrs[] = {
-	__ATTR(add, S_IWUSR, NULL, rbd_add),
-	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
-	__ATTR_NULL
+static int minor_to_rbd_dev_id(int minor)
+{
+	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
+}
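[Editor's note: with a 4-bit partition shift each device id claims 16 consecutive minors under the shared major — a whole-disk node plus 15 partitions. A quick worked example of the two helpers above:]

	/*
	 * Illustrative only: dev_id 2 owns minors 32..47 under the
	 * shared major; minor 35 is partition 3 of that device.
	 */
	static void example_minor_math(void)
	{
		int minor = rbd_dev_id_to_minor(2);	/* 2 << 4 == 32 */
		int dev_id = minor_to_rbd_dev_id(35);	/* 35 >> 4 == 2 */

		pr_info("minor=%d dev_id=%d\n", minor, dev_id);
	}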
+
+static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
+static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
+static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
+static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
+
+static struct attribute *rbd_bus_attrs[] = {
+	&bus_attr_add.attr,
+	&bus_attr_remove.attr,
+	&bus_attr_add_single_major.attr,
+	&bus_attr_remove_single_major.attr,
+	NULL,
 };
 
+static umode_t rbd_bus_is_visible(struct kobject *kobj,
+				  struct attribute *attr, int index)
+{
+	if (!single_major &&
+	    (attr == &bus_attr_add_single_major.attr ||
+	     attr == &bus_attr_remove_single_major.attr))
+		return 0;
+
+	return attr->mode;
+}
+
+static const struct attribute_group rbd_bus_group = {
+	.attrs = rbd_bus_attrs,
+	.is_visible = rbd_bus_is_visible,
+};
+__ATTRIBUTE_GROUPS(rbd_bus);
+
 static struct bus_type rbd_bus_type = {
 	.name		= "rbd",
-	.bus_attrs	= rbd_bus_attrs,
+	.bus_groups	= rbd_bus_groups,
 };
 
 static void rbd_root_dev_release(struct device *dev)
@@ -229,59 +468,178 @@ static struct device rbd_root_dev = {
 	.release        = rbd_root_dev_release,
 };
 
-
-static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
+static __printf(2, 3)
+void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
 {
-	return get_device(&rbd_dev->dev);
-}
+	struct va_format vaf;
+	va_list args;
 
-static void rbd_put_dev(struct rbd_device *rbd_dev)
-{
-	put_device(&rbd_dev->dev);
+	va_start(args, fmt);
+	vaf.fmt = fmt;
+	vaf.va = &args;
+
+	if (!rbd_dev)
+		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
+	else if (rbd_dev->disk)
+		printk(KERN_WARNING "%s: %s: %pV\n",
+			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
+	else if (rbd_dev->spec && rbd_dev->spec->image_name)
+		printk(KERN_WARNING "%s: image %s: %pV\n",
+			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
+	else if (rbd_dev->spec && rbd_dev->spec->image_id)
+		printk(KERN_WARNING "%s: id %s: %pV\n",
+			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
+	else	/* punt */
+		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
+			RBD_DRV_NAME, rbd_dev, &vaf);
+	va_end(args);
 }
 
-static int __rbd_refresh_header(struct rbd_device *rbd_dev);
+#ifdef RBD_DEBUG
+#define rbd_assert(expr)						\
+		if (unlikely(!(expr))) {				\
+			printk(KERN_ERR "\nAssertion failure in %s() "	\
+						"at line %d:\n\n"	\
+					"\trbd_assert(%s);\n\n",	\
+					__func__, __LINE__, #expr);	\
+			BUG();						\
+		}
+#else /* !RBD_DEBUG */
+#  define rbd_assert(expr)	((void) 0)
+#endif /* !RBD_DEBUG */
+
+static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
+static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
+static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
+
+static int rbd_dev_refresh(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
+static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
+					u64 snap_id);
+static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+				u8 *order, u64 *snap_size);
+static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+				u64 *snap_features);
+static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+	bool removing = false;
 
-	rbd_get_dev(rbd_dev);
+	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
+		return -EROFS;
 
-	set_device_ro(bdev, rbd_dev->read_only);
+	spin_lock_irq(&rbd_dev->lock);
+	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
+		removing = true;
+	else
+		rbd_dev->open_count++;
+	spin_unlock_irq(&rbd_dev->lock);
+	if (removing)
+		return -ENOENT;
 
-	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
-		return -EROFS;
+	(void) get_device(&rbd_dev->dev);
 
 	return 0;
 }
 
-static int rbd_release(struct gendisk *disk, fmode_t mode)
+static void rbd_release(struct gendisk *disk, fmode_t mode)
 {
 	struct rbd_device *rbd_dev = disk->private_data;
+	unsigned long open_count_before;
 
-	rbd_put_dev(rbd_dev);
+	spin_lock_irq(&rbd_dev->lock);
+	open_count_before = rbd_dev->open_count--;
+	spin_unlock_irq(&rbd_dev->lock);
+	rbd_assert(open_count_before > 0);
 
-	return 0;
+	put_device(&rbd_dev->dev);
 }
 
+static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
+{
+	int ret = 0;
+	int val;
+	bool ro;
+	bool ro_changed = false;
+
+	/* get_user() may sleep, so call it before taking rbd_dev->lock */
+	if (get_user(val, (int __user *)(arg)))
+		return -EFAULT;
+
+	ro = val ? true : false;
+	/* Snapshots don't allow writes */
+	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
+		return -EROFS;
+
+	spin_lock_irq(&rbd_dev->lock);
+	/* prevent others from opening this device */
+	if (rbd_dev->open_count > 1) {
+		ret = -EBUSY;
+		goto out;
+	}
+
+	if (rbd_dev->mapping.read_only != ro) {
+		rbd_dev->mapping.read_only = ro;
+		ro_changed = true;
+	}
+
+out:
+	spin_unlock_irq(&rbd_dev->lock);
+	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
+	if (ret == 0 && ro_changed)
+		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
+
+	return ret;
+}
+
+static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
+			unsigned int cmd, unsigned long arg)
+{
+	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+	int ret = 0;
+
+	switch (cmd) {
+	case BLKROSET:
+		ret = rbd_ioctl_set_ro(rbd_dev, arg);
+		break;
+	default:
+		ret = -ENOTTY;
+	}
+
+	return ret;
+}
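[Editor's note: rbd_ioctl_set_ro() is reached through the standard BLKROSET path, so the mapping's read-only flag can be flipped with the ordinary block-device ioctl. A hedged userspace sketch; the device path is hypothetical:]

	/* Illustrative only: toggling an rbd mapping read-only. */
	#include <fcntl.h>
	#include <stdio.h>
	#include <sys/ioctl.h>
	#include <linux/fs.h>

	int main(void)
	{
		int ro = 1;
		int fd = open("/dev/rbd0", O_RDONLY);	/* hypothetical device */

		if (fd < 0 || ioctl(fd, BLKROSET, &ro) < 0)
			perror("BLKROSET");
		return 0;
	}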
 
+#ifdef CONFIG_COMPAT
+static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
+				unsigned int cmd, unsigned long arg)
+{
+	return rbd_ioctl(bdev, mode, cmd, arg);
+}
+#endif /* CONFIG_COMPAT */
 
 static const struct block_device_operations rbd_bd_ops = {
 	.owner			= THIS_MODULE,
 	.open			= rbd_open,
 	.release		= rbd_release,
+	.ioctl			= rbd_ioctl,
+#ifdef CONFIG_COMPAT
+	.compat_ioctl		= rbd_compat_ioctl,
+#endif
 };
 
 /*
- * Initialize an rbd client instance.
- * We own *opt.
+ * Initialize an rbd client instance.  Success or not, this function
+ * consumes ceph_opts.  Caller holds client_mutex.
 */
-static struct rbd_client *rbd_client_create(struct ceph_options *opt,
-					    struct rbd_options *rbd_opts)
+static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 {
 	struct rbd_client *rbdc;
 	int ret = -ENOMEM;
 
-	dout("rbd_client_create\n");
+	dout("%s:\n", __func__);
 	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
 	if (!rbdc)
 		goto out_opt;
@@ -289,80 +647,105 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
 	kref_init(&rbdc->kref);
 	INIT_LIST_HEAD(&rbdc->node);
 
-	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
-	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
+	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
 	if (IS_ERR(rbdc->client))
-		goto out_mutex;
-	opt = NULL; /* Now rbdc->client is responsible for opt */
+		goto out_rbdc;
+	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 
 	ret = ceph_open_session(rbdc->client);
 	if (ret < 0)
-		goto out_err;
-
-	rbdc->rbd_opts = rbd_opts;
+		goto out_client;
 
 	spin_lock(&rbd_client_list_lock);
 	list_add_tail(&rbdc->node, &rbd_client_list);
 	spin_unlock(&rbd_client_list_lock);
 
-	mutex_unlock(&ctl_mutex);
+	dout("%s: rbdc %p\n", __func__, rbdc);
 
-	dout("rbd_client_create created %p\n", rbdc);
 	return rbdc;
-
-out_err:
+out_client:
 	ceph_destroy_client(rbdc->client);
-out_mutex:
-	mutex_unlock(&ctl_mutex);
+out_rbdc:
 	kfree(rbdc);
 out_opt:
-	if (opt)
-		ceph_destroy_options(opt);
+	if (ceph_opts)
+		ceph_destroy_options(ceph_opts);
+	dout("%s: error %d\n", __func__, ret);
+
 	return ERR_PTR(ret);
 }
 
+static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
+{
+	kref_get(&rbdc->kref);
+
+	return rbdc;
+}
+
 /*
- * Find a ceph client with specific addr and configuration.
+ * Find a ceph client with specific addr and configuration.  If
+ * found, bump its reference count.
 */
-static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
+static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 {
 	struct rbd_client *client_node;
+	bool found = false;
 
-	if (opt->flags & CEPH_OPT_NOSHARE)
+	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
 		return NULL;
 
-	list_for_each_entry(client_node, &rbd_client_list, node)
-		if (ceph_compare_options(opt, client_node->client) == 0)
-			return client_node;
-	return NULL;
+	spin_lock(&rbd_client_list_lock);
+	list_for_each_entry(client_node, &rbd_client_list, node) {
+		if (!ceph_compare_options(ceph_opts, client_node->client)) {
+			__rbd_get_client(client_node);
+
+			found = true;
+			break;
+		}
+	}
+	spin_unlock(&rbd_client_list_lock);
+
+	return found ? client_node : NULL;
 }
 
 /*
 * mount options
 */
 enum {
-	Opt_notify_timeout,
 	Opt_last_int,
 	/* int args above */
 	Opt_last_string,
 	/* string args above */
+	Opt_read_only,
+	Opt_read_write,
+	/* Boolean args above */
+	Opt_last_bool,
 };
 
-static match_table_t rbdopt_tokens = {
-	{Opt_notify_timeout, "notify_timeout=%d"},
+static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
+	{Opt_read_only, "read_only"},
+	{Opt_read_only, "ro"},		/* Alternate spelling */
+	{Opt_read_write, "read_write"},
+	{Opt_read_write, "rw"},		/* Alternate spelling */
+	/* Boolean args above */
 	{-1, NULL}
 };
 
+struct rbd_options {
+	bool	read_only;
+};
+
+#define RBD_READ_ONLY_DEFAULT	false
+
 static int parse_rbd_opts_token(char *c, void *private)
 {
-	struct rbd_options *rbdopt = private;
+	struct rbd_options *rbd_opts = private;
 	substring_t argstr[MAX_OPT_ARGS];
 	int token, intval, ret;
 
-	token = match_token(c, rbdopt_tokens, argstr);
+	token = match_token(c, rbd_opts_tokens, argstr);
 	if (token < 0)
 		return -EINVAL;
 
@@ -377,64 +760,42 @@ static int parse_rbd_opts_token(char *c, void *private)
 	} else if (token > Opt_last_int && token < Opt_last_string) {
 		dout("got string token %d val %s\n", token,
 		     argstr[0].from);
+	} else if (token > Opt_last_string && token < Opt_last_bool) {
+		dout("got Boolean token %d\n", token);
 	} else {
 		dout("got token %d\n", token);
 	}
 
 	switch (token) {
-	case Opt_notify_timeout:
-		rbdopt->notify_timeout = intval;
+	case Opt_read_only:
+		rbd_opts->read_only = true;
+		break;
+	case Opt_read_write:
+		rbd_opts->read_only = false;
 		break;
 	default:
-		BUG_ON(token);
+		rbd_assert(false);
+		break;
 	}
+
 	return 0;
 }
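[Editor's note: the new tokens are Boolean, so they carry no "=%d" payload, and either spelling maps to the same token. An illustrative use of the parser callback above; the wrapper function is hypothetical:]

	/*
	 * Illustrative only: feeding individual option words to
	 * parse_rbd_opts_token(), as ceph_parse_options() would.
	 */
	static void example_parse_opts(void)
	{
		struct rbd_options opts = { .read_only = RBD_READ_ONLY_DEFAULT };
		char ro[] = "ro", rw[] = "read_write";

		parse_rbd_opts_token(ro, &opts);   /* opts.read_only == true */
		parse_rbd_opts_token(rw, &opts);   /* opts.read_only == false */
	}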
 
 /*
 * Get a ceph client with specific addr and configuration, if one does
- * not exist create it.
+ * not exist create it.  Either way, ceph_opts is consumed by this
+ * function.
 */
-static struct rbd_client *rbd_get_client(const char *mon_addr,
-					 size_t mon_addr_len,
-					 char *options)
+static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 {
 	struct rbd_client *rbdc;
-	struct ceph_options *opt;
-	struct rbd_options *rbd_opts;
-
-	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
-	if (!rbd_opts)
-		return ERR_PTR(-ENOMEM);
-
-	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
-
-	opt = ceph_parse_options(options, mon_addr,
-				mon_addr + mon_addr_len,
-				parse_rbd_opts_token, rbd_opts);
-	if (IS_ERR(opt)) {
-		kfree(rbd_opts);
-		return ERR_CAST(opt);
-	}
-
-	spin_lock(&rbd_client_list_lock);
-	rbdc = __rbd_client_find(opt);
-	if (rbdc) {
-		/* using an existing client */
-		kref_get(&rbdc->kref);
-		spin_unlock(&rbd_client_list_lock);
-
-		ceph_destroy_options(opt);
-		kfree(rbd_opts);
-
-		return rbdc;
-	}
-	spin_unlock(&rbd_client_list_lock);
-
-	rbdc = rbd_client_create(opt, rbd_opts);
-
-	if (IS_ERR(rbdc))
-		kfree(rbd_opts);
+	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
+	rbdc = rbd_client_find(ceph_opts);
+	if (rbdc)	/* using an existing client */
+		ceph_destroy_options(ceph_opts);
+	else
+		rbdc = rbd_client_create(ceph_opts);
+	mutex_unlock(&client_mutex);
 
 	return rbdc;
 }
 
@@ -448,13 +809,12 @@ static void rbd_client_release(struct kref *kref)
 {
 	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 
-	dout("rbd_release_client %p\n", rbdc);
+	dout("%s: rbdc %p\n", __func__, rbdc);
 	spin_lock(&rbd_client_list_lock);
 	list_del(&rbdc->node);
 	spin_unlock(&rbd_client_list_lock);
 
 	ceph_destroy_client(rbdc->client);
-	kfree(rbdc->rbd_opts);
 	kfree(rbdc);
 }
 
@@ -462,189 +822,375 @@ static void rbd_client_release(struct kref *kref)
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
-static void rbd_put_client(struct rbd_device *rbd_dev)
+static void rbd_put_client(struct rbd_client *rbdc)
 {
-	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
-	rbd_dev->rbd_client = NULL;
+	if (rbdc)
+		kref_put(&rbdc->kref, rbd_client_release);
 }
 
-/*
- * Destroy requests collection
- */
-static void rbd_coll_release(struct kref *kref)
+static bool rbd_image_format_valid(u32 image_format)
+{
+	return image_format == 1 || image_format == 2;
+}
+
+static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 {
-	struct rbd_req_coll *coll =
-		container_of(kref, struct rbd_req_coll, kref);
+	size_t size;
+	u32 snap_count;
+
+	/* The header has to start with the magic rbd header text */
+	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
+		return false;
 
-	dout("rbd_coll_release %p\n", coll);
-	kfree(coll);
+	/* The bio layer requires at least sector-sized I/O */
+
+	if (ondisk->options.order < SECTOR_SHIFT)
+		return false;
+
+	/* If we use u64 in a few spots we may be able to loosen this */
+
+	if (ondisk->options.order > 8 * sizeof (int) - 1)
+		return false;
+
+	/*
+	 * The size of a snapshot header has to fit in a size_t, and
+	 * that limits the number of snapshots.
+	 */
+	snap_count = le32_to_cpu(ondisk->snap_count);
+	size = SIZE_MAX - sizeof (struct ceph_snap_context);
+	if (snap_count > size / sizeof (__le64))
+		return false;
+
+	/*
+	 * Not only that, but the size of the entire snapshot
+	 * header must also be representable in a size_t.
+	 */
+	size -= snap_count * sizeof (__le64);
+	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
+		return false;
+
+	return true;
 }
 
 /*
- * Create a new header structure, translate header format from the on-disk
- * header.
+ * Fill an rbd image header with information from the given format 1
+ * on-disk header.
 */
-static int rbd_header_from_disk(struct rbd_image_header *header,
-				struct rbd_image_header_ondisk *ondisk,
-				u32 allocated_snaps,
-				gfp_t gfp_flags)
+static int rbd_header_from_disk(struct rbd_device *rbd_dev,
+				struct rbd_image_header_ondisk *ondisk)
 {
-	u32 i, snap_count;
+	struct rbd_image_header *header = &rbd_dev->header;
+	bool first_time = header->object_prefix == NULL;
+	struct ceph_snap_context *snapc;
+	char *object_prefix = NULL;
+	char *snap_names = NULL;
+	u64 *snap_sizes = NULL;
+	u32 snap_count;
+	size_t size;
+	int ret = -ENOMEM;
+	u32 i;
 
-	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
-		return -ENXIO;
+	/* Allocate this now to avoid having to handle failure below */
 
-	snap_count = le32_to_cpu(ondisk->snap_count);
-	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
-			 / sizeof (*ondisk))
-		return -EINVAL;
-	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
-				snap_count * sizeof (*ondisk),
-				gfp_flags);
-	if (!header->snapc)
-		return -ENOMEM;
+	if (first_time) {
+		size_t len;
 
-	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
-	if (snap_count) {
-		header->snap_names = kmalloc(header->snap_names_len,
-					     gfp_flags);
-		if (!header->snap_names)
-			goto err_snapc;
-		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
-					     gfp_flags);
-		if (!header->snap_sizes)
-			goto err_names;
-	} else {
-		header->snap_names = NULL;
-		header->snap_sizes = NULL;
+		len = strnlen(ondisk->object_prefix,
+				sizeof (ondisk->object_prefix));
+		object_prefix = kmalloc(len + 1, GFP_KERNEL);
+		if (!object_prefix)
+			return -ENOMEM;
+		memcpy(object_prefix, ondisk->object_prefix, len);
+		object_prefix[len] = '\0';
 	}
 
-	memcpy(header->block_name, ondisk->block_name,
-	       sizeof(ondisk->block_name));
-
-	header->image_size = le64_to_cpu(ondisk->image_size);
-	header->obj_order = ondisk->options.order;
-	header->crypt_type = ondisk->options.crypt_type;
-	header->comp_type = ondisk->options.comp_type;
-
-	atomic_set(&header->snapc->nref, 1);
-	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
-	header->snapc->num_snaps = snap_count;
-	header->total_snaps = snap_count;
+	/* Allocate the snapshot context and fill it in */
 
-	if (snap_count && allocated_snaps == snap_count) {
+	snap_count = le32_to_cpu(ondisk->snap_count);
+	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
+	if (!snapc)
+		goto out_err;
+	snapc->seq = le64_to_cpu(ondisk->snap_seq);
+	if (snap_count) {
+		struct rbd_image_snap_ondisk *snaps;
+		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
+
+		/* We'll keep a copy of the snapshot names... */
+
+		if (snap_names_len > (u64)SIZE_MAX)
+			goto out_2big;
+		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
+		if (!snap_names)
+			goto out_err;
+
+		/* ...as well as the array of their sizes. */
+
+		size = snap_count * sizeof (*header->snap_sizes);
+		snap_sizes = kmalloc(size, GFP_KERNEL);
+		if (!snap_sizes)
+			goto out_err;
+
+		/*
+		 * Copy the names, and fill in each snapshot's id
+		 * and size.
+		 *
+		 * Note that rbd_dev_v1_header_info() guarantees the
+		 * ondisk buffer we're working with has
+		 * snap_names_len bytes beyond the end of the
+		 * snapshot id array, so this memcpy() is safe.
+		 */
+		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
+		snaps = ondisk->snaps;
 		for (i = 0; i < snap_count; i++) {
-			header->snapc->snaps[i] =
-				le64_to_cpu(ondisk->snaps[i].id);
-			header->snap_sizes[i] =
-				le64_to_cpu(ondisk->snaps[i].image_size);
+			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
+			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
 		}
+	}
 
-		/* copy snapshot names */
-		memcpy(header->snap_names, &ondisk->snaps[i],
-			header->snap_names_len);
+	/* We won't fail any more, fill in the header */
+
+	if (first_time) {
+		header->object_prefix = object_prefix;
+		header->obj_order = ondisk->options.order;
+		header->crypt_type = ondisk->options.crypt_type;
+		header->comp_type = ondisk->options.comp_type;
+		/* The rest aren't used for format 1 images */
+		header->stripe_unit = 0;
+		header->stripe_count = 0;
+		header->features = 0;
+	} else {
+		ceph_put_snap_context(header->snapc);
+		kfree(header->snap_names);
+		kfree(header->snap_sizes);
 	}
 
+	/* The remaining fields always get updated (when we refresh) */
+
+	header->image_size = le64_to_cpu(ondisk->image_size);
+	header->snapc = snapc;
+	header->snap_names = snap_names;
+	header->snap_sizes = snap_sizes;
+
+	/* Make sure mapping size is consistent with header info */
+
+	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
+		if (rbd_dev->mapping.size != header->image_size)
+			rbd_dev->mapping.size = header->image_size;
+
 	return 0;
+out_2big:
+	ret = -EIO;
+out_err:
+	kfree(snap_sizes);
+	kfree(snap_names);
+	ceph_put_snap_context(snapc);
+	kfree(object_prefix);
 
-err_names:
-	kfree(header->snap_names);
-err_snapc:
-	kfree(header->snapc);
-	return -ENOMEM;
+	return ret;
 }
 
-static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
-			u64 *seq, u64 *size)
+static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
 {
-	int i;
-	char *p = header->snap_names;
+	const char *snap_name;
 
-	for (i = 0; i < header->total_snaps; i++) {
-		if (!strcmp(snap_name, p)) {
+	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
 
-			/* Found it.  Pass back its id and/or size */
+	/* Skip over names until we find the one we are looking for */
 
-			if (seq)
-				*seq = header->snapc->snaps[i];
-			if (size)
-				*size = header->snap_sizes[i];
-			return i;
-		}
-		p += strlen(p) + 1;	/* Skip ahead to the next name */
-	}
-	return -ENOENT;
+	snap_name = rbd_dev->header.snap_names;
+	while (which--)
+		snap_name += strlen(snap_name) + 1;
+
+	return kstrdup(snap_name, GFP_KERNEL);
 }
 
-static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
+/*
+ * Snapshot id comparison function for use with qsort()/bsearch().
+ * Note that result is for snapshots in *descending* order.
+ */
+static int snapid_compare_reverse(const void *s1, const void *s2)
 {
-	struct rbd_image_header *header = &dev->header;
-	struct ceph_snap_context *snapc = header->snapc;
-	int ret = -ENOENT;
+	u64 snap_id1 = *(u64 *)s1;
+	u64 snap_id2 = *(u64 *)s2;
 
-	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
+	if (snap_id1 < snap_id2)
+		return 1;
+	return snap_id1 == snap_id2 ? 0 : -1;
+}
 
-	down_write(&dev->header_rwsem);
+/*
+ * Search a snapshot context to see if the given snapshot id is
+ * present.
+ *
+ * Returns the position of the snapshot id in the array if it's found,
+ * or BAD_SNAP_INDEX otherwise.
+ *
+ * Note: The snapshot array is kept sorted (by the osd) in
+ * reverse order, highest snapshot id first.
+ */
+static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
+{
+	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+	u64 *found;
 
-	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
-		    sizeof (RBD_SNAP_HEAD_NAME))) {
-		if (header->total_snaps)
-			snapc->seq = header->snap_seq;
-		else
-			snapc->seq = 0;
-		dev->snap_id = CEPH_NOSNAP;
-		dev->read_only = 0;
-		if (size)
-			*size = header->image_size;
+	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
+				sizeof (snap_id), snapid_compare_reverse);
+
+	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
+}
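[Editor's note: because the osd keeps snapc->snaps[] sorted high-to-low, the comparison function above inverts the usual ordering so a plain bsearch() still works. A worked example under that assumption; the wrapper is hypothetical:]

	/*
	 * Illustrative only: with snaps[] = { 40, 25, 10 } (descending),
	 * looking up id 25 yields index 1; an absent id (say 30) yields
	 * BAD_SNAP_INDEX.
	 */
	static void example_snap_lookup(struct rbd_device *rbd_dev)
	{
		u32 which = rbd_dev_snap_index(rbd_dev, 25);

		if (which == BAD_SNAP_INDEX)
			pr_info("snap 25 not in context\n");
		else
			pr_info("snap 25 is entry %u\n", which);
	}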
+
+static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
+					u64 snap_id)
+{
+	u32 which;
+	const char *snap_name;
+
+	which = rbd_dev_snap_index(rbd_dev, snap_id);
+	if (which == BAD_SNAP_INDEX)
+		return ERR_PTR(-ENOENT);
+
+	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
+	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
+}
+
+static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+{
+	if (snap_id == CEPH_NOSNAP)
+		return RBD_SNAP_HEAD_NAME;
+
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+	if (rbd_dev->image_format == 1)
+		return rbd_dev_v1_snap_name(rbd_dev, snap_id);
+
+	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
+}
+
+static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+				u64 *snap_size)
+{
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+	if (snap_id == CEPH_NOSNAP) {
+		*snap_size = rbd_dev->header.image_size;
+	} else if (rbd_dev->image_format == 1) {
+		u32 which;
+
+		which = rbd_dev_snap_index(rbd_dev, snap_id);
+		if (which == BAD_SNAP_INDEX)
+			return -ENOENT;
+
+		*snap_size = rbd_dev->header.snap_sizes[which];
 	} else {
-		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
-		if (ret < 0)
-			goto done;
-		dev->snap_id = snapc->seq;
-		dev->read_only = 1;
+		u64 size = 0;
+		int ret;
+
+		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
+		if (ret)
+			return ret;
+
+		*snap_size = size;
 	}
+	return 0;
+}
 
-	ret = 0;
-done:
-	up_write(&dev->header_rwsem);
-	return ret;
+static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+			u64 *snap_features)
+{
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+	if (snap_id == CEPH_NOSNAP) {
+		*snap_features = rbd_dev->header.features;
+	} else if (rbd_dev->image_format == 1) {
+		*snap_features = 0;	/* No features for format 1 */
+	} else {
+		u64 features = 0;
+		int ret;
+
+		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
+		if (ret)
+			return ret;
+
+		*snap_features = features;
+	}
+	return 0;
 }
 
-static void rbd_header_free(struct rbd_image_header *header)
+static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
 {
-	kfree(header->snapc);
-	kfree(header->snap_names);
-	kfree(header->snap_sizes);
+	u64 snap_id = rbd_dev->spec->snap_id;
+	u64 size = 0;
+	u64 features = 0;
+	int ret;
+
+	ret = rbd_snap_size(rbd_dev, snap_id, &size);
+	if (ret)
+		return ret;
+	ret = rbd_snap_features(rbd_dev, snap_id, &features);
+	if (ret)
+		return ret;
+
+	rbd_dev->mapping.size = size;
+	rbd_dev->mapping.features = features;
+
+	return 0;
 }
 
-/*
- * get the actual striped segment name, offset and length
- */
-static u64 rbd_get_segment(struct rbd_image_header *header,
-			   const char *block_name,
-			   u64 ofs, u64 len,
-			   char *seg_name, u64 *segofs)
+static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
+{
+	rbd_dev->mapping.size = 0;
+	rbd_dev->mapping.features = 0;
+}
+
+static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
-	u64 seg = ofs >> header->obj_order;
+	char *name;
+	u64 segment;
+	int ret;
+	char *name_format;
+
+	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
+	if (!name)
+		return NULL;
+	segment = offset >> rbd_dev->header.obj_order;
+	name_format = "%s.%012llx";
+	if (rbd_dev->image_format == 2)
+		name_format = "%s.%016llx";
+	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
+			rbd_dev->header.object_prefix, segment);
+	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
+		pr_err("error formatting segment name for #%llu (%d)\n",
+			segment, ret);
+		kfree(name);
+		name = NULL;
+	}
 
-	if (seg_name)
-		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
-			 "%s.%012llx", block_name, seg);
+	return name;
+}
+
+static void rbd_segment_name_free(const char *name)
+{
+	/* The explicit cast here is needed to drop the const qualifier */
 
-	ofs = ofs & ((1 << header->obj_order) - 1);
-	len = min_t(u64, len, (1 << header->obj_order) - ofs);
+	kmem_cache_free(rbd_segment_name_cache, (void *)name);
+}
 
-	if (segofs)
-		*segofs = ofs;
+static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
+{
+	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
 
-	return len;
+	return offset & (segment_size - 1);
 }
 
-static int rbd_get_num_segments(struct rbd_image_header *header,
-				u64 ofs, u64 len)
+static u64 rbd_segment_length(struct rbd_device *rbd_dev,
+				u64 offset, u64 length)
 {
-	u64 start_seg = ofs >> header->obj_order;
-	u64 end_seg = (ofs + len - 1) >> header->obj_order;
-	return end_seg - start_seg + 1;
+	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
+
+	offset &= segment_size - 1;
+
+	rbd_assert(length <= U64_MAX - offset);
+	if (offset + length > segment_size)
+		length = segment_size - offset;
+
+	return length;
 }
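[Editor's note: rbd_segment_name() stamps the object number into a fixed-width hex suffix (12 digits for format 1, 16 for format 2), and rbd_segment_offset()/rbd_segment_length() clip I/O to one object. A worked example assuming the common order of 22 (4 MiB objects) and a hypothetical format 1 prefix:]

	/*
	 * Illustrative only, for obj_order == 22 and object_prefix
	 * "rb.0.1234.6b8b4567" (hypothetical):
	 *
	 *   offset 0x500000 -> segment 1 -> "rb.0.1234.6b8b4567.000000000001"
	 *   rbd_segment_offset(dev, 0x500000)           == 0x100000
	 *   rbd_segment_length(dev, 0x500000, 0x400000) == 0x300000
	 *     (the request is truncated at the 8 MiB object boundary)
	 */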
 
 /*
@@ -675,22 +1221,23 @@ static void bio_chain_put(struct bio *chain)
 */
 static void zero_bio_chain(struct bio *chain, int start_ofs)
 {
-	struct bio_vec *bv;
+	struct bio_vec bv;
+	struct bvec_iter iter;
 	unsigned long flags;
 	void *buf;
-	int i;
 	int pos = 0;
 
 	while (chain) {
-		bio_for_each_segment(bv, chain, i) {
-			if (pos + bv->bv_len > start_ofs) {
+		bio_for_each_segment(bv, chain, iter) {
+			if (pos + bv.bv_len > start_ofs) {
 				int remainder = max(start_ofs - pos, 0);
-				buf = bvec_kmap_irq(bv, &flags);
+				buf = bvec_kmap_irq(&bv, &flags);
 				memset(buf + remainder, 0,
-				       bv->bv_len - remainder);
+				       bv.bv_len - remainder);
+				flush_dcache_page(bv.bv_page);
 				bvec_kunmap_irq(buf, &flags);
 			}
-			pos += bv->bv_len;
+			pos += bv.bv_len;
 		}
 
 		chain = chain->bi_next;
@@ -698,858 +1245,2086 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
 }
 
 /*
- * bio_chain_clone - clone a chain of bios up to a certain length.
- * might return a bio_pair that will need to be released.
+ * similar to zero_bio_chain(), zeros data defined by a page array,
+ * starting at the given byte offset from the start of the array and
+ * continuing up to the given end offset.  The pages array is
+ * assumed to be big enough to hold all bytes up to the end.
 */
-static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
-				   struct bio_pair **bp,
-				   int len, gfp_t gfpmask)
+static void zero_pages(struct page **pages, u64 offset, u64 end)
 {
-	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
-	int total = 0;
-
-	if (*bp) {
-		bio_pair_release(*bp);
-		*bp = NULL;
+	struct page **page = &pages[offset >> PAGE_SHIFT];
+
+	rbd_assert(end > offset);
+	rbd_assert(end - offset <= (u64)SIZE_MAX);
+	while (offset < end) {
+		size_t page_offset;
+		size_t length;
+		unsigned long flags;
+		void *kaddr;
+
+		page_offset = offset & ~PAGE_MASK;
+		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
+		local_irq_save(flags);
+		kaddr = kmap_atomic(*page);
+		memset(kaddr + page_offset, 0, length);
+		flush_dcache_page(*page);
+		kunmap_atomic(kaddr);
+		local_irq_restore(flags);
+
+		offset += length;
+		page++;
 	}
+}
 
-	while (old_chain && (total < len)) {
-		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
-		if (!tmp)
-			goto err_out;
+/*
+ * Clone a portion of a bio, starting at the given byte offset
+ * and continuing for the number of bytes indicated.
+ */
+static struct bio *bio_clone_range(struct bio *bio_src,
+					unsigned int offset,
+					unsigned int len,
+					gfp_t gfpmask)
+{
+	struct bio *bio;
 
-		if (total + old_chain->bi_size > len) {
-			struct bio_pair *bp;
+	bio = bio_clone(bio_src, gfpmask);
+	if (!bio)
+		return NULL;	/* ENOMEM */
 
-			/*
-			 * this split can only happen with a single paged bio,
-			 * split_bio will BUG_ON if this is not the case
-			 */
-			dout("bio_chain_clone split! total=%d remaining=%d"
-			     "bi_size=%d\n",
-			     (int)total, (int)len-total,
-			     (int)old_chain->bi_size);
+	bio_advance(bio, offset);
+	bio->bi_iter.bi_size = len;
 
-			/* split the bio. We'll release it either in the next
-			   call, or it will have to be released outside */
-			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
-			if (!bp)
-				goto err_out;
+	return bio;
+}
 
-			__bio_clone(tmp, &bp->bio1);
-
-			*next = &bp->bio2;
-		} else {
-			__bio_clone(tmp, old_chain);
-			*next = old_chain->bi_next;
-		}
-
-		tmp->bi_bdev = NULL;
-		gfpmask &= ~__GFP_WAIT;
-		tmp->bi_next = NULL;
-
-		if (!new_chain) {
-			new_chain = tail = tmp;
-		} else {
-			tail->bi_next = tmp;
-			tail = tmp;
-		}
-		old_chain = old_chain->bi_next;
-
-		total += tmp->bi_size;
-	}
+/*
+ * Clone a portion of a bio chain, starting at the given byte offset
+ * into the first bio in the source chain and continuing for the
+ * number of bytes indicated.  The result is another bio chain of
+ * exactly the given length, or a null pointer on error.
+ *
+ * The bio_src and offset parameters are both in-out.  On entry they
+ * refer to the first source bio and the offset into that bio where
+ * the start of data to be cloned is located.
+ *
+ * On return, bio_src is updated to refer to the bio in the source
+ * chain that contains the first un-cloned byte, and *offset will
+ * contain the offset of that byte within that bio.
+ */
+static struct bio *bio_chain_clone_range(struct bio **bio_src,
+					unsigned int *offset,
+					unsigned int len,
+					gfp_t gfpmask)
+{
+	struct bio *bi = *bio_src;
+	unsigned int off = *offset;
+	struct bio *chain = NULL;
+	struct bio **end;
+
+	/* Build up a chain of clone bios up to the limit */
+
+	if (!bi || off >= bi->bi_iter.bi_size || !len)
+		return NULL;		/* Nothing to clone */
+
+	end = &chain;
+	while (len) {
+		unsigned int bi_size;
+		struct bio *bio;
+
+		if (!bi) {
+			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
+			goto out_err;	/* EINVAL; ran out of bio's */
+		}
+		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
+		bio = bio_clone_range(bi, off, bi_size, gfpmask);
+		if (!bio)
+			goto out_err;	/* ENOMEM */
+
+		*end = bio;
+		end = &bio->bi_next;
+
+		off += bi_size;
+		if (off == bi->bi_iter.bi_size) {
+			bi = bi->bi_next;
+			off = 0;
+		}
+		len -= bi_size;
 	}
+	*bio_src = bi;
+	*offset = off;
 
-	BUG_ON(total < len);
+	return chain;
+out_err:
+	bio_chain_put(chain);
 
-	if (tail)
-		tail->bi_next = NULL;
+	return NULL;
+}
 
-	*old = old_chain;
-
-	return new_chain;
-
-err_out:
-	dout("bio_chain_clone with err\n");
-	bio_chain_put(new_chain);
-	return NULL;
-}
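[Editor's note: the in-out (bio_src, offset) cursor lets a caller peel consecutive byte ranges off one source chain. A hedged sketch of that pattern; names and error handling are illustrative only:]

	/*
	 * Illustrative only: carving a source bio chain into two
	 * consecutive cloned ranges.  After each call the (bio, off)
	 * cursor points at the first unconsumed byte.  Cleanup of
	 * partially-built clones is elided for brevity.
	 */
	static void example_split_chain(struct bio *src)
	{
		struct bio *bio = src;
		unsigned int off = 0;
		struct bio *first, *second;

		first = bio_chain_clone_range(&bio, &off, 4096, GFP_NOIO);
		second = bio_chain_clone_range(&bio, &off, 4096, GFP_NOIO);
		if (!first || !second)
			pr_warn("clone failed\n");
	}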
 
+/*
+ * The default/initial value for all object request flags is 0.  For
+ * each flag, once its value is set to 1 it is never reset to 0
+ * again.
+ */
+static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
+{
+	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
+		struct rbd_device *rbd_dev;
+
+		rbd_dev = obj_request->img_request->rbd_dev;
+		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
+			obj_request);
+	}
+}
+
+static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
+{
+	smp_mb();
+	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
+}
+
+static void obj_request_done_set(struct rbd_obj_request *obj_request)
+{
+	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
+		struct rbd_device *rbd_dev = NULL;
+
+		if (obj_request_img_data_test(obj_request))
+			rbd_dev = obj_request->img_request->rbd_dev;
+		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
+			obj_request);
+	}
+}
+
+static bool obj_request_done_test(struct rbd_obj_request *obj_request)
+{
+	smp_mb();
+	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
+}
 
 /*
- * helpers for osd request op vectors.
+ * This sets the KNOWN flag after (possibly) setting the EXISTS
+ * flag.  The latter is set based on the "exists" value provided.
+ *
+ * Note that for our purposes once an object exists it never goes
+ * away again.  It's possible that the responses from two existence
+ * checks are separated by the creation of the target object, and
+ * the first ("doesn't exist") response arrives *after* the second
+ * ("does exist").  In that case we ignore the second one.
 */
-static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
-			     int num_ops,
-			     int opcode,
-			     u32 payload_len)
-{
-	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
-		       GFP_NOIO);
-	if (!*ops)
-		return -ENOMEM;
-	(*ops)[0].op = opcode;
+static void obj_request_existence_set(struct rbd_obj_request *obj_request,
+					bool exists)
+{
+	if (exists)
+		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
+	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
+	smp_mb();
+}
+
+static bool obj_request_known_test(struct rbd_obj_request *obj_request)
+{
+	smp_mb();
+	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
+}
+
+static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
+{
+	smp_mb();
+	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
+}
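[Editor's note: KNOWN is set only after EXISTS, so with the barriers above a reader that observes KNOWN can trust EXISTS. A sketch of the consuming pattern, with a hypothetical wrapper name:]

	/*
	 * Illustrative only: how a layered-write path might consume
	 * the two flags.  Until KNOWN is set, an existence check
	 * (STAT) is still outstanding and the answer isn't valid yet.
	 */
	static bool example_target_exists(struct rbd_obj_request *obj_request)
	{
		if (!obj_request_known_test(obj_request))
			return false;	/* not determined yet; must wait */

		return obj_request_exists_test(obj_request);
	}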
+
+static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
+{
+	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
+
+	return obj_request->img_offset <
+	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
+}
+
+static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
+{
+	dout("%s: obj %p (was %d)\n", __func__, obj_request,
+		atomic_read(&obj_request->kref.refcount));
+	kref_get(&obj_request->kref);
+}
+
+static void rbd_obj_request_destroy(struct kref *kref);
+static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request != NULL);
+	dout("%s: obj %p (was %d)\n", __func__, obj_request,
+		atomic_read(&obj_request->kref.refcount));
+	kref_put(&obj_request->kref, rbd_obj_request_destroy);
+}
+
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+	dout("%s: img %p (was %d)\n", __func__, img_request,
+		atomic_read(&img_request->kref.refcount));
+	kref_get(&img_request->kref);
+}
+
+static bool img_request_child_test(struct rbd_img_request *img_request);
+static void rbd_parent_request_destroy(struct kref *kref);
+static void rbd_img_request_destroy(struct kref *kref);
+static void rbd_img_request_put(struct rbd_img_request *img_request)
+{
+	rbd_assert(img_request != NULL);
+	dout("%s: img %p (was %d)\n", __func__, img_request,
+		atomic_read(&img_request->kref.refcount));
+	if (img_request_child_test(img_request))
+		kref_put(&img_request->kref, rbd_parent_request_destroy);
+	else
+		kref_put(&img_request->kref, rbd_img_request_destroy);
+}
+
+static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
+					struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request->img_request == NULL);
+
+	/* Image request now owns object's original reference */
+	obj_request->img_request = img_request;
+	obj_request->which = img_request->obj_request_count;
+	rbd_assert(!obj_request_img_data_test(obj_request));
+	obj_request_img_data_set(obj_request);
+	rbd_assert(obj_request->which != BAD_WHICH);
+	img_request->obj_request_count++;
+	list_add_tail(&obj_request->links, &img_request->obj_requests);
+	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+		obj_request->which);
+}
+
+static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
+					struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request->which != BAD_WHICH);
+
+	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+		obj_request->which);
+	list_del(&obj_request->links);
+	rbd_assert(img_request->obj_request_count > 0);
+	img_request->obj_request_count--;
+	rbd_assert(obj_request->which == img_request->obj_request_count);
+	obj_request->which = BAD_WHICH;
+	rbd_assert(obj_request_img_data_test(obj_request));
+	rbd_assert(obj_request->img_request == img_request);
+	obj_request->img_request = NULL;
+	obj_request->callback = NULL;
+	rbd_obj_request_put(obj_request);
+}
+
+static bool obj_request_type_valid(enum obj_request_type type)
+{
+	switch (type) {
+	case OBJ_REQUEST_NODATA:
+	case OBJ_REQUEST_BIO:
+	case OBJ_REQUEST_PAGES:
+		return true;
+	default:
+		return false;
+	}
+}
+
+static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
+				struct rbd_obj_request *obj_request)
+{
+	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
+
+	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
+}
+
+static void rbd_img_request_complete(struct rbd_img_request *img_request)
+{
+
+	dout("%s: img %p\n", __func__, img_request);
+
 	/*
-	 * op extent offset and length will be set later on
-	 * in calc_raw_layout()
+	 * If no error occurred, compute the aggregate transfer
+	 * count for the image request.  We could instead use
+	 * atomic64_cmpxchg() to update it as each object request
+	 * completes; not clear which way is better offhand.
 	 */
-	(*ops)[0].payload_len = payload_len;
-	return 0;
+	if (!img_request->result) {
+		struct rbd_obj_request *obj_request;
+		u64 xferred = 0;
+
+		for_each_obj_request(img_request, obj_request)
+			xferred += obj_request->xferred;
+		img_request->xferred = xferred;
+	}
+
+	if (img_request->callback)
+		img_request->callback(img_request);
+	else
+		rbd_img_request_put(img_request);
 }
 
-static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
+/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
+
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
 {
-	kfree(ops);
+	dout("%s: obj %p\n", __func__, obj_request);
+
+	return wait_for_completion_interruptible(&obj_request->completion);
 }
 
-static void rbd_coll_end_req_index(struct request *rq,
-				   struct rbd_req_coll *coll,
-				   int index,
-				   int ret, u64 len)
+/*
+ * The default/initial value for all image request flags is 0.  Each
+ * is conditionally set to 1 at image request initialization time
+ * and currently never changes thereafter.
+ */
-{
-	struct request_queue *q;
-	int min, max, i;
-
-	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
-	     coll, index, ret, len);
-
-	if (!rq)
-		return;
-
-	if (!coll) {
-		blk_end_request(rq, ret, len);
-		return;
-	}
-
-	q = rq->q;
-
-	spin_lock_irq(q->queue_lock);
-	coll->status[index].done = 1;
-	coll->status[index].rc = ret;
-	coll->status[index].bytes = len;
-	max = min = coll->num_done;
-	while (max < coll->total && coll->status[max].done)
-		max++;
-
-	for (i = min; i < max; i++) {
-		__blk_end_request(rq, coll->status[i].rc,
-				  coll->status[i].bytes);
-		coll->num_done++;
-		kref_put(&coll->kref, rbd_coll_release);
-	}
-	spin_unlock_irq(q->queue_lock);
-}
+static void img_request_write_set(struct rbd_img_request *img_request)
+{
+	set_bit(IMG_REQ_WRITE, &img_request->flags);
+	smp_mb();
+}
+
+static bool img_request_write_test(struct rbd_img_request *img_request)
+{
+	smp_mb();
+	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
+}
+
+static void img_request_child_set(struct rbd_img_request *img_request)
+{
+	set_bit(IMG_REQ_CHILD, &img_request->flags);
+	smp_mb();
+}
+
+static void img_request_child_clear(struct rbd_img_request *img_request)
+{
+	clear_bit(IMG_REQ_CHILD, &img_request->flags);
+	smp_mb();
+}
+
+static bool img_request_child_test(struct rbd_img_request *img_request)
+{
+	smp_mb();
+	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
+}
+
+static void img_request_layered_set(struct rbd_img_request *img_request)
+{
+	set_bit(IMG_REQ_LAYERED, &img_request->flags);
+	smp_mb();
+}
+
+static void img_request_layered_clear(struct rbd_img_request *img_request)
+{
+	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
+	smp_mb();
+}
+
+static bool img_request_layered_test(struct rbd_img_request *img_request)
+{
+	smp_mb();
+	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
+}
+
+static void
+rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
+{
+	u64 xferred = obj_request->xferred;
+	u64 length = obj_request->length;
+
+	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
+		obj_request, obj_request->img_request, obj_request->result,
+		xferred, length);
+	/*
+	 * ENOENT means a hole in the image.  We zero-fill the entire
+	 * length of the request.  A short read also implies zero-fill
+	 * to the end of the request.  An error requires the whole
+	 * length of the request to be reported finished with an error
+	 * to the block layer.  In each case we update the xferred
+	 * count to indicate the whole request was satisfied.
+	 */
+	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
+	if (obj_request->result == -ENOENT) {
+		if (obj_request->type == OBJ_REQUEST_BIO)
+			zero_bio_chain(obj_request->bio_list, 0);
+		else
+			zero_pages(obj_request->pages, 0, length);
+		obj_request->result = 0;
+	} else if (xferred < length && !obj_request->result) {
+		if (obj_request->type == OBJ_REQUEST_BIO)
+			zero_bio_chain(obj_request->bio_list, xferred);
+		else
+			zero_pages(obj_request->pages, xferred, length);
+	}
+	obj_request->xferred = length;
+	obj_request_done_set(obj_request);
+}
 
-static void rbd_coll_end_req(struct rbd_request *req,
-			     int ret, u64 len)
+static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
 {
-	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
+	dout("%s: obj %p cb %p\n", __func__, obj_request,
+		obj_request->callback);
+	if (obj_request->callback)
+		obj_request->callback(obj_request);
+	else
+		complete_all(&obj_request->completion);
+}
+
+static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
+{
+	dout("%s: obj %p\n", __func__, obj_request);
+	obj_request_done_set(obj_request);
+}
+
+static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
+{
+	struct rbd_img_request *img_request = NULL;
+	struct rbd_device *rbd_dev = NULL;
+	bool layered = false;
+
+	if (obj_request_img_data_test(obj_request)) {
+		img_request = obj_request->img_request;
+		layered = img_request && img_request_layered_test(img_request);
+		rbd_dev = img_request->rbd_dev;
+	}
+
+	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
+		obj_request, img_request, obj_request->result,
+		obj_request->xferred, obj_request->length);
+	if (layered && obj_request->result == -ENOENT &&
+			obj_request->img_offset < rbd_dev->parent_overlap)
+		rbd_img_parent_read(obj_request);
+	else if (img_request)
+		rbd_img_obj_request_read_callback(obj_request);
+	else
+		obj_request_done_set(obj_request);
+}
+
+static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
+{
+	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+		obj_request->result, obj_request->length);
+	/*
+	 * There is no such thing as a successful short write.  Set
+	 * it to our originally-requested length.
+	 */
+	obj_request->xferred = obj_request->length;
+	obj_request_done_set(obj_request);
 }
 
 /*
- * Send ceph osd request
+ * For a simple stat call there's nothing to do.  We'll do more if
+ * this is part of a write sequence for a layered image.
*/ -static int rbd_do_request(struct request *rq, - struct rbd_device *dev, - struct ceph_snap_context *snapc, - u64 snapid, - const char *obj, u64 ofs, u64 len, - struct bio *bio, - struct page **pages, - int num_pages, - int flags, - struct ceph_osd_req_op *ops, - int num_reply, - struct rbd_req_coll *coll, - int coll_index, - void (*rbd_cb)(struct ceph_osd_request *req, - struct ceph_msg *msg), - struct ceph_osd_request **linger_req, - u64 *ver) -{ - struct ceph_osd_request *req; - struct ceph_file_layout *layout; - int ret; - u64 bno; - struct timespec mtime = CURRENT_TIME; - struct rbd_request *req_data; - struct ceph_osd_request_head *reqhead; - struct ceph_osd_client *osdc; +static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request) +{ + dout("%s: obj %p\n", __func__, obj_request); + obj_request_done_set(obj_request); +} - req_data = kzalloc(sizeof(*req_data), GFP_NOIO); - if (!req_data) { - if (coll) - rbd_coll_end_req_index(rq, coll, coll_index, - -ENOMEM, len); - return -ENOMEM; +static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, + struct ceph_msg *msg) +{ + struct rbd_obj_request *obj_request = osd_req->r_priv; + u16 opcode; + + dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); + rbd_assert(osd_req == obj_request->osd_req); + if (obj_request_img_data_test(obj_request)) { + rbd_assert(obj_request->img_request); + rbd_assert(obj_request->which != BAD_WHICH); + } else { + rbd_assert(obj_request->which == BAD_WHICH); } - if (coll) { - req_data->coll = coll; - req_data->coll_index = coll_index; - } + if (osd_req->r_result < 0) + obj_request->result = osd_req->r_result; - dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs); + rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP); - down_read(&dev->header_rwsem); + /* + * We support a 64-bit length, but ultimately it has to be + * passed to blk_end_request(), which takes an unsigned int. + */ + obj_request->xferred = osd_req->r_reply_op_len[0]; + rbd_assert(obj_request->xferred < (u64)UINT_MAX); - osdc = &dev->rbd_client->client->osdc; - req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, - false, GFP_NOIO, pages, bio); - if (!req) { - up_read(&dev->header_rwsem); - ret = -ENOMEM; - goto done_pages; + opcode = osd_req->r_ops[0].op; + switch (opcode) { + case CEPH_OSD_OP_READ: + rbd_osd_read_callback(obj_request); + break; + case CEPH_OSD_OP_SETALLOCHINT: + rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE); + /* fall through */ + case CEPH_OSD_OP_WRITE: + rbd_osd_write_callback(obj_request); + break; + case CEPH_OSD_OP_STAT: + rbd_osd_stat_callback(obj_request); + break; + case CEPH_OSD_OP_CALL: + case CEPH_OSD_OP_NOTIFY_ACK: + case CEPH_OSD_OP_WATCH: + rbd_osd_trivial_callback(obj_request); + break; + default: + rbd_warn(NULL, "%s: unsupported op %hu\n", + obj_request->object_name, (unsigned short) opcode); + break; } - req->r_callback = rbd_cb; + if (obj_request_done_test(obj_request)) + rbd_obj_request_complete(obj_request); +} - req_data->rq = rq; - req_data->bio = bio; - req_data->pages = pages; - req_data->len = len; +static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_osd_request *osd_req = obj_request->osd_req; + u64 snap_id; - req->r_priv = req_data; + rbd_assert(osd_req != NULL); - reqhead = req->r_request->front.iov_base; - reqhead->snapid = cpu_to_le64(CEPH_NOSNAP); + snap_id = img_request ? 
img_request->snap_id : CEPH_NOSNAP; + ceph_osdc_build_request(osd_req, obj_request->offset, + NULL, snap_id, NULL); +} - strncpy(req->r_oid, obj, sizeof(req->r_oid)); - req->r_oid_len = strlen(req->r_oid); +static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_osd_request *osd_req = obj_request->osd_req; + struct ceph_snap_context *snapc; + struct timespec mtime = CURRENT_TIME; - layout = &req->r_file_layout; - memset(layout, 0, sizeof(*layout)); - layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); - layout->fl_stripe_count = cpu_to_le32(1); - layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); - layout->fl_pg_pool = cpu_to_le32(dev->poolid); - ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, - req, ops); + rbd_assert(osd_req != NULL); - ceph_osdc_build_request(req, ofs, &len, - ops, - snapc, - &mtime, - req->r_oid, req->r_oid_len); - up_read(&dev->header_rwsem); + snapc = img_request ? img_request->snapc : NULL; + ceph_osdc_build_request(osd_req, obj_request->offset, + snapc, CEPH_NOSNAP, &mtime); +} - if (linger_req) { - ceph_osdc_set_request_linger(osdc, req); - *linger_req = req; - } +/* + * Create an osd request. A read request has one osd op (read). + * A write request has either one (watch) or two (hint+write) osd ops. + * (All rbd data writes are prefixed with an allocation hint op, but + * technically osd watch is a write request, hence this distinction.) + */ +static struct ceph_osd_request *rbd_osd_req_create( + struct rbd_device *rbd_dev, + bool write_request, + unsigned int num_ops, + struct rbd_obj_request *obj_request) +{ + struct ceph_snap_context *snapc = NULL; + struct ceph_osd_client *osdc; + struct ceph_osd_request *osd_req; - ret = ceph_osdc_start_request(osdc, req, false); - if (ret < 0) - goto done_err; + if (obj_request_img_data_test(obj_request)) { + struct rbd_img_request *img_request = obj_request->img_request; - if (!rbd_cb) { - ret = ceph_osdc_wait_request(osdc, req); - if (ver) - *ver = le64_to_cpu(req->r_reassert_version.version); - dout("reassert_ver=%lld\n", - le64_to_cpu(req->r_reassert_version.version)); - ceph_osdc_put_request(req); + rbd_assert(write_request == + img_request_write_test(img_request)); + if (write_request) + snapc = img_request->snapc; } - return ret; -done_err: - bio_chain_put(req_data->bio); - ceph_osdc_put_request(req); -done_pages: - rbd_coll_end_req(req_data, ret, len); - kfree(req_data); - return ret; + rbd_assert(num_ops == 1 || (write_request && num_ops == 2)); + + /* Allocate and initialize the request, for the num_ops ops */ + + osdc = &rbd_dev->rbd_client->client->osdc; + osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, + GFP_ATOMIC); + if (!osd_req) + return NULL; /* ENOMEM */ + + if (write_request) + osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + else + osd_req->r_flags = CEPH_OSD_FLAG_READ; + + osd_req->r_callback = rbd_osd_req_callback; + osd_req->r_priv = obj_request; + + osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); + + return osd_req; } /* - * Ceph osd op callback + * Create a copyup osd request based on the information in the + * object request supplied. A copyup request has three osd ops, + * a copyup method call, a hint op, and a write op. 
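+ * (Op 0 is the CEPH_OSD_OP_CALL to the "rbd" class "copyup"
+ * method, op 1 the allocation hint, and op 2 the original
+ * write; rbd_img_obj_parent_read_full_callback() fills
+ * them in.)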
*/ -static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) +static struct ceph_osd_request * +rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) { - struct rbd_request *req_data = req->r_priv; - struct ceph_osd_reply_head *replyhead; - struct ceph_osd_op *op; - __s32 rc; - u64 bytes; - int read_op; + struct rbd_img_request *img_request; + struct ceph_snap_context *snapc; + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct ceph_osd_request *osd_req; - /* parse reply */ - replyhead = msg->front.iov_base; - WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); - op = (void *)(replyhead + 1); - rc = le32_to_cpu(replyhead->result); - bytes = le64_to_cpu(op->extent.length); - read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ); + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + rbd_assert(img_request); + rbd_assert(img_request_write_test(img_request)); - dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc); + /* Allocate and initialize the request, for the three ops */ - if (rc == -ENOENT && read_op) { - zero_bio_chain(req_data->bio, 0); - rc = 0; - } else if (rc == 0 && read_op && bytes < req_data->len) { - zero_bio_chain(req_data->bio, bytes); - bytes = req_data->len; - } + snapc = img_request->snapc; + rbd_dev = img_request->rbd_dev; + osdc = &rbd_dev->rbd_client->client->osdc; + osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC); + if (!osd_req) + return NULL; /* ENOMEM */ - rbd_coll_end_req(req_data, rc, bytes); + osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + osd_req->r_callback = rbd_osd_req_callback; + osd_req->r_priv = obj_request; - if (req_data->bio) - bio_chain_put(req_data->bio); + osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); + ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); - ceph_osdc_put_request(req); - kfree(req_data); + return osd_req; } -static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) + +static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) { - ceph_osdc_put_request(req); + ceph_osdc_put_request(osd_req); } -/* - * Do a synchronous ceph osd operation - */ -static int rbd_req_sync_op(struct rbd_device *dev, - struct ceph_snap_context *snapc, - u64 snapid, - int opcode, - int flags, - struct ceph_osd_req_op *orig_ops, - int num_reply, - const char *obj, - u64 ofs, u64 len, - char *buf, - struct ceph_osd_request **linger_req, - u64 *ver) +/* object_name is assumed to be a non-null pointer and NUL-terminated */ + +static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, + u64 offset, u64 length, + enum obj_request_type type) { - int ret; - struct page **pages; - int num_pages; - struct ceph_osd_req_op *ops = orig_ops; - u32 payload_len; + struct rbd_obj_request *obj_request; + size_t size; + char *name; - num_pages = calc_pages_for(ofs , len); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); + rbd_assert(obj_request_type_valid(type)); - if (!orig_ops) { - payload_len = (flags & CEPH_OSD_FLAG_WRITE ? 
len : 0); - ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len); - if (ret < 0) - goto done; + size = strlen(object_name) + 1; + name = kmalloc(size, GFP_KERNEL); + if (!name) + return NULL; - if ((flags & CEPH_OSD_FLAG_WRITE) && buf) { - ret = ceph_copy_to_page_vector(pages, buf, ofs, len); - if (ret < 0) - goto done_ops; - } + obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL); + if (!obj_request) { + kfree(name); + return NULL; } - ret = rbd_do_request(NULL, dev, snapc, snapid, - obj, ofs, len, NULL, - pages, num_pages, - flags, - ops, - 2, - NULL, 0, - NULL, - linger_req, ver); - if (ret < 0) - goto done_ops; + obj_request->object_name = memcpy(name, object_name, size); + obj_request->offset = offset; + obj_request->length = length; + obj_request->flags = 0; + obj_request->which = BAD_WHICH; + obj_request->type = type; + INIT_LIST_HEAD(&obj_request->links); + init_completion(&obj_request->completion); + kref_init(&obj_request->kref); - if ((flags & CEPH_OSD_FLAG_READ) && buf) - ret = ceph_copy_from_page_vector(pages, buf, ofs, ret); + dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name, + offset, length, (int)type, obj_request); -done_ops: - if (!orig_ops) - rbd_destroy_ops(ops); -done: - ceph_release_page_vector(pages, num_pages); - return ret; + return obj_request; +} + +static void rbd_obj_request_destroy(struct kref *kref) +{ + struct rbd_obj_request *obj_request; + + obj_request = container_of(kref, struct rbd_obj_request, kref); + + dout("%s: obj %p\n", __func__, obj_request); + + rbd_assert(obj_request->img_request == NULL); + rbd_assert(obj_request->which == BAD_WHICH); + + if (obj_request->osd_req) + rbd_osd_req_destroy(obj_request->osd_req); + + rbd_assert(obj_request_type_valid(obj_request->type)); + switch (obj_request->type) { + case OBJ_REQUEST_NODATA: + break; /* Nothing to do */ + case OBJ_REQUEST_BIO: + if (obj_request->bio_list) + bio_chain_put(obj_request->bio_list); + break; + case OBJ_REQUEST_PAGES: + if (obj_request->pages) + ceph_release_page_vector(obj_request->pages, + obj_request->page_count); + break; + } + + kfree(obj_request->object_name); + obj_request->object_name = NULL; + kmem_cache_free(rbd_obj_request_cache, obj_request); +} + +/* It's OK to call this for a device with no parent */ + +static void rbd_spec_put(struct rbd_spec *spec); +static void rbd_dev_unparent(struct rbd_device *rbd_dev) +{ + rbd_dev_remove_parent(rbd_dev); + rbd_spec_put(rbd_dev->parent_spec); + rbd_dev->parent_spec = NULL; + rbd_dev->parent_overlap = 0; } /* - * Do an asynchronous ceph osd operation + * Parent image reference counting is used to determine when an + * image's parent fields can be safely torn down--after there are no + * more in-flight requests to the parent image. When the last + * reference is dropped, cleaning them up is safe. 
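+ *
+ * (The count lives in rbd_dev->parent_ref and is manipulated
+ * with the atomic_{inc,dec}_return_safe() helpers so an
+ * overflow or underflow is reported instead of corrupting
+ * the count.)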
*/ -static int rbd_do_op(struct request *rq, - struct rbd_device *rbd_dev , - struct ceph_snap_context *snapc, - u64 snapid, - int opcode, int flags, int num_reply, - u64 ofs, u64 len, - struct bio *bio, - struct rbd_req_coll *coll, - int coll_index) -{ - char *seg_name; - u64 seg_ofs; - u64 seg_len; - int ret; - struct ceph_osd_req_op *ops; - u32 payload_len; +static void rbd_dev_parent_put(struct rbd_device *rbd_dev) +{ + int counter; - seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); - if (!seg_name) - return -ENOMEM; + if (!rbd_dev->parent_spec) + return; - seg_len = rbd_get_segment(&rbd_dev->header, - rbd_dev->header.block_name, - ofs, len, - seg_name, &seg_ofs); + counter = atomic_dec_return_safe(&rbd_dev->parent_ref); + if (counter > 0) + return; - payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); + /* Last reference; clean up parent data structures */ - ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len); - if (ret < 0) - goto done; - - /* we've taken care of segment sizes earlier when we - cloned the bios. We should never have a segment - truncated at this point */ - BUG_ON(seg_len < len); - - ret = rbd_do_request(rq, rbd_dev, snapc, snapid, - seg_name, seg_ofs, seg_len, - bio, - NULL, 0, - flags, - ops, - num_reply, - coll, coll_index, - rbd_req_cb, 0, NULL); - - rbd_destroy_ops(ops); -done: - kfree(seg_name); - return ret; + if (!counter) + rbd_dev_unparent(rbd_dev); + else + rbd_warn(rbd_dev, "parent reference underflow\n"); } /* - * Request async osd write + * If an image has a non-zero parent overlap, get a reference to its + * parent. + * + * We must get the reference before checking for the overlap to + * coordinate properly with zeroing the parent overlap in + * rbd_dev_v2_parent_info() when an image gets flattened. We + * drop it again if there is no overlap. + * + * Returns true if the rbd device has a parent with a non-zero + * overlap and a reference for it was successfully taken, or + * false otherwise. */ -static int rbd_req_write(struct request *rq, - struct rbd_device *rbd_dev, - struct ceph_snap_context *snapc, - u64 ofs, u64 len, - struct bio *bio, - struct rbd_req_coll *coll, - int coll_index) +static bool rbd_dev_parent_get(struct rbd_device *rbd_dev) { - return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP, - CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - 2, - ofs, len, bio, coll, coll_index); + int counter; + + if (!rbd_dev->parent_spec) + return false; + + counter = atomic_inc_return_safe(&rbd_dev->parent_ref); + if (counter > 0 && rbd_dev->parent_overlap) + return true; + + /* Image was flattened, but parent is not yet torn down */ + + if (counter < 0) + rbd_warn(rbd_dev, "parent reference overflow\n"); + + return false; } /* - * Request async osd read + * Caller is responsible for filling in the list of object requests + * that comprises the image request, and the Linux request pointer + * (if there is one). 
*/ -static int rbd_req_read(struct request *rq, - struct rbd_device *rbd_dev, - u64 snapid, - u64 ofs, u64 len, - struct bio *bio, - struct rbd_req_coll *coll, - int coll_index) -{ - return rbd_do_op(rq, rbd_dev, NULL, - snapid, - CEPH_OSD_OP_READ, - CEPH_OSD_FLAG_READ, - 2, - ofs, len, bio, coll, coll_index); +static struct rbd_img_request *rbd_img_request_create( + struct rbd_device *rbd_dev, + u64 offset, u64 length, + bool write_request) +{ + struct rbd_img_request *img_request; + + img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC); + if (!img_request) + return NULL; + + if (write_request) { + down_read(&rbd_dev->header_rwsem); + ceph_get_snap_context(rbd_dev->header.snapc); + up_read(&rbd_dev->header_rwsem); + } + + img_request->rq = NULL; + img_request->rbd_dev = rbd_dev; + img_request->offset = offset; + img_request->length = length; + img_request->flags = 0; + if (write_request) { + img_request_write_set(img_request); + img_request->snapc = rbd_dev->header.snapc; + } else { + img_request->snap_id = rbd_dev->spec->snap_id; + } + if (rbd_dev_parent_get(rbd_dev)) + img_request_layered_set(img_request); + spin_lock_init(&img_request->completion_lock); + img_request->next_completion = 0; + img_request->callback = NULL; + img_request->result = 0; + img_request->obj_request_count = 0; + INIT_LIST_HEAD(&img_request->obj_requests); + kref_init(&img_request->kref); + + dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, + write_request ? "write" : "read", offset, length, + img_request); + + return img_request; +} + +static void rbd_img_request_destroy(struct kref *kref) +{ + struct rbd_img_request *img_request; + struct rbd_obj_request *obj_request; + struct rbd_obj_request *next_obj_request; + + img_request = container_of(kref, struct rbd_img_request, kref); + + dout("%s: img %p\n", __func__, img_request); + + for_each_obj_request_safe(img_request, obj_request, next_obj_request) + rbd_img_obj_request_del(img_request, obj_request); + rbd_assert(img_request->obj_request_count == 0); + + if (img_request_layered_test(img_request)) { + img_request_layered_clear(img_request); + rbd_dev_parent_put(img_request->rbd_dev); + } + + if (img_request_write_test(img_request)) + ceph_put_snap_context(img_request->snapc); + + kmem_cache_free(rbd_img_request_cache, img_request); +} + +static struct rbd_img_request *rbd_parent_request_create( + struct rbd_obj_request *obj_request, + u64 img_offset, u64 length) +{ + struct rbd_img_request *parent_request; + struct rbd_device *rbd_dev; + + rbd_assert(obj_request->img_request); + rbd_dev = obj_request->img_request->rbd_dev; + + parent_request = rbd_img_request_create(rbd_dev->parent, + img_offset, length, false); + if (!parent_request) + return NULL; + + img_request_child_set(parent_request); + rbd_obj_request_get(obj_request); + parent_request->obj_request = obj_request; + + return parent_request; +} + +static void rbd_parent_request_destroy(struct kref *kref) +{ + struct rbd_img_request *parent_request; + struct rbd_obj_request *orig_request; + + parent_request = container_of(kref, struct rbd_img_request, kref); + orig_request = parent_request->obj_request; + + parent_request->obj_request = NULL; + rbd_obj_request_put(orig_request); + img_request_child_clear(parent_request); + + rbd_img_request_destroy(kref); +} + +static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + unsigned int xferred; + int result; + bool more; + + 
rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + + rbd_assert(obj_request->xferred <= (u64)UINT_MAX); + xferred = (unsigned int)obj_request->xferred; + result = obj_request->result; + if (result) { + struct rbd_device *rbd_dev = img_request->rbd_dev; + + rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n", + img_request_write_test(img_request) ? "write" : "read", + obj_request->length, obj_request->img_offset, + obj_request->offset); + rbd_warn(rbd_dev, " result %d xferred %x\n", + result, xferred); + if (!img_request->result) + img_request->result = result; + } + + /* Image object requests don't own their page array */ + + if (obj_request->type == OBJ_REQUEST_PAGES) { + obj_request->pages = NULL; + obj_request->page_count = 0; + } + + if (img_request_child_test(img_request)) { + rbd_assert(img_request->obj_request != NULL); + more = obj_request->which < img_request->obj_request_count - 1; + } else { + rbd_assert(img_request->rq != NULL); + more = blk_end_request(img_request->rq, result, xferred); + } + + return more; +} + +static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + u32 which = obj_request->which; + bool more = true; + + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + + dout("%s: img %p obj %p\n", __func__, img_request, obj_request); + rbd_assert(img_request != NULL); + rbd_assert(img_request->obj_request_count > 0); + rbd_assert(which != BAD_WHICH); + rbd_assert(which < img_request->obj_request_count); + + spin_lock_irq(&img_request->completion_lock); + if (which != img_request->next_completion) + goto out; + + for_each_obj_request_from(img_request, obj_request) { + rbd_assert(more); + rbd_assert(which < img_request->obj_request_count); + + if (!obj_request_done_test(obj_request)) + break; + more = rbd_img_obj_end_request(obj_request); + which++; + } + + rbd_assert(more ^ (which == img_request->obj_request_count)); + img_request->next_completion = which; +out: + spin_unlock_irq(&img_request->completion_lock); + rbd_img_request_put(img_request); + + if (!more) + rbd_img_request_complete(img_request); } /* - * Request sync osd read + * Split up an image request into one or more object requests, each + * to a different object. The "type" parameter indicates whether + * "data_desc" is the pointer to the head of a list of bio + * structures, or the base of a page array. In either case this + * function assumes data_desc describes memory sufficient to hold + * all data described by the image request. */ -static int rbd_req_sync_read(struct rbd_device *dev, - struct ceph_snap_context *snapc, - u64 snapid, - const char *obj, - u64 ofs, u64 len, - char *buf, - u64 *ver) -{ - return rbd_req_sync_op(dev, NULL, - snapid, - CEPH_OSD_OP_READ, - CEPH_OSD_FLAG_READ, - NULL, - 1, obj, ofs, len, buf, NULL, ver); +static int rbd_img_request_fill(struct rbd_img_request *img_request, + enum obj_request_type type, + void *data_desc) +{ + struct rbd_device *rbd_dev = img_request->rbd_dev; + struct rbd_obj_request *obj_request = NULL; + struct rbd_obj_request *next_obj_request; + bool write_request = img_request_write_test(img_request); + struct bio *bio_list = NULL; + unsigned int bio_offset = 0; + struct page **pages = NULL; + u64 img_offset; + u64 resid; + u16 opcode; + + dout("%s: img %p type %d data_desc %p\n", __func__, img_request, + (int)type, data_desc); + + opcode = write_request ? 
CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ; + img_offset = img_request->offset; + resid = img_request->length; + rbd_assert(resid > 0); + + if (type == OBJ_REQUEST_BIO) { + bio_list = data_desc; + rbd_assert(img_offset == + bio_list->bi_iter.bi_sector << SECTOR_SHIFT); + } else { + rbd_assert(type == OBJ_REQUEST_PAGES); + pages = data_desc; + } + + while (resid) { + struct ceph_osd_request *osd_req; + const char *object_name; + u64 offset; + u64 length; + unsigned int which = 0; + + object_name = rbd_segment_name(rbd_dev, img_offset); + if (!object_name) + goto out_unwind; + offset = rbd_segment_offset(rbd_dev, img_offset); + length = rbd_segment_length(rbd_dev, img_offset, resid); + obj_request = rbd_obj_request_create(object_name, + offset, length, type); + /* object request has its own copy of the object name */ + rbd_segment_name_free(object_name); + if (!obj_request) + goto out_unwind; + + /* + * set obj_request->img_request before creating the + * osd_request so that it gets the right snapc + */ + rbd_img_obj_request_add(img_request, obj_request); + + if (type == OBJ_REQUEST_BIO) { + unsigned int clone_size; + + rbd_assert(length <= (u64)UINT_MAX); + clone_size = (unsigned int)length; + obj_request->bio_list = + bio_chain_clone_range(&bio_list, + &bio_offset, + clone_size, + GFP_ATOMIC); + if (!obj_request->bio_list) + goto out_unwind; + } else { + unsigned int page_count; + + obj_request->pages = pages; + page_count = (u32)calc_pages_for(offset, length); + obj_request->page_count = page_count; + if ((offset + length) & ~PAGE_MASK) + page_count--; /* more on last page */ + pages += page_count; + } + + osd_req = rbd_osd_req_create(rbd_dev, write_request, + (write_request ? 2 : 1), + obj_request); + if (!osd_req) + goto out_unwind; + obj_request->osd_req = osd_req; + obj_request->callback = rbd_img_obj_callback; + rbd_img_request_get(img_request); + + if (write_request) { + osd_req_op_alloc_hint_init(osd_req, which, + rbd_obj_bytes(&rbd_dev->header), + rbd_obj_bytes(&rbd_dev->header)); + which++; + } + + osd_req_op_extent_init(osd_req, which, opcode, offset, length, + 0, 0); + if (type == OBJ_REQUEST_BIO) + osd_req_op_extent_osd_data_bio(osd_req, which, + obj_request->bio_list, length); + else + osd_req_op_extent_osd_data_pages(osd_req, which, + obj_request->pages, length, + offset & ~PAGE_MASK, false, false); + + if (write_request) + rbd_osd_req_format_write(obj_request); + else + rbd_osd_req_format_read(obj_request); + + obj_request->img_offset = img_offset; + + img_offset += length; + resid -= length; + } + + return 0; + +out_unwind: + for_each_obj_request_safe(img_request, obj_request, next_obj_request) + rbd_img_obj_request_del(img_request, obj_request); + + return -ENOMEM; +} + +static void +rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_img_request *img_request; + struct rbd_device *rbd_dev; + struct page **pages; + u32 page_count; + + rbd_assert(obj_request->type == OBJ_REQUEST_BIO); + rbd_assert(obj_request_img_data_test(obj_request)); + img_request = obj_request->img_request; + rbd_assert(img_request); + + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev); + + pages = obj_request->copyup_pages; + rbd_assert(pages != NULL); + obj_request->copyup_pages = NULL; + page_count = obj_request->copyup_page_count; + rbd_assert(page_count); + obj_request->copyup_page_count = 0; + ceph_release_page_vector(pages, page_count); + + /* + * We want the transfer count to reflect the size of the + * original write request. 
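+ * (The parent read that seeded the copyup does not count.)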
+	 * There is no such thing as a successful short write, so
+	 * if the request was successful we can just set it to the
+	 * originally-requested length.
+	 */
+	if (!obj_request->result)
+		obj_request->xferred = obj_request->length;
+
+	/* Finish up with the normal image object callback */
+
+	rbd_img_obj_callback(obj_request);
+}
+
+static void
+rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
+{
+	struct rbd_obj_request *orig_request;
+	struct ceph_osd_request *osd_req;
+	struct ceph_osd_client *osdc;
+	struct rbd_device *rbd_dev;
+	struct page **pages;
+	u32 page_count;
+	int img_result;
+	u64 parent_length;
+	u64 offset;
+	u64 length;
+
+	rbd_assert(img_request_child_test(img_request));
+
+	/* First get what we need from the image request */
+
+	pages = img_request->copyup_pages;
+	rbd_assert(pages != NULL);
+	img_request->copyup_pages = NULL;
+	page_count = img_request->copyup_page_count;
+	rbd_assert(page_count);
+	img_request->copyup_page_count = 0;
+
+	orig_request = img_request->obj_request;
+	rbd_assert(orig_request != NULL);
+	rbd_assert(obj_request_type_valid(orig_request->type));
+	img_result = img_request->result;
+	parent_length = img_request->length;
+	rbd_assert(parent_length == img_request->xferred);
+	rbd_img_request_put(img_request);
+
+	rbd_assert(orig_request->img_request);
+	rbd_dev = orig_request->img_request->rbd_dev;
+	rbd_assert(rbd_dev);
+
+	/*
+	 * If the overlap has become 0 (most likely because the
+	 * image has been flattened) we need to free the pages
+	 * and re-submit the original write request.
+	 */
+	if (!rbd_dev->parent_overlap) {
+		struct ceph_osd_client *osdc;
+
+		ceph_release_page_vector(pages, page_count);
+		osdc = &rbd_dev->rbd_client->client->osdc;
+		img_result = rbd_obj_request_submit(osdc, orig_request);
+		if (!img_result)
+			return;
+	}
+
+	if (img_result)
+		goto out_err;
+
+	/*
+	 * The original osd request is of no use to us any more.
+	 * We need a new one that can hold the three ops in a copyup
+	 * request.  Allocate the new copyup osd request for the
+	 * original request, and release the old one.
+	 */
+	img_result = -ENOMEM;
+	osd_req = rbd_osd_req_create_copyup(orig_request);
+	if (!osd_req)
+		goto out_err;
+	rbd_osd_req_destroy(orig_request->osd_req);
+	orig_request->osd_req = osd_req;
+	orig_request->copyup_pages = pages;
+	orig_request->copyup_page_count = page_count;
+
+	/* Initialize the copyup op */
+
+	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
+	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
+						false, false);
+
+	/* Then the hint op */
+
+	osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header),
+					rbd_obj_bytes(&rbd_dev->header));
+
+	/* And the original write request op */
+
+	offset = orig_request->offset;
+	length = orig_request->length;
+	osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE,
+					offset, length, 0, 0);
+	if (orig_request->type == OBJ_REQUEST_BIO)
+		osd_req_op_extent_osd_data_bio(osd_req, 2,
+					orig_request->bio_list, length);
+	else
+		osd_req_op_extent_osd_data_pages(osd_req, 2,
+					orig_request->pages, length,
+					offset & ~PAGE_MASK, false, false);
+
+	rbd_osd_req_format_write(orig_request);
+
+	/* All set, send it off.
*/ + + orig_request->callback = rbd_img_obj_copyup_callback; + osdc = &rbd_dev->rbd_client->client->osdc; + img_result = rbd_obj_request_submit(osdc, orig_request); + if (!img_result) + return; +out_err: + /* Record the error code and complete the request */ + + orig_request->result = img_result; + orig_request->xferred = 0; + obj_request_done_set(orig_request); + rbd_obj_request_complete(orig_request); } /* - * Request sync osd watch + * Read from the parent image the range of data that covers the + * entire target of the given object request. This is used for + * satisfying a layered image write request when the target of an + * object request from the image request does not exist. + * + * A page array big enough to hold the returned data is allocated + * and supplied to rbd_img_request_fill() as the "data descriptor." + * When the read completes, this page array will be transferred to + * the original object request for the copyup operation. + * + * If an error occurs, record it as the result of the original + * object request and mark it done so it gets completed. */ -static int rbd_req_sync_notify_ack(struct rbd_device *dev, - u64 ver, - u64 notify_id, - const char *obj) +static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) { - struct ceph_osd_req_op *ops; + struct rbd_img_request *img_request = NULL; + struct rbd_img_request *parent_request = NULL; + struct rbd_device *rbd_dev; + u64 img_offset; + u64 length; struct page **pages = NULL; + u32 page_count; + int result; + + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request_type_valid(obj_request->type)); + + img_request = obj_request->img_request; + rbd_assert(img_request != NULL); + rbd_dev = img_request->rbd_dev; + rbd_assert(rbd_dev->parent != NULL); + + /* + * Determine the byte range covered by the object in the + * child image to which the original request was to be sent. + */ + img_offset = obj_request->img_offset - obj_request->offset; + length = (u64)1 << rbd_dev->header.obj_order; + + /* + * There is no defined parent data beyond the parent + * overlap, so limit what we read at that boundary if + * necessary. + */ + if (img_offset + length > rbd_dev->parent_overlap) { + rbd_assert(img_offset < rbd_dev->parent_overlap); + length = rbd_dev->parent_overlap - img_offset; + } + + /* + * Allocate a page array big enough to receive the data read + * from the parent. 
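+ * (At most one full object's worth, 1 << obj_order bytes,
+ * clipped to the parent overlap above.)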
+ */ + page_count = (u32)calc_pages_for(0, length); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) { + result = PTR_ERR(pages); + pages = NULL; + goto out_err; + } + + result = -ENOMEM; + parent_request = rbd_parent_request_create(obj_request, + img_offset, length); + if (!parent_request) + goto out_err; + + result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); + if (result) + goto out_err; + parent_request->copyup_pages = pages; + parent_request->copyup_page_count = page_count; + + parent_request->callback = rbd_img_obj_parent_read_full_callback; + result = rbd_img_request_submit(parent_request); + if (!result) + return 0; + + parent_request->copyup_pages = NULL; + parent_request->copyup_page_count = 0; + parent_request->obj_request = NULL; + rbd_obj_request_put(obj_request); +out_err: + if (pages) + ceph_release_page_vector(pages, page_count); + if (parent_request) + rbd_img_request_put(parent_request); + obj_request->result = result; + obj_request->xferred = 0; + obj_request_done_set(obj_request); + + return result; +} + +static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_obj_request *orig_request; + struct rbd_device *rbd_dev; + int result; + + rbd_assert(!obj_request_img_data_test(obj_request)); + + /* + * All we need from the object request is the original + * request and the result of the STAT op. Grab those, then + * we're done with the request. + */ + orig_request = obj_request->obj_request; + obj_request->obj_request = NULL; + rbd_obj_request_put(orig_request); + rbd_assert(orig_request); + rbd_assert(orig_request->img_request); + + result = obj_request->result; + obj_request->result = 0; + + dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, + obj_request, orig_request, result, + obj_request->xferred, obj_request->length); + rbd_obj_request_put(obj_request); + + /* + * If the overlap has become 0 (most likely because the + * image has been flattened) we need to free the pages + * and re-submit the original write request. + */ + rbd_dev = orig_request->img_request->rbd_dev; + if (!rbd_dev->parent_overlap) { + struct ceph_osd_client *osdc; + + osdc = &rbd_dev->rbd_client->client->osdc; + result = rbd_obj_request_submit(osdc, orig_request); + if (!result) + return; + } + + /* + * Our only purpose here is to determine whether the object + * exists, and we don't want to treat the non-existence as + * an error. If something else comes back, transfer the + * error to the original request and complete it now. + */ + if (!result) { + obj_request_existence_set(orig_request, true); + } else if (result == -ENOENT) { + obj_request_existence_set(orig_request, false); + } else if (result) { + orig_request->result = result; + goto out; + } + + /* + * Resubmit the original request now that we have recorded + * whether the target object exists. 
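+ * (This time rbd_img_obj_request_submit() will see the
+ * "known" flag set, and will take the copyup path only if
+ * the object turned out not to exist.)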
+ */ + orig_request->result = rbd_img_obj_request_submit(orig_request); +out: + if (orig_request->result) + rbd_obj_request_complete(orig_request); +} + +static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) +{ + struct rbd_obj_request *stat_request; + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct page **pages = NULL; + u32 page_count; + size_t size; int ret; - ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0); - if (ret < 0) - return ret; + /* + * The response data for a STAT call consists of: + * le64 length; + * struct { + * le32 tv_sec; + * le32 tv_nsec; + * } mtime; + */ + size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); + page_count = (u32)calc_pages_for(0, size); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); - ops[0].watch.ver = cpu_to_le64(dev->header.obj_version); - ops[0].watch.cookie = notify_id; - ops[0].watch.flag = 0; + ret = -ENOMEM; + stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, + OBJ_REQUEST_PAGES); + if (!stat_request) + goto out; + + rbd_obj_request_get(obj_request); + stat_request->obj_request = obj_request; + stat_request->pages = pages; + stat_request->page_count = page_count; - ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP, - obj, 0, 0, NULL, - pages, 0, - CEPH_OSD_FLAG_READ, - ops, - 1, - NULL, 0, - rbd_simple_req_cb, 0, NULL); + rbd_assert(obj_request->img_request); + rbd_dev = obj_request->img_request->rbd_dev; + stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + stat_request); + if (!stat_request->osd_req) + goto out; + stat_request->callback = rbd_img_obj_exists_callback; + + osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT); + osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, + false, false); + rbd_osd_req_format_read(stat_request); + + osdc = &rbd_dev->rbd_client->client->osdc; + ret = rbd_obj_request_submit(osdc, stat_request); +out: + if (ret) + rbd_obj_request_put(obj_request); - rbd_destroy_ops(ops); return ret; } -static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) +static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) { - struct rbd_device *dev = (struct rbd_device *)data; - int rc; + struct rbd_img_request *img_request; + struct rbd_device *rbd_dev; + bool known; - if (!dev) - return; + rbd_assert(obj_request_img_data_test(obj_request)); - dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, - notify_id, (int)opcode); - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rc = __rbd_refresh_header(dev); - mutex_unlock(&ctl_mutex); - if (rc) - pr_warning(RBD_DRV_NAME "%d got notification but failed to " - " update snaps: %d\n", dev->major, rc); + img_request = obj_request->img_request; + rbd_assert(img_request); + rbd_dev = img_request->rbd_dev; + + /* + * Only writes to layered images need special handling. + * Reads and non-layered writes are simple object requests. + * Layered writes that start beyond the end of the overlap + * with the parent have no parent data, so they too are + * simple object requests. Finally, if the target object is + * known to already exist, its parent data has already been + * copied, so a write to the object can also be handled as a + * simple object request. 
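+ *
+ * In short, only a write to an object whose existence is not
+ * yet known needs the stat/copyup dance; everything else is
+ * sent directly to the osd.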
+ */ + if (!img_request_write_test(img_request) || + !img_request_layered_test(img_request) || + !obj_request_overlaps_parent(obj_request) || + ((known = obj_request_known_test(obj_request)) && + obj_request_exists_test(obj_request))) { - rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name); + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + + rbd_dev = obj_request->img_request->rbd_dev; + osdc = &rbd_dev->rbd_client->client->osdc; + + return rbd_obj_request_submit(osdc, obj_request); + } + + /* + * It's a layered write. The target object might exist but + * we may not know that yet. If we know it doesn't exist, + * start by reading the data for the full target object from + * the parent so we can use it for a copyup to the target. + */ + if (known) + return rbd_img_obj_parent_read_full(obj_request); + + /* We don't know whether the target exists. Go find out. */ + + return rbd_img_obj_exists_submit(obj_request); } -/* - * Request sync osd watch - */ -static int rbd_req_sync_watch(struct rbd_device *dev, - const char *obj, - u64 ver) +static int rbd_img_request_submit(struct rbd_img_request *img_request) { - struct ceph_osd_req_op *ops; - struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc; + struct rbd_obj_request *obj_request; + struct rbd_obj_request *next_obj_request; - int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); - if (ret < 0) - return ret; + dout("%s: img %p\n", __func__, img_request); + for_each_obj_request_safe(img_request, obj_request, next_obj_request) { + int ret; - ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, - (void *)dev, &dev->watch_event); - if (ret < 0) - goto fail; + ret = rbd_img_obj_request_submit(obj_request); + if (ret) + return ret; + } + + return 0; +} - ops[0].watch.ver = cpu_to_le64(ver); - ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie); - ops[0].watch.flag = 1; +static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) +{ + struct rbd_obj_request *obj_request; + struct rbd_device *rbd_dev; + u64 obj_end; + u64 img_xferred; + int img_result; - ret = rbd_req_sync_op(dev, NULL, - CEPH_NOSNAP, - 0, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - ops, - 1, obj, 0, 0, NULL, - &dev->watch_request, NULL); + rbd_assert(img_request_child_test(img_request)); - if (ret < 0) - goto fail_event; + /* First get what we need from the image request and release it */ - rbd_destroy_ops(ops); - return 0; + obj_request = img_request->obj_request; + img_xferred = img_request->xferred; + img_result = img_request->result; + rbd_img_request_put(img_request); -fail_event: - ceph_osdc_cancel_event(dev->watch_event); - dev->watch_event = NULL; -fail: - rbd_destroy_ops(ops); - return ret; + /* + * If the overlap has become 0 (most likely because the + * image has been flattened) we need to re-submit the + * original request. + */ + rbd_assert(obj_request); + rbd_assert(obj_request->img_request); + rbd_dev = obj_request->img_request->rbd_dev; + if (!rbd_dev->parent_overlap) { + struct ceph_osd_client *osdc; + + osdc = &rbd_dev->rbd_client->client->osdc; + img_result = rbd_obj_request_submit(osdc, obj_request); + if (!img_result) + return; + } + + obj_request->result = img_result; + if (obj_request->result) + goto out; + + /* + * We need to zero anything beyond the parent overlap + * boundary. Since rbd_img_obj_request_read_callback() + * will zero anything beyond the end of a short read, an + * easy way to do this is to pretend the data from the + * parent came up short--ending at the overlap boundary. 
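+ * (Hence the min(img_xferred, xferred) below.)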
+ */ + rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length); + obj_end = obj_request->img_offset + obj_request->length; + if (obj_end > rbd_dev->parent_overlap) { + u64 xferred = 0; + + if (obj_request->img_offset < rbd_dev->parent_overlap) + xferred = rbd_dev->parent_overlap - + obj_request->img_offset; + + obj_request->xferred = min(img_xferred, xferred); + } else { + obj_request->xferred = img_xferred; + } +out: + rbd_img_obj_request_read_callback(obj_request); + rbd_obj_request_complete(obj_request); } -/* - * Request sync osd unwatch - */ -static int rbd_req_sync_unwatch(struct rbd_device *dev, - const char *obj) +static void rbd_img_parent_read(struct rbd_obj_request *obj_request) { - struct ceph_osd_req_op *ops; + struct rbd_img_request *img_request; + int result; + + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request->img_request != NULL); + rbd_assert(obj_request->result == (s32) -ENOENT); + rbd_assert(obj_request_type_valid(obj_request->type)); + + /* rbd_read_finish(obj_request, obj_request->length); */ + img_request = rbd_parent_request_create(obj_request, + obj_request->img_offset, + obj_request->length); + result = -ENOMEM; + if (!img_request) + goto out_err; - int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); - if (ret < 0) - return ret; + if (obj_request->type == OBJ_REQUEST_BIO) + result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, + obj_request->bio_list); + else + result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES, + obj_request->pages); + if (result) + goto out_err; + + img_request->callback = rbd_img_parent_read_callback; + result = rbd_img_request_submit(img_request); + if (result) + goto out_err; + + return; +out_err: + if (img_request) + rbd_img_request_put(img_request); + obj_request->result = result; + obj_request->xferred = 0; + obj_request_done_set(obj_request); +} + +static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id) +{ + struct rbd_obj_request *obj_request; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + int ret; - ops[0].watch.ver = 0; - ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie); - ops[0].watch.flag = 0; + obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, + OBJ_REQUEST_NODATA); + if (!obj_request) + return -ENOMEM; - ret = rbd_req_sync_op(dev, NULL, - CEPH_NOSNAP, - 0, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - ops, - 1, obj, 0, 0, NULL, NULL, NULL); + ret = -ENOMEM; + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + obj_request); + if (!obj_request->osd_req) + goto out; + + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK, + notify_id, 0, 0); + rbd_osd_req_format_read(obj_request); + + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + goto out; + ret = rbd_obj_request_wait(obj_request); +out: + rbd_obj_request_put(obj_request); - rbd_destroy_ops(ops); - ceph_osdc_cancel_event(dev->watch_event); - dev->watch_event = NULL; return ret; } -struct rbd_notify_info { - struct rbd_device *dev; -}; - -static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data) +static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) { - struct rbd_device *dev = (struct rbd_device *)data; - if (!dev) + struct rbd_device *rbd_dev = (struct rbd_device *)data; + int ret; + + if (!rbd_dev) return; - dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, - notify_id, (int)opcode); + dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, + 
rbd_dev->header_name, (unsigned long long)notify_id, + (unsigned int)opcode); + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "header refresh error (%d)\n", ret); + + rbd_obj_notify_ack_sync(rbd_dev, notify_id); } /* - * Request sync osd notify + * Initiate a watch request, synchronously. */ -static int rbd_req_sync_notify(struct rbd_device *dev, - const char *obj) -{ - struct ceph_osd_req_op *ops; - struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc; - struct ceph_osd_event *event; - struct rbd_notify_info info; - int payload_len = sizeof(u32) + sizeof(u32); +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_obj_request *obj_request; int ret; - ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len); + rbd_assert(!rbd_dev->watch_event); + rbd_assert(!rbd_dev->watch_request); + + ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, + &rbd_dev->watch_event); if (ret < 0) return ret; - info.dev = dev; + rbd_assert(rbd_dev->watch_event); - ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1, - (void *)&info, &event); - if (ret < 0) - goto fail; - - ops[0].watch.ver = 1; - ops[0].watch.flag = 1; - ops[0].watch.cookie = event->cookie; - ops[0].watch.prot_ver = RADOS_NOTIFY_VER; - ops[0].watch.timeout = 12; - - ret = rbd_req_sync_op(dev, NULL, - CEPH_NOSNAP, - 0, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - ops, - 1, obj, 0, 0, NULL, NULL, NULL); - if (ret < 0) - goto fail_event; + obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, + OBJ_REQUEST_NODATA); + if (!obj_request) { + ret = -ENOMEM; + goto out_cancel; + } + + obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1, + obj_request); + if (!obj_request->osd_req) { + ret = -ENOMEM; + goto out_put; + } + + ceph_osdc_set_request_linger(osdc, obj_request->osd_req); + + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, + rbd_dev->watch_event->cookie, 0, 1); + rbd_osd_req_format_write(obj_request); + + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + goto out_linger; + + ret = rbd_obj_request_wait(obj_request); + if (ret) + goto out_linger; + + ret = obj_request->result; + if (ret) + goto out_linger; + + /* + * A watch request is set to linger, so the underlying osd + * request won't go away until we unregister it. We retain + * a pointer to the object request during that time (in + * rbd_dev->watch_request), so we'll keep a reference to + * it. We'll drop that reference (below) after we've + * unregistered it. + */ + rbd_dev->watch_request = obj_request; - ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT); - dout("ceph_osdc_wait_event returned %d\n", ret); - rbd_destroy_ops(ops); return 0; -fail_event: - ceph_osdc_cancel_event(event); -fail: - rbd_destroy_ops(ops); +out_linger: + ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req); +out_put: + rbd_obj_request_put(obj_request); +out_cancel: + ceph_osdc_cancel_event(rbd_dev->watch_event); + rbd_dev->watch_event = NULL; + return ret; } /* - * Request sync osd read + * Tear down a watch request, synchronously. 
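+ * (The inverse of rbd_dev_header_watch_sync() above: issue the
+ * watch op with its start flag cleared, then unregister the
+ * lingering osd request and drop our reference to it.)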
 */
-static int rbd_req_sync_exec(struct rbd_device *dev,
-			     const char *obj,
-			     const char *cls,
-			     const char *method,
-			     const char *data,
-			     int len,
-			     u64 *ver)
-{
-	struct ceph_osd_req_op *ops;
-	int cls_len = strlen(cls);
-	int method_len = strlen(method);
-	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
-				    cls_len + method_len + len);
-	if (ret < 0)
-		return ret;
+static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_obj_request *obj_request;
+	int ret;
 
-	ops[0].cls.class_name = cls;
-	ops[0].cls.class_len = (__u8)cls_len;
-	ops[0].cls.method_name = method;
-	ops[0].cls.method_len = (__u8)method_len;
-	ops[0].cls.argc = 0;
-	ops[0].cls.indata = data;
-	ops[0].cls.indata_len = len;
+	rbd_assert(rbd_dev->watch_event);
+	rbd_assert(rbd_dev->watch_request);
 
-	ret = rbd_req_sync_op(dev, NULL,
-			       CEPH_NOSNAP,
-			       0,
-			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			       ops,
-			       1, obj, 0, 0, NULL, NULL, ver);
+	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+							OBJ_REQUEST_NODATA);
+	if (!obj_request) {
+		ret = -ENOMEM;
+		goto out_cancel;
+	}
+
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
+						  obj_request);
+	if (!obj_request->osd_req) {
+		ret = -ENOMEM;
+		goto out_put;
+	}
+
+	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
+				rbd_dev->watch_event->cookie, 0, 0);
+	rbd_osd_req_format_write(obj_request);
+
+	ret = rbd_obj_request_submit(osdc, obj_request);
+	if (ret)
+		goto out_put;
 
-	rbd_destroy_ops(ops);
+	ret = rbd_obj_request_wait(obj_request);
+	if (ret)
+		goto out_put;
+
+	ret = obj_request->result;
+	if (ret)
+		goto out_put;
+
+	/* We have successfully torn down the watch request */
+
+	ceph_osdc_unregister_linger_request(osdc,
+					rbd_dev->watch_request->osd_req);
+	rbd_obj_request_put(rbd_dev->watch_request);
+	rbd_dev->watch_request = NULL;
+
+out_put:
+	rbd_obj_request_put(obj_request);
+out_cancel:
+	ceph_osdc_cancel_event(rbd_dev->watch_event);
+	rbd_dev->watch_event = NULL;
 
-	dout("cls_exec returned %d\n", ret);
 	return ret;
 }
 
-static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
+static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 {
-	struct rbd_req_coll *coll =
-			kzalloc(sizeof(struct rbd_req_coll) +
-			        sizeof(struct rbd_req_status) * num_reqs,
-			        GFP_ATOMIC);
+	int ret;
 
-	if (!coll)
-		return NULL;
-	coll->total = num_reqs;
-	kref_init(&coll->kref);
-	return coll;
+	ret = __rbd_dev_header_unwatch_sync(rbd_dev);
+	if (ret) {
+		rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
+			 ret);
+	}
 }
 
 /*
- * block device queue callback
+ * Synchronous osd object method call.  Returns the number of bytes
+ * returned in the inbound buffer, or a negative error code.
 */
-static void rbd_rq_fn(struct request_queue *q)
+static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
+			     const char *object_name,
+			     const char *class_name,
+			     const char *method_name,
+			     const void *outbound,
+			     size_t outbound_size,
+			     void *inbound,
+			     size_t inbound_size)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_obj_request *obj_request;
+	struct page **pages;
+	u32 page_count;
+	int ret;
+
+	/*
+	 * Method calls are ultimately read operations.  The result
+	 * should be placed into the inbound buffer provided.  They
+	 * also supply outbound data--parameters for the object
+	 * method.  Currently if this is present it will be a
+	 * snapshot id.
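+ *
+ * (The caller supplies the inbound buffer; below we size a
+ * page vector to cover inbound_size and copy the response
+ * back out of it.)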
+ */ + page_count = (u32)calc_pages_for(0, inbound_size); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = -ENOMEM; + obj_request = rbd_obj_request_create(object_name, 0, inbound_size, + OBJ_REQUEST_PAGES); + if (!obj_request) + goto out; + + obj_request->pages = pages; + obj_request->page_count = page_count; + + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + obj_request); + if (!obj_request->osd_req) + goto out; + + osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL, + class_name, method_name); + if (outbound_size) { + struct ceph_pagelist *pagelist; + + pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); + if (!pagelist) + goto out; + + ceph_pagelist_init(pagelist); + ceph_pagelist_append(pagelist, outbound, outbound_size); + osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0, + pagelist); + } + osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, + obj_request->pages, inbound_size, + 0, false, false); + rbd_osd_req_format_read(obj_request); + + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + goto out; + ret = rbd_obj_request_wait(obj_request); + if (ret) + goto out; + + ret = obj_request->result; + if (ret < 0) + goto out; + + rbd_assert(obj_request->xferred < (u64)INT_MAX); + ret = (int)obj_request->xferred; + ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); +out: + if (obj_request) + rbd_obj_request_put(obj_request); + else + ceph_release_page_vector(pages, page_count); + + return ret; +} + +static void rbd_request_fn(struct request_queue *q) + __releases(q->queue_lock) __acquires(q->queue_lock) { struct rbd_device *rbd_dev = q->queuedata; struct request *rq; - struct bio_pair *bp = NULL; + int result; while ((rq = blk_fetch_request(q))) { - struct bio *bio; - struct bio *rq_bio, *next_bio = NULL; - bool do_write; - int size, op_size = 0; - u64 ofs; - int num_segs, cur_seg = 0; - struct rbd_req_coll *coll; - - /* peek at request from block layer */ - if (!rq) - break; + bool write_request = rq_data_dir(rq) == WRITE; + struct rbd_img_request *img_request; + u64 offset; + u64 length; - dout("fetched request\n"); + /* Ignore any non-FS requests that filter through. */ - /* filter out block requests we don't understand */ - if ((rq->cmd_type != REQ_TYPE_FS)) { + if (rq->cmd_type != REQ_TYPE_FS) { + dout("%s: non-fs request type %d\n", __func__, + (int) rq->cmd_type); __blk_end_request_all(rq, 0); continue; } - /* deduce our operation (read, write) */ - do_write = (rq_data_dir(rq) == WRITE); + /* Ignore/skip any zero-length requests */ - size = blk_rq_bytes(rq); - ofs = blk_rq_pos(rq) * SECTOR_SIZE; - rq_bio = rq->bio; - if (do_write && rbd_dev->read_only) { - __blk_end_request_all(rq, -EROFS); + offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT; + length = (u64) blk_rq_bytes(rq); + + if (!length) { + dout("%s: zero-length request\n", __func__); + __blk_end_request_all(rq, 0); continue; } spin_unlock_irq(q->queue_lock); - dout("%s 0x%x bytes at 0x%llx\n", - do_write ? 
"write" : "read", - size, blk_rq_pos(rq) * SECTOR_SIZE); + /* Disallow writes to a read-only device */ - num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size); - coll = rbd_alloc_coll(num_segs); - if (!coll) { - spin_lock_irq(q->queue_lock); - __blk_end_request_all(rq, -ENOMEM); - continue; + if (write_request) { + result = -EROFS; + if (rbd_dev->mapping.read_only) + goto end_request; + rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); } - do { - /* a bio clone to be passed down to OSD req */ - dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt); - op_size = rbd_get_segment(&rbd_dev->header, - rbd_dev->header.block_name, - ofs, size, - NULL, NULL); - kref_get(&coll->kref); - bio = bio_chain_clone(&rq_bio, &next_bio, &bp, - op_size, GFP_ATOMIC); - if (!bio) { - rbd_coll_end_req_index(rq, coll, cur_seg, - -ENOMEM, op_size); - goto next_seg; - } + /* + * Quit early if the mapped snapshot no longer + * exists. It's still possible the snapshot will + * have disappeared by the time our request arrives + * at the osd, but there's no sense in sending it if + * we already know. + */ + if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { + dout("request for non-existent snapshot"); + rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); + result = -ENXIO; + goto end_request; + } + result = -EINVAL; + if (offset && length > U64_MAX - offset + 1) { + rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n", + offset, length); + goto end_request; /* Shouldn't happen */ + } - /* init OSD command: write or read */ - if (do_write) - rbd_req_write(rq, rbd_dev, - rbd_dev->header.snapc, - ofs, - op_size, bio, - coll, cur_seg); - else - rbd_req_read(rq, rbd_dev, - rbd_dev->snap_id, - ofs, - op_size, bio, - coll, cur_seg); - -next_seg: - size -= op_size; - ofs += op_size; - - cur_seg++; - rq_bio = next_bio; - } while (size > 0); - kref_put(&coll->kref, rbd_coll_release); - - if (bp) - bio_pair_release(bp); + result = -EIO; + if (offset + length > rbd_dev->mapping.size) { + rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n", + offset, length, rbd_dev->mapping.size); + goto end_request; + } + + result = -ENOMEM; + img_request = rbd_img_request_create(rbd_dev, offset, length, + write_request); + if (!img_request) + goto end_request; + + img_request->rq = rq; + + result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, + rq->bio); + if (!result) + result = rbd_img_request_submit(img_request); + if (result) + rbd_img_request_put(img_request); +end_request: spin_lock_irq(q->queue_lock); + if (result < 0) { + rbd_warn(rbd_dev, "%s %llx at %llx result %d\n", + write_request ? "write" : "read", + length, offset, result); + + __blk_end_request_all(rq, result); + } } } /* * a queue callback. Makes sure that we don't create a bio that spans across * multiple osd objects. 
One exception would be with a single page bios, - * which we handle later at bio_chain_clone + * which we handle later at bio_chain_clone_range() */ static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, struct bio_vec *bvec) { struct rbd_device *rbd_dev = q->queuedata; - unsigned int chunk_sectors; - sector_t sector; - unsigned int bio_sectors; - int max; + sector_t sector_offset; + sector_t sectors_per_obj; + sector_t obj_sector_offset; + int ret; - chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); - sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev); - bio_sectors = bmd->bi_size >> SECTOR_SHIFT; + /* + * Find how far into its rbd object the partition-relative + * bio start sector is to offset relative to the enclosing + * device. + */ + sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector; + sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); + obj_sector_offset = sector_offset & (sectors_per_obj - 1); + + /* + * Compute the number of bytes from that offset to the end + * of the object. Account for what's already used by the bio. + */ + ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT; + if (ret > bmd->bi_size) + ret -= bmd->bi_size; + else + ret = 0; - max = (chunk_sectors - ((sector & (chunk_sectors - 1)) - + bio_sectors)) << SECTOR_SHIFT; - if (max < 0) - max = 0; /* bio_add cannot handle a negative return */ - if (max <= bvec->bv_len && bio_sectors == 0) - return bvec->bv_len; - return max; + /* + * Don't send back more than was asked for. And if the bio + * was empty, let the whole thing through because: "Note + * that a block device *must* allow a single page to be + * added to an empty bio." + */ + rbd_assert(bvec->bv_len <= PAGE_SIZE); + if (ret > (int) bvec->bv_len || !bmd->bi_size) + ret = (int) bvec->bv_len; + + return ret; } static void rbd_free_disk(struct rbd_device *rbd_dev) @@ -1559,177 +3334,206 @@ static void rbd_free_disk(struct rbd_device *rbd_dev) if (!disk) return; - rbd_header_free(&rbd_dev->header); - - if (disk->flags & GENHD_FL_UP) + rbd_dev->disk = NULL; + if (disk->flags & GENHD_FL_UP) { del_gendisk(disk); - if (disk->queue) - blk_cleanup_queue(disk->queue); + if (disk->queue) + blk_cleanup_queue(disk->queue); + } put_disk(disk); } -/* - * reload the ondisk the header - */ -static int rbd_read_header(struct rbd_device *rbd_dev, - struct rbd_image_header *header) +static int rbd_obj_read_sync(struct rbd_device *rbd_dev, + const char *object_name, + u64 offset, u64 length, void *buf) + { - ssize_t rc; - struct rbd_image_header_ondisk *dh; - u32 snap_count = 0; - u64 ver; - size_t len; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_obj_request *obj_request; + struct page **pages = NULL; + u32 page_count; + size_t size; + int ret; - /* - * First reads the fixed-size header to determine the number - * of snapshots, then re-reads it, along with all snapshot - * records as well as their stored names. 
- */ - len = sizeof (*dh); - while (1) { - dh = kmalloc(len, GFP_KERNEL); - if (!dh) - return -ENOMEM; + page_count = (u32) calc_pages_for(offset, length); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) + ret = PTR_ERR(pages); - rc = rbd_req_sync_read(rbd_dev, - NULL, CEPH_NOSNAP, - rbd_dev->obj_md_name, - 0, len, - (char *)dh, &ver); - if (rc < 0) - goto out_dh; - - rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL); - if (rc < 0) { - if (rc == -ENXIO) - pr_warning("unrecognized header format" - " for image %s", rbd_dev->obj); - goto out_dh; - } + ret = -ENOMEM; + obj_request = rbd_obj_request_create(object_name, offset, length, + OBJ_REQUEST_PAGES); + if (!obj_request) + goto out; - if (snap_count == header->total_snaps) - break; + obj_request->pages = pages; + obj_request->page_count = page_count; - snap_count = header->total_snaps; - len = sizeof (*dh) + - snap_count * sizeof(struct rbd_image_snap_ondisk) + - header->snap_names_len; + obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1, + obj_request); + if (!obj_request->osd_req) + goto out; - rbd_header_free(header); - kfree(dh); - } - header->obj_version = ver; + osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ, + offset, length, 0, 0); + osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, + obj_request->pages, + obj_request->length, + obj_request->offset & ~PAGE_MASK, + false, false); + rbd_osd_req_format_read(obj_request); + + ret = rbd_obj_request_submit(osdc, obj_request); + if (ret) + goto out; + ret = rbd_obj_request_wait(obj_request); + if (ret) + goto out; -out_dh: - kfree(dh); - return rc; + ret = obj_request->result; + if (ret < 0) + goto out; + + rbd_assert(obj_request->xferred <= (u64) SIZE_MAX); + size = (size_t) obj_request->xferred; + ceph_copy_from_page_vector(pages, buf, 0, size); + rbd_assert(size <= (size_t)INT_MAX); + ret = (int)size; +out: + if (obj_request) + rbd_obj_request_put(obj_request); + else + ceph_release_page_vector(pages, page_count); + + return ret; } /* - * create a snapshot + * Read the complete header for the given rbd device. On successful + * return, the rbd_dev->header field will contain up-to-date + * information about the image. */ -static int rbd_header_add_snap(struct rbd_device *dev, - const char *snap_name, - gfp_t gfp_flags) +static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) { - int name_len = strlen(snap_name); - u64 new_snapid; + struct rbd_image_header_ondisk *ondisk = NULL; + u32 snap_count = 0; + u64 names_size = 0; + u32 want_count; int ret; - void *data, *p, *e; - u64 ver; - struct ceph_mon_client *monc; - /* we should create a snapshot only if we're pointing at the head */ - if (dev->snap_id != CEPH_NOSNAP) - return -EINVAL; + /* + * The complete header will include an array of its 64-bit + * snapshot ids, followed by the names of those snapshots as + * a contiguous block of NUL-terminated strings. Note that + * the number of snapshots could change by the time we read + * it in, in which case we re-read it. 
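+ *
+ * (Editorial illustration, not part of the original patch: for an
+ * image with two snapshots named "a" and "bb", the header object's
+ * tail holds two struct rbd_image_snap_ondisk records followed by
+ * the bytes "a\0bb\0", so names_size would be 5 in that case.)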
+ */ + do { + size_t size; - monc = &dev->rbd_client->client->monc; - ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid); - dout("created snapid=%lld\n", new_snapid); - if (ret < 0) - return ret; + kfree(ondisk); - data = kmalloc(name_len + 16, gfp_flags); - if (!data) - return -ENOMEM; + size = sizeof (*ondisk); + size += snap_count * sizeof (struct rbd_image_snap_ondisk); + size += names_size; + ondisk = kmalloc(size, GFP_KERNEL); + if (!ondisk) + return -ENOMEM; - p = data; - e = data + name_len + 16; + ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name, + 0, size, ondisk); + if (ret < 0) + goto out; + if ((size_t)ret < size) { + ret = -ENXIO; + rbd_warn(rbd_dev, "short header read (want %zd got %d)", + size, ret); + goto out; + } + if (!rbd_dev_ondisk_valid(ondisk)) { + ret = -ENXIO; + rbd_warn(rbd_dev, "invalid header"); + goto out; + } - ceph_encode_string_safe(&p, e, snap_name, name_len, bad); - ceph_encode_64_safe(&p, e, new_snapid, bad); + names_size = le64_to_cpu(ondisk->snap_names_len); + want_count = snap_count; + snap_count = le32_to_cpu(ondisk->snap_count); + } while (snap_count != want_count); - ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", - data, p - data, &ver); + ret = rbd_header_from_disk(rbd_dev, ondisk); +out: + kfree(ondisk); - kfree(data); + return ret; +} - if (ret < 0) - return ret; +/* + * Clear the rbd device's EXISTS flag if the snapshot it's mapped to + * has disappeared from the (just updated) snapshot context. + */ +static void rbd_exists_validate(struct rbd_device *rbd_dev) +{ + u64 snap_id; - down_write(&dev->header_rwsem); - dev->header.snapc->seq = new_snapid; - up_write(&dev->header_rwsem); + if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) + return; - return 0; -bad: - return -ERANGE; + snap_id = rbd_dev->spec->snap_id; + if (snap_id == CEPH_NOSNAP) + return; + + if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX) + clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); } -static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev) +static void rbd_dev_update_size(struct rbd_device *rbd_dev) { - struct rbd_snap *snap; + sector_t size; + bool removing; - while (!list_empty(&rbd_dev->snaps)) { - snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node); - __rbd_remove_snap_dev(rbd_dev, snap); + /* + * Don't hold the lock while doing disk operations, + * or lock ordering will conflict with the bdev mutex via: + * rbd_add() -> blkdev_get() -> rbd_open() + */ + spin_lock_irq(&rbd_dev->lock); + removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags); + spin_unlock_irq(&rbd_dev->lock); + /* + * If the device is being removed, rbd_dev->disk has + * been destroyed, so don't try to update its size + */ + if (!removing) { + size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; + dout("setting size to %llu sectors", (unsigned long long)size); + set_capacity(rbd_dev->disk, size); + revalidate_disk(rbd_dev->disk); } } -/* - * only read the first part of the ondisk header, without the snaps info - */ -static int __rbd_refresh_header(struct rbd_device *rbd_dev) +static int rbd_dev_refresh(struct rbd_device *rbd_dev) { + u64 mapping_size; int ret; - struct rbd_image_header h; - u64 snap_seq; - int follow_seq = 0; - - ret = rbd_read_header(rbd_dev, &h); - if (ret < 0) - return ret; - - /* resized? 
*/ - set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE); + rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); down_write(&rbd_dev->header_rwsem); - - snap_seq = rbd_dev->header.snapc->seq; - if (rbd_dev->header.total_snaps && - rbd_dev->header.snapc->snaps[0] == snap_seq) - /* pointing at the head, will need to follow that - if head moves */ - follow_seq = 1; - - kfree(rbd_dev->header.snapc); - kfree(rbd_dev->header.snap_names); - kfree(rbd_dev->header.snap_sizes); - - rbd_dev->header.total_snaps = h.total_snaps; - rbd_dev->header.snapc = h.snapc; - rbd_dev->header.snap_names = h.snap_names; - rbd_dev->header.snap_names_len = h.snap_names_len; - rbd_dev->header.snap_sizes = h.snap_sizes; - if (follow_seq) - rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0]; + mapping_size = rbd_dev->mapping.size; + if (rbd_dev->image_format == 1) + ret = rbd_dev_v1_header_info(rbd_dev); else - rbd_dev->header.snapc->seq = snap_seq; + ret = rbd_dev_v2_header_info(rbd_dev); - ret = __rbd_init_snaps_header(rbd_dev); + /* If it's a mapped snapshot, validate its EXISTS flag */ + rbd_exists_validate(rbd_dev); up_write(&rbd_dev->header_rwsem); + if (mapping_size != rbd_dev->mapping.size) { + rbd_dev_update_size(rbd_dev); + } + return ret; } @@ -1737,40 +3541,25 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) { struct gendisk *disk; struct request_queue *q; - int rc; u64 segment_size; - u64 total_size = 0; - - /* contact OSD, request size info about the object being mapped */ - rc = rbd_read_header(rbd_dev, &rbd_dev->header); - if (rc) - return rc; - - /* no need to lock here, as rbd_dev is not registered yet */ - rc = __rbd_init_snaps_header(rbd_dev); - if (rc) - return rc; - - rc = rbd_header_set_snap(rbd_dev, &total_size); - if (rc) - return rc; /* create gendisk info */ - rc = -ENOMEM; - disk = alloc_disk(RBD_MINORS_PER_MAJOR); + disk = alloc_disk(single_major ? + (1 << RBD_SINGLE_MAJOR_PART_SHIFT) : + RBD_MINORS_PER_MAJOR); if (!disk) - goto out; + return -ENOMEM; snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", - rbd_dev->id); + rbd_dev->dev_id); disk->major = rbd_dev->major; - disk->first_minor = 0; + disk->first_minor = rbd_dev->minor; + if (single_major) + disk->flags |= GENHD_FL_EXT_DEVT; disk->fops = &rbd_bd_ops; disk->private_data = rbd_dev; - /* init rq */ - rc = -ENOMEM; - q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock); + q = blk_init_queue(rbd_request_fn, &rbd_dev->lock); if (!q) goto out_disk; @@ -1790,20 +3579,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) q->queuedata = rbd_dev; rbd_dev->disk = disk; - rbd_dev->q = q; - - /* finally, announce the disk to the world */ - set_capacity(disk, total_size / SECTOR_SIZE); - add_disk(disk); - pr_info("%s: added with size 0x%llx\n", - disk->disk_name, (unsigned long long)total_size); return 0; - out_disk: put_disk(disk); -out: - return rc; + + return -ENOMEM; } /* @@ -1820,7 +3601,21 @@ static ssize_t rbd_size_show(struct device *dev, { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size); + return sprintf(buf, "%llu\n", + (unsigned long long)rbd_dev->mapping.size); +} + +/* + * Note this shows the features for whatever's mapped, which is not + * necessarily the base image. 
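+ *
+ * (Editorial note, not part of the original patch: the value shown
+ * is a bit mask, so e.g. 0x0000000000000003 means
+ * RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2.)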
+ */ +static ssize_t rbd_features_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "0x%016llx\n", + (unsigned long long)rbd_dev->mapping.features); } static ssize_t rbd_major_show(struct device *dev, @@ -1828,7 +3623,18 @@ static ssize_t rbd_major_show(struct device *dev, { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "%d\n", rbd_dev->major); + if (rbd_dev->major) + return sprintf(buf, "%d\n", rbd_dev->major); + + return sprintf(buf, "(none)\n"); +} + +static ssize_t rbd_minor_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%d\n", rbd_dev->minor); } static ssize_t rbd_client_id_show(struct device *dev, @@ -1845,7 +3651,16 @@ static ssize_t rbd_pool_show(struct device *dev, { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "%s\n", rbd_dev->pool_name); + return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); +} + +static ssize_t rbd_pool_id_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%llu\n", + (unsigned long long) rbd_dev->spec->pool_id); } static ssize_t rbd_name_show(struct device *dev, @@ -1853,16 +3668,74 @@ static ssize_t rbd_name_show(struct device *dev, { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "%s\n", rbd_dev->obj); + if (rbd_dev->spec->image_name) + return sprintf(buf, "%s\n", rbd_dev->spec->image_name); + + return sprintf(buf, "(unknown)\n"); } +static ssize_t rbd_image_id_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%s\n", rbd_dev->spec->image_id); +} + +/* + * Shows the name of the currently-mapped snapshot (or + * RBD_SNAP_HEAD_NAME for the base image). + */ static ssize_t rbd_snap_show(struct device *dev, struct device_attribute *attr, char *buf) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - return sprintf(buf, "%s\n", rbd_dev->snap_name); + return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); +} + +/* + * For an rbd v2 image, shows the pool id, image id, and snapshot id + * for the parent image. If there is no parent, simply shows + * "(no parent image)". + */ +static ssize_t rbd_parent_show(struct device *dev, + struct device_attribute *attr, + char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + struct rbd_spec *spec = rbd_dev->parent_spec; + int count; + char *bufp = buf; + + if (!spec) + return sprintf(buf, "(no parent image)\n"); + + count = sprintf(bufp, "pool_id %llu\npool_name %s\n", + (unsigned long long) spec->pool_id, spec->pool_name); + if (count < 0) + return count; + bufp += count; + + count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id, + spec->image_name ? 
spec->image_name : "(unknown)"); + if (count < 0) + return count; + bufp += count; + + count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n", + (unsigned long long) spec->snap_id, spec->snap_name); + if (count < 0) + return count; + bufp += count; + + count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap); + if (count < 0) + return count; + bufp += count; + + return (ssize_t) (bufp - buf); } static ssize_t rbd_image_refresh(struct device *dev, @@ -1871,37 +3744,41 @@ static ssize_t rbd_image_refresh(struct device *dev, size_t size) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - int rc; - int ret = size; - - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + int ret; - rc = __rbd_refresh_header(rbd_dev); - if (rc < 0) - ret = rc; + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret); - mutex_unlock(&ctl_mutex); - return ret; + return ret < 0 ? ret : size; } static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); +static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); +static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); +static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); +static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); -static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add); +static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); static struct attribute *rbd_attrs[] = { &dev_attr_size.attr, + &dev_attr_features.attr, &dev_attr_major.attr, + &dev_attr_minor.attr, &dev_attr_client_id.attr, &dev_attr_pool.attr, + &dev_attr_pool_id.attr, &dev_attr_name.attr, + &dev_attr_image_id.attr, &dev_attr_current_snap.attr, + &dev_attr_parent.attr, &dev_attr_refresh.attr, - &dev_attr_create_snap.attr, NULL }; @@ -1924,312 +3801,784 @@ static struct device_type rbd_device_type = { .release = rbd_sysfs_dev_release, }; +static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) +{ + kref_get(&spec->kref); -/* - sysfs - snapshots -*/ + return spec; +} + +static void rbd_spec_free(struct kref *kref); +static void rbd_spec_put(struct rbd_spec *spec) +{ + if (spec) + kref_put(&spec->kref, rbd_spec_free); +} -static ssize_t rbd_snap_size_show(struct device *dev, - struct device_attribute *attr, - char *buf) +static struct rbd_spec *rbd_spec_alloc(void) { - struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); + struct rbd_spec *spec; - return sprintf(buf, "%llu\n", (unsigned long long)snap->size); + spec = kzalloc(sizeof (*spec), GFP_KERNEL); + if (!spec) + return NULL; + kref_init(&spec->kref); + + return spec; } -static ssize_t rbd_snap_id_show(struct device *dev, - struct device_attribute *attr, - char *buf) +static void rbd_spec_free(struct kref *kref) { - struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); + struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); - return sprintf(buf, "%llu\n", (unsigned long long)snap->id); + kfree(spec->pool_name); + kfree(spec->image_id); + kfree(spec->image_name); + kfree(spec->snap_name); + kfree(spec); } -static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); -static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); +static struct 
rbd_device *rbd_dev_create(struct rbd_client *rbdc, + struct rbd_spec *spec) +{ + struct rbd_device *rbd_dev; -static struct attribute *rbd_snap_attrs[] = { - &dev_attr_snap_size.attr, - &dev_attr_snap_id.attr, - NULL, -}; + rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); + if (!rbd_dev) + return NULL; -static struct attribute_group rbd_snap_attr_group = { - .attrs = rbd_snap_attrs, -}; + spin_lock_init(&rbd_dev->lock); + rbd_dev->flags = 0; + atomic_set(&rbd_dev->parent_ref, 0); + INIT_LIST_HEAD(&rbd_dev->node); + init_rwsem(&rbd_dev->header_rwsem); + + rbd_dev->spec = spec; + rbd_dev->rbd_client = rbdc; -static void rbd_snap_dev_release(struct device *dev) + /* Initialize the layout used for all rbd requests */ + + rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); + rbd_dev->layout.fl_stripe_count = cpu_to_le32(1); + rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); + rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); + + return rbd_dev; +} + +static void rbd_dev_destroy(struct rbd_device *rbd_dev) { - struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); - kfree(snap->name); - kfree(snap); + rbd_put_client(rbd_dev->rbd_client); + rbd_spec_put(rbd_dev->spec); + kfree(rbd_dev); } -static const struct attribute_group *rbd_snap_attr_groups[] = { - &rbd_snap_attr_group, - NULL -}; +/* + * Get the size and object order for an image snapshot, or if + * snap_id is CEPH_NOSNAP, gets this information for the base + * image. + */ +static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, + u8 *order, u64 *snap_size) +{ + __le64 snapid = cpu_to_le64(snap_id); + int ret; + struct { + u8 order; + __le64 size; + } __attribute__ ((packed)) size_buf = { 0 }; + + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + "rbd", "get_size", + &snapid, sizeof (snapid), + &size_buf, sizeof (size_buf)); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); + if (ret < 0) + return ret; + if (ret < sizeof (size_buf)) + return -ERANGE; -static struct device_type rbd_snap_device_type = { - .groups = rbd_snap_attr_groups, - .release = rbd_snap_dev_release, -}; + if (order) { + *order = size_buf.order; + dout(" order %u", (unsigned int)*order); + } + *snap_size = le64_to_cpu(size_buf.size); -static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev, - struct rbd_snap *snap) + dout(" snap_id 0x%016llx snap_size = %llu\n", + (unsigned long long)snap_id, + (unsigned long long)*snap_size); + + return 0; +} + +static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) { - list_del(&snap->node); - device_unregister(&snap->dev); + return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, + &rbd_dev->header.obj_order, + &rbd_dev->header.image_size); } -static int rbd_register_snap_dev(struct rbd_device *rbd_dev, - struct rbd_snap *snap, - struct device *parent) +static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) { - struct device *dev = &snap->dev; + void *reply_buf; int ret; + void *p; - dev->type = &rbd_snap_device_type; - dev->parent = parent; - dev->release = rbd_snap_dev_release; - dev_set_name(dev, "snap_%s", snap->name); - ret = device_register(dev); + reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); + if (!reply_buf) + return -ENOMEM; + + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + "rbd", "get_object_prefix", NULL, 0, + reply_buf, RBD_OBJ_PREFIX_LEN_MAX); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); + if (ret < 0) + goto out; + + p = reply_buf; + 
rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, + p + ret, NULL, GFP_NOIO); + ret = 0; + + if (IS_ERR(rbd_dev->header.object_prefix)) { + ret = PTR_ERR(rbd_dev->header.object_prefix); + rbd_dev->header.object_prefix = NULL; + } else { + dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); + } +out: + kfree(reply_buf); return ret; } -static int __rbd_add_snap_dev(struct rbd_device *rbd_dev, - int i, const char *name, - struct rbd_snap **snapp) +static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, + u64 *snap_features) { + __le64 snapid = cpu_to_le64(snap_id); + struct { + __le64 features; + __le64 incompat; + } __attribute__ ((packed)) features_buf = { 0 }; + u64 incompat; int ret; - struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL); - if (!snap) + + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + "rbd", "get_features", + &snapid, sizeof (snapid), + &features_buf, sizeof (features_buf)); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); + if (ret < 0) + return ret; + if (ret < sizeof (features_buf)) + return -ERANGE; + + incompat = le64_to_cpu(features_buf.incompat); + if (incompat & ~RBD_FEATURES_SUPPORTED) + return -ENXIO; + + *snap_features = le64_to_cpu(features_buf.features); + + dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", + (unsigned long long)snap_id, + (unsigned long long)*snap_features, + (unsigned long long)le64_to_cpu(features_buf.incompat)); + + return 0; +} + +static int rbd_dev_v2_features(struct rbd_device *rbd_dev) +{ + return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, + &rbd_dev->header.features); +} + +static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) +{ + struct rbd_spec *parent_spec; + size_t size; + void *reply_buf = NULL; + __le64 snapid; + void *p; + void *end; + u64 pool_id; + char *image_id; + u64 snap_id; + u64 overlap; + int ret; + + parent_spec = rbd_spec_alloc(); + if (!parent_spec) return -ENOMEM; - snap->name = kstrdup(name, GFP_KERNEL); - snap->size = rbd_dev->header.snap_sizes[i]; - snap->id = rbd_dev->header.snapc->snaps[i]; - if (device_is_registered(&rbd_dev->dev)) { - ret = rbd_register_snap_dev(rbd_dev, snap, - &rbd_dev->dev); - if (ret < 0) - goto err; + + size = sizeof (__le64) + /* pool_id */ + sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ + sizeof (__le64) + /* snap_id */ + sizeof (__le64); /* overlap */ + reply_buf = kmalloc(size, GFP_KERNEL); + if (!reply_buf) { + ret = -ENOMEM; + goto out_err; } - *snapp = snap; - return 0; -err: - kfree(snap->name); - kfree(snap); + + snapid = cpu_to_le64(CEPH_NOSNAP); + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + "rbd", "get_parent", + &snapid, sizeof (snapid), + reply_buf, size); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); + if (ret < 0) + goto out_err; + + p = reply_buf; + end = reply_buf + ret; + ret = -ERANGE; + ceph_decode_64_safe(&p, end, pool_id, out_err); + if (pool_id == CEPH_NOPOOL) { + /* + * Either the parent never existed, or we have + * record of it but the image got flattened so it no + * longer has a parent. When the parent of a + * layered image disappears we immediately set the + * overlap to 0. The effect of this is that all new + * requests will be treated as if the image had no + * parent. + */ + if (rbd_dev->parent_overlap) { + rbd_dev->parent_overlap = 0; + smp_mb(); + rbd_dev_parent_put(rbd_dev); + pr_info("%s: clone image has been flattened\n", + rbd_dev->disk->disk_name); + } + + goto out; /* No parent? No problem. 
+ */
+ }
+
+ /* The ceph file layout needs to fit pool id in 32 bits */
+
+ ret = -EIO;
+ if (pool_id > (u64)U32_MAX) {
+ rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
+ (unsigned long long)pool_id, U32_MAX);
+ goto out_err;
+ }
+
+ image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
+ if (IS_ERR(image_id)) {
+ ret = PTR_ERR(image_id);
+ goto out_err;
+ }
+ ceph_decode_64_safe(&p, end, snap_id, out_err);
+ ceph_decode_64_safe(&p, end, overlap, out_err);
+
+ /*
+ * The parent won't change (except when the clone is
+ * flattened, which is handled above). So we only need to
+ * record the parent spec if we have not already done so.
+ */
+ if (!rbd_dev->parent_spec) {
+ parent_spec->pool_id = pool_id;
+ parent_spec->image_id = image_id;
+ parent_spec->snap_id = snap_id;
+ rbd_dev->parent_spec = parent_spec;
+ parent_spec = NULL; /* rbd_dev now owns this */
+ }
+
+ /*
+ * We always update the parent overlap. If it's zero we
+ * treat it specially.
+ */
+ rbd_dev->parent_overlap = overlap;
+ smp_mb();
+ if (!overlap) {
+
+ /* A null parent_spec indicates it's the initial probe */
+
+ if (parent_spec) {
+ /*
+ * The overlap has become zero, so the clone
+ * must have been resized down to 0 at some
+ * point. Treat this the same as a flatten.
+ */
+ rbd_dev_parent_put(rbd_dev);
+ pr_info("%s: clone image now standalone\n",
+ rbd_dev->disk->disk_name);
+ } else {
+ /*
+ * For the initial probe, if we find the
+ * overlap is zero we just pretend there was
+ * no parent image.
+ */
+ rbd_warn(rbd_dev, "ignoring parent of "
+ "clone with overlap 0\n");
+ }
+ }
+out:
+ ret = 0;
+out_err:
+ kfree(reply_buf);
+ rbd_spec_put(parent_spec);
+
+ return ret;
+}
+
-/*
- * search for the previous snap in a null delimited string list
- */
-const char *rbd_prev_snap_name(const char *name, const char *start)
+static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
 {
- if (name < start + 2)
- return NULL;
+ struct {
+ __le64 stripe_unit;
+ __le64 stripe_count;
+ } __attribute__ ((packed)) striping_info_buf = { 0 };
+ size_t size = sizeof (striping_info_buf);
+ void *p;
+ u64 obj_size;
+ u64 stripe_unit;
+ u64 stripe_count;
+ int ret;
+
+ ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_stripe_unit_count", NULL, 0,
+ (char *)&striping_info_buf, size);
+ dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
+ if (ret < 0)
+ return ret;
+ if (ret < size)
+ return -ERANGE;
 
- name -= 2;
- while (*name) {
- if (name == start)
- return start;
- name--;
+ /*
+ * We don't actually support the "fancy striping" feature
+ * (STRIPINGV2) yet, but if the striping sizes are the
+ * defaults the behavior is the same as before. So find
+ * out, and only fail if the image has non-default values.
+ */
+ ret = -EINVAL;
+ obj_size = (u64)1 << rbd_dev->header.obj_order;
+ p = &striping_info_buf;
+ stripe_unit = ceph_decode_64(&p);
+ if (stripe_unit != obj_size) {
+ rbd_warn(rbd_dev, "unsupported stripe unit "
+ "(got %llu want %llu)",
+ stripe_unit, obj_size);
+ return -EINVAL;
+ }
+ stripe_count = ceph_decode_64(&p);
+ if (stripe_count != 1) {
+ rbd_warn(rbd_dev, "unsupported stripe count "
+ "(got %llu want 1)", stripe_count);
+ return -EINVAL;
 }
- return name + 1;
+ rbd_dev->header.stripe_unit = stripe_unit;
+ rbd_dev->header.stripe_count = stripe_count;
+
+ return 0;
 }
 
-/*
- * compare the old list of snapshots that we have to what's in the header
- * and update it accordingly.
- * Note that the header holds the snapshots
- * in a reverse order (from newest to oldest) and we need to go from
- * older to new so that we don't get a duplicate snap name when
- * doing the process (e.g., a snapshot is removed and a new one is
- * recreated with the same name).
- */
-static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
+static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
 {
- const char *name, *first_name;
- int i = rbd_dev->header.total_snaps;
- struct rbd_snap *snap, *old_snap = NULL;
+ size_t image_id_size;
+ char *image_id;
+ void *p;
+ void *end;
+ size_t size;
+ void *reply_buf = NULL;
+ size_t len = 0;
+ char *image_name = NULL;
 int ret;
- struct list_head *p, *n;
- first_name = rbd_dev->header.snap_names;
- name = first_name + rbd_dev->header.snap_names_len;
+ rbd_assert(!rbd_dev->spec->image_name);
- list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
- u64 cur_id;
+ len = strlen(rbd_dev->spec->image_id);
+ image_id_size = sizeof (__le32) + len;
+ image_id = kmalloc(image_id_size, GFP_KERNEL);
+ if (!image_id)
+ return NULL;
- old_snap = list_entry(p, struct rbd_snap, node);
+ p = image_id;
+ end = image_id + image_id_size;
+ ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
- if (i)
- cur_id = rbd_dev->header.snapc->snaps[i - 1];
+ size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
+ reply_buf = kmalloc(size, GFP_KERNEL);
+ if (!reply_buf)
+ goto out;
- if (!i || old_snap->id < cur_id) {
- /* old_snap->id was skipped, thus was removed */
- __rbd_remove_snap_dev(rbd_dev, old_snap);
- continue;
- }
- if (old_snap->id == cur_id) {
- /* we have this snapshot already */
- i--;
- name = rbd_prev_snap_name(name, first_name);
- continue;
- }
- for (; i > 0;
- i--, name = rbd_prev_snap_name(name, first_name)) {
- if (!name) {
- WARN_ON(1);
- return -EINVAL;
- }
- cur_id = rbd_dev->header.snapc->snaps[i];
- /* snapshot removal?
handle it above */ - if (cur_id >= old_snap->id) + ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, + "rbd", "dir_get_name", + image_id, image_id_size, + reply_buf, size); + if (ret < 0) + goto out; + p = reply_buf; + end = reply_buf + ret; + + image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); + if (IS_ERR(image_name)) + image_name = NULL; + else + dout("%s: name is %s len is %zd\n", __func__, image_name, len); +out: + kfree(reply_buf); + kfree(image_id); + + return image_name; +} + +static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) +{ + struct ceph_snap_context *snapc = rbd_dev->header.snapc; + const char *snap_name; + u32 which = 0; + + /* Skip over names until we find the one we are looking for */ + + snap_name = rbd_dev->header.snap_names; + while (which < snapc->num_snaps) { + if (!strcmp(name, snap_name)) + return snapc->snaps[which]; + snap_name += strlen(snap_name) + 1; + which++; + } + return CEPH_NOSNAP; +} + +static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) +{ + struct ceph_snap_context *snapc = rbd_dev->header.snapc; + u32 which; + bool found = false; + u64 snap_id; + + for (which = 0; !found && which < snapc->num_snaps; which++) { + const char *snap_name; + + snap_id = snapc->snaps[which]; + snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id); + if (IS_ERR(snap_name)) { + /* ignore no-longer existing snapshots */ + if (PTR_ERR(snap_name) == -ENOENT) + continue; + else break; - /* a new snapshot */ - ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); - if (ret < 0) - return ret; - - /* note that we add it backward so using n and not p */ - list_add(&snap->node, n); - p = &snap->node; } + found = !strcmp(name, snap_name); + kfree(snap_name); } - /* we're done going over the old snap list, just add what's left */ - for (; i > 0; i--) { - name = rbd_prev_snap_name(name, first_name); - if (!name) { - WARN_ON(1); - return -EINVAL; + return found ? snap_id : CEPH_NOSNAP; +} + +/* + * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if + * no snapshot by that name is found, or if an error occurs. + */ +static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name) +{ + if (rbd_dev->image_format == 1) + return rbd_v1_snap_id_by_name(rbd_dev, name); + + return rbd_v2_snap_id_by_name(rbd_dev, name); +} + +/* + * When an rbd image has a parent image, it is identified by the + * pool, image, and snapshot ids (not names). This function fills + * in the names for those ids. (It's OK if we can't figure out the + * name for an image id, but the pool and snapshot ids should always + * exist and have names.) All names in an rbd spec are dynamically + * allocated. + * + * When an image being mapped (not a parent) is probed, we have the + * pool name and pool id, image name and image id, and the snapshot + * name. The only thing we're missing is the snapshot id. + */ +static int rbd_dev_spec_update(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_spec *spec = rbd_dev->spec; + const char *pool_name; + const char *image_name; + const char *snap_name; + int ret; + + /* + * An image being mapped will have the pool name (etc.), but + * we need to look up the snapshot id. 
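+ *
+ * (Editorial summary, not part of the original patch:
+ *
+ *   user-mapped image: names supplied -> only snap_id looked up here
+ *   parent image:      ids supplied   -> all of the names looked up)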
+ */ + if (spec->pool_name) { + if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) { + u64 snap_id; + + snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name); + if (snap_id == CEPH_NOSNAP) + return -ENOENT; + spec->snap_id = snap_id; + } else { + spec->snap_id = CEPH_NOSNAP; } - ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap); - if (ret < 0) - return ret; - list_add(&snap->node, &rbd_dev->snaps); + + return 0; + } + + /* Get the pool name; we have to make our own copy of this */ + + pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id); + if (!pool_name) { + rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id); + return -EIO; + } + pool_name = kstrdup(pool_name, GFP_KERNEL); + if (!pool_name) + return -ENOMEM; + + /* Fetch the image name; tolerate failure here */ + + image_name = rbd_dev_image_name(rbd_dev); + if (!image_name) + rbd_warn(rbd_dev, "unable to get image name"); + + /* Look up the snapshot name, and make a copy */ + + snap_name = rbd_snap_name(rbd_dev, spec->snap_id); + if (IS_ERR(snap_name)) { + ret = PTR_ERR(snap_name); + goto out_err; } + spec->pool_name = pool_name; + spec->image_name = image_name; + spec->snap_name = snap_name; + return 0; +out_err: + kfree(image_name); + kfree(pool_name); + + return ret; } -static int rbd_bus_add_dev(struct rbd_device *rbd_dev) +static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) { + size_t size; int ret; - struct device *dev; - struct rbd_snap *snap; + void *reply_buf; + void *p; + void *end; + u64 seq; + u32 snap_count; + struct ceph_snap_context *snapc; + u32 i; - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - dev = &rbd_dev->dev; + /* + * We'll need room for the seq value (maximum snapshot id), + * snapshot count, and array of that many snapshot ids. + * For now we have a fixed upper limit on the number we're + * prepared to receive. + */ + size = sizeof (__le64) + sizeof (__le32) + + RBD_MAX_SNAP_COUNT * sizeof (__le64); + reply_buf = kzalloc(size, GFP_KERNEL); + if (!reply_buf) + return -ENOMEM; - dev->bus = &rbd_bus_type; - dev->type = &rbd_device_type; - dev->parent = &rbd_root_dev; - dev->release = rbd_dev_release; - dev_set_name(dev, "%d", rbd_dev->id); - ret = device_register(dev); + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + "rbd", "get_snapcontext", NULL, 0, + reply_buf, size); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; - list_for_each_entry(snap, &rbd_dev->snaps, node) { - ret = rbd_register_snap_dev(rbd_dev, snap, - &rbd_dev->dev); - if (ret < 0) - break; + p = reply_buf; + end = reply_buf + ret; + ret = -ERANGE; + ceph_decode_64_safe(&p, end, seq, out); + ceph_decode_32_safe(&p, end, snap_count, out); + + /* + * Make sure the reported number of snapshot ids wouldn't go + * beyond the end of our buffer. But before checking that, + * make sure the computed size of the snapshot context we + * allocate is representable in a size_t. 
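+ *
+ * (Editorial note, not part of the original patch: the guard below
+ * is the overflow-safe rearrangement of
+ *
+ *   sizeof (struct ceph_snap_context) + snap_count * sizeof (u64)
+ *       > SIZE_MAX
+ *
+ * written so that the multiplication itself cannot wrap.)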
+ */ + if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) + / sizeof (u64)) { + ret = -EINVAL; + goto out; + } + if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) + goto out; + ret = 0; + + snapc = ceph_create_snap_context(snap_count, GFP_KERNEL); + if (!snapc) { + ret = -ENOMEM; + goto out; } + snapc->seq = seq; + for (i = 0; i < snap_count; i++) + snapc->snaps[i] = ceph_decode_64(&p); + + ceph_put_snap_context(rbd_dev->header.snapc); + rbd_dev->header.snapc = snapc; + + dout(" snap context seq = %llu, snap_count = %u\n", + (unsigned long long)seq, (unsigned int)snap_count); out: - mutex_unlock(&ctl_mutex); + kfree(reply_buf); + return ret; } -static void rbd_bus_del_dev(struct rbd_device *rbd_dev) +static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, + u64 snap_id) { - device_unregister(&rbd_dev->dev); + size_t size; + void *reply_buf; + __le64 snapid; + int ret; + void *p; + void *end; + char *snap_name; + + size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; + reply_buf = kmalloc(size, GFP_KERNEL); + if (!reply_buf) + return ERR_PTR(-ENOMEM); + + snapid = cpu_to_le64(snap_id); + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + "rbd", "get_snapshot_name", + &snapid, sizeof (snapid), + reply_buf, size); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); + if (ret < 0) { + snap_name = ERR_PTR(ret); + goto out; + } + + p = reply_buf; + end = reply_buf + ret; + snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); + if (IS_ERR(snap_name)) + goto out; + + dout(" snap_id 0x%016llx snap_name = %s\n", + (unsigned long long)snap_id, snap_name); +out: + kfree(reply_buf); + + return snap_name; } -static int rbd_init_watch_dev(struct rbd_device *rbd_dev) +static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev) { - int ret, rc; + bool first_time = rbd_dev->header.object_prefix == NULL; + int ret; - do { - ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name, - rbd_dev->header.obj_version); - if (ret == -ERANGE) { - mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); - rc = __rbd_refresh_header(rbd_dev); - mutex_unlock(&ctl_mutex); - if (rc < 0) - return rc; - } - } while (ret == -ERANGE); + ret = rbd_dev_v2_image_size(rbd_dev); + if (ret) + return ret; + + if (first_time) { + ret = rbd_dev_v2_header_onetime(rbd_dev); + if (ret) + return ret; + } + + /* + * If the image supports layering, get the parent info. We + * need to probe the first time regardless. Thereafter we + * only need to if there's a parent, to see if it has + * disappeared due to the mapped image getting flattened. + */ + if (rbd_dev->header.features & RBD_FEATURE_LAYERING && + (first_time || rbd_dev->parent_spec)) { + bool warn; + + ret = rbd_dev_v2_parent_info(rbd_dev); + if (ret) + return ret; + + /* + * Print a warning if this is the initial probe and + * the image has a parent. Don't print it if the + * image now being probed is itself a parent. We + * can tell at this point because we won't know its + * pool name yet (just its pool id). 
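+ *
+ * (Editorial note, not part of the original patch: the mapped
+ * image's spec gets its pool_name from rbd_add_parse_args(), while
+ * a parent spec starts out holding only ids, so a null pool_name
+ * here identifies a parent probe.)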
+ */ + warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name; + if (first_time && warn) + rbd_warn(rbd_dev, "WARNING: kernel layering " + "is EXPERIMENTAL!"); + } + + if (rbd_dev->spec->snap_id == CEPH_NOSNAP) + if (rbd_dev->mapping.size != rbd_dev->header.image_size) + rbd_dev->mapping.size = rbd_dev->header.image_size; + + ret = rbd_dev_v2_snap_context(rbd_dev); + dout("rbd_dev_v2_snap_context returned %d\n", ret); return ret; } -static atomic64_t rbd_id_max = ATOMIC64_INIT(0); +static int rbd_bus_add_dev(struct rbd_device *rbd_dev) +{ + struct device *dev; + int ret; + + dev = &rbd_dev->dev; + dev->bus = &rbd_bus_type; + dev->type = &rbd_device_type; + dev->parent = &rbd_root_dev; + dev->release = rbd_dev_device_release; + dev_set_name(dev, "%d", rbd_dev->dev_id); + ret = device_register(dev); + + return ret; +} + +static void rbd_bus_del_dev(struct rbd_device *rbd_dev) +{ + device_unregister(&rbd_dev->dev); +} /* * Get a unique rbd identifier for the given new rbd_dev, and add - * the rbd_dev to the global list. The minimum rbd id is 1. + * the rbd_dev to the global list. */ -static void rbd_id_get(struct rbd_device *rbd_dev) +static int rbd_dev_id_get(struct rbd_device *rbd_dev) { - rbd_dev->id = atomic64_inc_return(&rbd_id_max); + int new_dev_id; + + new_dev_id = ida_simple_get(&rbd_dev_id_ida, + 0, minor_to_rbd_dev_id(1 << MINORBITS), + GFP_KERNEL); + if (new_dev_id < 0) + return new_dev_id; + + rbd_dev->dev_id = new_dev_id; spin_lock(&rbd_dev_list_lock); list_add_tail(&rbd_dev->node, &rbd_dev_list); spin_unlock(&rbd_dev_list_lock); + + dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id); + + return 0; } /* * Remove an rbd_dev from the global list, and record that its * identifier is no longer in use. */ -static void rbd_id_put(struct rbd_device *rbd_dev) +static void rbd_dev_id_put(struct rbd_device *rbd_dev) { - struct list_head *tmp; - int rbd_id = rbd_dev->id; - int max_id; - - BUG_ON(rbd_id < 1); - spin_lock(&rbd_dev_list_lock); list_del_init(&rbd_dev->node); - - /* - * If the id being "put" is not the current maximum, there - * is nothing special we need to do. - */ - if (rbd_id != atomic64_read(&rbd_id_max)) { - spin_unlock(&rbd_dev_list_lock); - return; - } - - /* - * We need to update the current maximum id. Search the - * list to find out what it is. We're more likely to find - * the maximum at the end, so search the list backward. - */ - max_id = 0; - list_for_each_prev(tmp, &rbd_dev_list) { - struct rbd_device *rbd_dev; - - rbd_dev = list_entry(tmp, struct rbd_device, node); - if (rbd_id > max_id) - max_id = rbd_id; - } spin_unlock(&rbd_dev_list_lock); - /* - * The max id could have been updated by rbd_id_get(), in - * which case it now accurately reflects the new maximum. - * Be careful not to overwrite the maximum value in that - * case. - */ - atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id); + ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); + + dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id); } /* @@ -2282,287 +4631,802 @@ static inline size_t copy_token(const char **buf, } /* - * This fills in the pool_name, obj, obj_len, snap_name, obj_len, - * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based - * on the list of monitor addresses and other options provided via - * /sys/bus/rbd/add. + * Finds the next token in *buf, dynamically allocates a buffer big + * enough to hold a copy of it, and copies the token into the new + * buffer. The copy is guaranteed to be terminated with '\0'. 
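+ *
+ * (Editorial example, not part of the original patch: with
+ * *buf = "rbd myimage", a first call returns a copy of "rbd" and
+ * advances *buf past that token, so a second call would copy
+ * "myimage".)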
+ * Note that a duplicate buffer is created even for a zero-length
+ * token.
+ *
+ * Returns a pointer to the newly-allocated duplicate, or a null
+ * pointer if memory for the duplicate was not available. If
+ * the lenp argument is a non-null pointer, the length of the token
+ * (not including the '\0') is returned in *lenp.
+ *
+ * If successful, the *buf pointer will be updated to point beyond
+ * the end of the found token.
+ *
+ * Note: uses GFP_KERNEL for allocation.
+ */
-static int rbd_add_parse_args(struct rbd_device *rbd_dev,
- const char *buf,
- const char **mon_addrs,
- size_t *mon_addrs_size,
- char *options,
- size_t options_size)
+static inline char *dup_token(const char **buf, size_t *lenp)
 {
- size_t len;
+ char *dup;
+ size_t len;
+
+ len = next_token(buf);
+ dup = kmemdup(*buf, len + 1, GFP_KERNEL);
+ if (!dup)
+ return NULL;
+ *(dup + len) = '\0';
+ *buf += len;
+
+ if (lenp)
+ *lenp = len;
+
+ return dup;
+}
+
+/*
+ * Parse the options provided for an "rbd add" (i.e., rbd image
+ * mapping) request. These arrive via a write to /sys/bus/rbd/add,
+ * and the data written is passed here via a NUL-terminated buffer.
+ * Returns 0 if successful or an error code otherwise.
+ *
+ * The information extracted from these options is recorded in
+ * the other parameters which return dynamically-allocated
+ * structures:
+ *  ceph_opts
+ * The address of a pointer that will refer to a ceph options
+ * structure. Caller must release the returned pointer using
+ * ceph_destroy_options() when it is no longer needed.
+ *  rbd_opts
+ * Address of an rbd options pointer. Fully initialized by
+ * this function; caller must release with kfree().
+ *  spec
+ * Address of an rbd image specification pointer. Fully
+ * initialized by this function based on parsed options.
+ * Caller must release with rbd_spec_put().
+ *
+ * The options passed take this form:
+ *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
+ * where:
+ *  <mon_addrs>
+ * A comma-separated list of one or more monitor addresses.
+ * A monitor address is an ip address, optionally followed
+ * by a port number (separated by a colon).
+ * I.e.: ip1[:port1][,ip2[:port2]...]
+ *  <options>
+ * A comma-separated list of ceph and/or rbd options.
+ *  <pool_name>
+ * The name of the rados pool containing the rbd image.
+ *  <image_name>
+ * The name of the image in that pool to map.
+ *  <snap_name>
+ * An optional snapshot name. If provided, the mapping will
+ * present data from the image at the time that snapshot was
+ * created. The image head is used if no snapshot name is
+ * provided. Snapshot mappings are always read-only.
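+ *
+ * (Editorial example, not part of the original patch: a request
+ * written to /sys/bus/rbd/add might look like
+ *
+ *   1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap
+ *
+ * where the trailing snapshot name is optional.)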
+ */ +static int rbd_add_parse_args(const char *buf, + struct ceph_options **ceph_opts, + struct rbd_options **opts, + struct rbd_spec **rbd_spec) +{ + size_t len; + char *options; + const char *mon_addrs; + char *snap_name; + size_t mon_addrs_size; + struct rbd_spec *spec = NULL; + struct rbd_options *rbd_opts = NULL; + struct ceph_options *copts; + int ret; /* The first four tokens are required */ len = next_token(&buf); - if (!len) + if (!len) { + rbd_warn(NULL, "no monitor address(es) provided"); return -EINVAL; - *mon_addrs_size = len + 1; - *mon_addrs = buf; - + } + mon_addrs = buf; + mon_addrs_size = len + 1; buf += len; - len = copy_token(&buf, options, options_size); - if (!len || len >= options_size) - return -EINVAL; + ret = -EINVAL; + options = dup_token(&buf, NULL); + if (!options) + return -ENOMEM; + if (!*options) { + rbd_warn(NULL, "no options provided"); + goto out_err; + } - len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name)); - if (!len || len >= sizeof (rbd_dev->pool_name)) - return -EINVAL; + spec = rbd_spec_alloc(); + if (!spec) + goto out_mem; - len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj)); - if (!len || len >= sizeof (rbd_dev->obj)) - return -EINVAL; + spec->pool_name = dup_token(&buf, NULL); + if (!spec->pool_name) + goto out_mem; + if (!*spec->pool_name) { + rbd_warn(NULL, "no pool name provided"); + goto out_err; + } - /* We have the object length in hand, save it. */ + spec->image_name = dup_token(&buf, NULL); + if (!spec->image_name) + goto out_mem; + if (!*spec->image_name) { + rbd_warn(NULL, "no image name provided"); + goto out_err; + } - rbd_dev->obj_len = len; + /* + * Snapshot name is optional; default is to use "-" + * (indicating the head/no snapshot). + */ + len = next_token(&buf); + if (!len) { + buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ + len = sizeof (RBD_SNAP_HEAD_NAME) - 1; + } else if (len > RBD_MAX_SNAP_NAME_LEN) { + ret = -ENAMETOOLONG; + goto out_err; + } + snap_name = kmemdup(buf, len + 1, GFP_KERNEL); + if (!snap_name) + goto out_mem; + *(snap_name + len) = '\0'; + spec->snap_name = snap_name; - BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN - < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX)); - sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX); + /* Initialize all rbd options to the defaults */ + + rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL); + if (!rbd_opts) + goto out_mem; + + rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; + + copts = ceph_parse_options(options, mon_addrs, + mon_addrs + mon_addrs_size - 1, + parse_rbd_opts_token, rbd_opts); + if (IS_ERR(copts)) { + ret = PTR_ERR(copts); + goto out_err; + } + kfree(options); + + *ceph_opts = copts; + *opts = rbd_opts; + *rbd_spec = spec; + + return 0; +out_mem: + ret = -ENOMEM; +out_err: + kfree(rbd_opts); + rbd_spec_put(spec); + kfree(options); + + return ret; +} + +/* + * Return pool id (>= 0) or a negative error code. 
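+ *
+ * (Editorial note, not part of the original patch: on -ENOENT the
+ * lookup below fetches the newest osdmap epoch and, if our cached
+ * map is stale, waits for a map update before retrying once, in
+ * case the pool was created after our map was obtained.)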
+ */ +static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) +{ + u64 newest_epoch; + unsigned long timeout = rbdc->client->options->mount_timeout * HZ; + int tries = 0; + int ret; + +again: + ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name); + if (ret == -ENOENT && tries++ < 1) { + ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap", + &newest_epoch); + if (ret < 0) + return ret; + + if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { + ceph_monc_request_next_osdmap(&rbdc->client->monc); + (void) ceph_monc_wait_osdmap(&rbdc->client->monc, + newest_epoch, timeout); + goto again; + } else { + /* the osdmap we have is new enough */ + return -ENOENT; + } + } + + return ret; +} + +/* + * An rbd format 2 image has a unique identifier, distinct from the + * name given to it by the user. Internally, that identifier is + * what's used to specify the names of objects related to the image. + * + * A special "rbd id" object is used to map an rbd image name to its + * id. If that object doesn't exist, then there is no v2 rbd image + * with the supplied name. + * + * This function will record the given rbd_dev's image_id field if + * it can be determined, and in that case will return 0. If any + * errors occur a negative errno will be returned and the rbd_dev's + * image_id field will be unchanged (and should be NULL). + */ +static int rbd_dev_image_id(struct rbd_device *rbd_dev) +{ + int ret; + size_t size; + char *object_name; + void *response; + char *image_id; /* - * The snapshot name is optional, but it's an error if it's - * too long. If no snapshot is supplied, fill in the default. + * When probing a parent image, the image id is already + * known (and the image name likely is not). There's no + * need to fetch the image id again in this case. We + * do still need to set the image format though. */ - len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name)); - if (!len) - memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME, - sizeof (RBD_SNAP_HEAD_NAME)); - else if (len >= sizeof (rbd_dev->snap_name)) - return -EINVAL; + if (rbd_dev->spec->image_id) { + rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1; + + return 0; + } + + /* + * First, see if the format 2 image id file exists, and if + * so, get the image's persistent id from it. + */ + size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name); + object_name = kmalloc(size, GFP_NOIO); + if (!object_name) + return -ENOMEM; + sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name); + dout("rbd id object name is %s\n", object_name); + + /* Response will be an encoded string, which includes a length */ + + size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; + response = kzalloc(size, GFP_NOIO); + if (!response) { + ret = -ENOMEM; + goto out; + } + + /* If it doesn't exist we'll assume it's a format 1 image */ + + ret = rbd_obj_method_sync(rbd_dev, object_name, + "rbd", "get_id", NULL, 0, + response, RBD_IMAGE_ID_LEN_MAX); + dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); + if (ret == -ENOENT) { + image_id = kstrdup("", GFP_KERNEL); + ret = image_id ? 
+ 0 : -ENOMEM;
+ if (!ret)
+ rbd_dev->image_format = 1;
+ } else if (ret > sizeof (__le32)) {
+ void *p = response;
+
+ image_id = ceph_extract_encoded_string(&p, p + ret,
+ NULL, GFP_NOIO);
+ ret = PTR_ERR_OR_ZERO(image_id);
+ if (!ret)
+ rbd_dev->image_format = 2;
+ } else {
+ ret = -EINVAL;
+ }
+
+ if (!ret) {
+ rbd_dev->spec->image_id = image_id;
+ dout("image_id is %s\n", image_id);
+ }
+out:
+ kfree(response);
+ kfree(object_name);
+
+ return ret;
+}
+
+/*
+ * Undo whatever state changes are made by a v1 or v2 header info
+ * call.
+ */
+static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
+{
+ struct rbd_image_header *header;
+
+ /* Drop parent reference unless it's already been done (or none) */
+
+ if (rbd_dev->parent_overlap)
+ rbd_dev_parent_put(rbd_dev);
+
+ /* Free dynamic fields from the header, then zero it out */
+
+ header = &rbd_dev->header;
+ ceph_put_snap_context(header->snapc);
+ kfree(header->snap_sizes);
+ kfree(header->snap_names);
+ kfree(header->object_prefix);
+ memset(header, 0, sizeof (*header));
+}
+
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
+{
+ int ret;
+
+ ret = rbd_dev_v2_object_prefix(rbd_dev);
+ if (ret)
+ goto out_err;
+
+ /*
+ * Get and check the features for the image. Currently the
+ * features are assumed to never change.
+ */
+ ret = rbd_dev_v2_features(rbd_dev);
+ if (ret)
+ goto out_err;
+
+ /* If the image supports fancy striping, get its parameters */
+
+ if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
+ ret = rbd_dev_v2_striping_info(rbd_dev);
+ if (ret < 0)
+ goto out_err;
+ }
+ /* No support for crypto or compression in format 2 images */
 
 return 0;
+out_err:
+ rbd_dev->header.features = 0;
+ kfree(rbd_dev->header.object_prefix);
+ rbd_dev->header.object_prefix = NULL;
+
+ return ret;
 }
 
-static ssize_t rbd_add(struct bus_type *bus,
- const char *buf,
- size_t count)
+static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
 {
- struct rbd_device *rbd_dev;
- const char *mon_addrs = NULL;
- size_t mon_addrs_size = 0;
- char *options = NULL;
- struct ceph_osd_client *osdc;
- int rc = -ENOMEM;
+ struct rbd_device *parent = NULL;
+ struct rbd_spec *parent_spec;
+ struct rbd_client *rbdc;
+ int ret;
 
- if (!try_module_get(THIS_MODULE))
- return -ENODEV;
+ if (!rbd_dev->parent_spec)
+ return 0;
+ /*
+ * We need to pass a reference to the client and the parent
+ * spec when creating the parent rbd_dev. Images related by
+ * parent/child relationships always share both.
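+ *
+ * (Editorial note, not part of the original patch: the references
+ * taken just below via rbd_spec_get() and __rbd_get_client() are
+ * dropped again by rbd_dev_destroy() when the parent device is
+ * torn down.)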
+ */ + parent_spec = rbd_spec_get(rbd_dev->parent_spec); + rbdc = __rbd_get_client(rbd_dev->rbd_client); - rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); - if (!rbd_dev) - goto err_nomem; - options = kmalloc(count, GFP_KERNEL); - if (!options) - goto err_nomem; + ret = -ENOMEM; + parent = rbd_dev_create(rbdc, parent_spec); + if (!parent) + goto out_err; - /* static rbd_device initialization */ - spin_lock_init(&rbd_dev->lock); - INIT_LIST_HEAD(&rbd_dev->node); - INIT_LIST_HEAD(&rbd_dev->snaps); - init_rwsem(&rbd_dev->header_rwsem); + ret = rbd_dev_image_probe(parent, false); + if (ret < 0) + goto out_err; + rbd_dev->parent = parent; + atomic_set(&rbd_dev->parent_ref, 1); - init_rwsem(&rbd_dev->header_rwsem); + return 0; +out_err: + if (parent) { + rbd_dev_unparent(rbd_dev); + kfree(rbd_dev->header_name); + rbd_dev_destroy(parent); + } else { + rbd_put_client(rbdc); + rbd_spec_put(parent_spec); + } - /* generate unique id: find highest unique id, add one */ - rbd_id_get(rbd_dev); + return ret; +} + +static int rbd_dev_device_setup(struct rbd_device *rbd_dev) +{ + int ret; + + /* Get an id and fill in device name. */ + + ret = rbd_dev_id_get(rbd_dev); + if (ret) + return ret; - /* Fill in the device name, now that we have its id. */ BUILD_BUG_ON(DEV_NAME_LEN < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); - sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id); + sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); - /* parse add command */ - rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size, - options, count); - if (rc) - goto err_put_id; + /* Record our major and minor device numbers. */ - rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1, - options); - if (IS_ERR(rbd_dev->rbd_client)) { - rc = PTR_ERR(rbd_dev->rbd_client); - goto err_put_id; - } + if (!single_major) { + ret = register_blkdev(0, rbd_dev->name); + if (ret < 0) + goto err_out_id; - /* pick the pool */ - osdc = &rbd_dev->rbd_client->client->osdc; - rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name); - if (rc < 0) - goto err_out_client; - rbd_dev->poolid = rc; + rbd_dev->major = ret; + rbd_dev->minor = 0; + } else { + rbd_dev->major = rbd_major; + rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id); + } - /* register our block device */ - rc = register_blkdev(0, rbd_dev->name); - if (rc < 0) - goto err_out_client; - rbd_dev->major = rc; + /* Set up the blkdev mapping. */ - rc = rbd_bus_add_dev(rbd_dev); - if (rc) + ret = rbd_init_disk(rbd_dev); + if (ret) goto err_out_blkdev; - /* - * At this point cleanup in the event of an error is the job - * of the sysfs code (initiated by rbd_bus_del_dev()). - * - * Set up and announce blkdev mapping. - */ - rc = rbd_init_disk(rbd_dev); - if (rc) - goto err_out_bus; + ret = rbd_dev_mapping_set(rbd_dev); + if (ret) + goto err_out_disk; + set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); + set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); - rc = rbd_init_watch_dev(rbd_dev); - if (rc) - goto err_out_bus; + ret = rbd_bus_add_dev(rbd_dev); + if (ret) + goto err_out_mapping; - return count; + /* Everything's ready. Announce the disk to the world. 
+
+static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	/* Get an id and fill in device name. */
+
+	ret = rbd_dev_id_get(rbd_dev);
+	if (ret)
+		return ret;
 
-	/* Fill in the device name, now that we have its id. */
 	BUILD_BUG_ON(DEV_NAME_LEN
 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
-	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
+	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
 
-	/* parse add command */
-	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
-				options, count);
-	if (rc)
-		goto err_put_id;
+	/* Record our major and minor device numbers. */
 
-	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
-				options);
-	if (IS_ERR(rbd_dev->rbd_client)) {
-		rc = PTR_ERR(rbd_dev->rbd_client);
-		goto err_put_id;
-	}
+	if (!single_major) {
+		ret = register_blkdev(0, rbd_dev->name);
+		if (ret < 0)
+			goto err_out_id;
 
-	/* pick the pool */
-	osdc = &rbd_dev->rbd_client->client->osdc;
-	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
-	if (rc < 0)
-		goto err_out_client;
-	rbd_dev->poolid = rc;
+		rbd_dev->major = ret;
+		rbd_dev->minor = 0;
+	} else {
+		rbd_dev->major = rbd_major;
+		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
+	}
 
-	/* register our block device */
-	rc = register_blkdev(0, rbd_dev->name);
-	if (rc < 0)
-		goto err_out_client;
-	rbd_dev->major = rc;
+	/* Set up the blkdev mapping. */
 
-	rc = rbd_bus_add_dev(rbd_dev);
-	if (rc)
+	ret = rbd_init_disk(rbd_dev);
+	if (ret)
 		goto err_out_blkdev;
 
-	/*
-	 * At this point cleanup in the event of an error is the job
-	 * of the sysfs code (initiated by rbd_bus_del_dev()).
-	 *
-	 * Set up and announce blkdev mapping.
-	 */
-	rc = rbd_init_disk(rbd_dev);
-	if (rc)
-		goto err_out_bus;
+	ret = rbd_dev_mapping_set(rbd_dev);
+	if (ret)
+		goto err_out_disk;
+	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
-	rc = rbd_init_watch_dev(rbd_dev);
-	if (rc)
-		goto err_out_bus;
+	ret = rbd_bus_add_dev(rbd_dev);
+	if (ret)
+		goto err_out_mapping;
 
-	return count;
+	/* Everything's ready.  Announce the disk to the world. */
 
-err_out_bus:
-	/* this will also clean up rest of rbd_dev stuff */
+	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+	add_disk(rbd_dev->disk);
 
-	rbd_bus_del_dev(rbd_dev);
-	kfree(options);
-	return rc;
+	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
+		(unsigned long long) rbd_dev->mapping.size);
 
-err_out_blkdev:
-	unregister_blkdev(rbd_dev->major, rbd_dev->name);
-err_out_client:
-	rbd_put_client(rbd_dev);
-err_put_id:
-	rbd_id_put(rbd_dev);
-err_nomem:
-	kfree(options);
-	kfree(rbd_dev);
+	return ret;
 
-	dout("Error adding device %s\n", buf);
-	module_put(THIS_MODULE);
+err_out_mapping:
+	rbd_dev_mapping_clear(rbd_dev);
+err_out_disk:
+	rbd_free_disk(rbd_dev);
+err_out_blkdev:
+	if (!single_major)
+		unregister_blkdev(rbd_dev->major, rbd_dev->name);
+err_out_id:
+	rbd_dev_id_put(rbd_dev);
+	rbd_dev_mapping_clear(rbd_dev);
 
-	return (ssize_t) rc;
+	return ret;
 }
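rbd_dev_id_to_minor(), defined earlier in the file (outside this hunk), derives the minor from the device id; it presumably just shifts the id by RBD_SINGLE_MAJOR_PART_SHIFT so that in single-major mode each device owns a block of minors for its partitions. A sketch under that assumption:

	/* Assumed: 16 minors per device, matching the shift of 4. */
	#define RBD_SINGLE_MAJOR_PART_SHIFT	4

	static int dev_id_to_minor(int dev_id)
	{
		return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
	}
	/* dev_id 0 -> minor 0, dev_id 1 -> minor 16, dev_id 2 -> minor 32, ... */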
 
-static struct rbd_device *__rbd_get_dev(unsigned long id)
+static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 {
-	struct list_head *tmp;
-	struct rbd_device *rbd_dev;
+	struct rbd_spec *spec = rbd_dev->spec;
+	size_t size;
 
-	spin_lock(&rbd_dev_list_lock);
-	list_for_each(tmp, &rbd_dev_list) {
-		rbd_dev = list_entry(tmp, struct rbd_device, node);
-		if (rbd_dev->id == id) {
-			spin_unlock(&rbd_dev_list_lock);
-			return rbd_dev;
-		}
-	}
-	spin_unlock(&rbd_dev_list_lock);
-	return NULL;
+	/* Record the header object name for this rbd image. */
+
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+	if (rbd_dev->image_format == 1)
+		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
+	else
+		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
+
+	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+	if (!rbd_dev->header_name)
+		return -ENOMEM;
+
+	if (rbd_dev->image_format == 1)
+		sprintf(rbd_dev->header_name, "%s%s",
+			spec->image_name, RBD_SUFFIX);
+	else
+		sprintf(rbd_dev->header_name, "%s%s",
+			RBD_HEADER_PREFIX, spec->image_id);
+	return 0;
 }
 
-static void rbd_dev_release(struct device *dev)
+static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
-	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	rbd_dev_unprobe(rbd_dev);
+	kfree(rbd_dev->header_name);
+	rbd_dev->header_name = NULL;
+	rbd_dev->image_format = 0;
+	kfree(rbd_dev->spec->image_id);
+	rbd_dev->spec->image_id = NULL;
+
+	rbd_dev_destroy(rbd_dev);
+}
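Concretely, rbd_dev_header_name() yields names like the following, assuming the conventional rbd_types.h values for the two macros (they are defined outside this hunk, so these values are an assumption):

	#include <stdio.h>

	#define RBD_SUFFIX		".rbd"		/* assumed, format 1 */
	#define RBD_HEADER_PREFIX	"rbd_header."	/* assumed, format 2 */

	int main(void)
	{
		printf("%s%s\n", "foo", RBD_SUFFIX);		/* foo.rbd */
		printf("%s%s\n", RBD_HEADER_PREFIX, "1012ae8944a");
						/* rbd_header.1012ae8944a */
		return 0;
	}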
 
-	if (rbd_dev->watch_request) {
-		struct ceph_client *client = rbd_dev->rbd_client->client;
+/*
+ * Probe for the existence of the header object for the given rbd
+ * device.  If this image is the one being mapped (i.e., not a
+ * parent), initiate a watch on its header object before using that
+ * object to get detailed information about the rbd image.
+ */
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
+{
+	int ret;
 
-		ceph_osdc_unregister_linger_request(&client->osdc,
-						    rbd_dev->watch_request);
+	/*
+	 * Get the id from the image id object.  Unless there's an
+	 * error, rbd_dev->spec->image_id will be filled in with
+	 * a dynamically-allocated string, and rbd_dev->image_format
+	 * will be set to either 1 or 2.
+	 */
+	ret = rbd_dev_image_id(rbd_dev);
+	if (ret)
+		return ret;
+	rbd_assert(rbd_dev->spec->image_id);
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+	ret = rbd_dev_header_name(rbd_dev);
+	if (ret)
+		goto err_out_format;
+
+	if (mapping) {
+		ret = rbd_dev_header_watch_sync(rbd_dev);
+		if (ret)
+			goto out_header_name;
 	}
-	if (rbd_dev->watch_event)
-		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
 
-	rbd_put_client(rbd_dev);
+	if (rbd_dev->image_format == 1)
+		ret = rbd_dev_v1_header_info(rbd_dev);
+	else
+		ret = rbd_dev_v2_header_info(rbd_dev);
+	if (ret)
+		goto err_out_watch;
 
-	/* clean up and free blkdev */
-	rbd_free_disk(rbd_dev);
-	unregister_blkdev(rbd_dev->major, rbd_dev->name);
+	ret = rbd_dev_spec_update(rbd_dev);
+	if (ret)
+		goto err_out_probe;
 
-	/* done with the id, and with the rbd_dev */
-	rbd_id_put(rbd_dev);
-	kfree(rbd_dev);
+	ret = rbd_dev_probe_parent(rbd_dev);
+	if (ret)
+		goto err_out_probe;
 
-	/* release module ref */
-	module_put(THIS_MODULE);
+	dout("discovered format %u image, header name is %s\n",
+		rbd_dev->image_format, rbd_dev->header_name);
+
+	return 0;
+err_out_probe:
+	rbd_dev_unprobe(rbd_dev);
+err_out_watch:
+	if (mapping)
+		rbd_dev_header_unwatch_sync(rbd_dev);
+out_header_name:
+	kfree(rbd_dev->header_name);
+	rbd_dev->header_name = NULL;
+err_out_format:
+	rbd_dev->image_format = 0;
+	kfree(rbd_dev->spec->image_id);
+	rbd_dev->spec->image_id = NULL;
+
+	dout("probe failed, returning %d\n", ret);
+
+	return ret;
 }
 
-static ssize_t rbd_remove(struct bus_type *bus,
+static ssize_t do_rbd_add(struct bus_type *bus,
 			  const char *buf,
 			  size_t count)
 {
 	struct rbd_device *rbd_dev = NULL;
-	int target_id, rc;
-	unsigned long ul;
-	int ret = count;
+	struct ceph_options *ceph_opts = NULL;
+	struct rbd_options *rbd_opts = NULL;
+	struct rbd_spec *spec = NULL;
+	struct rbd_client *rbdc;
+	bool read_only;
+	int rc = -ENOMEM;
 
-	rc = strict_strtoul(buf, 10, &ul);
-	if (rc)
-		return rc;
+	if (!try_module_get(THIS_MODULE))
+		return -ENODEV;
 
-	/* convert to int; abort if we lost anything in the conversion */
-	target_id = (int) ul;
-	if (target_id != ul)
-		return -EINVAL;
+	/* parse add command */
+	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
+	if (rc < 0)
+		goto err_out_module;
+	read_only = rbd_opts->read_only;
+	kfree(rbd_opts);
+	rbd_opts = NULL;	/* done with this */
+
+	rbdc = rbd_get_client(ceph_opts);
+	if (IS_ERR(rbdc)) {
+		rc = PTR_ERR(rbdc);
+		goto err_out_args;
+	}
 
-	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+	/* pick the pool */
+	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
+	if (rc < 0)
+		goto err_out_client;
+	spec->pool_id = (u64)rc;
+
+	/* The ceph file layout needs to fit pool id in 32 bits */
 
-	rbd_dev = __rbd_get_dev(target_id);
-	if (!rbd_dev) {
-		ret = -ENOENT;
-		goto done;
+	if (spec->pool_id > (u64)U32_MAX) {
+		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
+			(unsigned long long)spec->pool_id, U32_MAX);
+		rc = -EIO;
+		goto err_out_client;
 	}
 
-	__rbd_remove_all_snaps(rbd_dev);
-	rbd_bus_del_dev(rbd_dev);
+	rbd_dev = rbd_dev_create(rbdc, spec);
+	if (!rbd_dev)
+		goto err_out_client;
+	rbdc = NULL;		/* rbd_dev now owns this */
+	spec = NULL;		/* rbd_dev now owns this */
 
-done:
-	mutex_unlock(&ctl_mutex);
-	return ret;
+	rc = rbd_dev_image_probe(rbd_dev, true);
+	if (rc < 0)
+		goto err_out_rbd_dev;
+
+	/* If we are mapping a snapshot it must be marked read-only */
+
+	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+		read_only = true;
+	rbd_dev->mapping.read_only = read_only;
+
+	rc = rbd_dev_device_setup(rbd_dev);
+	if (rc) {
+		/*
+		 * rbd_dev_header_unwatch_sync() can't be moved into
+		 * rbd_dev_image_release() without refactoring, see
+		 * commit 1f3ef78861ac.
+		 */
+		rbd_dev_header_unwatch_sync(rbd_dev);
+		rbd_dev_image_release(rbd_dev);
+		goto err_out_module;
+	}
+
+	return count;
+
+err_out_rbd_dev:
+	rbd_dev_destroy(rbd_dev);
+err_out_client:
+	rbd_put_client(rbdc);
+err_out_args:
+	rbd_spec_put(spec);
+err_out_module:
+	module_put(THIS_MODULE);
+
+	dout("Error adding device %s\n", buf);
+
+	return (ssize_t)rc;
 }
 
-static ssize_t rbd_snap_add(struct device *dev,
-			    struct device_attribute *attr,
-			    const char *buf,
-			    size_t count)
+static ssize_t rbd_add(struct bus_type *bus,
+		       const char *buf,
+		       size_t count)
+{
+	if (single_major)
+		return -EINVAL;
+
+	return do_rbd_add(bus, buf, count);
+}
+
+static ssize_t rbd_add_single_major(struct bus_type *bus,
+				    const char *buf,
+				    size_t count)
+{
+	return do_rbd_add(bus, buf, count);
+}
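Note the handoff idiom in do_rbd_add(): once rbd_dev_create() succeeds, the local rbdc and spec pointers are set to NULL so the shared error path can call rbd_put_client() and rbd_spec_put() unconditionally without dropping references the rbd_dev now owns. A generic sketch of the pattern (illustrative names, not from this file):

	#include <stdlib.h>

	struct owner {
		void *resource;
	};

	/*
	 * Hand a resource to a newly created owner, then neutralize the
	 * caller's pointer so a shared error path can free() it blindly.
	 */
	static struct owner *adopt(void **resource)
	{
		struct owner *o = malloc(sizeof(*o));

		if (!o)
			return NULL;	/* caller's pointer still valid */
		o->resource = *resource;
		*resource = NULL;	/* caller's cleanup now frees nothing */
		return o;
	}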
+
+static void rbd_dev_device_release(struct device *dev)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+	rbd_free_disk(rbd_dev);
+	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+	rbd_dev_mapping_clear(rbd_dev);
+	if (!single_major)
+		unregister_blkdev(rbd_dev->major, rbd_dev->name);
+	rbd_dev_id_put(rbd_dev);
+	rbd_dev_mapping_clear(rbd_dev);
+}
+
+static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
+{
+	while (rbd_dev->parent) {
+		struct rbd_device *first = rbd_dev;
+		struct rbd_device *second = first->parent;
+		struct rbd_device *third;
+
+		/*
+		 * Follow to the parent with no grandparent and
+		 * remove it.
+		 */
+		while (second && (third = second->parent)) {
+			first = second;
+			second = third;
+		}
+		rbd_assert(second);
+		rbd_dev_image_release(second);
+		first->parent = NULL;
+		first->parent_overlap = 0;
+
+		rbd_assert(first->parent_spec);
+		rbd_spec_put(first->parent_spec);
+		first->parent_spec = NULL;
+	}
+}
+
+static ssize_t do_rbd_remove(struct bus_type *bus,
+			     const char *buf,
+			     size_t count)
+{
+	struct rbd_device *rbd_dev = NULL;
+	struct list_head *tmp;
+	int dev_id;
+	unsigned long ul;
+	bool already = false;
 	int ret;
 
-	char *name = kmalloc(count + 1, GFP_KERNEL);
-	if (!name)
-		return -ENOMEM;
-	snprintf(name, count, "%s", buf);
+	ret = kstrtoul(buf, 10, &ul);
+	if (ret)
+		return ret;
 
-	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+	/* convert to int; abort if we lost anything in the conversion */
+	dev_id = (int)ul;
+	if (dev_id != ul)
+		return -EINVAL;
 
-	ret = rbd_header_add_snap(rbd_dev,
-				  name, GFP_KERNEL);
-	if (ret < 0)
-		goto err_unlock;
+	ret = -ENOENT;
+	spin_lock(&rbd_dev_list_lock);
+	list_for_each(tmp, &rbd_dev_list) {
+		rbd_dev = list_entry(tmp, struct rbd_device, node);
+		if (rbd_dev->dev_id == dev_id) {
+			ret = 0;
+			break;
+		}
+	}
+	if (!ret) {
+		spin_lock_irq(&rbd_dev->lock);
+		if (rbd_dev->open_count)
+			ret = -EBUSY;
+		else
+			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
+							&rbd_dev->flags);
+		spin_unlock_irq(&rbd_dev->lock);
+	}
+	spin_unlock(&rbd_dev_list_lock);
+	if (ret < 0 || already)
+		return ret;
 
-	ret = __rbd_refresh_header(rbd_dev);
-	if (ret < 0)
-		goto err_unlock;
+	rbd_dev_header_unwatch_sync(rbd_dev);
+	/*
+	 * flush remaining watch callbacks - these must be complete
+	 * before the osd_client is shutdown
+	 */
+	dout("%s: flushing notifies", __func__);
+	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
+
+	/*
+	 * Don't free anything from rbd_dev->disk until after all
+	 * notifies are completely processed.  Otherwise
+	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
+	 * in a potential use after free of rbd_dev->disk or rbd_dev.
+	 */
+	rbd_bus_del_dev(rbd_dev);
+	rbd_dev_image_release(rbd_dev);
+	module_put(THIS_MODULE);
 
-	/* shouldn't hold ctl_mutex when notifying.. notify might
-	   trigger a watch callback that would need to get that mutex */
-	mutex_unlock(&ctl_mutex);
+	return count;
+}
 
-	/* make a best effort, don't error if failed */
-	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
+static ssize_t rbd_remove(struct bus_type *bus,
+			  const char *buf,
+			  size_t count)
+{
+	if (single_major)
+		return -EINVAL;
 
-	ret = count;
-	kfree(name);
-	return ret;
+	return do_rbd_remove(bus, buf, count);
+}
 
-err_unlock:
-	mutex_unlock(&ctl_mutex);
-	kfree(name);
-	return ret;
+static ssize_t rbd_remove_single_major(struct bus_type *bus,
+				       const char *buf,
+				       size_t count)
+{
+	return do_rbd_remove(bus, buf, count);
 }
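rbd_dev_remove_parent() dismantles a layered chain top-down: each outer pass walks to the ancestor with no parent of its own and releases it, so for child -> parent -> grandparent the grandparent goes first. The same walk over a pared-down node type, with free() standing in for rbd_dev_image_release():

	#include <stdlib.h>

	struct dev_node {
		struct dev_node *parent;
	};

	/* Release a parent chain topmost-first, mirroring the loop above. */
	static void remove_parents(struct dev_node *dev)
	{
		while (dev->parent) {
			struct dev_node *first = dev;
			struct dev_node *second = first->parent;

			while (second->parent) {	/* find the last link */
				first = second;
				second = second->parent;
			}
			free(second);		/* grandparent-most goes first */
			first->parent = NULL;
		}
	}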
 
 /*
@@ -2590,30 +5454,113 @@ static void rbd_sysfs_cleanup(void)
 	device_unregister(&rbd_root_dev);
 }
 
-int __init rbd_init(void)
+static int rbd_slab_init(void)
+{
+	rbd_assert(!rbd_img_request_cache);
+	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
+					sizeof (struct rbd_img_request),
+					__alignof__(struct rbd_img_request),
+					0, NULL);
+	if (!rbd_img_request_cache)
+		return -ENOMEM;
+
+	rbd_assert(!rbd_obj_request_cache);
+	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
+					sizeof (struct rbd_obj_request),
+					__alignof__(struct rbd_obj_request),
+					0, NULL);
+	if (!rbd_obj_request_cache)
+		goto out_err;
+
+	rbd_assert(!rbd_segment_name_cache);
+	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
+					CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
+	if (rbd_segment_name_cache)
+		return 0;
+out_err:
+	if (rbd_obj_request_cache) {
+		kmem_cache_destroy(rbd_obj_request_cache);
+		rbd_obj_request_cache = NULL;
+	}
+
+	kmem_cache_destroy(rbd_img_request_cache);
+	rbd_img_request_cache = NULL;
+
+	return -ENOMEM;
+}
+
+static void rbd_slab_exit(void)
+{
+	rbd_assert(rbd_segment_name_cache);
+	kmem_cache_destroy(rbd_segment_name_cache);
+	rbd_segment_name_cache = NULL;
+
+	rbd_assert(rbd_obj_request_cache);
+	kmem_cache_destroy(rbd_obj_request_cache);
+	rbd_obj_request_cache = NULL;
+
+	rbd_assert(rbd_img_request_cache);
+	kmem_cache_destroy(rbd_img_request_cache);
+	rbd_img_request_cache = NULL;
+}
+
+static int __init rbd_init(void)
 {
 	int rc;
 
-	rc = rbd_sysfs_init();
+	if (!libceph_compatible(NULL)) {
+		rbd_warn(NULL, "libceph incompatibility (quitting)");
+		return -EINVAL;
+	}
+
+	rc = rbd_slab_init();
 	if (rc)
 		return rc;
-	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
+
+	if (single_major) {
+		rbd_major = register_blkdev(0, RBD_DRV_NAME);
+		if (rbd_major < 0) {
+			rc = rbd_major;
+			goto err_out_slab;
+		}
+	}
+
+	rc = rbd_sysfs_init();
+	if (rc)
+		goto err_out_blkdev;
+
+	if (single_major)
+		pr_info("loaded (major %d)\n", rbd_major);
+	else
+		pr_info("loaded\n");
+
 	return 0;
+
+err_out_blkdev:
+	if (single_major)
+		unregister_blkdev(rbd_major, RBD_DRV_NAME);
+err_out_slab:
+	rbd_slab_exit();
+	return rc;
 }
 
-void __exit rbd_exit(void)
+static void __exit rbd_exit(void)
 {
+	ida_destroy(&rbd_dev_id_ida);
 	rbd_sysfs_cleanup();
+	if (single_major)
+		unregister_blkdev(rbd_major, RBD_DRV_NAME);
+	rbd_slab_exit();
 }
 
 module_init(rbd_init);
 module_exit(rbd_exit);
 
+MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
-MODULE_DESCRIPTION("rados block device");
-
 /* following authorship retained from original osdblk.c */
 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
+MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
 MODULE_LICENSE("GPL");
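The single_major switch tested throughout rbd_init() and rbd_exit() is a module parameter declared near the top of the file, outside this hunk; its declaration is roughly the following (a reconstruction, not part of this diff), so loading with "modprobe rbd single_major=Y" selects the shared-major layout:

	#include <linux/module.h>

	/* Roughly how the parameter is declared earlier in the file. */
	static bool single_major = false;
	module_param(single_major, bool, S_IRUGO);
	MODULE_PARM_DESC(single_major,
		"Use a single major number for all rbd devices (default: false)");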