diff options
-rw-r--r-- | Documentation/ABI/testing/sysfs-bus-rbd | 2 | ||||
-rw-r--r-- | drivers/block/rbd.c | 361 | ||||
-rw-r--r-- | fs/ceph/debugfs.c | 6 | ||||
-rw-r--r-- | fs/ceph/dir.c | 24 | ||||
-rw-r--r-- | fs/ceph/file.c | 10 | ||||
-rw-r--r-- | fs/ceph/inode.c | 25 | ||||
-rw-r--r-- | fs/ceph/super.c | 9 | ||||
-rw-r--r-- | fs/ceph/super.h | 66 | ||||
-rw-r--r-- | include/linux/ceph/ceph_fs.h | 19 | ||||
-rw-r--r-- | include/linux/ceph/libceph.h | 1 | ||||
-rw-r--r-- | include/linux/ceph/osd_client.h | 57 | ||||
-rw-r--r-- | include/linux/ceph/rados.h | 39 | ||||
-rw-r--r-- | net/ceph/armor.c | 4 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 1 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 624 |
15 files changed, 1018 insertions, 230 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 90a87e2a572..fa72ccb2282 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -1,6 +1,6 @@ What: /sys/bus/rbd/ Date: November 2010 -Contact: Yehuda Sadeh <yehuda@hq.newdream.net>, +Contact: Yehuda Sadeh <yehuda@newdream.net>, Sage Weil <sage@newdream.net> Description: diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e1e38b11f48..16dc3645291 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -31,6 +31,7 @@ #include <linux/ceph/osd_client.h> #include <linux/ceph/mon_client.h> #include <linux/ceph/decode.h> +#include <linux/parser.h> #include <linux/kernel.h> #include <linux/device.h> @@ -54,6 +55,8 @@ #define DEV_NAME_LEN 32 +#define RBD_NOTIFY_TIMEOUT_DEFAULT 10 + /* * block device image metadata (in-memory version) */ @@ -71,6 +74,12 @@ struct rbd_image_header { char *snap_names; u64 *snap_sizes; + + u64 obj_version; +}; + +struct rbd_options { + int notify_timeout; }; /* @@ -78,6 +87,7 @@ struct rbd_image_header { */ struct rbd_client { struct ceph_client *client; + struct rbd_options *rbd_opts; struct kref kref; struct list_head node; }; @@ -124,6 +134,9 @@ struct rbd_device { char pool_name[RBD_MAX_POOL_NAME_LEN]; int poolid; + struct ceph_osd_event *watch_event; + struct ceph_osd_request *watch_request; + char snap_name[RBD_MAX_SNAP_NAME_LEN]; u32 cur_snap; /* index+1 of current snapshot within snap context 0 - for the head */ @@ -177,6 +190,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev) put_device(&rbd_dev->dev); } +static int __rbd_update_snaps(struct rbd_device *rbd_dev); + static int rbd_open(struct block_device *bdev, fmode_t mode) { struct gendisk *disk = bdev->bd_disk; @@ -211,7 +226,8 @@ static const struct block_device_operations rbd_bd_ops = { * Initialize an rbd client instance. * We own *opt. */ -static struct rbd_client *rbd_client_create(struct ceph_options *opt) +static struct rbd_client *rbd_client_create(struct ceph_options *opt, + struct rbd_options *rbd_opts) { struct rbd_client *rbdc; int ret = -ENOMEM; @@ -233,6 +249,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt) if (ret < 0) goto out_err; + rbdc->rbd_opts = rbd_opts; + spin_lock(&node_lock); list_add_tail(&rbdc->node, &rbd_client_list); spin_unlock(&node_lock); @@ -267,6 +285,59 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt) } /* + * mount options + */ +enum { + Opt_notify_timeout, + Opt_last_int, + /* int args above */ + Opt_last_string, + /* string args above */ +}; + +static match_table_t rbdopt_tokens = { + {Opt_notify_timeout, "notify_timeout=%d"}, + /* int args above */ + /* string args above */ + {-1, NULL} +}; + +static int parse_rbd_opts_token(char *c, void *private) +{ + struct rbd_options *rbdopt = private; + substring_t argstr[MAX_OPT_ARGS]; + int token, intval, ret; + + token = match_token((char *)c, rbdopt_tokens, argstr); + if (token < 0) + return -EINVAL; + + if (token < Opt_last_int) { + ret = match_int(&argstr[0], &intval); + if (ret < 0) { + pr_err("bad mount option arg (not int) " + "at '%s'\n", c); + return ret; + } + dout("got int token %d val %d\n", token, intval); + } else if (token > Opt_last_int && token < Opt_last_string) { + dout("got string token %d val %s\n", token, + argstr[0].from); + } else { + dout("got token %d\n", token); + } + + switch (token) { + case Opt_notify_timeout: + rbdopt->notify_timeout = intval; + break; + default: + BUG_ON(token); + } + return 0; +} + +/* * Get a ceph client with specific addr and configuration, if one does * not exist create it. */ @@ -276,11 +347,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, struct rbd_client *rbdc; struct ceph_options *opt; int ret; + struct rbd_options *rbd_opts; + + rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL); + if (!rbd_opts) + return -ENOMEM; + + rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT; ret = ceph_parse_options(&opt, options, mon_addr, - mon_addr + strlen(mon_addr), NULL, NULL); + mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts); if (ret < 0) - return ret; + goto done_err; spin_lock(&node_lock); rbdc = __rbd_client_find(opt); @@ -296,13 +374,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr, } spin_unlock(&node_lock); - rbdc = rbd_client_create(opt); - if (IS_ERR(rbdc)) - return PTR_ERR(rbdc); + rbdc = rbd_client_create(opt, rbd_opts); + if (IS_ERR(rbdc)) { + ret = PTR_ERR(rbdc); + goto done_err; + } rbd_dev->rbd_client = rbdc; rbd_dev->client = rbdc->client; return 0; +done_err: + kfree(rbd_opts); + return ret; } /* @@ -318,6 +401,7 @@ static void rbd_client_release(struct kref *kref) spin_unlock(&node_lock); ceph_destroy_client(rbdc->client); + kfree(rbdc->rbd_opts); kfree(rbdc); } @@ -666,7 +750,9 @@ static int rbd_do_request(struct request *rq, struct ceph_osd_req_op *ops, int num_reply, void (*rbd_cb)(struct ceph_osd_request *req, - struct ceph_msg *msg)) + struct ceph_msg *msg), + struct ceph_osd_request **linger_req, + u64 *ver) { struct ceph_osd_request *req; struct ceph_file_layout *layout; @@ -729,12 +815,20 @@ static int rbd_do_request(struct request *rq, req->r_oid, req->r_oid_len); up_read(&header->snap_rwsem); + if (linger_req) { + ceph_osdc_set_request_linger(&dev->client->osdc, req); + *linger_req = req; + } + ret = ceph_osdc_start_request(&dev->client->osdc, req, false); if (ret < 0) goto done_err; if (!rbd_cb) { ret = ceph_osdc_wait_request(&dev->client->osdc, req); + if (ver) + *ver = le64_to_cpu(req->r_reassert_version.version); + dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version)); ceph_osdc_put_request(req); } return ret; @@ -789,6 +883,11 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) kfree(req_data); } +static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg) +{ + ceph_osdc_put_request(req); +} + /* * Do a synchronous ceph osd operation */ @@ -801,7 +900,9 @@ static int rbd_req_sync_op(struct rbd_device *dev, int num_reply, const char *obj, u64 ofs, u64 len, - char *buf) + char *buf, + struct ceph_osd_request **linger_req, + u64 *ver) { int ret; struct page **pages; @@ -833,7 +934,8 @@ static int rbd_req_sync_op(struct rbd_device *dev, flags, ops, 2, - NULL); + NULL, + linger_req, ver); if (ret < 0) goto done_ops; @@ -893,7 +995,7 @@ static int rbd_do_op(struct request *rq, flags, ops, num_reply, - rbd_req_cb); + rbd_req_cb, 0, NULL); done: kfree(seg_name); return ret; @@ -940,18 +1042,174 @@ static int rbd_req_sync_read(struct rbd_device *dev, u64 snapid, const char *obj, u64 ofs, u64 len, - char *buf) + char *buf, + u64 *ver) { return rbd_req_sync_op(dev, NULL, (snapid ? snapid : CEPH_NOSNAP), CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, - 1, obj, ofs, len, buf); + 1, obj, ofs, len, buf, NULL, ver); } /* - * Request sync osd read + * Request sync osd watch + */ +static int rbd_req_sync_notify_ack(struct rbd_device *dev, + u64 ver, + u64 notify_id, + const char *obj) +{ + struct ceph_osd_req_op *ops; + struct page **pages = NULL; + int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0); + if (ret < 0) + return ret; + + ops[0].watch.ver = cpu_to_le64(dev->header.obj_version); + ops[0].watch.cookie = notify_id; + ops[0].watch.flag = 0; + + ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP, + obj, 0, 0, NULL, + pages, 0, + CEPH_OSD_FLAG_READ, + ops, + 1, + rbd_simple_req_cb, 0, NULL); + + rbd_destroy_ops(ops); + return ret; +} + +static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) +{ + struct rbd_device *dev = (struct rbd_device *)data; + if (!dev) + return; + + dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, + notify_id, (int)opcode); + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + __rbd_update_snaps(dev); + mutex_unlock(&ctl_mutex); + + rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name); +} + +/* + * Request sync osd watch + */ +static int rbd_req_sync_watch(struct rbd_device *dev, + const char *obj, + u64 ver) +{ + struct ceph_osd_req_op *ops; + struct ceph_osd_client *osdc = &dev->client->osdc; + + int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0); + if (ret < 0) + return ret; + + ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, + (void *)dev, &dev->watch_event); + if (ret < 0) + goto fail; + + ops[0].watch.ver = cpu_to_le64(ver); + ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie); + ops[0].watch.flag = 1; + + ret = rbd_req_sync_op(dev, NULL, + CEPH_NOSNAP, + 0, + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + ops, + 1, obj, 0, 0, NULL, + &dev->watch_request, NULL); + + if (ret < 0) + goto fail_event; + + rbd_destroy_ops(ops); + return 0; + +fail_event: + ceph_osdc_cancel_event(dev->watch_event); + dev->watch_event = NULL; +fail: + rbd_destroy_ops(ops); + return ret; +} + +struct rbd_notify_info { + struct rbd_device *dev; +}; + +static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data) +{ + struct rbd_device *dev = (struct rbd_device *)data; + if (!dev) + return; + + dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name, + notify_id, (int)opcode); +} + +/* + * Request sync osd notify + */ +static int rbd_req_sync_notify(struct rbd_device *dev, + const char *obj) +{ + struct ceph_osd_req_op *ops; + struct ceph_osd_client *osdc = &dev->client->osdc; + struct ceph_osd_event *event; + struct rbd_notify_info info; + int payload_len = sizeof(u32) + sizeof(u32); + int ret; + + ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len); + if (ret < 0) + return ret; + + info.dev = dev; + + ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1, + (void *)&info, &event); + if (ret < 0) + goto fail; + + ops[0].watch.ver = 1; + ops[0].watch.flag = 1; + ops[0].watch.cookie = event->cookie; + ops[0].watch.prot_ver = RADOS_NOTIFY_VER; + ops[0].watch.timeout = 12; + + ret = rbd_req_sync_op(dev, NULL, + CEPH_NOSNAP, + 0, + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + ops, + 1, obj, 0, 0, NULL, NULL, NULL); + if (ret < 0) + goto fail_event; + + ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT); + dout("ceph_osdc_wait_event returned %d\n", ret); + rbd_destroy_ops(ops); + return 0; + +fail_event: + ceph_osdc_cancel_event(event); +fail: + rbd_destroy_ops(ops); + return ret; +} + +/* + * Request sync osd rollback */ static int rbd_req_sync_rollback_obj(struct rbd_device *dev, u64 snapid, @@ -969,13 +1227,10 @@ static int rbd_req_sync_rollback_obj(struct rbd_device *dev, 0, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, ops, - 1, obj, 0, 0, NULL); + 1, obj, 0, 0, NULL, NULL, NULL); rbd_destroy_ops(ops); - if (ret < 0) - return ret; - return ret; } @@ -987,7 +1242,8 @@ static int rbd_req_sync_exec(struct rbd_device *dev, const char *cls, const char *method, const char *data, - int len) + int len, + u64 *ver) { struct ceph_osd_req_op *ops; int cls_len = strlen(cls); @@ -1010,7 +1266,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev, 0, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, ops, - 1, obj, 0, 0, NULL); + 1, obj, 0, 0, NULL, NULL, ver); rbd_destroy_ops(ops); @@ -1156,6 +1412,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, struct rbd_image_header_ondisk *dh; int snap_count = 0; u64 snap_names_len = 0; + u64 ver; while (1) { int len = sizeof(*dh) + @@ -1171,7 +1428,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, NULL, CEPH_NOSNAP, rbd_dev->obj_md_name, 0, len, - (char *)dh); + (char *)dh, &ver); if (rc < 0) goto out_dh; @@ -1188,6 +1445,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev, } break; } + header->obj_version = ver; out_dh: kfree(dh); @@ -1205,6 +1463,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, u64 new_snapid; int ret; void *data, *data_start, *data_end; + u64 ver; /* we should create a snapshot only if we're pointing at the head */ if (dev->cur_snap) @@ -1227,7 +1486,7 @@ static int rbd_header_add_snap(struct rbd_device *dev, ceph_encode_64_safe(&data, data_end, new_snapid, bad); ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add", - data_start, data - data_start); + data_start, data - data_start, &ver); kfree(data_start); @@ -1259,6 +1518,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) int ret; struct rbd_image_header h; u64 snap_seq; + int follow_seq = 0; ret = rbd_read_header(rbd_dev, &h); if (ret < 0) @@ -1267,6 +1527,11 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) down_write(&rbd_dev->header.snap_rwsem); snap_seq = rbd_dev->header.snapc->seq; + if (rbd_dev->header.total_snaps && + rbd_dev->header.snapc->snaps[0] == snap_seq) + /* pointing at the head, will need to follow that + if head moves */ + follow_seq = 1; kfree(rbd_dev->header.snapc); kfree(rbd_dev->header.snap_names); @@ -1277,7 +1542,10 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev) rbd_dev->header.snap_names = h.snap_names; rbd_dev->header.snap_names_len = h.snap_names_len; rbd_dev->header.snap_sizes = h.snap_sizes; - rbd_dev->header.snapc->seq = snap_seq; + if (follow_seq) + rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0]; + else + rbd_dev->header.snapc->seq = snap_seq; ret = __rbd_init_snaps_header(rbd_dev); @@ -1699,7 +1967,28 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev) device_unregister(&rbd_dev->dev); } -static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) +static int rbd_init_watch_dev(struct rbd_device *rbd_dev) +{ + int ret, rc; + + do { + ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name, + rbd_dev->header.obj_version); + if (ret == -ERANGE) { + mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); + rc = __rbd_update_snaps(rbd_dev); + mutex_unlock(&ctl_mutex); + if (rc < 0) + return rc; + } + } while (ret == -ERANGE); + + return ret; +} + +static ssize_t rbd_add(struct bus_type *bus, + const char *buf, + size_t count) { struct ceph_osd_client *osdc; struct rbd_device *rbd_dev; @@ -1797,6 +2086,10 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count) if (rc) goto err_out_bus; + rc = rbd_init_watch_dev(rbd_dev); + if (rc) + goto err_out_bus; + return count; err_out_bus: @@ -1849,6 +2142,12 @@ static void rbd_dev_release(struct device *dev) struct rbd_device *rbd_dev = container_of(dev, struct rbd_device, dev); + if (rbd_dev->watch_request) + ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc, + rbd_dev->watch_request); + if (rbd_dev->watch_event) + ceph_osdc_cancel_event(rbd_dev->watch_event); + rbd_put_client(rbd_dev); /* clean up and free blkdev */ @@ -1914,14 +2213,24 @@ static ssize_t rbd_snap_add(struct device *dev, ret = rbd_header_add_snap(rbd_dev, name, GFP_KERNEL); if (ret < 0) - goto done_unlock; + goto err_unlock; ret = __rbd_update_snaps(rbd_dev); if (ret < 0) - goto done_unlock; + goto err_unlock; + + /* shouldn't hold ctl_mutex when notifying.. notify might + trigger a watch callback that would need to get that mutex */ + mutex_unlock(&ctl_mutex); + + /* make a best effort, don't error if failed */ + rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name); ret = count; -done_unlock: + kfree(name); + return ret; + +err_unlock: mutex_unlock(&ctl_mutex); kfree(name); return ret; diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 08f65faac11..0dba6915712 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -210,8 +210,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) if (!fsc->debugfs_congestion_kb) goto out; - dout("a\n"); - snprintf(name, sizeof(name), "../../bdi/%s", dev_name(fsc->backing_dev_info.dev)); fsc->debugfs_bdi = @@ -221,7 +219,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) if (!fsc->debugfs_bdi) goto out; - dout("b\n"); fsc->debugfs_mdsmap = debugfs_create_file("mdsmap", 0600, fsc->client->debugfs_dir, @@ -230,7 +227,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) if (!fsc->debugfs_mdsmap) goto out; - dout("ca\n"); fsc->debugfs_mdsc = debugfs_create_file("mdsc", 0600, fsc->client->debugfs_dir, @@ -239,7 +235,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) if (!fsc->debugfs_mdsc) goto out; - dout("da\n"); fsc->debugfs_caps = debugfs_create_file("caps", 0400, fsc->client->debugfs_dir, @@ -248,7 +243,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc) if (!fsc->debugfs_caps) goto out; - dout("ea\n"); fsc->debugfs_dentry_lru = debugfs_create_file("dentry_lru", 0600, fsc->client->debugfs_dir, diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index ebafa65a29b..1a867a3601a 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -161,7 +161,7 @@ more: filp->f_pos = di->offset; err = filldir(dirent, dentry->d_name.name, dentry->d_name.len, di->offset, - dentry->d_inode->i_ino, + ceph_translate_ino(dentry->d_sb, dentry->d_inode->i_ino), dentry->d_inode->i_mode >> 12); if (last) { @@ -245,15 +245,17 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir) dout("readdir off 0 -> '.'\n"); if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0), - inode->i_ino, inode->i_mode >> 12) < 0) + ceph_translate_ino(inode->i_sb, inode->i_ino), + inode->i_mode >> 12) < 0) return 0; filp->f_pos = 1; off = 1; } if (filp->f_pos == 1) { + ino_t ino = filp->f_dentry->d_parent->d_inode->i_ino; dout("readdir off 1 -> '..'\n"); if (filldir(dirent, "..", 2, ceph_make_fpos(0, 1), - filp->f_dentry->d_parent->d_inode->i_ino, + ceph_translate_ino(inode->i_sb, ino), inode->i_mode >> 12) < 0) return 0; filp->f_pos = 2; @@ -377,7 +379,8 @@ more: if (filldir(dirent, rinfo->dir_dname[off - fi->offset], rinfo->dir_dname_len[off - fi->offset], - pos, ino, ftype) < 0) { + pos, + ceph_translate_ino(inode->i_sb, ino), ftype) < 0) { dout("filldir stopping us...\n"); return 0; } @@ -1024,14 +1027,13 @@ out_touch: } /* - * When a dentry is released, clear the dir I_COMPLETE if it was part - * of the current dir gen or if this is in the snapshot namespace. + * Release our ceph_dentry_info. */ -static void ceph_dentry_release(struct dentry *dentry) +static void ceph_d_release(struct dentry *dentry) { struct ceph_dentry_info *di = ceph_dentry(dentry); - dout("dentry_release %p\n", dentry); + dout("d_release %p\n", dentry); if (di) { ceph_dentry_lru_del(dentry); if (di->lease_session) @@ -1256,14 +1258,14 @@ const struct inode_operations ceph_dir_iops = { const struct dentry_operations ceph_dentry_ops = { .d_revalidate = ceph_d_revalidate, - .d_release = ceph_dentry_release, + .d_release = ceph_d_release, }; const struct dentry_operations ceph_snapdir_dentry_ops = { .d_revalidate = ceph_snapdir_d_revalidate, - .d_release = ceph_dentry_release, + .d_release = ceph_d_release, }; const struct dentry_operations ceph_snap_dentry_ops = { - .d_release = ceph_dentry_release, + .d_release = ceph_d_release, }; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 7d0e4a82d89..159b512d5a2 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -564,11 +564,19 @@ more: * start_request so that a tid has been assigned. */ spin_lock(&ci->i_unsafe_lock); - list_add(&req->r_unsafe_item, &ci->i_unsafe_writes); + list_add_tail(&req->r_unsafe_item, + &ci->i_unsafe_writes); spin_unlock(&ci->i_unsafe_lock); ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR); } + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + if (ret < 0 && req->r_safe_callback) { + spin_lock(&ci->i_unsafe_lock); + list_del_init(&req->r_unsafe_item); + spin_unlock(&ci->i_unsafe_lock); + ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); + } } if (file->f_flags & O_DIRECT) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 193bfa5e9cb..b54c97da1c4 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -36,6 +36,13 @@ static void ceph_vmtruncate_work(struct work_struct *work); /* * find or create an inode, given the ceph ino number */ +static int ceph_set_ino_cb(struct inode *inode, void *data) +{ + ceph_inode(inode)->i_vino = *(struct ceph_vino *)data; + inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data); + return 0; +} + struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino) { struct inode *inode; @@ -1030,9 +1037,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, dout("fill_trace doing d_move %p -> %p\n", req->r_old_dentry, dn); - /* d_move screws up d_subdirs order */ - ceph_i_clear(dir, CEPH_I_COMPLETE); - d_move(req->r_old_dentry, dn); dout(" src %p '%.*s' dst %p '%.*s'\n", req->r_old_dentry, @@ -1044,12 +1048,15 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, rehashing bug in vfs_rename_dir */ ceph_invalidate_dentry_lease(dn); - /* take overwritten dentry's readdir offset */ - dout("dn %p gets %p offset %lld (old offset %lld)\n", - req->r_old_dentry, dn, ceph_dentry(dn)->offset, + /* + * d_move() puts the renamed dentry at the end of + * d_subdirs. We need to assign it an appropriate + * directory offset so we can behave when holding + * I_COMPLETE. + */ + ceph_set_dentry_offset(req->r_old_dentry); + dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); - ceph_dentry(req->r_old_dentry)->offset = - ceph_dentry(dn)->offset; dn = req->r_old_dentry; /* use old_dentry */ in = dn->d_inode; @@ -1809,7 +1816,7 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL); if (!err) { generic_fillattr(inode, stat); - stat->ino = inode->i_ino; + stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino); if (ceph_snap(inode) != CEPH_NOSNAP) stat->dev = ceph_snap(inode); else diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 9c5085465a6..a9e78b4a258 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -131,6 +131,7 @@ enum { Opt_rbytes, Opt_norbytes, Opt_noasyncreaddir, + Opt_ino32, }; static match_table_t fsopt_tokens = { @@ -150,6 +151,7 @@ static match_table_t fsopt_tokens = { {Opt_rbytes, "rbytes"}, {Opt_norbytes, "norbytes"}, {Opt_noasyncreaddir, "noasyncreaddir"}, + {Opt_ino32, "ino32"}, {-1, NULL} }; @@ -225,6 +227,9 @@ static int parse_fsopt_token(char *c, void *private) case Opt_noasyncreaddir: fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR; break; + case Opt_ino32: + fsopt->flags |= CEPH_MOUNT_OPT_INO32; + break; default: BUG_ON(token); } @@ -288,7 +293,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt, fsopt->sb_flags = flags; fsopt->flags = CEPH_MOUNT_OPT_DEFAULT; - fsopt->rsize = CEPH_MOUNT_RSIZE_DEFAULT; + fsopt->rsize = CEPH_RSIZE_DEFAULT; fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL); fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT; fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT; @@ -370,7 +375,7 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt) if (fsopt->wsize) seq_printf(m, ",wsize=%d", fsopt->wsize); - if (fsopt->rsize != CEPH_MOUNT_RSIZE_DEFAULT) + if (fsopt->rsize != CEPH_RSIZE_DEFAULT) seq_printf(m, ",rsize=%d", fsopt->rsize); if (fsopt->congestion_kb != default_congestion_kb()) seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 20b907d76ae..619fe719968 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -27,6 +27,7 @@ #define CEPH_MOUNT_OPT_DIRSTAT (1<<4) /* `cat dirname` for stats */ #define CEPH_MOUNT_OPT_RBYTES (1<<5) /* dir st_bytes = rbytes */ #define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */ +#define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */ #define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES) @@ -35,6 +36,7 @@ #define ceph_test_mount_opt(fsc, opt) \ (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt)) +#define CEPH_RSIZE_DEFAULT (512*1024) /* readahead */ #define CEPH_MAX_READDIR_DEFAULT 1024 #define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024) #define CEPH_SNAPDIRNAME_DEFAULT ".snap" @@ -319,6 +321,16 @@ static inline struct ceph_inode_info *ceph_inode(struct inode *inode) return container_of(inode, struct ceph_inode_info, vfs_inode); } +static inline struct ceph_fs_client *ceph_inode_to_client(struct inode *inode) +{ + return (struct ceph_fs_client *)inode->i_sb->s_fs_info; +} + +static inline struct ceph_fs_client *ceph_sb_to_client(struct super_block *sb) +{ + return (struct ceph_fs_client *)sb->s_fs_info; +} + static inline struct ceph_vino ceph_vino(struct inode *inode) { return ceph_inode(inode)->i_vino; @@ -327,19 +339,49 @@ static inline struct ceph_vino ceph_vino(struct inode *inode) /* * ino_t is <64 bits on many architectures, blech. * - * don't include snap in ino hash, at least for now. + * i_ino (kernel inode) st_ino (userspace) + * i386 32 32 + * x86_64+ino32 64 32 + * x86_64 64 64 + */ +static inline u32 ceph_ino_to_ino32(ino_t ino) +{ + ino ^= ino >> (sizeof(ino) * 8 - 32); + if (!ino) + ino = 1; + return ino; +} + +/* + * kernel i_ino value */ static inline ino_t ceph_vino_to_ino(struct ceph_vino vino) { ino_t ino = (ino_t)vino.ino; /* ^ (vino.snap << 20); */ #if BITS_PER_LONG == 32 - ino ^= vino.ino >> (sizeof(u64)-sizeof(ino_t)) * 8; - if (!ino) - ino = 1; + ino = ceph_ino_to_ino32(ino); #endif return ino; } +/* + * user-visible ino (stat, filldir) + */ +#if BITS_PER_LONG == 32 +static inline ino_t ceph_translate_ino(struct super_block *sb, ino_t ino) +{ + return ino; +} +#else +static inline ino_t ceph_translate_ino(struct super_block *sb, ino_t ino) +{ + if (ceph_test_mount_opt(ceph_sb_to_client(sb), INO32)) + ino = ceph_ino_to_ino32(ino); + return ino; +} +#endif + + /* for printf-style formatting */ #define ceph_vinop(i) ceph_inode(i)->i_vino.ino, ceph_inode(i)->i_vino.snap @@ -428,13 +470,6 @@ static inline loff_t ceph_make_fpos(unsigned frag, unsigned off) return ((loff_t)frag << 32) | (loff_t)off; } -static inline int ceph_set_ino_cb(struct inode *inode, void *data) -{ - ceph_inode(inode)->i_vino = *(struct ceph_vino *)data; - inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data); - return 0; -} - /* * caps helpers */ @@ -503,15 +538,6 @@ extern void ceph_reservation_status(struct ceph_fs_client *client, int *total, int *avail, int *used, int *reserved, int *min); -static inline struct ceph_fs_client *ceph_inode_to_client(struct inode *inode) -{ - return (struct ceph_fs_client *)inode->i_sb->s_fs_info; -} - -static inline struct ceph_fs_client *ceph_sb_to_client(struct super_block *sb) -{ - return (struct ceph_fs_client *)sb->s_fs_info; -} /* diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 09dcc0c2ffd..b8e995fbd86 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -136,9 +136,18 @@ struct ceph_dir_layout { /* osd */ -#define CEPH_MSG_OSD_MAP 41 -#define CEPH_MSG_OSD_OP 42 -#define CEPH_MSG_OSD_OPREPLY 43 +#define CEPH_MSG_OSD_MAP 41 +#define CEPH_MSG_OSD_OP 42 +#define CEPH_MSG_OSD_OPREPLY 43 +#define CEPH_MSG_WATCH_NOTIFY 44 + + +/* watch-notify operations */ +enum { + WATCH_NOTIFY = 1, /* notifying watcher */ + WATCH_NOTIFY_COMPLETE = 2, /* notifier notified when done */ +}; + /* pool operations */ enum { @@ -213,8 +222,10 @@ struct ceph_client_mount { struct ceph_mon_request_header monhdr; } __attribute__ ((packed)); +#define CEPH_SUBSCRIBE_ONETIME 1 /* i want only 1 update after have */ + struct ceph_mon_subscribe_item { - __le64 have_version; __le64 have; + __le64 have_version; __le64 have; __u8 onetime; } __attribute__ ((packed)); diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 72c72bfccb8..0d2e0fffb47 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -71,7 +71,6 @@ struct ceph_options { #define CEPH_OSD_TIMEOUT_DEFAULT 60 /* seconds */ #define CEPH_OSD_KEEPALIVE_DEFAULT 5 #define CEPH_OSD_IDLE_TTL_DEFAULT 60 -#define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */ #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) #define CEPH_MSG_MAX_DATA_LEN (16*1024*1024) diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index a1af29648fb..f88eacb111d 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -32,6 +32,7 @@ struct ceph_osd { struct rb_node o_node; struct ceph_connection o_con; struct list_head o_requests; + struct list_head o_linger_requests; struct list_head o_osd_lru; struct ceph_authorizer *o_authorizer; void *o_authorizer_buf, *o_authorizer_reply_buf; @@ -47,6 +48,8 @@ struct ceph_osd_request { struct rb_node r_node; struct list_head r_req_lru_item; struct list_head r_osd_item; + struct list_head r_linger_item; + struct list_head r_linger_osd; struct ceph_osd *r_osd; struct ceph_pg r_pgid; int r_pg_osds[CEPH_PG_MAX_SIZE]; @@ -59,6 +62,7 @@ struct ceph_osd_request { int r_flags; /* any additional flags for the osd */ u32 r_sent; /* >0 if r_request is sending/sent */ int r_got_reply; + int r_linger; struct ceph_osd_client *r_osdc; struct kref r_kref; @@ -74,7 +78,6 @@ struct ceph_osd_request { char r_oid[40]; /* object name */ int r_oid_len; unsigned long r_stamp; /* send OR check time */ - bool r_resend; /* msg send failed, needs retry */ struct ceph_file_layout r_file_layout; struct ceph_snap_context *r_snapc; /* snap context for writes */ @@ -90,6 +93,26 @@ struct ceph_osd_request { struct ceph_pagelist *r_trail; /* trailing part of the data */ }; +struct ceph_osd_event { + u64 cookie; + int one_shot; + struct ceph_osd_client *osdc; + void (*cb)(u64, u64, u8, void *); + void *data; + struct rb_node node; + struct list_head osd_node; + struct kref kref; + struct completion completion; +}; + +struct ceph_osd_event_work { + struct work_struct work; + struct ceph_osd_event *event; + u64 ver; + u64 notify_id; + u8 opcode; +}; + struct ceph_osd_client { struct ceph_client *client; @@ -104,7 +127,10 @@ struct ceph_osd_client { u64 timeout_tid; /* tid of timeout triggering rq */ u64 last_tid; /* tid of last request */ struct rb_root requests; /* pending requests */ - struct list_head req_lru; /* pending requests lru */ + struct list_head req_lru; /* in-flight lru */ + struct list_head req_unsent; /* unsent/need-resend queue */ + struct list_head req_notarget; /* map to no osd */ + struct list_head req_linger; /* lingering requests */ int num_requests; struct delayed_work timeout_work; struct delayed_work osds_timeout_work; @@ -116,6 +142,12 @@ struct ceph_osd_client { struct ceph_msgpool msgpool_op; struct ceph_msgpool msgpool_op_reply; + + spinlock_t event_lock; + struct rb_root event_tree; + u64 event_count; + + struct workqueue_struct *notify_wq; }; struct ceph_osd_req_op { @@ -150,6 +182,13 @@ struct ceph_osd_req_op { struct { u64 snapid; } snap; + struct { + u64 cookie; + u64 ver; + __u8 flag; + u32 prot_ver; + u32 timeout; + } watch; }; u32 payload_len; }; @@ -198,6 +237,11 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, bool use_mempool, int num_reply, int page_align); +extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); +extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); + static inline void ceph_osdc_get_request(struct ceph_osd_request *req) { kref_get(&req->r_kref); @@ -233,5 +277,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct page **pages, int nr_pages, int flags, int do_sync, bool nofail); +/* watch/notify events */ +extern int ceph_osdc_create_event(struct ceph_osd_client *osdc, + void (*event_cb)(u64, u64, u8, void *), + int one_shot, void *data, + struct ceph_osd_event **pevent); +extern void ceph_osdc_cancel_event(struct ceph_osd_event *event); +extern int ceph_osdc_wait_event(struct ceph_osd_event *event, + unsigned long timeout); +extern void ceph_osdc_put_event(struct ceph_osd_event *event); #endif diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 6d5247f2e81..0a99099801a 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -12,9 +12,9 @@ * osdmap encoding versions */ #define CEPH_OSDMAP_INC_VERSION 5 -#define CEPH_OSDMAP_INC_VERSION_EXT 5 +#define CEPH_OSDMAP_INC_VERSION_EXT 6 #define CEPH_OSDMAP_VERSION 5 -#define CEPH_OSDMAP_VERSION_EXT 5 +#define CEPH_OSDMAP_VERSION_EXT 6 /* * fs id @@ -181,9 +181,17 @@ enum { /* read */ CEPH_OSD_OP_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1, CEPH_OSD_OP_STAT = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2, + CEPH_OSD_OP_MAPEXT = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3, /* fancy read */ - CEPH_OSD_OP_MASKTRUNC = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4, + CEPH_OSD_OP_MASKTRUNC = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4, + CEPH_OSD_OP_SPARSE_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 5, + + CEPH_OSD_OP_NOTIFY = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 6, + CEPH_OSD_OP_NOTIFY_ACK = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 7, + + /* versioning */ + CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8, /* write */ CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1, @@ -205,6 +213,8 @@ enum { CEPH_OSD_OP_CREATE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13, CEPH_OSD_OP_ROLLBACK= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14, + CEPH_OSD_OP_WATCH = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 15, + /** attrs **/ /* read */ CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1, @@ -218,11 +228,14 @@ enum { CEPH_OSD_OP_RMXATTR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 4, /** subop **/ - CEPH_OSD_OP_PULL = CEPH_OSD_OP_MODE_SUB | 1, - CEPH_OSD_OP_PUSH = CEPH_OSD_OP_MODE_SUB | 2, - CEPH_OSD_OP_BALANCEREADS = CEPH_OSD_OP_MODE_SUB | 3, - CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4, - CEPH_OSD_OP_SCRUB = CEPH_OSD_OP_MODE_SUB | 5, + CEPH_OSD_OP_PULL = CEPH_OSD_OP_MODE_SUB | 1, + CEPH_OSD_OP_PUSH = CEPH_OSD_OP_MODE_SUB | 2, + CEPH_OSD_OP_BALANCEREADS = CEPH_OSD_OP_MODE_SUB | 3, + CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4, + CEPH_OSD_OP_SCRUB = CEPH_OSD_OP_MODE_SUB | 5, + CEPH_OSD_OP_SCRUB_RESERVE = CEPH_OSD_OP_MODE_SUB | 6, + CEPH_OSD_OP_SCRUB_UNRESERVE = CEPH_OSD_OP_MODE_SUB | 7, + CEPH_OSD_OP_SCRUB_STOP = CEPH_OSD_OP_MODE_SUB | 8, /** lock **/ CEPH_OSD_OP_WRLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1, @@ -328,6 +341,8 @@ enum { CEPH_OSD_CMPXATTR_MODE_U64 = 2 }; +#define RADOS_NOTIFY_VER 1 + /* * an individual object operation. each may be accompanied by some data * payload @@ -359,7 +374,12 @@ struct ceph_osd_op { struct { __le64 snapid; } __attribute__ ((packed)) snap; - }; + struct { + __le64 cookie; + __le64 ver; + __u8 flag; /* 0 = unwatch, 1 = watch */ + } __attribute__ ((packed)) watch; +}; __le32 payload_len; } __attribute__ ((packed)); @@ -402,4 +422,5 @@ struct ceph_osd_reply_head { } __attribute__ ((packed)); + #endif diff --git a/net/ceph/armor.c b/net/ceph/armor.c index eb2a666b0be..1fc1ee11dfa 100644 --- a/net/ceph/armor.c +++ b/net/ceph/armor.c @@ -78,8 +78,10 @@ int ceph_unarmor(char *dst, const char *src, const char *end) while (src < end) { int a, b, c, d; - if (src < end && src[0] == '\n') + if (src[0] == '\n') { src++; + continue; + } if (src + 4 > end) return -EINVAL; a = decode_bits(src[0]); diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index f3e4a13fea0..95f96ab94bb 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -62,6 +62,7 @@ const char *ceph_msg_type_name(int type) case CEPH_MSG_OSD_MAP: return "osd_map"; case CEPH_MSG_OSD_OP: return "osd_op"; case CEPH_MSG_OSD_OPREPLY: return "osd_opreply"; + case CEPH_MSG_WATCH_NOTIFY: return "watch_notify"; default: return "unknown"; } } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 3e20a122ffa..02212ed5085 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -22,10 +22,15 @@ #define OSD_OPREPLY_FRONT_LEN 512 static const struct ceph_connection_operations osd_con_ops; -static int __kick_requests(struct ceph_osd_client *osdc, - struct ceph_osd *kickosd); -static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); +static void send_queued(struct ceph_osd_client *osdc); +static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); +static void __register_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); +static void __unregister_linger_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); +static int __send_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); static int op_needs_trail(int op) { @@ -34,6 +39,7 @@ static int op_needs_trail(int op) case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_CMPXATTR: case CEPH_OSD_OP_CALL: + case CEPH_OSD_OP_NOTIFY: return 1; default: return 0; @@ -209,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, init_completion(&req->r_completion); init_completion(&req->r_safe_completion); INIT_LIST_HEAD(&req->r_unsafe_item); + INIT_LIST_HEAD(&req->r_linger_item); + INIT_LIST_HEAD(&req->r_linger_osd); req->r_flags = flags; WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); @@ -315,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req, break; case CEPH_OSD_OP_STARTSYNC: break; + case CEPH_OSD_OP_NOTIFY: + { + __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); + __le32 timeout = cpu_to_le32(src->watch.timeout); + + BUG_ON(!req->r_trail); + + ceph_pagelist_append(req->r_trail, + &prot_ver, sizeof(prot_ver)); + ceph_pagelist_append(req->r_trail, + &timeout, sizeof(timeout)); + } + case CEPH_OSD_OP_NOTIFY_ACK: + case CEPH_OSD_OP_WATCH: + dst->watch.cookie = cpu_to_le64(src->watch.cookie); + dst->watch.ver = cpu_to_le64(src->watch.ver); + dst->watch.flag = src->watch.flag; + break; default: pr_err("unrecognized osd opcode %d\n", dst->op); WARN_ON(1); @@ -529,6 +555,45 @@ __lookup_request_ge(struct ceph_osd_client *osdc, return NULL; } +/* + * Resubmit requests pending on the given osd. + */ +static void __kick_osd_requests(struct ceph_osd_client *osdc, + struct ceph_osd *osd) +{ + struct ceph_osd_request *req, *nreq; + int err; + + dout("__kick_osd_requests osd%d\n", osd->o_osd); + err = __reset_osd(osdc, osd); + if (err == -EAGAIN) + return; + + list_for_each_entry(req, &osd->o_requests, r_osd_item) { + list_move(&req->r_req_lru_item, &osdc->req_unsent); + dout("requeued %p tid %llu osd%d\n", req, req->r_tid, + osd->o_osd); + if (!req->r_linger) + req->r_flags |= CEPH_OSD_FLAG_RETRY; + } + + list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, + r_linger_osd) { + __unregister_linger_request(osdc, req); + __register_request(osdc, req); + list_move(&req->r_req_lru_item, &osdc->req_unsent); + dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, + osd->o_osd); + } +} + +static void kick_osd_requests(struct ceph_osd_client *osdc, + struct ceph_osd *kickosd) +{ + mutex_lock(&osdc->request_mutex); + __kick_osd_requests(osdc, kickosd); + mutex_unlock(&osdc->request_mutex); +} /* * If the osd connection drops, we need to resubmit all requests. @@ -543,7 +608,8 @@ static void osd_reset(struct ceph_connection *con) dout("osd_reset osd%d\n", osd->o_osd); osdc = osd->o_osdc; down_read(&osdc->map_sem); - kick_requests(osdc, osd); + kick_osd_requests(osdc, osd); + send_queued(osdc); up_read(&osdc->map_sem); } @@ -561,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) atomic_set(&osd->o_ref, 1); osd->o_osdc = osdc; INIT_LIST_HEAD(&osd->o_requests); + INIT_LIST_HEAD(&osd->o_linger_requests); INIT_LIST_HEAD(&osd->o_osd_lru); osd->o_incarnation = 1; @@ -650,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) int ret = 0; dout("__reset_osd %p osd%d\n", osd, osd->o_osd); - if (list_empty(&osd->o_requests)) { + if (list_empty(&osd->o_requests) && + list_empty(&osd->o_linger_requests)) { __remove_osd(osdc, osd); } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], &osd->o_con.peer_addr, @@ -723,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc) * Register request, assign tid. If this is the first request, set up * the timeout event. */ -static void register_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +static void __register_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { - mutex_lock(&osdc->request_mutex); req->r_tid = ++osdc->last_tid; req->r_request->hdr.tid = cpu_to_le64(req->r_tid); INIT_LIST_HEAD(&req->r_req_lru_item); @@ -740,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc, dout(" first request, scheduling timeout\n"); __schedule_osd_timeout(osdc); } +} + +static void register_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + mutex_lock(&osdc->request_mutex); + __register_request(osdc, req); mutex_unlock(&osdc->request_mutex); } @@ -758,9 +832,14 @@ static void __unregister_request(struct ceph_osd_client *osdc, ceph_con_revoke(&req->r_osd->o_con, req->r_request); list_del_init(&req->r_osd_item); - if (list_empty(&req->r_osd->o_requests)) + if (list_empty(&req->r_osd->o_requests) && + list_empty(&req->r_osd->o_linger_requests)) { + dout("moving osd to %p lru\n", req->r_osd); __move_osd_to_lru(osdc, req->r_osd); - req->r_osd = NULL; + } + if (list_empty(&req->r_osd_item) && + list_empty(&req->r_linger_item)) + req->r_osd = NULL; } ceph_osdc_put_request(req); @@ -781,20 +860,72 @@ static void __cancel_request(struct ceph_osd_request *req) ceph_con_revoke(&req->r_osd->o_con, req->r_request); req->r_sent = 0; } - list_del_init(&req->r_req_lru_item); } +static void __register_linger_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + dout("__register_linger_request %p\n", req); + list_add_tail(&req->r_linger_item, &osdc->req_linger); + list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests); +} + +static void __unregister_linger_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + dout("__unregister_linger_request %p\n", req); + if (req->r_osd) { + list_del_init(&req->r_linger_item); + list_del_init(&req->r_linger_osd); + + if (list_empty(&req->r_osd->o_requests) && + list_empty(&req->r_osd->o_linger_requests)) { + dout("moving osd to %p lru\n", req->r_osd); + __move_osd_to_lru(osdc, req->r_osd); + } + req->r_osd = NULL; + } +} + +void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + mutex_lock(&osdc->request_mutex); + if (req->r_linger) { + __unregister_linger_request(osdc, req); + ceph_osdc_put_request(req); + } + mutex_unlock(&osdc->request_mutex); +} +EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); + +void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + if (!req->r_linger) { + dout("set_request_linger %p\n", req); + req->r_linger = 1; + /* + * caller is now responsible for calling + * unregister_linger_request + */ + ceph_osdc_get_request(req); + } +} +EXPORT_SYMBOL(ceph_osdc_set_request_linger); + /* * Pick an osd (the first 'up' osd in the pg), allocate the osd struct * (as needed), and set the request r_osd appropriately. If there is - * no up osd, set r_osd to NULL. + * no up osd, set r_osd to NULL. Move the request to the appropiate list + * (unsent, homeless) or leave on in-flight lru. * * Return 0 if unchanged, 1 if changed, or negative on error. * * Caller should hold map_sem for read and request_mutex. */ -static int __map_osds(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +static int __map_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_pg pgid; @@ -802,11 +933,13 @@ static int __map_osds(struct ceph_osd_client *osdc, int o = -1, num = 0; int err; - dout("map_osds %p tid %lld\n", req, req->r_tid); + dout("map_request %p tid %lld\n", req, req->r_tid); err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, &req->r_file_layout, osdc->osdmap); - if (err) + if (err) { + list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; + } pgid = reqhead->layout.ol_pgid; req->r_pgid = pgid; @@ -823,7 +956,7 @@ static int __map_osds(struct ceph_osd_client *osdc, (req->r_osd == NULL && o == -1)) return 0; /* no change */ - dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n", + dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n", req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, req->r_osd ? req->r_osd->o_osd : -1); @@ -841,10 +974,12 @@ static int __map_osds(struct ceph_osd_client *osdc, if (!req->r_osd && o >= 0) { err = -ENOMEM; req->r_osd = create_osd(osdc); - if (!req->r_osd) + if (!req->r_osd) { + list_move(&req->r_req_lru_item, &osdc->req_notarget); goto out; + } - dout("map_osds osd %p is osd%d\n", req->r_osd, o); + dout("map_request osd %p is osd%d\n", req->r_osd, o); req->r_osd->o_osd = o; req->r_osd->o_con.peer_name.num = cpu_to_le64(o); __insert_osd(osdc, req->r_osd); @@ -855,6 +990,9 @@ static int __map_osds(struct ceph_osd_client *osdc, if (req->r_osd) { __remove_osd_from_lru(req->r_osd); list_add(&req->r_osd_item, &req->r_osd->o_requests); + list_move(&req->r_req_lru_item, &osdc->req_unsent); + } else { + list_move(&req->r_req_lru_item, &osdc->req_notarget); } err = 1; /* osd or pg changed */ @@ -869,16 +1007,6 @@ static int __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { struct ceph_osd_request_head *reqhead; - int err; - - err = __map_osds(osdc, req); - if (err < 0) - return err; - if (req->r_osd == NULL) { - dout("send_request %p no up osds in pg\n", req); - ceph_monc_request_next_osdmap(&osdc->client->monc); - return 0; - } dout("send_request %p tid %llu to osd%d flags %d\n", req, req->r_tid, req->r_osd->o_osd, req->r_flags); @@ -898,6 +1026,21 @@ static int __send_request(struct ceph_osd_client *osdc, } /* + * Send any requests in the queue (req_unsent). + */ +static void send_queued(struct ceph_osd_client *osdc) +{ + struct ceph_osd_request *req, *tmp; + + dout("send_queued\n"); + mutex_lock(&osdc->request_mutex); + list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { + __send_request(osdc, req); + } + mutex_unlock(&osdc->request_mutex); +} + +/* * Timeout callback, called every N seconds when 1 or more osd * requests has been active for more than N seconds. When this * happens, we ping all OSDs with requests who have timed out to @@ -916,30 +1059,13 @@ static void handle_timeout(struct work_struct *work) unsigned long keepalive = osdc->client->options->osd_keepalive_timeout * HZ; unsigned long last_stamp = 0; - struct rb_node *p; struct list_head slow_osds; - dout("timeout\n"); down_read(&osdc->map_sem); ceph_monc_request_next_osdmap(&osdc->client->monc); mutex_lock(&osdc->request_mutex); - for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { - req = rb_entry(p, struct ceph_osd_request, r_node); - - if (req->r_resend) { - int err; - - dout("osdc resending prev failed %lld\n", req->r_tid); - err = __send_request(osdc, req); - if (err) - dout("osdc failed again on %lld\n", req->r_tid); - else - req->r_resend = false; - continue; - } - } /* * reset osds that appear to be _really_ unresponsive. this @@ -963,7 +1089,7 @@ static void handle_timeout(struct work_struct *work) BUG_ON(!osd); pr_warning(" tid %llu timed out on osd%d, will reset osd\n", req->r_tid, osd->o_osd); - __kick_requests(osdc, osd); + __kick_osd_requests(osdc, osd); } /* @@ -991,7 +1117,7 @@ static void handle_timeout(struct work_struct *work) __schedule_osd_timeout(osdc); mutex_unlock(&osdc->request_mutex); - + send_queued(osdc); up_read(&osdc->map_sem); } @@ -1035,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, numops * sizeof(struct ceph_osd_op)) goto bad; dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); - /* lookup */ mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); @@ -1079,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, dout("handle_reply tid %llu flags %d\n", tid, flags); + if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) + __register_linger_request(osdc, req); + /* either this is a read, or we got the safe response */ if (result < 0 || (flags & CEPH_OSD_FLAG_ONDISK) || @@ -1099,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, } done: + dout("req=%p req->r_linger=%d\n", req, req->r_linger); ceph_osdc_put_request(req); return; @@ -1109,108 +1238,83 @@ bad: ceph_msg_dump(msg); } - -static int __kick_requests(struct ceph_osd_client *osdc, - struct ceph_osd *kickosd) +static void reset_changed_osds(struct ceph_osd_client *osdc) { - struct ceph_osd_request *req; struct rb_node *p, *n; - int needmap = 0; - int err; - dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1); - if (kickosd) { - err = __reset_osd(osdc, kickosd); - if (err == -EAGAIN) - return 1; - } else { - for (p = rb_first(&osdc->osds); p; p = n) { - struct ceph_osd *osd = - rb_entry(p, struct ceph_osd, o_node); - - n = rb_next(p); - if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || - memcmp(&osd->o_con.peer_addr, - ceph_osd_addr(osdc->osdmap, - osd->o_osd), - sizeof(struct ceph_entity_addr)) != 0) - __reset_osd(osdc, osd); - } + for (p = rb_first(&osdc->osds); p; p = n) { + struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); + + n = rb_next(p); + if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || + memcmp(&osd->o_con.peer_addr, + ceph_osd_addr(osdc->osdmap, + osd->o_osd), + sizeof(struct ceph_entity_addr)) != 0) + __reset_osd(osdc, osd); } +} + +/* + * Requeue requests whose mapping to an OSD has changed. If requests map to + * no osd, request a new map. + * + * Caller should hold map_sem for read and request_mutex. + */ +static void kick_requests(struct ceph_osd_client *osdc) +{ + struct ceph_osd_request *req, *nreq; + struct rb_node *p; + int needmap = 0; + int err; + dout("kick_requests\n"); + mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { req = rb_entry(p, struct ceph_osd_request, r_node); - - if (req->r_resend) { - dout(" r_resend set on tid %llu\n", req->r_tid); - __cancel_request(req); - goto kick; - } - if (req->r_osd && kickosd == req->r_osd) { - __cancel_request(req); - goto kick; + err = __map_request(osdc, req); + if (err < 0) + continue; /* error */ + if (req->r_osd == NULL) { + dout("%p tid %llu maps to no osd\n", req, req->r_tid); + needmap++; /* request a newer map */ + } else if (err > 0) { + dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, + req->r_osd ? req->r_osd->o_osd : -1); + if (!req->r_linger) + req->r_flags |= CEPH_OSD_FLAG_RETRY; } + } + + list_for_each_entry_safe(req, nreq, &osdc->req_linger, + r_linger_item) { + dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); - err = __map_osds(osdc, req); + err = __map_request(osdc, req); if (err == 0) - continue; /* no change */ - if (err < 0) { - /* - * FIXME: really, we should set the request - * error and fail if this isn't a 'nofail' - * request, but that's a fair bit more - * complicated to do. So retry! - */ - dout(" setting r_resend on %llu\n", req->r_tid); - req->r_resend = true; - continue; - } + continue; /* no change and no osd was specified */ + if (err < 0) + continue; /* hrm! */ if (req->r_osd == NULL) { dout("tid %llu maps to no valid osd\n", req->r_tid); needmap++; /* request a newer map */ continue; } -kick: - dout("kicking %p tid %llu osd%d\n", req, req->r_tid, + dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, req->r_osd ? req->r_osd->o_osd : -1); - req->r_flags |= CEPH_OSD_FLAG_RETRY; - err = __send_request(osdc, req); - if (err) { - dout(" setting r_resend on %llu\n", req->r_tid); - req->r_resend = true; - } + __unregister_linger_request(osdc, req); + __register_request(osdc, req); } - - return needmap; -} - -/* - * Resubmit osd requests whose osd or osd address has changed. Request - * a new osd map if osds are down, or we are otherwise unable to determine - * how to direct a request. - * - * Close connections to down osds. - * - * If @who is specified, resubmit requests for that specific osd. - * - * Caller should hold map_sem for read and request_mutex. - */ -static void kick_requests(struct ceph_osd_client *osdc, - struct ceph_osd *kickosd) -{ - int needmap; - - mutex_lock(&osdc->request_mutex); - needmap = __kick_requests(osdc, kickosd); mutex_unlock(&osdc->request_mutex); if (needmap) { dout("%d requests for down osds, need new map\n", needmap); ceph_monc_request_next_osdmap(&osdc->client->monc); } - } + + /* * Process updated osd map. * @@ -1263,6 +1367,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_osdmap_destroy(osdc->osdmap); osdc->osdmap = newmap; } + kick_requests(osdc); + reset_changed_osds(osdc); } else { dout("ignoring incremental map %u len %d\n", epoch, maplen); @@ -1300,6 +1406,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) osdc->osdmap = newmap; if (oldmap) ceph_osdmap_destroy(oldmap); + kick_requests(osdc); } p += maplen; nr_maps--; @@ -1308,8 +1415,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) done: downgrade_write(&osdc->map_sem); ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); - if (newmap) - kick_requests(osdc, NULL); + send_queued(osdc); up_read(&osdc->map_sem); wake_up_all(&osdc->client->auth_wq); return; @@ -1322,6 +1428,223 @@ bad: } /* + * watch/notify callback event infrastructure + * + * These callbacks are used both for watch and notify operations. + */ +static void __release_event(struct kref *kref) +{ + struct ceph_osd_event *event = + container_of(kref, struct ceph_osd_event, kref); + + dout("__release_event %p\n", event); + kfree(event); +} + +static void get_event(struct ceph_osd_event *event) +{ + kref_get(&event->kref); +} + +void ceph_osdc_put_event(struct ceph_osd_event *event) +{ + kref_put(&event->kref, __release_event); +} +EXPORT_SYMBOL(ceph_osdc_put_event); + +static void __insert_event(struct ceph_osd_client *osdc, + struct ceph_osd_event *new) +{ + struct rb_node **p = &osdc->event_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_osd_event *event = NULL; + + while (*p) { + parent = *p; + event = rb_entry(parent, struct ceph_osd_event, node); + if (new->cookie < event->cookie) + p = &(*p)->rb_left; + else if (new->cookie > event->cookie) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&new->node, parent, p); + rb_insert_color(&new->node, &osdc->event_tree); +} + +static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, + u64 cookie) +{ + struct rb_node **p = &osdc->event_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_osd_event *event = NULL; + + while (*p) { + parent = *p; + event = rb_entry(parent, struct ceph_osd_event, node); + if (cookie < event->cookie) + p = &(*p)->rb_left; + else if (cookie > event->cookie) + p = &(*p)->rb_right; + else + return event; + } + return NULL; +} + +static void __remove_event(struct ceph_osd_event *event) +{ + struct ceph_osd_client *osdc = event->osdc; + + if (!RB_EMPTY_NODE(&event->node)) { + dout("__remove_event removed %p\n", event); + rb_erase(&event->node, &osdc->event_tree); + ceph_osdc_put_event(event); + } else { + dout("__remove_event didn't remove %p\n", event); + } +} + +int ceph_osdc_create_event(struct ceph_osd_client *osdc, + void (*event_cb)(u64, u64, u8, void *), + int one_shot, void *data, + struct ceph_osd_event **pevent) +{ + struct ceph_osd_event *event; + + event = kmalloc(sizeof(*event), GFP_NOIO); + if (!event) + return -ENOMEM; + + dout("create_event %p\n", event); + event->cb = event_cb; + event->one_shot = one_shot; + event->data = data; + event->osdc = osdc; + INIT_LIST_HEAD(&event->osd_node); + kref_init(&event->kref); /* one ref for us */ + kref_get(&event->kref); /* one ref for the caller */ + init_completion(&event->completion); + + spin_lock(&osdc->event_lock); + event->cookie = ++osdc->event_count; + __insert_event(osdc, event); + spin_unlock(&osdc->event_lock); + + *pevent = event; + return 0; +} +EXPORT_SYMBOL(ceph_osdc_create_event); + +void ceph_osdc_cancel_event(struct ceph_osd_event *event) +{ + struct ceph_osd_client *osdc = event->osdc; + + dout("cancel_event %p\n", event); + spin_lock(&osdc->event_lock); + __remove_event(event); + spin_unlock(&osdc->event_lock); + ceph_osdc_put_event(event); /* caller's */ +} +EXPORT_SYMBOL(ceph_osdc_cancel_event); + + +static void do_event_work(struct work_struct *work) +{ + struct ceph_osd_event_work *event_work = + container_of(work, struct ceph_osd_event_work, work); + struct ceph_osd_event *event = event_work->event; + u64 ver = event_work->ver; + u64 notify_id = event_work->notify_id; + u8 opcode = event_work->opcode; + + dout("do_event_work completing %p\n", event); + event->cb(ver, notify_id, opcode, event->data); + complete(&event->completion); + dout("do_event_work completed %p\n", event); + ceph_osdc_put_event(event); + kfree(event_work); +} + + +/* + * Process osd watch notifications + */ +void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) +{ + void *p, *end; + u8 proto_ver; + u64 cookie, ver, notify_id; + u8 opcode; + struct ceph_osd_event *event; + struct ceph_osd_event_work *event_work; + + p = msg->front.iov_base; + end = p + msg->front.iov_len; + + ceph_decode_8_safe(&p, end, proto_ver, bad); + ceph_decode_8_safe(&p, end, opcode, bad); + ceph_decode_64_safe(&p, end, cookie, bad); + ceph_decode_64_safe(&p, end, ver, bad); + ceph_decode_64_safe(&p, end, notify_id, bad); + + spin_lock(&osdc->event_lock); + event = __find_event(osdc, cookie); + if (event) { + get_event(event); + if (event->one_shot) + __remove_event(event); + } + spin_unlock(&osdc->event_lock); + dout("handle_watch_notify cookie %lld ver %lld event %p\n", + cookie, ver, event); + if (event) { + event_work = kmalloc(sizeof(*event_work), GFP_NOIO); + INIT_WORK(&event_work->work, do_event_work); + if (!event_work) { + dout("ERROR: could not allocate event_work\n"); + goto done_err; + } + event_work->event = event; + event_work->ver = ver; + event_work->notify_id = notify_id; + event_work->opcode = opcode; + if (!queue_work(osdc->notify_wq, &event_work->work)) { + dout("WARNING: failed to queue notify event work\n"); + goto done_err; + } + } + + return; + +done_err: + complete(&event->completion); + ceph_osdc_put_event(event); + return; + +bad: + pr_err("osdc handle_watch_notify corrupt msg\n"); + return; +} + +int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout) +{ + int err; + + dout("wait_event %p\n", event); + err = wait_for_completion_interruptible_timeout(&event->completion, + timeout * HZ); + ceph_osdc_put_event(event); + if (err > 0) + err = 0; + dout("wait_event %p returns %d\n", event, err); + return err; +} +EXPORT_SYMBOL(ceph_osdc_wait_event); + +/* * Register request, send initial attempt. */ int ceph_osdc_start_request(struct ceph_osd_client *osdc, @@ -1347,15 +1670,22 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, * the request still han't been touched yet. */ if (req->r_sent == 0) { - rc = __send_request(osdc, req); - if (rc) { - if (nofail) { - dout("osdc_start_request failed send, " - " marking %lld\n", req->r_tid); - req->r_resend = true; - rc = 0; - } else { - __unregister_request(osdc, req); + rc = __map_request(osdc, req); + if (rc < 0) + return rc; + if (req->r_osd == NULL) { + dout("send_request %p no up osds in pg\n", req); + ceph_monc_request_next_osdmap(&osdc->client->monc); + } else { + rc = __send_request(osdc, req); + if (rc) { + if (nofail) { + dout("osdc_start_request failed send, " + " will retry %lld\n", req->r_tid); + rc = 0; + } else { + __unregister_request(osdc, req); + } } } } @@ -1441,9 +1771,15 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) INIT_LIST_HEAD(&osdc->osd_lru); osdc->requests = RB_ROOT; INIT_LIST_HEAD(&osdc->req_lru); + INIT_LIST_HEAD(&osdc->req_unsent); + INIT_LIST_HEAD(&osdc->req_notarget); + INIT_LIST_HEAD(&osdc->req_linger); osdc->num_requests = 0; INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); + spin_lock_init(&osdc->event_lock); + osdc->event_tree = RB_ROOT; + osdc->event_count = 0; schedule_delayed_work(&osdc->osds_timeout_work, round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); @@ -1463,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) "osd_op_reply"); if (err < 0) goto out_msgpool; + + osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); + if (IS_ERR(osdc->notify_wq)) { + err = PTR_ERR(osdc->notify_wq); + osdc->notify_wq = NULL; + goto out_msgpool; + } return 0; out_msgpool: @@ -1476,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init); void ceph_osdc_stop(struct ceph_osd_client *osdc) { + flush_workqueue(osdc->notify_wq); + destroy_workqueue(osdc->notify_wq); cancel_delayed_work_sync(&osdc->timeout_work); cancel_delayed_work_sync(&osdc->osds_timeout_work); if (osdc->osdmap) { @@ -1483,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) osdc->osdmap = NULL; } remove_old_osds(osdc, 1); + WARN_ON(!RB_EMPTY_ROOT(&osdc->osds)); mempool_destroy(osdc->req_mempool); ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op_reply); @@ -1591,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) case CEPH_MSG_OSD_OPREPLY: handle_reply(osdc, msg, con); break; + case CEPH_MSG_WATCH_NOTIFY: + handle_watch_notify(osdc, msg); + break; default: pr_err("received unknown message type %d %s\n", type, @@ -1684,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, switch (type) { case CEPH_MSG_OSD_MAP: + case CEPH_MSG_WATCH_NOTIFY: return ceph_msg_new(type, front, GFP_NOFS); case CEPH_MSG_OSD_OPREPLY: return get_reply(con, hdr, skip); |