Diffstat (limited to 'net/ceph/mon_client.c')
-rw-r--r--  net/ceph/mon_client.c  373
1 file changed, 294 insertions(+), 79 deletions(-)
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 8a079399174..067d3af2eaf 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -8,8 +8,8 @@
 
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/libceph.h>
+#include <linux/ceph/debugfs.h>
 #include <linux/ceph/decode.h>
-
 #include <linux/ceph/auth.h>
 
 /*
@@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
 	monc->pending_auth = 1;
 	monc->m_auth->front.iov_len = len;
 	monc->m_auth->hdr.front_len = cpu_to_le32(len);
-	ceph_con_revoke(monc->con, monc->m_auth);
+	ceph_msg_revoke(monc->m_auth);
 	ceph_msg_get(monc->m_auth);  /* keep our ref */
-	ceph_con_send(monc->con, monc->m_auth);
+	ceph_con_send(&monc->con, monc->m_auth);
 }
 
 /*
@@ -116,14 +116,15 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
  */
 static void __close_session(struct ceph_mon_client *monc)
 {
-	if (monc->con) {
-		dout("__close_session closing mon%d\n", monc->cur_mon);
-		ceph_con_revoke(monc->con, monc->m_auth);
-		ceph_con_close(monc->con);
-		monc->cur_mon = -1;
-		monc->pending_auth = 0;
-		ceph_auth_reset(monc->auth);
-	}
+	dout("__close_session closing mon%d\n", monc->cur_mon);
+	ceph_msg_revoke(monc->m_auth);
+	ceph_msg_revoke_incoming(monc->m_auth_reply);
+	ceph_msg_revoke(monc->m_subscribe);
+	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
+	ceph_con_close(&monc->con);
+	monc->cur_mon = -1;
+	monc->pending_auth = 0;
+	ceph_auth_reset(monc->auth);
 }
 
 /*
@@ -144,15 +145,14 @@ static int __open_session(struct ceph_mon_client *monc)
 		monc->want_next_osdmap = !!monc->want_next_osdmap;
 
 		dout("open_session mon%d opening\n", monc->cur_mon);
-		monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
-		monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
-		ceph_con_open(monc->con,
+		ceph_con_open(&monc->con,
+			      CEPH_ENTITY_TYPE_MON, monc->cur_mon,
 			      &monc->monmap->mon_inst[monc->cur_mon].addr);
 
 		/* initiatiate authentication handshake */
 		ret = ceph_auth_build_hello(monc->auth,
 					    monc->m_auth->front.iov_base,
-					    monc->m_auth->front_max);
+					    monc->m_auth->front_alloc_len);
 		__send_prepared_auth_request(monc, ret);
 	} else {
 		dout("open_session mon%d already open\n", monc->cur_mon);
@@ -170,7 +170,7 @@ static bool __sub_expired(struct ceph_mon_client *monc)
  */
 static void __schedule_delayed(struct ceph_mon_client *monc)
 {
-	unsigned delay;
+	unsigned int delay;
 
 	if (monc->cur_mon < 0 || __sub_expired(monc))
 		delay = 10 * HZ;
@@ -186,7 +186,7 @@ static void __schedule_delayed(struct ceph_mon_client *monc)
 static void __send_subscribe(struct ceph_mon_client *monc)
 {
 	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
-	     (unsigned)monc->sub_sent, __sub_expired(monc),
+	     (unsigned int)monc->sub_sent, __sub_expired(monc),
 	     monc->want_next_osdmap);
 	if ((__sub_expired(monc) && !monc->sub_sent) ||
 	    monc->want_next_osdmap == 1) {
@@ -196,14 +196,14 @@ static void __send_subscribe(struct ceph_mon_client *monc)
 		int num;
 
 		p = msg->front.iov_base;
-		end = p + msg->front_max;
+		end = p + msg->front_alloc_len;
 
 		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
 		ceph_encode_32(&p, num);
 
 		if (monc->want_next_osdmap) {
 			dout("__send_subscribe to 'osdmap' %u\n",
-			     (unsigned)monc->have_osdmap);
+			     (unsigned int)monc->have_osdmap);
 			ceph_encode_string(&p, end, "osdmap", 6);
 			i = p;
 			i->have = cpu_to_le64(monc->have_osdmap);
@@ -213,7 +213,7 @@ static void __send_subscribe(struct ceph_mon_client *monc)
 		}
 		if (monc->want_mdsmap) {
 			dout("__send_subscribe to 'mdsmap' %u+\n",
-			     (unsigned)monc->have_mdsmap);
+			     (unsigned int)monc->have_mdsmap);
 			ceph_encode_string(&p, end, "mdsmap", 6);
 			i = p;
 			i->have = cpu_to_le64(monc->have_mdsmap);
@@ -228,8 +228,8 @@ static void __send_subscribe(struct ceph_mon_client *monc)
 		msg->front.iov_len = p - msg->front.iov_base;
 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-		ceph_con_revoke(monc->con, msg);
-		ceph_con_send(monc->con, ceph_msg_get(msg));
+		ceph_msg_revoke(msg);
+		ceph_con_send(&monc->con, ceph_msg_get(msg));
 
 		monc->sub_sent = jiffies | 1;  /* never 0 */
 	}
 }
@@ -238,7 +238,7 @@ static void __send_subscribe(struct ceph_mon_client *monc)
 static void handle_subscribe_ack(struct ceph_mon_client *monc,
 				 struct ceph_msg *msg)
 {
-	unsigned seconds;
+	unsigned int seconds;
 	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
 
 	if (msg->front.iov_len < sizeof(*h))
@@ -249,7 +249,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
 	if (monc->hunting) {
 		pr_info("mon%d %s session established\n",
 			monc->cur_mon,
-			ceph_pr_addr(&monc->con->peer_addr.in_addr));
+			ceph_pr_addr(&monc->con.peer_addr.in_addr));
 		monc->hunting = false;
 	}
 	dout("handle_subscribe_ack after %d seconds\n", seconds);
@@ -296,21 +296,39 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
 		__send_subscribe(monc);
 	mutex_unlock(&monc->mutex);
 }
+EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
+
+int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
+			  unsigned long timeout)
+{
+	unsigned long started = jiffies;
+	int ret;
+
+	mutex_lock(&monc->mutex);
+	while (monc->have_osdmap < epoch) {
+		mutex_unlock(&monc->mutex);
+
+		if (timeout != 0 && time_after_eq(jiffies, started + timeout))
+			return -ETIMEDOUT;
+
+		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
+					 monc->have_osdmap >= epoch, timeout);
+		if (ret < 0)
+			return ret;
+
+		mutex_lock(&monc->mutex);
+	}
+
+	mutex_unlock(&monc->mutex);
+	return 0;
+}
+EXPORT_SYMBOL(ceph_monc_wait_osdmap);
 
 /*
  *
  */
 int ceph_monc_open_session(struct ceph_mon_client *monc)
 {
-	if (!monc->con) {
-		monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
-		if (!monc->con)
-			return -ENOMEM;
-		ceph_con_init(monc->client->msgr, monc->con);
-		monc->con->private = monc;
-		monc->con->ops = &mon_con_ops;
-	}
-
 	mutex_lock(&monc->mutex);
 	__open_session(monc);
 	__schedule_delayed(monc);
@@ -320,6 +338,17 @@ int ceph_monc_open_session(struct ceph_mon_client *monc)
 EXPORT_SYMBOL(ceph_monc_open_session);
 
 /*
+ * We require the fsid and global_id in order to initialize our
+ * debugfs dir.
+ */
+static bool have_debugfs_info(struct ceph_mon_client *monc)
+{
+	dout("have_debugfs_info fsid %d globalid %lld\n",
+	     (int)monc->client->have_fsid, monc->auth->global_id);
+	return monc->client->have_fsid && monc->auth->global_id > 0;
+}
+
+/*
  * The monitor responds with mount ack indicate mount success.  The
  * included client ticket allows the client to talk to MDSs and OSDs.
  */
@@ -329,9 +358,12 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 	struct ceph_client *client = monc->client;
 	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
 	void *p, *end;
+	int had_debugfs_info, init_debugfs = 0;
 
 	mutex_lock(&monc->mutex);
 
+	had_debugfs_info = have_debugfs_info(monc);
+
 	dout("handle_monmap\n");
 	p = msg->front.iov_base;
 	end = p + msg->front.iov_len;
@@ -351,8 +383,29 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 	client->monc.monmap = monmap;
 	kfree(old);
 
+	if (!client->have_fsid) {
+		client->have_fsid = true;
+		if (!had_debugfs_info && have_debugfs_info(monc)) {
+			pr_info("client%lld fsid %pU\n",
+				ceph_client_id(monc->client),
+				&monc->client->fsid);
+			init_debugfs = 1;
+		}
+		mutex_unlock(&monc->mutex);
+
+		if (init_debugfs) {
+			/*
+			 * do debugfs initialization without mutex to avoid
+			 * creating a locking dependency
+			 */
+			ceph_debugfs_client_init(monc->client);
+		}
+
+		goto out_unlocked;
+	}
 out:
 	mutex_unlock(&monc->mutex);
+out_unlocked:
 	wake_up_all(&client->auth_wq);
 }
 
@@ -439,6 +492,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
 		m = NULL;
 	} else {
 		dout("get_generic_reply %lld got %p\n", tid, req->reply);
+		*skip = 0;
 		m = ceph_msg_get(req->reply);
 		/*
 		 * we don't need to track the connection reading into
@@ -450,18 +504,17 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
 	return m;
 }
 
-static int do_generic_request(struct ceph_mon_client *monc,
-			      struct ceph_mon_generic_request *req)
+static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
+				struct ceph_mon_generic_request *req)
 {
 	int err;
 
 	/* register request */
-	mutex_lock(&monc->mutex);
-	req->tid = ++monc->last_tid;
+	req->tid = tid != 0 ? tid : ++monc->last_tid;
 	req->request->hdr.tid = cpu_to_le64(req->tid);
 	__insert_generic_request(monc, req);
 	monc->num_generic_requests++;
-	ceph_con_send(monc->con, ceph_msg_get(req->request));
+	ceph_con_send(&monc->con, ceph_msg_get(req->request));
 	mutex_unlock(&monc->mutex);
 
 	err = wait_for_completion_interruptible(&req->completion);
@@ -469,13 +522,24 @@ static int do_generic_request(struct ceph_mon_client *monc,
 	mutex_lock(&monc->mutex);
 	rb_erase(&req->node, &monc->generic_request_tree);
 	monc->num_generic_requests--;
-	mutex_unlock(&monc->mutex);
 
 	if (!err)
 		err = req->result;
 	return err;
 }
 
+static int do_generic_request(struct ceph_mon_client *monc,
+			      struct ceph_mon_generic_request *req)
+{
+	int err;
+
+	mutex_lock(&monc->mutex);
+	err = __do_generic_request(monc, 0, req);
+	mutex_unlock(&monc->mutex);
+
+	return err;
+}
+
 /*
  * statfs
  */
@@ -528,10 +592,12 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 	init_completion(&req->completion);
 
 	err = -ENOMEM;
-	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS);
+	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
+				    true);
 	if (!req->request)
 		goto out;
-	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS);
+	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
+				  true);
 	if (!req->reply)
 		goto out;
 
@@ -550,6 +616,96 @@ out:
 }
 EXPORT_SYMBOL(ceph_monc_do_statfs);
 
+static void handle_get_version_reply(struct ceph_mon_client *monc,
+				     struct ceph_msg *msg)
+{
+	struct ceph_mon_generic_request *req;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+	void *p = msg->front.iov_base;
+	void *end = p + msg->front_alloc_len;
+	u64 handle;
+
+	dout("%s %p tid %llu\n", __func__, msg, tid);
+
+	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
+	handle = ceph_decode_64(&p);
+	if (tid != 0 && tid != handle)
+		goto bad;
+
+	mutex_lock(&monc->mutex);
+	req = __lookup_generic_req(monc, handle);
+	if (req) {
+		*(u64 *)req->buf = ceph_decode_64(&p);
+		req->result = 0;
+		get_generic_request(req);
+	}
+	mutex_unlock(&monc->mutex);
+	if (req) {
+		complete_all(&req->completion);
+		put_generic_request(req);
+	}
+
+	return;
+bad:
+	pr_err("corrupt mon_get_version reply\n");
+	ceph_msg_dump(msg);
+}
+
+/*
+ * Send MMonGetVersion and wait for the reply.
+ *
+ * @what: one of "mdsmap", "osdmap" or "monmap"
+ */
+int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
+			     u64 *newest)
+{
+	struct ceph_mon_generic_request *req;
+	void *p, *end;
+	u64 tid;
+	int err;
+
+	req = kzalloc(sizeof(*req), GFP_NOFS);
+	if (!req)
+		return -ENOMEM;
+
+	kref_init(&req->kref);
+	req->buf = newest;
+	req->buf_len = sizeof(*newest);
+	init_completion(&req->completion);
+
+	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
+				    sizeof(u64) + sizeof(u32) + strlen(what),
+				    GFP_NOFS, true);
+	if (!req->request) {
+		err = -ENOMEM;
+		goto out;
+	}
+
+	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
+				  GFP_NOFS, true);
+	if (!req->reply) {
+		err = -ENOMEM;
+		goto out;
+	}
+
+	p = req->request->front.iov_base;
+	end = p + req->request->front_alloc_len;
+
+	/* fill out request */
+	mutex_lock(&monc->mutex);
+	tid = ++monc->last_tid;
+	ceph_encode_64(&p, tid); /* handle */
+	ceph_encode_string(&p, end, what, strlen(what));
+
+	err = __do_generic_request(monc, tid, req);
+
+	mutex_unlock(&monc->mutex);
+out:
+	kref_put(&req->kref, release_generic_request);
+	return err;
+}
+EXPORT_SYMBOL(ceph_monc_do_get_version);
+
 /*
  * pool ops
  */
@@ -608,7 +764,7 @@ bad:
 /*
  * Do a synchronous pool op.
  */
-int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+static int do_poolop(struct ceph_mon_client *monc, u32 op,
 			u32 pool, u64 snapid,
 			char *buf, int len)
 {
@@ -626,10 +782,12 @@ int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
 	init_completion(&req->completion);
 
 	err = -ENOMEM;
-	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
+				    true);
 	if (!req->request)
 		goto out;
-	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
+				  true);
 	if (!req->reply)
 		goto out;
 
@@ -656,7 +814,7 @@ out:
 
 int ceph_monc_create_snapid(struct ceph_mon_client *monc,
 			    u32 pool, u64 *snapid)
 {
-	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+	return do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
 				   pool, 0, (char *)snapid, sizeof(*snapid));
 }
@@ -665,8 +823,8 @@ EXPORT_SYMBOL(ceph_monc_create_snapid);
 
 int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
 			    u32 pool, u64 snapid)
 {
-	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
-				   pool, snapid, 0, 0);
+	return do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+				   pool, snapid, NULL, 0);
 }
 
@@ -680,8 +838,9 @@ static void __resend_generic_request(struct ceph_mon_client *monc)
 
 	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
 		req = rb_entry(p, struct ceph_mon_generic_request, node);
-		ceph_con_revoke(monc->con, req->request);
-		ceph_con_send(monc->con, ceph_msg_get(req->request));
+		ceph_msg_revoke(req->request);
+		ceph_msg_revoke_incoming(req->reply);
+		ceph_con_send(&monc->con, ceph_msg_get(req->request));
 	}
 }
 
@@ -701,11 +860,11 @@ static void delayed_work(struct work_struct *work)
 		__close_session(monc);
 		__open_session(monc);  /* continue hunting */
 	} else {
-		ceph_con_keepalive(monc->con);
+		ceph_con_keepalive(&monc->con);
 
 		__validate_auth(monc);
 
-		if (monc->auth->ops->is_authenticated(monc->auth))
+		if (ceph_auth_is_authenticated(monc->auth))
 			__send_subscribe(monc);
 	}
 	__schedule_delayed(monc);
@@ -737,7 +896,6 @@ static int build_initial_monmap(struct ceph_mon_client *monc)
 		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
 	}
 	monc->monmap->num_mon = num_mon;
-	monc->have_fsid = false;
 	return 0;
 }
 
@@ -755,13 +913,14 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
 	if (err)
 		goto out;
 
-	monc->con = NULL;
-
+	/* connection */
 	/* authentication */
 	monc->auth = ceph_auth_init(cl->options->name,
-				    cl->options->secret);
-	if (IS_ERR(monc->auth))
-		return PTR_ERR(monc->auth);
+				    cl->options->key);
+	if (IS_ERR(monc->auth)) {
+		err = PTR_ERR(monc->auth);
+		goto out_monmap;
+	}
 	monc->auth->want_keys =
 		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
 		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
@@ -770,23 +929,28 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
 	err = -ENOMEM;
 	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
 				     sizeof(struct ceph_mon_subscribe_ack),
-				     GFP_NOFS);
+				     GFP_NOFS, true);
 	if (!monc->m_subscribe_ack)
-		goto out_monmap;
+		goto out_auth;
 
-	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS);
+	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
+					 true);
 	if (!monc->m_subscribe)
 		goto out_subscribe_ack;
 
-	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS);
+	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
+					  true);
 	if (!monc->m_auth_reply)
 		goto out_subscribe;
 
-	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS);
+	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
 	monc->pending_auth = 0;
 	if (!monc->m_auth)
 		goto out_auth_reply;
 
+	ceph_con_init(&monc->con, monc, &mon_con_ops,
+		      &monc->client->msgr);
+
 	monc->cur_mon = -1;
 	monc->hunting = true;
 	monc->sub_renew_after = jiffies;
@@ -808,6 +972,8 @@ out_subscribe:
 	ceph_msg_put(monc->m_subscribe);
 out_subscribe_ack:
 	ceph_msg_put(monc->m_subscribe_ack);
+out_auth:
+	ceph_auth_destroy(monc->auth);
 out_monmap:
 	kfree(monc->monmap);
 out:
@@ -822,13 +988,17 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
 
 	mutex_lock(&monc->mutex);
 	__close_session(monc);
-	if (monc->con) {
-		monc->con->private = NULL;
-		monc->con->ops->put(monc->con);
-		monc->con = NULL;
-	}
+
 	mutex_unlock(&monc->mutex);
 
+	/*
+	 * flush msgr queue before we destroy ourselves to ensure that:
+	 *  - any work that references our embedded con is finished.
+	 *  - any osd_client or other work that may reference an authorizer
+	 *    finishes before we shut down the auth subsystem.
+	 */
+	ceph_msgr_flush();
+
 	ceph_auth_destroy(monc->auth);
 
 	ceph_msg_put(monc->m_auth);
@@ -845,31 +1015,47 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
 {
 	int ret;
 	int was_auth = 0;
+	int had_debugfs_info, init_debugfs = 0;
 
 	mutex_lock(&monc->mutex);
-	if (monc->auth->ops)
-		was_auth = monc->auth->ops->is_authenticated(monc->auth);
+	had_debugfs_info = have_debugfs_info(monc);
+	was_auth = ceph_auth_is_authenticated(monc->auth);
 	monc->pending_auth = 0;
 	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
 				     msg->front.iov_len,
 				     monc->m_auth->front.iov_base,
-				     monc->m_auth->front_max);
+				     monc->m_auth->front_alloc_len);
 	if (ret < 0) {
 		monc->client->auth_err = ret;
 		wake_up_all(&monc->client->auth_wq);
 	} else if (ret > 0) {
 		__send_prepared_auth_request(monc, ret);
-	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
+	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
 		dout("authenticated, starting session\n");
 
-		monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
-		monc->client->msgr->inst.name.num =
+		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
+		monc->client->msgr.inst.name.num =
 					cpu_to_le64(monc->auth->global_id);
 
 		__send_subscribe(monc);
 		__resend_generic_request(monc);
 	}
+
+	if (!had_debugfs_info && have_debugfs_info(monc)) {
+		pr_info("client%lld fsid %pU\n",
+			ceph_client_id(monc->client),
+			&monc->client->fsid);
+		init_debugfs = 1;
+	}
 	mutex_unlock(&monc->mutex);
+
+	if (init_debugfs) {
+		/*
+		 * do debugfs initialization without mutex to avoid
+		 * creating a locking dependency
+		 */
+		ceph_debugfs_client_init(monc->client);
+	}
 }
 
 static int __validate_auth(struct ceph_mon_client *monc)
@@ -880,7 +1066,7 @@ static int __validate_auth(struct ceph_mon_client *monc)
 		return 0;
 
 	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
-			      monc->m_auth->front_max);
+			      monc->m_auth->front_alloc_len);
 	if (ret <= 0)
 		return ret; /* either an error, or no need to authenticate */
 	__send_prepared_auth_request(monc, ret);
@@ -922,6 +1108,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 		handle_statfs_reply(monc, msg);
 		break;
 
+	case CEPH_MSG_MON_GET_VERSION_REPLY:
+		handle_get_version_reply(monc, msg);
+		break;
+
 	case CEPH_MSG_POOLOP_REPLY:
 		handle_poolop_reply(monc, msg);
 		break;
@@ -970,10 +1160,21 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 	case CEPH_MSG_AUTH_REPLY:
 		m = ceph_msg_get(monc->m_auth_reply);
 		break;
+	case CEPH_MSG_MON_GET_VERSION_REPLY:
+		if (le64_to_cpu(hdr->tid) != 0)
+			return get_generic_reply(con, hdr, skip);
+
+		/*
+		 * Older OSDs don't set reply tid even if the orignal
+		 * request had a non-zero tid.  Workaround this weirdness
+		 * by falling through to the allocate case.
+		 */
 	case CEPH_MSG_MON_MAP:
 	case CEPH_MSG_MDS_MAP:
 	case CEPH_MSG_OSD_MAP:
-		m = ceph_msg_new(type, front_len, GFP_NOFS);
+		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
+		if (!m)
+			return NULL;	/* ENOMEM--return skip == 0 */
 		break;
 	}
 
@@ -1000,10 +1201,10 @@ static void mon_fault(struct ceph_connection *con)
 	if (!con->private)
 		goto out;
 
-	if (monc->con && !monc->hunting)
+	if (!monc->hunting)
 		pr_info("mon%d %s session lost, "
 			"hunting for new mon\n", monc->cur_mon,
-			ceph_pr_addr(&monc->con->peer_addr.in_addr));
+			ceph_pr_addr(&monc->con.peer_addr.in_addr));
 
 	__close_session(monc);
 	if (!monc->hunting) {
@@ -1018,9 +1219,23 @@ out:
 	mutex_unlock(&monc->mutex);
 }
 
+/*
+ * We can ignore refcounting on the connection struct, as all references
+ * will come from the messenger workqueue, which is drained prior to
+ * mon_client destruction.
+ */
+static struct ceph_connection *con_get(struct ceph_connection *con)
+{
+	return con;
+}
+
+static void con_put(struct ceph_connection *con)
+{
+}
+
 static const struct ceph_connection_operations mon_con_ops = {
-	.get = ceph_con_get,
-	.put = ceph_con_put,
+	.get = con_get,
+	.put = con_put,
 	.dispatch = dispatch,
 	.fault = mon_fault,
 	.alloc_msg = mon_alloc_msg,
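For context: the hunks above export three monitor-client entry points, ceph_monc_request_next_osdmap(), ceph_monc_wait_osdmap() and ceph_monc_do_get_version(). Below is a minimal sketch of how a libceph consumer might chain them; the helper name wait_for_latest_osdmap() is hypothetical and not part of this diff, and only the three exported calls plus the monc member embedded in struct ceph_client are taken from the code above.

#include <linux/ceph/libceph.h>
#include <linux/ceph/mon_client.h>

/*
 * Hypothetical caller-side helper (not in this patch): ask the monitors
 * for the newest osdmap epoch, then request and wait for that map.
 */
static int wait_for_latest_osdmap(struct ceph_client *client,
				  unsigned long timeout)
{
	u64 newest_epoch;
	int ret;

	/* MMonGetVersion round trip introduced above */
	ret = ceph_monc_do_get_version(&client->monc, "osdmap", &newest_epoch);
	if (ret)
		return ret;

	/* subscribe to the next map and block until that epoch has arrived */
	ceph_monc_request_next_osdmap(&client->monc);
	return ceph_monc_wait_osdmap(&client->monc, (u32)newest_epoch, timeout);
}

Because ceph_monc_wait_osdmap() returns immediately when monc->have_osdmap already covers the requested epoch, the sketch does not need a separate "already current" check.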
