Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c  123
1 files changed, 54 insertions, 69 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index e03725bfe2b..693966d3f33 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -78,8 +78,7 @@ static const char transfertypes[][12] = {
  * elements. Segments are then coalesced when registered, if possible
  * within the selected memreg mode.
  *
- * Note, this routine is never called if the connection's memory
- * registration strategy is 0 (bounce buffers).
+ * Returns positive number of segments converted, or a negative errno.
  */
 
 static int
@@ -102,10 +101,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 	page_base = xdrbuf->page_base & ~PAGE_MASK;
 	p = 0;
 	while (len && n < nsegs) {
+		if (!ppages[p]) {
+			/* alloc the pagelist for receiving buffer */
+			ppages[p] = alloc_page(GFP_ATOMIC);
+			if (!ppages[p])
+				return -ENOMEM;
+		}
 		seg[n].mr_page = ppages[p];
 		seg[n].mr_offset = (void *)(unsigned long) page_base;
 		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
-		BUG_ON(seg[n].mr_len > PAGE_SIZE);
+		if (seg[n].mr_len > PAGE_SIZE)
+			return -EIO;
 		len -= seg[n].mr_len;
 		++n;
 		++p;
@@ -114,7 +120,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 
 	/* Message overflows the seg array */
 	if (len && n == nsegs)
-		return 0;
+		return -EIO;
 
 	if (xdrbuf->tail[0].iov_len) {
 		/* the rpcrdma protocol allows us to omit any trailing
@@ -123,7 +129,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 			return n;
 		if (n == nsegs)
 			/* Tail remains, but we're out of segments */
-			return 0;
+			return -EIO;
 		seg[n].mr_page = NULL;
 		seg[n].mr_offset = xdrbuf->tail[0].iov_base;
 		seg[n].mr_len = xdrbuf->tail[0].iov_len;
@@ -164,15 +170,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
  *  Reply chunk (a counted array):
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns positive RPC/RDMA header size, or negative errno.
  */
 
-static unsigned int
+static ssize_t
 rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
 {
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-	int nsegs, nchunks = 0;
+	int n, nsegs, nchunks = 0;
 	unsigned int pos;
 	struct rpcrdma_mr_seg *seg = req->rl_segments;
 	struct rpcrdma_read_chunk *cur_rchunk = NULL;
@@ -198,12 +206,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 		pos = target->head[0].iov_len;
 
 	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
-	if (nsegs == 0)
-		return 0;
+	if (nsegs < 0)
+		return nsegs;
 
 	do {
-		/* bind/register the memory, then build chunk from result. */
-		int n = rpcrdma_register_external(seg, nsegs,
+		n = rpcrdma_register_external(seg, nsegs,
 						cur_wchunk != NULL, r_xprt);
 		if (n <= 0)
 			goto out;
@@ -248,10 +255,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 	/* success. all failures return above */
 	req->rl_nchunks = nchunks;
 
-	BUG_ON(nchunks == 0);
-	BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
-	       && (nchunks > 3));
-
 	/*
 	 * finish off header. If write, marshal discrim and nchunks.
 	 */
@@ -278,8 +281,8 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 out:
 	for (pos = 0; nchunks--;)
 		pos += rpcrdma_deregister_external(
-				&req->rl_segments[pos], r_xprt, NULL);
-	return 0;
+				&req->rl_segments[pos], r_xprt);
+	return n;
 }
 
 /*
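A note on the "HLOO" shorthand in the chunk-list comment above: each segment of a read, write, or reply chunk is marshaled as a handle/length/offset triple, with the 64-bit offset occupying two XDR words (hence the doubled O). The sketch below shows that wire layout; it matches struct rpcrdma_segment in the kernel's xprt_rdma.h and is included only for reference, it is not changed by this patch.

#include <linux/types.h>

/* One chunk segment as it appears on the wire; all fields are in
 * XDR (big-endian) byte order.
 */
struct rpcrdma_segment {
	__be32 rs_handle;	/* H: STag/R_key of the registered memory */
	__be32 rs_length;	/* L: length of the segment in bytes */
	__be64 rs_offset;	/* OO: remote virtual address of the segment */
};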
@@ -361,6 +364,8 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
  *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
  *  [2] -- optional padding.
  *  [3] -- if padded, header only in [1] and data here.
+ *
+ * Returns zero on success, otherwise a negative errno.
  */
 
 int
@@ -370,7 +375,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	char *base;
-	size_t hdrlen, rpclen, padlen;
+	size_t rpclen, padlen;
+	ssize_t hdrlen;
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
@@ -441,14 +447,10 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	/* The following simplification is not true forever */
 	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
 		wtype = rpcrdma_noch;
-	BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
-
-	if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
-	    (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
-		/* forced to "pure inline"? */
-		dprintk("RPC:       %s: too much data (%d/%d) for inline\n",
-			__func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
-		return -1;
+	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
+		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
+			__func__);
+		return -EIO;
 	}
 
 	hdrlen = 28; /*sizeof *headerp;*/
@@ -474,8 +476,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
 			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
 			hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
-			BUG_ON(wtype != rpcrdma_noch);
-
+			if (wtype != rpcrdma_noch) {
+				dprintk("RPC:       %s: invalid chunk list\n",
+					__func__);
+				return -EIO;
+			}
 		} else {
 			headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
 			headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
@@ -492,8 +497,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 			 * on receive. Therefore, we request a reply chunk
 			 * for non-writes wherever feasible and efficient.
 			 */
-			if (wtype == rpcrdma_noch &&
-			    r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
+			if (wtype == rpcrdma_noch)
 				wtype = rpcrdma_replych;
 		}
 	}
@@ -511,9 +515,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		hdrlen = rpcrdma_create_chunks(rqst,
 					&rqst->rq_rcv_buf, headerp, wtype);
 	}
-
-	if (hdrlen == 0)
-		return -1;
+	if (hdrlen < 0)
+		return hdrlen;
 
 	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
 		" headerp 0x%p base 0x%p lkey 0x%x\n",
@@ -649,9 +652,7 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 				break;
 			page_base = 0;
 		}
-		rqst->rq_rcv_buf.page_len = olen - copy_len;
-	} else
-		rqst->rq_rcv_buf.page_len = 0;
+	}
 
 	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
 		curlen = copy_len;
@@ -682,15 +683,11 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 	rqst->rq_private_buf = rqst->rq_rcv_buf;
 }
 
-/*
- * This function is called when an async event is posted to
- * the connection which changes the connection state. All it
- * does at this point is mark the connection up/down, the rpc
- * timers do the rest.
- */
 void
-rpcrdma_conn_func(struct rpcrdma_ep *ep)
+rpcrdma_connect_worker(struct work_struct *work)
 {
+	struct rpcrdma_ep *ep =
+		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
 	struct rpc_xprt *xprt = ep->rep_xprt;
 
 	spin_lock_bh(&xprt->transport_lock);
@@ -707,13 +704,15 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
 }
 
 /*
- * This function is called when memory window unbind which we are waiting
- * for completes. Just use rr_func (zeroed by upcall) to signal completion.
+ * This function is called when an async event is posted to
+ * the connection which changes the connection state. All it
+ * does at this point is mark the connection up/down, the rpc
+ * timers do the rest.
  */
-static void
-rpcrdma_unbind_func(struct rpcrdma_rep *rep)
+void
+rpcrdma_conn_func(struct rpcrdma_ep *ep)
 {
-	wake_up(&rep->rr_unbind);
+	schedule_delayed_work(&ep->rep_connect_worker, 0);
 }
 
 /*
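The two hunks above swap the roles of rpcrdma_conn_func() and the new rpcrdma_connect_worker(): connection events arrive in a context that must not sleep, so the upcall now only schedules delayed work, and the worker recovers its endpoint with container_of() before taking the transport lock. A minimal sketch of the pattern, assuming rep_connect_worker is initialized once at endpoint creation (that hunk lives in verbs.c, outside this diff):

#include <linux/workqueue.h>

/* At endpoint setup (assumed, not shown in this diff): bind the work
 * item to the worker function.
 */
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

/* In the connection upcall: defer all real work to process context. */
schedule_delayed_work(&ep->rep_connect_worker, 0);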
@@ -730,7 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	struct rpc_xprt *xprt = rep->rr_xprt;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	__be32 *iptr;
-	int i, rdmalen, status;
+	int rdmalen, status;
+	unsigned long cwnd;
 
 	/* Check status. If bad, signal disconnect and return rep to pool */
 	if (rep->rr_len == ~0U) {
@@ -785,6 +785,7 @@ repost:
 
 	/* from here on, the reply is no longer an orphan */
 	req->rl_reply = rep;
+	xprt->reestablish_timeout = 0;
 
 	/* check for expected message types */
 	/* The order of some of these tests is important. */
@@ -859,26 +860,10 @@ badheader:
 		break;
 	}
 
-	/* If using mw bind, start the deregister process now. */
-	/* (Note: if mr_free(), cannot perform it here, in tasklet context) */
-	if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
-	case RPCRDMA_MEMWINDOWS:
-		for (i = 0; req->rl_nchunks-- > 1;)
-			i += rpcrdma_deregister_external(
-				&req->rl_segments[i], r_xprt, NULL);
-		/* Optionally wait (not here) for unbinds to complete */
-		rep->rr_func = rpcrdma_unbind_func;
-		(void) rpcrdma_deregister_external(&req->rl_segments[i],
-						   r_xprt, rep);
-		break;
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-		for (i = 0; req->rl_nchunks--;)
-			i += rpcrdma_deregister_external(&req->rl_segments[i],
-							 r_xprt, NULL);
-		break;
-	default:
-		break;
-	}
+	cwnd = xprt->cwnd;
+	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
+	if (xprt->cwnd > cwnd)
+		xprt_release_rqst_cong(rqst->rq_task);
 
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 			__func__, xprt, rqst, status);
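With the memory-window deregistration paths removed, the reply handler instead feeds the server's credit grant into the generic RPC congestion window. xprt->cwnd is kept in fixed-point units of RPC_CWNDSCALE (1 << RPC_CWNDSHIFT, i.e. 256), so shifting the credit count converts it directly into that scale. A worked sketch, assuming rb_credits was refreshed from the reply's credit field earlier in the receive path:

/* Example: the server grants 32 credits while cwnd covers 16 slots. */
cwnd = xprt->cwnd;			/* 16 << RPC_CWNDSHIFT == 4096 */
xprt->cwnd = 32 << RPC_CWNDSHIFT;	/* 8192: admits 32 in-flight RPCs */
if (xprt->cwnd > cwnd)			/* window grew */
	xprt_release_rqst_cong(rqst->rq_task);	/* wake a throttled task */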
