diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2012-10-08 06:38:18 +0900 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2012-10-08 06:38:18 +0900 |
commit | 7035cdf36d5c4d913f68ff97e1c2e5603500d946 (patch) | |
tree | eee5680c16771cb23bc6d3a47bc7b6f350171b22 | |
parent | 6432f2128414edbea5fd4f6c4fa4c28d0e1c6151 (diff) | |
parent | 6285bc231277419255f3498d3eb5ddc9f8e7fe79 (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull ceph updates from Sage Weil:
"The bulk of this pull is a series from Alex that refactors and cleans
up the RBD code to lay the groundwork for supporting the new image
format and evolving feature set. There are also some cleanups in
libceph, and for ceph there's fixed validation of file striping
layouts and a bugfix in the code handling a shrinking MDS cluster."
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (71 commits)
ceph: avoid 32-bit page index overflow
ceph: return EIO on invalid layout on GET_DATALOC ioctl
rbd: BUG on invalid layout
ceph: propagate layout error on osd request creation
libceph: check for invalid mapping
ceph: convert to use le32_add_cpu()
ceph: Fix oops when handling mdsmap that decreases max_mds
rbd: update remaining header fields for v2
rbd: get snapshot name for a v2 image
rbd: get the snapshot context for a v2 image
rbd: get image features for a v2 image
rbd: get the object prefix for a v2 rbd image
rbd: add code to get the size of a v2 rbd image
rbd: lay out header probe infrastructure
rbd: encapsulate code that gets snapshot info
rbd: add an rbd features field
rbd: don't use index in __rbd_add_snap_dev()
rbd: kill create_snap sysfs entry
rbd: define rbd_dev_image_id()
rbd: define some new format constants
...
-rw-r--r-- | Documentation/ABI/testing/sysfs-bus-rbd | 18 | ||||
-rw-r--r-- | drivers/block/rbd.c | 1784 | ||||
-rw-r--r-- | drivers/block/rbd_types.h | 27 | ||||
-rw-r--r-- | fs/ceph/addr.c | 19 | ||||
-rw-r--r-- | fs/ceph/caps.c | 2 | ||||
-rw-r--r-- | fs/ceph/file.c | 4 | ||||
-rw-r--r-- | fs/ceph/ioctl.c | 8 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 3 | ||||
-rw-r--r-- | fs/ceph/super.c | 37 | ||||
-rw-r--r-- | include/linux/ceph/mon_client.h | 1 | ||||
-rw-r--r-- | include/linux/ceph/osd_client.h | 2 | ||||
-rw-r--r-- | include/linux/ceph/osdmap.h | 6 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 7 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 47 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 18 | ||||
-rw-r--r-- | net/ceph/pagelist.c | 5 |
16 files changed, 1299 insertions, 689 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 3c17b62899f..1cf2adf46b1 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -25,6 +25,10 @@ client_id The ceph unique client id that was assigned for this specific session. +features + + A hexadecimal encoding of the feature bits for this image. + major The block device major number. @@ -33,6 +37,11 @@ name The name of the rbd image. +image_id + + The unique id for the rbd image. (For rbd image format 1 + this is empty.) + pool The name of the storage pool where this rbd image resides. @@ -57,12 +66,6 @@ current_snap The current snapshot for which the device is mapped. -create_snap - - Create a snapshot: - - $ echo <snap-name> > /sys/bus/rbd/devices/<dev-id>/snap_create - snap_* A directory per each snapshot @@ -79,4 +82,7 @@ snap_size The size of the image when this snapshot was taken. +snap_features + + A hexadecimal encoding of the feature bits for this snapshot. diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 54a55f03115..bb3d9be3b1b 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -41,6 +41,8 @@ #include "rbd_types.h" +#define RBD_DEBUG /* Activate rbd_assert() calls */ + /* * The basic unit of block I/O is a sector. It is interpreted in a * number of contexts in Linux (blk, bio, genhd), but the default is @@ -50,16 +52,24 @@ #define SECTOR_SHIFT 9 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT) +/* It might be useful to have this defined elsewhere too */ + +#define U64_MAX ((u64) (~0ULL)) + #define RBD_DRV_NAME "rbd" #define RBD_DRV_NAME_LONG "rbd (rados block device)" #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */ #define RBD_MAX_SNAP_NAME_LEN 32 +#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */ #define RBD_MAX_OPT_LEN 1024 #define RBD_SNAP_HEAD_NAME "-" +#define RBD_IMAGE_ID_LEN_MAX 64 +#define RBD_OBJ_PREFIX_LEN_MAX 64 + /* * An RBD device name will be "rbd#", where the "rbd" comes from * RBD_DRV_NAME above, and # is a unique integer identifier. @@ -69,21 +79,22 @@ #define DEV_NAME_LEN 32 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) -#define RBD_NOTIFY_TIMEOUT_DEFAULT 10 +#define RBD_READ_ONLY_DEFAULT false /* * block device image metadata (in-memory version) */ struct rbd_image_header { - u64 image_size; + /* These four fields never change for a given rbd image */ char *object_prefix; + u64 features; __u8 obj_order; __u8 crypt_type; __u8 comp_type; - struct ceph_snap_context *snapc; - size_t snap_names_len; - u32 total_snaps; + /* The remaining fields need to be updated occasionally */ + u64 image_size; + struct ceph_snap_context *snapc; char *snap_names; u64 *snap_sizes; @@ -91,7 +102,7 @@ struct rbd_image_header { }; struct rbd_options { - int notify_timeout; + bool read_only; }; /* @@ -99,7 +110,6 @@ struct rbd_options { */ struct rbd_client { struct ceph_client *client; - struct rbd_options *rbd_opts; struct kref kref; struct list_head node; }; @@ -141,6 +151,16 @@ struct rbd_snap { u64 size; struct list_head node; u64 id; + u64 features; +}; + +struct rbd_mapping { + char *snap_name; + u64 snap_id; + u64 size; + u64 features; + bool snap_exists; + bool read_only; }; /* @@ -151,8 +171,9 @@ struct rbd_device { int major; /* blkdev assigned major */ struct gendisk *disk; /* blkdev's gendisk and rq */ - struct request_queue *q; + u32 image_format; /* Either 1 or 2 */ + struct rbd_options rbd_opts; struct rbd_client *rbd_client; char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ @@ -160,6 +181,8 @@ struct rbd_device { spinlock_t lock; /* queue lock */ struct rbd_image_header header; + char *image_id; + size_t image_id_len; char *image_name; size_t image_name_len; char *header_name; @@ -171,13 +194,8 @@ struct rbd_device { /* protects updating the header */ struct rw_semaphore header_rwsem; - /* name of the snapshot this device reads from */ - char *snap_name; - /* id of the snapshot this device reads from */ - u64 snap_id; /* current snapshot id */ - /* whether the snap_id this device reads from still exists */ - bool snap_exists; - int read_only; + + struct rbd_mapping mapping; struct list_head node; @@ -196,12 +214,10 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock); static LIST_HEAD(rbd_client_list); /* clients */ static DEFINE_SPINLOCK(rbd_client_list_lock); -static int __rbd_init_snaps_header(struct rbd_device *rbd_dev); +static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); +static int rbd_dev_snaps_register(struct rbd_device *rbd_dev); + static void rbd_dev_release(struct device *dev); -static ssize_t rbd_snap_add(struct device *dev, - struct device_attribute *attr, - const char *buf, - size_t count); static void __rbd_remove_snap_dev(struct rbd_snap *snap); static ssize_t rbd_add(struct bus_type *bus, const char *buf, @@ -229,6 +245,18 @@ static struct device rbd_root_dev = { .release = rbd_root_dev_release, }; +#ifdef RBD_DEBUG +#define rbd_assert(expr) \ + if (unlikely(!(expr))) { \ + printk(KERN_ERR "\nAssertion failure in %s() " \ + "at line %d:\n\n" \ + "\trbd_assert(%s);\n\n", \ + __func__, __LINE__, #expr); \ + BUG(); \ + } +#else /* !RBD_DEBUG */ +# define rbd_assert(expr) ((void) 0) +#endif /* !RBD_DEBUG */ static struct device *rbd_get_dev(struct rbd_device *rbd_dev) { @@ -246,11 +274,11 @@ static int rbd_open(struct block_device *bdev, fmode_t mode) { struct rbd_device *rbd_dev = bdev->bd_disk->private_data; - if ((mode & FMODE_WRITE) && rbd_dev->read_only) + if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) return -EROFS; rbd_get_dev(rbd_dev); - set_device_ro(bdev, rbd_dev->read_only); + set_device_ro(bdev, rbd_dev->mapping.read_only); return 0; } @@ -274,8 +302,7 @@ static const struct block_device_operations rbd_bd_ops = { * Initialize an rbd client instance. * We own *ceph_opts. */ -static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts, - struct rbd_options *rbd_opts) +static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) { struct rbd_client *rbdc; int ret = -ENOMEM; @@ -299,8 +326,6 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts, if (ret < 0) goto out_err; - rbdc->rbd_opts = rbd_opts; - spin_lock(&rbd_client_list_lock); list_add_tail(&rbdc->node, &rbd_client_list); spin_unlock(&rbd_client_list_lock); @@ -322,36 +347,52 @@ out_opt: } /* - * Find a ceph client with specific addr and configuration. + * Find a ceph client with specific addr and configuration. If + * found, bump its reference count. */ -static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts) +static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) { struct rbd_client *client_node; + bool found = false; if (ceph_opts->flags & CEPH_OPT_NOSHARE) return NULL; - list_for_each_entry(client_node, &rbd_client_list, node) - if (!ceph_compare_options(ceph_opts, client_node->client)) - return client_node; - return NULL; + spin_lock(&rbd_client_list_lock); + list_for_each_entry(client_node, &rbd_client_list, node) { + if (!ceph_compare_options(ceph_opts, client_node->client)) { + kref_get(&client_node->kref); + found = true; + break; + } + } + spin_unlock(&rbd_client_list_lock); + + return found ? client_node : NULL; } /* * mount options */ enum { - Opt_notify_timeout, Opt_last_int, /* int args above */ Opt_last_string, /* string args above */ + Opt_read_only, + Opt_read_write, + /* Boolean args above */ + Opt_last_bool, }; static match_table_t rbd_opts_tokens = { - {Opt_notify_timeout, "notify_timeout=%d"}, /* int args above */ /* string args above */ + {Opt_read_only, "mapping.read_only"}, + {Opt_read_only, "ro"}, /* Alternate spelling */ + {Opt_read_write, "read_write"}, + {Opt_read_write, "rw"}, /* Alternate spelling */ + /* Boolean args above */ {-1, NULL} }; @@ -376,16 +417,22 @@ static int parse_rbd_opts_token(char *c, void *private) } else if (token > Opt_last_int && token < Opt_last_string) { dout("got string token %d val %s\n", token, argstr[0].from); + } else if (token > Opt_last_string && token < Opt_last_bool) { + dout("got Boolean token %d\n", token); } else { dout("got token %d\n", token); } switch (token) { - case Opt_notify_timeout: - rbd_opts->notify_timeout = intval; + case Opt_read_only: + rbd_opts->read_only = true; + break; + case Opt_read_write: + rbd_opts->read_only = false; break; default: - BUG_ON(token); + rbd_assert(false); + break; } return 0; } @@ -394,48 +441,33 @@ static int parse_rbd_opts_token(char *c, void *private) * Get a ceph client with specific addr and configuration, if one does * not exist create it. */ -static struct rbd_client *rbd_get_client(const char *mon_addr, - size_t mon_addr_len, - char *options) +static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, + size_t mon_addr_len, char *options) { - struct rbd_client *rbdc; + struct rbd_options *rbd_opts = &rbd_dev->rbd_opts; struct ceph_options *ceph_opts; - struct rbd_options *rbd_opts; - - rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL); - if (!rbd_opts) - return ERR_PTR(-ENOMEM); + struct rbd_client *rbdc; - rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT; + rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; ceph_opts = ceph_parse_options(options, mon_addr, mon_addr + mon_addr_len, parse_rbd_opts_token, rbd_opts); - if (IS_ERR(ceph_opts)) { - kfree(rbd_opts); - return ERR_CAST(ceph_opts); - } + if (IS_ERR(ceph_opts)) + return PTR_ERR(ceph_opts); - spin_lock(&rbd_client_list_lock); - rbdc = __rbd_client_find(ceph_opts); + rbdc = rbd_client_find(ceph_opts); if (rbdc) { /* using an existing client */ - kref_get(&rbdc->kref); - spin_unlock(&rbd_client_list_lock); - ceph_destroy_options(ceph_opts); - kfree(rbd_opts); - - return rbdc; + } else { + rbdc = rbd_client_create(ceph_opts); + if (IS_ERR(rbdc)) + return PTR_ERR(rbdc); } - spin_unlock(&rbd_client_list_lock); - - rbdc = rbd_client_create(ceph_opts, rbd_opts); + rbd_dev->rbd_client = rbdc; - if (IS_ERR(rbdc)) - kfree(rbd_opts); - - return rbdc; + return 0; } /* @@ -453,7 +485,6 @@ static void rbd_client_release(struct kref *kref) spin_unlock(&rbd_client_list_lock); ceph_destroy_client(rbdc->client); - kfree(rbdc->rbd_opts); kfree(rbdc); } @@ -479,10 +510,38 @@ static void rbd_coll_release(struct kref *kref) kfree(coll); } +static bool rbd_image_format_valid(u32 image_format) +{ + return image_format == 1 || image_format == 2; +} + static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) { - return !memcmp(&ondisk->text, - RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)); + size_t size; + u32 snap_count; + + /* The header has to start with the magic rbd header text */ + if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) + return false; + + /* + * The size of a snapshot header has to fit in a size_t, and + * that limits the number of snapshots. + */ + snap_count = le32_to_cpu(ondisk->snap_count); + size = SIZE_MAX - sizeof (struct ceph_snap_context); + if (snap_count > size / sizeof (__le64)) + return false; + + /* + * Not only that, but the size of the entire the snapshot + * header must also be representable in a size_t. + */ + size -= snap_count * sizeof (__le64); + if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) + return false; + + return true; } /* @@ -490,179 +549,203 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) * header. */ static int rbd_header_from_disk(struct rbd_image_header *header, - struct rbd_image_header_ondisk *ondisk, - u32 allocated_snaps) + struct rbd_image_header_ondisk *ondisk) { u32 snap_count; + size_t len; + size_t size; + u32 i; - if (!rbd_dev_ondisk_valid(ondisk)) - return -ENXIO; + memset(header, 0, sizeof (*header)); snap_count = le32_to_cpu(ondisk->snap_count); - if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context)) - / sizeof (u64)) - return -EINVAL; - header->snapc = kmalloc(sizeof(struct ceph_snap_context) + - snap_count * sizeof(u64), - GFP_KERNEL); - if (!header->snapc) + + len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix)); + header->object_prefix = kmalloc(len + 1, GFP_KERNEL); + if (!header->object_prefix) return -ENOMEM; + memcpy(header->object_prefix, ondisk->object_prefix, len); + header->object_prefix[len] = '\0'; if (snap_count) { - header->snap_names_len = le64_to_cpu(ondisk->snap_names_len); - header->snap_names = kmalloc(header->snap_names_len, - GFP_KERNEL); + u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); + + /* Save a copy of the snapshot names */ + + if (snap_names_len > (u64) SIZE_MAX) + return -EIO; + header->snap_names = kmalloc(snap_names_len, GFP_KERNEL); if (!header->snap_names) - goto err_snapc; - header->snap_sizes = kmalloc(snap_count * sizeof(u64), - GFP_KERNEL); + goto out_err; + /* + * Note that rbd_dev_v1_header_read() guarantees + * the ondisk buffer we're working with has + * snap_names_len bytes beyond the end of the + * snapshot id array, this memcpy() is safe. + */ + memcpy(header->snap_names, &ondisk->snaps[snap_count], + snap_names_len); + + /* Record each snapshot's size */ + + size = snap_count * sizeof (*header->snap_sizes); + header->snap_sizes = kmalloc(size, GFP_KERNEL); if (!header->snap_sizes) - goto err_names; + goto out_err; + for (i = 0; i < snap_count; i++) + header->snap_sizes[i] = + le64_to_cpu(ondisk->snaps[i].image_size); } else { WARN_ON(ondisk->snap_names_len); - header->snap_names_len = 0; header->snap_names = NULL; header->snap_sizes = NULL; } - header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1, - GFP_KERNEL); - if (!header->object_prefix) - goto err_sizes; - - memcpy(header->object_prefix, ondisk->block_name, - sizeof(ondisk->block_name)); - header->object_prefix[sizeof (ondisk->block_name)] = '\0'; - - header->image_size = le64_to_cpu(ondisk->image_size); + header->features = 0; /* No features support in v1 images */ header->obj_order = ondisk->options.order; header->crypt_type = ondisk->options.crypt_type; header->comp_type = ondisk->options.comp_type; + /* Allocate and fill in the snapshot context */ + + header->image_size = le64_to_cpu(ondisk->image_size); + size = sizeof (struct ceph_snap_context); + size += snap_count * sizeof (header->snapc->snaps[0]); + header->snapc = kzalloc(size, GFP_KERNEL); + if (!header->snapc) + goto out_err; + atomic_set(&header->snapc->nref, 1); header->snapc->seq = le64_to_cpu(ondisk->snap_seq); header->snapc->num_snaps = snap_count; - header->total_snaps = snap_count; - - if (snap_count && allocated_snaps == snap_count) { - int i; - - for (i = 0; i < snap_count; i++) { - header->snapc->snaps[i] = - le64_to_cpu(ondisk->snaps[i].id); - header->snap_sizes[i] = - le64_to_cpu(ondisk->snaps[i].image_size); - } - - /* copy snapshot names */ - memcpy(header->snap_names, &ondisk->snaps[snap_count], - header->snap_names_len); - } + for (i = 0; i < snap_count; i++) + header->snapc->snaps[i] = + le64_to_cpu(ondisk->snaps[i].id); return 0; -err_sizes: +out_err: kfree(header->snap_sizes); header->snap_sizes = NULL; -err_names: kfree(header->snap_names); header->snap_names = NULL; -err_snapc: - kfree(header->snapc); - header->snapc = NULL; + kfree(header->object_prefix); + header->object_prefix = NULL; return -ENOMEM; } -static int snap_by_name(struct rbd_image_header *header, const char *snap_name, - u64 *seq, u64 *size) +static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) { - int i; - char *p = header->snap_names; - for (i = 0; i < header->total_snaps; i++) { - if (!strcmp(snap_name, p)) { + struct rbd_snap *snap; - /* Found it. Pass back its id and/or size */ + list_for_each_entry(snap, &rbd_dev->snaps, node) { + if (!strcmp(snap_name, snap->name)) { + rbd_dev->mapping.snap_id = snap->id; + rbd_dev->mapping.size = snap->size; + rbd_dev->mapping.features = snap->features; - if (seq) - *seq = header->snapc->snaps[i]; - if (size) - *size = header->snap_sizes[i]; - return i; + return 0; } - p += strlen(p) + 1; /* Skip ahead to the next name */ } + return -ENOENT; } -static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size) +static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name) { int ret; - down_write(&rbd_dev->header_rwsem); - - if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME, + if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME, sizeof (RBD_SNAP_HEAD_NAME))) { - rbd_dev->snap_id = CEPH_NOSNAP; - rbd_dev->snap_exists = false; - rbd_dev->read_only = 0; - if (size) - *size = rbd_dev->header.image_size; + rbd_dev->mapping.snap_id = CEPH_NOSNAP; + rbd_dev->mapping.size = rbd_dev->header.image_size; + rbd_dev->mapping.features = rbd_dev->header.features; + rbd_dev->mapping.snap_exists = false; + rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only; + ret = 0; } else { - u64 snap_id = 0; - - ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name, - &snap_id, size); + ret = snap_by_name(rbd_dev, snap_name); if (ret < 0) goto done; - rbd_dev->snap_id = snap_id; - rbd_dev->snap_exists = true; - rbd_dev->read_only = 1; + rbd_dev->mapping.snap_exists = true; + rbd_dev->mapping.read_only = true; } - - ret = 0; + rbd_dev->mapping.snap_name = snap_name; done: - up_write(&rbd_dev->header_rwsem); return ret; } static void rbd_header_free(struct rbd_image_header *header) { kfree(header->object_prefix); + header->object_prefix = NULL; kfree(header->snap_sizes); + header->snap_sizes = NULL; kfree(header->snap_names); + header->snap_names = NULL; ceph_put_snap_context(header->snapc); + header->snapc = NULL; } -/* - * get the actual striped segment name, offset and length - */ -static u64 rbd_get_segment(struct rbd_image_header *header, - const char *object_prefix, - u64 ofs, u64 len, - char *seg_name, u64 *segofs) +static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) +{ + char *name; + u64 segment; + int ret; + + name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); + if (!name) + return NULL; + segment = offset >> rbd_dev->header.obj_order; + ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx", + rbd_dev->header.object_prefix, segment); + if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) { + pr_err("error formatting segment name for #%llu (%d)\n", + segment, ret); + kfree(name); + name = NULL; + } + + return name; +} + +static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) { - u64 seg = ofs >> header->obj_order; + u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; - if (seg_name) - snprintf(seg_name, RBD_MAX_SEG_NAME_LEN, - "%s.%012llx", object_prefix, seg); + return offset & (segment_size - 1); +} + +static u64 rbd_segment_length(struct rbd_device *rbd_dev, + u64 offset, u64 length) +{ + u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; - ofs = ofs & ((1 << header->obj_order) - 1); - len = min_t(u64, len, (1 << header->obj_order) - ofs); + offset &= segment_size - 1; - if (segofs) - *segofs = ofs; + rbd_assert(length <= U64_MAX - offset); + if (offset + length > segment_size) + length = segment_size - offset; - return len; + return length; } static int rbd_get_num_segments(struct rbd_image_header *header, u64 ofs, u64 len) { - u64 start_seg = ofs >> header->obj_order; - u64 end_seg = (ofs + len - 1) >> header->obj_order; + u64 start_seg; + u64 end_seg; + + if (!len) + return 0; + if (len - 1 > U64_MAX - ofs) + return -ERANGE; + + start_seg = ofs >> header->obj_order; + end_seg = (ofs + len - 1) >> header->obj_order; + return end_seg - start_seg + 1; } @@ -724,7 +807,9 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next, struct bio_pair **bp, int len, gfp_t gfpmask) { - struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL; + struct bio *old_chain = *old; + struct bio *new_chain = NULL; + struct bio *tail; int total = 0; if (*bp) { @@ -733,9 +818,12 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next, } while (old_chain && (total < len)) { + struct bio *tmp; + tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); if (!tmp) goto err_out; + gfpmask &= ~__GFP_WAIT; /* can't wait after the first */ if (total + old_chain->bi_size > len) { struct bio_pair *bp; @@ -763,24 +851,18 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next, } tmp->bi_bdev = NULL; - gfpmask &= ~__GFP_WAIT; tmp->bi_next = NULL; - - if (!new_chain) { - new_chain = tail = tmp; - } else { + if (new_chain) tail->bi_next = tmp; - tail = tmp; - } + else + new_chain = tmp; + tail = tmp; old_chain = old_chain->bi_next; total += tmp->bi_size; } - BUG_ON(total < len); - - if (tail) - tail->bi_next = NULL; + rbd_assert(total == len); *old = old_chain; @@ -938,8 +1020,9 @@ static int rbd_do_request(struct request *rq, layout->fl_stripe_count = cpu_to_le32(1); layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id); - ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, - req, ops); + ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno, + req, ops); + rbd_assert(ret == 0); ceph_osdc_build_request(req, ofs, &len, ops, @@ -1030,8 +1113,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev, int flags, struct ceph_osd_req_op *ops, const char *object_name, - u64 ofs, u64 len, - char *buf, + u64 ofs, u64 inbound_size, + char *inbound, struct ceph_osd_request **linger_req, u64 *ver) { @@ -1039,15 +1122,15 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev, struct page **pages; int num_pages; - BUG_ON(ops == NULL); + rbd_assert(ops != NULL); - num_pages = calc_pages_for(ofs , len); + num_pages = calc_pages_for(ofs, inbound_size); pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) return PTR_ERR(pages); ret = rbd_do_request(NULL, rbd_dev, snapc, snapid, - object_name, ofs, len, NULL, + object_name, ofs, inbound_size, NULL, pages, num_pages, flags, ops, @@ -1057,8 +1140,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev, if (ret < 0) goto done; - if ((flags & CEPH_OSD_FLAG_READ) && buf) - ret = ceph_copy_from_page_vector(pages, buf, ofs, ret); + if ((flags & CEPH_OSD_FLAG_READ) && inbound) + ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret); done: ceph_release_page_vector(pages, num_pages); @@ -1085,14 +1168,11 @@ static int rbd_do_op(struct request *rq, struct ceph_osd_req_op *ops; u32 payload_len; - seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO); + seg_name = rbd_segment_name(rbd_dev, ofs); if (!seg_name) return -ENOMEM; - - seg_len = rbd_get_segment(&rbd_dev->header, - rbd_dev->header.object_prefix, - ofs, len, - seg_name, &seg_ofs); + seg_len = rbd_segment_length(rbd_dev, ofs, len); + seg_ofs = rbd_segment_offset(rbd_dev, ofs); payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0); @@ -1104,7 +1184,7 @@ static int rbd_do_op(struct request *rq, /* we've taken care of segment sizes earlier when we cloned the bios. We should never have a segment truncated at this point */ - BUG_ON(seg_len < len); + rbd_assert(seg_len == len); ret = rbd_do_request(rq, rbd_dev, snapc, snapid, seg_name, seg_ofs, seg_len, @@ -1306,89 +1386,36 @@ static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev) return ret; } -struct rbd_notify_info { - struct rbd_device *rbd_dev; -}; - -static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data) -{ - struct rbd_device *rbd_dev = (struct rbd_device *)data; - if (!rbd_dev) - return; - - dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n", - rbd_dev->header_name, (unsigned long long) notify_id, - (unsigned int) opcode); -} - /* - * Request sync osd notify - */ -static int rbd_req_sync_notify(struct rbd_device *rbd_dev) -{ - struct ceph_osd_req_op *ops; - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - struct ceph_osd_event *event; - struct rbd_notify_info info; - int payload_len = sizeof(u32) + sizeof(u32); - int ret; - - ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len); - if (!ops) - return -ENOMEM; - - info.rbd_dev = rbd_dev; - - ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1, - (void *)&info, &event); - if (ret < 0) - goto fail; - - ops[0].watch.ver = 1; - ops[0].watch.flag = 1; - ops[0].watch.cookie = event->cookie; - ops[0].watch.prot_ver = RADOS_NOTIFY_VER; - ops[0].watch.timeout = 12; - - ret = rbd_req_sync_op(rbd_dev, NULL, - CEPH_NOSNAP, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - ops, - rbd_dev->header_name, - 0, 0, NULL, NULL, NULL); - if (ret < 0) - goto fail_event; - - ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT); - dout("ceph_osdc_wait_event returned %d\n", ret); - rbd_destroy_ops(ops); - return 0; - -fail_event: - ceph_osdc_cancel_event(event); -fail: - rbd_destroy_ops(ops); - return ret; -} - -/* - * Request sync osd read + * Synchronous osd object method call */ static int rbd_req_sync_exec(struct rbd_device *rbd_dev, const char *object_name, const char *class_name, const char *method_name, - const char *data, - int len, + const char *outbound, + size_t outbound_size, + char *inbound, + size_t inbound_size, + int flags, u64 *ver) { struct ceph_osd_req_op *ops; int class_name_len = strlen(class_name); int method_name_len = strlen(method_name); + int payload_size; int ret; - ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, - class_name_len + method_name_len + len); + /* + * Any input parameters required by the method we're calling + * will be sent along with the class and method names as + * part of the message payload. That data and its size are + * supplied via the indata and indata_len fields (named from + * the perspective of the server side) in the OSD request + * operation. + */ + payload_size = class_name_len + method_name_len + outbound_size; + ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size); if (!ops) return -ENOMEM; @@ -1397,14 +1424,14 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev, ops[0].cls.method_name = method_name; ops[0].cls.method_len = (__u8) method_name_len; ops[0].cls.argc = 0; - ops[0].cls.indata = data; - ops[0].cls.indata_len = len; + ops[0].cls.indata = outbound; + ops[0].cls.indata_len = outbound_size; ret = rbd_req_sync_op(rbd_dev, NULL, CEPH_NOSNAP, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - ops, - object_name, 0, 0, NULL, NULL, ver); + flags, ops, + object_name, 0, inbound_size, inbound, + NULL, ver); rbd_destroy_ops(ops); @@ -1446,10 +1473,6 @@ static void rbd_rq_fn(struct request_queue *q) struct rbd_req_coll *coll; struct ceph_snap_context *snapc; - /* peek at request from block layer */ - if (!rq) - |