Diffstat (limited to 'net/sunrpc')
35 files changed, 1811 insertions, 1667 deletions
diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig index 241b54f3020..0754d0f466d 100644 --- a/net/sunrpc/Kconfig +++ b/net/sunrpc/Kconfig @@ -9,19 +9,6 @@ config SUNRPC_BACKCHANNEL  	bool  	depends on SUNRPC -config SUNRPC_XPRT_RDMA -	tristate -	depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS -	default SUNRPC && INFINIBAND -	help -	  This option allows the NFS client and server to support -	  an RDMA-enabled transport. - -	  To compile RPC client RDMA transport support as a module, -	  choose M here: the module will be called xprtrdma. - -	  If unsure, say N. -  config SUNRPC_SWAP  	bool  	depends on SUNRPC @@ -57,3 +44,29 @@ config SUNRPC_DEBUG  	  but makes troubleshooting NFS issues significantly harder.  	  If unsure, say Y. + +config SUNRPC_XPRT_RDMA_CLIENT +	tristate "RPC over RDMA Client Support" +	depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS +	default SUNRPC && INFINIBAND +	help +	  This option allows the NFS client to support an RDMA-enabled +	  transport. + +	  To compile RPC client RDMA transport support as a module, +	  choose M here: the module will be called xprtrdma. + +	  If unsure, say N. + +config SUNRPC_XPRT_RDMA_SERVER +	tristate "RPC over RDMA Server Support" +	depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS +	default SUNRPC && INFINIBAND +	help +	  This option allows the NFS server to support an RDMA-enabled +	  transport. + +	  To compile RPC server RDMA transport support as a module, +	  choose M here: the module will be called svcrdma. + +	  If unsure, say N. diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile index 8209a0411bc..e5a7a1cac8f 100644 --- a/net/sunrpc/Makefile +++ b/net/sunrpc/Makefile @@ -5,7 +5,8 @@  obj-$(CONFIG_SUNRPC) += sunrpc.o  obj-$(CONFIG_SUNRPC_GSS) += auth_gss/ -obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma/ + +obj-y += xprtrdma/  sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \  	    auth.o auth_null.o auth_unix.o auth_generic.o \ diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index 5285ead196c..f7736671742 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -296,7 +296,7 @@ static void  rpcauth_unhash_cred_locked(struct rpc_cred *cred)  {  	hlist_del_rcu(&cred->cr_hash); -	smp_mb__before_clear_bit(); +	smp_mb__before_atomic();  	clear_bit(RPCAUTH_CRED_HASHED, &cred->cr_flags);  } @@ -592,6 +592,7 @@ rpcauth_lookupcred(struct rpc_auth *auth, int flags)  	put_group_info(acred.group_info);  	return ret;  } +EXPORT_SYMBOL_GPL(rpcauth_lookupcred);  void  rpcauth_init_cred(struct rpc_cred *cred, const struct auth_cred *acred, diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index fcac5d14171..b6e440baccc 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -108,6 +108,7 @@ struct gss_auth {  static DEFINE_SPINLOCK(pipe_version_lock);  static struct rpc_wait_queue pipe_version_rpc_waitqueue;  static DECLARE_WAIT_QUEUE_HEAD(pipe_version_waitqueue); +static void gss_put_auth(struct gss_auth *gss_auth);  static void gss_free_ctx(struct gss_cl_ctx *);  static const struct rpc_pipe_ops gss_upcall_ops_v0; @@ -142,7 +143,7 @@ gss_cred_set_ctx(struct rpc_cred *cred, struct gss_cl_ctx *ctx)  	gss_get_ctx(ctx);  	rcu_assign_pointer(gss_cred->gc_ctx, ctx);  	set_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags); -	smp_mb__before_clear_bit(); +	smp_mb__before_atomic();  	clear_bit(RPCAUTH_CRED_NEW, &cred->cr_flags);  } @@ -320,6 +321,7 @@ gss_release_msg(struct gss_upcall_msg *gss_msg)  	if (gss_msg->ctx != NULL)  		gss_put_ctx(gss_msg->ctx);  	
rpc_destroy_wait_queue(&gss_msg->rpc_waitqueue); +	gss_put_auth(gss_msg->auth);  	kfree(gss_msg);  } @@ -420,41 +422,53 @@ static void gss_encode_v0_msg(struct gss_upcall_msg *gss_msg)  	memcpy(gss_msg->databuf, &uid, sizeof(uid));  	gss_msg->msg.data = gss_msg->databuf;  	gss_msg->msg.len = sizeof(uid); -	BUG_ON(sizeof(uid) > UPCALL_BUF_LEN); + +	BUILD_BUG_ON(sizeof(uid) > sizeof(gss_msg->databuf));  } -static void gss_encode_v1_msg(struct gss_upcall_msg *gss_msg, +static int gss_encode_v1_msg(struct gss_upcall_msg *gss_msg,  				const char *service_name,  				const char *target_name)  {  	struct gss_api_mech *mech = gss_msg->auth->mech;  	char *p = gss_msg->databuf; -	int len = 0; - -	gss_msg->msg.len = sprintf(gss_msg->databuf, "mech=%s uid=%d ", -				   mech->gm_name, -				   from_kuid(&init_user_ns, gss_msg->uid)); -	p += gss_msg->msg.len; +	size_t buflen = sizeof(gss_msg->databuf); +	int len; + +	len = scnprintf(p, buflen, "mech=%s uid=%d ", mech->gm_name, +			from_kuid(&init_user_ns, gss_msg->uid)); +	buflen -= len; +	p += len; +	gss_msg->msg.len = len;  	if (target_name) { -		len = sprintf(p, "target=%s ", target_name); +		len = scnprintf(p, buflen, "target=%s ", target_name); +		buflen -= len;  		p += len;  		gss_msg->msg.len += len;  	}  	if (service_name != NULL) { -		len = sprintf(p, "service=%s ", service_name); +		len = scnprintf(p, buflen, "service=%s ", service_name); +		buflen -= len;  		p += len;  		gss_msg->msg.len += len;  	}  	if (mech->gm_upcall_enctypes) { -		len = sprintf(p, "enctypes=%s ", mech->gm_upcall_enctypes); +		len = scnprintf(p, buflen, "enctypes=%s ", +				mech->gm_upcall_enctypes); +		buflen -= len;  		p += len;  		gss_msg->msg.len += len;  	} -	len = sprintf(p, "\n"); +	len = scnprintf(p, buflen, "\n"); +	if (len == 0) +		goto out_overflow;  	gss_msg->msg.len += len;  	gss_msg->msg.data = gss_msg->databuf; -	BUG_ON(gss_msg->msg.len > UPCALL_BUF_LEN); +	return 0; +out_overflow: +	WARN_ON_ONCE(1); +	return -ENOMEM;  }  static struct gss_upcall_msg * @@ -463,15 +477,15 @@ gss_alloc_msg(struct gss_auth *gss_auth,  {  	struct gss_upcall_msg *gss_msg;  	int vers; +	int err = -ENOMEM;  	gss_msg = kzalloc(sizeof(*gss_msg), GFP_NOFS);  	if (gss_msg == NULL) -		return ERR_PTR(-ENOMEM); +		goto err;  	vers = get_pipe_version(gss_auth->net); -	if (vers < 0) { -		kfree(gss_msg); -		return ERR_PTR(vers); -	} +	err = vers; +	if (err < 0) +		goto err_free_msg;  	gss_msg->pipe = gss_auth->gss_pipe[vers]->pipe;  	INIT_LIST_HEAD(&gss_msg->list);  	rpc_init_wait_queue(&gss_msg->rpc_waitqueue, "RPCSEC_GSS upcall waitq"); @@ -482,10 +496,20 @@ gss_alloc_msg(struct gss_auth *gss_auth,  	switch (vers) {  	case 0:  		gss_encode_v0_msg(gss_msg); +		break;  	default: -		gss_encode_v1_msg(gss_msg, service_name, gss_auth->target_name); +		err = gss_encode_v1_msg(gss_msg, service_name, gss_auth->target_name); +		if (err) +			goto err_put_pipe_version;  	}; +	kref_get(&gss_auth->kref);  	return gss_msg; +err_put_pipe_version: +	put_pipe_version(gss_auth->net); +err_free_msg: +	kfree(gss_msg); +err: +	return ERR_PTR(err);  }  static struct gss_upcall_msg * @@ -513,14 +537,7 @@ gss_setup_upcall(struct gss_auth *gss_auth, struct rpc_cred *cred)  static void warn_gssd(void)  { -	static unsigned long ratelimit; -	unsigned long now = jiffies; - -	if (time_after(now, ratelimit)) { -		printk(KERN_WARNING "RPC: AUTH_GSS upcall timed out.\n" -				"Please check user daemon is running.\n"); -		ratelimit = now + 15*HZ; -	} +	dprintk("AUTH_GSS upcall failed. 
Please check user daemon is running.\n");  }  static inline int @@ -581,7 +598,6 @@ gss_create_upcall(struct gss_auth *gss_auth, struct gss_cred *gss_cred)  	struct rpc_pipe *pipe;  	struct rpc_cred *cred = &gss_cred->gc_base;  	struct gss_upcall_msg *gss_msg; -	unsigned long timeout;  	DEFINE_WAIT(wait);  	int err; @@ -589,17 +605,16 @@ gss_create_upcall(struct gss_auth *gss_auth, struct gss_cred *gss_cred)  		__func__, from_kuid(&init_user_ns, cred->cr_uid));  retry:  	err = 0; -	/* Default timeout is 15s unless we know that gssd is not running */ -	timeout = 15 * HZ; -	if (!sn->gssd_running) -		timeout = HZ >> 2; +	/* if gssd is down, just skip upcalling altogether */ +	if (!gssd_running(net)) { +		warn_gssd(); +		return -EACCES; +	}  	gss_msg = gss_setup_upcall(gss_auth, cred);  	if (PTR_ERR(gss_msg) == -EAGAIN) {  		err = wait_event_interruptible_timeout(pipe_version_waitqueue, -				sn->pipe_version >= 0, timeout); +				sn->pipe_version >= 0, 15 * HZ);  		if (sn->pipe_version < 0) { -			if (err == 0) -				sn->gssd_running = 0;  			warn_gssd();  			err = -EACCES;  		} @@ -981,6 +996,8 @@ gss_create_new(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)  	gss_auth->service = gss_pseudoflavor_to_service(gss_auth->mech, flavor);  	if (gss_auth->service == 0)  		goto err_put_mech; +	if (!gssd_running(gss_auth->net)) +		goto err_put_mech;  	auth = &gss_auth->rpc_auth;  	auth->au_cslack = GSS_CRED_SLACK >> 2;  	auth->au_rslack = GSS_VERF_SLACK >> 2; @@ -1052,6 +1069,12 @@ gss_free_callback(struct kref *kref)  }  static void +gss_put_auth(struct gss_auth *gss_auth) +{ +	kref_put(&gss_auth->kref, gss_free_callback); +} + +static void  gss_destroy(struct rpc_auth *auth)  {  	struct gss_auth *gss_auth = container_of(auth, @@ -1072,9 +1095,18 @@ gss_destroy(struct rpc_auth *auth)  	gss_auth->gss_pipe[1] = NULL;  	rpcauth_destroy_credcache(auth); -	kref_put(&gss_auth->kref, gss_free_callback); +	gss_put_auth(gss_auth);  } +/* + * Auths may be shared between rpc clients that were cloned from a + * common client with the same xprt, if they also share the flavor and + * target_name. + * + * The auth is looked up from the oldest parent sharing the same + * cl_xprt, and the auth itself references only that common parent + * (which is guaranteed to last as long as any of its descendants). 
+ */  static struct gss_auth *  gss_auth_find_or_add_hashed(struct rpc_auth_create_args *args,  		struct rpc_clnt *clnt, @@ -1088,6 +1120,8 @@ gss_auth_find_or_add_hashed(struct rpc_auth_create_args *args,  			gss_auth,  			hash,  			hashval) { +		if (gss_auth->client != clnt) +			continue;  		if (gss_auth->rpc_auth.au_flavor != args->pseudoflavor)  			continue;  		if (gss_auth->target_name != args->target_name) { @@ -1232,7 +1266,7 @@ gss_destroy_nullcred(struct rpc_cred *cred)  	call_rcu(&cred->cr_rcu, gss_free_cred_callback);  	if (ctx)  		gss_put_ctx(ctx); -	kref_put(&gss_auth->kref, gss_free_callback); +	gss_put_auth(gss_auth);  }  static void @@ -1487,7 +1521,7 @@ out:  static int  gss_refresh_null(struct rpc_task *task)  { -	return -EACCES; +	return 0;  }  static __be32 * diff --git a/net/sunrpc/auth_gss/gss_krb5_keys.c b/net/sunrpc/auth_gss/gss_krb5_keys.c index 76e42e6be75..24589bd2a4b 100644 --- a/net/sunrpc/auth_gss/gss_krb5_keys.c +++ b/net/sunrpc/auth_gss/gss_krb5_keys.c @@ -59,6 +59,7 @@  #include <linux/crypto.h>  #include <linux/sunrpc/gss_krb5.h>  #include <linux/sunrpc/xdr.h> +#include <linux/lcm.h>  #ifdef RPC_DEBUG  # define RPCDBG_FACILITY        RPCDBG_AUTH @@ -72,7 +73,7 @@  static void krb5_nfold(u32 inbits, const u8 *in,  		       u32 outbits, u8 *out)  { -	int a, b, c, lcm; +	unsigned long ulcm;  	int byte, i, msbit;  	/* the code below is more readable if I make these bytes @@ -82,17 +83,7 @@ static void krb5_nfold(u32 inbits, const u8 *in,  	outbits >>= 3;  	/* first compute lcm(n,k) */ - -	a = outbits; -	b = inbits; - -	while (b != 0) { -		c = b; -		b = a%b; -		a = c; -	} - -	lcm = outbits*inbits/a; +	ulcm = lcm(inbits, outbits);  	/* now do the real work */ @@ -101,7 +92,7 @@ static void krb5_nfold(u32 inbits, const u8 *in,  	/* this will end up cycling through k lcm(k,n)/k times, which  	   is correct */ -	for (i = lcm-1; i >= 0; i--) { +	for (i = ulcm-1; i >= 0; i--) {  		/* compute the msbit in k which gets added into this byte */  		msbit = (  			/* first, start with the msbit in the first, diff --git a/net/sunrpc/auth_gss/gss_krb5_unseal.c b/net/sunrpc/auth_gss/gss_krb5_unseal.c index 6cd930f3678..6c981ddc19f 100644 --- a/net/sunrpc/auth_gss/gss_krb5_unseal.c +++ b/net/sunrpc/auth_gss/gss_krb5_unseal.c @@ -150,7 +150,6 @@ gss_verify_mic_v2(struct krb5_ctx *ctx,  	struct xdr_netobj cksumobj = {.len = sizeof(cksumdata),  				      .data = cksumdata};  	s32 now; -	u64 seqnum;  	u8 *ptr = read_token->data;  	u8 *cksumkey;  	u8 flags; @@ -197,9 +196,10 @@ gss_verify_mic_v2(struct krb5_ctx *ctx,  	if (now > ctx->endtime)  		return GSS_S_CONTEXT_EXPIRED; -	/* do sequencing checks */ - -	seqnum = be64_to_cpup((__be64 *)ptr + 8); +	/* +	 * NOTE: the sequence number at ptr + 8 is skipped, rpcsec_gss +	 * doesn't want it checked; see page 6 of rfc 2203. 
+	 */  	return GSS_S_COMPLETE;  } diff --git a/net/sunrpc/auth_gss/gss_krb5_wrap.c b/net/sunrpc/auth_gss/gss_krb5_wrap.c index 1da52d1406f..42560e55d97 100644 --- a/net/sunrpc/auth_gss/gss_krb5_wrap.c +++ b/net/sunrpc/auth_gss/gss_krb5_wrap.c @@ -489,7 +489,6 @@ static u32  gss_unwrap_kerberos_v2(struct krb5_ctx *kctx, int offset, struct xdr_buf *buf)  {  	s32		now; -	u64		seqnum;  	u8		*ptr;  	u8		flags = 0x00;  	u16		ec, rrc; @@ -525,7 +524,10 @@ gss_unwrap_kerberos_v2(struct krb5_ctx *kctx, int offset, struct xdr_buf *buf)  	ec = be16_to_cpup((__be16 *)(ptr + 4));  	rrc = be16_to_cpup((__be16 *)(ptr + 6)); -	seqnum = be64_to_cpup((__be64 *)(ptr + 8)); +	/* +	 * NOTE: the sequence number at ptr + 8 is skipped, rpcsec_gss +	 * doesn't want it checked; see page 6 of rfc 2203. +	 */  	if (rrc != 0)  		rotate_left(offset + 16, buf, rrc); @@ -574,8 +576,8 @@ gss_unwrap_kerberos_v2(struct krb5_ctx *kctx, int offset, struct xdr_buf *buf)  	buf->head[0].iov_len -= GSS_KRB5_TOK_HDR_LEN + headskip;  	buf->len -= GSS_KRB5_TOK_HDR_LEN + headskip; -	/* Trim off the checksum blob */ -	xdr_buf_trim(buf, GSS_KRB5_TOK_HDR_LEN + tailskip); +	/* Trim off the trailing "extra count" and checksum blob */ +	xdr_buf_trim(buf, ec + GSS_KRB5_TOK_HDR_LEN + tailskip);  	return GSS_S_COMPLETE;  } diff --git a/net/sunrpc/auth_gss/gss_mech_switch.c b/net/sunrpc/auth_gss/gss_mech_switch.c index 27ce2624093..92d5ab99fbf 100644 --- a/net/sunrpc/auth_gss/gss_mech_switch.c +++ b/net/sunrpc/auth_gss/gss_mech_switch.c @@ -218,10 +218,8 @@ static struct gss_api_mech *_gss_mech_get_by_pseudoflavor(u32 pseudoflavor)  	spin_lock(®istered_mechs_lock);  	list_for_each_entry(pos, ®istered_mechs, gm_list) { -		if (!mech_supports_pseudoflavor(pos, pseudoflavor)) { -			module_put(pos->gm_owner); +		if (!mech_supports_pseudoflavor(pos, pseudoflavor))  			continue; -		}  		if (try_module_get(pos->gm_owner))  			gm = pos;  		break; diff --git a/net/sunrpc/auth_gss/gss_rpc_upcall.c b/net/sunrpc/auth_gss/gss_rpc_upcall.c index f1eb0d16666..abbb7dcd168 100644 --- a/net/sunrpc/auth_gss/gss_rpc_upcall.c +++ b/net/sunrpc/auth_gss/gss_rpc_upcall.c @@ -137,7 +137,6 @@ void init_gssp_clnt(struct sunrpc_net *sn)  {  	mutex_init(&sn->gssp_lock);  	sn->gssp_clnt = NULL; -	init_waitqueue_head(&sn->gssp_wq);  }  int set_gssp_clnt(struct net *net) @@ -154,7 +153,6 @@ int set_gssp_clnt(struct net *net)  		sn->gssp_clnt = clnt;  	}  	mutex_unlock(&sn->gssp_lock); -	wake_up(&sn->gssp_wq);  	return ret;  } @@ -298,7 +296,8 @@ int gssp_accept_sec_context_upcall(struct net *net,  	if (res.context_handle) {  		data->out_handle = rctxh.exported_context_token;  		data->mech_oid.len = rctxh.mech.len; -		memcpy(data->mech_oid.data, rctxh.mech.data, +		if (rctxh.mech.data) +			memcpy(data->mech_oid.data, rctxh.mech.data,  						data->mech_oid.len);  		client_name = rctxh.src_name.display_name;  	} diff --git a/net/sunrpc/auth_gss/gss_rpc_xdr.c b/net/sunrpc/auth_gss/gss_rpc_xdr.c index f0f78c5f1c7..1ec19f6f0c2 100644 --- a/net/sunrpc/auth_gss/gss_rpc_xdr.c +++ b/net/sunrpc/auth_gss/gss_rpc_xdr.c @@ -559,6 +559,8 @@ static int gssx_enc_cred(struct xdr_stream *xdr,  	/* cred->elements */  	err = dummy_enc_credel_array(xdr, &cred->elements); +	if (err) +		return err;  	/* cred->cred_handle_reference */  	err = gssx_enc_buffer(xdr, &cred->cred_handle_reference); @@ -740,22 +742,20 @@ void gssx_enc_accept_sec_context(struct rpc_rqst *req,  		goto done;  	/* arg->context_handle */ -	if (arg->context_handle) { +	if (arg->context_handle)  		err = gssx_enc_ctx(xdr, 
arg->context_handle); -		if (err) -			goto done; -	} else { +	else  		err = gssx_enc_bool(xdr, 0); -	} +	if (err) +		goto done;  	/* arg->cred_handle */ -	if (arg->cred_handle) { +	if (arg->cred_handle)  		err = gssx_enc_cred(xdr, arg->cred_handle); -		if (err) -			goto done; -	} else { +	else  		err = gssx_enc_bool(xdr, 0); -	} +	if (err) +		goto done;  	/* arg->input_token */  	err = gssx_enc_in_token(xdr, &arg->input_token); @@ -763,13 +763,12 @@ void gssx_enc_accept_sec_context(struct rpc_rqst *req,  		goto done;  	/* arg->input_cb */ -	if (arg->input_cb) { +	if (arg->input_cb)  		err = gssx_enc_cb(xdr, arg->input_cb); -		if (err) -			goto done; -	} else { +	else  		err = gssx_enc_bool(xdr, 0); -	} +	if (err) +		goto done;  	err = gssx_enc_bool(xdr, arg->ret_deleg_cred);  	if (err) diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c index 09fb638bcaa..4ce5eccec1f 100644 --- a/net/sunrpc/auth_gss/svcauth_gss.c +++ b/net/sunrpc/auth_gss/svcauth_gss.c @@ -1167,8 +1167,8 @@ static int gss_proxy_save_rsc(struct cache_detail *cd,  	if (!ud->found_creds) {  		/* userspace seem buggy, we should always get at least a  		 * mapping to nobody */ -		dprintk("RPC:       No creds found, marking Negative!\n"); -		set_bit(CACHE_NEGATIVE, &rsci.h.flags); +		dprintk("RPC:       No creds found!\n"); +		goto out;  	} else {  		/* steal creds */ @@ -1263,65 +1263,34 @@ out:  	return ret;  } -DEFINE_SPINLOCK(use_gssp_lock); - -static bool use_gss_proxy(struct net *net) -{ -	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id); - -	if (sn->use_gss_proxy != -1) -		return sn->use_gss_proxy; -	spin_lock(&use_gssp_lock); -	/* -	 * If you wanted gss-proxy, you should have said so before -	 * starting to accept requests: -	 */ -	sn->use_gss_proxy = 0; -	spin_unlock(&use_gssp_lock); -	return 0; -} - -#ifdef CONFIG_PROC_FS - +/* + * Try to set the sn->use_gss_proxy variable to a new value. We only allow + * it to be changed if it's currently undefined (-1). If it's any other value + * then return -EBUSY unless the type wouldn't have changed anyway. 
+ */  static int set_gss_proxy(struct net *net, int type)  {  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id); -	int ret = 0; +	int ret;  	WARN_ON_ONCE(type != 0 && type != 1); -	spin_lock(&use_gssp_lock); -	if (sn->use_gss_proxy == -1 || sn->use_gss_proxy == type) -		sn->use_gss_proxy = type; -	else -		ret = -EBUSY; -	spin_unlock(&use_gssp_lock); -	wake_up(&sn->gssp_wq); -	return ret; -} - -static inline bool gssp_ready(struct sunrpc_net *sn) -{ -	switch (sn->use_gss_proxy) { -		case -1: -			return false; -		case 0: -			return true; -		case 1: -			return sn->gssp_clnt; -	} -	WARN_ON_ONCE(1); -	return false; +	ret = cmpxchg(&sn->use_gss_proxy, -1, type); +	if (ret != -1 && ret != type) +		return -EBUSY; +	return 0;  } -static int wait_for_gss_proxy(struct net *net, struct file *file) +static bool use_gss_proxy(struct net *net)  {  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id); -	if (file->f_flags & O_NONBLOCK && !gssp_ready(sn)) -		return -EAGAIN; -	return wait_event_interruptible(sn->gssp_wq, gssp_ready(sn)); +	/* If use_gss_proxy is still undefined, then try to disable it */ +	if (sn->use_gss_proxy == -1) +		set_gss_proxy(net, 0); +	return sn->use_gss_proxy;  } +#ifdef CONFIG_PROC_FS  static ssize_t write_gssp(struct file *file, const char __user *buf,  			 size_t count, loff_t *ppos) @@ -1342,10 +1311,10 @@ static ssize_t write_gssp(struct file *file, const char __user *buf,  		return res;  	if (i != 1)  		return -EINVAL; -	res = set_gss_proxy(net, 1); +	res = set_gssp_clnt(net);  	if (res)  		return res; -	res = set_gssp_clnt(net); +	res = set_gss_proxy(net, 1);  	if (res)  		return res;  	return count; @@ -1355,16 +1324,12 @@ static ssize_t read_gssp(struct file *file, char __user *buf,  			 size_t count, loff_t *ppos)  {  	struct net *net = PDE_DATA(file_inode(file)); +	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);  	unsigned long p = *ppos;  	char tbuf[10];  	size_t len; -	int ret; -	ret = wait_for_gss_proxy(net, file); -	if (ret) -		return ret; - -	snprintf(tbuf, sizeof(tbuf), "%d\n", use_gss_proxy(net)); +	snprintf(tbuf, sizeof(tbuf), "%d\n", sn->use_gss_proxy);  	len = strlen(tbuf);  	if (p >= len)  		return 0; @@ -1538,6 +1503,7 @@ svcauth_gss_accept(struct svc_rqst *rqstp, __be32 *authp)  			if (unwrap_integ_data(rqstp, &rqstp->rq_arg,  					gc->gc_seq, rsci->mechctx))  				goto garbage_args; +			rqstp->rq_auth_slack = RPC_MAX_AUTH_SIZE;  			break;  		case RPC_GSS_SVC_PRIVACY:  			/* placeholders for length and seq. 
number: */ @@ -1546,6 +1512,7 @@ svcauth_gss_accept(struct svc_rqst *rqstp, __be32 *authp)  			if (unwrap_priv_data(rqstp, &rqstp->rq_arg,  					gc->gc_seq, rsci->mechctx))  				goto garbage_args; +			rqstp->rq_auth_slack = RPC_MAX_AUTH_SIZE * 2;  			break;  		default:  			goto auth_err; @@ -1626,8 +1593,7 @@ svcauth_gss_wrap_resp_integ(struct svc_rqst *rqstp)  	BUG_ON(integ_len % 4);  	*p++ = htonl(integ_len);  	*p++ = htonl(gc->gc_seq); -	if (xdr_buf_subsegment(resbuf, &integ_buf, integ_offset, -				integ_len)) +	if (xdr_buf_subsegment(resbuf, &integ_buf, integ_offset, integ_len))  		BUG();  	if (resbuf->tail[0].iov_base == NULL) {  		if (resbuf->head[0].iov_len + RPC_MAX_AUTH_SIZE > PAGE_SIZE) @@ -1635,10 +1601,8 @@ svcauth_gss_wrap_resp_integ(struct svc_rqst *rqstp)  		resbuf->tail[0].iov_base = resbuf->head[0].iov_base  						+ resbuf->head[0].iov_len;  		resbuf->tail[0].iov_len = 0; -		resv = &resbuf->tail[0]; -	} else { -		resv = &resbuf->tail[0];  	} +	resv = &resbuf->tail[0];  	mic.data = (u8 *)resv->iov_base + resv->iov_len + 4;  	if (gss_get_mic(gsd->rsci->mechctx, &integ_buf, &mic))  		goto out_err; diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c index 890a29912d5..9761a0da964 100644 --- a/net/sunrpc/backchannel_rqst.c +++ b/net/sunrpc/backchannel_rqst.c @@ -64,7 +64,6 @@ static void xprt_free_allocation(struct rpc_rqst *req)  	free_page((unsigned long)xbufp->head[0].iov_base);  	xbufp = &req->rq_snd_buf;  	free_page((unsigned long)xbufp->head[0].iov_base); -	list_del(&req->rq_bc_pa_list);  	kfree(req);  } @@ -168,8 +167,10 @@ out_free:  	/*  	 * Memory allocation failed, free the temporary list  	 */ -	list_for_each_entry_safe(req, tmp, &tmp_list, rq_bc_pa_list) +	list_for_each_entry_safe(req, tmp, &tmp_list, rq_bc_pa_list) { +		list_del(&req->rq_bc_pa_list);  		xprt_free_allocation(req); +	}  	dprintk("RPC:       setup backchannel transport failed\n");  	return -ENOMEM; @@ -198,6 +199,7 @@ void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)  	xprt_dec_alloc_count(xprt, max_reqs);  	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {  		dprintk("RPC:        req=%p\n", req); +		list_del(&req->rq_bc_pa_list);  		xprt_free_allocation(req);  		if (--max_reqs == 0)  			break; @@ -210,39 +212,23 @@ out:  }  EXPORT_SYMBOL_GPL(xprt_destroy_backchannel); -/* - * One or more rpc_rqst structure have been preallocated during the - * backchannel setup.  Buffer space for the send and private XDR buffers - * has been preallocated as well.  Use xprt_alloc_bc_request to allocate - * to this request.  Use xprt_free_bc_request to return it. - * - * We know that we're called in soft interrupt context, grab the spin_lock - * since there is no need to grab the bottom half spin_lock. - * - * Return an available rpc_rqst, otherwise NULL if non are available. 
- */ -struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt) +static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)  { -	struct rpc_rqst *req; +	struct rpc_rqst *req = NULL;  	dprintk("RPC:       allocate a backchannel request\n"); -	spin_lock(&xprt->bc_pa_lock); -	if (!list_empty(&xprt->bc_pa_list)) { -		req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, -				rq_bc_pa_list); -		list_del(&req->rq_bc_pa_list); -	} else { -		req = NULL; -	} -	spin_unlock(&xprt->bc_pa_lock); +	if (list_empty(&xprt->bc_pa_list)) +		goto not_found; -	if (req != NULL) { -		set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); -		req->rq_reply_bytes_recvd = 0; -		req->rq_bytes_sent = 0; -		memcpy(&req->rq_private_buf, &req->rq_rcv_buf, +	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, +				rq_bc_pa_list); +	req->rq_reply_bytes_recvd = 0; +	req->rq_bytes_sent = 0; +	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,  			sizeof(req->rq_private_buf)); -	} +	req->rq_xid = xid; +	req->rq_connect_cookie = xprt->connect_cookie; +not_found:  	dprintk("RPC:       backchannel req=%p\n", req);  	return req;  } @@ -257,10 +243,11 @@ void xprt_free_bc_request(struct rpc_rqst *req)  	dprintk("RPC:       free backchannel req=%p\n", req); -	smp_mb__before_clear_bit(); +	req->rq_connect_cookie = xprt->connect_cookie - 1; +	smp_mb__before_atomic();  	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));  	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); -	smp_mb__after_clear_bit(); +	smp_mb__after_atomic();  	if (!xprt_need_to_requeue(xprt)) {  		/* @@ -279,7 +266,57 @@ void xprt_free_bc_request(struct rpc_rqst *req)  	 * may be reused by a new callback request.  	 */  	spin_lock_bh(&xprt->bc_pa_lock); -	list_add(&req->rq_bc_pa_list, &xprt->bc_pa_list); +	list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);  	spin_unlock_bh(&xprt->bc_pa_lock);  } +/* + * One or more rpc_rqst structure have been preallocated during the + * backchannel setup.  Buffer space for the send and private XDR buffers + * has been preallocated as well.  Use xprt_alloc_bc_request to allocate + * to this request.  Use xprt_free_bc_request to return it. + * + * We know that we're called in soft interrupt context, grab the spin_lock + * since there is no need to grab the bottom half spin_lock. + * + * Return an available rpc_rqst, otherwise NULL if non are available. + */ +struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid) +{ +	struct rpc_rqst *req; + +	spin_lock(&xprt->bc_pa_lock); +	list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) { +		if (req->rq_connect_cookie != xprt->connect_cookie) +			continue; +		if (req->rq_xid == xid) +			goto found; +	} +	req = xprt_alloc_bc_request(xprt, xid); +found: +	spin_unlock(&xprt->bc_pa_lock); +	return req; +} + +/* + * Add callback request to callback list.  The callback + * service sleeps on the sv_cb_waitq waiting for new + * requests.  Wake it up after adding enqueing the + * request. 
+ */ +void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied) +{ +	struct rpc_xprt *xprt = req->rq_xprt; +	struct svc_serv *bc_serv = xprt->bc_serv; + +	req->rq_private_buf.len = copied; +	set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); + +	dprintk("RPC:       add callback request to list\n"); +	spin_lock(&bc_serv->sv_cb_lock); +	list_del(&req->rq_bc_pa_list); +	list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); +	wake_up(&bc_serv->sv_cb_waitq); +	spin_unlock(&bc_serv->sv_cb_lock); +} + diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c index a72de074172..06636214113 100644 --- a/net/sunrpc/cache.c +++ b/net/sunrpc/cache.c @@ -374,7 +374,7 @@ void sunrpc_destroy_cache_detail(struct cache_detail *cd)  	}  	return;  out: -	printk(KERN_ERR "nfsd: failed to unregister %s cache\n", cd->name); +	printk(KERN_ERR "RPC: failed to unregister %s cache\n", cd->name);  }  EXPORT_SYMBOL_GPL(sunrpc_destroy_cache_detail); @@ -619,7 +619,7 @@ static void cache_limit_defers(void)  	/* Consider removing either the first or the last */  	if (cache_defer_cnt > DFR_MAX) { -		if (net_random() & 1) +		if (prandom_u32() & 1)  			discard = list_entry(cache_defer_list.next,  					     struct cache_deferred_req, recent);  		else @@ -1111,9 +1111,7 @@ void qword_addhex(char **bpp, int *lp, char *buf, int blen)  		*bp++ = 'x';  		len -= 2;  		while (blen && len >= 2) { -			unsigned char c = *buf++; -			*bp++ = '0' + ((c&0xf0)>>4) + (c>=0xa0)*('a'-'9'-1); -			*bp++ = '0' + (c&0x0f) + ((c&0x0f)>=0x0a)*('a'-'9'-1); +			bp = hex_byte_pack(bp, *buf++);  			len -= 2;  			blen--;  		} diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 77479606a97..2e6ab10734f 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -25,12 +25,12 @@  #include <linux/namei.h>  #include <linux/mount.h>  #include <linux/slab.h> +#include <linux/rcupdate.h>  #include <linux/utsname.h>  #include <linux/workqueue.h>  #include <linux/in.h>  #include <linux/in6.h>  #include <linux/un.h> -#include <linux/rcupdate.h>  #include <linux/sunrpc/clnt.h>  #include <linux/sunrpc/addr.h> @@ -264,6 +264,26 @@ void rpc_clients_notifier_unregister(void)  	return rpc_pipefs_notifier_unregister(&rpc_clients_block);  } +static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt, +		struct rpc_xprt *xprt, +		const struct rpc_timeout *timeout) +{ +	struct rpc_xprt *old; + +	spin_lock(&clnt->cl_lock); +	old = rcu_dereference_protected(clnt->cl_xprt, +			lockdep_is_held(&clnt->cl_lock)); + +	if (!xprt_bound(xprt)) +		clnt->cl_autobind = 1; + +	clnt->cl_timeout = timeout; +	rcu_assign_pointer(clnt->cl_xprt, xprt); +	spin_unlock(&clnt->cl_lock); + +	return old; +} +  static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)  {  	clnt->cl_nodelen = strlen(nodename); @@ -272,12 +292,13 @@ static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)  	memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);  } -static int rpc_client_register(const struct rpc_create_args *args, -			       struct rpc_clnt *clnt) +static int rpc_client_register(struct rpc_clnt *clnt, +			       rpc_authflavor_t pseudoflavor, +			       const char *client_name)  {  	struct rpc_auth_create_args auth_args = { -		.pseudoflavor = args->authflavor, -		.target_name = args->client_name, +		.pseudoflavor = pseudoflavor, +		.target_name = client_name,  	};  	struct rpc_auth *auth;  	struct net *net = rpc_net_ns(clnt); @@ -298,7 +319,7 @@ static int rpc_client_register(const struct rpc_create_args *args,  	auth = 
rpcauth_create(&auth_args, clnt);  	if (IS_ERR(auth)) {  		dprintk("RPC:       Couldn't create auth handle (flavor %u)\n", -				args->authflavor); +				pseudoflavor);  		err = PTR_ERR(auth);  		goto err_auth;  	} @@ -337,7 +358,8 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,  {  	const struct rpc_program *program = args->program;  	const struct rpc_version *version; -	struct rpc_clnt		*clnt = NULL; +	struct rpc_clnt *clnt = NULL; +	const struct rpc_timeout *timeout;  	int err;  	/* sanity check the name before trying to print it */ @@ -365,7 +387,6 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,  	if (err)  		goto out_no_clid; -	rcu_assign_pointer(clnt->cl_xprt, xprt);  	clnt->cl_procinfo = version->procs;  	clnt->cl_maxproc  = version->nrprocs;  	clnt->cl_prog     = args->prognumber ? : program->number; @@ -380,16 +401,15 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,  	INIT_LIST_HEAD(&clnt->cl_tasks);  	spin_lock_init(&clnt->cl_lock); -	if (!xprt_bound(xprt)) -		clnt->cl_autobind = 1; - -	clnt->cl_timeout = xprt->timeout; +	timeout = xprt->timeout;  	if (args->timeout != NULL) {  		memcpy(&clnt->cl_timeout_default, args->timeout,  				sizeof(clnt->cl_timeout_default)); -		clnt->cl_timeout = &clnt->cl_timeout_default; +		timeout = &clnt->cl_timeout_default;  	} +	rpc_clnt_set_transport(clnt, xprt, timeout); +  	clnt->cl_rtt = &clnt->cl_rtt_default;  	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval); @@ -398,7 +418,7 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,  	/* save the nodename */  	rpc_clnt_set_nodename(clnt, utsname()->nodename); -	err = rpc_client_register(args, clnt); +	err = rpc_client_register(clnt, args->authflavor, args->client_name);  	if (err)  		goto out_no_path;  	if (parent) @@ -418,6 +438,38 @@ out_no_rpciod:  	return ERR_PTR(err);  } +struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args, +					struct rpc_xprt *xprt) +{ +	struct rpc_clnt *clnt = NULL; + +	clnt = rpc_new_client(args, xprt, NULL); +	if (IS_ERR(clnt)) +		return clnt; + +	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) { +		int err = rpc_ping(clnt); +		if (err != 0) { +			rpc_shutdown_client(clnt); +			return ERR_PTR(err); +		} +	} + +	clnt->cl_softrtry = 1; +	if (args->flags & RPC_CLNT_CREATE_HARDRTRY) +		clnt->cl_softrtry = 0; + +	if (args->flags & RPC_CLNT_CREATE_AUTOBIND) +		clnt->cl_autobind = 1; +	if (args->flags & RPC_CLNT_CREATE_DISCRTRY) +		clnt->cl_discrtry = 1; +	if (!(args->flags & RPC_CLNT_CREATE_QUIET)) +		clnt->cl_chatty = 1; + +	return clnt; +} +EXPORT_SYMBOL_GPL(rpc_create_xprt); +  /**   * rpc_create - create an RPC client and transport with one call   * @args: rpc_clnt create argument structure @@ -431,7 +483,6 @@ out_no_rpciod:  struct rpc_clnt *rpc_create(struct rpc_create_args *args)  {  	struct rpc_xprt *xprt; -	struct rpc_clnt *clnt;  	struct xprt_create xprtargs = {  		.net = args->net,  		.ident = args->protocol, @@ -495,30 +546,7 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)  	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)  		xprt->resvport = 0; -	clnt = rpc_new_client(args, xprt, NULL); -	if (IS_ERR(clnt)) -		return clnt; - -	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) { -		int err = rpc_ping(clnt); -		if (err != 0) { -			rpc_shutdown_client(clnt); -			return ERR_PTR(err); -		} -	} - -	clnt->cl_softrtry = 1; -	if (args->flags & RPC_CLNT_CREATE_HARDRTRY) -		clnt->cl_softrtry = 0; - -	if (args->flags & 
RPC_CLNT_CREATE_AUTOBIND) -		clnt->cl_autobind = 1; -	if (args->flags & RPC_CLNT_CREATE_DISCRTRY) -		clnt->cl_discrtry = 1; -	if (!(args->flags & RPC_CLNT_CREATE_QUIET)) -		clnt->cl_chatty = 1; - -	return clnt; +	return rpc_create_xprt(args, xprt);  }  EXPORT_SYMBOL_GPL(rpc_create); @@ -600,6 +628,80 @@ rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)  }  EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth); +/** + * rpc_switch_client_transport: switch the RPC transport on the fly + * @clnt: pointer to a struct rpc_clnt + * @args: pointer to the new transport arguments + * @timeout: pointer to the new timeout parameters + * + * This function allows the caller to switch the RPC transport for the + * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS + * server, for instance.  It assumes that the caller has ensured that + * there are no active RPC tasks by using some form of locking. + * + * Returns zero if "clnt" is now using the new xprt.  Otherwise a + * negative errno is returned, and "clnt" continues to use the old + * xprt. + */ +int rpc_switch_client_transport(struct rpc_clnt *clnt, +		struct xprt_create *args, +		const struct rpc_timeout *timeout) +{ +	const struct rpc_timeout *old_timeo; +	rpc_authflavor_t pseudoflavor; +	struct rpc_xprt *xprt, *old; +	struct rpc_clnt *parent; +	int err; + +	xprt = xprt_create_transport(args); +	if (IS_ERR(xprt)) { +		dprintk("RPC:       failed to create new xprt for clnt %p\n", +			clnt); +		return PTR_ERR(xprt); +	} + +	pseudoflavor = clnt->cl_auth->au_flavor; + +	old_timeo = clnt->cl_timeout; +	old = rpc_clnt_set_transport(clnt, xprt, timeout); + +	rpc_unregister_client(clnt); +	__rpc_clnt_remove_pipedir(clnt); + +	/* +	 * A new transport was created.  "clnt" therefore +	 * becomes the root of a new cl_parent tree.  clnt's +	 * children, if it has any, still point to the old xprt. +	 */ +	parent = clnt->cl_parent; +	clnt->cl_parent = clnt; + +	/* +	 * The old rpc_auth cache cannot be re-used.  GSS +	 * contexts in particular are between a single +	 * client and server. +	 */ +	err = rpc_client_register(clnt, pseudoflavor, NULL); +	if (err) +		goto out_revert; + +	synchronize_rcu(); +	if (parent != clnt) +		rpc_release_client(parent); +	xprt_put(old); +	dprintk("RPC:       replaced xprt for clnt %p\n", clnt); +	return 0; + +out_revert: +	rpc_clnt_set_transport(clnt, old, old_timeo); +	clnt->cl_parent = parent; +	rpc_client_register(clnt, pseudoflavor, NULL); +	xprt_put(xprt); +	dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt); +	return err; +} +EXPORT_SYMBOL_GPL(rpc_switch_client_transport); +  /*   * Kill all tasks for the given client.   * XXX: kill their descendants as well? 
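
As a usage illustration for the rpc_switch_client_transport() helper introduced above: a minimal, hypothetical caller sketch, not part of the patch. It assumes the caller has already quiesced all RPC tasks on "clnt" (as the kernel-doc requires); example_switch_to_mirror() and the choice of a TCP transport are illustrative assumptions, while rpc_switch_client_transport(), struct xprt_create, rpc_net_ns() and XPRT_TRANSPORT_TCP come from the existing SUNRPC interfaces.

static int example_switch_to_mirror(struct rpc_clnt *clnt,
				    struct sockaddr *addr, size_t addrlen,
				    const struct rpc_timeout *timeo)
{
	/* Describe the replacement transport (TCP in this sketch). */
	struct xprt_create xprtargs = {
		.ident		= XPRT_TRANSPORT_TCP,
		.net		= rpc_net_ns(clnt),
		.dstaddr	= addr,
		.addrlen	= addrlen,
	};

	/*
	 * On success the client now uses the new transport; on failure
	 * it keeps the old transport and the old timeout parameters.
	 */
	return rpc_switch_client_transport(clnt, &xprtargs, timeo);
}
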
@@ -656,14 +758,16 @@ EXPORT_SYMBOL_GPL(rpc_shutdown_client);  /*   * Free an RPC client   */ -static void +static struct rpc_clnt *  rpc_free_client(struct rpc_clnt *clnt)  { +	struct rpc_clnt *parent = NULL; +  	dprintk_rcu("RPC:       destroying %s client for %s\n",  			clnt->cl_program->name,  			rcu_dereference(clnt->cl_xprt)->servername);  	if (clnt->cl_parent != clnt) -		rpc_release_client(clnt->cl_parent); +		parent = clnt->cl_parent;  	rpc_clnt_remove_pipedir(clnt);  	rpc_unregister_client(clnt);  	rpc_free_iostats(clnt->cl_metrics); @@ -672,18 +776,17 @@ rpc_free_client(struct rpc_clnt *clnt)  	rpciod_down();  	rpc_free_clid(clnt);  	kfree(clnt); +	return parent;  }  /*   * Free an RPC client   */ -static void +static struct rpc_clnt *   rpc_free_auth(struct rpc_clnt *clnt)  { -	if (clnt->cl_auth == NULL) { -		rpc_free_client(clnt); -		return; -	} +	if (clnt->cl_auth == NULL) +		return rpc_free_client(clnt);  	/*  	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to @@ -694,7 +797,8 @@ rpc_free_auth(struct rpc_clnt *clnt)  	rpcauth_release(clnt->cl_auth);  	clnt->cl_auth = NULL;  	if (atomic_dec_and_test(&clnt->cl_count)) -		rpc_free_client(clnt); +		return rpc_free_client(clnt); +	return NULL;  }  /* @@ -705,10 +809,13 @@ rpc_release_client(struct rpc_clnt *clnt)  {  	dprintk("RPC:       rpc_release_client(%p)\n", clnt); -	if (list_empty(&clnt->cl_tasks)) -		wake_up(&destroy_wait); -	if (atomic_dec_and_test(&clnt->cl_count)) -		rpc_free_auth(clnt); +	do { +		if (list_empty(&clnt->cl_tasks)) +			wake_up(&destroy_wait); +		if (!atomic_dec_and_test(&clnt->cl_count)) +			break; +		clnt = rpc_free_auth(clnt); +	} while (clnt != NULL);  }  EXPORT_SYMBOL_GPL(rpc_release_client); @@ -772,6 +879,8 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)  		atomic_inc(&clnt->cl_count);  		if (clnt->cl_softrtry)  			task->tk_flags |= RPC_TASK_SOFT; +		if (clnt->cl_noretranstimeo) +			task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;  		if (sk_memalloc_socks()) {  			struct rpc_xprt *xprt; @@ -1262,6 +1371,7 @@ rpc_restart_call_prepare(struct rpc_task *task)  	if (RPC_ASSASSINATED(task))  		return 0;  	task->tk_action = call_start; +	task->tk_status = 0;  	if (task->tk_ops->rpc_call_prepare != NULL)  		task->tk_action = rpc_prepare_task;  	return 1; @@ -1278,6 +1388,7 @@ rpc_restart_call(struct rpc_task *task)  	if (RPC_ASSASSINATED(task))  		return 0;  	task->tk_action = call_start; +	task->tk_status = 0;  	return 1;  }  EXPORT_SYMBOL_GPL(rpc_restart_call); @@ -1428,9 +1539,13 @@ call_refreshresult(struct rpc_task *task)  	task->tk_action = call_refresh;  	switch (status) {  	case 0: -		if (rpcauth_uptodatecred(task)) +		if (rpcauth_uptodatecred(task)) {  			task->tk_action = call_allocate; -		return; +			return; +		} +		/* Use rate-limiting and a max number of retries if refresh +		 * had status 0 but failed to update the cred. 
+		 */  	case -ETIMEDOUT:  		rpc_delay(task, 3*HZ);  	case -EAGAIN: @@ -1623,11 +1738,10 @@ call_bind_status(struct rpc_task *task)  	case -EPROTONOSUPPORT:  		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",  				task->tk_pid); -		task->tk_status = 0; -		task->tk_action = call_bind; -		return; +		goto retry_timeout;  	case -ECONNREFUSED:		/* connection problems */  	case -ECONNRESET: +	case -ECONNABORTED:  	case -ENOTCONN:  	case -EHOSTDOWN:  	case -EHOSTUNREACH: @@ -1650,6 +1764,7 @@ call_bind_status(struct rpc_task *task)  	return;  retry_timeout: +	task->tk_status = 0;  	task->tk_action = call_timeout;  } @@ -1690,20 +1805,23 @@ call_connect_status(struct rpc_task *task)  	dprint_status(task);  	trace_rpc_connect_status(task, status); +	task->tk_status = 0;  	switch (status) { -		/* if soft mounted, test if we've timed out */ -	case -ETIMEDOUT: -		task->tk_action = call_timeout; -		return;  	case -ECONNREFUSED:  	case -ECONNRESET: +	case -ECONNABORTED:  	case -ENETUNREACH: +	case -EHOSTUNREACH:  		if (RPC_IS_SOFTCONN(task))  			break;  		/* retry with existing socket, after a delay */ -	case 0: +		rpc_delay(task, 3*HZ);  	case -EAGAIN: -		task->tk_status = 0; +		/* Check for timeouts before looping back to call_bind */ +	case -ETIMEDOUT: +		task->tk_action = call_timeout; +		return; +	case 0:  		clnt->cl_stats->netreconn++;  		task->tk_action = call_transmit;  		return; @@ -1717,13 +1835,14 @@ call_connect_status(struct rpc_task *task)  static void  call_transmit(struct rpc_task *task)  { +	int is_retrans = RPC_WAS_SENT(task); +  	dprint_status(task);  	task->tk_action = call_status;  	if (task->tk_status < 0)  		return; -	task->tk_status = xprt_prepare_transmit(task); -	if (task->tk_status != 0) +	if (!xprt_prepare_transmit(task))  		return;  	task->tk_action = call_transmit_status;  	/* Encode here so that rpcsec_gss can use correct sequence number. */ @@ -1742,6 +1861,8 @@ call_transmit(struct rpc_task *task)  	xprt_transmit(task);  	if (task->tk_status < 0)  		return; +	if (is_retrans) +		task->tk_client->cl_stats->rpcretrans++;  	/*  	 * On success, ensure that we call xprt_end_transmit() before sleeping  	 * in order to allow access to the socket to other RPC requests. @@ -1795,6 +1916,7 @@ call_transmit_status(struct rpc_task *task)  			break;  		}  	case -ECONNRESET: +	case -ECONNABORTED:  	case -ENOTCONN:  	case -EPIPE:  		rpc_task_force_reencode(task); @@ -1811,8 +1933,7 @@ call_bc_transmit(struct rpc_task *task)  {  	struct rpc_rqst *req = task->tk_rqstp; -	task->tk_status = xprt_prepare_transmit(task); -	if (task->tk_status == -EAGAIN) { +	if (!xprt_prepare_transmit(task)) {  		/*  		 * Could not reserve the transport. Try again after the  		 * transport is released. @@ -1893,6 +2014,10 @@ call_status(struct rpc_task *task)  	case -EHOSTDOWN:  	case -EHOSTUNREACH:  	case -ENETUNREACH: +		if (RPC_IS_SOFTCONN(task)) { +			rpc_exit(task, status); +			break; +		}  		/*  		 * Delay any retries for 3 seconds, then handle as if it  		 * were a timeout. 
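
The call_transmit() change above moves retransmit accounting out of call_timeout() and call_decode(). The pattern, shown here as a stand-alone illustrative sketch (example_transmit() is not part of the patch), is to note whether the request has already been on the wire before handing it to the transport, and to charge cl_stats->rpcretrans only once the retransmission has actually been sent.

static void example_transmit(struct rpc_task *task)
{
	/* Was this request already put on the wire at least once? */
	int is_retrans = RPC_WAS_SENT(task);

	xprt_transmit(task);
	if (task->tk_status < 0)
		return;		/* nothing was sent, so nothing to count */

	/* Only a transmit that repeated an earlier send is a retransmission. */
	if (is_retrans)
		task->tk_client->cl_stats->rpcretrans++;
}
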
@@ -1900,12 +2025,14 @@ call_status(struct rpc_task *task)  		rpc_delay(task, 3*HZ);  	case -ETIMEDOUT:  		task->tk_action = call_timeout; -		if (task->tk_client->cl_discrtry) +		if (!(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) +		    && task->tk_client->cl_discrtry)  			xprt_conditional_disconnect(req->rq_xprt,  					req->rq_connect_cookie);  		break; -	case -ECONNRESET:  	case -ECONNREFUSED: +	case -ECONNRESET: +	case -ECONNABORTED:  		rpc_force_rebind(clnt);  		rpc_delay(task, 3*HZ);  	case -EPIPE: @@ -1982,7 +2109,6 @@ call_timeout(struct rpc_task *task)  	rpcauth_invalcred(task);  retry: -	clnt->cl_stats->rpcretrans++;  	task->tk_action = call_bind;  	task->tk_status = 0;  } @@ -2025,7 +2151,6 @@ call_decode(struct rpc_task *task)  	if (req->rq_rcv_buf.len < 12) {  		if (!RPC_IS_SOFT(task)) {  			task->tk_action = call_bind; -			clnt->cl_stats->rpcretrans++;  			goto out_retry;  		}  		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n", diff --git a/net/sunrpc/netns.h b/net/sunrpc/netns.h index 779742cfc1f..df582687653 100644 --- a/net/sunrpc/netns.h +++ b/net/sunrpc/netns.h @@ -14,6 +14,7 @@ struct sunrpc_net {  	struct cache_detail *rsi_cache;  	struct super_block *pipefs_sb; +	struct rpc_pipe *gssd_dummy;  	struct mutex pipefs_sb_lock;  	struct list_head all_clients; @@ -26,14 +27,11 @@ struct sunrpc_net {  	unsigned int rpcb_is_af_local : 1;  	struct mutex gssp_lock; -	wait_queue_head_t gssp_wq;  	struct rpc_clnt *gssp_clnt;  	int use_gss_proxy;  	int pipe_version;  	atomic_t pipe_users;  	struct proc_dir_entry *use_gssp_proc; - -	unsigned int gssd_running;  };  extern int sunrpc_net_id; diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c index f94567b45bb..b1855489856 100644 --- a/net/sunrpc/rpc_pipe.c +++ b/net/sunrpc/rpc_pipe.c @@ -17,6 +17,7 @@  #include <linux/fsnotify.h>  #include <linux/kernel.h>  #include <linux/rcupdate.h> +#include <linux/utsname.h>  #include <asm/ioctls.h>  #include <linux/poll.h> @@ -38,7 +39,7 @@  #define NET_NAME(net)	((net == &init_net) ? 
" (init_net)" : "")  static struct file_system_type rpc_pipe_fs_type; - +static const struct rpc_pipe_ops gssd_dummy_pipe_ops;  static struct kmem_cache *rpc_inode_cachep __read_mostly; @@ -216,14 +217,11 @@ rpc_destroy_inode(struct inode *inode)  static int  rpc_pipe_open(struct inode *inode, struct file *filp)  { -	struct net *net = inode->i_sb->s_fs_info; -	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);  	struct rpc_pipe *pipe;  	int first_open;  	int res = -ENXIO;  	mutex_lock(&inode->i_mutex); -	sn->gssd_running = 1;  	pipe = RPC_I(inode)->pipe;  	if (pipe == NULL)  		goto out; @@ -471,15 +469,6 @@ struct rpc_filelist {  	umode_t mode;  }; -static int rpc_delete_dentry(const struct dentry *dentry) -{ -	return 1; -} - -static const struct dentry_operations rpc_dentry_operations = { -	.d_delete = rpc_delete_dentry, -}; -  static struct inode *  rpc_get_inode(struct super_block *sb, umode_t mode)  { @@ -519,8 +508,8 @@ static int __rpc_create_common(struct inode *dir, struct dentry *dentry,  	d_add(dentry, inode);  	return 0;  out_err: -	printk(KERN_WARNING "%s: %s failed to allocate inode for dentry %s\n", -			__FILE__, __func__, dentry->d_name.name); +	printk(KERN_WARNING "%s: %s failed to allocate inode for dentry %pd\n", +			__FILE__, __func__, dentry);  	dput(dentry);  	return -ENOMEM;  } @@ -755,8 +744,8 @@ static int rpc_populate(struct dentry *parent,  out_bad:  	__rpc_depopulate(parent, files, start, eof);  	mutex_unlock(&dir->i_mutex); -	printk(KERN_WARNING "%s: %s failed to populate directory %s\n", -			__FILE__, __func__, parent->d_name.name); +	printk(KERN_WARNING "%s: %s failed to populate directory %pd\n", +			__FILE__, __func__, parent);  	return err;  } @@ -852,8 +841,8 @@ out:  	return dentry;  out_err:  	dentry = ERR_PTR(err); -	printk(KERN_WARNING "%s: %s() failed to create pipe %s/%s (errno = %d)\n", -			__FILE__, __func__, parent->d_name.name, name, +	printk(KERN_WARNING "%s: %s() failed to create pipe %pd/%s (errno = %d)\n", +			__FILE__, __func__, parent, name,  			err);  	goto out;  } @@ -1168,6 +1157,7 @@ enum {  	RPCAUTH_nfsd4_cb,  	RPCAUTH_cache,  	RPCAUTH_nfsd, +	RPCAUTH_gssd,  	RPCAUTH_RootEOF  }; @@ -1204,6 +1194,10 @@ static const struct rpc_filelist files[] = {  		.name = "nfsd",  		.mode = S_IFDIR | S_IRUGO | S_IXUGO,  	}, +	[RPCAUTH_gssd] = { +		.name = "gssd", +		.mode = S_IFDIR | S_IRUGO | S_IXUGO, +	},  };  /* @@ -1217,13 +1211,24 @@ struct dentry *rpc_d_lookup_sb(const struct super_block *sb,  }  EXPORT_SYMBOL_GPL(rpc_d_lookup_sb); -void rpc_pipefs_init_net(struct net *net) +int rpc_pipefs_init_net(struct net *net)  {  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id); +	sn->gssd_dummy = rpc_mkpipe_data(&gssd_dummy_pipe_ops, 0); +	if (IS_ERR(sn->gssd_dummy)) +		return PTR_ERR(sn->gssd_dummy); +  	mutex_init(&sn->pipefs_sb_lock); -	sn->gssd_running = 1;  	sn->pipe_version = -1; +	return 0; +} + +void rpc_pipefs_exit_net(struct net *net) +{ +	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id); + +	rpc_destroy_pipe_data(sn->gssd_dummy);  }  /* @@ -1253,11 +1258,134 @@ void rpc_put_sb_net(const struct net *net)  }  EXPORT_SYMBOL_GPL(rpc_put_sb_net); +static const struct rpc_filelist gssd_dummy_clnt_dir[] = { +	[0] = { +		.name = "clntXX", +		.mode = S_IFDIR | S_IRUGO | S_IXUGO, +	}, +}; + +static ssize_t +dummy_downcall(struct file *filp, const char __user *src, size_t len) +{ +	return -EINVAL; +} + +static const struct rpc_pipe_ops gssd_dummy_pipe_ops = { +	.upcall		= rpc_pipe_generic_upcall, +	.downcall	= dummy_downcall, +}; + 
+/* + * Here we present a bogus "info" file to keep rpc.gssd happy. We don't expect + * that it will ever use this info to handle an upcall, but rpc.gssd expects + * that this file will be there and have a certain format. + */ +static int +rpc_show_dummy_info(struct seq_file *m, void *v) +{ +	seq_printf(m, "RPC server: %s\n", utsname()->nodename); +	seq_printf(m, "service: foo (1) version 0\n"); +	seq_printf(m, "address: 127.0.0.1\n"); +	seq_printf(m, "protocol: tcp\n"); +	seq_printf(m, "port: 0\n"); +	return 0; +} + +static int +rpc_dummy_info_open(struct inode *inode, struct file *file) +{ +	return single_open(file, rpc_show_dummy_info, NULL); +} + +static const struct file_operations rpc_dummy_info_operations = { +	.owner		= THIS_MODULE, +	.open		= rpc_dummy_info_open, +	.read		= seq_read, +	.llseek		= seq_lseek, +	.release	= single_release, +}; + +static const struct rpc_filelist gssd_dummy_info_file[] = { +	[0] = { +		.name = "info", +		.i_fop = &rpc_dummy_info_operations, +		.mode = S_IFREG | S_IRUSR, +	}, +}; + +/** + * rpc_gssd_dummy_populate - create a dummy gssd pipe + * @root:	root of the rpc_pipefs filesystem + * @pipe_data:	pipe data created when netns is initialized + * + * Create a dummy set of directories and a pipe that gssd can hold open to + * indicate that it is up and running. + */ +static struct dentry * +rpc_gssd_dummy_populate(struct dentry *root, struct rpc_pipe *pipe_data) +{ +	int ret = 0; +	struct dentry *gssd_dentry; +	struct dentry *clnt_dentry = NULL; +	struct dentry *pipe_dentry = NULL; +	struct qstr q = QSTR_INIT(files[RPCAUTH_gssd].name, +				  strlen(files[RPCAUTH_gssd].name)); + +	/* We should never get this far if "gssd" doesn't exist */ +	gssd_dentry = d_hash_and_lookup(root, &q); +	if (!gssd_dentry) +		return ERR_PTR(-ENOENT); + +	ret = rpc_populate(gssd_dentry, gssd_dummy_clnt_dir, 0, 1, NULL); +	if (ret) { +		pipe_dentry = ERR_PTR(ret); +		goto out; +	} + +	q.name = gssd_dummy_clnt_dir[0].name; +	q.len = strlen(gssd_dummy_clnt_dir[0].name); +	clnt_dentry = d_hash_and_lookup(gssd_dentry, &q); +	if (!clnt_dentry) { +		pipe_dentry = ERR_PTR(-ENOENT); +		goto out; +	} + +	ret = rpc_populate(clnt_dentry, gssd_dummy_info_file, 0, 1, NULL); +	if (ret) { +		__rpc_depopulate(gssd_dentry, gssd_dummy_clnt_dir, 0, 1); +		pipe_dentry = ERR_PTR(ret); +		goto out; +	} + +	pipe_dentry = rpc_mkpipe_dentry(clnt_dentry, "gssd", NULL, pipe_data); +	if (IS_ERR(pipe_dentry)) { +		__rpc_depopulate(clnt_dentry, gssd_dummy_info_file, 0, 1); +		__rpc_depopulate(gssd_dentry, gssd_dummy_clnt_dir, 0, 1); +	} +out: +	dput(clnt_dentry); +	dput(gssd_dentry); +	return pipe_dentry; +} + +static void +rpc_gssd_dummy_depopulate(struct dentry *pipe_dentry) +{ +	struct dentry *clnt_dir = pipe_dentry->d_parent; +	struct dentry *gssd_dir = clnt_dir->d_parent; + +	__rpc_rmpipe(clnt_dir->d_inode, pipe_dentry); +	__rpc_depopulate(clnt_dir, gssd_dummy_info_file, 0, 1); +	__rpc_depopulate(gssd_dir, gssd_dummy_clnt_dir, 0, 1); +	dput(pipe_dentry); +} +  static int  rpc_fill_super(struct super_block *sb, void *data, int silent)  {  	struct inode *inode; -	struct dentry *root; +	struct dentry *root, *gssd_dentry;  	struct net *net = data;  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);  	int err; @@ -1266,7 +1394,7 @@ rpc_fill_super(struct super_block *sb, void *data, int silent)  	sb->s_blocksize_bits = PAGE_CACHE_SHIFT;  	sb->s_magic = RPCAUTH_GSSMAGIC;  	sb->s_op = &s_ops; -	sb->s_d_op = &rpc_dentry_operations; +	sb->s_d_op = &simple_dentry_operations;  	sb->s_time_gran = 1;  	inode 
= rpc_get_inode(sb, S_IFDIR | S_IRUGO | S_IXUGO); @@ -1275,6 +1403,13 @@ rpc_fill_super(struct super_block *sb, void *data, int silent)  		return -ENOMEM;  	if (rpc_populate(root, files, RPCAUTH_lockd, RPCAUTH_RootEOF, NULL))  		return -ENOMEM; + +	gssd_dentry = rpc_gssd_dummy_populate(root, sn->gssd_dummy); +	if (IS_ERR(gssd_dentry)) { +		__rpc_depopulate(root, files, RPCAUTH_lockd, RPCAUTH_RootEOF); +		return PTR_ERR(gssd_dentry); +	} +  	dprintk("RPC:       sending pipefs MOUNT notification for net %p%s\n",  		net, NET_NAME(net));  	mutex_lock(&sn->pipefs_sb_lock); @@ -1289,6 +1424,7 @@ rpc_fill_super(struct super_block *sb, void *data, int silent)  	return 0;  err_depopulate: +	rpc_gssd_dummy_depopulate(gssd_dentry);  	blocking_notifier_call_chain(&rpc_pipefs_notifier_list,  					   RPC_PIPEFS_UMOUNT,  					   sb); @@ -1298,6 +1434,16 @@ err_depopulate:  	return err;  } +bool +gssd_running(struct net *net) +{ +	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id); +	struct rpc_pipe *pipe = sn->gssd_dummy; + +	return pipe->nreaders || pipe->nwriters; +} +EXPORT_SYMBOL_GPL(gssd_running); +  static struct dentry *  rpc_mount(struct file_system_type *fs_type,  		int flags, const char *dev_name, void *data) diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index ff3cc4bf4b2..c0365c14b85 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -637,7 +637,8 @@ static void __rpc_queue_timer_fn(unsigned long ptr)  static void __rpc_atrun(struct rpc_task *task)  { -	task->tk_status = 0; +	if (task->tk_status == -ETIMEDOUT) +		task->tk_status = 0;  }  /* @@ -831,7 +832,8 @@ static void rpc_async_schedule(struct work_struct *work)   * @size: requested byte size   *   * To prevent rpciod from hanging, this allocator never sleeps, - * returning NULL if the request cannot be serviced immediately. + * returning NULL and suppressing warning if the request cannot be serviced + * immediately.   * The caller can arrange to sleep in a way that is safe for rpciod.   
*   * Most requests are 'small' (under 2KiB) and can be serviced from a @@ -844,7 +846,7 @@ static void rpc_async_schedule(struct work_struct *work)  void *rpc_malloc(struct rpc_task *task, size_t size)  {  	struct rpc_buffer *buf; -	gfp_t gfp = GFP_NOWAIT; +	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;  	if (RPC_IS_SWAPPER(task))  		gfp |= __GFP_MEMALLOC; diff --git a/net/sunrpc/socklib.c b/net/sunrpc/socklib.c index 0a648c502fc..2df87f78e51 100644 --- a/net/sunrpc/socklib.c +++ b/net/sunrpc/socklib.c @@ -173,7 +173,8 @@ int csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)  		return -1;  	if (csum_fold(desc.csum))  		return -1; -	if (unlikely(skb->ip_summed == CHECKSUM_COMPLETE)) +	if (unlikely(skb->ip_summed == CHECKSUM_COMPLETE) && +	    !skb->csum_complete_sw)  		netdev_rx_csum_fault(skb->dev);  	return 0;  no_checksum: diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h index 14c9f6d1c5f..f2b7cb540e6 100644 --- a/net/sunrpc/sunrpc.h +++ b/net/sunrpc/sunrpc.h @@ -43,6 +43,19 @@ static inline int rpc_reply_expected(struct rpc_task *task)  		(task->tk_msg.rpc_proc->p_decode != NULL);  } +static inline int sock_is_loopback(struct sock *sk) +{ +	struct dst_entry *dst; +	int loopback = 0; +	rcu_read_lock(); +	dst = rcu_dereference(sk->sk_dst_cache); +	if (dst && dst->dev && +	    (dst->dev->features & NETIF_F_LOOPBACK)) +		loopback = 1; +	rcu_read_unlock(); +	return loopback; +} +  int svc_send_common(struct socket *sock, struct xdr_buf *xdr,  		    struct page *headpage, unsigned long headoffset,  		    struct page *tailpage, unsigned long tailoffset); diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c index 3d6498af9ad..cd30120de9e 100644 --- a/net/sunrpc/sunrpc_syms.c +++ b/net/sunrpc/sunrpc_syms.c @@ -44,12 +44,17 @@ static __net_init int sunrpc_init_net(struct net *net)  	if (err)  		goto err_unixgid; -	rpc_pipefs_init_net(net); +	err = rpc_pipefs_init_net(net); +	if (err) +		goto err_pipefs; +  	INIT_LIST_HEAD(&sn->all_clients);  	spin_lock_init(&sn->rpc_client_lock);  	spin_lock_init(&sn->rpcb_clnt_lock);  	return 0; +err_pipefs: +	unix_gid_cache_destroy(net);  err_unixgid:  	ip_map_cache_destroy(net);  err_ipmap: @@ -60,6 +65,7 @@ err_proc:  static __net_exit void sunrpc_exit_net(struct net *net)  { +	rpc_pipefs_exit_net(net);  	unix_gid_cache_destroy(net);  	ip_map_cache_destroy(net);  	rpc_proc_exit(net); diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index b974571126f..5de6801cd92 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -916,9 +916,6 @@ static int __svc_register(struct net *net, const char *progname,  #endif  	} -	if (error < 0) -		printk(KERN_WARNING "svc: failed to register %sv%u RPC " -			"service (errno %d).\n", progname, version, -error);  	return error;  } @@ -937,6 +934,7 @@ int svc_register(const struct svc_serv *serv, struct net *net,  		 const unsigned short port)  {  	struct svc_program	*progp; +	struct svc_version	*vers;  	unsigned int		i;  	int			error = 0; @@ -946,7 +944,8 @@ int svc_register(const struct svc_serv *serv, struct net *net,  	for (progp = serv->sv_program; progp; progp = progp->pg_next) {  		for (i = 0; i < progp->pg_nvers; i++) { -			if (progp->pg_vers[i] == NULL) +			vers = progp->pg_vers[i]; +			if (vers == NULL)  				continue;  			dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n", @@ -955,16 +954,26 @@ int svc_register(const struct svc_serv *serv, struct net *net,  					proto == IPPROTO_UDP?  "udp" : "tcp",  					port,  					family, -					progp->pg_vers[i]->vs_hidden? 
-						" (but not telling portmap)" : ""); +					vers->vs_hidden ? +					" (but not telling portmap)" : ""); -			if (progp->pg_vers[i]->vs_hidden) +			if (vers->vs_hidden)  				continue;  			error = __svc_register(net, progp->pg_name, progp->pg_prog,  						i, family, proto, port); -			if (error < 0) + +			if (vers->vs_rpcb_optnl) { +				error = 0; +				continue; +			} + +			if (error < 0) { +				printk(KERN_WARNING "svc: failed to register " +					"%sv%u RPC service (errno %d).\n", +					progp->pg_name, i, -error);  				break; +			}  		}  	} @@ -1104,8 +1113,6 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)  	rqstp->rq_vers = vers = svc_getnl(argv);	/* version number */  	rqstp->rq_proc = proc = svc_getnl(argv);	/* procedure number */ -	progp = serv->sv_program; -  	for (progp = serv->sv_program; progp; progp = progp->pg_next)  		if (prog == progp->pg_prog)  			break; diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index 80a6640f329..b4737fbdec1 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -571,7 +571,7 @@ static void svc_check_conn_limits(struct svc_serv *serv)  	}  } -int svc_alloc_arg(struct svc_rqst *rqstp) +static int svc_alloc_arg(struct svc_rqst *rqstp)  {  	struct svc_serv *serv = rqstp->rq_server;  	struct xdr_buf *arg; @@ -597,6 +597,7 @@ int svc_alloc_arg(struct svc_rqst *rqstp)  			}  			rqstp->rq_pages[i] = p;  		} +	rqstp->rq_page_end = &rqstp->rq_pages[i];  	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */  	/* Make arg->head point to first page and arg->pages point to rest */ @@ -612,7 +613,7 @@ int svc_alloc_arg(struct svc_rqst *rqstp)  	return 0;  } -struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout) +static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)  {  	struct svc_xprt *xprt;  	struct svc_pool		*pool = rqstp->rq_pool; @@ -691,7 +692,7 @@ struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)  	return xprt;  } -void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt) +static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)  {  	spin_lock_bh(&serv->sv_lock);  	set_bit(XPT_TEMP, &newxpt->xpt_flags); @@ -730,6 +731,8 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)  		newxpt = xprt->xpt_ops->xpo_accept(xprt);  		if (newxpt)  			svc_add_new_temp_xprt(serv, newxpt); +		else +			module_put(xprt->xpt_class->xcl_owner);  	} else if (xprt->xpt_ops->xpo_has_wspace(xprt)) {  		/* XPT_DATA|XPT_DEFERRED case: */  		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n", @@ -793,7 +796,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)  	clear_bit(XPT_OLD, &xprt->xpt_flags); -	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp)); +	rqstp->rq_secure = xprt->xpt_ops->xpo_secure_port(rqstp);  	rqstp->rq_chandle.defer = svc_defer;  	if (serv->sv_stats) diff --git a/net/sunrpc/svcauth.c b/net/sunrpc/svcauth.c index 2af7b0cba43..79c0f3459b5 100644 --- a/net/sunrpc/svcauth.c +++ b/net/sunrpc/svcauth.c @@ -54,6 +54,8 @@ svc_authenticate(struct svc_rqst *rqstp, __be32 *authp)  	}  	spin_unlock(&authtab_lock); +	rqstp->rq_auth_slack = 0; +  	rqstp->rq_authop = aops;  	return aops->accept(rqstp, authp);  } diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 9c9caaa5e0d..b507cd327d9 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -60,7 +60,7 @@  static struct svc_sock *svc_setup_socket(struct svc_serv *, struct socket *,  			
		 int flags); -static void		svc_udp_data_ready(struct sock *, int); +static void		svc_udp_data_ready(struct sock *);  static int		svc_udp_recvfrom(struct svc_rqst *);  static int		svc_udp_sendto(struct svc_rqst *);  static void		svc_sock_detach(struct svc_xprt *); @@ -291,12 +291,14 @@ static int svc_one_sock_name(struct svc_sock *svsk, char *buf, int remaining)  				&inet_sk(sk)->inet_rcv_saddr,  				inet_sk(sk)->inet_num);  		break; +#if IS_ENABLED(CONFIG_IPV6)  	case PF_INET6:  		len = snprintf(buf, remaining, "ipv6 %s %pI6 %d\n",  				proto_name, -				&inet6_sk(sk)->rcv_saddr, +				&sk->sk_v6_rcv_saddr,  				inet_sk(sk)->inet_num);  		break; +#endif  	default:  		len = snprintf(buf, remaining, "*unknown-%d*\n",  				sk->sk_family); @@ -398,17 +400,23 @@ static void svc_sock_setbufsize(struct socket *sock, unsigned int snd,  	release_sock(sock->sk);  #endif  } + +static int svc_sock_secure_port(struct svc_rqst *rqstp) +{ +	return svc_port_is_privileged(svc_addr(rqstp)); +} +  /*   * INET callback when data has been received on the socket.   */ -static void svc_udp_data_ready(struct sock *sk, int count) +static void svc_udp_data_ready(struct sock *sk)  {  	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;  	wait_queue_head_t *wq = sk_sleep(sk);  	if (svsk) { -		dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n", -			svsk, sk, count, +		dprintk("svc: socket %p(inet %p), busy=%d\n", +			svsk, sk,  			test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));  		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);  		svc_xprt_enqueue(&svsk->sk_xprt); @@ -676,6 +684,7 @@ static struct svc_xprt_ops svc_udp_ops = {  	.xpo_prep_reply_hdr = svc_udp_prep_reply_hdr,  	.xpo_has_wspace = svc_udp_has_wspace,  	.xpo_accept = svc_udp_accept, +	.xpo_secure_port = svc_sock_secure_port,  };  static struct svc_xprt_class svc_udp_class = { @@ -729,7 +738,7 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)   * A data_ready event on a listening socket means there's a connection   * pending. Do not use state_change as a substitute for it.   */ -static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused) +static void svc_tcp_listen_data_ready(struct sock *sk)  {  	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;  	wait_queue_head_t *wq; @@ -781,7 +790,7 @@ static void svc_tcp_state_change(struct sock *sk)  		wake_up_interruptible_all(wq);  } -static void svc_tcp_data_ready(struct sock *sk, int count) +static void svc_tcp_data_ready(struct sock *sk)  {  	struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;  	wait_queue_head_t *wq = sk_sleep(sk); @@ -840,8 +849,7 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)  	 * tell us anything.  For now just warn about unpriv connections.  	 
*/  	if (!svc_port_is_privileged(sin)) { -		dprintk(KERN_WARNING -			"%s: connect from unprivileged port: %s\n", +		dprintk("%s: connect from unprivileged port: %s\n",  			serv->sv_name,  			__svc_print_addr(sin, buf, sizeof(buf)));  	} @@ -865,6 +873,10 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)  	}  	svc_xprt_set_local(&newsvsk->sk_xprt, sin, slen); +	if (sock_is_loopback(newsock->sk)) +		set_bit(XPT_LOCAL, &newsvsk->sk_xprt.xpt_flags); +	else +		clear_bit(XPT_LOCAL, &newsvsk->sk_xprt.xpt_flags);  	if (serv->sv_stats)  		serv->sv_stats->nettcpconn++; @@ -1110,6 +1122,7 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)  	rqstp->rq_xprt_ctxt   = NULL;  	rqstp->rq_prot	      = IPPROTO_TCP; +	rqstp->rq_local	      = !!test_bit(XPT_LOCAL, &svsk->sk_xprt.xpt_flags);  	p = (__be32 *)rqstp->rq_arg.head[0].iov_base;  	calldir = p[1]; @@ -1232,6 +1245,7 @@ static struct svc_xprt_ops svc_tcp_bc_ops = {  	.xpo_detach = svc_bc_tcp_sock_detach,  	.xpo_free = svc_bc_sock_free,  	.xpo_prep_reply_hdr = svc_tcp_prep_reply_hdr, +	.xpo_secure_port = svc_sock_secure_port,  };  static struct svc_xprt_class svc_tcp_bc_class = { @@ -1270,6 +1284,7 @@ static struct svc_xprt_ops svc_tcp_ops = {  	.xpo_prep_reply_hdr = svc_tcp_prep_reply_hdr,  	.xpo_has_wspace = svc_tcp_has_wspace,  	.xpo_accept = svc_tcp_accept, +	.xpo_secure_port = svc_sock_secure_port,  };  static struct svc_xprt_class svc_tcp_class = { @@ -1395,6 +1410,22 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv,  	return svsk;  } +bool svc_alien_sock(struct net *net, int fd) +{ +	int err; +	struct socket *sock = sockfd_lookup(fd, &err); +	bool ret = false; + +	if (!sock) +		goto out; +	if (sock_net(sock->sk) != net) +		ret = true; +	sockfd_put(sock); +out: +	return ret; +} +EXPORT_SYMBOL_GPL(svc_alien_sock); +  /**   * svc_addsock - add a listener socket to an RPC service   * @serv: pointer to RPC service to which to add a new listener diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index 1504bb11e4f..23fb4e75e24 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c @@ -462,6 +462,7 @@ void xdr_init_encode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)  	struct kvec *iov = buf->head;  	int scratch_len = buf->buflen - buf->page_len - buf->tail[0].iov_len; +	xdr_set_scratch_buffer(xdr, NULL, 0);  	BUG_ON(scratch_len < 0);  	xdr->buf = buf;  	xdr->iov = iov; @@ -482,6 +483,73 @@ void xdr_init_encode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)  EXPORT_SYMBOL_GPL(xdr_init_encode);  /** + * xdr_commit_encode - Ensure all data is written to buffer + * @xdr: pointer to xdr_stream + * + * We handle encoding across page boundaries by giving the caller a + * temporary location to write to, then later copying the data into + * place; xdr_commit_encode does that copying. + * + * Normally the caller doesn't need to call this directly, as the + * following xdr_reserve_space will do it.  But an explicit call may be + * required at the end of encoding, or any other time when the xdr_buf + * data might be read. 
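A minimal usage sketch of the deferred-copy scheme described in the xdr_commit_encode comment above (illustrative only, not part of the patch; the helper name and the -ENOSPC choice are hypothetical, and XDR padding of the opaque body is omitted for brevity):

#include <linux/string.h>
#include <linux/sunrpc/xdr.h>

/* Encode a 32-bit length followed by len opaque bytes. */
static int example_encode_blob(struct xdr_stream *xdr,
			       const void *data, u32 len)
{
	__be32 *p;

	/*
	 * This may hand back a pointer into the *next* page; the unused
	 * fragment at the end of the previous buffer is tracked in
	 * xdr->scratch until the copy-back happens.
	 */
	p = xdr_reserve_space(xdr, 4 + len);
	if (!p)
		return -ENOSPC;
	*p++ = cpu_to_be32(len);
	memcpy(p, data, len);

	/*
	 * End of encoding: force the deferred copy now, before the
	 * xdr_buf is read (sent, checksummed, ...).
	 */
	xdr_commit_encode(xdr);
	return 0;
}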
+ */ +void xdr_commit_encode(struct xdr_stream *xdr) +{ +	int shift = xdr->scratch.iov_len; +	void *page; + +	if (shift == 0) +		return; +	page = page_address(*xdr->page_ptr); +	memcpy(xdr->scratch.iov_base, page, shift); +	memmove(page, page + shift, (void *)xdr->p - page); +	xdr->scratch.iov_len = 0; +} +EXPORT_SYMBOL_GPL(xdr_commit_encode); + +__be32 *xdr_get_next_encode_buffer(struct xdr_stream *xdr, size_t nbytes) +{ +	static __be32 *p; +	int space_left; +	int frag1bytes, frag2bytes; + +	if (nbytes > PAGE_SIZE) +		return NULL; /* Bigger buffers require special handling */ +	if (xdr->buf->len + nbytes > xdr->buf->buflen) +		return NULL; /* Sorry, we're totally out of space */ +	frag1bytes = (xdr->end - xdr->p) << 2; +	frag2bytes = nbytes - frag1bytes; +	if (xdr->iov) +		xdr->iov->iov_len += frag1bytes; +	else +		xdr->buf->page_len += frag1bytes; +	xdr->page_ptr++; +	xdr->iov = NULL; +	/* +	 * If the last encode didn't end exactly on a page boundary, the +	 * next one will straddle boundaries.  Encode into the next +	 * page, then copy it back later in xdr_commit_encode.  We use +	 * the "scratch" iov to track any temporarily unused fragment of +	 * space at the end of the previous buffer: +	 */ +	xdr->scratch.iov_base = xdr->p; +	xdr->scratch.iov_len = frag1bytes; +	p = page_address(*xdr->page_ptr); +	/* +	 * Note this is where the next encode will start after we've +	 * shifted this one back: +	 */ +	xdr->p = (void *)p + frag2bytes; +	space_left = xdr->buf->buflen - xdr->buf->len; +	xdr->end = (void *)p + min_t(int, space_left, PAGE_SIZE); +	xdr->buf->page_len += frag2bytes; +	xdr->buf->len += nbytes; +	return p; +} + +/**   * xdr_reserve_space - Reserve buffer space for sending   * @xdr: pointer to xdr_stream   * @nbytes: number of bytes to reserve @@ -495,20 +563,122 @@ __be32 * xdr_reserve_space(struct xdr_stream *xdr, size_t nbytes)  	__be32 *p = xdr->p;  	__be32 *q; +	xdr_commit_encode(xdr);  	/* align nbytes on the next 32-bit boundary */  	nbytes += 3;  	nbytes &= ~3;  	q = p + (nbytes >> 2);  	if (unlikely(q > xdr->end || q < p)) -		return NULL; +		return xdr_get_next_encode_buffer(xdr, nbytes);  	xdr->p = q; -	xdr->iov->iov_len += nbytes; +	if (xdr->iov) +		xdr->iov->iov_len += nbytes; +	else +		xdr->buf->page_len += nbytes;  	xdr->buf->len += nbytes;  	return p;  }  EXPORT_SYMBOL_GPL(xdr_reserve_space);  /** + * xdr_truncate_encode - truncate an encode buffer + * @xdr: pointer to xdr_stream + * @len: new length of buffer + * + * Truncates the xdr stream, so that xdr->buf->len == len, + * and xdr->p points at offset len from the start of the buffer, and + * head, tail, and page lengths are adjusted to correspond. + * + * If this means moving xdr->p to a different buffer, we assume that + * that the end pointer should be set to the end of the current page, + * except in the case of the head buffer when we assume the head + * buffer's current length represents the end of the available buffer. + * + * This is *not* safe to use on a buffer that already has inlined page + * cache pages (as in a zero-copy server read reply), except for the + * simple case of truncating from one position in the tail to another. 
+ * + */ +void xdr_truncate_encode(struct xdr_stream *xdr, size_t len) +{ +	struct xdr_buf *buf = xdr->buf; +	struct kvec *head = buf->head; +	struct kvec *tail = buf->tail; +	int fraglen; +	int new, old; + +	if (len > buf->len) { +		WARN_ON_ONCE(1); +		return; +	} +	xdr_commit_encode(xdr); + +	fraglen = min_t(int, buf->len - len, tail->iov_len); +	tail->iov_len -= fraglen; +	buf->len -= fraglen; +	if (tail->iov_len && buf->len == len) { +		xdr->p = tail->iov_base + tail->iov_len; +		/* xdr->end, xdr->iov should be set already */ +		return; +	} +	WARN_ON_ONCE(fraglen); +	fraglen = min_t(int, buf->len - len, buf->page_len); +	buf->page_len -= fraglen; +	buf->len -= fraglen; + +	new = buf->page_base + buf->page_len; +	old = new + fraglen; +	xdr->page_ptr -= (old >> PAGE_SHIFT) - (new >> PAGE_SHIFT); + +	if (buf->page_len && buf->len == len) { +		xdr->p = page_address(*xdr->page_ptr); +		xdr->end = (void *)xdr->p + PAGE_SIZE; +		xdr->p = (void *)xdr->p + (new % PAGE_SIZE); +		/* xdr->iov should already be NULL */ +		return; +	} +	if (fraglen) { +		xdr->end = head->iov_base + head->iov_len; +		xdr->page_ptr--; +	} +	/* (otherwise assume xdr->end is already set) */ +	head->iov_len = len; +	buf->len = len; +	xdr->p = head->iov_base + head->iov_len; +	xdr->iov = buf->head; +} +EXPORT_SYMBOL(xdr_truncate_encode); + +/** + * xdr_restrict_buflen - decrease available buffer space + * @xdr: pointer to xdr_stream + * @newbuflen: new maximum number of bytes available + * + * Adjust our idea of how much space is available in the buffer. + * If we've already used too much space in the buffer, returns -1. + * If the available space is already smaller than newbuflen, returns 0 + * and does nothing.  Otherwise, adjusts xdr->buf->buflen to newbuflen + * and ensures xdr->end is set at most offset newbuflen from the start + * of the buffer. + */ +int xdr_restrict_buflen(struct xdr_stream *xdr, int newbuflen) +{ +	struct xdr_buf *buf = xdr->buf; +	int left_in_this_buf = (void *)xdr->end - (void *)xdr->p; +	int end_offset = buf->len + left_in_this_buf; + +	if (newbuflen < 0 || newbuflen < buf->len) +		return -1; +	if (newbuflen > buf->buflen) +		return 0; +	if (newbuflen < end_offset) +		xdr->end = (void *)xdr->end + newbuflen - end_offset; +	buf->buflen = newbuflen; +	return 0; +} +EXPORT_SYMBOL(xdr_restrict_buflen); + +/**   * xdr_write_pages - Insert a list of pages into an XDR buffer for sending   * @xdr: pointer to xdr_stream   * @pages: list of pages @@ -833,8 +1003,20 @@ xdr_buf_from_iov(struct kvec *iov, struct xdr_buf *buf)  }  EXPORT_SYMBOL_GPL(xdr_buf_from_iov); -/* Sets subbuf to the portion of buf of length len beginning base bytes - * from the start of buf. Returns -1 if base of length are out of bounds. */ +/** + * xdr_buf_subsegment - set subbuf to a portion of buf + * @buf: an xdr buffer + * @subbuf: the result buffer + * @base: beginning of range in bytes + * @len: length of range in bytes + * + * sets @subbuf to an xdr buffer representing the portion of @buf of + * length @len starting at offset @base. + * + * @buf and @subbuf may be pointers to the same struct xdr_buf. + * + * Returns -1 if base of length are out of bounds. 
+ */  int  xdr_buf_subsegment(struct xdr_buf *buf, struct xdr_buf *subbuf,  			unsigned int base, unsigned int len) @@ -847,9 +1029,8 @@ xdr_buf_subsegment(struct xdr_buf *buf, struct xdr_buf *subbuf,  		len -= subbuf->head[0].iov_len;  		base = 0;  	} else { -		subbuf->head[0].iov_base = NULL; -		subbuf->head[0].iov_len = 0;  		base -= buf->head[0].iov_len; +		subbuf->head[0].iov_len = 0;  	}  	if (base < buf->page_len) { @@ -871,9 +1052,8 @@ xdr_buf_subsegment(struct xdr_buf *buf, struct xdr_buf *subbuf,  		len -= subbuf->tail[0].iov_len;  		base = 0;  	} else { -		subbuf->tail[0].iov_base = NULL; -		subbuf->tail[0].iov_len = 0;  		base -= buf->tail[0].iov_len; +		subbuf->tail[0].iov_len = 0;  	}  	if (base || len) diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 095363eee76..c3b2b3369e5 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -71,24 +71,6 @@ static void	 xprt_destroy(struct rpc_xprt *xprt);  static DEFINE_SPINLOCK(xprt_list_lock);  static LIST_HEAD(xprt_list); -/* - * The transport code maintains an estimate on the maximum number of out- - * standing RPC requests, using a smoothed version of the congestion - * avoidance implemented in 44BSD. This is basically the Van Jacobson - * congestion algorithm: If a retransmit occurs, the congestion window is - * halved; otherwise, it is incremented by 1/cwnd when - * - *	-	a reply is received and - *	-	a full number of requests are outstanding and - *	-	the congestion window hasn't been updated recently. - */ -#define RPC_CWNDSHIFT		(8U) -#define RPC_CWNDSCALE		(1U << RPC_CWNDSHIFT) -#define RPC_INITCWND		RPC_CWNDSCALE -#define RPC_MAXCWND(xprt)	((xprt)->max_reqs << RPC_CWNDSHIFT) - -#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) -  /**   * xprt_register_transport - register a transport implementation   * @transport: transport to register @@ -205,10 +187,8 @@ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)  		goto out_sleep;  	}  	xprt->snd_task = task; -	if (req != NULL) { -		req->rq_bytes_sent = 0; +	if (req != NULL)  		req->rq_ntrans++; -	}  	return 1; @@ -232,9 +212,9 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)  {  	xprt->snd_task = NULL;  	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { -		smp_mb__before_clear_bit(); +		smp_mb__before_atomic();  		clear_bit(XPRT_LOCKED, &xprt->state); -		smp_mb__after_clear_bit(); +		smp_mb__after_atomic();  	} else  		queue_work(rpciod_workqueue, &xprt->task_cleanup);  } @@ -263,7 +243,6 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)  	}  	if (__xprt_get_cong(xprt, task)) {  		xprt->snd_task = task; -		req->rq_bytes_sent = 0;  		req->rq_ntrans++;  		return 1;  	} @@ -300,10 +279,8 @@ static bool __xprt_lock_write_func(struct rpc_task *task, void *data)  	req = task->tk_rqstp;  	xprt->snd_task = task; -	if (req) { -		req->rq_bytes_sent = 0; +	if (req)  		req->rq_ntrans++; -	}  	return true;  } @@ -329,7 +306,6 @@ static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)  	}  	if (__xprt_get_cong(xprt, task)) {  		xprt->snd_task = task; -		req->rq_bytes_sent = 0;  		req->rq_ntrans++;  		return true;  	} @@ -358,6 +334,11 @@ out_unlock:  void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)  {  	if (xprt->snd_task == task) { +		if (task != NULL) { +			struct rpc_rqst *req = task->tk_rqstp; +			if (req != NULL) +				req->rq_bytes_sent = 0; +		}  		xprt_clear_locked(xprt);  		__xprt_lock_write_next(xprt);  	} @@ -375,6 +356,11 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt);  void 
xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)  {  	if (xprt->snd_task == task) { +		if (task != NULL) { +			struct rpc_rqst *req = task->tk_rqstp; +			if (req != NULL) +				req->rq_bytes_sent = 0; +		}  		xprt_clear_locked(xprt);  		__xprt_lock_write_next_cong(xprt);  	} @@ -442,7 +428,15 @@ EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);   * @task: recently completed RPC request used to adjust window   * @result: result code of completed RPC request   * - * We use a time-smoothed congestion estimator to avoid heavy oscillation. + * The transport code maintains an estimate on the maximum number of out- + * standing RPC requests, using a smoothed version of the congestion + * avoidance implemented in 44BSD. This is basically the Van Jacobson + * congestion algorithm: If a retransmit occurs, the congestion window is + * halved; otherwise, it is incremented by 1/cwnd when + * + *	-	a reply is received and + *	-	a full number of requests are outstanding and + *	-	the congestion window hasn't been updated recently.   */  void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)  { @@ -745,6 +739,11 @@ static void xprt_connect_status(struct rpc_task *task)  	}  	switch (task->tk_status) { +	case -ECONNREFUSED: +	case -ECONNRESET: +	case -ECONNABORTED: +	case -ENETUNREACH: +	case -EHOSTUNREACH:  	case -EAGAIN:  		dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);  		break; @@ -854,24 +853,36 @@ static inline int xprt_has_timer(struct rpc_xprt *xprt)   * @task: RPC task about to send a request   *   */ -int xprt_prepare_transmit(struct rpc_task *task) +bool xprt_prepare_transmit(struct rpc_task *task)  {  	struct rpc_rqst	*req = task->tk_rqstp;  	struct rpc_xprt	*xprt = req->rq_xprt; -	int err = 0; +	bool ret = false;  	dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);  	spin_lock_bh(&xprt->transport_lock); -	if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) { -		err = req->rq_reply_bytes_recvd; +	if (!req->rq_bytes_sent) { +		if (req->rq_reply_bytes_recvd) { +			task->tk_status = req->rq_reply_bytes_recvd; +			goto out_unlock; +		} +		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) +		    && xprt_connected(xprt) +		    && req->rq_connect_cookie == xprt->connect_cookie) { +			xprt->ops->set_retrans_timeout(task); +			rpc_sleep_on(&xprt->pending, task, xprt_timer); +			goto out_unlock; +		} +	} +	if (!xprt->ops->reserve_xprt(xprt, task)) { +		task->tk_status = -EAGAIN;  		goto out_unlock;  	} -	if (!xprt->ops->reserve_xprt(xprt, task)) -		err = -EAGAIN; +	ret = true;  out_unlock:  	spin_unlock_bh(&xprt->transport_lock); -	return err; +	return ret;  }  void xprt_end_transmit(struct rpc_task *task) @@ -912,7 +923,6 @@ void xprt_transmit(struct rpc_task *task)  	} else if (!req->rq_bytes_sent)  		return; -	req->rq_connect_cookie = xprt->connect_cookie;  	req->rq_xtime = ktime_get();  	status = xprt->ops->send_request(task);  	if (status != 0) { @@ -938,12 +948,14 @@ void xprt_transmit(struct rpc_task *task)  	/* Don't race with disconnect */  	if (!xprt_connected(xprt))  		task->tk_status = -ENOTCONN; -	else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) { +	else {  		/*  		 * Sleep on the pending queue since  		 * we're expecting a reply.  		 
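Looking back at the congestion-window description this patch moves into the xprt_adjust_cwnd kerneldoc: a standalone sketch of the AIMD rule it spells out, using the RPC_CWNDSCALE fixed point of the macros removed from xprt.c above. The exact rounding term and the EX_ names are assumptions for illustration, not code from this patch.

#include <linux/types.h>

#define EX_CWNDSHIFT	8U
#define EX_CWNDSCALE	(1U << EX_CWNDSHIFT)	/* one request, scaled */

static unsigned long example_adjust_cwnd(unsigned long cwnd, bool retransmitted)
{
	if (cwnd < EX_CWNDSCALE)
		cwnd = EX_CWNDSCALE;		/* start at one request */
	if (retransmitted) {
		cwnd >>= 1;			/* halve on retransmit */
		if (cwnd < EX_CWNDSCALE)
			cwnd = EX_CWNDSCALE;	/* never below one request */
	} else {
		/* grow by roughly 1/cwnd of a request per reply */
		cwnd += (EX_CWNDSCALE * EX_CWNDSCALE + (cwnd >> 1)) / cwnd;
	}
	/* a real transport also clamps to its per-connection maximum */
	return cwnd;
}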
*/ -		rpc_sleep_on(&xprt->pending, task, xprt_timer); +		if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) +			rpc_sleep_on(&xprt->pending, task, xprt_timer); +		req->rq_connect_cookie = xprt->connect_cookie;  	}  	spin_unlock_bh(&xprt->transport_lock);  } @@ -1087,11 +1099,9 @@ struct rpc_xprt *xprt_alloc(struct net *net, size_t size,  	for (i = 0; i < num_prealloc; i++) {  		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);  		if (!req) -			break; +			goto out_free;  		list_add(&req->rq_list, &xprt->free);  	} -	if (i < num_prealloc) -		goto out_free;  	if (max_alloc > num_prealloc)  		xprt->max_reqs = max_alloc;  	else @@ -1173,7 +1183,7 @@ static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)  static inline void xprt_init_xid(struct rpc_xprt *xprt)  { -	xprt->xid = net_random(); +	xprt->xid = prandom_u32();  }  static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) @@ -1186,6 +1196,12 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)  	req->rq_xprt    = xprt;  	req->rq_buffer  = NULL;  	req->rq_xid     = xprt_alloc_xid(xprt); +	req->rq_connect_cookie = xprt->connect_cookie - 1; +	req->rq_bytes_sent = 0; +	req->rq_snd_buf.len = 0; +	req->rq_snd_buf.buflen = 0; +	req->rq_rcv_buf.len = 0; +	req->rq_rcv_buf.buflen = 0;  	req->rq_release_snd_buf = NULL;  	xprt_reset_majortimeo(req);  	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, @@ -1357,15 +1373,3 @@ void xprt_put(struct rpc_xprt *xprt)  	if (atomic_dec_and_test(&xprt->count))  		xprt_destroy(xprt);  } - -/** - * xprt_get - return a reference to an RPC transport. - * @xprt: pointer to the transport - * - */ -struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) -{ -	if (atomic_inc_not_zero(&xprt->count)) -		return xprt; -	return NULL; -} diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index 5a8f268bdd3..da5136fd569 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -1,8 +1,8 @@ -obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma.o +obj-$(CONFIG_SUNRPC_XPRT_RDMA_CLIENT) += xprtrdma.o  xprtrdma-y := transport.o rpc_rdma.o verbs.o -obj-$(CONFIG_SUNRPC_XPRT_RDMA) += svcrdma.o +obj-$(CONFIG_SUNRPC_XPRT_RDMA_SERVER) += svcrdma.o  svcrdma-y := svc_rdma.o svc_rdma_transport.o \  	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index e03725bfe2b..693966d3f33 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -78,8 +78,7 @@ static const char transfertypes[][12] = {   * elements. Segments are then coalesced when registered, if possible   * within the selected memreg mode.   * - * Note, this routine is never called if the connection's memory - * registration strategy is 0 (bounce buffers). + * Returns positive number of segments converted, or a negative errno.   
*/  static int @@ -102,10 +101,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,  	page_base = xdrbuf->page_base & ~PAGE_MASK;  	p = 0;  	while (len && n < nsegs) { +		if (!ppages[p]) { +			/* alloc the pagelist for receiving buffer */ +			ppages[p] = alloc_page(GFP_ATOMIC); +			if (!ppages[p]) +				return -ENOMEM; +		}  		seg[n].mr_page = ppages[p];  		seg[n].mr_offset = (void *)(unsigned long) page_base;  		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); -		BUG_ON(seg[n].mr_len > PAGE_SIZE); +		if (seg[n].mr_len > PAGE_SIZE) +			return -EIO;  		len -= seg[n].mr_len;  		++n;  		++p; @@ -114,7 +120,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,  	/* Message overflows the seg array */  	if (len && n == nsegs) -		return 0; +		return -EIO;  	if (xdrbuf->tail[0].iov_len) {  		/* the rpcrdma protocol allows us to omit any trailing @@ -123,7 +129,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,  			return n;  		if (n == nsegs)  			/* Tail remains, but we're out of segments */ -			return 0; +			return -EIO;  		seg[n].mr_page = NULL;  		seg[n].mr_offset = xdrbuf->tail[0].iov_base;  		seg[n].mr_len = xdrbuf->tail[0].iov_len; @@ -164,15 +170,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,   *  Reply chunk (a counted array):   *   N elements:   *    1 - N - HLOO - HLOO - ... - HLOO + * + * Returns positive RPC/RDMA header size, or negative errno.   */ -static unsigned int +static ssize_t  rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,  		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)  {  	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); -	int nsegs, nchunks = 0; +	int n, nsegs, nchunks = 0;  	unsigned int pos;  	struct rpcrdma_mr_seg *seg = req->rl_segments;  	struct rpcrdma_read_chunk *cur_rchunk = NULL; @@ -198,12 +206,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,  		pos = target->head[0].iov_len;  	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); -	if (nsegs == 0) -		return 0; +	if (nsegs < 0) +		return nsegs;  	do { -		/* bind/register the memory, then build chunk from result. */ -		int n = rpcrdma_register_external(seg, nsegs, +		n = rpcrdma_register_external(seg, nsegs,  						cur_wchunk != NULL, r_xprt);  		if (n <= 0)  			goto out; @@ -248,10 +255,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,  	/* success. all failures return above */  	req->rl_nchunks = nchunks; -	BUG_ON(nchunks == 0); -	BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR) -	       && (nchunks > 3)); -  	/*  	 * finish off header. If write, marshal discrim and nchunks.  	 */ @@ -278,8 +281,8 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,  out:  	for (pos = 0; nchunks--;)  		pos += rpcrdma_deregister_external( -				&req->rl_segments[pos], r_xprt, NULL); -	return 0; +				&req->rl_segments[pos], r_xprt); +	return n;  }  /* @@ -361,6 +364,8 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)   *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.   *  [2] -- optional padding.   *  [3] -- if padded, header only in [1] and data here. + * + * Returns zero on success, otherwise a negative errno.   
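To make the "HLOO" layout above concrete, a decoding sketch (illustrative only; the include path and the pr_info reporting are assumptions): walking a read chunk list of the kind rpcrdma_create_chunks emits, entry by entry, until the zero discriminator, the same way the server-side code later in this patch does.

#include <linux/kernel.h>
#include <linux/sunrpc/rpc_rdma.h>

static void example_walk_read_list(struct rpcrdma_msg *rmsgp)
{
	struct rpcrdma_read_chunk *ch =
		(struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];

	for (; ch->rc_discrim != 0; ch++) {
		u64 off;

		/* each segment carries handle, length, 64-bit offset */
		xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset, &off);
		pr_info("read chunk: handle %u length %u offset %llu\n",
			ntohl(ch->rc_target.rs_handle),
			ntohl(ch->rc_target.rs_length),
			(unsigned long long)off);
	}
}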
*/  int @@ -370,7 +375,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);  	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);  	char *base; -	size_t hdrlen, rpclen, padlen; +	size_t rpclen, padlen; +	ssize_t hdrlen;  	enum rpcrdma_chunktype rtype, wtype;  	struct rpcrdma_msg *headerp; @@ -441,14 +447,10 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)  	/* The following simplification is not true forever */  	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)  		wtype = rpcrdma_noch; -	BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch); - -	if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS && -	    (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) { -		/* forced to "pure inline"? */ -		dprintk("RPC:       %s: too much data (%d/%d) for inline\n", -			__func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len); -		return -1; +	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { +		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n", +			__func__); +		return -EIO;  	}  	hdrlen = 28; /*sizeof *headerp;*/ @@ -474,8 +476,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)  			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;  			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;  			hdrlen += 2 * sizeof(u32); /* extra words in padhdr */ -			BUG_ON(wtype != rpcrdma_noch); - +			if (wtype != rpcrdma_noch) { +				dprintk("RPC:       %s: invalid chunk list\n", +					__func__); +				return -EIO; +			}  		} else {  			headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;  			headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; @@ -492,8 +497,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)  			 * on receive. Therefore, we request a reply chunk  			 * for non-writes wherever feasible and efficient.  			 */ -			if (wtype == rpcrdma_noch && -			    r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER) +			if (wtype == rpcrdma_noch)  				wtype = rpcrdma_replych;  		}  	} @@ -511,9 +515,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)  		hdrlen = rpcrdma_create_chunks(rqst,  					&rqst->rq_rcv_buf, headerp, wtype);  	} - -	if (hdrlen == 0) -		return -1; +	if (hdrlen < 0) +		return hdrlen;  	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"  		" headerp 0x%p base 0x%p lkey 0x%x\n", @@ -649,9 +652,7 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)  				break;  			page_base = 0;  		} -		rqst->rq_rcv_buf.page_len = olen - copy_len; -	} else -		rqst->rq_rcv_buf.page_len = 0; +	}  	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {  		curlen = copy_len; @@ -682,15 +683,11 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)  	rqst->rq_private_buf = rqst->rq_rcv_buf;  } -/* - * This function is called when an async event is posted to - * the connection which changes the connection state. All it - * does at this point is mark the connection up/down, the rpc - * timers do the rest. - */  void -rpcrdma_conn_func(struct rpcrdma_ep *ep) +rpcrdma_connect_worker(struct work_struct *work)  { +	struct rpcrdma_ep *ep = +		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);  	struct rpc_xprt *xprt = ep->rep_xprt;  	spin_lock_bh(&xprt->transport_lock); @@ -707,13 +704,15 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)  }  /* - * This function is called when memory window unbind which we are waiting - * for completes. Just use rr_func (zeroed by upcall) to signal completion. 
+ * This function is called when an async event is posted to + * the connection which changes the connection state. All it + * does at this point is mark the connection up/down, the rpc + * timers do the rest.   */ -static void -rpcrdma_unbind_func(struct rpcrdma_rep *rep) +void +rpcrdma_conn_func(struct rpcrdma_ep *ep)  { -	wake_up(&rep->rr_unbind); +	schedule_delayed_work(&ep->rep_connect_worker, 0);  }  /* @@ -730,7 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)  	struct rpc_xprt *xprt = rep->rr_xprt;  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);  	__be32 *iptr; -	int i, rdmalen, status; +	int rdmalen, status; +	unsigned long cwnd;  	/* Check status. If bad, signal disconnect and return rep to pool */  	if (rep->rr_len == ~0U) { @@ -785,6 +785,7 @@ repost:  	/* from here on, the reply is no longer an orphan */  	req->rl_reply = rep; +	xprt->reestablish_timeout = 0;  	/* check for expected message types */  	/* The order of some of these tests is important. */ @@ -859,26 +860,10 @@ badheader:  		break;  	} -	/* If using mw bind, start the deregister process now. */ -	/* (Note: if mr_free(), cannot perform it here, in tasklet context) */ -	if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) { -	case RPCRDMA_MEMWINDOWS: -		for (i = 0; req->rl_nchunks-- > 1;) -			i += rpcrdma_deregister_external( -				&req->rl_segments[i], r_xprt, NULL); -		/* Optionally wait (not here) for unbinds to complete */ -		rep->rr_func = rpcrdma_unbind_func; -		(void) rpcrdma_deregister_external(&req->rl_segments[i], -						   r_xprt, rep); -		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -		for (i = 0; req->rl_nchunks--;) -			i += rpcrdma_deregister_external(&req->rl_segments[i], -							 r_xprt, NULL); -		break; -	default: -		break; -	} +	cwnd = xprt->cwnd; +	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; +	if (xprt->cwnd > cwnd) +		xprt_release_rqst_cong(rqst->rq_task);  	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",  			__func__, xprt, rqst, status); diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 0ce75524ed2..8f92a61ee2d 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -1,4 +1,5 @@  /* + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.   * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.   
*   * This software is available to you under a choice of one of two @@ -69,7 +70,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,  	/* Set up the XDR head */  	rqstp->rq_arg.head[0].iov_base = page_address(page); -	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length); +	rqstp->rq_arg.head[0].iov_len = +		min_t(size_t, byte_count, ctxt->sge[0].length);  	rqstp->rq_arg.len = byte_count;  	rqstp->rq_arg.buflen = byte_count; @@ -85,11 +87,12 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,  		page = ctxt->pages[sge_no];  		put_page(rqstp->rq_pages[sge_no]);  		rqstp->rq_pages[sge_no] = page; -		bc -= min(bc, ctxt->sge[sge_no].length); +		bc -= min_t(u32, bc, ctxt->sge[sge_no].length);  		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;  		sge_no++;  	}  	rqstp->rq_respages = &rqstp->rq_pages[sge_no]; +	rqstp->rq_next_page = rqstp->rq_respages + 1;  	/* We should never run out of SGE because the limit is defined to  	 * support the max allowed RPC data length @@ -112,289 +115,265 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,  	rqstp->rq_arg.tail[0].iov_len = 0;  } -/* Encode a read-chunk-list as an array of IB SGE - * - * Assumptions: - * - chunk[0]->position points to pages[0] at an offset of 0 - * - pages[] is not physically or virtually contiguous and consists of - *   PAGE_SIZE elements. - * - * Output: - * - sge array pointing into pages[] array. - * - chunk_sge array specifying sge index and count for each - *   chunk in the read list - * - */ -static int map_read_chunks(struct svcxprt_rdma *xprt, -			   struct svc_rqst *rqstp, -			   struct svc_rdma_op_ctxt *head, -			   struct rpcrdma_msg *rmsgp, -			   struct svc_rdma_req_map *rpl_map, -			   struct svc_rdma_req_map *chl_map, -			   int ch_count, -			   int byte_count) +static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)  { -	int sge_no; -	int sge_bytes; -	int page_off; -	int page_no; -	int ch_bytes; -	int ch_no; -	struct rpcrdma_read_chunk *ch; +	if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) == +	     RDMA_TRANSPORT_IWARP) +		return 1; +	else +		return min_t(int, sge_count, xprt->sc_max_sge); +} -	sge_no = 0; -	page_no = 0; -	page_off = 0; -	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; -	ch_no = 0; -	ch_bytes = ntohl(ch->rc_target.rs_length); -	head->arg.head[0] = rqstp->rq_arg.head[0]; -	head->arg.tail[0] = rqstp->rq_arg.tail[0]; -	head->arg.pages = &head->pages[head->count]; -	head->hdr_count = head->count; /* save count of hdr pages */ -	head->arg.page_base = 0; -	head->arg.page_len = ch_bytes; -	head->arg.len = rqstp->rq_arg.len + ch_bytes; -	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes; -	head->count++; -	chl_map->ch[0].start = 0; -	while (byte_count) { -		rpl_map->sge[sge_no].iov_base = -			page_address(rqstp->rq_arg.pages[page_no]) + page_off; -		sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes); -		rpl_map->sge[sge_no].iov_len = sge_bytes; -		/* -		 * Don't bump head->count here because the same page -		 * may be used by multiple SGE. 
-		 */ -		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no]; -		rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1]; +typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt, +			      struct svc_rqst *rqstp, +			      struct svc_rdma_op_ctxt *head, +			      int *page_no, +			      u32 *page_offset, +			      u32 rs_handle, +			      u32 rs_length, +			      u64 rs_offset, +			      int last); + +/* Issue an RDMA_READ using the local lkey to map the data sink */ +static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, +			       struct svc_rqst *rqstp, +			       struct svc_rdma_op_ctxt *head, +			       int *page_no, +			       u32 *page_offset, +			       u32 rs_handle, +			       u32 rs_length, +			       u64 rs_offset, +			       int last) +{ +	struct ib_send_wr read_wr; +	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT; +	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt); +	int ret, read, pno; +	u32 pg_off = *page_offset; +	u32 pg_no = *page_no; -		byte_count -= sge_bytes; -		ch_bytes -= sge_bytes; -		sge_no++; -		/* -		 * If all bytes for this chunk have been mapped to an -		 * SGE, move to the next SGE -		 */ -		if (ch_bytes == 0) { -			chl_map->ch[ch_no].count = -				sge_no - chl_map->ch[ch_no].start; -			ch_no++; -			ch++; -			chl_map->ch[ch_no].start = sge_no; -			ch_bytes = ntohl(ch->rc_target.rs_length); -			/* If bytes remaining account for next chunk */ -			if (byte_count) { -				head->arg.page_len += ch_bytes; -				head->arg.len += ch_bytes; -				head->arg.buflen += ch_bytes; -			} +	ctxt->direction = DMA_FROM_DEVICE; +	ctxt->read_hdr = head; +	pages_needed = +		min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed)); +	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length); + +	for (pno = 0; pno < pages_needed; pno++) { +		int len = min_t(int, rs_length, PAGE_SIZE - pg_off); + +		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; +		head->arg.page_len += len; +		head->arg.len += len; +		if (!pg_off) +			head->count++; +		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1]; +		rqstp->rq_next_page = rqstp->rq_respages + 1; +		ctxt->sge[pno].addr = +			ib_dma_map_page(xprt->sc_cm_id->device, +					head->arg.pages[pg_no], pg_off, +					PAGE_SIZE - pg_off, +					DMA_FROM_DEVICE); +		ret = ib_dma_mapping_error(xprt->sc_cm_id->device, +					   ctxt->sge[pno].addr); +		if (ret) +			goto err; +		atomic_inc(&xprt->sc_dma_used); + +		/* The lkey here is either a local dma lkey or a dma_mr lkey */ +		ctxt->sge[pno].lkey = xprt->sc_dma_lkey; +		ctxt->sge[pno].length = len; +		ctxt->count++; + +		/* adjust offset and wrap to next page if needed */ +		pg_off += len; +		if (pg_off == PAGE_SIZE) { +			pg_off = 0; +			pg_no++;  		} -		/* -		 * If this SGE consumed all of the page, move to the -		 * next page -		 */ -		if ((sge_bytes + page_off) == PAGE_SIZE) { -			page_no++; -			page_off = 0; -			/* -			 * If there are still bytes left to map, bump -			 * the page count -			 */ -			if (byte_count) -				head->count++; -		} else -			page_off += sge_bytes; +		rs_length -= len;  	} -	BUG_ON(byte_count != 0); -	return sge_no; + +	if (last && rs_length == 0) +		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); +	else +		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); + +	memset(&read_wr, 0, sizeof(read_wr)); +	read_wr.wr_id = (unsigned long)ctxt; +	read_wr.opcode = IB_WR_RDMA_READ; +	ctxt->wr_op = read_wr.opcode; +	read_wr.send_flags = IB_SEND_SIGNALED; +	read_wr.wr.rdma.rkey = rs_handle; +	read_wr.wr.rdma.remote_addr = rs_offset; +	read_wr.sg_list = 
ctxt->sge; +	read_wr.num_sge = pages_needed; + +	ret = svc_rdma_send(xprt, &read_wr); +	if (ret) { +		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret); +		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); +		goto err; +	} + +	/* return current location in page array */ +	*page_no = pg_no; +	*page_offset = pg_off; +	ret = read; +	atomic_inc(&rdma_stat_read); +	return ret; + err: +	svc_rdma_unmap_dma(ctxt); +	svc_rdma_put_context(ctxt, 0); +	return ret;  } -/* Map a read-chunk-list to an XDR and fast register the page-list. - * - * Assumptions: - * - chunk[0]	position points to pages[0] at an offset of 0 - * - pages[]	will be made physically contiguous by creating a one-off memory - *		region using the fastreg verb. - * - byte_count is # of bytes in read-chunk-list - * - ch_count	is # of chunks in read-chunk-list - * - * Output: - * - sge array pointing into pages[] array. - * - chunk_sge array specifying sge index and count for each - *   chunk in the read list - */ -static int fast_reg_read_chunks(struct svcxprt_rdma *xprt, +/* Issue an RDMA_READ using an FRMR to map the data sink */ +static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,  				struct svc_rqst *rqstp,  				struct svc_rdma_op_ctxt *head, -				struct rpcrdma_msg *rmsgp, -				struct svc_rdma_req_map *rpl_map, -				struct svc_rdma_req_map *chl_map, -				int ch_count, -				int byte_count) +				int *page_no, +				u32 *page_offset, +				u32 rs_handle, +				u32 rs_length, +				u64 rs_offset, +				int last)  { -	int page_no; -	int ch_no; -	u32 offset; -	struct rpcrdma_read_chunk *ch; -	struct svc_rdma_fastreg_mr *frmr; -	int ret = 0; +	struct ib_send_wr read_wr; +	struct ib_send_wr inv_wr; +	struct ib_send_wr fastreg_wr; +	u8 key; +	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT; +	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt); +	struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt); +	int ret, read, pno; +	u32 pg_off = *page_offset; +	u32 pg_no = *page_no; -	frmr = svc_rdma_get_frmr(xprt);  	if (IS_ERR(frmr))  		return -ENOMEM; -	head->frmr = frmr; -	head->arg.head[0] = rqstp->rq_arg.head[0]; -	head->arg.tail[0] = rqstp->rq_arg.tail[0]; -	head->arg.pages = &head->pages[head->count]; -	head->hdr_count = head->count; /* save count of hdr pages */ -	head->arg.page_base = 0; -	head->arg.page_len = byte_count; -	head->arg.len = rqstp->rq_arg.len + byte_count; -	head->arg.buflen = rqstp->rq_arg.buflen + byte_count; +	ctxt->direction = DMA_FROM_DEVICE; +	ctxt->frmr = frmr; +	pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len); +	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length); -	/* Fast register the page list */ -	frmr->kva = page_address(rqstp->rq_arg.pages[0]); +	frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);  	frmr->direction = DMA_FROM_DEVICE;  	frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE); -	frmr->map_len = byte_count; -	frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT; -	for (page_no = 0; page_no < frmr->page_list_len; page_no++) { -		frmr->page_list->page_list[page_no] = +	frmr->map_len = pages_needed << PAGE_SHIFT; +	frmr->page_list_len = pages_needed; + +	for (pno = 0; pno < pages_needed; pno++) { +		int len = min_t(int, rs_length, PAGE_SIZE - pg_off); + +		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; +		head->arg.page_len += len; +		head->arg.len += len; +		if (!pg_off) +			head->count++; +		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1]; +		rqstp->rq_next_page = rqstp->rq_respages + 1; +		
frmr->page_list->page_list[pno] =  			ib_dma_map_page(xprt->sc_cm_id->device, -					rqstp->rq_arg.pages[page_no], 0, +					head->arg.pages[pg_no], 0,  					PAGE_SIZE, DMA_FROM_DEVICE); -		if (ib_dma_mapping_error(xprt->sc_cm_id->device, -					 frmr->page_list->page_list[page_no])) -			goto fatal_err; +		ret = ib_dma_mapping_error(xprt->sc_cm_id->device, +					   frmr->page_list->page_list[pno]); +		if (ret) +			goto err;  		atomic_inc(&xprt->sc_dma_used); -		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no]; -	} -	head->count += page_no; -	/* rq_respages points one past arg pages */ -	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no]; - -	/* Create the reply and chunk maps */ -	offset = 0; -	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; -	for (ch_no = 0; ch_no < ch_count; ch_no++) { -		int len = ntohl(ch->rc_target.rs_length); -		rpl_map->sge[ch_no].iov_base = frmr->kva + offset; -		rpl_map->sge[ch_no].iov_len = len; -		chl_map->ch[ch_no].count = 1; -		chl_map->ch[ch_no].start = ch_no; -		offset += len; -		ch++; +		/* adjust offset and wrap to next page if needed */ +		pg_off += len; +		if (pg_off == PAGE_SIZE) { +			pg_off = 0; +			pg_no++; +		} +		rs_length -= len;  	} -	ret = svc_rdma_fastreg(xprt, frmr); -	if (ret) -		goto fatal_err; - -	return ch_no; - - fatal_err: -	printk("svcrdma: error fast registering xdr for xprt %p", xprt); -	svc_rdma_put_frmr(xprt, frmr); -	return -EIO; -} - -static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt, -			     struct svc_rdma_op_ctxt *ctxt, -			     struct svc_rdma_fastreg_mr *frmr, -			     struct kvec *vec, -			     u64 *sgl_offset, -			     int count) -{ -	int i; -	unsigned long off; +	if (last && rs_length == 0) +		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); +	else +		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); -	ctxt->count = count; -	ctxt->direction = DMA_FROM_DEVICE; -	for (i = 0; i < count; i++) { -		ctxt->sge[i].length = 0; /* in case map fails */ -		if (!frmr) { -			BUG_ON(!virt_to_page(vec[i].iov_base)); -			off = (unsigned long)vec[i].iov_base & ~PAGE_MASK; -			ctxt->sge[i].addr = -				ib_dma_map_page(xprt->sc_cm_id->device, -						virt_to_page(vec[i].iov_base), -						off, -						vec[i].iov_len, -						DMA_FROM_DEVICE); -			if (ib_dma_mapping_error(xprt->sc_cm_id->device, -						 ctxt->sge[i].addr)) -				return -EINVAL; -			ctxt->sge[i].lkey = xprt->sc_dma_lkey; -			atomic_inc(&xprt->sc_dma_used); -		} else { -			ctxt->sge[i].addr = (unsigned long)vec[i].iov_base; -			ctxt->sge[i].lkey = frmr->mr->lkey; -		} -		ctxt->sge[i].length = vec[i].iov_len; -		*sgl_offset = *sgl_offset + vec[i].iov_len; +	/* Bump the key */ +	key = (u8)(frmr->mr->lkey & 0x000000FF); +	ib_update_fast_reg_key(frmr->mr, ++key); + +	ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset; +	ctxt->sge[0].lkey = frmr->mr->lkey; +	ctxt->sge[0].length = read; +	ctxt->count = 1; +	ctxt->read_hdr = head; + +	/* Prepare FASTREG WR */ +	memset(&fastreg_wr, 0, sizeof(fastreg_wr)); +	fastreg_wr.opcode = IB_WR_FAST_REG_MR; +	fastreg_wr.send_flags = IB_SEND_SIGNALED; +	fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva; +	fastreg_wr.wr.fast_reg.page_list = frmr->page_list; +	fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len; +	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT; +	fastreg_wr.wr.fast_reg.length = frmr->map_len; +	fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags; +	fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey; +	fastreg_wr.next = &read_wr; + +	/* Prepare RDMA_READ */ +	memset(&read_wr, 0, sizeof(read_wr)); 
+	read_wr.send_flags = IB_SEND_SIGNALED; +	read_wr.wr.rdma.rkey = rs_handle; +	read_wr.wr.rdma.remote_addr = rs_offset; +	read_wr.sg_list = ctxt->sge; +	read_wr.num_sge = 1; +	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) { +		read_wr.opcode = IB_WR_RDMA_READ_WITH_INV; +		read_wr.wr_id = (unsigned long)ctxt; +		read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey; +	} else { +		read_wr.opcode = IB_WR_RDMA_READ; +		read_wr.next = &inv_wr; +		/* Prepare invalidate */ +		memset(&inv_wr, 0, sizeof(inv_wr)); +		inv_wr.wr_id = (unsigned long)ctxt; +		inv_wr.opcode = IB_WR_LOCAL_INV; +		inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE; +		inv_wr.ex.invalidate_rkey = frmr->mr->lkey; +	} +	ctxt->wr_op = read_wr.opcode; + +	/* Post the chain */ +	ret = svc_rdma_send(xprt, &fastreg_wr); +	if (ret) { +		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret); +		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); +		goto err;  	} -	return 0; -} -static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count) -{ -	if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) == -	     RDMA_TRANSPORT_IWARP) && -	    sge_count > 1) -		return 1; -	else -		return min_t(int, sge_count, xprt->sc_max_sge); +	/* return current location in page array */ +	*page_no = pg_no; +	*page_offset = pg_off; +	ret = read; +	atomic_inc(&rdma_stat_read); +	return ret; + err: +	svc_rdma_unmap_dma(ctxt); +	svc_rdma_put_context(ctxt, 0); +	svc_rdma_put_frmr(xprt, frmr); +	return ret;  } -/* - * Use RDMA_READ to read data from the advertised client buffer into the - * XDR stream starting at rq_arg.head[0].iov_base. - * Each chunk in the array - * contains the following fields: - * discrim      - '1', This isn't used for data placement - * position     - The xdr stream offset (the same for every chunk) - * handle       - RMR for client memory region - * length       - data transfer length - * offset       - 64 bit tagged offset in remote memory region - * - * On our side, we need to read into a pagelist. The first page immediately - * follows the RPC header. - * - * This function returns: - * 0 - No error and no read-list found. - * - * 1 - Successful read-list processing. The data is not yet in - * the pagelist and therefore the RPC request must be deferred. The - * I/O completion will enqueue the transport again and - * svc_rdma_recvfrom will complete the request. - * - * <0 - Error processing/posting read-list. - * - * NOTE: The ctxt must not be touched after the last WR has been posted - * because the I/O completion processing may occur on another - * processor and free / modify the context. Ne touche pas! 
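The removed comment above spells out the read path's three-way result: 0 means no read list was present, a positive return means RDMA_READs were posted and the request must be deferred until they complete, and a negative return is a posting error. rdma_read_chunks below keeps that contract; here is a sketch of a caller consuming it (a hypothetical in-file wrapper; the patch's own svc_rdma_recvfrom, further down, is the real consumer).

static int example_consume_read_list(struct svcxprt_rdma *xprt,
				     struct rpcrdma_msg *rmsgp,
				     struct svc_rqst *rqstp,
				     struct svc_rdma_op_ctxt *ctxt)
{
	int ret = rdma_read_chunks(xprt, rmsgp, rqstp, ctxt);

	if (ret > 0)
		return 0;	/* reads posted: defer, I/O completion re-enqueues */
	if (ret < 0) {
		svc_rdma_put_context(ctxt, 1);	/* post failed: drop the request */
		return 0;
	}
	return 1;		/* no read list: the message is entirely inline */
}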
- */ -static int rdma_read_xdr(struct svcxprt_rdma *xprt, -			 struct rpcrdma_msg *rmsgp, -			 struct svc_rqst *rqstp, -			 struct svc_rdma_op_ctxt *hdr_ctxt) +static int rdma_read_chunks(struct svcxprt_rdma *xprt, +			    struct rpcrdma_msg *rmsgp, +			    struct svc_rqst *rqstp, +			    struct svc_rdma_op_ctxt *head)  { -	struct ib_send_wr read_wr; -	struct ib_send_wr inv_wr; -	int err = 0; -	int ch_no; -	int ch_count; -	int byte_count; -	int sge_count; -	u64 sgl_offset; +	int page_no, ch_count, ret;  	struct rpcrdma_read_chunk *ch; -	struct svc_rdma_op_ctxt *ctxt = NULL; -	struct svc_rdma_req_map *rpl_map; -	struct svc_rdma_req_map *chl_map; +	u32 page_offset, byte_count; +	u64 rs_offset; +	rdma_reader_fn reader;  	/* If no read list is present, return 0 */  	ch = svc_rdma_get_read_chunk(rmsgp); @@ -405,129 +384,55 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,  	if (ch_count > RPCSVC_MAXPAGES)  		return -EINVAL; -	/* Allocate temporary reply and chunk maps */ -	rpl_map = svc_rdma_get_req_map(); -	chl_map = svc_rdma_get_req_map(); +	/* The request is completed when the RDMA_READs complete. The +	 * head context keeps all the pages that comprise the +	 * request. +	 */ +	head->arg.head[0] = rqstp->rq_arg.head[0]; +	head->arg.tail[0] = rqstp->rq_arg.tail[0]; +	head->arg.pages = &head->pages[head->count]; +	head->hdr_count = head->count; +	head->arg.page_base = 0; +	head->arg.page_len = 0; +	head->arg.len = rqstp->rq_arg.len; +	head->arg.buflen = rqstp->rq_arg.buflen; -	if (!xprt->sc_frmr_pg_list_len) -		sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp, -					    rpl_map, chl_map, ch_count, -					    byte_count); +	/* Use FRMR if supported */ +	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG) +		reader = rdma_read_chunk_frmr;  	else -		sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp, -						 rpl_map, chl_map, ch_count, -						 byte_count); -	if (sge_count < 0) { -		err = -EIO; -		goto out; -	} - -	sgl_offset = 0; -	ch_no = 0; +		reader = rdma_read_chunk_lcl; +	page_no = 0; page_offset = 0;  	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; -	     ch->rc_discrim != 0; ch++, ch_no++) { -		u64 rs_offset; -next_sge: -		ctxt = svc_rdma_get_context(xprt); -		ctxt->direction = DMA_FROM_DEVICE; -		ctxt->frmr = hdr_ctxt->frmr; -		ctxt->read_hdr = NULL; -		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); -		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags); +	     ch->rc_discrim != 0; ch++) { -		/* Prepare READ WR */ -		memset(&read_wr, 0, sizeof read_wr); -		read_wr.wr_id = (unsigned long)ctxt; -		read_wr.opcode = IB_WR_RDMA_READ; -		ctxt->wr_op = read_wr.opcode; -		read_wr.send_flags = IB_SEND_SIGNALED; -		read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);  		xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,  				 &rs_offset); -		read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset; -		read_wr.sg_list = ctxt->sge; -		read_wr.num_sge = -			rdma_read_max_sge(xprt, chl_map->ch[ch_no].count); -		err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr, -					&rpl_map->sge[chl_map->ch[ch_no].start], -					&sgl_offset, -					read_wr.num_sge); -		if (err) { -			svc_rdma_unmap_dma(ctxt); -			svc_rdma_put_context(ctxt, 0); -			goto out; -		} -		if (((ch+1)->rc_discrim == 0) && -		    (read_wr.num_sge == chl_map->ch[ch_no].count)) { -			/* -			 * Mark the last RDMA_READ with a bit to -			 * indicate all RPC data has been fetched from -			 * the client and the RPC needs to be enqueued. 
-			 */ -			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); -			if (hdr_ctxt->frmr) { -				set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags); -				/* -				 * Invalidate the local MR used to map the data -				 * sink. -				 */ -				if (xprt->sc_dev_caps & -				    SVCRDMA_DEVCAP_READ_W_INV) { -					read_wr.opcode = -						IB_WR_RDMA_READ_WITH_INV; -					ctxt->wr_op = read_wr.opcode; -					read_wr.ex.invalidate_rkey = -						ctxt->frmr->mr->lkey; -				} else { -					/* Prepare INVALIDATE WR */ -					memset(&inv_wr, 0, sizeof inv_wr); -					inv_wr.opcode = IB_WR_LOCAL_INV; -					inv_wr.send_flags = IB_SEND_SIGNALED; -					inv_wr.ex.invalidate_rkey = -						hdr_ctxt->frmr->mr->lkey; -					read_wr.next = &inv_wr; -				} -			} -			ctxt->read_hdr = hdr_ctxt; +		byte_count = ntohl(ch->rc_target.rs_length); + +		while (byte_count > 0) { +			ret = reader(xprt, rqstp, head, +				     &page_no, &page_offset, +				     ntohl(ch->rc_target.rs_handle), +				     byte_count, rs_offset, +				     ((ch+1)->rc_discrim == 0) /* last */ +				     ); +			if (ret < 0) +				goto err; +			byte_count -= ret; +			rs_offset += ret; +			head->arg.buflen += ret;  		} -		/* Post the read */ -		err = svc_rdma_send(xprt, &read_wr); -		if (err) { -			printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n", -			       err); -			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); -			svc_rdma_unmap_dma(ctxt); -			svc_rdma_put_context(ctxt, 0); -			goto out; -		} -		atomic_inc(&rdma_stat_read); - -		if (read_wr.num_sge < chl_map->ch[ch_no].count) { -			chl_map->ch[ch_no].count -= read_wr.num_sge; -			chl_map->ch[ch_no].start += read_wr.num_sge; -			goto next_sge; -		} -		sgl_offset = 0; -		err = 1;  	} - - out: -	svc_rdma_put_req_map(rpl_map); -	svc_rdma_put_req_map(chl_map); - +	ret = 1; + err:  	/* Detach arg pages. svc_recv will replenish them */ -	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++) -		rqstp->rq_pages[ch_no] = NULL; - -	/* -	 * Detach res pages. If svc_release sees any it will attempt to -	 * put them. -	 */ -	while (rqstp->rq_next_page != rqstp->rq_respages) -		*(--rqstp->rq_next_page) = NULL; +	for (page_no = 0; +	     &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++) +		rqstp->rq_pages[page_no] = NULL; -	return err; +	return ret;  }  static int rdma_read_complete(struct svc_rqst *rqstp, @@ -550,7 +455,7 @@ static int rdma_read_complete(struct svc_rqst *rqstp,  	/* rq_respages starts after the last arg page */  	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no]; -	rqstp->rq_next_page = &rqstp->rq_arg.pages[page_no]; +	rqstp->rq_next_page = rqstp->rq_respages + 1;  	/* Rebuild rq_arg head and tail. */  	rqstp->rq_arg.head[0] = head->arg.head[0]; @@ -599,13 +504,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)  				  struct svc_rdma_op_ctxt,  				  dto_q);  		list_del_init(&ctxt->dto_q); -	} -	if (ctxt) {  		spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);  		return rdma_read_complete(rqstp, ctxt); -	} - -	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) { +	} else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {  		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,  				  struct svc_rdma_op_ctxt,  				  dto_q); @@ -625,7 +526,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)  		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))  			goto close_out; -		BUG_ON(ret);  		goto out;  	}  	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n", @@ -648,12 +548,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)  	}  	/* Read read-list data. 
*/ -	ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt); +	ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);  	if (ret > 0) {  		/* read-list posted, defer until data received from client. */  		goto defer; -	} -	if (ret < 0) { +	} else if (ret < 0) {  		/* Post of read-list failed, free context. */  		svc_rdma_put_context(ctxt, 1);  		return 0; diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index c1d124dc772..49fd21a5c21 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -1,4 +1,5 @@  /* + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.   * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.   *   * This software is available to you under a choice of one of two @@ -49,152 +50,6 @@  #define RPCDBG_FACILITY	RPCDBG_SVCXPRT -/* Encode an XDR as an array of IB SGE - * - * Assumptions: - * - head[0] is physically contiguous. - * - tail[0] is physically contiguous. - * - pages[] is not physically or virtually contiguous and consists of - *   PAGE_SIZE elements. - * - * Output: - * SGE[0]              reserved for RCPRDMA header - * SGE[1]              data from xdr->head[] - * SGE[2..sge_count-2] data from xdr->pages[] - * SGE[sge_count-1]    data from xdr->tail. - * - * The max SGE we need is the length of the XDR / pagesize + one for - * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES - * reserves a page for both the request and the reply header, and this - * array is only concerned with the reply we are assured that we have - * on extra page for the RPCRMDA header. - */ -static int fast_reg_xdr(struct svcxprt_rdma *xprt, -			struct xdr_buf *xdr, -			struct svc_rdma_req_map *vec) -{ -	int sge_no; -	u32 sge_bytes; -	u32 page_bytes; -	u32 page_off; -	int page_no = 0; -	u8 *frva; -	struct svc_rdma_fastreg_mr *frmr; - -	frmr = svc_rdma_get_frmr(xprt); -	if (IS_ERR(frmr)) -		return -ENOMEM; -	vec->frmr = frmr; - -	/* Skip the RPCRDMA header */ -	sge_no = 1; - -	/* Map the head. 
*/ -	frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK); -	vec->sge[sge_no].iov_base = xdr->head[0].iov_base; -	vec->sge[sge_no].iov_len = xdr->head[0].iov_len; -	vec->count = 2; -	sge_no++; - -	/* Map the XDR head */ -	frmr->kva = frva; -	frmr->direction = DMA_TO_DEVICE; -	frmr->access_flags = 0; -	frmr->map_len = PAGE_SIZE; -	frmr->page_list_len = 1; -	page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK; -	frmr->page_list->page_list[page_no] = -		ib_dma_map_page(xprt->sc_cm_id->device, -				virt_to_page(xdr->head[0].iov_base), -				page_off, -				PAGE_SIZE - page_off, -				DMA_TO_DEVICE); -	if (ib_dma_mapping_error(xprt->sc_cm_id->device, -				 frmr->page_list->page_list[page_no])) -		goto fatal_err; -	atomic_inc(&xprt->sc_dma_used); - -	/* Map the XDR page list */ -	page_off = xdr->page_base; -	page_bytes = xdr->page_len + page_off; -	if (!page_bytes) -		goto encode_tail; - -	/* Map the pages */ -	vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off; -	vec->sge[sge_no].iov_len = page_bytes; -	sge_no++; -	while (page_bytes) { -		struct page *page; - -		page = xdr->pages[page_no++]; -		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off)); -		page_bytes -= sge_bytes; - -		frmr->page_list->page_list[page_no] = -			ib_dma_map_page(xprt->sc_cm_id->device, -					page, page_off, -					sge_bytes, DMA_TO_DEVICE); -		if (ib_dma_mapping_error(xprt->sc_cm_id->device, -					 frmr->page_list->page_list[page_no])) -			goto fatal_err; - -		atomic_inc(&xprt->sc_dma_used); -		page_off = 0; /* reset for next time through loop */ -		frmr->map_len += PAGE_SIZE; -		frmr->page_list_len++; -	} -	vec->count++; - - encode_tail: -	/* Map tail */ -	if (0 == xdr->tail[0].iov_len) -		goto done; - -	vec->count++; -	vec->sge[sge_no].iov_len = xdr->tail[0].iov_len; - -	if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) == -	    ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) { -		/* -		 * If head and tail use the same page, we don't need -		 * to map it again. 
-		 */ -		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base; -	} else { -		void *va; - -		/* Map another page for the tail */ -		page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK; -		va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK); -		vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off; - -		frmr->page_list->page_list[page_no] = -		    ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va), -				    page_off, -				    PAGE_SIZE, -				    DMA_TO_DEVICE); -		if (ib_dma_mapping_error(xprt->sc_cm_id->device, -					 frmr->page_list->page_list[page_no])) -			goto fatal_err; -		atomic_inc(&xprt->sc_dma_used); -		frmr->map_len += PAGE_SIZE; -		frmr->page_list_len++; -	} - - done: -	if (svc_rdma_fastreg(xprt, frmr)) -		goto fatal_err; - -	return 0; - - fatal_err: -	printk("svcrdma: Error fast registering memory for xprt %p\n", xprt); -	vec->frmr = NULL; -	svc_rdma_put_frmr(xprt, frmr); -	return -EIO; -} -  static int map_xdr(struct svcxprt_rdma *xprt,  		   struct xdr_buf *xdr,  		   struct svc_rdma_req_map *vec) @@ -208,9 +63,6 @@ static int map_xdr(struct svcxprt_rdma *xprt,  	BUG_ON(xdr->len !=  	       (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)); -	if (xprt->sc_frmr_pg_list_len) -		return fast_reg_xdr(xprt, xdr, vec); -  	/* Skip the first sge, this is for the RPCRDMA header */  	sge_no = 1; @@ -265,6 +117,7 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,  		xdr_off -= xdr->head[0].iov_len;  		if (xdr_off < xdr->page_len) {  			/* This offset is in the page list */ +			xdr_off += xdr->page_base;  			page = xdr->pages[xdr_off >> PAGE_SHIFT];  			xdr_off &= ~PAGE_MASK;  		} else { @@ -281,8 +134,6 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,  }  /* Assumptions: - * - We are using FRMR - *     - or -   * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE   */  static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, @@ -326,23 +177,16 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,  		sge_bytes = min_t(size_t,  			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);  		sge[sge_no].length = sge_bytes; -		if (!vec->frmr) { -			sge[sge_no].addr = -				dma_map_xdr(xprt, &rqstp->rq_res, xdr_off, -					    sge_bytes, DMA_TO_DEVICE); -			xdr_off += sge_bytes; -			if (ib_dma_mapping_error(xprt->sc_cm_id->device, -						 sge[sge_no].addr)) -				goto err; -			atomic_inc(&xprt->sc_dma_used); -			sge[sge_no].lkey = xprt->sc_dma_lkey; -		} else { -			sge[sge_no].addr = (unsigned long) -				vec->sge[xdr_sge_no].iov_base + sge_off; -			sge[sge_no].lkey = vec->frmr->mr->lkey; -		} +		sge[sge_no].addr = +			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off, +				    sge_bytes, DMA_TO_DEVICE); +		xdr_off += sge_bytes; +		if (ib_dma_mapping_error(xprt->sc_cm_id->device, +					 sge[sge_no].addr)) +			goto err; +		atomic_inc(&xprt->sc_dma_used); +		sge[sge_no].lkey = xprt->sc_dma_lkey;  		ctxt->count++; -		ctxt->frmr = vec->frmr;  		sge_off = 0;  		sge_no++;  		xdr_sge_no++; @@ -368,7 +212,6 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,  	return 0;   err:  	svc_rdma_unmap_dma(ctxt); -	svc_rdma_put_frmr(xprt, vec->frmr);  	svc_rdma_put_context(ctxt, 0);  	/* Fatal error, close transport */  	return -EIO; @@ -396,10 +239,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,  	res_ary = (struct rpcrdma_write_array *)  		&rdma_resp->rm_body.rm_chunks[1]; -	if (vec->frmr) -		max_write = vec->frmr->map_len; -	else -		max_write = xprt->sc_max_sge * PAGE_SIZE; 
+	max_write = xprt->sc_max_sge * PAGE_SIZE;  	/* Write chunks start at the pagelist */  	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0; @@ -471,10 +311,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,  	res_ary = (struct rpcrdma_write_array *)  		&rdma_resp->rm_body.rm_chunks[2]; -	if (vec->frmr) -		max_write = vec->frmr->map_len; -	else -		max_write = xprt->sc_max_sge * PAGE_SIZE; +	max_write = xprt->sc_max_sge * PAGE_SIZE;  	/* xdr offset starts at RPC message */  	nchunks = ntohl(arg_ary->wc_nchunks); @@ -544,7 +381,6 @@ static int send_reply(struct svcxprt_rdma *rdma,  		      int byte_count)  {  	struct ib_send_wr send_wr; -	struct ib_send_wr inv_wr;  	int sge_no;  	int sge_bytes;  	int page_no; @@ -558,7 +394,6 @@ static int send_reply(struct svcxprt_rdma *rdma,  		       "svcrdma: could not post a receive buffer, err=%d."  		       "Closing transport %p.\n", ret, rdma);  		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); -		svc_rdma_put_frmr(rdma, vec->frmr);  		svc_rdma_put_context(ctxt, 0);  		return -ENOTCONN;  	} @@ -566,11 +401,6 @@ static int send_reply(struct svcxprt_rdma *rdma,  	/* Prepare the context */  	ctxt->pages[0] = page;  	ctxt->count = 1; -	ctxt->frmr = vec->frmr; -	if (vec->frmr) -		set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags); -	else -		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);  	/* Prepare the SGE for the RPCRDMA Header */  	ctxt->sge[0].lkey = rdma->sc_dma_lkey; @@ -589,21 +419,15 @@ static int send_reply(struct svcxprt_rdma *rdma,  		int xdr_off = 0;  		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);  		byte_count -= sge_bytes; -		if (!vec->frmr) { -			ctxt->sge[sge_no].addr = -				dma_map_xdr(rdma, &rqstp->rq_res, xdr_off, -					    sge_bytes, DMA_TO_DEVICE); -			xdr_off += sge_bytes; -			if (ib_dma_mapping_error(rdma->sc_cm_id->device, -						 ctxt->sge[sge_no].addr)) -				goto err; -			atomic_inc(&rdma->sc_dma_used); -			ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey; -		} else { -			ctxt->sge[sge_no].addr = (unsigned long) -				vec->sge[sge_no].iov_base; -			ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey; -		} +		ctxt->sge[sge_no].addr = +			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off, +				    sge_bytes, DMA_TO_DEVICE); +		xdr_off += sge_bytes; +		if (ib_dma_mapping_error(rdma->sc_cm_id->device, +					 ctxt->sge[sge_no].addr)) +			goto err; +		atomic_inc(&rdma->sc_dma_used); +		ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;  		ctxt->sge[sge_no].length = sge_bytes;  	}  	BUG_ON(byte_count != 0); @@ -625,6 +449,8 @@ static int send_reply(struct svcxprt_rdma *rdma,  		if (page_no+1 >= sge_no)  			ctxt->sge[page_no+1].length = 0;  	} +	rqstp->rq_next_page = rqstp->rq_respages + 1; +  	BUG_ON(sge_no > rdma->sc_max_sge);  	memset(&send_wr, 0, sizeof send_wr);  	ctxt->wr_op = IB_WR_SEND; @@ -633,15 +459,6 @@ static int send_reply(struct svcxprt_rdma *rdma,  	send_wr.num_sge = sge_no;  	send_wr.opcode = IB_WR_SEND;  	send_wr.send_flags =  IB_SEND_SIGNALED; -	if (vec->frmr) { -		/* Prepare INVALIDATE WR */ -		memset(&inv_wr, 0, sizeof inv_wr); -		inv_wr.opcode = IB_WR_LOCAL_INV; -		inv_wr.send_flags = IB_SEND_SIGNALED; -		inv_wr.ex.invalidate_rkey = -			vec->frmr->mr->lkey; -		send_wr.next = &inv_wr; -	}  	ret = svc_rdma_send(rdma, &send_wr);  	if (ret) @@ -651,7 +468,6 @@ static int send_reply(struct svcxprt_rdma *rdma,   err:  	svc_rdma_unmap_dma(ctxt); -	svc_rdma_put_frmr(rdma, vec->frmr);  	svc_rdma_put_context(ctxt, 1);  	return -EIO;  } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c 
b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 62e4f9bcc38..e7323fbbd34 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -1,4 +1,5 @@  /* + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.   * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.   *   * This software is available to you under a choice of one of two @@ -65,6 +66,7 @@ static void dto_tasklet_func(unsigned long data);  static void svc_rdma_detach(struct svc_xprt *xprt);  static void svc_rdma_free(struct svc_xprt *xprt);  static int svc_rdma_has_wspace(struct svc_xprt *xprt); +static int svc_rdma_secure_port(struct svc_rqst *);  static void rq_cq_reap(struct svcxprt_rdma *xprt);  static void sq_cq_reap(struct svcxprt_rdma *xprt); @@ -82,6 +84,7 @@ static struct svc_xprt_ops svc_rdma_ops = {  	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,  	.xpo_has_wspace = svc_rdma_has_wspace,  	.xpo_accept = svc_rdma_accept, +	.xpo_secure_port = svc_rdma_secure_port,  };  struct svc_xprt_class svc_rdma_class = { @@ -160,7 +163,6 @@ struct svc_rdma_req_map *svc_rdma_get_req_map(void)  		schedule_timeout_uninterruptible(msecs_to_jiffies(500));  	}  	map->count = 0; -	map->frmr = NULL;  	return map;  } @@ -336,22 +338,21 @@ static void process_context(struct svcxprt_rdma *xprt,  	switch (ctxt->wr_op) {  	case IB_WR_SEND: -		if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags)) -			svc_rdma_put_frmr(xprt, ctxt->frmr); +		BUG_ON(ctxt->frmr);  		svc_rdma_put_context(ctxt, 1);  		break;  	case IB_WR_RDMA_WRITE: +		BUG_ON(ctxt->frmr);  		svc_rdma_put_context(ctxt, 0);  		break;  	case IB_WR_RDMA_READ:  	case IB_WR_RDMA_READ_WITH_INV: +		svc_rdma_put_frmr(xprt, ctxt->frmr);  		if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {  			struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;  			BUG_ON(!read_hdr); -			if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags)) -				svc_rdma_put_frmr(xprt, ctxt->frmr);  			spin_lock_bh(&xprt->sc_rq_dto_lock);  			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);  			list_add_tail(&read_hdr->dto_q, @@ -363,6 +364,7 @@ static void process_context(struct svcxprt_rdma *xprt,  		break;  	default: +		BUG_ON(1);  		printk(KERN_ERR "svcrdma: unexpected completion type, "  		       "opcode=%d\n",  		       ctxt->wr_op); @@ -378,29 +380,42 @@ static void process_context(struct svcxprt_rdma *xprt,  static void sq_cq_reap(struct svcxprt_rdma *xprt)  {  	struct svc_rdma_op_ctxt *ctxt = NULL; -	struct ib_wc wc; +	struct ib_wc wc_a[6]; +	struct ib_wc *wc;  	struct ib_cq *cq = xprt->sc_sq_cq;  	int ret; +	memset(wc_a, 0, sizeof(wc_a)); +  	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))  		return;  	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);  	atomic_inc(&rdma_stat_sq_poll); -	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) { -		if (wc.status != IB_WC_SUCCESS) -			/* Close the transport */ -			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); +	while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) { +		int i; -		/* Decrement used SQ WR count */ -		atomic_dec(&xprt->sc_sq_count); -		wake_up(&xprt->sc_send_wait); +		for (i = 0; i < ret; i++) { +			wc = &wc_a[i]; +			if (wc->status != IB_WC_SUCCESS) { +				dprintk("svcrdma: sq wc err status %d\n", +					wc->status); -		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; -		if (ctxt) -			process_context(xprt, ctxt); +				/* Close the transport */ +				set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); +			} -		svc_xprt_put(&xprt->sc_xprt); +			/* Decrement used SQ WR count */ +			
atomic_dec(&xprt->sc_sq_count); +			wake_up(&xprt->sc_send_wait); + +			ctxt = (struct svc_rdma_op_ctxt *) +				(unsigned long)wc->wr_id; +			if (ctxt) +				process_context(xprt, ctxt); + +			svc_xprt_put(&xprt->sc_xprt); +		}  	}  	if (ctxt) @@ -477,8 +492,7 @@ struct page *svc_rdma_get_page(void)  	while ((page = alloc_page(GFP_KERNEL)) == NULL) {  		/* If we can't get memory, wait a bit and try again */ -		printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 " -		       "jiffies.\n"); +		printk(KERN_INFO "svcrdma: out of memory...retrying in 1s\n");  		schedule_timeout_uninterruptible(msecs_to_jiffies(1000));  	}  	return page; @@ -994,7 +1008,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)  			need_dma_mr = 0;  		break;  	case RDMA_TRANSPORT_IB: -		if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) { +		if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) { +			need_dma_mr = 1; +			dma_mr_acc = IB_ACCESS_LOCAL_WRITE; +		} else if (!(devattr.device_cap_flags & +			     IB_DEVICE_LOCAL_DMA_LKEY)) {  			need_dma_mr = 1;  			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;  		} else @@ -1191,14 +1209,7 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)  		container_of(xprt, struct svcxprt_rdma, sc_xprt);  	/* -	 * If there are fewer SQ WR available than required to send a -	 * simple response, return false. -	 */ -	if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3)) -		return 0; - -	/* -	 * ...or there are already waiters on the SQ, +	 * If there are already waiters on the SQ,  	 * return false.  	 */  	if (waitqueue_active(&rdma->sc_send_wait)) @@ -1208,6 +1219,11 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)  	return 1;  } +static int svc_rdma_secure_port(struct svc_rqst *rqstp) +{ +	return 1; +} +  /*   * Attempt to register the kvec representing the RPC memory with the   * device. 
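
The reworked sq_cq_reap() in the svc_rdma_transport.c hunks above now drains the send CQ in batches of up to six work completions per ib_poll_cq() call instead of one at a time. A minimal userspace model of that batch-drain loop, with struct wc and poll_queue() standing in for struct ib_wc and ib_poll_cq() (illustrative only, not kernel API):

#include <stdio.h>
#include <string.h>

#define BATCH 6

struct wc { int status; unsigned long wr_id; };

/* Stand-in for ib_poll_cq(): copies up to 'n' pending completions
 * into 'out' and returns how many were copied, 0 when the queue is empty. */
static int poll_queue(struct wc *pending, int *npending, struct wc *out, int n)
{
	int take = *npending < n ? *npending : n;

	memcpy(out, pending, take * sizeof(*out));
	memmove(pending, pending + take, (*npending - take) * sizeof(*pending));
	*npending -= take;
	return take;
}

int main(void)
{
	struct wc pending[20], wc_a[BATCH];
	int npending = 20, ret, i, handled = 0;

	memset(pending, 0, sizeof(pending));

	/* Drain in batches, mirroring the wc_a[6] loop in sq_cq_reap() */
	while ((ret = poll_queue(pending, &npending, wc_a, BATCH)) > 0)
		for (i = 0; i < ret; i++, handled++)
			if (wc_a[i].status != 0)
				printf("completion %d flushed in error\n", handled);

	printf("handled %d completions\n", handled);
	return 0;
}

Batching cuts the number of poll calls per wakeup without changing how each individual completion is handled.
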
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 285dc088411..66f91f0d071 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -149,6 +149,11 @@ static struct ctl_table sunrpc_table[] = {  #endif +#define RPCRDMA_BIND_TO		(60U * HZ) +#define RPCRDMA_INIT_REEST_TO	(5U * HZ) +#define RPCRDMA_MAX_REEST_TO	(30U * HZ) +#define RPCRDMA_IDLE_DISC_TO	(5U * 60 * HZ) +  static struct rpc_xprt_ops xprt_rdma_procs;	/* forward reference */  static void @@ -229,7 +234,6 @@ static void  xprt_rdma_destroy(struct rpc_xprt *xprt)  {  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); -	int rc;  	dprintk("RPC:       %s: called\n", __func__); @@ -238,10 +242,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)  	xprt_clear_connected(xprt);  	rpcrdma_buffer_destroy(&r_xprt->rx_buf); -	rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia); -	if (rc) -		dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n", -			__func__, rc); +	rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);  	rpcrdma_ia_close(&r_xprt->rx_ia);  	xprt_rdma_free_addresses(xprt); @@ -289,9 +290,9 @@ xprt_setup_rdma(struct xprt_create *args)  	/* 60 second timeout, no retries */  	xprt->timeout = &xprt_rdma_default_timeout; -	xprt->bind_timeout = (60U * HZ); -	xprt->reestablish_timeout = (5U * HZ); -	xprt->idle_timeout = (5U * 60 * HZ); +	xprt->bind_timeout = RPCRDMA_BIND_TO; +	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; +	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;  	xprt->resvport = 0;		/* privileged port not needed */  	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */ @@ -391,7 +392,7 @@ out4:  	xprt_rdma_free_addresses(xprt);  	rc = -EINVAL;  out3: -	(void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia); +	rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);  out2:  	rpcrdma_ia_close(&new_xprt->rx_ia);  out1: @@ -436,10 +437,10 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)  		schedule_delayed_work(&r_xprt->rdma_connect,  			xprt->reestablish_timeout);  		xprt->reestablish_timeout <<= 1; -		if (xprt->reestablish_timeout > (30 * HZ)) -			xprt->reestablish_timeout = (30 * HZ); -		else if (xprt->reestablish_timeout < (5 * HZ)) -			xprt->reestablish_timeout = (5 * HZ); +		if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO) +			xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO; +		else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) +			xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;  	} else {  		schedule_delayed_work(&r_xprt->rdma_connect, 0);  		if (!RPC_IS_ASYNC(task)) @@ -447,23 +448,6 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)  	}  } -static int -xprt_rdma_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) -{ -	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); -	int credits = atomic_read(&r_xprt->rx_buf.rb_credits); - -	/* == RPC_CWNDSCALE @ init, but *after* setup */ -	if (r_xprt->rx_buf.rb_cwndscale == 0UL) { -		r_xprt->rx_buf.rb_cwndscale = xprt->cwnd; -		dprintk("RPC:       %s: cwndscale %lu\n", __func__, -			r_xprt->rx_buf.rb_cwndscale); -		BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0); -	} -	xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale; -	return xprt_reserve_xprt_cong(xprt, task); -} -  /*   * The RDMA allocate/free functions need the task structure as a place   * to hide the struct rpcrdma_req, which is necessary for the actual send/recv @@ -479,7 +463,8 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)  	struct rpcrdma_req *req, *nreq;  	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf); 
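
The RPCRDMA_*_TO macros above replace the bare jiffies constants, and xprt_rdma_connect() doubles reestablish_timeout between RPCRDMA_INIT_REEST_TO and RPCRDMA_MAX_REEST_TO on each deferred reconnect. A userspace sketch of that clamped backoff; HZ is set to an illustrative 1000 here and the attempt count is arbitrary:

#include <stdio.h>

#define HZ			1000	/* illustrative tick rate */
#define RPCRDMA_INIT_REEST_TO	(5U * HZ)
#define RPCRDMA_MAX_REEST_TO	(30U * HZ)

int main(void)
{
	unsigned long to = RPCRDMA_INIT_REEST_TO;
	int attempt;

	for (attempt = 1; attempt <= 5; attempt++) {
		printf("attempt %d: wait %lu ticks\n", attempt, to);
		to <<= 1;				/* double after each attempt */
		if (to > RPCRDMA_MAX_REEST_TO)
			to = RPCRDMA_MAX_REEST_TO;	/* ceiling */
		else if (to < RPCRDMA_INIT_REEST_TO)
			to = RPCRDMA_INIT_REEST_TO;	/* floor, mirroring the kernel check */
	}
	return 0;
}

The wait grows 5s, 10s, 20s, then stays pinned at the 30-second ceiling.
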
-	BUG_ON(NULL == req); +	if (req == NULL) +		return NULL;  	if (size > req->rl_size) {  		dprintk("RPC:       %s: size %zd too large for buffer[%zd]: " @@ -503,18 +488,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)  		 * If the allocation or registration fails, the RPC framework  		 * will (doggedly) retry.  		 */ -		if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy == -				RPCRDMA_BOUNCEBUFFERS) { -			/* forced to "pure inline" */ -			dprintk("RPC:       %s: too much data (%zd) for inline " -					"(r/w max %d/%d)\n", __func__, size, -					rpcx_to_rdmad(xprt).inline_rsize, -					rpcx_to_rdmad(xprt).inline_wsize); -			size = req->rl_size; -			rpc_exit(task, -EIO);		/* fail the operation */ -			rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; -			goto out; -		}  		if (task->tk_flags & RPC_TASK_SWAPPER)  			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);  		else @@ -543,7 +516,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)  		req = nreq;  	}  	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req); -out:  	req->rl_connect_cookie = 0;	/* our reserved value */  	return req->rl_xdr_buf; @@ -579,9 +551,7 @@ xprt_rdma_free(void *buffer)  		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");  	/* -	 * Finish the deregistration. When using mw bind, this was -	 * begun in rpcrdma_reply_handler(). In all other modes, we -	 * do it here, in thread context. The process is considered +	 * Finish the deregistration.  The process is considered  	 * complete when the rr_func vector becomes NULL - this  	 * was put in place during rpcrdma_reply_handler() - the wait  	 * call below will not block if the dereg is "done". If @@ -590,12 +560,7 @@ xprt_rdma_free(void *buffer)  	for (i = 0; req->rl_nchunks;) {  		--req->rl_nchunks;  		i += rpcrdma_deregister_external( -			&req->rl_segments[i], r_xprt, NULL); -	} - -	if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) { -		rep->rr_func = NULL;	/* abandon the callback */ -		req->rl_reply = NULL; +			&req->rl_segments[i], r_xprt);  	}  	if (req->rl_iov.length == 0) {	/* see allocate above */ @@ -630,13 +595,12 @@ xprt_rdma_send_request(struct rpc_task *task)  	struct rpc_xprt *xprt = rqst->rq_xprt;  	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); +	int rc; -	/* marshal the send itself */ -	if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) { -		r_xprt->rx_stats.failed_marshal_count++; -		dprintk("RPC:       %s: rpcrdma_marshal_req failed\n", -			__func__); -		return -EIO; +	if (req->rl_niovs == 0) { +		rc = rpcrdma_marshal_req(rqst); +		if (rc < 0) +			goto failed_marshal;  	}  	if (req->rl_reply == NULL) 		/* e.g. 
reconnection */ @@ -660,6 +624,12 @@ xprt_rdma_send_request(struct rpc_task *task)  	rqst->rq_bytes_sent = 0;  	return 0; +failed_marshal: +	r_xprt->rx_stats.failed_marshal_count++; +	dprintk("RPC:       %s: rpcrdma_marshal_req failed, status %i\n", +		__func__, rc); +	if (rc == -EIO) +		return -EIO;  drop_connection:  	xprt_disconnect_done(xprt);  	return -ENOTCONN;	/* implies disconnect */ @@ -705,7 +675,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)   */  static struct rpc_xprt_ops xprt_rdma_procs = { -	.reserve_xprt		= xprt_rdma_reserve_xprt, +	.reserve_xprt		= xprt_reserve_xprt_cong,  	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */  	.alloc_slot		= xprt_alloc_slot,  	.release_request	= xprt_release_rqst_cong,       /* ditto */ @@ -733,7 +703,7 @@ static void __exit xprt_rdma_cleanup(void)  {  	int rc; -	dprintk(KERN_INFO "RPCRDMA Module Removed, deregister RPC RDMA transport\n"); +	dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");  #ifdef RPC_DEBUG  	if (sunrpc_table_header) {  		unregister_sysctl_table(sunrpc_table_header); @@ -755,14 +725,14 @@ static int __init xprt_rdma_init(void)  	if (rc)  		return rc; -	dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n"); +	dprintk("RPCRDMA Module Init, register RPC RDMA transport\n"); -	dprintk(KERN_INFO "Defaults:\n"); -	dprintk(KERN_INFO "\tSlots %d\n" +	dprintk("Defaults:\n"); +	dprintk("\tSlots %d\n"  		"\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",  		xprt_rdma_slot_table_entries,  		xprt_rdma_max_inline_read, xprt_rdma_max_inline_write); -	dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n", +	dprintk("\tPadding %d\n\tMemreg %d\n",  		xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);  #ifdef RPC_DEBUG diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 93726560eaa..13dbd1c389f 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -48,8 +48,8 @@   */  #include <linux/interrupt.h> -#include <linux/pci.h>	/* for Tavor hack below */  #include <linux/slab.h> +#include <asm/bitops.h>  #include "xprt_rdma.h" @@ -142,98 +142,139 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)  	}  } -static inline -void rpcrdma_event_process(struct ib_wc *wc) +static void +rpcrdma_sendcq_process_wc(struct ib_wc *wc)  { -	struct rpcrdma_mw *frmr; -	struct rpcrdma_rep *rep = -			(struct rpcrdma_rep *)(unsigned long) wc->wr_id; +	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; -	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n", -		__func__, rep, wc->status, wc->opcode, wc->byte_len); +	dprintk("RPC:       %s: frmr %p status %X opcode %d\n", +		__func__, frmr, wc->status, wc->opcode); -	if (!rep) /* send or bind completion that we don't care about */ +	if (wc->wr_id == 0ULL)  		return; - -	if (IB_WC_SUCCESS != wc->status) { -		dprintk("RPC:       %s: WC opcode %d status %X, connection lost\n", -			__func__, wc->opcode, wc->status); -		rep->rr_len = ~0U; -		if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV) -			rpcrdma_schedule_tasklet(rep); +	if (wc->status != IB_WC_SUCCESS)  		return; -	} -	switch (wc->opcode) { -	case IB_WC_FAST_REG_MR: -		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; +	if (wc->opcode == IB_WC_FAST_REG_MR)  		frmr->r.frmr.state = FRMR_IS_VALID; -		break; -	case IB_WC_LOCAL_INV: -		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; +	else if (wc->opcode == IB_WC_LOCAL_INV)  		frmr->r.frmr.state = 
FRMR_IS_INVALID; -		break; -	case IB_WC_RECV: -		rep->rr_len = wc->byte_len; -		ib_dma_sync_single_for_cpu( -			rdmab_to_ia(rep->rr_buffer)->ri_id->device, -			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE); -		/* Keep (only) the most recent credits, after check validity */ -		if (rep->rr_len >= 16) { -			struct rpcrdma_msg *p = -					(struct rpcrdma_msg *) rep->rr_base; -			unsigned int credits = ntohl(p->rm_credit); -			if (credits == 0) { -				dprintk("RPC:       %s: server" -					" dropped credits to 0!\n", __func__); -				/* don't deadlock */ -				credits = 1; -			} else if (credits > rep->rr_buffer->rb_max_requests) { -				dprintk("RPC:       %s: server" -					" over-crediting: %d (%d)\n", -					__func__, credits, -					rep->rr_buffer->rb_max_requests); -				credits = rep->rr_buffer->rb_max_requests; -			} -			atomic_set(&rep->rr_buffer->rb_credits, credits); -		} -		/* fall through */ -	case IB_WC_BIND_MW: -		rpcrdma_schedule_tasklet(rep); -		break; -	default: -		dprintk("RPC:       %s: unexpected WC event %X\n", -			__func__, wc->opcode); -		break; -	}  } -static inline int -rpcrdma_cq_poll(struct ib_cq *cq) +static int +rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)  { -	struct ib_wc wc; -	int rc; +	struct ib_wc *wcs; +	int budget, count, rc; -	for (;;) { -		rc = ib_poll_cq(cq, 1, &wc); -		if (rc < 0) { -			dprintk("RPC:       %s: ib_poll_cq failed %i\n", -				__func__, rc); +	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; +	do { +		wcs = ep->rep_send_wcs; + +		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); +		if (rc <= 0)  			return rc; -		} -		if (rc == 0) -			break; -		rpcrdma_event_process(&wc); +		count = rc; +		while (count-- > 0) +			rpcrdma_sendcq_process_wc(wcs++); +	} while (rc == RPCRDMA_POLLSIZE && --budget); +	return 0; +} + +/* + * Handle send, fast_reg_mr, and local_inv completions. + * + * Send events are typically suppressed and thus do not result + * in an upcall. Occasionally one is signaled, however. This + * prevents the provider's completion queue from wrapping and + * losing a completion. 
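
rpcrdma_sendcq_poll() above bounds the work done in one upcall: at most RPCRDMA_POLLSIZE completions per ib_poll_cq() call and at most RPCRDMA_WC_BUDGET completions overall, with anything left over handled on the next interrupt. A userspace sketch of that budget arithmetic; queue_poll() is a stand-in, and the constants mirror the values (128 and 16) added to xprt_rdma.h later in this patch:

#include <stdio.h>

#define WC_BUDGET	128
#define POLLSIZE	16

/* Stand-in for ib_poll_cq(): report how many of 'pending' fit in one poll */
static int queue_poll(int *pending, int max)
{
	int got = *pending < max ? *pending : max;

	*pending -= got;
	return got;
}

int main(void)
{
	int pending = 400;	/* deliberately more than the budget */
	int budget = WC_BUDGET / POLLSIZE;
	int rc, handled = 0;

	do {
		rc = queue_poll(&pending, POLLSIZE);
		if (rc <= 0)
			break;
		handled += rc;	/* process rc completions here */
	} while (rc == POLLSIZE && --budget);

	printf("handled %d, still pending %d\n", handled, pending);
	return 0;
}

With the example backlog of 400, the loop stops after exactly WC_BUDGET completions, so a burst cannot keep a single upcall running indefinitely.
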
+ */ +static void +rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) +{ +	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; +	int rc; + +	rc = rpcrdma_sendcq_poll(cq, ep); +	if (rc) { +		dprintk("RPC:       %s: ib_poll_cq failed: %i\n", +			__func__, rc); +		return;  	} +	rc = ib_req_notify_cq(cq, +			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); +	if (rc == 0) +		return; +	if (rc < 0) { +		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n", +			__func__, rc); +		return; +	} + +	rpcrdma_sendcq_poll(cq, ep); +} + +static void +rpcrdma_recvcq_process_wc(struct ib_wc *wc) +{ +	struct rpcrdma_rep *rep = +			(struct rpcrdma_rep *)(unsigned long)wc->wr_id; + +	dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n", +		__func__, rep, wc->status, wc->opcode, wc->byte_len); + +	if (wc->status != IB_WC_SUCCESS) { +		rep->rr_len = ~0U; +		goto out_schedule; +	} +	if (wc->opcode != IB_WC_RECV) +		return; + +	rep->rr_len = wc->byte_len; +	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device, +			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE); + +	if (rep->rr_len >= 16) { +		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base; +		unsigned int credits = ntohl(p->rm_credit); + +		if (credits == 0) +			credits = 1;	/* don't deadlock */ +		else if (credits > rep->rr_buffer->rb_max_requests) +			credits = rep->rr_buffer->rb_max_requests; +		atomic_set(&rep->rr_buffer->rb_credits, credits); +	} + +out_schedule: +	rpcrdma_schedule_tasklet(rep); +} + +static int +rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) +{ +	struct ib_wc *wcs; +	int budget, count, rc; + +	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; +	do { +		wcs = ep->rep_recv_wcs; + +		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); +		if (rc <= 0) +			return rc; + +		count = rc; +		while (count-- > 0) +			rpcrdma_recvcq_process_wc(wcs++); +	} while (rc == RPCRDMA_POLLSIZE && --budget);  	return 0;  }  /* - * rpcrdma_cq_event_upcall + * Handle receive completions.   * - * This upcall handles recv, send, bind and unbind events.   * It is reentrant but processes single events in order to maintain   * ordering of receives to keep server credits.   * @@ -242,26 +283,31 @@ rpcrdma_cq_poll(struct ib_cq *cq)   * connection shutdown. That is, the structures required for   * the completion of the reply handler must remain intact until   * all memory has been reclaimed. - * - * Note that send events are suppressed and do not result in an upcall.   
*/  static void -rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context) +rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)  { +	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;  	int rc; -	rc = rpcrdma_cq_poll(cq); -	if (rc) +	rc = rpcrdma_recvcq_poll(cq, ep); +	if (rc) { +		dprintk("RPC:       %s: ib_poll_cq failed: %i\n", +			__func__, rc);  		return; +	} -	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP); -	if (rc) { -		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n", +	rc = ib_req_notify_cq(cq, +			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); +	if (rc == 0) +		return; +	if (rc < 0) { +		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",  			__func__, rc);  		return;  	} -	rpcrdma_cq_poll(cq); +	rpcrdma_recvcq_poll(cq, ep);  }  #ifdef RPC_DEBUG @@ -493,54 +539,32 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)  		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;  	} -	switch (memreg) { -	case RPCRDMA_MEMWINDOWS: -	case RPCRDMA_MEMWINDOWS_ASYNC: -		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) { -			dprintk("RPC:       %s: MEMWINDOWS registration " -				"specified but not supported by adapter, " -				"using slower RPCRDMA_REGISTER\n", -				__func__); -			memreg = RPCRDMA_REGISTER; -		} -		break; -	case RPCRDMA_MTHCAFMR: -		if (!ia->ri_id->device->alloc_fmr) { -#if RPCRDMA_PERSISTENT_REGISTRATION -			dprintk("RPC:       %s: MTHCAFMR registration " -				"specified but not supported by adapter, " -				"using riskier RPCRDMA_ALLPHYSICAL\n", -				__func__); -			memreg = RPCRDMA_ALLPHYSICAL; -#else -			dprintk("RPC:       %s: MTHCAFMR registration " -				"specified but not supported by adapter, " -				"using slower RPCRDMA_REGISTER\n", -				__func__); -			memreg = RPCRDMA_REGISTER; -#endif -		} -		break; -	case RPCRDMA_FRMR: +	if (memreg == RPCRDMA_FRMR) {  		/* Requires both frmr reg and local dma lkey */  		if ((devattr.device_cap_flags &  		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=  		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) { -#if RPCRDMA_PERSISTENT_REGISTRATION  			dprintk("RPC:       %s: FRMR registration " -				"specified but not supported by adapter, " -				"using riskier RPCRDMA_ALLPHYSICAL\n", -				__func__); +				"not supported by HCA\n", __func__); +			memreg = RPCRDMA_MTHCAFMR; +		} else { +			/* Mind the ia limit on FRMR page list depth */ +			ia->ri_max_frmr_depth = min_t(unsigned int, +				RPCRDMA_MAX_DATA_SEGS, +				devattr.max_fast_reg_page_list_len); +		} +	} +	if (memreg == RPCRDMA_MTHCAFMR) { +		if (!ia->ri_id->device->alloc_fmr) { +			dprintk("RPC:       %s: MTHCAFMR registration " +				"not supported by HCA\n", __func__); +#if RPCRDMA_PERSISTENT_REGISTRATION  			memreg = RPCRDMA_ALLPHYSICAL;  #else -			dprintk("RPC:       %s: FRMR registration " -				"specified but not supported by adapter, " -				"using slower RPCRDMA_REGISTER\n", -				__func__); -			memreg = RPCRDMA_REGISTER; +			rc = -ENOMEM; +			goto out2;  #endif  		} -		break;  	}  	/* @@ -552,8 +576,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)  	 * adapter.  	 
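
Both new completion upcalls above follow the same sequence: drain the CQ, re-arm it with IB_CQ_REPORT_MISSED_EVENTS, and drain once more if the re-arm reports that completions slipped in meanwhile. A userspace model of the race the second drain closes; cq_poll() and cq_rearm() are simplified stand-ins for the verbs calls:

#include <stdio.h>

static int pending;		/* completions sitting in the CQ */

static int cq_poll(void)	/* stand-in for ib_poll_cq() */
{
	int got = pending;

	pending = 0;
	return got;
}

/* Stand-in for ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS):
 * returns nonzero when completions are already queued at re-arm time. */
static int cq_rearm(void)
{
	return pending > 0;
}

static void upcall(void)
{
	printf("drained %d\n", cq_poll());
	pending += 1;		/* lands after the poll but before the re-arm,
				 * so no interrupt will ever fire for it */
	if (cq_rearm() > 0)	/* the missed-events report catches exactly this */
		printf("drained %d more\n", cq_poll());
}

int main(void)
{
	pending = 3;
	upcall();
	return 0;
}

Without the second poll, the late completion would sit in the queue until the next unrelated interrupt.
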
*/  	switch (memreg) { -	case RPCRDMA_BOUNCEBUFFERS: -	case RPCRDMA_REGISTER:  	case RPCRDMA_FRMR:  		break;  #if RPCRDMA_PERSISTENT_REGISTRATION @@ -563,30 +585,26 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)  				IB_ACCESS_REMOTE_READ;  		goto register_setup;  #endif -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		mem_priv = IB_ACCESS_LOCAL_WRITE | -				IB_ACCESS_MW_BIND; -		goto register_setup;  	case RPCRDMA_MTHCAFMR:  		if (ia->ri_have_dma_lkey)  			break;  		mem_priv = IB_ACCESS_LOCAL_WRITE; +#if RPCRDMA_PERSISTENT_REGISTRATION  	register_setup: +#endif  		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);  		if (IS_ERR(ia->ri_bind_mem)) {  			printk(KERN_ALERT "%s: ib_get_dma_mr for " -				"phys register failed with %lX\n\t" -				"Will continue with degraded performance\n", +				"phys register failed with %lX\n",  				__func__, PTR_ERR(ia->ri_bind_mem)); -			memreg = RPCRDMA_REGISTER; -			ia->ri_bind_mem = NULL; +			rc = -ENOMEM; +			goto out2;  		}  		break;  	default: -		printk(KERN_ERR "%s: invalid memory registration mode %d\n", -				__func__, memreg); -		rc = -EINVAL; +		printk(KERN_ERR "RPC: Unsupported memory " +				"registration mode: %d\n", memreg); +		rc = -ENOMEM;  		goto out2;  	}  	dprintk("RPC:       %s: memory registration strategy is %d\n", @@ -640,6 +658,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  				struct rpcrdma_create_data_internal *cdata)  {  	struct ib_device_attr devattr; +	struct ib_cq *sendcq, *recvcq;  	int rc, err;  	rc = ib_query_device(ia->ri_id->device, &devattr); @@ -659,32 +678,42 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  	ep->rep_attr.srq = NULL;  	ep->rep_attr.cap.max_send_wr = cdata->max_requests;  	switch (ia->ri_memreg_strategy) { -	case RPCRDMA_FRMR: +	case RPCRDMA_FRMR: { +		int depth = 7; +  		/* Add room for frmr register and invalidate WRs.  		 * 1. FRMR reg WR for head  		 * 2. FRMR invalidate WR for head -		 * 3. FRMR reg WR for pagelist -		 * 4. FRMR invalidate WR for pagelist +		 * 3. N FRMR reg WRs for pagelist +		 * 4. N FRMR invalidate WRs for pagelist  		 * 5. FRMR reg WR for tail  		 * 6. FRMR invalidate WR for tail  		 * 7. The RDMA_SEND WR  		 */ -		ep->rep_attr.cap.max_send_wr *= 7; + +		/* Calculate N if the device max FRMR depth is smaller than +		 * RPCRDMA_MAX_DATA_SEGS. +		 */ +		if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) { +			int delta = RPCRDMA_MAX_DATA_SEGS - +				    ia->ri_max_frmr_depth; + +			do { +				depth += 2; /* FRMR reg + invalidate */ +				delta -= ia->ri_max_frmr_depth; +			} while (delta > 0); + +		} +		ep->rep_attr.cap.max_send_wr *= depth;  		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) { -			cdata->max_requests = devattr.max_qp_wr / 7; +			cdata->max_requests = devattr.max_qp_wr / depth;  			if (!cdata->max_requests)  				return -EINVAL; -			ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7; +			ep->rep_attr.cap.max_send_wr = cdata->max_requests * +						       depth;  		}  		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		/* Add room for mw_binds+unbinds - overkill! 
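
For devices whose fast-register page-list limit is below RPCRDMA_MAX_DATA_SEGS, rpcrdma_ep_create() above adds two more work requests (one registration, one invalidate) for every additional FRMR the page list needs. The same arithmetic as a standalone program, using an illustrative segment count of 64 and a device limit of 16:

#include <stdio.h>

#define RPCRDMA_MAX_DATA_SEGS	64	/* illustrative value */

static int frmr_send_wr_depth(int max_frmr_depth)
{
	int depth = 7;	/* head reg+inv, one pagelist reg+inv, tail reg+inv, SEND */

	if (max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
		int delta = RPCRDMA_MAX_DATA_SEGS - max_frmr_depth;

		do {
			depth += 2;	/* one more FRMR reg + invalidate */
			delta -= max_frmr_depth;
		} while (delta > 0);
	}
	return depth;
}

int main(void)
{
	/* 64 segments with a 16-entry page-list limit needs four FRMRs for the
	 * page list, i.e. three extra reg/inv pairs on top of the base of 7. */
	printf("depth = %d WRs per request\n", frmr_send_wr_depth(16));
	return 0;
}

With those numbers the program prints a depth of 13, and max_send_wr is scaled by that factor before being checked against the device's max_qp_wr.
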
*/ -		ep->rep_attr.cap.max_send_wr++; -		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS); -		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) -			return -EINVAL; -		break; +	}  	default:  		break;  	} @@ -705,46 +734,51 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  		ep->rep_attr.cap.max_recv_sge);  	/* set trigger for requesting send completion */ -	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/; -	switch (ia->ri_memreg_strategy) { -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		ep->rep_cqinit -= RPCRDMA_MAX_SEGS; -		break; -	default: -		break; -	} +	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;  	if (ep->rep_cqinit <= 2)  		ep->rep_cqinit = 0;  	INIT_CQCOUNT(ep);  	ep->rep_ia = ia;  	init_waitqueue_head(&ep->rep_connect_wait); +	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); -	/* -	 * Create a single cq for receive dto and mw_bind (only ever -	 * care about unbind, really). Send completions are suppressed. -	 * Use single threaded tasklet upcalls to maintain ordering. -	 */ -	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall, -				  rpcrdma_cq_async_error_upcall, NULL, -				  ep->rep_attr.cap.max_recv_wr + +	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall, +				  rpcrdma_cq_async_error_upcall, ep,  				  ep->rep_attr.cap.max_send_wr + 1, 0); -	if (IS_ERR(ep->rep_cq)) { -		rc = PTR_ERR(ep->rep_cq); -		dprintk("RPC:       %s: ib_create_cq failed: %i\n", +	if (IS_ERR(sendcq)) { +		rc = PTR_ERR(sendcq); +		dprintk("RPC:       %s: failed to create send CQ: %i\n",  			__func__, rc);  		goto out1;  	} -	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP); +	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP); +	if (rc) { +		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n", +			__func__, rc); +		goto out2; +	} + +	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall, +				  rpcrdma_cq_async_error_upcall, ep, +				  ep->rep_attr.cap.max_recv_wr + 1, 0); +	if (IS_ERR(recvcq)) { +		rc = PTR_ERR(recvcq); +		dprintk("RPC:       %s: failed to create recv CQ: %i\n", +			__func__, rc); +		goto out2; +	} + +	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);  	if (rc) {  		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",  			__func__, rc); +		ib_destroy_cq(recvcq);  		goto out2;  	} -	ep->rep_attr.send_cq = ep->rep_cq; -	ep->rep_attr.recv_cq = ep->rep_cq; +	ep->rep_attr.send_cq = sendcq; +	ep->rep_attr.recv_cq = recvcq;  	/* Initialize cma parameters */ @@ -754,9 +788,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  	/* Client offers RDMA Read but does not initiate */  	ep->rep_remote_cma.initiator_depth = 0; -	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS) -		ep->rep_remote_cma.responder_resources = 0; -	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */ +	if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */  		ep->rep_remote_cma.responder_resources = 32;  	else  		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom; @@ -768,7 +800,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  	return 0;  out2: -	err = ib_destroy_cq(ep->rep_cq); +	err = ib_destroy_cq(sendcq);  	if (err)  		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",  			__func__, err); @@ -782,11 +814,8 @@ out1:   * Disconnect and destroy endpoint. After this, the only   * valid operations on the ep are to free it (if dynamically   * allocated) or re-create it. 
- * - * The caller's error handling must be sure to not leak the endpoint - * if this function fails.   */ -int +void  rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  {  	int rc; @@ -794,6 +823,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  	dprintk("RPC:       %s: entering, connected is %d\n",  		__func__, ep->rep_connected); +	cancel_delayed_work_sync(&ep->rep_connect_worker); +  	if (ia->ri_id->qp) {  		rc = rpcrdma_ep_disconnect(ep, ia);  		if (rc) @@ -809,13 +840,17 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  		ep->rep_pad_mr = NULL;  	} -	rpcrdma_clean_cq(ep->rep_cq); -	rc = ib_destroy_cq(ep->rep_cq); +	rpcrdma_clean_cq(ep->rep_attr.recv_cq); +	rc = ib_destroy_cq(ep->rep_attr.recv_cq);  	if (rc)  		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",  			__func__, rc); -	return rc; +	rpcrdma_clean_cq(ep->rep_attr.send_cq); +	rc = ib_destroy_cq(ep->rep_attr.send_cq); +	if (rc) +		dprintk("RPC:       %s: ib_destroy_cq returned %i\n", +			__func__, rc);  }  /* @@ -831,17 +866,20 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  	if (ep->rep_connected != 0) {  		struct rpcrdma_xprt *xprt;  retry: +		dprintk("RPC:       %s: reconnecting...\n", __func__);  		rc = rpcrdma_ep_disconnect(ep, ia);  		if (rc && rc != -ENOTCONN)  			dprintk("RPC:       %s: rpcrdma_ep_disconnect"  				" status %i\n", __func__, rc); -		rpcrdma_clean_cq(ep->rep_cq); + +		rpcrdma_clean_cq(ep->rep_attr.recv_cq); +		rpcrdma_clean_cq(ep->rep_attr.send_cq);  		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);  		id = rpcrdma_create_id(xprt, ia,  				(struct sockaddr *)&xprt->rx_data.addr);  		if (IS_ERR(id)) { -			rc = PTR_ERR(id); +			rc = -EHOSTUNREACH;  			goto out;  		}  		/* TEMP TEMP TEMP - fail if new device: @@ -855,35 +893,32 @@ retry:  			printk("RPC:       %s: can't reconnect on "  				"different device!\n", __func__);  			rdma_destroy_id(id); -			rc = -ENETDOWN; +			rc = -ENETUNREACH;  			goto out;  		}  		/* END TEMP */ +		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr); +		if (rc) { +			dprintk("RPC:       %s: rdma_create_qp failed %i\n", +				__func__, rc); +			rdma_destroy_id(id); +			rc = -ENETUNREACH; +			goto out; +		}  		rdma_destroy_qp(ia->ri_id);  		rdma_destroy_id(ia->ri_id);  		ia->ri_id = id; +	} else { +		dprintk("RPC:       %s: connecting...\n", __func__); +		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); +		if (rc) { +			dprintk("RPC:       %s: rdma_create_qp failed %i\n", +				__func__, rc); +			/* do not update ep->rep_connected */ +			return -ENETUNREACH; +		}  	} -	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); -	if (rc) { -		dprintk("RPC:       %s: rdma_create_qp failed %i\n", -			__func__, rc); -		goto out; -	} - -/* XXX Tavor device performs badly with 2K MTU! 
*/ -if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) { -	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device); -	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR && -	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX || -	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) { -		struct ib_qp_attr attr = { -			.path_mtu = IB_MTU_1024 -		}; -		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU); -	} -} -  	ep->rep_connected = 0;  	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma); @@ -944,7 +979,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  {  	int rc; -	rpcrdma_clean_cq(ep->rep_cq); +	rpcrdma_clean_cq(ep->rep_attr.recv_cq); +	rpcrdma_clean_cq(ep->rep_attr.send_cq);  	rc = rdma_disconnect(ia->ri_id);  	if (!rc) {  		/* returns without wait if not connected */ @@ -967,7 +1003,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)  {  	char *p; -	size_t len; +	size_t len, rlen, wlen;  	int i, rc;  	struct rpcrdma_mw *r; @@ -997,11 +1033,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *  				sizeof(struct rpcrdma_mw);  		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS * -				sizeof(struct rpcrdma_mw); -		break;  	default:  		break;  	} @@ -1032,32 +1063,29 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  	}  	p += cdata->padding; -	/* -	 * Allocate the fmr's, or mw's for mw_bind chunk registration. -	 * We "cycle" the mw's in order to minimize rkey reuse, -	 * and also reduce unbind-to-bind collision. -	 */  	INIT_LIST_HEAD(&buf->rb_mws);  	r = (struct rpcrdma_mw *)p;  	switch (ia->ri_memreg_strategy) {  	case RPCRDMA_FRMR:  		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {  			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd, -							 RPCRDMA_MAX_SEGS); +						ia->ri_max_frmr_depth);  			if (IS_ERR(r->r.frmr.fr_mr)) {  				rc = PTR_ERR(r->r.frmr.fr_mr);  				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"  					" failed %i\n", __func__, rc);  				goto out;  			} -			r->r.frmr.fr_pgl = -				ib_alloc_fast_reg_page_list(ia->ri_id->device, -							    RPCRDMA_MAX_SEGS); +			r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list( +						ia->ri_id->device, +						ia->ri_max_frmr_depth);  			if (IS_ERR(r->r.frmr.fr_pgl)) {  				rc = PTR_ERR(r->r.frmr.fr_pgl);  				dprintk("RPC:       %s: "  					"ib_alloc_fast_reg_page_list "  					"failed %i\n", __func__, rc); + +				ib_dereg_mr(r->r.frmr.fr_mr);  				goto out;  			}  			list_add(&r->mw_list, &buf->rb_mws); @@ -1082,21 +1110,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  			++r;  		}  		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		/* Allocate one extra request's worth, for full cycling */ -		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) { -			r->r.mw = ib_alloc_mw(ia->ri_pd, IB_MW_TYPE_1); -			if (IS_ERR(r->r.mw)) { -				rc = PTR_ERR(r->r.mw); -				dprintk("RPC:       %s: ib_alloc_mw" -					" failed %i\n", __func__, rc); -				goto out; -			} -			list_add(&r->mw_list, &buf->rb_mws); -			++r; -		} -		break;  	default:  		break;  	} @@ -1105,16 +1118,16 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  	 * Allocate/init the request/reply buffers. Doing this  	 * using kmalloc for now -- one for each buf.  	 
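
The buffer sizing added in the next hunk replaces the old round-up-to-4096 rule with 1 << fls(len), the smallest power of two strictly greater than len. A userspace equivalent, where generic_fls() mirrors the kernel helper and the lengths are made-up examples:

#include <stdio.h>

/* Userspace equivalent of the kernel's fls(): 1-based index of the highest
 * set bit, 0 for an input of 0. */
static int generic_fls(unsigned int x)
{
	int r = 0;

	while (x) {
		x >>= 1;
		r++;
	}
	return r;
}

int main(void)
{
	unsigned int lens[] = { 1024 + 344, 2400, 4096 };	/* example sizes */
	unsigned int i;

	for (i = 0; i < sizeof(lens) / sizeof(lens[0]); i++)
		printf("len %u -> allocate %u\n",
		       lens[i], 1U << generic_fls(lens[i]));
	return 0;
}

Note that an exact power of two still rounds up to the next one, which preserves headroom for the extra space the removed comment says the RPC layer asks for.
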
*/ +	wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req)); +	rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep)); +	dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n", +		__func__, wlen, rlen); +  	for (i = 0; i < buf->rb_max_requests; i++) {  		struct rpcrdma_req *req;  		struct rpcrdma_rep *rep; -		len = cdata->inline_wsize + sizeof(struct rpcrdma_req); -		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */ -		/* Typical ~2400b, so rounding up saves work later */ -		if (len < 4096) -			len = 4096; -		req = kmalloc(len, GFP_KERNEL); +		req = kmalloc(wlen, GFP_KERNEL);  		if (req == NULL) {  			dprintk("RPC:       %s: request buffer %d alloc"  				" failed\n", __func__, i); @@ -1126,16 +1139,16 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  		buf->rb_send_bufs[i]->rl_buffer = buf;  		rc = rpcrdma_register_internal(ia, req->rl_base, -				len - offsetof(struct rpcrdma_req, rl_base), +				wlen - offsetof(struct rpcrdma_req, rl_base),  				&buf->rb_send_bufs[i]->rl_handle,  				&buf->rb_send_bufs[i]->rl_iov);  		if (rc)  			goto out; -		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req); +		buf->rb_send_bufs[i]->rl_size = wlen - +						sizeof(struct rpcrdma_req); -		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep); -		rep = kmalloc(len, GFP_KERNEL); +		rep = kmalloc(rlen, GFP_KERNEL);  		if (rep == NULL) {  			dprintk("RPC:       %s: reply buffer %d alloc failed\n",  				__func__, i); @@ -1145,10 +1158,9 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  		memset(rep, 0, sizeof(struct rpcrdma_rep));  		buf->rb_recv_bufs[i] = rep;  		buf->rb_recv_bufs[i]->rr_buffer = buf; -		init_waitqueue_head(&rep->rr_unbind);  		rc = rpcrdma_register_internal(ia, rep->rr_base, -				len - offsetof(struct rpcrdma_rep, rr_base), +				rlen - offsetof(struct rpcrdma_rep, rr_base),  				&buf->rb_recv_bufs[i]->rr_handle,  				&buf->rb_recv_bufs[i]->rr_iov);  		if (rc) @@ -1179,7 +1191,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)  	/* clean up in reverse order from create  	 *   1.  recv mr memory (mr free, then kfree) -	 *   1a. bind mw memory  	 *   2.  send mr memory (mr free, then kfree)  	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]  	 *   4.  
arrays @@ -1194,41 +1205,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)  			kfree(buf->rb_recv_bufs[i]);  		}  		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) { -			while (!list_empty(&buf->rb_mws)) { -				r = list_entry(buf->rb_mws.next, -					struct rpcrdma_mw, mw_list); -				list_del(&r->mw_list); -				switch (ia->ri_memreg_strategy) { -				case RPCRDMA_FRMR: -					rc = ib_dereg_mr(r->r.frmr.fr_mr); -					if (rc) -						dprintk("RPC:       %s:" -							" ib_dereg_mr" -							" failed %i\n", -							__func__, rc); -					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl); -					break; -				case RPCRDMA_MTHCAFMR: -					rc = ib_dealloc_fmr(r->r.fmr); -					if (rc) -						dprintk("RPC:       %s:" -							" ib_dealloc_fmr" -							" failed %i\n", -							__func__, rc); -					break; -				case RPCRDMA_MEMWINDOWS_ASYNC: -				case RPCRDMA_MEMWINDOWS: -					rc = ib_dealloc_mw(r->r.mw); -					if (rc) -						dprintk("RPC:       %s:" -							" ib_dealloc_mw" -							" failed %i\n", -							__func__, rc); -					break; -				default: -					break; -				} -			}  			rpcrdma_deregister_internal(ia,  					buf->rb_send_bufs[i]->rl_handle,  					&buf->rb_send_bufs[i]->rl_iov); @@ -1236,6 +1212,33 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)  		}  	} +	while (!list_empty(&buf->rb_mws)) { +		r = list_entry(buf->rb_mws.next, +			struct rpcrdma_mw, mw_list); +		list_del(&r->mw_list); +		switch (ia->ri_memreg_strategy) { +		case RPCRDMA_FRMR: +			rc = ib_dereg_mr(r->r.frmr.fr_mr); +			if (rc) +				dprintk("RPC:       %s:" +					" ib_dereg_mr" +					" failed %i\n", +					__func__, rc); +			ib_free_fast_reg_page_list(r->r.frmr.fr_pgl); +			break; +		case RPCRDMA_MTHCAFMR: +			rc = ib_dealloc_fmr(r->r.fmr); +			if (rc) +				dprintk("RPC:       %s:" +					" ib_dealloc_fmr" +					" failed %i\n", +					__func__, rc); +			break; +		default: +			break; +		} +	} +  	kfree(buf->rb_pool);  } @@ -1299,21 +1302,17 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)  	int i;  	unsigned long flags; -	BUG_ON(req->rl_nchunks != 0);  	spin_lock_irqsave(&buffers->rb_lock, flags);  	buffers->rb_send_bufs[--buffers->rb_send_index] = req;  	req->rl_niovs = 0;  	if (req->rl_reply) {  		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply; -		init_waitqueue_head(&req->rl_reply->rr_unbind);  		req->rl_reply->rr_func = NULL;  		req->rl_reply = NULL;  	}  	switch (ia->ri_memreg_strategy) {  	case RPCRDMA_FRMR:  	case RPCRDMA_MTHCAFMR: -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS:  		/*  		 * Cycle mw's back in reverse order, and "spin" them.  		 * This delays and scrambles reuse as much as possible. @@ -1358,8 +1357,7 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)  /*   * Put reply buffers back into pool when not attached to - * request. This happens in error conditions, and when - * aborting unbinds. Pre-decrement counter/array index. + * request. This happens in error conditions.   
*/  void  rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) @@ -1498,8 +1496,8 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,  	seg1->mr_offset -= pageoff;	/* start of page */  	seg1->mr_len += pageoff;  	len = -pageoff; -	if (*nsegs > RPCRDMA_MAX_DATA_SEGS) -		*nsegs = RPCRDMA_MAX_DATA_SEGS; +	if (*nsegs > ia->ri_max_frmr_depth) +		*nsegs = ia->ri_max_frmr_depth;  	for (page_no = i = 0; i < *nsegs;) {  		rpcrdma_map_one(ia, seg, writing);  		pa = seg->mr_dma; @@ -1536,10 +1534,6 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,  	} else  		post_wr = &frmr_wr; -	/* Bump the key */ -	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF); -	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key); -  	/* Prepare FRMR WR */  	memset(&frmr_wr, 0, sizeof frmr_wr);  	frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw; @@ -1550,7 +1544,16 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,  	frmr_wr.wr.fast_reg.page_list_len = page_no;  	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;  	frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT; -	BUG_ON(frmr_wr.wr.fast_reg.length < len); +	if (frmr_wr.wr.fast_reg.length < len) { +		while (seg1->mr_nsegs--) +			rpcrdma_unmap_one(ia, seg++); +		return -EIO; +	} + +	/* Bump the key */ +	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF); +	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key); +  	frmr_wr.wr.fast_reg.access_flags = (writing ?  				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :  				IB_ACCESS_REMOTE_READ); @@ -1661,135 +1664,6 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,  	return rc;  } -static int -rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg, -			int *nsegs, int writing, struct rpcrdma_ia *ia, -			struct rpcrdma_xprt *r_xprt) -{ -	int mem_priv = (writing ? 
IB_ACCESS_REMOTE_WRITE : -				  IB_ACCESS_REMOTE_READ); -	struct ib_mw_bind param; -	int rc; - -	*nsegs = 1; -	rpcrdma_map_one(ia, seg, writing); -	param.bind_info.mr = ia->ri_bind_mem; -	param.wr_id = 0ULL;	/* no send cookie */ -	param.bind_info.addr = seg->mr_dma; -	param.bind_info.length = seg->mr_len; -	param.send_flags = 0; -	param.bind_info.mw_access_flags = mem_priv; - -	DECR_CQCOUNT(&r_xprt->rx_ep); -	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, ¶m); -	if (rc) { -		dprintk("RPC:       %s: failed ib_bind_mw " -			"%u@0x%llx status %i\n", -			__func__, seg->mr_len, -			(unsigned long long)seg->mr_dma, rc); -		rpcrdma_unmap_one(ia, seg); -	} else { -		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey; -		seg->mr_base = param.bind_info.addr; -		seg->mr_nsegs = 1; -	} -	return rc; -} - -static int -rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg, -			struct rpcrdma_ia *ia, -			struct rpcrdma_xprt *r_xprt, void **r) -{ -	struct ib_mw_bind param; -	LIST_HEAD(l); -	int rc; - -	BUG_ON(seg->mr_nsegs != 1); -	param.bind_info.mr = ia->ri_bind_mem; -	param.bind_info.addr = 0ULL;	/* unbind */ -	param.bind_info.length = 0; -	param.bind_info.mw_access_flags = 0; -	if (*r) { -		param.wr_id = (u64) (unsigned long) *r; -		param.send_flags = IB_SEND_SIGNALED; -		INIT_CQCOUNT(&r_xprt->rx_ep); -	} else { -		param.wr_id = 0ULL; -		param.send_flags = 0; -		DECR_CQCOUNT(&r_xprt->rx_ep); -	} -	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, ¶m); -	rpcrdma_unmap_one(ia, seg); -	if (rc) -		dprintk("RPC:       %s: failed ib_(un)bind_mw," -			" status %i\n", __func__, rc); -	else -		*r = NULL;	/* will upcall on completion */ -	return rc; -} - -static int -rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg, -			int *nsegs, int writing, struct rpcrdma_ia *ia) -{ -	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE : -				  IB_ACCESS_REMOTE_READ); -	struct rpcrdma_mr_seg *seg1 = seg; -	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS]; -	int len, i, rc = 0; - -	if (*nsegs > RPCRDMA_MAX_DATA_SEGS) -		*nsegs = RPCRDMA_MAX_DATA_SEGS; -	for (len = 0, i = 0; i < *nsegs;) { -		rpcrdma_map_one(ia, seg, writing); -		ipb[i].addr = seg->mr_dma; -		ipb[i].size = seg->mr_len; -		len += seg->mr_len; -		++seg; -		++i; -		/* Check for holes */ -		if ((i < *nsegs && offset_in_page(seg->mr_offset)) || -		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len)) -			break; -	} -	seg1->mr_base = seg1->mr_dma; -	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd, -				ipb, i, mem_priv, &seg1->mr_base); -	if (IS_ERR(seg1->mr_chunk.rl_mr)) { -		rc = PTR_ERR(seg1->mr_chunk.rl_mr); -		dprintk("RPC:       %s: failed ib_reg_phys_mr " -			"%u@0x%llx (%d)... 
status %i\n", -			__func__, len, -			(unsigned long long)seg1->mr_dma, i, rc); -		while (i--) -			rpcrdma_unmap_one(ia, --seg); -	} else { -		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey; -		seg1->mr_nsegs = i; -		seg1->mr_len = len; -	} -	*nsegs = i; -	return rc; -} - -static int -rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg, -			struct rpcrdma_ia *ia) -{ -	struct rpcrdma_mr_seg *seg1 = seg; -	int rc; - -	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr); -	seg1->mr_chunk.rl_mr = NULL; -	while (seg1->mr_nsegs--) -		rpcrdma_unmap_one(ia, seg++); -	if (rc) -		dprintk("RPC:       %s: failed ib_dereg_mr," -			" status %i\n", __func__, rc); -	return rc; -} -  int  rpcrdma_register_external(struct rpcrdma_mr_seg *seg,  			int nsegs, int writing, struct rpcrdma_xprt *r_xprt) @@ -1819,16 +1693,8 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg,  		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);  		break; -	/* Registration using memory windows */ -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt); -		break; - -	/* Default registration each time */  	default: -		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia); -		break; +		return -1;  	}  	if (rc)  		return -1; @@ -1838,7 +1704,7 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg,  int  rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg, -		struct rpcrdma_xprt *r_xprt, void *r) +		struct rpcrdma_xprt *r_xprt)  {  	struct rpcrdma_ia *ia = &r_xprt->rx_ia;  	int nsegs = seg->mr_nsegs, rc; @@ -1847,9 +1713,7 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,  #if RPCRDMA_PERSISTENT_REGISTRATION  	case RPCRDMA_ALLPHYSICAL: -		BUG_ON(nsegs != 1);  		rpcrdma_unmap_one(ia, seg); -		rc = 0;  		break;  #endif @@ -1861,21 +1725,9 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,  		rc = rpcrdma_deregister_fmr_external(seg, ia);  		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r); -		break; -  	default: -		rc = rpcrdma_deregister_default_external(seg, ia);  		break;  	} -	if (r) { -		struct rpcrdma_rep *rep = r; -		void (*func)(struct rpcrdma_rep *) = rep->rr_func; -		rep->rr_func = NULL; -		func(rep);	/* dereg done, callback now */ -	}  	return nsegs;  } @@ -1950,7 +1802,6 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,  	ib_dma_sync_single_for_cpu(ia->ri_id->device,  		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL); -	DECR_CQCOUNT(ep);  	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);  	if (rc) diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index cc1445dc1d1..89e7cd47970 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -43,6 +43,7 @@  #include <linux/wait.h> 		/* wait_queue_head_t, etc */  #include <linux/spinlock.h> 		/* spinlock_t, etc */  #include <linux/atomic.h>			/* atomic_t, etc */ +#include <linux/workqueue.h>		/* struct work_struct */  #include <rdma/rdma_cm.h>		/* RDMA connection api */  #include <rdma/ib_verbs.h>		/* RDMA verbs api */ @@ -66,18 +67,21 @@ struct rpcrdma_ia {  	struct completion	ri_done;  	int			ri_async_rc;  	enum rpcrdma_memreg	ri_memreg_strategy; +	unsigned int		ri_max_frmr_depth;  };  /*   * RDMA Endpoint -- one per transport instance   */ +#define RPCRDMA_WC_BUDGET	(128) +#define RPCRDMA_POLLSIZE	(16) +  struct rpcrdma_ep {  	atomic_t		rep_cqcount;  	int			rep_cqinit;  	int			rep_connected;  	struct rpcrdma_ia	
*rep_ia; -	struct ib_cq		*rep_cq;  	struct ib_qp_init_attr	rep_attr;  	wait_queue_head_t 	rep_connect_wait;  	struct ib_sge		rep_pad;	/* holds zeroed pad */ @@ -86,6 +90,9 @@ struct rpcrdma_ep {  	struct rpc_xprt		*rep_xprt;	/* for rep_func */  	struct rdma_conn_param	rep_remote_cma;  	struct sockaddr_storage	rep_remote_addr; +	struct delayed_work	rep_connect_worker; +	struct ib_wc		rep_send_wcs[RPCRDMA_POLLSIZE]; +	struct ib_wc		rep_recv_wcs[RPCRDMA_POLLSIZE];  };  #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit) @@ -124,7 +131,6 @@ struct rpcrdma_rep {  	struct rpc_xprt	*rr_xprt;	/* needed for request/reply matching */  	void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */  	struct list_head rr_list;	/* tasklet list */ -	wait_queue_head_t rr_unbind;	/* optional unbind wait */  	struct ib_sge	rr_iov;		/* for posting */  	struct ib_mr	*rr_handle;	/* handle for mem in rr_iov */  	char	rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */ @@ -159,7 +165,6 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */  		struct ib_mr	*rl_mr;		/* if registered directly */  		struct rpcrdma_mw {		/* if registered from region */  			union { -				struct ib_mw	*mw;  				struct ib_fmr	*fmr;  				struct {  					struct ib_fast_reg_page_list *fr_pgl; @@ -207,7 +212,6 @@ struct rpcrdma_req {  struct rpcrdma_buffer {  	spinlock_t	rb_lock;	/* protects indexes */  	atomic_t	rb_credits;	/* most recent server credits */ -	unsigned long	rb_cwndscale;	/* cached framework rpc_cwndscale */  	int		rb_max_requests;/* client max requests */  	struct list_head rb_mws;	/* optional memory windows/fmrs/frmrs */  	int		rb_send_index; @@ -300,7 +304,7 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);   */  int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,  				struct rpcrdma_create_data_internal *); -int rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *); +void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);  int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);  int rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *); @@ -330,11 +334,12 @@ int rpcrdma_deregister_internal(struct rpcrdma_ia *,  int rpcrdma_register_external(struct rpcrdma_mr_seg *,  				int, int, struct rpcrdma_xprt *);  int rpcrdma_deregister_external(struct rpcrdma_mr_seg *, -				struct rpcrdma_xprt *, void *); +				struct rpcrdma_xprt *);  /*   * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c   */ +void rpcrdma_connect_worker(struct work_struct *);  void rpcrdma_conn_func(struct rpcrdma_ep *);  void rpcrdma_reply_handler(struct rpcrdma_rep *); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index ee03d35677d..be8bbd5d65e 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -254,9 +254,10 @@ struct sock_xprt {  	/*  	 * Saved socket callback addresses  	 */ -	void			(*old_data_ready)(struct sock *, int); +	void			(*old_data_ready)(struct sock *);  	void			(*old_state_change)(struct sock *);  	void			(*old_write_space)(struct sock *); +	void			(*old_error_report)(struct sock *);  };  /* @@ -274,6 +275,11 @@ struct sock_xprt {   */  #define TCP_RPC_REPLY		(1UL << 6) +static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) +{ +	return (struct rpc_xprt *) sk->sk_user_data; +} +  static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)  {  	return (struct sockaddr *) &xprt->addr; @@ -393,8 +399,10 @@ static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen,  	return kernel_sendmsg(sock, 
&msg, NULL, 0, 0);  } -static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more) +static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy)  { +	ssize_t (*do_sendpage)(struct socket *sock, struct page *page, +			int offset, size_t size, int flags);  	struct page **ppage;  	unsigned int remainder;  	int err, sent = 0; @@ -403,6 +411,9 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i  	base += xdr->page_base;  	ppage = xdr->pages + (base >> PAGE_SHIFT);  	base &= ~PAGE_MASK; +	do_sendpage = sock->ops->sendpage; +	if (!zerocopy) +		do_sendpage = sock_no_sendpage;  	for(;;) {  		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);  		int flags = XS_SENDMSG_FLAGS; @@ -410,7 +421,7 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i  		remainder -= len;  		if (remainder != 0 || more)  			flags |= MSG_MORE; -		err = sock->ops->sendpage(sock, *ppage, base, len, flags); +		err = do_sendpage(sock, *ppage, base, len, flags);  		if (remainder == 0 || err != len)  			break;  		sent += err; @@ -431,9 +442,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i   * @addrlen: UDP only -- length of destination address   * @xdr: buffer containing this request   * @base: starting position in the buffer + * @zerocopy: true if it is safe to use sendpage()   *   */ -static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) +static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy)  {  	unsigned int remainder = xdr->len - base;  	int err, sent = 0; @@ -461,7 +473,7 @@ static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,  	if (base < xdr->page_len) {  		unsigned int len = xdr->page_len - base;  		remainder -= len; -		err = xs_send_pagedata(sock, xdr, base, remainder != 0); +		err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy);  		if (remainder == 0 || err != len)  			goto out;  		sent += err; @@ -498,6 +510,7 @@ static int xs_nospace(struct rpc_task *task)  	struct rpc_rqst *req = task->tk_rqstp;  	struct rpc_xprt *xprt = req->rq_xprt;  	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); +	struct sock *sk = transport->inet;  	int ret = -EAGAIN;  	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", @@ -515,7 +528,7 @@ static int xs_nospace(struct rpc_task *task)  			 * window size  			 */  			set_bit(SOCK_NOSPACE, &transport->sock->flags); -			transport->inet->sk_write_pending++; +			sk->sk_write_pending++;  			/* ...and wait for more buffer space */  			xprt_wait_for_buffer_space(task, xs_nospace_callback);  		} @@ -525,6 +538,9 @@ static int xs_nospace(struct rpc_task *task)  	}  	spin_unlock_bh(&xprt->transport_lock); + +	/* Race breaker in case memory is freed before above code is called */ +	sk->sk_write_space(sk);  	return ret;  } @@ -564,7 +580,7 @@ static int xs_local_send_request(struct rpc_task *task)  			req->rq_svec->iov_base, req->rq_svec->iov_len);  	status = xs_sendpages(transport->sock, NULL, 0, -						xdr, req->rq_bytes_sent); +						xdr, req->rq_bytes_sent, true);  	dprintk("RPC:       %s(%u) = %d\n",  			__func__, xdr->len - req->rq_bytes_sent, status);  	if (likely(status >= 0)) { @@ -620,7 +636,7 @@ static int xs_udp_send_request(struct rpc_task *task)  	status = 
xs_sendpages(transport->sock,  			      xs_addr(xprt),  			      xprt->addrlen, xdr, -			      req->rq_bytes_sent); +			      req->rq_bytes_sent, true);  	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",  			xdr->len - req->rq_bytes_sent, status); @@ -693,6 +709,7 @@ static int xs_tcp_send_request(struct rpc_task *task)  	struct rpc_xprt *xprt = req->rq_xprt;  	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);  	struct xdr_buf *xdr = &req->rq_snd_buf; +	bool zerocopy = true;  	int status;  	xs_encode_stream_record_marker(&req->rq_snd_buf); @@ -700,13 +717,20 @@ static int xs_tcp_send_request(struct rpc_task *task)  	xs_pktdump("packet data:",  				req->rq_svec->iov_base,  				req->rq_svec->iov_len); +	/* Don't use zero copy if this is a resend. If the RPC call +	 * completes while the socket holds a reference to the pages, +	 * then we may end up resending corrupted data. +	 */ +	if (task->tk_flags & RPC_TASK_SENT) +		zerocopy = false;  	/* Continue transmitting the packet/record. We must be careful  	 * to cope with writespace callbacks arriving _after_ we have  	 * called sendmsg(). */  	while (1) {  		status = xs_sendpages(transport->sock, -					NULL, 0, xdr, req->rq_bytes_sent); +					NULL, 0, xdr, req->rq_bytes_sent, +					zerocopy);  		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",  				xdr->len - req->rq_bytes_sent, status); @@ -785,6 +809,7 @@ static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)  	transport->old_data_ready = sk->sk_data_ready;  	transport->old_state_change = sk->sk_state_change;  	transport->old_write_space = sk->sk_write_space; +	transport->old_error_report = sk->sk_error_report;  }  static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk) @@ -792,6 +817,34 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s  	sk->sk_data_ready = transport->old_data_ready;  	sk->sk_state_change = transport->old_state_change;  	sk->sk_write_space = transport->old_write_space; +	sk->sk_error_report = transport->old_error_report; +} + +/** + * xs_error_report - callback to handle TCP socket state errors + * @sk: socket + * + * Note: we don't call sock_error() since there may be a rpc_task + * using the socket, and so we don't want to clear sk->sk_err. 
+ */ +static void xs_error_report(struct sock *sk) +{ +	struct rpc_xprt *xprt; +	int err; + +	read_lock_bh(&sk->sk_callback_lock); +	if (!(xprt = xprt_from_sock(sk))) +		goto out; + +	err = -sk->sk_err; +	if (err == 0) +		goto out; +	dprintk("RPC:       xs_error_report client %p, error=%d...\n", +			xprt, -err); +	trace_rpc_socket_error(xprt, sk->sk_socket, err); +	xprt_wake_pending_tasks(xprt, err); + out: +	read_unlock_bh(&sk->sk_callback_lock);  }  static void xs_reset_transport(struct sock_xprt *transport) @@ -813,8 +866,6 @@ static void xs_reset_transport(struct sock_xprt *transport)  	xs_restore_old_callbacks(transport, sk);  	write_unlock_bh(&sk->sk_callback_lock); -	sk->sk_no_check = 0; -  	trace_rpc_socket_close(&transport->xprt, sock);  	sock_release(sock);  } @@ -835,14 +886,16 @@ static void xs_close(struct rpc_xprt *xprt)  	dprintk("RPC:       xs_close xprt %p\n", xprt); +	cancel_delayed_work_sync(&transport->connect_worker); +  	xs_reset_transport(transport);  	xprt->reestablish_timeout = 0; -	smp_mb__before_clear_bit(); +	smp_mb__before_atomic();  	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);  	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);  	clear_bit(XPRT_CLOSING, &xprt->state); -	smp_mb__after_clear_bit(); +	smp_mb__after_atomic();  	xprt_disconnect_done(xprt);  } @@ -854,12 +907,10 @@ static void xs_tcp_close(struct rpc_xprt *xprt)  		xs_tcp_shutdown(xprt);  } -static void xs_local_destroy(struct rpc_xprt *xprt) +static void xs_xprt_free(struct rpc_xprt *xprt)  { -	xs_close(xprt);  	xs_free_peer_addresses(xprt);  	xprt_free(xprt); -	module_put(THIS_MODULE);  }  /** @@ -869,18 +920,11 @@ static void xs_local_destroy(struct rpc_xprt *xprt)   */  static void xs_destroy(struct rpc_xprt *xprt)  { -	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); -  	dprintk("RPC:       xs_destroy xprt %p\n", xprt); -	cancel_delayed_work_sync(&transport->connect_worker); - -	xs_local_destroy(xprt); -} - -static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) -{ -	return (struct rpc_xprt *) sk->sk_user_data; +	xs_close(xprt); +	xs_xprt_free(xprt); +	module_put(THIS_MODULE);  }  static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) @@ -905,7 +949,7 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)   *   * Currently this assumes we can read the whole reply in a single gulp.   */ -static void xs_local_data_ready(struct sock *sk, int len) +static void xs_local_data_ready(struct sock *sk)  {  	struct rpc_task *task;  	struct rpc_xprt *xprt; @@ -968,7 +1012,7 @@ static void xs_local_data_ready(struct sock *sk, int len)   * @len: how much data to read   *   */ -static void xs_udp_data_ready(struct sock *sk, int len) +static void xs_udp_data_ready(struct sock *sk)  {  	struct rpc_task *task;  	struct rpc_xprt *xprt; @@ -1265,41 +1309,29 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,   * If we're unable to obtain the rpc_rqst we schedule the closing of the   * connection and return -1.   
*/ -static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, +static int xs_tcp_read_callback(struct rpc_xprt *xprt,  				       struct xdr_skb_reader *desc)  {  	struct sock_xprt *transport =  				container_of(xprt, struct sock_xprt, xprt);  	struct rpc_rqst *req; -	req = xprt_alloc_bc_request(xprt); +	/* Look up and lock the request corresponding to the given XID */ +	spin_lock(&xprt->transport_lock); +	req = xprt_lookup_bc_request(xprt, transport->tcp_xid);  	if (req == NULL) { +		spin_unlock(&xprt->transport_lock);  		printk(KERN_WARNING "Callback slot table overflowed\n");  		xprt_force_disconnect(xprt);  		return -1;  	} -	req->rq_xid = transport->tcp_xid;  	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));  	xs_tcp_read_common(xprt, desc, req); -	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { -		struct svc_serv *bc_serv = xprt->bc_serv; - -		/* -		 * Add callback request to callback list.  The callback -		 * service sleeps on the sv_cb_waitq waiting for new -		 * requests.  Wake it up after adding enqueing the -		 * request. -		 */ -		dprintk("RPC:       add callback request to list\n"); -		spin_lock(&bc_serv->sv_cb_lock); -		list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); -		spin_unlock(&bc_serv->sv_cb_lock); -		wake_up(&bc_serv->sv_cb_waitq); -	} - -	req->rq_private_buf.len = transport->tcp_copied; +	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) +		xprt_complete_bc_request(req, transport->tcp_copied); +	spin_unlock(&xprt->transport_lock);  	return 0;  } @@ -1403,7 +1435,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns   * @bytes: how much data to read   *   */ -static void xs_tcp_data_ready(struct sock *sk, int bytes) +static void xs_tcp_data_ready(struct sock *sk)  {  	struct rpc_xprt *xprt;  	read_descriptor_t rd_desc; @@ -1463,12 +1495,12 @@ static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)  static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)  { -	smp_mb__before_clear_bit(); +	smp_mb__before_atomic();  	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);  	clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state);  	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);  	clear_bit(XPRT_CLOSING, &xprt->state); -	smp_mb__after_clear_bit(); +	smp_mb__after_atomic();  }  static void xs_sock_mark_closed(struct rpc_xprt *xprt) @@ -1511,6 +1543,7 @@ static void xs_tcp_state_change(struct sock *sk)  			transport->tcp_copied = 0;  			transport->tcp_flags =  				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; +			xprt->connect_cookie++;  			xprt_wake_pending_tasks(xprt, -EAGAIN);  		} @@ -1521,10 +1554,10 @@ static void xs_tcp_state_change(struct sock *sk)  		xprt->connect_cookie++;  		xprt->reestablish_timeout = 0;  		set_bit(XPRT_CLOSING, &xprt->state); -		smp_mb__before_clear_bit(); +		smp_mb__before_atomic();  		clear_bit(XPRT_CONNECTED, &xprt->state);  		clear_bit(XPRT_CLOSE_WAIT, &xprt->state); -		smp_mb__after_clear_bit(); +		smp_mb__after_atomic();  		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);  		break;  	case TCP_CLOSE_WAIT: @@ -1543,9 +1576,9 @@ static void xs_tcp_state_change(struct sock *sk)  	case TCP_LAST_ACK:  		set_bit(XPRT_CLOSING, &xprt->state);  		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); -		smp_mb__before_clear_bit(); +		smp_mb__before_atomic();  		clear_bit(XPRT_CONNECTED, &xprt->state); -		smp_mb__after_clear_bit(); +		smp_mb__after_atomic();  		break;  	case TCP_CLOSE:  		xs_tcp_cancel_linger_timeout(xprt); @@ -1666,7 +1699,7 @@ static void xs_udp_timer(struct rpc_xprt 
*xprt, struct rpc_task *task)  static unsigned short xs_get_random_port(void)  {  	unsigned short range = xprt_max_resvport - xprt_min_resvport; -	unsigned short rand = (unsigned short) net_random() % range; +	unsigned short rand = (unsigned short) prandom_u32() % range;  	return rand + xprt_min_resvport;  } @@ -1816,6 +1849,10 @@ static inline void xs_reclassify_socket(int family, struct socket *sock)  }  #endif +static void xs_dummy_setup_socket(struct work_struct *work) +{ +} +  static struct socket *xs_create_sock(struct rpc_xprt *xprt,  		struct sock_xprt *transport, int family, int type, int protocol)  { @@ -1857,6 +1894,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,  		sk->sk_user_data = xprt;  		sk->sk_data_ready = xs_local_data_ready;  		sk->sk_write_space = xs_udp_write_space; +		sk->sk_error_report = xs_error_report;  		sk->sk_allocation = GFP_ATOMIC;  		xprt_clear_connected(xprt); @@ -2006,7 +2044,6 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)  		sk->sk_user_data = xprt;  		sk->sk_data_ready = xs_udp_data_ready;  		sk->sk_write_space = xs_udp_write_space; -		sk->sk_no_check = UDP_CSUM_NORCV;  		sk->sk_allocation = GFP_ATOMIC;  		xprt_set_connected(xprt); @@ -2112,6 +2149,19 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)  	if (!transport->inet) {  		struct sock *sk = sock->sk; +		unsigned int keepidle = xprt->timeout->to_initval / HZ; +		unsigned int keepcnt = xprt->timeout->to_retries + 1; +		unsigned int opt_on = 1; + +		/* TCP Keepalive options */ +		kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, +				(char *)&opt_on, sizeof(opt_on)); +		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, +				(char *)&keepidle, sizeof(keepidle)); +		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, +				(char *)&keepidle, sizeof(keepidle)); +		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT, +				(char *)&keepcnt, sizeof(keepcnt));  		write_lock_bh(&sk->sk_callback_lock); @@ -2121,6 +2171,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)  		sk->sk_data_ready = xs_tcp_data_ready;  		sk->sk_state_change = xs_tcp_state_change;  		sk->sk_write_space = xs_tcp_write_space; +		sk->sk_error_report = xs_error_report;  		sk->sk_allocation = GFP_ATOMIC;  		/* socket options */ @@ -2151,7 +2202,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)  	case 0:  	case -EINPROGRESS:  		/* SYN_SENT! 
*/ -		xprt->connect_cookie++;  		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)  			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;  	} @@ -2484,6 +2534,10 @@ static void bc_close(struct rpc_xprt *xprt)  static void bc_destroy(struct rpc_xprt *xprt)  { +	dprintk("RPC:       bc_destroy xprt %p\n", xprt); + +	xs_xprt_free(xprt); +	module_put(THIS_MODULE);  }  static struct rpc_xprt_ops xs_local_ops = { @@ -2498,7 +2552,7 @@ static struct rpc_xprt_ops xs_local_ops = {  	.send_request		= xs_local_send_request,  	.set_retrans_timeout	= xprt_set_retrans_timeout_def,  	.close			= xs_close, -	.destroy		= xs_local_destroy, +	.destroy		= xs_destroy,  	.print_stats		= xs_local_print_stats,  }; @@ -2655,6 +2709,9 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)  	xprt->ops = &xs_local_ops;  	xprt->timeout = &xs_local_default_timeout; +	INIT_DELAYED_WORK(&transport->connect_worker, +			xs_dummy_setup_socket); +  	switch (sun->sun_family) {  	case AF_LOCAL:  		if (sun->sun_path[0] != '/') { @@ -2681,7 +2738,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)  		return xprt;  	ret = ERR_PTR(-EINVAL);  out_err: -	xprt_free(xprt); +	xs_xprt_free(xprt);  	return ret;  } @@ -2759,7 +2816,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)  		return xprt;  	ret = ERR_PTR(-EINVAL);  out_err: -	xprt_free(xprt); +	xs_xprt_free(xprt);  	return ret;  } @@ -2834,12 +2891,11 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)  				xprt->address_strings[RPC_DISPLAY_ADDR],  				xprt->address_strings[RPC_DISPLAY_PROTO]); -  	if (try_module_get(THIS_MODULE))  		return xprt;  	ret = ERR_PTR(-EINVAL);  out_err: -	xprt_free(xprt); +	xs_xprt_free(xprt);  	return ret;  } @@ -2856,15 +2912,6 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)  	struct svc_sock *bc_sock;  	struct rpc_xprt *ret; -	if (args->bc_xprt->xpt_bc_xprt) { -		/* -		 * This server connection already has a backchannel -		 * export; we can't create a new one, as we wouldn't be -		 * able to match replies based on xid any more.  So, -		 * reuse the already-existing one: -		 */ -		 return args->bc_xprt->xpt_bc_xprt; -	}  	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,  			xprt_tcp_slot_table_entries);  	if (IS_ERR(xprt)) @@ -2905,10 +2952,9 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)  	/*  	 * Once we've associated a backchannel xprt with a connection, -	 * we want to keep it around as long as long as the connection -	 * lasts, in case we need to start using it for a backchannel -	 * again; this reference won't be dropped until bc_xprt is -	 * destroyed. +	 * we want to keep it around as long as the connection lasts, +	 * in case we need to start using it for a backchannel again; +	 * this reference won't be dropped until bc_xprt is destroyed.  	 */  	xprt_get(xprt);  	args->bc_xprt->xpt_bc_xprt = xprt; @@ -2923,13 +2969,14 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)  	 */  	xprt_set_connected(xprt); -  	if (try_module_get(THIS_MODULE))  		return xprt; + +	args->bc_xprt->xpt_bc_xprt = NULL;  	xprt_put(xprt);  	ret = ERR_PTR(-EINVAL);  out_err: -	xprt_free(xprt); +	xs_xprt_free(xprt);  	return ret;  }  | 
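The sketches below illustrate a few of the patterns this series relies on; none of this code is part of the patch itself. First, the new RPCRDMA_WC_BUDGET and RPCRDMA_POLLSIZE constants, together with the rep_send_wcs[]/rep_recv_wcs[] arrays added to struct rpcrdma_ep, suggest completions are now drained in fixed-size batches under a budget rather than one work completion per upcall. A minimal sketch of that pattern, assuming that reading; the EXAMPLE_* names and the handler callback are placeholders, and the real code keeps its ib_wc arrays in the endpoint structure rather than on the stack:

#include <rdma/ib_verbs.h>

#define EXAMPLE_POLLSIZE	16
#define EXAMPLE_BUDGET		128

static void example_drain_cq(struct ib_cq *cq,
			     void (*handle)(struct ib_wc *wc))
{
	struct ib_wc wcs[EXAMPLE_POLLSIZE];	/* illustrative only */
	int budget = EXAMPLE_BUDGET / EXAMPLE_POLLSIZE;
	int count, i;

	do {
		/* pull up to EXAMPLE_POLLSIZE completions per pass */
		count = ib_poll_cq(cq, EXAMPLE_POLLSIZE, wcs);
		if (count <= 0)
			break;
		for (i = 0; i < count; i++)
			handle(&wcs[i]);
	} while (--budget);
}

Batching amortizes the cost of ib_poll_cq() and the budget bounds how long a single upcall can run before yielding.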
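xs_send_pagedata() now takes a zerocopy flag and falls back to sock_no_sendpage() when it is false; xs_tcp_send_request() clears the flag for resends (RPC_TASK_SENT) so a completed RPC cannot free or reuse pages the socket still holds. A sketch of just the selection step; the wrapper name is illustrative:

#include <linux/net.h>
#include <net/sock.h>

static ssize_t example_send_page(struct socket *sock, struct page *page,
				 int offset, size_t len, int flags,
				 bool zerocopy)
{
	ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
			       int offset, size_t size, int flags);

	/* A resend must not leave pages pinned by the socket: copy them. */
	do_sendpage = zerocopy ? sock->ops->sendpage : sock_no_sendpage;

	return do_sendpage(sock, page, offset, len, flags);
}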
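The new xs_error_report() callback propagates socket errors to waiting RPC tasks without calling sock_error(), so sk->sk_err is left in place for any rpc_task still using the socket. A condensed sketch of the same shape, with the dprintk and tracepoint omitted:

#include <net/sock.h>
#include <linux/sunrpc/xprt.h>

static void example_error_report(struct sock *sk)
{
	struct rpc_xprt *xprt;
	int err;

	read_lock_bh(&sk->sk_callback_lock);
	xprt = sk->sk_user_data;
	if (!xprt)
		goto out;

	err = -sk->sk_err;	/* read, but do not clear, the pending error */
	if (err)
		xprt_wake_pending_tasks(xprt, err);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}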
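The smp_mb__before_clear_bit()/smp_mb__after_clear_bit() calls become smp_mb__before_atomic()/smp_mb__after_atomic(). This is an API rename only; the ordering guarantee around clear_bit(), which itself implies no barrier, is unchanged. A trivial example of the pairing (the bit numbers here are arbitrary):

#include <linux/atomic.h>
#include <linux/bitops.h>

static void example_clear_flags(unsigned long *state)
{
	/* order earlier stores before the barrier-less clear_bit() calls */
	smp_mb__before_atomic();
	clear_bit(0, state);
	clear_bit(1, state);
	/* ...and the bit clears before anything that follows */
	smp_mb__after_atomic();
}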
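xs_get_random_port() switches from net_random() to prandom_u32(); both return a pseudo-random u32, so the port calculation is otherwise the same. A standalone version with the bounds passed in rather than read from the xprt_{min,max}_resvport sysctls:

#include <linux/random.h>

static unsigned short example_random_port(unsigned short min_port,
					  unsigned short max_port)
{
	unsigned short range = max_port - min_port;

	/* same modulo arithmetic as the original: yields [min, max) */
	return min_port + (unsigned short)(prandom_u32() % range);
}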
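xs_tcp_finish_connecting() now derives TCP keepalive settings from the RPC timeout: the idle time (and, as written, the probe interval too) comes from to_initval, and the probe count from to_retries + 1. A sketch of that derivation with the timeout fields passed as plain parameters:

#include <linux/kernel.h>
#include <linux/net.h>
#include <linux/tcp.h>

static void example_set_keepalive(struct socket *sock,
				  unsigned long to_initval,
				  unsigned int to_retries)
{
	unsigned int keepidle = to_initval / HZ;	/* jiffies -> seconds */
	unsigned int keepcnt = to_retries + 1;
	unsigned int opt_on = 1;

	kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
			  (char *)&opt_on, sizeof(opt_on));
	kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE,
			  (char *)&keepidle, sizeof(keepidle));
	/* the patch reuses keepidle for the probe interval as well */
	kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL,
			  (char *)&keepidle, sizeof(keepidle));
	kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
			  (char *)&keepcnt, sizeof(keepcnt));
}

Tying keepalive to the RPC timeout lets a dead connection be detected and torn down on roughly the same schedule as the RPC layer would retry.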
