Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--  net/sunrpc/xprtrdma/Makefile                 4
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c             222
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma.c               9
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_marshal.c      82
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    657
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_sendto.c      262
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_transport.c    96
-rw-r--r--  net/sunrpc/xprtrdma/transport.c            136
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c                794
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h             35
10 files changed, 932 insertions, 1365 deletions
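
Before the patch itself, a minimal userspace sketch (illustrative only; the types, sizes, and helper name are simplified stand-ins, not kernel code) of the error-return convention the rpc_rdma.c hunks below introduce: rpcrdma_convert_iovs() and rpcrdma_create_chunks() now report a positive count/size on success and a negative errno on failure, instead of returning 0 for every error.

/*
 * Sketch only: mimics the new return convention of rpcrdma_convert_iovs()
 * -- positive segment count on success, negative errno on overflow --
 * using a hypothetical page-sized segmentation.
 */
#include <errno.h>
#include <stdio.h>

#define SKETCH_PAGE_SIZE 4096
#define SKETCH_MAX_SEGS  8	/* stand-in for RPCRDMA_MAX_SEGS */

static int sketch_convert_iovs(size_t nbytes, int nsegs)
{
	int n = 0;

	while (nbytes > 0 && n < nsegs) {
		/* consume at most one page worth of data per segment */
		nbytes -= (nbytes < SKETCH_PAGE_SIZE) ? nbytes : SKETCH_PAGE_SIZE;
		n++;
	}
	if (nbytes > 0)
		return -EIO;	/* message overflows the segment array */
	return n;		/* number of segments converted */
}

int main(void)
{
	int n = sketch_convert_iovs(16 * 1024, SKETCH_MAX_SEGS);

	if (n < 0) {
		/* callers now see a real errno rather than a bare 0/-1 */
		fprintf(stderr, "convert failed: %d\n", n);
		return 1;
	}
	printf("converted into %d segments\n", n);
	return 0;
}

The same pattern recurs throughout the series: rpcrdma_marshal_req() returns -EIO where it used to return -1, and the BUG_ON() assertions in the marshalling path are replaced by these error returns.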
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index 5a8f268bdd3..da5136fd569 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -1,8 +1,8 @@ -obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma.o +obj-$(CONFIG_SUNRPC_XPRT_RDMA_CLIENT) += xprtrdma.o  xprtrdma-y := transport.o rpc_rdma.o verbs.o -obj-$(CONFIG_SUNRPC_XPRT_RDMA) += svcrdma.o +obj-$(CONFIG_SUNRPC_XPRT_RDMA_SERVER) += svcrdma.o  svcrdma-y := svc_rdma.o svc_rdma_transport.o \  	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 2ac3f6e8adf..693966d3f33 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -78,8 +78,7 @@ static const char transfertypes[][12] = {   * elements. Segments are then coalesced when registered, if possible   * within the selected memreg mode.   * - * Note, this routine is never called if the connection's memory - * registration strategy is 0 (bounce buffers). + * Returns positive number of segments converted, or a negative errno.   */  static int @@ -87,6 +86,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,  	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)  {  	int len, n = 0, p; +	int page_base; +	struct page **ppages;  	if (pos == 0 && xdrbuf->head[0].iov_len) {  		seg[n].mr_page = NULL; @@ -95,35 +96,40 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,  		++n;  	} -	if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) { -		if (n == nsegs) -			return 0; -		seg[n].mr_page = xdrbuf->pages[0]; -		seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base; -		seg[n].mr_len = min_t(u32, -			PAGE_SIZE - xdrbuf->page_base, xdrbuf->page_len); -		len = xdrbuf->page_len - seg[n].mr_len; -		++n; -		p = 1; -		while (len > 0) { -			if (n == nsegs) -				return 0; -			seg[n].mr_page = xdrbuf->pages[p]; -			seg[n].mr_offset = NULL; -			seg[n].mr_len = min_t(u32, PAGE_SIZE, len); -			len -= seg[n].mr_len; -			++n; -			++p; +	len = xdrbuf->page_len; +	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); +	page_base = xdrbuf->page_base & ~PAGE_MASK; +	p = 0; +	while (len && n < nsegs) { +		if (!ppages[p]) { +			/* alloc the pagelist for receiving buffer */ +			ppages[p] = alloc_page(GFP_ATOMIC); +			if (!ppages[p]) +				return -ENOMEM;  		} +		seg[n].mr_page = ppages[p]; +		seg[n].mr_offset = (void *)(unsigned long) page_base; +		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); +		if (seg[n].mr_len > PAGE_SIZE) +			return -EIO; +		len -= seg[n].mr_len; +		++n; +		++p; +		page_base = 0;	/* page offset only applies to first page */  	} +	/* Message overflows the seg array */ +	if (len && n == nsegs) +		return -EIO; +  	if (xdrbuf->tail[0].iov_len) {  		/* the rpcrdma protocol allows us to omit any trailing  		 * xdr pad bytes, saving the server an RDMA operation. */  		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)  			return n;  		if (n == nsegs) -			return 0; +			/* Tail remains, but we're out of segments */ +			return -EIO;  		seg[n].mr_page = NULL;  		seg[n].mr_offset = xdrbuf->tail[0].iov_base;  		seg[n].mr_len = xdrbuf->tail[0].iov_len; @@ -164,15 +170,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,   *  Reply chunk (a counted array):   *   N elements:   *    1 - N - HLOO - HLOO - ... - HLOO + * + * Returns positive RPC/RDMA header size, or negative errno.   
*/ -static unsigned int +static ssize_t  rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,  		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)  {  	struct rpcrdma_req *req = rpcr_to_rdmar(rqst); -	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt); -	int nsegs, nchunks = 0; +	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); +	int n, nsegs, nchunks = 0;  	unsigned int pos;  	struct rpcrdma_mr_seg *seg = req->rl_segments;  	struct rpcrdma_read_chunk *cur_rchunk = NULL; @@ -198,12 +206,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,  		pos = target->head[0].iov_len;  	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); -	if (nsegs == 0) -		return 0; +	if (nsegs < 0) +		return nsegs;  	do { -		/* bind/register the memory, then build chunk from result. */ -		int n = rpcrdma_register_external(seg, nsegs, +		n = rpcrdma_register_external(seg, nsegs,  						cur_wchunk != NULL, r_xprt);  		if (n <= 0)  			goto out; @@ -248,10 +255,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,  	/* success. all failures return above */  	req->rl_nchunks = nchunks; -	BUG_ON(nchunks == 0); -	BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR) -	       && (nchunks > 3)); -  	/*  	 * finish off header. If write, marshal discrim and nchunks.  	 */ @@ -278,8 +281,8 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,  out:  	for (pos = 0; nchunks--;)  		pos += rpcrdma_deregister_external( -				&req->rl_segments[pos], r_xprt, NULL); -	return 0; +				&req->rl_segments[pos], r_xprt); +	return n;  }  /* @@ -296,6 +299,8 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)  	int copy_len;  	unsigned char *srcp, *destp;  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); +	int page_base; +	struct page **ppages;  	destp = rqst->rq_svec[0].iov_base;  	curlen = rqst->rq_svec[0].iov_len; @@ -324,28 +329,25 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)  			__func__, destp + copy_len, curlen);  		rqst->rq_svec[0].iov_len += curlen;  	} -  	r_xprt->rx_stats.pullup_copy_count += copy_len; -	npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base+copy_len) >> PAGE_SHIFT; + +	page_base = rqst->rq_snd_buf.page_base; +	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT); +	page_base &= ~PAGE_MASK; +	npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;  	for (i = 0; copy_len && i < npages; i++) { -		if (i == 0) -			curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base; -		else -			curlen = PAGE_SIZE; +		curlen = PAGE_SIZE - page_base;  		if (curlen > copy_len)  			curlen = copy_len;  		dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",  			__func__, i, destp, copy_len, curlen); -		srcp = kmap_atomic(rqst->rq_snd_buf.pages[i], -					KM_SKB_SUNRPC_DATA); -		if (i == 0) -			memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen); -		else -			memcpy(destp, srcp, curlen); -		kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA); +		srcp = kmap_atomic(ppages[i]); +		memcpy(destp, srcp+page_base, curlen); +		kunmap_atomic(srcp);  		rqst->rq_svec[0].iov_len += curlen;  		destp += curlen;  		copy_len -= curlen; +		page_base = 0;  	}  	/* header now contains entire send message */  	return pad; @@ -362,16 +364,19 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)   *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.   *  [2] -- optional padding.   *  [3] -- if padded, header only in [1] and data here. 
+ * + * Returns zero on success, otherwise a negative errno.   */  int  rpcrdma_marshal_req(struct rpc_rqst *rqst)  { -	struct rpc_xprt *xprt = rqst->rq_task->tk_xprt; +	struct rpc_xprt *xprt = rqst->rq_xprt;  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);  	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);  	char *base; -	size_t hdrlen, rpclen, padlen; +	size_t rpclen, padlen; +	ssize_t hdrlen;  	enum rpcrdma_chunktype rtype, wtype;  	struct rpcrdma_msg *headerp; @@ -442,14 +447,10 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)  	/* The following simplification is not true forever */  	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)  		wtype = rpcrdma_noch; -	BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch); - -	if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS && -	    (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) { -		/* forced to "pure inline"? */ -		dprintk("RPC:       %s: too much data (%d/%d) for inline\n", -			__func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len); -		return -1; +	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { +		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n", +			__func__); +		return -EIO;  	}  	hdrlen = 28; /*sizeof *headerp;*/ @@ -475,8 +476,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)  			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;  			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;  			hdrlen += 2 * sizeof(u32); /* extra words in padhdr */ -			BUG_ON(wtype != rpcrdma_noch); - +			if (wtype != rpcrdma_noch) { +				dprintk("RPC:       %s: invalid chunk list\n", +					__func__); +				return -EIO; +			}  		} else {  			headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;  			headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; @@ -493,8 +497,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)  			 * on receive. Therefore, we request a reply chunk  			 * for non-writes wherever feasible and efficient.  			 
*/ -			if (wtype == rpcrdma_noch && -			    r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER) +			if (wtype == rpcrdma_noch)  				wtype = rpcrdma_replych;  		}  	} @@ -512,9 +515,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)  		hdrlen = rpcrdma_create_chunks(rqst,  					&rqst->rq_rcv_buf, headerp, wtype);  	} - -	if (hdrlen == 0) -		return -1; +	if (hdrlen < 0) +		return hdrlen;  	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"  		" headerp 0x%p base 0x%p lkey 0x%x\n", @@ -606,6 +608,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)  {  	int i, npages, curlen, olen;  	char *destp; +	struct page **ppages; +	int page_base;  	curlen = rqst->rq_rcv_buf.head[0].iov_len;  	if (curlen > copy_len) {	/* write chunk header fixup */ @@ -624,36 +628,31 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)  	olen = copy_len;  	i = 0;  	rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen; +	page_base = rqst->rq_rcv_buf.page_base; +	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT); +	page_base &= ~PAGE_MASK; +  	if (copy_len && rqst->rq_rcv_buf.page_len) { -		npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base + +		npages = PAGE_ALIGN(page_base +  			rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;  		for (; i < npages; i++) { -			if (i == 0) -				curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base; -			else -				curlen = PAGE_SIZE; +			curlen = PAGE_SIZE - page_base;  			if (curlen > copy_len)  				curlen = copy_len;  			dprintk("RPC:       %s: page %d"  				" srcp 0x%p len %d curlen %d\n",  				__func__, i, srcp, copy_len, curlen); -			destp = kmap_atomic(rqst->rq_rcv_buf.pages[i], -						KM_SKB_SUNRPC_DATA); -			if (i == 0) -				memcpy(destp + rqst->rq_rcv_buf.page_base, -						srcp, curlen); -			else -				memcpy(destp, srcp, curlen); -			flush_dcache_page(rqst->rq_rcv_buf.pages[i]); -			kunmap_atomic(destp, KM_SKB_SUNRPC_DATA); +			destp = kmap_atomic(ppages[i]); +			memcpy(destp + page_base, srcp, curlen); +			flush_dcache_page(ppages[i]); +			kunmap_atomic(destp);  			srcp += curlen;  			copy_len -= curlen;  			if (copy_len == 0)  				break; +			page_base = 0;  		} -		rqst->rq_rcv_buf.page_len = olen - copy_len; -	} else -		rqst->rq_rcv_buf.page_len = 0; +	}  	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {  		curlen = copy_len; @@ -684,15 +683,11 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)  	rqst->rq_private_buf = rqst->rq_rcv_buf;  } -/* - * This function is called when an async event is posted to - * the connection which changes the connection state. All it - * does at this point is mark the connection up/down, the rpc - * timers do the rest. - */  void -rpcrdma_conn_func(struct rpcrdma_ep *ep) +rpcrdma_connect_worker(struct work_struct *work)  { +	struct rpcrdma_ep *ep = +		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);  	struct rpc_xprt *xprt = ep->rep_xprt;  	spin_lock_bh(&xprt->transport_lock); @@ -709,13 +704,15 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)  }  /* - * This function is called when memory window unbind which we are waiting - * for completes. Just use rr_func (zeroed by upcall) to signal completion. + * This function is called when an async event is posted to + * the connection which changes the connection state. All it + * does at this point is mark the connection up/down, the rpc + * timers do the rest.   
*/ -static void -rpcrdma_unbind_func(struct rpcrdma_rep *rep) +void +rpcrdma_conn_func(struct rpcrdma_ep *ep)  { -	wake_up(&rep->rr_unbind); +	schedule_delayed_work(&ep->rep_connect_worker, 0);  }  /* @@ -732,7 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)  	struct rpc_xprt *xprt = rep->rr_xprt;  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);  	__be32 *iptr; -	int i, rdmalen, status; +	int rdmalen, status; +	unsigned long cwnd;  	/* Check status. If bad, signal disconnect and return rep to pool */  	if (rep->rr_len == ~0U) { @@ -773,15 +771,21 @@ repost:  	/* get request object */  	req = rpcr_to_rdmar(rqst); +	if (req->rl_reply) { +		spin_unlock(&xprt->transport_lock); +		dprintk("RPC:       %s: duplicate reply 0x%p to RPC " +			"request 0x%p: xid 0x%08x\n", __func__, rep, req, +			headerp->rm_xid); +		goto repost; +	}  	dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"  		"                   RPC request 0x%p xid 0x%08x\n",  			__func__, rep, req, rqst, headerp->rm_xid); -	BUG_ON(!req || req->rl_reply); -  	/* from here on, the reply is no longer an orphan */  	req->rl_reply = rep; +	xprt->reestablish_timeout = 0;  	/* check for expected message types */  	/* The order of some of these tests is important. */ @@ -856,26 +860,10 @@ badheader:  		break;  	} -	/* If using mw bind, start the deregister process now. */ -	/* (Note: if mr_free(), cannot perform it here, in tasklet context) */ -	if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) { -	case RPCRDMA_MEMWINDOWS: -		for (i = 0; req->rl_nchunks-- > 1;) -			i += rpcrdma_deregister_external( -				&req->rl_segments[i], r_xprt, NULL); -		/* Optionally wait (not here) for unbinds to complete */ -		rep->rr_func = rpcrdma_unbind_func; -		(void) rpcrdma_deregister_external(&req->rl_segments[i], -						   r_xprt, rep); -		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -		for (i = 0; req->rl_nchunks--;) -			i += rpcrdma_deregister_external(&req->rl_segments[i], -							 r_xprt, NULL); -		break; -	default: -		break; -	} +	cwnd = xprt->cwnd; +	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; +	if (xprt->cwnd > cwnd) +		xprt_release_rqst_cong(rqst->rq_task);  	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",  			__func__, xprt, rqst, status); diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index 09af4fab1a4..c1b6270262c 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -47,6 +47,7 @@  #include <linux/sunrpc/clnt.h>  #include <linux/sunrpc/sched.h>  #include <linux/sunrpc/svc_rdma.h> +#include "xprt_rdma.h"  #define RPCDBG_FACILITY	RPCDBG_SVCXPRT @@ -83,7 +84,7 @@ struct workqueue_struct *svc_rdma_wq;   * resets the associated statistic to zero. Any read returns it's   * current value.   
*/ -static int read_reset_stat(ctl_table *table, int write, +static int read_reset_stat(struct ctl_table *table, int write,  			   void __user *buffer, size_t *lenp,  			   loff_t *ppos)  { @@ -118,7 +119,7 @@ static int read_reset_stat(ctl_table *table, int write,  }  static struct ctl_table_header *svcrdma_table_header; -static ctl_table svcrdma_parm_table[] = { +static struct ctl_table svcrdma_parm_table[] = {  	{  		.procname	= "max_requests",  		.data		= &svcrdma_max_requests, @@ -213,7 +214,7 @@ static ctl_table svcrdma_parm_table[] = {  	{ },  }; -static ctl_table svcrdma_table[] = { +static struct ctl_table svcrdma_table[] = {  	{  		.procname	= "svc_rdma",  		.mode		= 0555, @@ -222,7 +223,7 @@ static ctl_table svcrdma_table[] = {  	{ },  }; -static ctl_table svcrdma_root_table[] = { +static struct ctl_table svcrdma_root_table[] = {  	{  		.procname	= "sunrpc",  		.mode		= 0555, diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c index 9530ef2d40d..65b146297f5 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c +++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c @@ -60,21 +60,11 @@ static u32 *decode_read_list(u32 *va, u32 *vaend)  	struct rpcrdma_read_chunk *ch = (struct rpcrdma_read_chunk *)va;  	while (ch->rc_discrim != xdr_zero) { -		u64 ch_offset; -  		if (((unsigned long)ch + sizeof(struct rpcrdma_read_chunk)) >  		    (unsigned long)vaend) {  			dprintk("svcrdma: vaend=%p, ch=%p\n", vaend, ch);  			return NULL;  		} - -		ch->rc_discrim = ntohl(ch->rc_discrim); -		ch->rc_position = ntohl(ch->rc_position); -		ch->rc_target.rs_handle = ntohl(ch->rc_target.rs_handle); -		ch->rc_target.rs_length = ntohl(ch->rc_target.rs_length); -		va = (u32 *)&ch->rc_target.rs_offset; -		xdr_decode_hyper(va, &ch_offset); -		put_unaligned(ch_offset, (u64 *)va);  		ch++;  	}  	return (u32 *)&ch->rc_position; @@ -91,7 +81,7 @@ void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *ch,  	*byte_count = 0;  	*ch_count = 0;  	for (; ch->rc_discrim != 0; ch++) { -		*byte_count = *byte_count + ch->rc_target.rs_length; +		*byte_count = *byte_count + ntohl(ch->rc_target.rs_length);  		*ch_count = *ch_count + 1;  	}  } @@ -108,7 +98,9 @@ void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *ch,   */  static u32 *decode_write_list(u32 *va, u32 *vaend)  { -	int ch_no; +	unsigned long start, end; +	int nchunks; +  	struct rpcrdma_write_array *ary =  		(struct rpcrdma_write_array *)va; @@ -121,37 +113,28 @@ static u32 *decode_write_list(u32 *va, u32 *vaend)  		dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);  		return NULL;  	} -	ary->wc_discrim = ntohl(ary->wc_discrim); -	ary->wc_nchunks = ntohl(ary->wc_nchunks); -	if (((unsigned long)&ary->wc_array[0] + -	     (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) > -	    (unsigned long)vaend) { +	nchunks = ntohl(ary->wc_nchunks); + +	start = (unsigned long)&ary->wc_array[0]; +	end = (unsigned long)vaend; +	if (nchunks < 0 || +	    nchunks > (SIZE_MAX - start) / sizeof(struct rpcrdma_write_chunk) || +	    (start + (sizeof(struct rpcrdma_write_chunk) * nchunks)) > end) {  		dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n", -			ary, ary->wc_nchunks, vaend); +			ary, nchunks, vaend);  		return NULL;  	} -	for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) { -		u64 ch_offset; - -		ary->wc_array[ch_no].wc_target.rs_handle = -			ntohl(ary->wc_array[ch_no].wc_target.rs_handle); -		ary->wc_array[ch_no].wc_target.rs_length = -			ntohl(ary->wc_array[ch_no].wc_target.rs_length); -		va = (u32 
*)&ary->wc_array[ch_no].wc_target.rs_offset; -		xdr_decode_hyper(va, &ch_offset); -		put_unaligned(ch_offset, (u64 *)va); -	} -  	/*  	 * rs_length is the 2nd 4B field in wc_target and taking its  	 * address skips the list terminator  	 */ -	return (u32 *)&ary->wc_array[ch_no].wc_target.rs_length; +	return (u32 *)&ary->wc_array[nchunks].wc_target.rs_length;  }  static u32 *decode_reply_array(u32 *va, u32 *vaend)  { -	int ch_no; +	unsigned long start, end; +	int nchunks;  	struct rpcrdma_write_array *ary =  		(struct rpcrdma_write_array *)va; @@ -164,28 +147,18 @@ static u32 *decode_reply_array(u32 *va, u32 *vaend)  		dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);  		return NULL;  	} -	ary->wc_discrim = ntohl(ary->wc_discrim); -	ary->wc_nchunks = ntohl(ary->wc_nchunks); -	if (((unsigned long)&ary->wc_array[0] + -	     (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) > -	    (unsigned long)vaend) { +	nchunks = ntohl(ary->wc_nchunks); + +	start = (unsigned long)&ary->wc_array[0]; +	end = (unsigned long)vaend; +	if (nchunks < 0 || +	    nchunks > (SIZE_MAX - start) / sizeof(struct rpcrdma_write_chunk) || +	    (start + (sizeof(struct rpcrdma_write_chunk) * nchunks)) > end) {  		dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n", -			ary, ary->wc_nchunks, vaend); +			ary, nchunks, vaend);  		return NULL;  	} -	for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) { -		u64 ch_offset; - -		ary->wc_array[ch_no].wc_target.rs_handle = -			ntohl(ary->wc_array[ch_no].wc_target.rs_handle); -		ary->wc_array[ch_no].wc_target.rs_length = -			ntohl(ary->wc_array[ch_no].wc_target.rs_length); -		va = (u32 *)&ary->wc_array[ch_no].wc_target.rs_offset; -		xdr_decode_hyper(va, &ch_offset); -		put_unaligned(ch_offset, (u64 *)va); -	} - -	return (u32 *)&ary->wc_array[ch_no]; +	return (u32 *)&ary->wc_array[nchunks];  }  int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req, @@ -386,13 +359,14 @@ void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,  void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,  				     int chunk_no, -				     u32 rs_handle, u64 rs_offset, +				     __be32 rs_handle, +				     __be64 rs_offset,  				     u32 write_len)  {  	struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target; -	seg->rs_handle = htonl(rs_handle); +	seg->rs_handle = rs_handle; +	seg->rs_offset = rs_offset;  	seg->rs_length = htonl(write_len); -	xdr_encode_hyper((u32 *) &seg->rs_offset, rs_offset);  }  void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt, diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index df67211c4ba..8f92a61ee2d 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -1,4 +1,5 @@  /* + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.   * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.   
*   * This software is available to you under a choice of one of two @@ -69,7 +70,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,  	/* Set up the XDR head */  	rqstp->rq_arg.head[0].iov_base = page_address(page); -	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length); +	rqstp->rq_arg.head[0].iov_len = +		min_t(size_t, byte_count, ctxt->sge[0].length);  	rqstp->rq_arg.len = byte_count;  	rqstp->rq_arg.buflen = byte_count; @@ -85,11 +87,12 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,  		page = ctxt->pages[sge_no];  		put_page(rqstp->rq_pages[sge_no]);  		rqstp->rq_pages[sge_no] = page; -		bc -= min(bc, ctxt->sge[sge_no].length); +		bc -= min_t(u32, bc, ctxt->sge[sge_no].length);  		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;  		sge_no++;  	}  	rqstp->rq_respages = &rqstp->rq_pages[sge_no]; +	rqstp->rq_next_page = rqstp->rq_respages + 1;  	/* We should never run out of SGE because the limit is defined to  	 * support the max allowed RPC data length @@ -112,288 +115,265 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,  	rqstp->rq_arg.tail[0].iov_len = 0;  } -/* Encode a read-chunk-list as an array of IB SGE - * - * Assumptions: - * - chunk[0]->position points to pages[0] at an offset of 0 - * - pages[] is not physically or virtually contiguous and consists of - *   PAGE_SIZE elements. - * - * Output: - * - sge array pointing into pages[] array. - * - chunk_sge array specifying sge index and count for each - *   chunk in the read list - * - */ -static int map_read_chunks(struct svcxprt_rdma *xprt, -			   struct svc_rqst *rqstp, -			   struct svc_rdma_op_ctxt *head, -			   struct rpcrdma_msg *rmsgp, -			   struct svc_rdma_req_map *rpl_map, -			   struct svc_rdma_req_map *chl_map, -			   int ch_count, -			   int byte_count) +static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)  { -	int sge_no; -	int sge_bytes; -	int page_off; -	int page_no; -	int ch_bytes; -	int ch_no; -	struct rpcrdma_read_chunk *ch; +	if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) == +	     RDMA_TRANSPORT_IWARP) +		return 1; +	else +		return min_t(int, sge_count, xprt->sc_max_sge); +} -	sge_no = 0; -	page_no = 0; -	page_off = 0; -	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; -	ch_no = 0; -	ch_bytes = ch->rc_target.rs_length; -	head->arg.head[0] = rqstp->rq_arg.head[0]; -	head->arg.tail[0] = rqstp->rq_arg.tail[0]; -	head->arg.pages = &head->pages[head->count]; -	head->hdr_count = head->count; /* save count of hdr pages */ -	head->arg.page_base = 0; -	head->arg.page_len = ch_bytes; -	head->arg.len = rqstp->rq_arg.len + ch_bytes; -	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes; -	head->count++; -	chl_map->ch[0].start = 0; -	while (byte_count) { -		rpl_map->sge[sge_no].iov_base = -			page_address(rqstp->rq_arg.pages[page_no]) + page_off; -		sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes); -		rpl_map->sge[sge_no].iov_len = sge_bytes; -		/* -		 * Don't bump head->count here because the same page -		 * may be used by multiple SGE. 
-		 */ -		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no]; -		rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1]; +typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt, +			      struct svc_rqst *rqstp, +			      struct svc_rdma_op_ctxt *head, +			      int *page_no, +			      u32 *page_offset, +			      u32 rs_handle, +			      u32 rs_length, +			      u64 rs_offset, +			      int last); + +/* Issue an RDMA_READ using the local lkey to map the data sink */ +static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, +			       struct svc_rqst *rqstp, +			       struct svc_rdma_op_ctxt *head, +			       int *page_no, +			       u32 *page_offset, +			       u32 rs_handle, +			       u32 rs_length, +			       u64 rs_offset, +			       int last) +{ +	struct ib_send_wr read_wr; +	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT; +	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt); +	int ret, read, pno; +	u32 pg_off = *page_offset; +	u32 pg_no = *page_no; -		byte_count -= sge_bytes; -		ch_bytes -= sge_bytes; -		sge_no++; -		/* -		 * If all bytes for this chunk have been mapped to an -		 * SGE, move to the next SGE -		 */ -		if (ch_bytes == 0) { -			chl_map->ch[ch_no].count = -				sge_no - chl_map->ch[ch_no].start; -			ch_no++; -			ch++; -			chl_map->ch[ch_no].start = sge_no; -			ch_bytes = ch->rc_target.rs_length; -			/* If bytes remaining account for next chunk */ -			if (byte_count) { -				head->arg.page_len += ch_bytes; -				head->arg.len += ch_bytes; -				head->arg.buflen += ch_bytes; -			} +	ctxt->direction = DMA_FROM_DEVICE; +	ctxt->read_hdr = head; +	pages_needed = +		min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed)); +	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length); + +	for (pno = 0; pno < pages_needed; pno++) { +		int len = min_t(int, rs_length, PAGE_SIZE - pg_off); + +		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; +		head->arg.page_len += len; +		head->arg.len += len; +		if (!pg_off) +			head->count++; +		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1]; +		rqstp->rq_next_page = rqstp->rq_respages + 1; +		ctxt->sge[pno].addr = +			ib_dma_map_page(xprt->sc_cm_id->device, +					head->arg.pages[pg_no], pg_off, +					PAGE_SIZE - pg_off, +					DMA_FROM_DEVICE); +		ret = ib_dma_mapping_error(xprt->sc_cm_id->device, +					   ctxt->sge[pno].addr); +		if (ret) +			goto err; +		atomic_inc(&xprt->sc_dma_used); + +		/* The lkey here is either a local dma lkey or a dma_mr lkey */ +		ctxt->sge[pno].lkey = xprt->sc_dma_lkey; +		ctxt->sge[pno].length = len; +		ctxt->count++; + +		/* adjust offset and wrap to next page if needed */ +		pg_off += len; +		if (pg_off == PAGE_SIZE) { +			pg_off = 0; +			pg_no++;  		} -		/* -		 * If this SGE consumed all of the page, move to the -		 * next page -		 */ -		if ((sge_bytes + page_off) == PAGE_SIZE) { -			page_no++; -			page_off = 0; -			/* -			 * If there are still bytes left to map, bump -			 * the page count -			 */ -			if (byte_count) -				head->count++; -		} else -			page_off += sge_bytes; +		rs_length -= len; +	} + +	if (last && rs_length == 0) +		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); +	else +		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); + +	memset(&read_wr, 0, sizeof(read_wr)); +	read_wr.wr_id = (unsigned long)ctxt; +	read_wr.opcode = IB_WR_RDMA_READ; +	ctxt->wr_op = read_wr.opcode; +	read_wr.send_flags = IB_SEND_SIGNALED; +	read_wr.wr.rdma.rkey = rs_handle; +	read_wr.wr.rdma.remote_addr = rs_offset; +	read_wr.sg_list = ctxt->sge; +	read_wr.num_sge = pages_needed; + +	ret 
= svc_rdma_send(xprt, &read_wr); +	if (ret) { +		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret); +		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); +		goto err;  	} -	BUG_ON(byte_count != 0); -	return sge_no; + +	/* return current location in page array */ +	*page_no = pg_no; +	*page_offset = pg_off; +	ret = read; +	atomic_inc(&rdma_stat_read); +	return ret; + err: +	svc_rdma_unmap_dma(ctxt); +	svc_rdma_put_context(ctxt, 0); +	return ret;  } -/* Map a read-chunk-list to an XDR and fast register the page-list. - * - * Assumptions: - * - chunk[0]	position points to pages[0] at an offset of 0 - * - pages[]	will be made physically contiguous by creating a one-off memory - *		region using the fastreg verb. - * - byte_count is # of bytes in read-chunk-list - * - ch_count	is # of chunks in read-chunk-list - * - * Output: - * - sge array pointing into pages[] array. - * - chunk_sge array specifying sge index and count for each - *   chunk in the read list - */ -static int fast_reg_read_chunks(struct svcxprt_rdma *xprt, +/* Issue an RDMA_READ using an FRMR to map the data sink */ +static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,  				struct svc_rqst *rqstp,  				struct svc_rdma_op_ctxt *head, -				struct rpcrdma_msg *rmsgp, -				struct svc_rdma_req_map *rpl_map, -				struct svc_rdma_req_map *chl_map, -				int ch_count, -				int byte_count) +				int *page_no, +				u32 *page_offset, +				u32 rs_handle, +				u32 rs_length, +				u64 rs_offset, +				int last)  { -	int page_no; -	int ch_no; -	u32 offset; -	struct rpcrdma_read_chunk *ch; -	struct svc_rdma_fastreg_mr *frmr; -	int ret = 0; +	struct ib_send_wr read_wr; +	struct ib_send_wr inv_wr; +	struct ib_send_wr fastreg_wr; +	u8 key; +	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT; +	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt); +	struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt); +	int ret, read, pno; +	u32 pg_off = *page_offset; +	u32 pg_no = *page_no; -	frmr = svc_rdma_get_frmr(xprt);  	if (IS_ERR(frmr))  		return -ENOMEM; -	head->frmr = frmr; -	head->arg.head[0] = rqstp->rq_arg.head[0]; -	head->arg.tail[0] = rqstp->rq_arg.tail[0]; -	head->arg.pages = &head->pages[head->count]; -	head->hdr_count = head->count; /* save count of hdr pages */ -	head->arg.page_base = 0; -	head->arg.page_len = byte_count; -	head->arg.len = rqstp->rq_arg.len + byte_count; -	head->arg.buflen = rqstp->rq_arg.buflen + byte_count; +	ctxt->direction = DMA_FROM_DEVICE; +	ctxt->frmr = frmr; +	pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len); +	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length); -	/* Fast register the page list */ -	frmr->kva = page_address(rqstp->rq_arg.pages[0]); +	frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);  	frmr->direction = DMA_FROM_DEVICE;  	frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE); -	frmr->map_len = byte_count; -	frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT; -	for (page_no = 0; page_no < frmr->page_list_len; page_no++) { -		frmr->page_list->page_list[page_no] = +	frmr->map_len = pages_needed << PAGE_SHIFT; +	frmr->page_list_len = pages_needed; + +	for (pno = 0; pno < pages_needed; pno++) { +		int len = min_t(int, rs_length, PAGE_SIZE - pg_off); + +		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; +		head->arg.page_len += len; +		head->arg.len += len; +		if (!pg_off) +			head->count++; +		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1]; +		rqstp->rq_next_page = rqstp->rq_respages + 1; +		
frmr->page_list->page_list[pno] =  			ib_dma_map_page(xprt->sc_cm_id->device, -					rqstp->rq_arg.pages[page_no], 0, +					head->arg.pages[pg_no], 0,  					PAGE_SIZE, DMA_FROM_DEVICE); -		if (ib_dma_mapping_error(xprt->sc_cm_id->device, -					 frmr->page_list->page_list[page_no])) -			goto fatal_err; +		ret = ib_dma_mapping_error(xprt->sc_cm_id->device, +					   frmr->page_list->page_list[pno]); +		if (ret) +			goto err;  		atomic_inc(&xprt->sc_dma_used); -		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no]; -	} -	head->count += page_no; - -	/* rq_respages points one past arg pages */ -	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no]; -	/* Create the reply and chunk maps */ -	offset = 0; -	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; -	for (ch_no = 0; ch_no < ch_count; ch_no++) { -		rpl_map->sge[ch_no].iov_base = frmr->kva + offset; -		rpl_map->sge[ch_no].iov_len = ch->rc_target.rs_length; -		chl_map->ch[ch_no].count = 1; -		chl_map->ch[ch_no].start = ch_no; -		offset += ch->rc_target.rs_length; -		ch++; +		/* adjust offset and wrap to next page if needed */ +		pg_off += len; +		if (pg_off == PAGE_SIZE) { +			pg_off = 0; +			pg_no++; +		} +		rs_length -= len;  	} -	ret = svc_rdma_fastreg(xprt, frmr); -	if (ret) -		goto fatal_err; - -	return ch_no; - - fatal_err: -	printk("svcrdma: error fast registering xdr for xprt %p", xprt); -	svc_rdma_put_frmr(xprt, frmr); -	return -EIO; -} - -static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt, -			     struct svc_rdma_op_ctxt *ctxt, -			     struct svc_rdma_fastreg_mr *frmr, -			     struct kvec *vec, -			     u64 *sgl_offset, -			     int count) -{ -	int i; -	unsigned long off; +	if (last && rs_length == 0) +		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); +	else +		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); -	ctxt->count = count; -	ctxt->direction = DMA_FROM_DEVICE; -	for (i = 0; i < count; i++) { -		ctxt->sge[i].length = 0; /* in case map fails */ -		if (!frmr) { -			BUG_ON(0 == virt_to_page(vec[i].iov_base)); -			off = (unsigned long)vec[i].iov_base & ~PAGE_MASK; -			ctxt->sge[i].addr = -				ib_dma_map_page(xprt->sc_cm_id->device, -						virt_to_page(vec[i].iov_base), -						off, -						vec[i].iov_len, -						DMA_FROM_DEVICE); -			if (ib_dma_mapping_error(xprt->sc_cm_id->device, -						 ctxt->sge[i].addr)) -				return -EINVAL; -			ctxt->sge[i].lkey = xprt->sc_dma_lkey; -			atomic_inc(&xprt->sc_dma_used); -		} else { -			ctxt->sge[i].addr = (unsigned long)vec[i].iov_base; -			ctxt->sge[i].lkey = frmr->mr->lkey; -		} -		ctxt->sge[i].length = vec[i].iov_len; -		*sgl_offset = *sgl_offset + vec[i].iov_len; +	/* Bump the key */ +	key = (u8)(frmr->mr->lkey & 0x000000FF); +	ib_update_fast_reg_key(frmr->mr, ++key); + +	ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset; +	ctxt->sge[0].lkey = frmr->mr->lkey; +	ctxt->sge[0].length = read; +	ctxt->count = 1; +	ctxt->read_hdr = head; + +	/* Prepare FASTREG WR */ +	memset(&fastreg_wr, 0, sizeof(fastreg_wr)); +	fastreg_wr.opcode = IB_WR_FAST_REG_MR; +	fastreg_wr.send_flags = IB_SEND_SIGNALED; +	fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva; +	fastreg_wr.wr.fast_reg.page_list = frmr->page_list; +	fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len; +	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT; +	fastreg_wr.wr.fast_reg.length = frmr->map_len; +	fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags; +	fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey; +	fastreg_wr.next = &read_wr; + +	/* Prepare RDMA_READ */ +	memset(&read_wr, 0, sizeof(read_wr)); +	
read_wr.send_flags = IB_SEND_SIGNALED; +	read_wr.wr.rdma.rkey = rs_handle; +	read_wr.wr.rdma.remote_addr = rs_offset; +	read_wr.sg_list = ctxt->sge; +	read_wr.num_sge = 1; +	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) { +		read_wr.opcode = IB_WR_RDMA_READ_WITH_INV; +		read_wr.wr_id = (unsigned long)ctxt; +		read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey; +	} else { +		read_wr.opcode = IB_WR_RDMA_READ; +		read_wr.next = &inv_wr; +		/* Prepare invalidate */ +		memset(&inv_wr, 0, sizeof(inv_wr)); +		inv_wr.wr_id = (unsigned long)ctxt; +		inv_wr.opcode = IB_WR_LOCAL_INV; +		inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE; +		inv_wr.ex.invalidate_rkey = frmr->mr->lkey; +	} +	ctxt->wr_op = read_wr.opcode; + +	/* Post the chain */ +	ret = svc_rdma_send(xprt, &fastreg_wr); +	if (ret) { +		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret); +		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); +		goto err;  	} -	return 0; -} -static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count) -{ -	if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) == -	     RDMA_TRANSPORT_IWARP) && -	    sge_count > 1) -		return 1; -	else -		return min_t(int, sge_count, xprt->sc_max_sge); +	/* return current location in page array */ +	*page_no = pg_no; +	*page_offset = pg_off; +	ret = read; +	atomic_inc(&rdma_stat_read); +	return ret; + err: +	svc_rdma_unmap_dma(ctxt); +	svc_rdma_put_context(ctxt, 0); +	svc_rdma_put_frmr(xprt, frmr); +	return ret;  } -/* - * Use RDMA_READ to read data from the advertised client buffer into the - * XDR stream starting at rq_arg.head[0].iov_base. - * Each chunk in the array - * contains the following fields: - * discrim      - '1', This isn't used for data placement - * position     - The xdr stream offset (the same for every chunk) - * handle       - RMR for client memory region - * length       - data transfer length - * offset       - 64 bit tagged offset in remote memory region - * - * On our side, we need to read into a pagelist. The first page immediately - * follows the RPC header. - * - * This function returns: - * 0 - No error and no read-list found. - * - * 1 - Successful read-list processing. The data is not yet in - * the pagelist and therefore the RPC request must be deferred. The - * I/O completion will enqueue the transport again and - * svc_rdma_recvfrom will complete the request. - * - * <0 - Error processing/posting read-list. - * - * NOTE: The ctxt must not be touched after the last WR has been posted - * because the I/O completion processing may occur on another - * processor and free / modify the context. Ne touche pas! 
- */ -static int rdma_read_xdr(struct svcxprt_rdma *xprt, -			 struct rpcrdma_msg *rmsgp, -			 struct svc_rqst *rqstp, -			 struct svc_rdma_op_ctxt *hdr_ctxt) +static int rdma_read_chunks(struct svcxprt_rdma *xprt, +			    struct rpcrdma_msg *rmsgp, +			    struct svc_rqst *rqstp, +			    struct svc_rdma_op_ctxt *head)  { -	struct ib_send_wr read_wr; -	struct ib_send_wr inv_wr; -	int err = 0; -	int ch_no; -	int ch_count; -	int byte_count; -	int sge_count; -	u64 sgl_offset; +	int page_no, ch_count, ret;  	struct rpcrdma_read_chunk *ch; -	struct svc_rdma_op_ctxt *ctxt = NULL; -	struct svc_rdma_req_map *rpl_map; -	struct svc_rdma_req_map *chl_map; +	u32 page_offset, byte_count; +	u64 rs_offset; +	rdma_reader_fn reader;  	/* If no read list is present, return 0 */  	ch = svc_rdma_get_read_chunk(rmsgp); @@ -404,128 +384,55 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,  	if (ch_count > RPCSVC_MAXPAGES)  		return -EINVAL; -	/* Allocate temporary reply and chunk maps */ -	rpl_map = svc_rdma_get_req_map(); -	chl_map = svc_rdma_get_req_map(); +	/* The request is completed when the RDMA_READs complete. The +	 * head context keeps all the pages that comprise the +	 * request. +	 */ +	head->arg.head[0] = rqstp->rq_arg.head[0]; +	head->arg.tail[0] = rqstp->rq_arg.tail[0]; +	head->arg.pages = &head->pages[head->count]; +	head->hdr_count = head->count; +	head->arg.page_base = 0; +	head->arg.page_len = 0; +	head->arg.len = rqstp->rq_arg.len; +	head->arg.buflen = rqstp->rq_arg.buflen; -	if (!xprt->sc_frmr_pg_list_len) -		sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp, -					    rpl_map, chl_map, ch_count, -					    byte_count); +	/* Use FRMR if supported */ +	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG) +		reader = rdma_read_chunk_frmr;  	else -		sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp, -						 rpl_map, chl_map, ch_count, -						 byte_count); -	if (sge_count < 0) { -		err = -EIO; -		goto out; -	} - -	sgl_offset = 0; -	ch_no = 0; +		reader = rdma_read_chunk_lcl; +	page_no = 0; page_offset = 0;  	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; -	     ch->rc_discrim != 0; ch++, ch_no++) { -next_sge: -		ctxt = svc_rdma_get_context(xprt); -		ctxt->direction = DMA_FROM_DEVICE; -		ctxt->frmr = hdr_ctxt->frmr; -		ctxt->read_hdr = NULL; -		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); -		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags); - -		/* Prepare READ WR */ -		memset(&read_wr, 0, sizeof read_wr); -		read_wr.wr_id = (unsigned long)ctxt; -		read_wr.opcode = IB_WR_RDMA_READ; -		ctxt->wr_op = read_wr.opcode; -		read_wr.send_flags = IB_SEND_SIGNALED; -		read_wr.wr.rdma.rkey = ch->rc_target.rs_handle; -		read_wr.wr.rdma.remote_addr = -			get_unaligned(&(ch->rc_target.rs_offset)) + -			sgl_offset; -		read_wr.sg_list = ctxt->sge; -		read_wr.num_sge = -			rdma_read_max_sge(xprt, chl_map->ch[ch_no].count); -		err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr, -					&rpl_map->sge[chl_map->ch[ch_no].start], -					&sgl_offset, -					read_wr.num_sge); -		if (err) { -			svc_rdma_unmap_dma(ctxt); -			svc_rdma_put_context(ctxt, 0); -			goto out; -		} -		if (((ch+1)->rc_discrim == 0) && -		    (read_wr.num_sge == chl_map->ch[ch_no].count)) { -			/* -			 * Mark the last RDMA_READ with a bit to -			 * indicate all RPC data has been fetched from -			 * the client and the RPC needs to be enqueued. 
-			 */ -			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); -			if (hdr_ctxt->frmr) { -				set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags); -				/* -				 * Invalidate the local MR used to map the data -				 * sink. -				 */ -				if (xprt->sc_dev_caps & -				    SVCRDMA_DEVCAP_READ_W_INV) { -					read_wr.opcode = -						IB_WR_RDMA_READ_WITH_INV; -					ctxt->wr_op = read_wr.opcode; -					read_wr.ex.invalidate_rkey = -						ctxt->frmr->mr->lkey; -				} else { -					/* Prepare INVALIDATE WR */ -					memset(&inv_wr, 0, sizeof inv_wr); -					inv_wr.opcode = IB_WR_LOCAL_INV; -					inv_wr.send_flags = IB_SEND_SIGNALED; -					inv_wr.ex.invalidate_rkey = -						hdr_ctxt->frmr->mr->lkey; -					read_wr.next = &inv_wr; -				} -			} -			ctxt->read_hdr = hdr_ctxt; +	     ch->rc_discrim != 0; ch++) { + +		xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset, +				 &rs_offset); +		byte_count = ntohl(ch->rc_target.rs_length); + +		while (byte_count > 0) { +			ret = reader(xprt, rqstp, head, +				     &page_no, &page_offset, +				     ntohl(ch->rc_target.rs_handle), +				     byte_count, rs_offset, +				     ((ch+1)->rc_discrim == 0) /* last */ +				     ); +			if (ret < 0) +				goto err; +			byte_count -= ret; +			rs_offset += ret; +			head->arg.buflen += ret;  		} -		/* Post the read */ -		err = svc_rdma_send(xprt, &read_wr); -		if (err) { -			printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n", -			       err); -			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); -			svc_rdma_unmap_dma(ctxt); -			svc_rdma_put_context(ctxt, 0); -			goto out; -		} -		atomic_inc(&rdma_stat_read); - -		if (read_wr.num_sge < chl_map->ch[ch_no].count) { -			chl_map->ch[ch_no].count -= read_wr.num_sge; -			chl_map->ch[ch_no].start += read_wr.num_sge; -			goto next_sge; -		} -		sgl_offset = 0; -		err = 1;  	} - - out: -	svc_rdma_put_req_map(rpl_map); -	svc_rdma_put_req_map(chl_map); - +	ret = 1; + err:  	/* Detach arg pages. svc_recv will replenish them */ -	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++) -		rqstp->rq_pages[ch_no] = NULL; - -	/* -	 * Detach res pages. svc_release must see a resused count of -	 * zero or it will attempt to put them. -	 */ -	while (rqstp->rq_resused) -		rqstp->rq_respages[--rqstp->rq_resused] = NULL; +	for (page_no = 0; +	     &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++) +		rqstp->rq_pages[page_no] = NULL; -	return err; +	return ret;  }  static int rdma_read_complete(struct svc_rqst *rqstp, @@ -548,7 +455,7 @@ static int rdma_read_complete(struct svc_rqst *rqstp,  	/* rq_respages starts after the last arg page */  	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no]; -	rqstp->rq_resused = 0; +	rqstp->rq_next_page = rqstp->rq_respages + 1;  	/* Rebuild rq_arg head and tail. 
*/  	rqstp->rq_arg.head[0] = head->arg.head[0]; @@ -597,13 +504,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)  				  struct svc_rdma_op_ctxt,  				  dto_q);  		list_del_init(&ctxt->dto_q); -	} -	if (ctxt) {  		spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);  		return rdma_read_complete(rqstp, ctxt); -	} - -	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) { +	} else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {  		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,  				  struct svc_rdma_op_ctxt,  				  dto_q); @@ -623,7 +526,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)  		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))  			goto close_out; -		BUG_ON(ret);  		goto out;  	}  	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n", @@ -646,12 +548,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)  	}  	/* Read read-list data. */ -	ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt); +	ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);  	if (ret > 0) {  		/* read-list posted, defer until data received from client. */  		goto defer; -	} -	if (ret < 0) { +	} else if (ret < 0) {  		/* Post of read-list failed, free context. */  		svc_rdma_put_context(ctxt, 1);  		return 0; diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 249a835b703..49fd21a5c21 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -1,4 +1,5 @@  /* + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.   * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.   *   * This software is available to you under a choice of one of two @@ -49,152 +50,6 @@  #define RPCDBG_FACILITY	RPCDBG_SVCXPRT -/* Encode an XDR as an array of IB SGE - * - * Assumptions: - * - head[0] is physically contiguous. - * - tail[0] is physically contiguous. - * - pages[] is not physically or virtually contiguous and consists of - *   PAGE_SIZE elements. - * - * Output: - * SGE[0]              reserved for RCPRDMA header - * SGE[1]              data from xdr->head[] - * SGE[2..sge_count-2] data from xdr->pages[] - * SGE[sge_count-1]    data from xdr->tail. - * - * The max SGE we need is the length of the XDR / pagesize + one for - * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES - * reserves a page for both the request and the reply header, and this - * array is only concerned with the reply we are assured that we have - * on extra page for the RPCRMDA header. - */ -static int fast_reg_xdr(struct svcxprt_rdma *xprt, -			struct xdr_buf *xdr, -			struct svc_rdma_req_map *vec) -{ -	int sge_no; -	u32 sge_bytes; -	u32 page_bytes; -	u32 page_off; -	int page_no = 0; -	u8 *frva; -	struct svc_rdma_fastreg_mr *frmr; - -	frmr = svc_rdma_get_frmr(xprt); -	if (IS_ERR(frmr)) -		return -ENOMEM; -	vec->frmr = frmr; - -	/* Skip the RPCRDMA header */ -	sge_no = 1; - -	/* Map the head. 
*/ -	frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK); -	vec->sge[sge_no].iov_base = xdr->head[0].iov_base; -	vec->sge[sge_no].iov_len = xdr->head[0].iov_len; -	vec->count = 2; -	sge_no++; - -	/* Map the XDR head */ -	frmr->kva = frva; -	frmr->direction = DMA_TO_DEVICE; -	frmr->access_flags = 0; -	frmr->map_len = PAGE_SIZE; -	frmr->page_list_len = 1; -	page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK; -	frmr->page_list->page_list[page_no] = -		ib_dma_map_page(xprt->sc_cm_id->device, -				virt_to_page(xdr->head[0].iov_base), -				page_off, -				PAGE_SIZE - page_off, -				DMA_TO_DEVICE); -	if (ib_dma_mapping_error(xprt->sc_cm_id->device, -				 frmr->page_list->page_list[page_no])) -		goto fatal_err; -	atomic_inc(&xprt->sc_dma_used); - -	/* Map the XDR page list */ -	page_off = xdr->page_base; -	page_bytes = xdr->page_len + page_off; -	if (!page_bytes) -		goto encode_tail; - -	/* Map the pages */ -	vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off; -	vec->sge[sge_no].iov_len = page_bytes; -	sge_no++; -	while (page_bytes) { -		struct page *page; - -		page = xdr->pages[page_no++]; -		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off)); -		page_bytes -= sge_bytes; - -		frmr->page_list->page_list[page_no] = -			ib_dma_map_page(xprt->sc_cm_id->device, -					page, page_off, -					sge_bytes, DMA_TO_DEVICE); -		if (ib_dma_mapping_error(xprt->sc_cm_id->device, -					 frmr->page_list->page_list[page_no])) -			goto fatal_err; - -		atomic_inc(&xprt->sc_dma_used); -		page_off = 0; /* reset for next time through loop */ -		frmr->map_len += PAGE_SIZE; -		frmr->page_list_len++; -	} -	vec->count++; - - encode_tail: -	/* Map tail */ -	if (0 == xdr->tail[0].iov_len) -		goto done; - -	vec->count++; -	vec->sge[sge_no].iov_len = xdr->tail[0].iov_len; - -	if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) == -	    ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) { -		/* -		 * If head and tail use the same page, we don't need -		 * to map it again. 
-		 */ -		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base; -	} else { -		void *va; - -		/* Map another page for the tail */ -		page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK; -		va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK); -		vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off; - -		frmr->page_list->page_list[page_no] = -		    ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va), -				    page_off, -				    PAGE_SIZE, -				    DMA_TO_DEVICE); -		if (ib_dma_mapping_error(xprt->sc_cm_id->device, -					 frmr->page_list->page_list[page_no])) -			goto fatal_err; -		atomic_inc(&xprt->sc_dma_used); -		frmr->map_len += PAGE_SIZE; -		frmr->page_list_len++; -	} - - done: -	if (svc_rdma_fastreg(xprt, frmr)) -		goto fatal_err; - -	return 0; - - fatal_err: -	printk("svcrdma: Error fast registering memory for xprt %p\n", xprt); -	vec->frmr = NULL; -	svc_rdma_put_frmr(xprt, frmr); -	return -EIO; -} -  static int map_xdr(struct svcxprt_rdma *xprt,  		   struct xdr_buf *xdr,  		   struct svc_rdma_req_map *vec) @@ -208,9 +63,6 @@ static int map_xdr(struct svcxprt_rdma *xprt,  	BUG_ON(xdr->len !=  	       (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)); -	if (xprt->sc_frmr_pg_list_len) -		return fast_reg_xdr(xprt, xdr, vec); -  	/* Skip the first sge, this is for the RPCRDMA header */  	sge_no = 1; @@ -265,6 +117,7 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,  		xdr_off -= xdr->head[0].iov_len;  		if (xdr_off < xdr->page_len) {  			/* This offset is in the page list */ +			xdr_off += xdr->page_base;  			page = xdr->pages[xdr_off >> PAGE_SHIFT];  			xdr_off &= ~PAGE_MASK;  		} else { @@ -281,8 +134,6 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,  }  /* Assumptions: - * - We are using FRMR - *     - or -   * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE   */  static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, @@ -326,23 +177,16 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,  		sge_bytes = min_t(size_t,  			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);  		sge[sge_no].length = sge_bytes; -		if (!vec->frmr) { -			sge[sge_no].addr = -				dma_map_xdr(xprt, &rqstp->rq_res, xdr_off, -					    sge_bytes, DMA_TO_DEVICE); -			xdr_off += sge_bytes; -			if (ib_dma_mapping_error(xprt->sc_cm_id->device, -						 sge[sge_no].addr)) -				goto err; -			atomic_inc(&xprt->sc_dma_used); -			sge[sge_no].lkey = xprt->sc_dma_lkey; -		} else { -			sge[sge_no].addr = (unsigned long) -				vec->sge[xdr_sge_no].iov_base + sge_off; -			sge[sge_no].lkey = vec->frmr->mr->lkey; -		} +		sge[sge_no].addr = +			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off, +				    sge_bytes, DMA_TO_DEVICE); +		xdr_off += sge_bytes; +		if (ib_dma_mapping_error(xprt->sc_cm_id->device, +					 sge[sge_no].addr)) +			goto err; +		atomic_inc(&xprt->sc_dma_used); +		sge[sge_no].lkey = xprt->sc_dma_lkey;  		ctxt->count++; -		ctxt->frmr = vec->frmr;  		sge_off = 0;  		sge_no++;  		xdr_sge_no++; @@ -368,7 +212,6 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,  	return 0;   err:  	svc_rdma_unmap_dma(ctxt); -	svc_rdma_put_frmr(xprt, vec->frmr);  	svc_rdma_put_context(ctxt, 0);  	/* Fatal error, close transport */  	return -EIO; @@ -396,10 +239,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,  	res_ary = (struct rpcrdma_write_array *)  		&rdma_resp->rm_body.rm_chunks[1]; -	if (vec->frmr) -		max_write = vec->frmr->map_len; -	else -		max_write = xprt->sc_max_sge * PAGE_SIZE; 
+	max_write = xprt->sc_max_sge * PAGE_SIZE;  	/* Write chunks start at the pagelist */  	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0; @@ -409,21 +249,21 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,  		u64 rs_offset;  		arg_ch = &arg_ary->wc_array[chunk_no].wc_target; -		write_len = min(xfer_len, arg_ch->rs_length); +		write_len = min(xfer_len, ntohl(arg_ch->rs_length));  		/* Prepare the response chunk given the length actually  		 * written */ -		rs_offset = get_unaligned(&(arg_ch->rs_offset)); +		xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);  		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no, -					    arg_ch->rs_handle, -					    rs_offset, -					    write_len); +						arg_ch->rs_handle, +						arg_ch->rs_offset, +						write_len);  		chunk_off = 0;  		while (write_len) {  			int this_write;  			this_write = min(write_len, max_write);  			ret = send_write(xprt, rqstp, -					 arg_ch->rs_handle, +					 ntohl(arg_ch->rs_handle),  					 rs_offset + chunk_off,  					 xdr_off,  					 this_write, @@ -457,6 +297,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,  	u32 xdr_off;  	int chunk_no;  	int chunk_off; +	int nchunks;  	struct rpcrdma_segment *ch;  	struct rpcrdma_write_array *arg_ary;  	struct rpcrdma_write_array *res_ary; @@ -470,32 +311,30 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,  	res_ary = (struct rpcrdma_write_array *)  		&rdma_resp->rm_body.rm_chunks[2]; -	if (vec->frmr) -		max_write = vec->frmr->map_len; -	else -		max_write = xprt->sc_max_sge * PAGE_SIZE; +	max_write = xprt->sc_max_sge * PAGE_SIZE;  	/* xdr offset starts at RPC message */ +	nchunks = ntohl(arg_ary->wc_nchunks);  	for (xdr_off = 0, chunk_no = 0; -	     xfer_len && chunk_no < arg_ary->wc_nchunks; +	     xfer_len && chunk_no < nchunks;  	     chunk_no++) {  		u64 rs_offset;  		ch = &arg_ary->wc_array[chunk_no].wc_target; -		write_len = min(xfer_len, ch->rs_length); +		write_len = min(xfer_len, htonl(ch->rs_length));  		/* Prepare the reply chunk given the length actually  		 * written */ -		rs_offset = get_unaligned(&(ch->rs_offset)); +		xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);  		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no, -					    ch->rs_handle, rs_offset, -					    write_len); +						ch->rs_handle, ch->rs_offset, +						write_len);  		chunk_off = 0;  		while (write_len) {  			int this_write;  			this_write = min(write_len, max_write);  			ret = send_write(xprt, rqstp, -					 ch->rs_handle, +					 ntohl(ch->rs_handle),  					 rs_offset + chunk_off,  					 xdr_off,  					 this_write, @@ -542,10 +381,10 @@ static int send_reply(struct svcxprt_rdma *rdma,  		      int byte_count)  {  	struct ib_send_wr send_wr; -	struct ib_send_wr inv_wr;  	int sge_no;  	int sge_bytes;  	int page_no; +	int pages;  	int ret;  	/* Post a recv buffer to handle another request. */ @@ -555,7 +394,6 @@ static int send_reply(struct svcxprt_rdma *rdma,  		       "svcrdma: could not post a receive buffer, err=%d."  		       
"Closing transport %p.\n", ret, rdma);  		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); -		svc_rdma_put_frmr(rdma, vec->frmr);  		svc_rdma_put_context(ctxt, 0);  		return -ENOTCONN;  	} @@ -563,11 +401,6 @@ static int send_reply(struct svcxprt_rdma *rdma,  	/* Prepare the context */  	ctxt->pages[0] = page;  	ctxt->count = 1; -	ctxt->frmr = vec->frmr; -	if (vec->frmr) -		set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags); -	else -		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);  	/* Prepare the SGE for the RPCRDMA Header */  	ctxt->sge[0].lkey = rdma->sc_dma_lkey; @@ -586,21 +419,15 @@ static int send_reply(struct svcxprt_rdma *rdma,  		int xdr_off = 0;  		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);  		byte_count -= sge_bytes; -		if (!vec->frmr) { -			ctxt->sge[sge_no].addr = -				dma_map_xdr(rdma, &rqstp->rq_res, xdr_off, -					    sge_bytes, DMA_TO_DEVICE); -			xdr_off += sge_bytes; -			if (ib_dma_mapping_error(rdma->sc_cm_id->device, -						 ctxt->sge[sge_no].addr)) -				goto err; -			atomic_inc(&rdma->sc_dma_used); -			ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey; -		} else { -			ctxt->sge[sge_no].addr = (unsigned long) -				vec->sge[sge_no].iov_base; -			ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey; -		} +		ctxt->sge[sge_no].addr = +			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off, +				    sge_bytes, DMA_TO_DEVICE); +		xdr_off += sge_bytes; +		if (ib_dma_mapping_error(rdma->sc_cm_id->device, +					 ctxt->sge[sge_no].addr)) +			goto err; +		atomic_inc(&rdma->sc_dma_used); +		ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;  		ctxt->sge[sge_no].length = sge_bytes;  	}  	BUG_ON(byte_count != 0); @@ -609,7 +436,8 @@ static int send_reply(struct svcxprt_rdma *rdma,  	 * respages array. They are our pages until the I/O  	 * completes.  	 */ -	for (page_no = 0; page_no < rqstp->rq_resused; page_no++) { +	pages = rqstp->rq_next_page - rqstp->rq_respages; +	for (page_no = 0; page_no < pages; page_no++) {  		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];  		ctxt->count++;  		rqstp->rq_respages[page_no] = NULL; @@ -621,6 +449,8 @@ static int send_reply(struct svcxprt_rdma *rdma,  		if (page_no+1 >= sge_no)  			ctxt->sge[page_no+1].length = 0;  	} +	rqstp->rq_next_page = rqstp->rq_respages + 1; +  	BUG_ON(sge_no > rdma->sc_max_sge);  	memset(&send_wr, 0, sizeof send_wr);  	ctxt->wr_op = IB_WR_SEND; @@ -629,15 +459,6 @@ static int send_reply(struct svcxprt_rdma *rdma,  	send_wr.num_sge = sge_no;  	send_wr.opcode = IB_WR_SEND;  	send_wr.send_flags =  IB_SEND_SIGNALED; -	if (vec->frmr) { -		/* Prepare INVALIDATE WR */ -		memset(&inv_wr, 0, sizeof inv_wr); -		inv_wr.opcode = IB_WR_LOCAL_INV; -		inv_wr.send_flags = IB_SEND_SIGNALED; -		inv_wr.ex.invalidate_rkey = -			vec->frmr->mr->lkey; -		send_wr.next = &inv_wr; -	}  	ret = svc_rdma_send(rdma, &send_wr);  	if (ret) @@ -647,7 +468,6 @@ static int send_reply(struct svcxprt_rdma *rdma,   err:  	svc_rdma_unmap_dma(ctxt); -	svc_rdma_put_frmr(rdma, vec->frmr);  	svc_rdma_put_context(ctxt, 1);  	return -EIO;  } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 9df1eadc912..e7323fbbd34 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -1,4 +1,5 @@  /* + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.   * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.   
*   * This software is available to you under a choice of one of two @@ -42,6 +43,7 @@  #include <linux/sunrpc/svc_xprt.h>  #include <linux/sunrpc/debug.h>  #include <linux/sunrpc/rpc_rdma.h> +#include <linux/interrupt.h>  #include <linux/sched.h>  #include <linux/slab.h>  #include <linux/spinlock.h> @@ -49,6 +51,8 @@  #include <rdma/ib_verbs.h>  #include <rdma/rdma_cm.h>  #include <linux/sunrpc/svc_rdma.h> +#include <linux/export.h> +#include "xprt_rdma.h"  #define RPCDBG_FACILITY	RPCDBG_SVCXPRT @@ -62,6 +66,7 @@ static void dto_tasklet_func(unsigned long data);  static void svc_rdma_detach(struct svc_xprt *xprt);  static void svc_rdma_free(struct svc_xprt *xprt);  static int svc_rdma_has_wspace(struct svc_xprt *xprt); +static int svc_rdma_secure_port(struct svc_rqst *);  static void rq_cq_reap(struct svcxprt_rdma *xprt);  static void sq_cq_reap(struct svcxprt_rdma *xprt); @@ -79,6 +84,7 @@ static struct svc_xprt_ops svc_rdma_ops = {  	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,  	.xpo_has_wspace = svc_rdma_has_wspace,  	.xpo_accept = svc_rdma_accept, +	.xpo_secure_port = svc_rdma_secure_port,  };  struct svc_xprt_class svc_rdma_class = { @@ -88,12 +94,6 @@ struct svc_xprt_class svc_rdma_class = {  	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,  }; -/* WR context cache. Created in svc_rdma.c  */ -extern struct kmem_cache *svc_rdma_ctxt_cachep; - -/* Workqueue created in svc_rdma.c */ -extern struct workqueue_struct *svc_rdma_wq; -  struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)  {  	struct svc_rdma_op_ctxt *ctxt; @@ -148,9 +148,6 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)  	atomic_dec(&xprt->sc_ctxt_used);  } -/* Temporary NFS request map cache. Created in svc_rdma.c  */ -extern struct kmem_cache *svc_rdma_map_cachep; -  /*   * Temporary NFS req mappings are shared across all transport   * instances. 
These are short lived and should be bounded by the number @@ -166,7 +163,6 @@ struct svc_rdma_req_map *svc_rdma_get_req_map(void)  		schedule_timeout_uninterruptible(msecs_to_jiffies(500));  	}  	map->count = 0; -	map->frmr = NULL;  	return map;  } @@ -333,7 +329,7 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)  }  /* - * Processs a completion context + * Process a completion context   */  static void process_context(struct svcxprt_rdma *xprt,  			    struct svc_rdma_op_ctxt *ctxt) @@ -342,22 +338,21 @@ static void process_context(struct svcxprt_rdma *xprt,  	switch (ctxt->wr_op) {  	case IB_WR_SEND: -		if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags)) -			svc_rdma_put_frmr(xprt, ctxt->frmr); +		BUG_ON(ctxt->frmr);  		svc_rdma_put_context(ctxt, 1);  		break;  	case IB_WR_RDMA_WRITE: +		BUG_ON(ctxt->frmr);  		svc_rdma_put_context(ctxt, 0);  		break;  	case IB_WR_RDMA_READ:  	case IB_WR_RDMA_READ_WITH_INV: +		svc_rdma_put_frmr(xprt, ctxt->frmr);  		if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {  			struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;  			BUG_ON(!read_hdr); -			if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags)) -				svc_rdma_put_frmr(xprt, ctxt->frmr);  			spin_lock_bh(&xprt->sc_rq_dto_lock);  			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);  			list_add_tail(&read_hdr->dto_q, @@ -369,6 +364,7 @@ static void process_context(struct svcxprt_rdma *xprt,  		break;  	default: +		BUG_ON(1);  		printk(KERN_ERR "svcrdma: unexpected completion type, "  		       "opcode=%d\n",  		       ctxt->wr_op); @@ -384,29 +380,42 @@ static void process_context(struct svcxprt_rdma *xprt,  static void sq_cq_reap(struct svcxprt_rdma *xprt)  {  	struct svc_rdma_op_ctxt *ctxt = NULL; -	struct ib_wc wc; +	struct ib_wc wc_a[6]; +	struct ib_wc *wc;  	struct ib_cq *cq = xprt->sc_sq_cq;  	int ret; +	memset(wc_a, 0, sizeof(wc_a)); +  	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))  		return;  	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);  	atomic_inc(&rdma_stat_sq_poll); -	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) { -		if (wc.status != IB_WC_SUCCESS) -			/* Close the transport */ -			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); +	while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) { +		int i; -		/* Decrement used SQ WR count */ -		atomic_dec(&xprt->sc_sq_count); -		wake_up(&xprt->sc_send_wait); +		for (i = 0; i < ret; i++) { +			wc = &wc_a[i]; +			if (wc->status != IB_WC_SUCCESS) { +				dprintk("svcrdma: sq wc err status %d\n", +					wc->status); -		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; -		if (ctxt) -			process_context(xprt, ctxt); +				/* Close the transport */ +				set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); +			} -		svc_xprt_put(&xprt->sc_xprt); +			/* Decrement used SQ WR count */ +			atomic_dec(&xprt->sc_sq_count); +			wake_up(&xprt->sc_send_wait); + +			ctxt = (struct svc_rdma_op_ctxt *) +				(unsigned long)wc->wr_id; +			if (ctxt) +				process_context(xprt, ctxt); + +			svc_xprt_put(&xprt->sc_xprt); +		}  	}  	if (ctxt) @@ -451,7 +460,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,  	if (!cma_xprt)  		return NULL; -	svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv); +	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);  	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);  	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);  	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); @@ -483,8 +492,7 @@ struct page *svc_rdma_get_page(void)  	while ((page = alloc_page(GFP_KERNEL)) == NULL) {  		/* If we can't get memory, wait a bit 
and try again */ -		printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 " -		       "jiffies.\n"); +		printk(KERN_INFO "svcrdma: out of memory...retrying in 1s\n");  		schedule_timeout_uninterruptible(msecs_to_jiffies(1000));  	}  	return page; @@ -584,10 +592,6 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)  	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);  	spin_unlock_bh(&listen_xprt->sc_lock); -	/* -	 * Can't use svc_xprt_received here because we are not on a -	 * rqstp thread -	*/  	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);  	svc_xprt_enqueue(&listen_xprt->sc_xprt);  } @@ -695,7 +699,8 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,  		return ERR_PTR(-ENOMEM);  	xprt = &cma_xprt->sc_xprt; -	listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP); +	listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP, +				   IB_QPT_RC);  	if (IS_ERR(listen_id)) {  		ret = PTR_ERR(listen_id);  		dprintk("svcrdma: rdma_create_id failed = %d\n", ret); @@ -1003,7 +1008,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)  			need_dma_mr = 0;  		break;  	case RDMA_TRANSPORT_IB: -		if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) { +		if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) { +			need_dma_mr = 1; +			dma_mr_acc = IB_ACCESS_LOCAL_WRITE; +		} else if (!(devattr.device_cap_flags & +			     IB_DEVICE_LOCAL_DMA_LKEY)) {  			need_dma_mr = 1;  			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;  		} else @@ -1200,14 +1209,7 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)  		container_of(xprt, struct svcxprt_rdma, sc_xprt);  	/* -	 * If there are fewer SQ WR available than required to send a -	 * simple response, return false. -	 */ -	if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3)) -		return 0; - -	/* -	 * ...or there are already waiters on the SQ, +	 * If there are already waiters on the SQ,  	 * return false.  	 */  	if (waitqueue_active(&rdma->sc_send_wait)) @@ -1217,6 +1219,11 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)  	return 1;  } +static int svc_rdma_secure_port(struct svc_rqst *rqstp) +{ +	return 1; +} +  /*   * Attempt to register the kvec representing the RPC memory with the   * device. 
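The reworked sq_cq_reap() above now drains the send completion queue in batches of up to six work completions per ib_poll_cq() call rather than one at a time, trimming the number of verbs calls needed to empty a busy CQ. Below is a minimal userspace sketch of that batching pattern; struct wc, poll_cq() and handle_wc() are illustrative stand-ins for the ib_wc/ib_poll_cq machinery, not kernel APIs, and the fake backlog exists only so the demo terminates.

/*
 * Batched completion polling: ask for several completions per call
 * and walk the returned array, instead of one poll per completion.
 */
#include <stdio.h>
#include <string.h>

#define WC_BATCH 6

struct wc { int status; unsigned long wr_id; };

/* Pretend provider: hands back up to 'max' completions, then none. */
static int poll_cq(struct wc *wcs, int max)
{
	static int remaining = 14;	/* fake backlog for the demo */
	int n = remaining < max ? remaining : max;
	int i;

	for (i = 0; i < n; i++) {
		wcs[i].status = 0;
		wcs[i].wr_id = (unsigned long)(remaining - i);
	}
	remaining -= n;
	return n;
}

static void handle_wc(const struct wc *wc)
{
	if (wc->status != 0)
		printf("error completion, closing transport\n");
	else
		printf("completed wr_id %lu\n", wc->wr_id);
}

int main(void)
{
	struct wc wcs[WC_BATCH];
	int ret, i;

	memset(wcs, 0, sizeof(wcs));
	/* Drain the CQ a batch at a time instead of one WC per call. */
	while ((ret = poll_cq(wcs, WC_BATCH)) > 0)
		for (i = 0; i < ret; i++)
			handle_wc(&wcs[i]);
	return 0;
}

The kernel version additionally re-arms completion notification before polling and closes the transport on an error status, which this sketch only hints at.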
@@ -1335,6 +1342,7 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,  					    p, 0, length, DMA_FROM_DEVICE);  	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {  		put_page(p); +		svc_rdma_put_context(ctxt, 1);  		return;  	}  	atomic_inc(&xprt->sc_dma_used); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 0867070bb5c..66f91f0d071 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -51,6 +51,7 @@  #include <linux/init.h>  #include <linux/slab.h>  #include <linux/seq_file.h> +#include <linux/sunrpc/addr.h>  #include "xprt_rdma.h" @@ -85,7 +86,7 @@ static unsigned int max_memreg = RPCRDMA_LAST - 1;  static struct ctl_table_header *sunrpc_table_header; -static ctl_table xr_tunables_table[] = { +static struct ctl_table xr_tunables_table[] = {  	{  		.procname	= "rdma_slot_table_entries",  		.data		= &xprt_rdma_slot_table_entries, @@ -137,7 +138,7 @@ static ctl_table xr_tunables_table[] = {  	{ },  }; -static ctl_table sunrpc_table[] = { +static struct ctl_table sunrpc_table[] = {  	{  		.procname	= "sunrpc",  		.mode		= 0555, @@ -148,6 +149,11 @@ static ctl_table sunrpc_table[] = {  #endif +#define RPCRDMA_BIND_TO		(60U * HZ) +#define RPCRDMA_INIT_REEST_TO	(5U * HZ) +#define RPCRDMA_MAX_REEST_TO	(30U * HZ) +#define RPCRDMA_IDLE_DISC_TO	(5U * 60 * HZ) +  static struct rpc_xprt_ops xprt_rdma_procs;	/* forward reference */  static void @@ -199,23 +205,18 @@ xprt_rdma_connect_worker(struct work_struct *work)  	struct rpc_xprt *xprt = &r_xprt->xprt;  	int rc = 0; -	if (!xprt->shutdown) { -		xprt_clear_connected(xprt); - -		dprintk("RPC:       %s: %sconnect\n", __func__, -				r_xprt->rx_ep.rep_connected != 0 ? "re" : ""); -		rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia); -		if (rc) -			goto out; -	} -	goto out_clear; +	current->flags |= PF_FSTRANS; +	xprt_clear_connected(xprt); -out: -	xprt_wake_pending_tasks(xprt, rc); +	dprintk("RPC:       %s: %sconnect\n", __func__, +			r_xprt->rx_ep.rep_connected != 0 ? 
"re" : ""); +	rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia); +	if (rc) +		xprt_wake_pending_tasks(xprt, rc); -out_clear:  	dprintk("RPC:       %s: exit\n", __func__);  	xprt_clear_connecting(xprt); +	current->flags &= ~PF_FSTRANS;  }  /* @@ -233,7 +234,6 @@ static void  xprt_rdma_destroy(struct rpc_xprt *xprt)  {  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); -	int rc;  	dprintk("RPC:       %s: called\n", __func__); @@ -242,10 +242,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)  	xprt_clear_connected(xprt);  	rpcrdma_buffer_destroy(&r_xprt->rx_buf); -	rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia); -	if (rc) -		dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n", -			__func__, rc); +	rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);  	rpcrdma_ia_close(&r_xprt->rx_ia);  	xprt_rdma_free_addresses(xprt); @@ -283,6 +280,7 @@ xprt_setup_rdma(struct xprt_create *args)  	}  	xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), +			xprt_rdma_slot_table_entries,  			xprt_rdma_slot_table_entries);  	if (xprt == NULL) {  		dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n", @@ -292,9 +290,9 @@ xprt_setup_rdma(struct xprt_create *args)  	/* 60 second timeout, no retries */  	xprt->timeout = &xprt_rdma_default_timeout; -	xprt->bind_timeout = (60U * HZ); -	xprt->reestablish_timeout = (5U * HZ); -	xprt->idle_timeout = (5U * 60 * HZ); +	xprt->bind_timeout = RPCRDMA_BIND_TO; +	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; +	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;  	xprt->resvport = 0;		/* privileged port not needed */  	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */ @@ -394,7 +392,7 @@ out4:  	xprt_rdma_free_addresses(xprt);  	rc = -EINVAL;  out3: -	(void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia); +	rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);  out2:  	rpcrdma_ia_close(&new_xprt->rx_ia);  out1: @@ -430,9 +428,8 @@ xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)  }  static void -xprt_rdma_connect(struct rpc_task *task) +xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)  { -	struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);  	if (r_xprt->rx_ep.rep_connected != 0) { @@ -440,10 +437,10 @@ xprt_rdma_connect(struct rpc_task *task)  		schedule_delayed_work(&r_xprt->rdma_connect,  			xprt->reestablish_timeout);  		xprt->reestablish_timeout <<= 1; -		if (xprt->reestablish_timeout > (30 * HZ)) -			xprt->reestablish_timeout = (30 * HZ); -		else if (xprt->reestablish_timeout < (5 * HZ)) -			xprt->reestablish_timeout = (5 * HZ); +		if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO) +			xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO; +		else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) +			xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;  	} else {  		schedule_delayed_work(&r_xprt->rdma_connect, 0);  		if (!RPC_IS_ASYNC(task)) @@ -451,24 +448,6 @@ xprt_rdma_connect(struct rpc_task *task)  	}  } -static int -xprt_rdma_reserve_xprt(struct rpc_task *task) -{ -	struct rpc_xprt *xprt = task->tk_xprt; -	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); -	int credits = atomic_read(&r_xprt->rx_buf.rb_credits); - -	/* == RPC_CWNDSCALE @ init, but *after* setup */ -	if (r_xprt->rx_buf.rb_cwndscale == 0UL) { -		r_xprt->rx_buf.rb_cwndscale = xprt->cwnd; -		dprintk("RPC:       %s: cwndscale %lu\n", __func__, -			r_xprt->rx_buf.rb_cwndscale); -		BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0); -	} -	xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale; -	return 
xprt_reserve_xprt_cong(task); -} -  /*   * The RDMA allocate/free functions need the task structure as a place   * to hide the struct rpcrdma_req, which is necessary for the actual send/recv @@ -480,11 +459,12 @@ xprt_rdma_reserve_xprt(struct rpc_task *task)  static void *  xprt_rdma_allocate(struct rpc_task *task, size_t size)  { -	struct rpc_xprt *xprt = task->tk_xprt; +	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;  	struct rpcrdma_req *req, *nreq;  	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf); -	BUG_ON(NULL == req); +	if (req == NULL) +		return NULL;  	if (size > req->rl_size) {  		dprintk("RPC:       %s: size %zd too large for buffer[%zd]: " @@ -508,18 +488,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)  		 * If the allocation or registration fails, the RPC framework  		 * will (doggedly) retry.  		 */ -		if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy == -				RPCRDMA_BOUNCEBUFFERS) { -			/* forced to "pure inline" */ -			dprintk("RPC:       %s: too much data (%zd) for inline " -					"(r/w max %d/%d)\n", __func__, size, -					rpcx_to_rdmad(xprt).inline_rsize, -					rpcx_to_rdmad(xprt).inline_wsize); -			size = req->rl_size; -			rpc_exit(task, -EIO);		/* fail the operation */ -			rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; -			goto out; -		}  		if (task->tk_flags & RPC_TASK_SWAPPER)  			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);  		else @@ -548,7 +516,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)  		req = nreq;  	}  	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req); -out:  	req->rl_connect_cookie = 0;	/* our reserved value */  	return req->rl_xdr_buf; @@ -584,9 +551,7 @@ xprt_rdma_free(void *buffer)  		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");  	/* -	 * Finish the deregistration. When using mw bind, this was -	 * begun in rpcrdma_reply_handler(). In all other modes, we -	 * do it here, in thread context. The process is considered +	 * Finish the deregistration.  The process is considered  	 * complete when the rr_func vector becomes NULL - this  	 * was put in place during rpcrdma_reply_handler() - the wait  	 * call below will not block if the dereg is "done". If @@ -595,12 +560,7 @@ xprt_rdma_free(void *buffer)  	for (i = 0; req->rl_nchunks;) {  		--req->rl_nchunks;  		i += rpcrdma_deregister_external( -			&req->rl_segments[i], r_xprt, NULL); -	} - -	if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) { -		rep->rr_func = NULL;	/* abandon the callback */ -		req->rl_reply = NULL; +			&req->rl_segments[i], r_xprt);  	}  	if (req->rl_iov.length == 0) {	/* see allocate above */ @@ -632,16 +592,15 @@ static int  xprt_rdma_send_request(struct rpc_task *task)  {  	struct rpc_rqst *rqst = task->tk_rqstp; -	struct rpc_xprt *xprt = task->tk_xprt; +	struct rpc_xprt *xprt = rqst->rq_xprt;  	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);  	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); +	int rc; -	/* marshal the send itself */ -	if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) { -		r_xprt->rx_stats.failed_marshal_count++; -		dprintk("RPC:       %s: rpcrdma_marshal_req failed\n", -			__func__); -		return -EIO; +	if (req->rl_niovs == 0) { +		rc = rpcrdma_marshal_req(rqst); +		if (rc < 0) +			goto failed_marshal;  	}  	if (req->rl_reply == NULL) 		/* e.g. 
reconnection */ @@ -665,6 +624,12 @@ xprt_rdma_send_request(struct rpc_task *task)  	rqst->rq_bytes_sent = 0;  	return 0; +failed_marshal: +	r_xprt->rx_stats.failed_marshal_count++; +	dprintk("RPC:       %s: rpcrdma_marshal_req failed, status %i\n", +		__func__, rc); +	if (rc == -EIO) +		return -EIO;  drop_connection:  	xprt_disconnect_done(xprt);  	return -ENOTCONN;	/* implies disconnect */ @@ -710,8 +675,9 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)   */  static struct rpc_xprt_ops xprt_rdma_procs = { -	.reserve_xprt		= xprt_rdma_reserve_xprt, +	.reserve_xprt		= xprt_reserve_xprt_cong,  	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */ +	.alloc_slot		= xprt_alloc_slot,  	.release_request	= xprt_release_rqst_cong,       /* ditto */  	.set_retrans_timeout	= xprt_set_retrans_timeout_def, /* ditto */  	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */ @@ -737,7 +703,7 @@ static void __exit xprt_rdma_cleanup(void)  {  	int rc; -	dprintk(KERN_INFO "RPCRDMA Module Removed, deregister RPC RDMA transport\n"); +	dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");  #ifdef RPC_DEBUG  	if (sunrpc_table_header) {  		unregister_sysctl_table(sunrpc_table_header); @@ -759,14 +725,14 @@ static int __init xprt_rdma_init(void)  	if (rc)  		return rc; -	dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n"); +	dprintk("RPCRDMA Module Init, register RPC RDMA transport\n"); -	dprintk(KERN_INFO "Defaults:\n"); -	dprintk(KERN_INFO "\tSlots %d\n" +	dprintk("Defaults:\n"); +	dprintk("\tSlots %d\n"  		"\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",  		xprt_rdma_slot_table_entries,  		xprt_rdma_max_inline_read, xprt_rdma_max_inline_write); -	dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n", +	dprintk("\tPadding %d\n\tMemreg %d\n",  		xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);  #ifdef RPC_DEBUG diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 5f4c7b3bc71..13dbd1c389f 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -47,8 +47,9 @@   *  o buffer memory   */ -#include <linux/pci.h>	/* for Tavor hack below */ +#include <linux/interrupt.h>  #include <linux/slab.h> +#include <asm/bitops.h>  #include "xprt_rdma.h" @@ -141,89 +142,139 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)  	}  } -static inline -void rpcrdma_event_process(struct ib_wc *wc) +static void +rpcrdma_sendcq_process_wc(struct ib_wc *wc)  { -	struct rpcrdma_rep *rep = -			(struct rpcrdma_rep *)(unsigned long) wc->wr_id; +	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; -	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n", -		__func__, rep, wc->status, wc->opcode, wc->byte_len); +	dprintk("RPC:       %s: frmr %p status %X opcode %d\n", +		__func__, frmr, wc->status, wc->opcode); -	if (!rep) /* send or bind completion that we don't care about */ +	if (wc->wr_id == 0ULL) +		return; +	if (wc->status != IB_WC_SUCCESS)  		return; -	if (IB_WC_SUCCESS != wc->status) { -		dprintk("RPC:       %s: %s WC status %X, connection lost\n", -			__func__, (wc->opcode & IB_WC_RECV) ? 
"recv" : "send", -			 wc->status); -		rep->rr_len = ~0U; -		rpcrdma_schedule_tasklet(rep); +	if (wc->opcode == IB_WC_FAST_REG_MR) +		frmr->r.frmr.state = FRMR_IS_VALID; +	else if (wc->opcode == IB_WC_LOCAL_INV) +		frmr->r.frmr.state = FRMR_IS_INVALID; +} + +static int +rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) +{ +	struct ib_wc *wcs; +	int budget, count, rc; + +	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; +	do { +		wcs = ep->rep_send_wcs; + +		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); +		if (rc <= 0) +			return rc; + +		count = rc; +		while (count-- > 0) +			rpcrdma_sendcq_process_wc(wcs++); +	} while (rc == RPCRDMA_POLLSIZE && --budget); +	return 0; +} + +/* + * Handle send, fast_reg_mr, and local_inv completions. + * + * Send events are typically suppressed and thus do not result + * in an upcall. Occasionally one is signaled, however. This + * prevents the provider's completion queue from wrapping and + * losing a completion. + */ +static void +rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) +{ +	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; +	int rc; + +	rc = rpcrdma_sendcq_poll(cq, ep); +	if (rc) { +		dprintk("RPC:       %s: ib_poll_cq failed: %i\n", +			__func__, rc);  		return;  	} -	switch (wc->opcode) { -	case IB_WC_RECV: -		rep->rr_len = wc->byte_len; -		ib_dma_sync_single_for_cpu( -			rdmab_to_ia(rep->rr_buffer)->ri_id->device, -			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE); -		/* Keep (only) the most recent credits, after check validity */ -		if (rep->rr_len >= 16) { -			struct rpcrdma_msg *p = -					(struct rpcrdma_msg *) rep->rr_base; -			unsigned int credits = ntohl(p->rm_credit); -			if (credits == 0) { -				dprintk("RPC:       %s: server" -					" dropped credits to 0!\n", __func__); -				/* don't deadlock */ -				credits = 1; -			} else if (credits > rep->rr_buffer->rb_max_requests) { -				dprintk("RPC:       %s: server" -					" over-crediting: %d (%d)\n", -					__func__, credits, -					rep->rr_buffer->rb_max_requests); -				credits = rep->rr_buffer->rb_max_requests; -			} -			atomic_set(&rep->rr_buffer->rb_credits, credits); -		} -		/* fall through */ -	case IB_WC_BIND_MW: -		rpcrdma_schedule_tasklet(rep); -		break; -	default: -		dprintk("RPC:       %s: unexpected WC event %X\n", -			__func__, wc->opcode); -		break; +	rc = ib_req_notify_cq(cq, +			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); +	if (rc == 0) +		return; +	if (rc < 0) { +		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n", +			__func__, rc); +		return;  	} + +	rpcrdma_sendcq_poll(cq, ep);  } -static inline int -rpcrdma_cq_poll(struct ib_cq *cq) +static void +rpcrdma_recvcq_process_wc(struct ib_wc *wc)  { -	struct ib_wc wc; -	int rc; +	struct rpcrdma_rep *rep = +			(struct rpcrdma_rep *)(unsigned long)wc->wr_id; -	for (;;) { -		rc = ib_poll_cq(cq, 1, &wc); -		if (rc < 0) { -			dprintk("RPC:       %s: ib_poll_cq failed %i\n", -				__func__, rc); -			return rc; -		} -		if (rc == 0) -			break; +	dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n", +		__func__, rep, wc->status, wc->opcode, wc->byte_len); -		rpcrdma_event_process(&wc); +	if (wc->status != IB_WC_SUCCESS) { +		rep->rr_len = ~0U; +		goto out_schedule;  	} +	if (wc->opcode != IB_WC_RECV) +		return; +	rep->rr_len = wc->byte_len; +	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device, +			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE); + +	if (rep->rr_len >= 16) { +		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base; +		unsigned int credits = 
ntohl(p->rm_credit); + +		if (credits == 0) +			credits = 1;	/* don't deadlock */ +		else if (credits > rep->rr_buffer->rb_max_requests) +			credits = rep->rr_buffer->rb_max_requests; +		atomic_set(&rep->rr_buffer->rb_credits, credits); +	} + +out_schedule: +	rpcrdma_schedule_tasklet(rep); +} + +static int +rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) +{ +	struct ib_wc *wcs; +	int budget, count, rc; + +	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; +	do { +		wcs = ep->rep_recv_wcs; + +		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); +		if (rc <= 0) +			return rc; + +		count = rc; +		while (count-- > 0) +			rpcrdma_recvcq_process_wc(wcs++); +	} while (rc == RPCRDMA_POLLSIZE && --budget);  	return 0;  }  /* - * rpcrdma_cq_event_upcall + * Handle receive completions.   * - * This upcall handles recv, send, bind and unbind events.   * It is reentrant but processes single events in order to maintain   * ordering of receives to keep server credits.   * @@ -232,26 +283,31 @@ rpcrdma_cq_poll(struct ib_cq *cq)   * connection shutdown. That is, the structures required for   * the completion of the reply handler must remain intact until   * all memory has been reclaimed. - * - * Note that send events are suppressed and do not result in an upcall.   */  static void -rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context) +rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)  { +	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;  	int rc; -	rc = rpcrdma_cq_poll(cq); -	if (rc) +	rc = rpcrdma_recvcq_poll(cq, ep); +	if (rc) { +		dprintk("RPC:       %s: ib_poll_cq failed: %i\n", +			__func__, rc);  		return; +	} -	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP); -	if (rc) { -		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n", +	rc = ib_req_notify_cq(cq, +			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); +	if (rc == 0) +		return; +	if (rc < 0) { +		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",  			__func__, rc);  		return;  	} -	rpcrdma_cq_poll(cq); +	rpcrdma_recvcq_poll(cq, ep);  }  #ifdef RPC_DEBUG @@ -378,7 +434,7 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,  	init_completion(&ia->ri_done); -	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP); +	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);  	if (IS_ERR(id)) {  		rc = PTR_ERR(id);  		dprintk("RPC:       %s: rdma_create_id() failed %i\n", @@ -483,54 +539,32 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)  		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;  	} -	switch (memreg) { -	case RPCRDMA_MEMWINDOWS: -	case RPCRDMA_MEMWINDOWS_ASYNC: -		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) { -			dprintk("RPC:       %s: MEMWINDOWS registration " -				"specified but not supported by adapter, " -				"using slower RPCRDMA_REGISTER\n", -				__func__); -			memreg = RPCRDMA_REGISTER; -		} -		break; -	case RPCRDMA_MTHCAFMR: -		if (!ia->ri_id->device->alloc_fmr) { -#if RPCRDMA_PERSISTENT_REGISTRATION -			dprintk("RPC:       %s: MTHCAFMR registration " -				"specified but not supported by adapter, " -				"using riskier RPCRDMA_ALLPHYSICAL\n", -				__func__); -			memreg = RPCRDMA_ALLPHYSICAL; -#else -			dprintk("RPC:       %s: MTHCAFMR registration " -				"specified but not supported by adapter, " -				"using slower RPCRDMA_REGISTER\n", -				__func__); -			memreg = RPCRDMA_REGISTER; -#endif -		} -		break; -	case RPCRDMA_FRMR: +	if (memreg == RPCRDMA_FRMR) {  		/* Requires both frmr reg and local dma lkey */  		if ((devattr.device_cap_flags &  		     
(IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=  		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) { -#if RPCRDMA_PERSISTENT_REGISTRATION  			dprintk("RPC:       %s: FRMR registration " -				"specified but not supported by adapter, " -				"using riskier RPCRDMA_ALLPHYSICAL\n", -				__func__); +				"not supported by HCA\n", __func__); +			memreg = RPCRDMA_MTHCAFMR; +		} else { +			/* Mind the ia limit on FRMR page list depth */ +			ia->ri_max_frmr_depth = min_t(unsigned int, +				RPCRDMA_MAX_DATA_SEGS, +				devattr.max_fast_reg_page_list_len); +		} +	} +	if (memreg == RPCRDMA_MTHCAFMR) { +		if (!ia->ri_id->device->alloc_fmr) { +			dprintk("RPC:       %s: MTHCAFMR registration " +				"not supported by HCA\n", __func__); +#if RPCRDMA_PERSISTENT_REGISTRATION  			memreg = RPCRDMA_ALLPHYSICAL;  #else -			dprintk("RPC:       %s: FRMR registration " -				"specified but not supported by adapter, " -				"using slower RPCRDMA_REGISTER\n", -				__func__); -			memreg = RPCRDMA_REGISTER; +			rc = -ENOMEM; +			goto out2;  #endif  		} -		break;  	}  	/* @@ -542,8 +576,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)  	 * adapter.  	 */  	switch (memreg) { -	case RPCRDMA_BOUNCEBUFFERS: -	case RPCRDMA_REGISTER:  	case RPCRDMA_FRMR:  		break;  #if RPCRDMA_PERSISTENT_REGISTRATION @@ -553,30 +585,26 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)  				IB_ACCESS_REMOTE_READ;  		goto register_setup;  #endif -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		mem_priv = IB_ACCESS_LOCAL_WRITE | -				IB_ACCESS_MW_BIND; -		goto register_setup;  	case RPCRDMA_MTHCAFMR:  		if (ia->ri_have_dma_lkey)  			break;  		mem_priv = IB_ACCESS_LOCAL_WRITE; +#if RPCRDMA_PERSISTENT_REGISTRATION  	register_setup: +#endif  		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);  		if (IS_ERR(ia->ri_bind_mem)) {  			printk(KERN_ALERT "%s: ib_get_dma_mr for " -				"phys register failed with %lX\n\t" -				"Will continue with degraded performance\n", +				"phys register failed with %lX\n",  				__func__, PTR_ERR(ia->ri_bind_mem)); -			memreg = RPCRDMA_REGISTER; -			ia->ri_bind_mem = NULL; +			rc = -ENOMEM; +			goto out2;  		}  		break;  	default: -		printk(KERN_ERR "%s: invalid memory registration mode %d\n", -				__func__, memreg); -		rc = -EINVAL; +		printk(KERN_ERR "RPC: Unsupported memory " +				"registration mode: %d\n", memreg); +		rc = -ENOMEM;  		goto out2;  	}  	dprintk("RPC:       %s: memory registration strategy is %d\n", @@ -630,6 +658,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  				struct rpcrdma_create_data_internal *cdata)  {  	struct ib_device_attr devattr; +	struct ib_cq *sendcq, *recvcq;  	int rc, err;  	rc = ib_query_device(ia->ri_id->device, &devattr); @@ -649,32 +678,42 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  	ep->rep_attr.srq = NULL;  	ep->rep_attr.cap.max_send_wr = cdata->max_requests;  	switch (ia->ri_memreg_strategy) { -	case RPCRDMA_FRMR: +	case RPCRDMA_FRMR: { +		int depth = 7; +  		/* Add room for frmr register and invalidate WRs.  		 * 1. FRMR reg WR for head  		 * 2. FRMR invalidate WR for head -		 * 3. FRMR reg WR for pagelist -		 * 4. FRMR invalidate WR for pagelist +		 * 3. N FRMR reg WRs for pagelist +		 * 4. N FRMR invalidate WRs for pagelist  		 * 5. FRMR reg WR for tail  		 * 6. FRMR invalidate WR for tail  		 * 7. 
The RDMA_SEND WR  		 */ -		ep->rep_attr.cap.max_send_wr *= 7; + +		/* Calculate N if the device max FRMR depth is smaller than +		 * RPCRDMA_MAX_DATA_SEGS. +		 */ +		if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) { +			int delta = RPCRDMA_MAX_DATA_SEGS - +				    ia->ri_max_frmr_depth; + +			do { +				depth += 2; /* FRMR reg + invalidate */ +				delta -= ia->ri_max_frmr_depth; +			} while (delta > 0); + +		} +		ep->rep_attr.cap.max_send_wr *= depth;  		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) { -			cdata->max_requests = devattr.max_qp_wr / 7; +			cdata->max_requests = devattr.max_qp_wr / depth;  			if (!cdata->max_requests)  				return -EINVAL; -			ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7; +			ep->rep_attr.cap.max_send_wr = cdata->max_requests * +						       depth;  		}  		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		/* Add room for mw_binds+unbinds - overkill! */ -		ep->rep_attr.cap.max_send_wr++; -		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS); -		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) -			return -EINVAL; -		break; +	}  	default:  		break;  	} @@ -695,46 +734,51 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  		ep->rep_attr.cap.max_recv_sge);  	/* set trigger for requesting send completion */ -	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/; -	switch (ia->ri_memreg_strategy) { -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		ep->rep_cqinit -= RPCRDMA_MAX_SEGS; -		break; -	default: -		break; -	} +	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;  	if (ep->rep_cqinit <= 2)  		ep->rep_cqinit = 0;  	INIT_CQCOUNT(ep);  	ep->rep_ia = ia;  	init_waitqueue_head(&ep->rep_connect_wait); +	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); -	/* -	 * Create a single cq for receive dto and mw_bind (only ever -	 * care about unbind, really). Send completions are suppressed. -	 * Use single threaded tasklet upcalls to maintain ordering. 
-	 */ -	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall, -				  rpcrdma_cq_async_error_upcall, NULL, -				  ep->rep_attr.cap.max_recv_wr + +	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall, +				  rpcrdma_cq_async_error_upcall, ep,  				  ep->rep_attr.cap.max_send_wr + 1, 0); -	if (IS_ERR(ep->rep_cq)) { -		rc = PTR_ERR(ep->rep_cq); -		dprintk("RPC:       %s: ib_create_cq failed: %i\n", +	if (IS_ERR(sendcq)) { +		rc = PTR_ERR(sendcq); +		dprintk("RPC:       %s: failed to create send CQ: %i\n",  			__func__, rc);  		goto out1;  	} -	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP); +	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);  	if (rc) {  		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",  			__func__, rc);  		goto out2;  	} -	ep->rep_attr.send_cq = ep->rep_cq; -	ep->rep_attr.recv_cq = ep->rep_cq; +	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall, +				  rpcrdma_cq_async_error_upcall, ep, +				  ep->rep_attr.cap.max_recv_wr + 1, 0); +	if (IS_ERR(recvcq)) { +		rc = PTR_ERR(recvcq); +		dprintk("RPC:       %s: failed to create recv CQ: %i\n", +			__func__, rc); +		goto out2; +	} + +	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP); +	if (rc) { +		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n", +			__func__, rc); +		ib_destroy_cq(recvcq); +		goto out2; +	} + +	ep->rep_attr.send_cq = sendcq; +	ep->rep_attr.recv_cq = recvcq;  	/* Initialize cma parameters */ @@ -744,9 +788,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  	/* Client offers RDMA Read but does not initiate */  	ep->rep_remote_cma.initiator_depth = 0; -	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS) -		ep->rep_remote_cma.responder_resources = 0; -	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */ +	if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */  		ep->rep_remote_cma.responder_resources = 32;  	else  		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom; @@ -758,7 +800,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,  	return 0;  out2: -	err = ib_destroy_cq(ep->rep_cq); +	err = ib_destroy_cq(sendcq);  	if (err)  		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",  			__func__, err); @@ -772,11 +814,8 @@ out1:   * Disconnect and destroy endpoint. After this, the only   * valid operations on the ep are to free it (if dynamically   * allocated) or re-create it. - * - * The caller's error handling must be sure to not leak the endpoint - * if this function fails.   
*/ -int +void  rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  {  	int rc; @@ -784,6 +823,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  	dprintk("RPC:       %s: entering, connected is %d\n",  		__func__, ep->rep_connected); +	cancel_delayed_work_sync(&ep->rep_connect_worker); +  	if (ia->ri_id->qp) {  		rc = rpcrdma_ep_disconnect(ep, ia);  		if (rc) @@ -799,13 +840,17 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  		ep->rep_pad_mr = NULL;  	} -	rpcrdma_clean_cq(ep->rep_cq); -	rc = ib_destroy_cq(ep->rep_cq); +	rpcrdma_clean_cq(ep->rep_attr.recv_cq); +	rc = ib_destroy_cq(ep->rep_attr.recv_cq);  	if (rc)  		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",  			__func__, rc); -	return rc; +	rpcrdma_clean_cq(ep->rep_attr.send_cq); +	rc = ib_destroy_cq(ep->rep_attr.send_cq); +	if (rc) +		dprintk("RPC:       %s: ib_destroy_cq returned %i\n", +			__func__, rc);  }  /* @@ -821,17 +866,20 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  	if (ep->rep_connected != 0) {  		struct rpcrdma_xprt *xprt;  retry: +		dprintk("RPC:       %s: reconnecting...\n", __func__);  		rc = rpcrdma_ep_disconnect(ep, ia);  		if (rc && rc != -ENOTCONN)  			dprintk("RPC:       %s: rpcrdma_ep_disconnect"  				" status %i\n", __func__, rc); -		rpcrdma_clean_cq(ep->rep_cq); + +		rpcrdma_clean_cq(ep->rep_attr.recv_cq); +		rpcrdma_clean_cq(ep->rep_attr.send_cq);  		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);  		id = rpcrdma_create_id(xprt, ia,  				(struct sockaddr *)&xprt->rx_data.addr);  		if (IS_ERR(id)) { -			rc = PTR_ERR(id); +			rc = -EHOSTUNREACH;  			goto out;  		}  		/* TEMP TEMP TEMP - fail if new device: @@ -845,35 +893,32 @@ retry:  			printk("RPC:       %s: can't reconnect on "  				"different device!\n", __func__);  			rdma_destroy_id(id); -			rc = -ENETDOWN; +			rc = -ENETUNREACH;  			goto out;  		}  		/* END TEMP */ +		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr); +		if (rc) { +			dprintk("RPC:       %s: rdma_create_qp failed %i\n", +				__func__, rc); +			rdma_destroy_id(id); +			rc = -ENETUNREACH; +			goto out; +		}  		rdma_destroy_qp(ia->ri_id);  		rdma_destroy_id(ia->ri_id);  		ia->ri_id = id; +	} else { +		dprintk("RPC:       %s: connecting...\n", __func__); +		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); +		if (rc) { +			dprintk("RPC:       %s: rdma_create_qp failed %i\n", +				__func__, rc); +			/* do not update ep->rep_connected */ +			return -ENETUNREACH; +		}  	} -	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); -	if (rc) { -		dprintk("RPC:       %s: rdma_create_qp failed %i\n", -			__func__, rc); -		goto out; -	} - -/* XXX Tavor device performs badly with 2K MTU! 
*/ -if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) { -	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device); -	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR && -	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX || -	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) { -		struct ib_qp_attr attr = { -			.path_mtu = IB_MTU_1024 -		}; -		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU); -	} -} -  	ep->rep_connected = 0;  	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma); @@ -934,7 +979,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)  {  	int rc; -	rpcrdma_clean_cq(ep->rep_cq); +	rpcrdma_clean_cq(ep->rep_attr.recv_cq); +	rpcrdma_clean_cq(ep->rep_attr.send_cq);  	rc = rdma_disconnect(ia->ri_id);  	if (!rc) {  		/* returns without wait if not connected */ @@ -957,7 +1003,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)  {  	char *p; -	size_t len; +	size_t len, rlen, wlen;  	int i, rc;  	struct rpcrdma_mw *r; @@ -987,11 +1033,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *  				sizeof(struct rpcrdma_mw);  		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS * -				sizeof(struct rpcrdma_mw); -		break;  	default:  		break;  	} @@ -1022,32 +1063,29 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  	}  	p += cdata->padding; -	/* -	 * Allocate the fmr's, or mw's for mw_bind chunk registration. -	 * We "cycle" the mw's in order to minimize rkey reuse, -	 * and also reduce unbind-to-bind collision. -	 */  	INIT_LIST_HEAD(&buf->rb_mws);  	r = (struct rpcrdma_mw *)p;  	switch (ia->ri_memreg_strategy) {  	case RPCRDMA_FRMR:  		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {  			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd, -							 RPCRDMA_MAX_SEGS); +						ia->ri_max_frmr_depth);  			if (IS_ERR(r->r.frmr.fr_mr)) {  				rc = PTR_ERR(r->r.frmr.fr_mr);  				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"  					" failed %i\n", __func__, rc);  				goto out;  			} -			r->r.frmr.fr_pgl = -				ib_alloc_fast_reg_page_list(ia->ri_id->device, -							    RPCRDMA_MAX_SEGS); +			r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list( +						ia->ri_id->device, +						ia->ri_max_frmr_depth);  			if (IS_ERR(r->r.frmr.fr_pgl)) {  				rc = PTR_ERR(r->r.frmr.fr_pgl);  				dprintk("RPC:       %s: "  					"ib_alloc_fast_reg_page_list "  					"failed %i\n", __func__, rc); + +				ib_dereg_mr(r->r.frmr.fr_mr);  				goto out;  			}  			list_add(&r->mw_list, &buf->rb_mws); @@ -1072,21 +1110,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  			++r;  		}  		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		/* Allocate one extra request's worth, for full cycling */ -		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) { -			r->r.mw = ib_alloc_mw(ia->ri_pd); -			if (IS_ERR(r->r.mw)) { -				rc = PTR_ERR(r->r.mw); -				dprintk("RPC:       %s: ib_alloc_mw" -					" failed %i\n", __func__, rc); -				goto out; -			} -			list_add(&r->mw_list, &buf->rb_mws); -			++r; -		} -		break;  	default:  		break;  	} @@ -1095,16 +1118,16 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  	 * Allocate/init the request/reply buffers. Doing this  	 * using kmalloc for now -- one for each buf.  	 
*/ +	wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req)); +	rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep)); +	dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n", +		__func__, wlen, rlen); +  	for (i = 0; i < buf->rb_max_requests; i++) {  		struct rpcrdma_req *req;  		struct rpcrdma_rep *rep; -		len = cdata->inline_wsize + sizeof(struct rpcrdma_req); -		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */ -		/* Typical ~2400b, so rounding up saves work later */ -		if (len < 4096) -			len = 4096; -		req = kmalloc(len, GFP_KERNEL); +		req = kmalloc(wlen, GFP_KERNEL);  		if (req == NULL) {  			dprintk("RPC:       %s: request buffer %d alloc"  				" failed\n", __func__, i); @@ -1116,16 +1139,16 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  		buf->rb_send_bufs[i]->rl_buffer = buf;  		rc = rpcrdma_register_internal(ia, req->rl_base, -				len - offsetof(struct rpcrdma_req, rl_base), +				wlen - offsetof(struct rpcrdma_req, rl_base),  				&buf->rb_send_bufs[i]->rl_handle,  				&buf->rb_send_bufs[i]->rl_iov);  		if (rc)  			goto out; -		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req); +		buf->rb_send_bufs[i]->rl_size = wlen - +						sizeof(struct rpcrdma_req); -		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep); -		rep = kmalloc(len, GFP_KERNEL); +		rep = kmalloc(rlen, GFP_KERNEL);  		if (rep == NULL) {  			dprintk("RPC:       %s: reply buffer %d alloc failed\n",  				__func__, i); @@ -1135,10 +1158,9 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,  		memset(rep, 0, sizeof(struct rpcrdma_rep));  		buf->rb_recv_bufs[i] = rep;  		buf->rb_recv_bufs[i]->rr_buffer = buf; -		init_waitqueue_head(&rep->rr_unbind);  		rc = rpcrdma_register_internal(ia, rep->rr_base, -				len - offsetof(struct rpcrdma_rep, rr_base), +				rlen - offsetof(struct rpcrdma_rep, rr_base),  				&buf->rb_recv_bufs[i]->rr_handle,  				&buf->rb_recv_bufs[i]->rr_iov);  		if (rc) @@ -1169,7 +1191,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)  	/* clean up in reverse order from create  	 *   1.  recv mr memory (mr free, then kfree) -	 *   1a. bind mw memory  	 *   2.  send mr memory (mr free, then kfree)  	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]  	 *   4.  
arrays @@ -1184,41 +1205,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)  			kfree(buf->rb_recv_bufs[i]);  		}  		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) { -			while (!list_empty(&buf->rb_mws)) { -				r = list_entry(buf->rb_mws.next, -					struct rpcrdma_mw, mw_list); -				list_del(&r->mw_list); -				switch (ia->ri_memreg_strategy) { -				case RPCRDMA_FRMR: -					rc = ib_dereg_mr(r->r.frmr.fr_mr); -					if (rc) -						dprintk("RPC:       %s:" -							" ib_dereg_mr" -							" failed %i\n", -							__func__, rc); -					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl); -					break; -				case RPCRDMA_MTHCAFMR: -					rc = ib_dealloc_fmr(r->r.fmr); -					if (rc) -						dprintk("RPC:       %s:" -							" ib_dealloc_fmr" -							" failed %i\n", -							__func__, rc); -					break; -				case RPCRDMA_MEMWINDOWS_ASYNC: -				case RPCRDMA_MEMWINDOWS: -					rc = ib_dealloc_mw(r->r.mw); -					if (rc) -						dprintk("RPC:       %s:" -							" ib_dealloc_mw" -							" failed %i\n", -							__func__, rc); -					break; -				default: -					break; -				} -			}  			rpcrdma_deregister_internal(ia,  					buf->rb_send_bufs[i]->rl_handle,  					&buf->rb_send_bufs[i]->rl_iov); @@ -1226,6 +1212,33 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)  		}  	} +	while (!list_empty(&buf->rb_mws)) { +		r = list_entry(buf->rb_mws.next, +			struct rpcrdma_mw, mw_list); +		list_del(&r->mw_list); +		switch (ia->ri_memreg_strategy) { +		case RPCRDMA_FRMR: +			rc = ib_dereg_mr(r->r.frmr.fr_mr); +			if (rc) +				dprintk("RPC:       %s:" +					" ib_dereg_mr" +					" failed %i\n", +					__func__, rc); +			ib_free_fast_reg_page_list(r->r.frmr.fr_pgl); +			break; +		case RPCRDMA_MTHCAFMR: +			rc = ib_dealloc_fmr(r->r.fmr); +			if (rc) +				dprintk("RPC:       %s:" +					" ib_dealloc_fmr" +					" failed %i\n", +					__func__, rc); +			break; +		default: +			break; +		} +	} +  	kfree(buf->rb_pool);  } @@ -1289,21 +1302,17 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)  	int i;  	unsigned long flags; -	BUG_ON(req->rl_nchunks != 0);  	spin_lock_irqsave(&buffers->rb_lock, flags);  	buffers->rb_send_bufs[--buffers->rb_send_index] = req;  	req->rl_niovs = 0;  	if (req->rl_reply) {  		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply; -		init_waitqueue_head(&req->rl_reply->rr_unbind);  		req->rl_reply->rr_func = NULL;  		req->rl_reply = NULL;  	}  	switch (ia->ri_memreg_strategy) {  	case RPCRDMA_FRMR:  	case RPCRDMA_MTHCAFMR: -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS:  		/*  		 * Cycle mw's back in reverse order, and "spin" them.  		 * This delays and scrambles reuse as much as possible. @@ -1348,8 +1357,7 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)  /*   * Put reply buffers back into pool when not attached to - * request. This happens in error conditions, and when - * aborting unbinds. Pre-decrement counter/array index. + * request. This happens in error conditions.   
*/  void  rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) @@ -1450,6 +1458,12 @@ rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)  		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,  				seg->mr_offset,  				seg->mr_dmalen, seg->mr_dir); +	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) { +		dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n", +			__func__, +			(unsigned long long)seg->mr_dma, +			seg->mr_offset, seg->mr_dmalen); +	}  }  static void @@ -1469,20 +1483,29 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,  			struct rpcrdma_xprt *r_xprt)  {  	struct rpcrdma_mr_seg *seg1 = seg; -	struct ib_send_wr frmr_wr, *bad_wr; +	struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr; +  	u8 key;  	int len, pageoff;  	int i, rc; +	int seg_len; +	u64 pa; +	int page_no;  	pageoff = offset_in_page(seg1->mr_offset);  	seg1->mr_offset -= pageoff;	/* start of page */  	seg1->mr_len += pageoff;  	len = -pageoff; -	if (*nsegs > RPCRDMA_MAX_DATA_SEGS) -		*nsegs = RPCRDMA_MAX_DATA_SEGS; -	for (i = 0; i < *nsegs;) { +	if (*nsegs > ia->ri_max_frmr_depth) +		*nsegs = ia->ri_max_frmr_depth; +	for (page_no = i = 0; i < *nsegs;) {  		rpcrdma_map_one(ia, seg, writing); -		seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma; +		pa = seg->mr_dma; +		for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) { +			seg1->mr_chunk.rl_mw->r.frmr.fr_pgl-> +				page_list[page_no++] = pa; +			pa += PAGE_SIZE; +		}  		len += seg->mr_len;  		++seg;  		++i; @@ -1494,26 +1517,50 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,  	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",  		__func__, seg1->mr_chunk.rl_mw, i); -	/* Bump the key */ -	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF); -	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key); +	if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) { +		dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n", +			__func__, +			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey); +		/* Invalidate before using. */ +		memset(&invalidate_wr, 0, sizeof invalidate_wr); +		invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw; +		invalidate_wr.next = &frmr_wr; +		invalidate_wr.opcode = IB_WR_LOCAL_INV; +		invalidate_wr.send_flags = IB_SEND_SIGNALED; +		invalidate_wr.ex.invalidate_rkey = +			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey; +		DECR_CQCOUNT(&r_xprt->rx_ep); +		post_wr = &invalidate_wr; +	} else +		post_wr = &frmr_wr;  	/* Prepare FRMR WR */  	memset(&frmr_wr, 0, sizeof frmr_wr); +	frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;  	frmr_wr.opcode = IB_WR_FAST_REG_MR; -	frmr_wr.send_flags = 0;			/* unsignaled */ +	frmr_wr.send_flags = IB_SEND_SIGNALED;  	frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;  	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl; -	frmr_wr.wr.fast_reg.page_list_len = i; +	frmr_wr.wr.fast_reg.page_list_len = page_no;  	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT; -	frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT; +	frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT; +	if (frmr_wr.wr.fast_reg.length < len) { +		while (seg1->mr_nsegs--) +			rpcrdma_unmap_one(ia, seg++); +		return -EIO; +	} + +	/* Bump the key */ +	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF); +	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key); +  	frmr_wr.wr.fast_reg.access_flags = (writing ?  				
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :  				IB_ACCESS_REMOTE_READ);  	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;  	DECR_CQCOUNT(&r_xprt->rx_ep); -	rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr); +	rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);  	if (rc) {  		dprintk("RPC:       %s: failed ib_post_send for register," @@ -1542,8 +1589,9 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,  		rpcrdma_unmap_one(ia, seg++);  	memset(&invalidate_wr, 0, sizeof invalidate_wr); +	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;  	invalidate_wr.opcode = IB_WR_LOCAL_INV; -	invalidate_wr.send_flags = 0;			/* unsignaled */ +	invalidate_wr.send_flags = IB_SEND_SIGNALED;  	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;  	DECR_CQCOUNT(&r_xprt->rx_ep); @@ -1616,135 +1664,6 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,  	return rc;  } -static int -rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg, -			int *nsegs, int writing, struct rpcrdma_ia *ia, -			struct rpcrdma_xprt *r_xprt) -{ -	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE : -				  IB_ACCESS_REMOTE_READ); -	struct ib_mw_bind param; -	int rc; - -	*nsegs = 1; -	rpcrdma_map_one(ia, seg, writing); -	param.mr = ia->ri_bind_mem; -	param.wr_id = 0ULL;	/* no send cookie */ -	param.addr = seg->mr_dma; -	param.length = seg->mr_len; -	param.send_flags = 0; -	param.mw_access_flags = mem_priv; - -	DECR_CQCOUNT(&r_xprt->rx_ep); -	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, ¶m); -	if (rc) { -		dprintk("RPC:       %s: failed ib_bind_mw " -			"%u@0x%llx status %i\n", -			__func__, seg->mr_len, -			(unsigned long long)seg->mr_dma, rc); -		rpcrdma_unmap_one(ia, seg); -	} else { -		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey; -		seg->mr_base = param.addr; -		seg->mr_nsegs = 1; -	} -	return rc; -} - -static int -rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg, -			struct rpcrdma_ia *ia, -			struct rpcrdma_xprt *r_xprt, void **r) -{ -	struct ib_mw_bind param; -	LIST_HEAD(l); -	int rc; - -	BUG_ON(seg->mr_nsegs != 1); -	param.mr = ia->ri_bind_mem; -	param.addr = 0ULL;	/* unbind */ -	param.length = 0; -	param.mw_access_flags = 0; -	if (*r) { -		param.wr_id = (u64) (unsigned long) *r; -		param.send_flags = IB_SEND_SIGNALED; -		INIT_CQCOUNT(&r_xprt->rx_ep); -	} else { -		param.wr_id = 0ULL; -		param.send_flags = 0; -		DECR_CQCOUNT(&r_xprt->rx_ep); -	} -	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, ¶m); -	rpcrdma_unmap_one(ia, seg); -	if (rc) -		dprintk("RPC:       %s: failed ib_(un)bind_mw," -			" status %i\n", __func__, rc); -	else -		*r = NULL;	/* will upcall on completion */ -	return rc; -} - -static int -rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg, -			int *nsegs, int writing, struct rpcrdma_ia *ia) -{ -	int mem_priv = (writing ? 
IB_ACCESS_REMOTE_WRITE : -				  IB_ACCESS_REMOTE_READ); -	struct rpcrdma_mr_seg *seg1 = seg; -	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS]; -	int len, i, rc = 0; - -	if (*nsegs > RPCRDMA_MAX_DATA_SEGS) -		*nsegs = RPCRDMA_MAX_DATA_SEGS; -	for (len = 0, i = 0; i < *nsegs;) { -		rpcrdma_map_one(ia, seg, writing); -		ipb[i].addr = seg->mr_dma; -		ipb[i].size = seg->mr_len; -		len += seg->mr_len; -		++seg; -		++i; -		/* Check for holes */ -		if ((i < *nsegs && offset_in_page(seg->mr_offset)) || -		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len)) -			break; -	} -	seg1->mr_base = seg1->mr_dma; -	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd, -				ipb, i, mem_priv, &seg1->mr_base); -	if (IS_ERR(seg1->mr_chunk.rl_mr)) { -		rc = PTR_ERR(seg1->mr_chunk.rl_mr); -		dprintk("RPC:       %s: failed ib_reg_phys_mr " -			"%u@0x%llx (%d)... status %i\n", -			__func__, len, -			(unsigned long long)seg1->mr_dma, i, rc); -		while (i--) -			rpcrdma_unmap_one(ia, --seg); -	} else { -		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey; -		seg1->mr_nsegs = i; -		seg1->mr_len = len; -	} -	*nsegs = i; -	return rc; -} - -static int -rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg, -			struct rpcrdma_ia *ia) -{ -	struct rpcrdma_mr_seg *seg1 = seg; -	int rc; - -	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr); -	seg1->mr_chunk.rl_mr = NULL; -	while (seg1->mr_nsegs--) -		rpcrdma_unmap_one(ia, seg++); -	if (rc) -		dprintk("RPC:       %s: failed ib_dereg_mr," -			" status %i\n", __func__, rc); -	return rc; -} -  int  rpcrdma_register_external(struct rpcrdma_mr_seg *seg,  			int nsegs, int writing, struct rpcrdma_xprt *r_xprt) @@ -1774,16 +1693,8 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg,  		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);  		break; -	/* Registration using memory windows */ -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt); -		break; - -	/* Default registration each time */  	default: -		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia); -		break; +		return -1;  	}  	if (rc)  		return -1; @@ -1793,7 +1704,7 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg,  int  rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg, -		struct rpcrdma_xprt *r_xprt, void *r) +		struct rpcrdma_xprt *r_xprt)  {  	struct rpcrdma_ia *ia = &r_xprt->rx_ia;  	int nsegs = seg->mr_nsegs, rc; @@ -1802,9 +1713,7 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,  #if RPCRDMA_PERSISTENT_REGISTRATION  	case RPCRDMA_ALLPHYSICAL: -		BUG_ON(nsegs != 1);  		rpcrdma_unmap_one(ia, seg); -		rc = 0;  		break;  #endif @@ -1816,21 +1725,9 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,  		rc = rpcrdma_deregister_fmr_external(seg, ia);  		break; -	case RPCRDMA_MEMWINDOWS_ASYNC: -	case RPCRDMA_MEMWINDOWS: -		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r); -		break; -  	default: -		rc = rpcrdma_deregister_default_external(seg, ia);  		break;  	} -	if (r) { -		struct rpcrdma_rep *rep = r; -		void (*func)(struct rpcrdma_rep *) = rep->rr_func; -		rep->rr_func = NULL; -		func(rep);	/* dereg done, callback now */ -	}  	return nsegs;  } @@ -1905,7 +1802,6 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,  	ib_dma_sync_single_for_cpu(ia->ri_id->device,  		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL); -	DECR_CQCOUNT(ep);  	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);  	if (rc) diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 
c7a7eba991b..89e7cd47970 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -42,7 +42,8 @@  #include <linux/wait.h> 		/* wait_queue_head_t, etc */  #include <linux/spinlock.h> 		/* spinlock_t, etc */ -#include <asm/atomic.h>			/* atomic_t, etc */ +#include <linux/atomic.h>			/* atomic_t, etc */ +#include <linux/workqueue.h>		/* struct work_struct */  #include <rdma/rdma_cm.h>		/* RDMA connection api */  #include <rdma/ib_verbs.h>		/* RDMA verbs api */ @@ -66,18 +67,21 @@ struct rpcrdma_ia {  	struct completion	ri_done;  	int			ri_async_rc;  	enum rpcrdma_memreg	ri_memreg_strategy; +	unsigned int		ri_max_frmr_depth;  };  /*   * RDMA Endpoint -- one per transport instance   */ +#define RPCRDMA_WC_BUDGET	(128) +#define RPCRDMA_POLLSIZE	(16) +  struct rpcrdma_ep {  	atomic_t		rep_cqcount;  	int			rep_cqinit;  	int			rep_connected;  	struct rpcrdma_ia	*rep_ia; -	struct ib_cq		*rep_cq;  	struct ib_qp_init_attr	rep_attr;  	wait_queue_head_t 	rep_connect_wait;  	struct ib_sge		rep_pad;	/* holds zeroed pad */ @@ -86,6 +90,9 @@ struct rpcrdma_ep {  	struct rpc_xprt		*rep_xprt;	/* for rep_func */  	struct rdma_conn_param	rep_remote_cma;  	struct sockaddr_storage	rep_remote_addr; +	struct delayed_work	rep_connect_worker; +	struct ib_wc		rep_send_wcs[RPCRDMA_POLLSIZE]; +	struct ib_wc		rep_recv_wcs[RPCRDMA_POLLSIZE];  };  #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit) @@ -109,7 +116,7 @@ struct rpcrdma_ep {   */  /* temporary static scatter/gather max */ -#define RPCRDMA_MAX_DATA_SEGS	(8)	/* max scatter/gather */ +#define RPCRDMA_MAX_DATA_SEGS	(64)	/* max scatter/gather */  #define RPCRDMA_MAX_SEGS 	(RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */  #define MAX_RPCRDMAHDR	(\  	/* max supported RPC/RDMA header */ \ @@ -124,7 +131,6 @@ struct rpcrdma_rep {  	struct rpc_xprt	*rr_xprt;	/* needed for request/reply matching */  	void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */  	struct list_head rr_list;	/* tasklet list */ -	wait_queue_head_t rr_unbind;	/* optional unbind wait */  	struct ib_sge	rr_iov;		/* for posting */  	struct ib_mr	*rr_handle;	/* handle for mem in rr_iov */  	char	rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */ @@ -159,11 +165,11 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */  		struct ib_mr	*rl_mr;		/* if registered directly */  		struct rpcrdma_mw {		/* if registered from region */  			union { -				struct ib_mw	*mw;  				struct ib_fmr	*fmr;  				struct {  					struct ib_fast_reg_page_list *fr_pgl;  					struct ib_mr *fr_mr; +					enum { FRMR_IS_INVALID, FRMR_IS_VALID  } state;  				} frmr;  			} r;  			struct list_head mw_list; @@ -206,7 +212,6 @@ struct rpcrdma_req {  struct rpcrdma_buffer {  	spinlock_t	rb_lock;	/* protects indexes */  	atomic_t	rb_credits;	/* most recent server credits */ -	unsigned long	rb_cwndscale;	/* cached framework rpc_cwndscale */  	int		rb_max_requests;/* client max requests */  	struct list_head rb_mws;	/* optional memory windows/fmrs/frmrs */  	int		rb_send_index; @@ -234,13 +239,13 @@ struct rpcrdma_create_data_internal {  };  #define RPCRDMA_INLINE_READ_THRESHOLD(rq) \ -	(rpcx_to_rdmad(rq->rq_task->tk_xprt).inline_rsize) +	(rpcx_to_rdmad(rq->rq_xprt).inline_rsize)  #define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\ -	(rpcx_to_rdmad(rq->rq_task->tk_xprt).inline_wsize) +	(rpcx_to_rdmad(rq->rq_xprt).inline_wsize)  #define RPCRDMA_INLINE_PAD_VALUE(rq)\ -	rpcx_to_rdmad(rq->rq_task->tk_xprt).padding +	rpcx_to_rdmad(rq->rq_xprt).padding  /*   * Statistics for RPCRDMA 
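The ri_max_frmr_depth field and the larger RPCRDMA_MAX_DATA_SEGS defined above feed the send-queue sizing in the rpcrdma_ep_create() hunk earlier: when the HCA's fast-register page-list depth is smaller than the maximum segment count, each RPC needs additional FRMR register/invalidate WR pairs beyond the baseline of seven WRs. A standalone sketch of that arithmetic, using illustrative sample depths rather than values queried from a real device:

/*
 * Send-queue depth arithmetic mirroring the rpcrdma_ep_create() hunk
 * above.  MAX_DATA_SEGS and the sample frmr depths are illustrative.
 */
#include <stdio.h>

#define MAX_DATA_SEGS 64	/* mirrors RPCRDMA_MAX_DATA_SEGS */

/* WRs per RPC: head reg/inv + tail reg/inv + the send + N pagelist pairs */
static int wrs_per_rpc(int max_frmr_depth)
{
	int depth = 7;

	if (max_frmr_depth < MAX_DATA_SEGS) {
		int delta = MAX_DATA_SEGS - max_frmr_depth;

		do {
			depth += 2;	/* one extra reg + invalidate pair */
			delta -= max_frmr_depth;
		} while (delta > 0);
	}
	return depth;
}

int main(void)
{
	int sample_depths[] = { 16, 32, 64, 256 };
	unsigned int i;

	for (i = 0; i < sizeof(sample_depths) / sizeof(sample_depths[0]); i++)
		printf("frmr depth %3d -> %d send WRs per RPC\n",
		       sample_depths[i], wrs_per_rpc(sample_depths[i]));
	return 0;
}

For a device limited to 16-page fast-register lists, for example, a 64-segment payload costs 13 send-queue slots per RPC instead of 7, and max_send_wr is scaled accordingly.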
@@ -299,7 +304,7 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);   */  int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,  				struct rpcrdma_create_data_internal *); -int rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *); +void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);  int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);  int rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *); @@ -329,11 +334,12 @@ int rpcrdma_deregister_internal(struct rpcrdma_ia *,  int rpcrdma_register_external(struct rpcrdma_mr_seg *,  				int, int, struct rpcrdma_xprt *);  int rpcrdma_deregister_external(struct rpcrdma_mr_seg *, -				struct rpcrdma_xprt *, void *); +				struct rpcrdma_xprt *);  /*   * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c   */ +void rpcrdma_connect_worker(struct work_struct *);  void rpcrdma_conn_func(struct rpcrdma_ep *);  void rpcrdma_reply_handler(struct rpcrdma_rep *); @@ -342,4 +348,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);   */  int rpcrdma_marshal_req(struct rpc_rqst *); +/* Temporary NFS request map cache. Created in svc_rdma.c  */ +extern struct kmem_cache *svc_rdma_map_cachep; +/* WR context cache. Created in svc_rdma.c  */ +extern struct kmem_cache *svc_rdma_ctxt_cachep; +/* Workqueue created in svc_rdma.c */ +extern struct workqueue_struct *svc_rdma_wq; +  #endif				/* _LINUX_SUNRPC_XPRT_RDMA_H */  | 
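The RPCRDMA_INIT_REEST_TO and RPCRDMA_MAX_REEST_TO constants introduced in transport.c replace the open-coded 5*HZ and 30*HZ values in the reconnect path: xprt_rdma_connect() doubles the reestablish timeout after scheduling each attempt and clamps it between those bounds. A small sketch of that backoff, assuming HZ is 1000 purely for illustration; this is not the kernel implementation.

/* Exponential reconnect backoff clamped to [5s, 30s]. */
#include <stdio.h>

#define HZ			1000UL		/* assumed for the demo */
#define RPCRDMA_INIT_REEST_TO	(5UL * HZ)
#define RPCRDMA_MAX_REEST_TO	(30UL * HZ)

static unsigned long next_reestablish_timeout(unsigned long cur)
{
	unsigned long next = cur << 1;

	if (next > RPCRDMA_MAX_REEST_TO)
		next = RPCRDMA_MAX_REEST_TO;
	else if (next < RPCRDMA_INIT_REEST_TO)
		next = RPCRDMA_INIT_REEST_TO;
	return next;
}

int main(void)
{
	unsigned long to = RPCRDMA_INIT_REEST_TO;
	int attempt;

	for (attempt = 1; attempt <= 5; attempt++) {
		printf("attempt %d: wait %lu ms before reconnecting\n",
		       attempt, to);
		to = next_reestablish_timeout(to);
	}
	return 0;
}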