aboutsummaryrefslogtreecommitdiff
path: root/net/ceph/mon_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/mon_client.c')
-rw-r--r--net/ceph/mon_client.c373
1 files changed, 294 insertions, 79 deletions
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 8a079399174..067d3af2eaf 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -8,8 +8,8 @@
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
+#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
-
#include <linux/ceph/auth.h>
/*
@@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
monc->pending_auth = 1;
monc->m_auth->front.iov_len = len;
monc->m_auth->hdr.front_len = cpu_to_le32(len);
- ceph_con_revoke(monc->con, monc->m_auth);
+ ceph_msg_revoke(monc->m_auth);
ceph_msg_get(monc->m_auth); /* keep our ref */
- ceph_con_send(monc->con, monc->m_auth);
+ ceph_con_send(&monc->con, monc->m_auth);
}
/*
@@ -116,14 +116,15 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
*/
static void __close_session(struct ceph_mon_client *monc)
{
- if (monc->con) {
- dout("__close_session closing mon%d\n", monc->cur_mon);
- ceph_con_revoke(monc->con, monc->m_auth);
- ceph_con_close(monc->con);
- monc->cur_mon = -1;
- monc->pending_auth = 0;
- ceph_auth_reset(monc->auth);
- }
+ dout("__close_session closing mon%d\n", monc->cur_mon);
+ ceph_msg_revoke(monc->m_auth);
+ ceph_msg_revoke_incoming(monc->m_auth_reply);
+ ceph_msg_revoke(monc->m_subscribe);
+ ceph_msg_revoke_incoming(monc->m_subscribe_ack);
+ ceph_con_close(&monc->con);
+ monc->cur_mon = -1;
+ monc->pending_auth = 0;
+ ceph_auth_reset(monc->auth);
}
/*
@@ -144,15 +145,14 @@ static int __open_session(struct ceph_mon_client *monc)
monc->want_next_osdmap = !!monc->want_next_osdmap;
dout("open_session mon%d opening\n", monc->cur_mon);
- monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
- monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
- ceph_con_open(monc->con,
+ ceph_con_open(&monc->con,
+ CEPH_ENTITY_TYPE_MON, monc->cur_mon,
&monc->monmap->mon_inst[monc->cur_mon].addr);
/* initiatiate authentication handshake */
ret = ceph_auth_build_hello(monc->auth,
monc->m_auth->front.iov_base,
- monc->m_auth->front_max);
+ monc->m_auth->front_alloc_len);
__send_prepared_auth_request(monc, ret);
} else {
dout("open_session mon%d already open\n", monc->cur_mon);
@@ -170,7 +170,7 @@ static bool __sub_expired(struct ceph_mon_client *monc)
*/
static void __schedule_delayed(struct ceph_mon_client *monc)
{
- unsigned delay;
+ unsigned int delay;
if (monc->cur_mon < 0 || __sub_expired(monc))
delay = 10 * HZ;
@@ -186,7 +186,7 @@ static void __schedule_delayed(struct ceph_mon_client *monc)
static void __send_subscribe(struct ceph_mon_client *monc)
{
dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
- (unsigned)monc->sub_sent, __sub_expired(monc),
+ (unsigned int)monc->sub_sent, __sub_expired(monc),
monc->want_next_osdmap);
if ((__sub_expired(monc) && !monc->sub_sent) ||
monc->want_next_osdmap == 1) {
@@ -196,14 +196,14 @@ static void __send_subscribe(struct ceph_mon_client *monc)
int num;
p = msg->front.iov_base;
- end = p + msg->front_max;
+ end = p + msg->front_alloc_len;
num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
ceph_encode_32(&p, num);
if (monc->want_next_osdmap) {
dout("__send_subscribe to 'osdmap' %u\n",
- (unsigned)monc->have_osdmap);
+ (unsigned int)monc->have_osdmap);
ceph_encode_string(&p, end, "osdmap", 6);
i = p;
i->have = cpu_to_le64(monc->have_osdmap);
@@ -213,7 +213,7 @@ static void __send_subscribe(struct ceph_mon_client *monc)
}
if (monc->want_mdsmap) {
dout("__send_subscribe to 'mdsmap' %u+\n",
- (unsigned)monc->have_mdsmap);
+ (unsigned int)monc->have_mdsmap);
ceph_encode_string(&p, end, "mdsmap", 6);
i = p;
i->have = cpu_to_le64(monc->have_mdsmap);
@@ -228,8 +228,8 @@ static void __send_subscribe(struct ceph_mon_client *monc)
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- ceph_con_revoke(monc->con, msg);
- ceph_con_send(monc->con, ceph_msg_get(msg));
+ ceph_msg_revoke(msg);
+ ceph_con_send(&monc->con, ceph_msg_get(msg));
monc->sub_sent = jiffies | 1; /* never 0 */
}
@@ -238,7 +238,7 @@ static void __send_subscribe(struct ceph_mon_client *monc)
static void handle_subscribe_ack(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
- unsigned seconds;
+ unsigned int seconds;
struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
if (msg->front.iov_len < sizeof(*h))
@@ -249,7 +249,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
if (monc->hunting) {
pr_info("mon%d %s session established\n",
monc->cur_mon,
- ceph_pr_addr(&monc->con->peer_addr.in_addr));
+ ceph_pr_addr(&monc->con.peer_addr.in_addr));
monc->hunting = false;
}
dout("handle_subscribe_ack after %d seconds\n", seconds);
@@ -296,21 +296,39 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
__send_subscribe(monc);
mutex_unlock(&monc->mutex);
}
+EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
+
+int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
+ unsigned long timeout)
+{
+ unsigned long started = jiffies;
+ int ret;
+
+ mutex_lock(&monc->mutex);
+ while (monc->have_osdmap < epoch) {
+ mutex_unlock(&monc->mutex);
+
+ if (timeout != 0 && time_after_eq(jiffies, started + timeout))
+ return -ETIMEDOUT;
+
+ ret = wait_event_interruptible_timeout(monc->client->auth_wq,
+ monc->have_osdmap >= epoch, timeout);
+ if (ret < 0)
+ return ret;
+
+ mutex_lock(&monc->mutex);
+ }
+
+ mutex_unlock(&monc->mutex);
+ return 0;
+}
+EXPORT_SYMBOL(ceph_monc_wait_osdmap);
/*
*
*/
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
- if (!monc->con) {
- monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
- if (!monc->con)
- return -ENOMEM;
- ceph_con_init(monc->client->msgr, monc->con);
- monc->con->private = monc;
- monc->con->ops = &mon_con_ops;
- }
-
mutex_lock(&monc->mutex);
__open_session(monc);
__schedule_delayed(monc);
@@ -320,6 +338,17 @@ int ceph_monc_open_session(struct ceph_mon_client *monc)
EXPORT_SYMBOL(ceph_monc_open_session);
/*
+ * We require the fsid and global_id in order to initialize our
+ * debugfs dir.
+ */
+static bool have_debugfs_info(struct ceph_mon_client *monc)
+{
+ dout("have_debugfs_info fsid %d globalid %lld\n",
+ (int)monc->client->have_fsid, monc->auth->global_id);
+ return monc->client->have_fsid && monc->auth->global_id > 0;
+}
+
+/*
* The monitor responds with mount ack indicate mount success. The
* included client ticket allows the client to talk to MDSs and OSDs.
*/
@@ -329,9 +358,12 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
struct ceph_client *client = monc->client;
struct ceph_monmap *monmap = NULL, *old = monc->monmap;
void *p, *end;
+ int had_debugfs_info, init_debugfs = 0;
mutex_lock(&monc->mutex);
+ had_debugfs_info = have_debugfs_info(monc);
+
dout("handle_monmap\n");
p = msg->front.iov_base;
end = p + msg->front.iov_len;
@@ -351,8 +383,29 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
client->monc.monmap = monmap;
kfree(old);
+ if (!client->have_fsid) {
+ client->have_fsid = true;
+ if (!had_debugfs_info && have_debugfs_info(monc)) {
+ pr_info("client%lld fsid %pU\n",
+ ceph_client_id(monc->client),
+ &monc->client->fsid);
+ init_debugfs = 1;
+ }
+ mutex_unlock(&monc->mutex);
+
+ if (init_debugfs) {
+ /*
+ * do debugfs initialization without mutex to avoid
+ * creating a locking dependency
+ */
+ ceph_debugfs_client_init(monc->client);
+ }
+
+ goto out_unlocked;
+ }
out:
mutex_unlock(&monc->mutex);
+out_unlocked:
wake_up_all(&client->auth_wq);
}
@@ -439,6 +492,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
m = NULL;
} else {
dout("get_generic_reply %lld got %p\n", tid, req->reply);
+ *skip = 0;
m = ceph_msg_get(req->reply);
/*
* we don't need to track the connection reading into
@@ -450,18 +504,17 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
return m;
}
-static int do_generic_request(struct ceph_mon_client *monc,
- struct ceph_mon_generic_request *req)
+static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
+ struct ceph_mon_generic_request *req)
{
int err;
/* register request */
- mutex_lock(&monc->mutex);
- req->tid = ++monc->last_tid;
+ req->tid = tid != 0 ? tid : ++monc->last_tid;
req->request->hdr.tid = cpu_to_le64(req->tid);
__insert_generic_request(monc, req);
monc->num_generic_requests++;
- ceph_con_send(monc->con, ceph_msg_get(req->request));
+ ceph_con_send(&monc->con, ceph_msg_get(req->request));
mutex_unlock(&monc->mutex);
err = wait_for_completion_interruptible(&req->completion);
@@ -469,13 +522,24 @@ static int do_generic_request(struct ceph_mon_client *monc,
mutex_lock(&monc->mutex);
rb_erase(&req->node, &monc->generic_request_tree);
monc->num_generic_requests--;
- mutex_unlock(&monc->mutex);
if (!err)
err = req->result;
return err;
}
+static int do_generic_request(struct ceph_mon_client *monc,
+ struct ceph_mon_generic_request *req)
+{
+ int err;
+
+ mutex_lock(&monc->mutex);
+ err = __do_generic_request(monc, 0, req);
+ mutex_unlock(&monc->mutex);
+
+ return err;
+}
+
/*
* statfs
*/
@@ -528,10 +592,12 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
init_completion(&req->completion);
err = -ENOMEM;
- req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS);
+ req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
+ true);
if (!req->request)
goto out;
- req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS);
+ req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
+ true);
if (!req->reply)
goto out;
@@ -550,6 +616,96 @@ out:
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
+static void handle_get_version_reply(struct ceph_mon_client *monc,
+ struct ceph_msg *msg)
+{
+ struct ceph_mon_generic_request *req;
+ u64 tid = le64_to_cpu(msg->hdr.tid);
+ void *p = msg->front.iov_base;
+ void *end = p + msg->front_alloc_len;
+ u64 handle;
+
+ dout("%s %p tid %llu\n", __func__, msg, tid);
+
+ ceph_decode_need(&p, end, 2*sizeof(u64), bad);
+ handle = ceph_decode_64(&p);
+ if (tid != 0 && tid != handle)
+ goto bad;
+
+ mutex_lock(&monc->mutex);
+ req = __lookup_generic_req(monc, handle);
+ if (req) {
+ *(u64 *)req->buf = ceph_decode_64(&p);
+ req->result = 0;
+ get_generic_request(req);
+ }
+ mutex_unlock(&monc->mutex);
+ if (req) {
+ complete_all(&req->completion);
+ put_generic_request(req);
+ }
+
+ return;
+bad:
+ pr_err("corrupt mon_get_version reply\n");
+ ceph_msg_dump(msg);
+}
+
+/*
+ * Send MMonGetVersion and wait for the reply.
+ *
+ * @what: one of "mdsmap", "osdmap" or "monmap"
+ */
+int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
+ u64 *newest)
+{
+ struct ceph_mon_generic_request *req;
+ void *p, *end;
+ u64 tid;
+ int err;
+
+ req = kzalloc(sizeof(*req), GFP_NOFS);
+ if (!req)
+ return -ENOMEM;
+
+ kref_init(&req->kref);
+ req->buf = newest;
+ req->buf_len = sizeof(*newest);
+ init_completion(&req->completion);
+
+ req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
+ sizeof(u64) + sizeof(u32) + strlen(what),
+ GFP_NOFS, true);
+ if (!req->request) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
+ GFP_NOFS, true);
+ if (!req->reply) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ p = req->request->front.iov_base;
+ end = p + req->request->front_alloc_len;
+
+ /* fill out request */
+ mutex_lock(&monc->mutex);
+ tid = ++monc->last_tid;
+ ceph_encode_64(&p, tid); /* handle */
+ ceph_encode_string(&p, end, what, strlen(what));
+
+ err = __do_generic_request(monc, tid, req);
+
+ mutex_unlock(&monc->mutex);
+out:
+ kref_put(&req->kref, release_generic_request);
+ return err;
+}
+EXPORT_SYMBOL(ceph_monc_do_get_version);
+
/*
* pool ops
*/
@@ -608,7 +764,7 @@ bad:
/*
* Do a synchronous pool op.
*/
-int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+static int do_poolop(struct ceph_mon_client *monc, u32 op,
u32 pool, u64 snapid,
char *buf, int len)
{
@@ -626,10 +782,12 @@ int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
init_completion(&req->completion);
err = -ENOMEM;
- req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+ req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
+ true);
if (!req->request)
goto out;
- req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+ req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
+ true);
if (!req->reply)
goto out;
@@ -656,7 +814,7 @@ out:
int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid)
{
- return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
+ return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, 0, (char *)snapid, sizeof(*snapid));
}
@@ -665,8 +823,8 @@ EXPORT_SYMBOL(ceph_monc_create_snapid);
int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid)
{
- return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
- pool, snapid, 0, 0);
+ return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
+ pool, snapid, NULL, 0);
}
@@ -680,8 +838,9 @@ static void __resend_generic_request(struct ceph_mon_client *monc)
for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_mon_generic_request, node);
- ceph_con_revoke(monc->con, req->request);
- ceph_con_send(monc->con, ceph_msg_get(req->request));
+ ceph_msg_revoke(req->request);
+ ceph_msg_revoke_incoming(req->reply);
+ ceph_con_send(&monc->con, ceph_msg_get(req->request));
}
}
@@ -701,11 +860,11 @@ static void delayed_work(struct work_struct *work)
__close_session(monc);
__open_session(monc); /* continue hunting */
} else {
- ceph_con_keepalive(monc->con);
+ ceph_con_keepalive(&monc->con);
__validate_auth(monc);
- if (monc->auth->ops->is_authenticated(monc->auth))
+ if (ceph_auth_is_authenticated(monc->auth))
__send_subscribe(monc);
}
__schedule_delayed(monc);
@@ -737,7 +896,6 @@ static int build_initial_monmap(struct ceph_mon_client *monc)
monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
}
monc->monmap->num_mon = num_mon;
- monc->have_fsid = false;
return 0;
}
@@ -755,13 +913,14 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
if (err)
goto out;
- monc->con = NULL;
-
+ /* connection */
/* authentication */
monc->auth = ceph_auth_init(cl->options->name,
- cl->options->secret);
- if (IS_ERR(monc->auth))
- return PTR_ERR(monc->auth);
+ cl->options->key);
+ if (IS_ERR(monc->auth)) {
+ err = PTR_ERR(monc->auth);
+ goto out_monmap;
+ }
monc->auth->want_keys =
CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
@@ -770,23 +929,28 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
err = -ENOMEM;
monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
sizeof(struct ceph_mon_subscribe_ack),
- GFP_NOFS);
+ GFP_NOFS, true);
if (!monc->m_subscribe_ack)
- goto out_monmap;
+ goto out_auth;
- monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS);
+ monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
+ true);
if (!monc->m_subscribe)
goto out_subscribe_ack;
- monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS);
+ monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
+ true);
if (!monc->m_auth_reply)
goto out_subscribe;
- monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS);
+ monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
monc->pending_auth = 0;
if (!monc->m_auth)
goto out_auth_reply;
+ ceph_con_init(&monc->con, monc, &mon_con_ops,
+ &monc->client->msgr);
+
monc->cur_mon = -1;
monc->hunting = true;
monc->sub_renew_after = jiffies;
@@ -808,6 +972,8 @@ out_subscribe:
ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
ceph_msg_put(monc->m_subscribe_ack);
+out_auth:
+ ceph_auth_destroy(monc->auth);
out_monmap:
kfree(monc->monmap);
out:
@@ -822,13 +988,17 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
mutex_lock(&monc->mutex);
__close_session(monc);
- if (monc->con) {
- monc->con->private = NULL;
- monc->con->ops->put(monc->con);
- monc->con = NULL;
- }
+
mutex_unlock(&monc->mutex);
+ /*
+ * flush msgr queue before we destroy ourselves to ensure that:
+ * - any work that references our embedded con is finished.
+ * - any osd_client or other work that may reference an authorizer
+ * finishes before we shut down the auth subsystem.
+ */
+ ceph_msgr_flush();
+
ceph_auth_destroy(monc->auth);
ceph_msg_put(monc->m_auth);
@@ -845,31 +1015,47 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
{
int ret;
int was_auth = 0;
+ int had_debugfs_info, init_debugfs = 0;
mutex_lock(&monc->mutex);
- if (monc->auth->ops)
- was_auth = monc->auth->ops->is_authenticated(monc->auth);
+ had_debugfs_info = have_debugfs_info(monc);
+ was_auth = ceph_auth_is_authenticated(monc->auth);
monc->pending_auth = 0;
ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
msg->front.iov_len,
monc->m_auth->front.iov_base,
- monc->m_auth->front_max);
+ monc->m_auth->front_alloc_len);
if (ret < 0) {
monc->client->auth_err = ret;
wake_up_all(&monc->client->auth_wq);
} else if (ret > 0) {
__send_prepared_auth_request(monc, ret);
- } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
+ } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
dout("authenticated, starting session\n");
- monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
- monc->client->msgr->inst.name.num =
+ monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
+ monc->client->msgr.inst.name.num =
cpu_to_le64(monc->auth->global_id);
__send_subscribe(monc);
__resend_generic_request(monc);
}
+
+ if (!had_debugfs_info && have_debugfs_info(monc)) {
+ pr_info("client%lld fsid %pU\n",
+ ceph_client_id(monc->client),
+ &monc->client->fsid);
+ init_debugfs = 1;
+ }
mutex_unlock(&monc->mutex);
+
+ if (init_debugfs) {
+ /*
+ * do debugfs initialization without mutex to avoid
+ * creating a locking dependency
+ */
+ ceph_debugfs_client_init(monc->client);
+ }
}
static int __validate_auth(struct ceph_mon_client *monc)
@@ -880,7 +1066,7 @@ static int __validate_auth(struct ceph_mon_client *monc)
return 0;
ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
- monc->m_auth->front_max);
+ monc->m_auth->front_alloc_len);
if (ret <= 0)
return ret; /* either an error, or no need to authenticate */
__send_prepared_auth_request(monc, ret);
@@ -922,6 +1108,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_statfs_reply(monc, msg);
break;
+ case CEPH_MSG_MON_GET_VERSION_REPLY:
+ handle_get_version_reply(monc, msg);
+ break;
+
case CEPH_MSG_POOLOP_REPLY:
handle_poolop_reply(monc, msg);
break;
@@ -970,10 +1160,21 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_AUTH_REPLY:
m = ceph_msg_get(monc->m_auth_reply);
break;
+ case CEPH_MSG_MON_GET_VERSION_REPLY:
+ if (le64_to_cpu(hdr->tid) != 0)
+ return get_generic_reply(con, hdr, skip);
+
+ /*
+ * Older OSDs don't set reply tid even if the orignal
+ * request had a non-zero tid. Workaround this weirdness
+ * by falling through to the allocate case.
+ */
case CEPH_MSG_MON_MAP:
case CEPH_MSG_MDS_MAP:
case CEPH_MSG_OSD_MAP:
- m = ceph_msg_new(type, front_len, GFP_NOFS);
+ m = ceph_msg_new(type, front_len, GFP_NOFS, false);
+ if (!m)
+ return NULL; /* ENOMEM--return skip == 0 */
break;
}
@@ -1000,10 +1201,10 @@ static void mon_fault(struct ceph_connection *con)
if (!con->private)
goto out;
- if (monc->con && !monc->hunting)
+ if (!monc->hunting)
pr_info("mon%d %s session lost, "
"hunting for new mon\n", monc->cur_mon,
- ceph_pr_addr(&monc->con->peer_addr.in_addr));
+ ceph_pr_addr(&monc->con.peer_addr.in_addr));
__close_session(monc);
if (!monc->hunting) {
@@ -1018,9 +1219,23 @@ out:
mutex_unlock(&monc->mutex);
}
+/*
+ * We can ignore refcounting on the connection struct, as all references
+ * will come from the messenger workqueue, which is drained prior to
+ * mon_client destruction.
+ */
+static struct ceph_connection *con_get(struct ceph_connection *con)
+{
+ return con;
+}
+
+static void con_put(struct ceph_connection *con)
+{
+}
+
static const struct ceph_connection_operations mon_con_ops = {
- .get = ceph_con_get,
- .put = ceph_con_put,
+ .get = con_get,
+ .put = con_put,
.dispatch = dispatch,
.fault = mon_fault,
.alloc_msg = mon_alloc_msg,