aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2011-03-21 15:07:16 -0700
committerSage Weil <sage@newdream.net>2011-03-22 11:33:55 -0700
commita40c4f10e3fb96030358e49abd010c1f08446fa3 (patch)
tree1aa1f6ca618cd021d944f7da7caeb5b182beaee4
parent55b00bae111030bd0dfcc898a920e54725aed1bf (diff)
libceph: add lingering request and watch/notify event framework
Lingering requests are requests that are sent to the OSD normally but tracked also after we get a successful request. This keeps the OSD connection open and resends the original request if the object moves to another OSD. The OSD can then send notification messages back to us if another client initiates a notify. This framework will be used by RBD so that the client gets notification when a snapshot is created by another node or tool. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net> Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--include/linux/ceph/osd_client.h52
-rw-r--r--net/ceph/ceph_common.c1
-rw-r--r--net/ceph/osd_client.c385
3 files changed, 426 insertions, 12 deletions
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index e791b8e4635..f88eacb111d 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -32,6 +32,7 @@ struct ceph_osd {
struct rb_node o_node;
struct ceph_connection o_con;
struct list_head o_requests;
+ struct list_head o_linger_requests;
struct list_head o_osd_lru;
struct ceph_authorizer *o_authorizer;
void *o_authorizer_buf, *o_authorizer_reply_buf;
@@ -47,6 +48,8 @@ struct ceph_osd_request {
struct rb_node r_node;
struct list_head r_req_lru_item;
struct list_head r_osd_item;
+ struct list_head r_linger_item;
+ struct list_head r_linger_osd;
struct ceph_osd *r_osd;
struct ceph_pg r_pgid;
int r_pg_osds[CEPH_PG_MAX_SIZE];
@@ -59,6 +62,7 @@ struct ceph_osd_request {
int r_flags; /* any additional flags for the osd */
u32 r_sent; /* >0 if r_request is sending/sent */
int r_got_reply;
+ int r_linger;
struct ceph_osd_client *r_osdc;
struct kref r_kref;
@@ -89,6 +93,26 @@ struct ceph_osd_request {
struct ceph_pagelist *r_trail; /* trailing part of the data */
};
+struct ceph_osd_event {
+ u64 cookie;
+ int one_shot;
+ struct ceph_osd_client *osdc;
+ void (*cb)(u64, u64, u8, void *);
+ void *data;
+ struct rb_node node;
+ struct list_head osd_node;
+ struct kref kref;
+ struct completion completion;
+};
+
+struct ceph_osd_event_work {
+ struct work_struct work;
+ struct ceph_osd_event *event;
+ u64 ver;
+ u64 notify_id;
+ u8 opcode;
+};
+
struct ceph_osd_client {
struct ceph_client *client;
@@ -106,6 +130,7 @@ struct ceph_osd_client {
struct list_head req_lru; /* in-flight lru */
struct list_head req_unsent; /* unsent/need-resend queue */
struct list_head req_notarget; /* map to no osd */
+ struct list_head req_linger; /* lingering requests */
int num_requests;
struct delayed_work timeout_work;
struct delayed_work osds_timeout_work;
@@ -117,6 +142,12 @@ struct ceph_osd_client {
struct ceph_msgpool msgpool_op;
struct ceph_msgpool msgpool_op_reply;
+
+ spinlock_t event_lock;
+ struct rb_root event_tree;
+ u64 event_count;
+
+ struct workqueue_struct *notify_wq;
};
struct ceph_osd_req_op {
@@ -151,6 +182,13 @@ struct ceph_osd_req_op {
struct {
u64 snapid;
} snap;
+ struct {
+ u64 cookie;
+ u64 ver;
+ __u8 flag;
+ u32 prot_ver;
+ u32 timeout;
+ } watch;
};
u32 payload_len;
};
@@ -199,6 +237,11 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
bool use_mempool, int num_reply,
int page_align);
+extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
+extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
+
static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
{
kref_get(&req->r_kref);
@@ -234,5 +277,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct page **pages, int nr_pages,
int flags, int do_sync, bool nofail);
+/* watch/notify events */
+extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+ void (*event_cb)(u64, u64, u8, void *),
+ int one_shot, void *data,
+ struct ceph_osd_event **pevent);
+extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
+extern int ceph_osdc_wait_event(struct ceph_osd_event *event,
+ unsigned long timeout);
+extern void ceph_osdc_put_event(struct ceph_osd_event *event);
#endif
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index f3e4a13fea0..95f96ab94bb 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -62,6 +62,7 @@ const char *ceph_msg_type_name(int type)
case CEPH_MSG_OSD_MAP: return "osd_map";
case CEPH_MSG_OSD_OP: return "osd_op";
case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
+ case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
default: return "unknown";
}
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index b85ed5a5503..02212ed5085 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -25,6 +25,12 @@ static const struct ceph_connection_operations osd_con_ops;
static void send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static void __register_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
+static int __send_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
static int op_needs_trail(int op)
{
@@ -33,6 +39,7 @@ static int op_needs_trail(int op)
case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR:
case CEPH_OSD_OP_CALL:
+ case CEPH_OSD_OP_NOTIFY:
return 1;
default:
return 0;
@@ -208,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
+ INIT_LIST_HEAD(&req->r_linger_item);
+ INIT_LIST_HEAD(&req->r_linger_osd);
req->r_flags = flags;
WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -314,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
break;
case CEPH_OSD_OP_STARTSYNC:
break;
+ case CEPH_OSD_OP_NOTIFY:
+ {
+ __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
+ __le32 timeout = cpu_to_le32(src->watch.timeout);
+
+ BUG_ON(!req->r_trail);
+
+ ceph_pagelist_append(req->r_trail,
+ &prot_ver, sizeof(prot_ver));
+ ceph_pagelist_append(req->r_trail,
+ &timeout, sizeof(timeout));
+ }
+ case CEPH_OSD_OP_NOTIFY_ACK:
+ case CEPH_OSD_OP_WATCH:
+ dst->watch.cookie = cpu_to_le64(src->watch.cookie);
+ dst->watch.ver = cpu_to_le64(src->watch.ver);
+ dst->watch.flag = src->watch.flag;
+ break;
default:
pr_err("unrecognized osd opcode %d\n", dst->op);
WARN_ON(1);
@@ -534,7 +561,7 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
static void __kick_osd_requests(struct ceph_osd_client *osdc,
struct ceph_osd *osd)
{
- struct ceph_osd_request *req;
+ struct ceph_osd_request *req, *nreq;
int err;
dout("__kick_osd_requests osd%d\n", osd->o_osd);
@@ -546,7 +573,17 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
list_move(&req->r_req_lru_item, &osdc->req_unsent);
dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
osd->o_osd);
- req->r_flags |= CEPH_OSD_FLAG_RETRY;
+ if (!req->r_linger)
+ req->r_flags |= CEPH_OSD_FLAG_RETRY;
+ }
+
+ list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
+ r_linger_osd) {
+ __unregister_linger_request(osdc, req);
+ __register_request(osdc, req);
+ list_move(&req->r_req_lru_item, &osdc->req_unsent);
+ dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
+ osd->o_osd);
}
}
@@ -590,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
atomic_set(&osd->o_ref, 1);
osd->o_osdc = osdc;
INIT_LIST_HEAD(&osd->o_requests);
+ INIT_LIST_HEAD(&osd->o_linger_requests);
INIT_LIST_HEAD(&osd->o_osd_lru);
osd->o_incarnation = 1;
@@ -679,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
int ret = 0;
dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
- if (list_empty(&osd->o_requests)) {
+ if (list_empty(&osd->o_requests) &&
+ list_empty(&osd->o_linger_requests)) {
__remove_osd(osdc, osd);
} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
&osd->o_con.peer_addr,
@@ -752,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
* Register request, assign tid. If this is the first request, set up
* the timeout event.
*/
-static void register_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+static void __register_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
- mutex_lock(&osdc->request_mutex);
req->r_tid = ++osdc->last_tid;
req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
INIT_LIST_HEAD(&req->r_req_lru_item);
@@ -769,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc,
dout(" first request, scheduling timeout\n");
__schedule_osd_timeout(osdc);
}
+}
+
+static void register_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
+{
+ mutex_lock(&osdc->request_mutex);
+ __register_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
}
@@ -787,9 +832,14 @@ static void __unregister_request(struct ceph_osd_client *osdc,
ceph_con_revoke(&req->r_osd->o_con, req->r_request);
list_del_init(&req->r_osd_item);
- if (list_empty(&req->r_osd->o_requests))
+ if (list_empty(&req->r_osd->o_requests) &&
+ list_empty(&req->r_osd->o_linger_requests)) {
+ dout("moving osd to %p lru\n", req->r_osd);
__move_osd_to_lru(osdc, req->r_osd);
- req->r_osd = NULL;
+ }
+ if (list_empty(&req->r_osd_item) &&
+ list_empty(&req->r_linger_item))
+ req->r_osd = NULL;
}
ceph_osdc_put_request(req);
@@ -812,6 +862,58 @@ static void __cancel_request(struct ceph_osd_request *req)
}
}
+static void __register_linger_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
+{
+ dout("__register_linger_request %p\n", req);
+ list_add_tail(&req->r_linger_item, &osdc->req_linger);
+ list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
+}
+
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
+{
+ dout("__unregister_linger_request %p\n", req);
+ if (req->r_osd) {
+ list_del_init(&req->r_linger_item);
+ list_del_init(&req->r_linger_osd);
+
+ if (list_empty(&req->r_osd->o_requests) &&
+ list_empty(&req->r_osd->o_linger_requests)) {
+ dout("moving osd to %p lru\n", req->r_osd);
+ __move_osd_to_lru(osdc, req->r_osd);
+ }
+ req->r_osd = NULL;
+ }
+}
+
+void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
+{
+ mutex_lock(&osdc->request_mutex);
+ if (req->r_linger) {
+ __unregister_linger_request(osdc, req);
+ ceph_osdc_put_request(req);
+ }
+ mutex_unlock(&osdc->request_mutex);
+}
+EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
+
+void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
+{
+ if (!req->r_linger) {
+ dout("set_request_linger %p\n", req);
+ req->r_linger = 1;
+ /*
+ * caller is now responsible for calling
+ * unregister_linger_request
+ */
+ ceph_osdc_get_request(req);
+ }
+}
+EXPORT_SYMBOL(ceph_osdc_set_request_linger);
+
/*
* Pick an osd (the first 'up' osd in the pg), allocate the osd struct
* (as needed), and set the request r_osd appropriately. If there is
@@ -958,7 +1060,6 @@ static void handle_timeout(struct work_struct *work)
osdc->client->options->osd_keepalive_timeout * HZ;
unsigned long last_stamp = 0;
struct list_head slow_osds;
-
dout("timeout\n");
down_read(&osdc->map_sem);
@@ -1060,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
numops * sizeof(struct ceph_osd_op))
goto bad;
dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
-
/* lookup */
mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid);
@@ -1104,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
dout("handle_reply tid %llu flags %d\n", tid, flags);
+ if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
+ __register_linger_request(osdc, req);
+
/* either this is a read, or we got the safe response */
if (result < 0 ||
(flags & CEPH_OSD_FLAG_ONDISK) ||
@@ -1124,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
}
done:
+ dout("req=%p req->r_linger=%d\n", req, req->r_linger);
ceph_osdc_put_request(req);
return;
@@ -1159,7 +1263,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
*/
static void kick_requests(struct ceph_osd_client *osdc)
{
- struct ceph_osd_request *req;
+ struct ceph_osd_request *req, *nreq;
struct rb_node *p;
int needmap = 0;
int err;
@@ -1177,8 +1281,30 @@ static void kick_requests(struct ceph_osd_client *osdc)
} else if (err > 0) {
dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
- req->r_flags |= CEPH_OSD_FLAG_RETRY;
+ if (!req->r_linger)
+ req->r_flags |= CEPH_OSD_FLAG_RETRY;
+ }
+ }
+
+ list_for_each_entry_safe(req, nreq, &osdc->req_linger,
+ r_linger_item) {
+ dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
+
+ err = __map_request(osdc, req);
+ if (err == 0)
+ continue; /* no change and no osd was specified */
+ if (err < 0)
+ continue; /* hrm! */
+ if (req->r_osd == NULL) {
+ dout("tid %llu maps to no valid osd\n", req->r_tid);
+ needmap++; /* request a newer map */
+ continue;
}
+
+ dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
+ req->r_osd ? req->r_osd->o_osd : -1);
+ __unregister_linger_request(osdc, req);
+ __register_request(osdc, req);
}
mutex_unlock(&osdc->request_mutex);
@@ -1302,6 +1428,223 @@ bad:
}
/*
+ * watch/notify callback event infrastructure
+ *
+ * These callbacks are used both for watch and notify operations.
+ */
+static void __release_event(struct kref *kref)
+{
+ struct ceph_osd_event *event =
+ container_of(kref, struct ceph_osd_event, kref);
+
+ dout("__release_event %p\n", event);
+ kfree(event);
+}
+
+static void get_event(struct ceph_osd_event *event)
+{
+ kref_get(&event->kref);
+}
+
+void ceph_osdc_put_event(struct ceph_osd_event *event)
+{
+ kref_put(&event->kref, __release_event);
+}
+EXPORT_SYMBOL(ceph_osdc_put_event);
+
+static void __insert_event(struct ceph_osd_client *osdc,
+ struct ceph_osd_event *new)
+{
+ struct rb_node **p = &osdc->event_tree.rb_node;
+ struct rb_node *parent = NULL;
+ struct ceph_osd_event *event = NULL;
+
+ while (*p) {
+ parent = *p;
+ event = rb_entry(parent, struct ceph_osd_event, node);
+ if (new->cookie < event->cookie)
+ p = &(*p)->rb_left;
+ else if (new->cookie > event->cookie)
+ p = &(*p)->rb_right;
+ else
+ BUG();
+ }
+
+ rb_link_node(&new->node, parent, p);
+ rb_insert_color(&new->node, &osdc->event_tree);
+}
+
+static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
+ u64 cookie)
+{
+ struct rb_node **p = &osdc->event_tree.rb_node;
+ struct rb_node *parent = NULL;
+ struct ceph_osd_event *event = NULL;
+
+ while (*p) {
+ parent = *p;
+ event = rb_entry(parent, struct ceph_osd_event, node);
+ if (cookie < event->cookie)
+ p = &(*p)->rb_left;
+ else if (cookie > event->cookie)
+ p = &(*p)->rb_right;
+ else
+ return event;
+ }
+ return NULL;
+}
+
+static void __remove_event(struct ceph_osd_event *event)
+{
+ struct ceph_osd_client *osdc = event->osdc;
+
+ if (!RB_EMPTY_NODE(&event->node)) {
+ dout("__remove_event removed %p\n", event);
+ rb_erase(&event->node, &osdc->event_tree);
+ ceph_osdc_put_event(event);
+ } else {
+ dout("__remove_event didn't remove %p\n", event);
+ }
+}
+
+int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+ void (*event_cb)(u64, u64, u8, void *),
+ int one_shot, void *data,
+ struct ceph_osd_event **pevent)
+{
+ struct ceph_osd_event *event;
+
+ event = kmalloc(sizeof(*event), GFP_NOIO);
+ if (!event)
+ return -ENOMEM;
+
+ dout("create_event %p\n", event);
+ event->cb = event_cb;
+ event->one_shot = one_shot;
+ event->data = data;
+ event->osdc = osdc;
+ INIT_LIST_HEAD(&event->osd_node);
+ kref_init(&event->kref); /* one ref for us */
+ kref_get(&event->kref); /* one ref for the caller */
+ init_completion(&event->completion);
+
+ spin_lock(&osdc->event_lock);
+ event->cookie = ++osdc->event_count;
+ __insert_event(osdc, event);
+ spin_unlock(&osdc->event_lock);
+
+ *pevent = event;
+ return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_create_event);
+
+void ceph_osdc_cancel_event(struct ceph_osd_event *event)
+{
+ struct ceph_osd_client *osdc = event->osdc;
+
+ dout("cancel_event %p\n", event);
+ spin_lock(&osdc->event_lock);
+ __remove_event(event);
+ spin_unlock(&osdc->event_lock);
+ ceph_osdc_put_event(event); /* caller's */
+}
+EXPORT_SYMBOL(ceph_osdc_cancel_event);
+
+
+static void do_event_work(struct work_struct *work)
+{
+ struct ceph_osd_event_work *event_work =
+ container_of(work, struct ceph_osd_event_work, work);
+ struct ceph_osd_event *event = event_work->event;
+ u64 ver = event_work->ver;
+ u64 notify_id = event_work->notify_id;
+ u8 opcode = event_work->opcode;
+
+ dout("do_event_work completing %p\n", event);
+ event->cb(ver, notify_id, opcode, event->data);
+ complete(&event->completion);
+ dout("do_event_work completed %p\n", event);
+ ceph_osdc_put_event(event);
+ kfree(event_work);
+}
+
+
+/*
+ * Process osd watch notifications
+ */
+void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+ void *p, *end;
+ u8 proto_ver;
+ u64 cookie, ver, notify_id;
+ u8 opcode;
+ struct ceph_osd_event *event;
+ struct ceph_osd_event_work *event_work;
+
+ p = msg->front.iov_base;
+ end = p + msg->front.iov_len;
+
+ ceph_decode_8_safe(&p, end, proto_ver, bad);
+ ceph_decode_8_safe(&p, end, opcode, bad);
+ ceph_decode_64_safe(&p, end, cookie, bad);
+ ceph_decode_64_safe(&p, end, ver, bad);
+ ceph_decode_64_safe(&p, end, notify_id, bad);
+
+ spin_lock(&osdc->event_lock);
+ event = __find_event(osdc, cookie);
+ if (event) {
+ get_event(event);
+ if (event->one_shot)
+ __remove_event(event);
+ }
+ spin_unlock(&osdc->event_lock);
+ dout("handle_watch_notify cookie %lld ver %lld event %p\n",
+ cookie, ver, event);
+ if (event) {
+ event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
+ INIT_WORK(&event_work->work, do_event_work);
+ if (!event_work) {
+ dout("ERROR: could not allocate event_work\n");
+ goto done_err;
+ }
+ event_work->event = event;
+ event_work->ver = ver;
+ event_work->notify_id = notify_id;
+ event_work->opcode = opcode;
+ if (!queue_work(osdc->notify_wq, &event_work->work)) {
+ dout("WARNING: failed to queue notify event work\n");
+ goto done_err;
+ }
+ }
+
+ return;
+
+done_err:
+ complete(&event->completion);
+ ceph_osdc_put_event(event);
+ return;
+
+bad:
+ pr_err("osdc handle_watch_notify corrupt msg\n");
+ return;
+}
+
+int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
+{
+ int err;
+
+ dout("wait_event %p\n", event);
+ err = wait_for_completion_interruptible_timeout(&event->completion,
+ timeout * HZ);
+ ceph_osdc_put_event(event);
+ if (err > 0)
+ err = 0;
+ dout("wait_event %p returns %d\n", event, err);
+ return err;
+}
+EXPORT_SYMBOL(ceph_osdc_wait_event);
+
+/*
* Register request, send initial attempt.
*/
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
@@ -1430,9 +1773,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
INIT_LIST_HEAD(&osdc->req_lru);
INIT_LIST_HEAD(&osdc->req_unsent);
INIT_LIST_HEAD(&osdc->req_notarget);
+ INIT_LIST_HEAD(&osdc->req_linger);
osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+ spin_lock_init(&osdc->event_lock);
+ osdc->event_tree = RB_ROOT;
+ osdc->event_count = 0;
schedule_delayed_work(&osdc->osds_timeout_work,
round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
@@ -1452,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
"osd_op_reply");
if (err < 0)
goto out_msgpool;
+
+ osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
+ if (IS_ERR(osdc->notify_wq)) {
+ err = PTR_ERR(osdc->notify_wq);
+ osdc->notify_wq = NULL;
+ goto out_msgpool;
+ }
return 0;
out_msgpool:
@@ -1465,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init);
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
+ flush_workqueue(osdc->notify_wq);
+ destroy_workqueue(osdc->notify_wq);
cancel_delayed_work_sync(&osdc->timeout_work);
cancel_delayed_work_sync(&osdc->osds_timeout_work);
if (osdc->osdmap) {
@@ -1472,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
osdc->osdmap = NULL;
}
remove_old_osds(osdc, 1);
+ WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
mempool_destroy(osdc->req_mempool);
ceph_msgpool_destroy(&osdc->msgpool_op);
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -1580,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
case CEPH_MSG_OSD_OPREPLY:
handle_reply(osdc, msg, con);
break;
+ case CEPH_MSG_WATCH_NOTIFY:
+ handle_watch_notify(osdc, msg);
+ break;
default:
pr_err("received unknown message type %d %s\n", type,
@@ -1673,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
switch (type) {
case CEPH_MSG_OSD_MAP:
+ case CEPH_MSG_WATCH_NOTIFY:
return ceph_msg_new(type, front, GFP_NOFS);
case CEPH_MSG_OSD_OPREPLY:
return get_reply(con, hdr, skip);