diff options
Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
| -rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 222 |
1 files changed, 105 insertions, 117 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 2ac3f6e8adf..693966d3f33 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -78,8 +78,7 @@ static const char transfertypes[][12] = { * elements. Segments are then coalesced when registered, if possible * within the selected memreg mode. * - * Note, this routine is never called if the connection's memory - * registration strategy is 0 (bounce buffers). + * Returns positive number of segments converted, or a negative errno. */ static int @@ -87,6 +86,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs) { int len, n = 0, p; + int page_base; + struct page **ppages; if (pos == 0 && xdrbuf->head[0].iov_len) { seg[n].mr_page = NULL; @@ -95,35 +96,40 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, ++n; } - if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) { - if (n == nsegs) - return 0; - seg[n].mr_page = xdrbuf->pages[0]; - seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base; - seg[n].mr_len = min_t(u32, - PAGE_SIZE - xdrbuf->page_base, xdrbuf->page_len); - len = xdrbuf->page_len - seg[n].mr_len; - ++n; - p = 1; - while (len > 0) { - if (n == nsegs) - return 0; - seg[n].mr_page = xdrbuf->pages[p]; - seg[n].mr_offset = NULL; - seg[n].mr_len = min_t(u32, PAGE_SIZE, len); - len -= seg[n].mr_len; - ++n; - ++p; + len = xdrbuf->page_len; + ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); + page_base = xdrbuf->page_base & ~PAGE_MASK; + p = 0; + while (len && n < nsegs) { + if (!ppages[p]) { + /* alloc the pagelist for receiving buffer */ + ppages[p] = alloc_page(GFP_ATOMIC); + if (!ppages[p]) + return -ENOMEM; } + seg[n].mr_page = ppages[p]; + seg[n].mr_offset = (void *)(unsigned long) page_base; + seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); + if (seg[n].mr_len > PAGE_SIZE) + return -EIO; + len -= seg[n].mr_len; + ++n; + ++p; + page_base = 0; /* page offset only applies to first page */ } + /* Message overflows the seg array */ + if (len && n == nsegs) + return -EIO; + if (xdrbuf->tail[0].iov_len) { /* the rpcrdma protocol allows us to omit any trailing * xdr pad bytes, saving the server an RDMA operation. */ if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize) return n; if (n == nsegs) - return 0; + /* Tail remains, but we're out of segments */ + return -EIO; seg[n].mr_page = NULL; seg[n].mr_offset = xdrbuf->tail[0].iov_base; seg[n].mr_len = xdrbuf->tail[0].iov_len; @@ -164,15 +170,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * Reply chunk (a counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO + * + * Returns positive RPC/RDMA header size, or negative errno. */ -static unsigned int +static ssize_t rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type) { struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt); - int nsegs, nchunks = 0; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); + int n, nsegs, nchunks = 0; unsigned int pos; struct rpcrdma_mr_seg *seg = req->rl_segments; struct rpcrdma_read_chunk *cur_rchunk = NULL; @@ -198,12 +206,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, pos = target->head[0].iov_len; nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); - if (nsegs == 0) - return 0; + if (nsegs < 0) + return nsegs; do { - /* bind/register the memory, then build chunk from result. */ - int n = rpcrdma_register_external(seg, nsegs, + n = rpcrdma_register_external(seg, nsegs, cur_wchunk != NULL, r_xprt); if (n <= 0) goto out; @@ -248,10 +255,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, /* success. all failures return above */ req->rl_nchunks = nchunks; - BUG_ON(nchunks == 0); - BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR) - && (nchunks > 3)); - /* * finish off header. If write, marshal discrim and nchunks. */ @@ -278,8 +281,8 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, out: for (pos = 0; nchunks--;) pos += rpcrdma_deregister_external( - &req->rl_segments[pos], r_xprt, NULL); - return 0; + &req->rl_segments[pos], r_xprt); + return n; } /* @@ -296,6 +299,8 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad) int copy_len; unsigned char *srcp, *destp; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); + int page_base; + struct page **ppages; destp = rqst->rq_svec[0].iov_base; curlen = rqst->rq_svec[0].iov_len; @@ -324,28 +329,25 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad) __func__, destp + copy_len, curlen); rqst->rq_svec[0].iov_len += curlen; } - r_xprt->rx_stats.pullup_copy_count += copy_len; - npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base+copy_len) >> PAGE_SHIFT; + + page_base = rqst->rq_snd_buf.page_base; + ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT); + page_base &= ~PAGE_MASK; + npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT; for (i = 0; copy_len && i < npages; i++) { - if (i == 0) - curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base; - else - curlen = PAGE_SIZE; + curlen = PAGE_SIZE - page_base; if (curlen > copy_len) curlen = copy_len; dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n", __func__, i, destp, copy_len, curlen); - srcp = kmap_atomic(rqst->rq_snd_buf.pages[i], - KM_SKB_SUNRPC_DATA); - if (i == 0) - memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen); - else - memcpy(destp, srcp, curlen); - kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA); + srcp = kmap_atomic(ppages[i]); + memcpy(destp, srcp+page_base, curlen); + kunmap_atomic(srcp); rqst->rq_svec[0].iov_len += curlen; destp += curlen; copy_len -= curlen; + page_base = 0; } /* header now contains entire send message */ return pad; @@ -362,16 +364,19 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad) * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol. * [2] -- optional padding. * [3] -- if padded, header only in [1] and data here. + * + * Returns zero on success, otherwise a negative errno. */ int rpcrdma_marshal_req(struct rpc_rqst *rqst) { - struct rpc_xprt *xprt = rqst->rq_task->tk_xprt; + struct rpc_xprt *xprt = rqst->rq_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); char *base; - size_t hdrlen, rpclen, padlen; + size_t rpclen, padlen; + ssize_t hdrlen; enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; @@ -442,14 +447,10 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) /* The following simplification is not true forever */ if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) wtype = rpcrdma_noch; - BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch); - - if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS && - (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) { - /* forced to "pure inline"? */ - dprintk("RPC: %s: too much data (%d/%d) for inline\n", - __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len); - return -1; + if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { + dprintk("RPC: %s: cannot marshal multiple chunk lists\n", + __func__); + return -EIO; } hdrlen = 28; /*sizeof *headerp;*/ @@ -475,8 +476,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero; headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero; hdrlen += 2 * sizeof(u32); /* extra words in padhdr */ - BUG_ON(wtype != rpcrdma_noch); - + if (wtype != rpcrdma_noch) { + dprintk("RPC: %s: invalid chunk list\n", + __func__); + return -EIO; + } } else { headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; @@ -493,8 +497,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * on receive. Therefore, we request a reply chunk * for non-writes wherever feasible and efficient. */ - if (wtype == rpcrdma_noch && - r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER) + if (wtype == rpcrdma_noch) wtype = rpcrdma_replych; } } @@ -512,9 +515,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, headerp, wtype); } - - if (hdrlen == 0) - return -1; + if (hdrlen < 0) + return hdrlen; dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd" " headerp 0x%p base 0x%p lkey 0x%x\n", @@ -606,6 +608,8 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) { int i, npages, curlen, olen; char *destp; + struct page **ppages; + int page_base; curlen = rqst->rq_rcv_buf.head[0].iov_len; if (curlen > copy_len) { /* write chunk header fixup */ @@ -624,36 +628,31 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) olen = copy_len; i = 0; rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen; + page_base = rqst->rq_rcv_buf.page_base; + ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT); + page_base &= ~PAGE_MASK; + if (copy_len && rqst->rq_rcv_buf.page_len) { - npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base + + npages = PAGE_ALIGN(page_base + rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT; for (; i < npages; i++) { - if (i == 0) - curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base; - else - curlen = PAGE_SIZE; + curlen = PAGE_SIZE - page_base; if (curlen > copy_len) curlen = copy_len; dprintk("RPC: %s: page %d" " srcp 0x%p len %d curlen %d\n", __func__, i, srcp, copy_len, curlen); - destp = kmap_atomic(rqst->rq_rcv_buf.pages[i], - KM_SKB_SUNRPC_DATA); - if (i == 0) - memcpy(destp + rqst->rq_rcv_buf.page_base, - srcp, curlen); - else - memcpy(destp, srcp, curlen); - flush_dcache_page(rqst->rq_rcv_buf.pages[i]); - kunmap_atomic(destp, KM_SKB_SUNRPC_DATA); + destp = kmap_atomic(ppages[i]); + memcpy(destp + page_base, srcp, curlen); + flush_dcache_page(ppages[i]); + kunmap_atomic(destp); srcp += curlen; copy_len -= curlen; if (copy_len == 0) break; + page_base = 0; } - rqst->rq_rcv_buf.page_len = olen - copy_len; - } else - rqst->rq_rcv_buf.page_len = 0; + } if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) { curlen = copy_len; @@ -684,15 +683,11 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) rqst->rq_private_buf = rqst->rq_rcv_buf; } -/* - * This function is called when an async event is posted to - * the connection which changes the connection state. All it - * does at this point is mark the connection up/down, the rpc - * timers do the rest. - */ void -rpcrdma_conn_func(struct rpcrdma_ep *ep) +rpcrdma_connect_worker(struct work_struct *work) { + struct rpcrdma_ep *ep = + container_of(work, struct rpcrdma_ep, rep_connect_worker.work); struct rpc_xprt *xprt = ep->rep_xprt; spin_lock_bh(&xprt->transport_lock); @@ -709,13 +704,15 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep) } /* - * This function is called when memory window unbind which we are waiting - * for completes. Just use rr_func (zeroed by upcall) to signal completion. + * This function is called when an async event is posted to + * the connection which changes the connection state. All it + * does at this point is mark the connection up/down, the rpc + * timers do the rest. */ -static void -rpcrdma_unbind_func(struct rpcrdma_rep *rep) +void +rpcrdma_conn_func(struct rpcrdma_ep *ep) { - wake_up(&rep->rr_unbind); + schedule_delayed_work(&ep->rep_connect_worker, 0); } /* @@ -732,7 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) struct rpc_xprt *xprt = rep->rr_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); __be32 *iptr; - int i, rdmalen, status; + int rdmalen, status; + unsigned long cwnd; /* Check status. If bad, signal disconnect and return rep to pool */ if (rep->rr_len == ~0U) { @@ -773,15 +771,21 @@ repost: /* get request object */ req = rpcr_to_rdmar(rqst); + if (req->rl_reply) { + spin_unlock(&xprt->transport_lock); + dprintk("RPC: %s: duplicate reply 0x%p to RPC " + "request 0x%p: xid 0x%08x\n", __func__, rep, req, + headerp->rm_xid); + goto repost; + } dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" " RPC request 0x%p xid 0x%08x\n", __func__, rep, req, rqst, headerp->rm_xid); - BUG_ON(!req || req->rl_reply); - /* from here on, the reply is no longer an orphan */ req->rl_reply = rep; + xprt->reestablish_timeout = 0; /* check for expected message types */ /* The order of some of these tests is important. */ @@ -856,26 +860,10 @@ badheader: break; } - /* If using mw bind, start the deregister process now. */ - /* (Note: if mr_free(), cannot perform it here, in tasklet context) */ - if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) { - case RPCRDMA_MEMWINDOWS: - for (i = 0; req->rl_nchunks-- > 1;) - i += rpcrdma_deregister_external( - &req->rl_segments[i], r_xprt, NULL); - /* Optionally wait (not here) for unbinds to complete */ - rep->rr_func = rpcrdma_unbind_func; - (void) rpcrdma_deregister_external(&req->rl_segments[i], - r_xprt, rep); - break; - case RPCRDMA_MEMWINDOWS_ASYNC: - for (i = 0; req->rl_nchunks--;) - i += rpcrdma_deregister_external(&req->rl_segments[i], - r_xprt, NULL); - break; - default: - break; - } + cwnd = xprt->cwnd; + xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; + if (xprt->cwnd > cwnd) + xprt_release_rqst_cong(rqst->rq_task); dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", __func__, xprt, rqst, status); |
