diff options
Diffstat (limited to 'net/sunrpc/xprt.c')
| -rw-r--r-- | net/sunrpc/xprt.c | 2242 |
1 files changed, 949 insertions, 1293 deletions
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 3c654e06b08..c3b2b3369e5 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -10,12 +10,13 @@ * one is available. Otherwise, it sleeps on the backlog queue * (xprt_reserve). * - Next, the caller puts together the RPC message, stuffs it into - * the request struct, and calls xprt_call(). - * - xprt_call transmits the message and installs the caller on the - * socket's wait list. At the same time, it installs a timer that - * is run after the packet's timeout has expired. + * the request struct, and calls xprt_transmit(). + * - xprt_transmit sends the message and installs the caller on the + * transport's wait list. At the same time, if a reply is expected, + * it installs a timer that is run after the packet's timeout has + * expired. * - When a packet arrives, the data_ready handler walks the list of - * pending requests for that socket. If a matching XID is found, the + * pending requests for that transport. If a matching XID is found, the * caller is woken up, and the timer removed. * - When no reply arrives within the timeout interval, the timer is * fired by the kernel and runs xprt_timer(). It either adjusts the @@ -33,261 +34,344 @@ * * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> * - * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> - * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> - * TCP NFS related read + write fixes - * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> - * - * Rewrite of larges part of the code in order to stabilize TCP stuff. - * Fix behaviour when socket buffer is full. - * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> + * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> */ +#include <linux/module.h> + #include <linux/types.h> -#include <linux/slab.h> -#include <linux/capability.h> -#include <linux/sched.h> -#include <linux/errno.h> -#include <linux/socket.h> -#include <linux/in.h> +#include <linux/interrupt.h> +#include <linux/workqueue.h> #include <linux/net.h> -#include <linux/mm.h> -#include <linux/udp.h> -#include <linux/tcp.h> +#include <linux/ktime.h> + #include <linux/sunrpc/clnt.h> -#include <linux/file.h> -#include <linux/workqueue.h> -#include <linux/random.h> +#include <linux/sunrpc/metrics.h> +#include <linux/sunrpc/bc_xprt.h> -#include <net/sock.h> -#include <net/checksum.h> -#include <net/udp.h> -#include <net/tcp.h> +#include "sunrpc.h" /* * Local variables */ #ifdef RPC_DEBUG -# undef RPC_DEBUG_DATA # define RPCDBG_FACILITY RPCDBG_XPRT #endif -#define XPRT_MAX_BACKOFF (8) -#define XPRT_IDLE_TIMEOUT (5*60*HZ) -#define XPRT_MAX_RESVPORT (800) - /* * Local functions */ +static void xprt_init(struct rpc_xprt *xprt, struct net *net); static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); -static inline void do_xprt_reserve(struct rpc_task *); -static void xprt_disconnect(struct rpc_xprt *); static void xprt_connect_status(struct rpc_task *task); -static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap, - struct rpc_timeout *to); -static struct socket *xprt_create_socket(struct rpc_xprt *, int, int); -static void xprt_bind_socket(struct rpc_xprt *, struct socket *); static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); +static void xprt_destroy(struct rpc_xprt *xprt); -static int xprt_clear_backlog(struct rpc_xprt *xprt); +static DEFINE_SPINLOCK(xprt_list_lock); +static LIST_HEAD(xprt_list); -#ifdef RPC_DEBUG_DATA -/* - * Print the buffer contents (first 128 bytes only--just enough for - * diropres return). +/** + * xprt_register_transport - register a transport implementation + * @transport: transport to register + * + * If a transport implementation is loaded as a kernel module, it can + * call this interface to make itself known to the RPC client. + * + * Returns: + * 0: transport successfully registered + * -EEXIST: transport already registered + * -EINVAL: transport module being unloaded */ -static void -xprt_pktdump(char *msg, u32 *packet, unsigned int count) +int xprt_register_transport(struct xprt_class *transport) +{ + struct xprt_class *t; + int result; + + result = -EEXIST; + spin_lock(&xprt_list_lock); + list_for_each_entry(t, &xprt_list, list) { + /* don't register the same transport class twice */ + if (t->ident == transport->ident) + goto out; + } + + list_add_tail(&transport->list, &xprt_list); + printk(KERN_INFO "RPC: Registered %s transport module.\n", + transport->name); + result = 0; + +out: + spin_unlock(&xprt_list_lock); + return result; +} +EXPORT_SYMBOL_GPL(xprt_register_transport); + +/** + * xprt_unregister_transport - unregister a transport implementation + * @transport: transport to unregister + * + * Returns: + * 0: transport successfully unregistered + * -ENOENT: transport never registered + */ +int xprt_unregister_transport(struct xprt_class *transport) { - u8 *buf = (u8 *) packet; - int j; - - dprintk("RPC: %s\n", msg); - for (j = 0; j < count && j < 128; j += 4) { - if (!(j & 31)) { - if (j) - dprintk("\n"); - dprintk("0x%04x ", j); + struct xprt_class *t; + int result; + + result = 0; + spin_lock(&xprt_list_lock); + list_for_each_entry(t, &xprt_list, list) { + if (t == transport) { + printk(KERN_INFO + "RPC: Unregistered %s transport module.\n", + transport->name); + list_del_init(&transport->list); + goto out; } - dprintk("%02x%02x%02x%02x ", - buf[j], buf[j+1], buf[j+2], buf[j+3]); } - dprintk("\n"); + result = -ENOENT; + +out: + spin_unlock(&xprt_list_lock); + return result; } -#else -static inline void -xprt_pktdump(char *msg, u32 *packet, unsigned int count) +EXPORT_SYMBOL_GPL(xprt_unregister_transport); + +/** + * xprt_load_transport - load a transport implementation + * @transport_name: transport to load + * + * Returns: + * 0: transport successfully loaded + * -ENOENT: transport module not available + */ +int xprt_load_transport(const char *transport_name) { - /* NOP */ + struct xprt_class *t; + int result; + + result = 0; + spin_lock(&xprt_list_lock); + list_for_each_entry(t, &xprt_list, list) { + if (strcmp(t->name, transport_name) == 0) { + spin_unlock(&xprt_list_lock); + goto out; + } + } + spin_unlock(&xprt_list_lock); + result = request_module("xprt%s", transport_name); +out: + return result; } -#endif +EXPORT_SYMBOL_GPL(xprt_load_transport); -/* - * Look up RPC transport given an INET socket +/** + * xprt_reserve_xprt - serialize write access to transports + * @task: task that is requesting access to the transport + * @xprt: pointer to the target transport + * + * This prevents mixing the payload of separate requests, and prevents + * transport connects from colliding with writes. No congestion control + * is provided. */ -static inline struct rpc_xprt * -xprt_from_sock(struct sock *sk) +int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + int priority; + + if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { + if (task == xprt->snd_task) + return 1; + goto out_sleep; + } + xprt->snd_task = task; + if (req != NULL) + req->rq_ntrans++; + + return 1; + +out_sleep: + dprintk("RPC: %5u failed to lock transport %p\n", + task->tk_pid, xprt); + task->tk_timeout = 0; + task->tk_status = -EAGAIN; + if (req == NULL) + priority = RPC_PRIORITY_LOW; + else if (!req->rq_ntrans) + priority = RPC_PRIORITY_NORMAL; + else + priority = RPC_PRIORITY_HIGH; + rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); + return 0; +} +EXPORT_SYMBOL_GPL(xprt_reserve_xprt); + +static void xprt_clear_locked(struct rpc_xprt *xprt) { - return (struct rpc_xprt *) sk->sk_user_data; + xprt->snd_task = NULL; + if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { + smp_mb__before_atomic(); + clear_bit(XPRT_LOCKED, &xprt->state); + smp_mb__after_atomic(); + } else + queue_work(rpciod_workqueue, &xprt->task_cleanup); } /* - * Serialize write access to sockets, in order to prevent different - * requests from interfering with each other. - * Also prevents TCP socket connects from colliding with writes. + * xprt_reserve_xprt_cong - serialize write access to transports + * @task: task that is requesting access to the transport + * + * Same as xprt_reserve_xprt, but Van Jacobson congestion control is + * integrated into the decision of whether a request is allowed to be + * woken up and given access to the transport. */ -static int -__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) +int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; + int priority; - if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) { + if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) return 1; goto out_sleep; } - if (xprt->nocong || __xprt_get_cong(xprt, task)) { + if (req == NULL) { xprt->snd_task = task; - if (req) { - req->rq_bytes_sent = 0; - req->rq_ntrans++; - } return 1; } - smp_mb__before_clear_bit(); - clear_bit(XPRT_LOCKED, &xprt->sockstate); - smp_mb__after_clear_bit(); + if (__xprt_get_cong(xprt, task)) { + xprt->snd_task = task; + req->rq_ntrans++; + return 1; + } + xprt_clear_locked(xprt); out_sleep: - dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt); + dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); task->tk_timeout = 0; task->tk_status = -EAGAIN; - if (req && req->rq_ntrans) - rpc_sleep_on(&xprt->resend, task, NULL, NULL); + if (req == NULL) + priority = RPC_PRIORITY_LOW; + else if (!req->rq_ntrans) + priority = RPC_PRIORITY_NORMAL; else - rpc_sleep_on(&xprt->sending, task, NULL, NULL); + priority = RPC_PRIORITY_HIGH; + rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); return 0; } +EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); -static inline int -xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) +static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) { int retval; - spin_lock_bh(&xprt->sock_lock); - retval = __xprt_lock_write(xprt, task); - spin_unlock_bh(&xprt->sock_lock); + spin_lock_bh(&xprt->transport_lock); + retval = xprt->ops->reserve_xprt(xprt, task); + spin_unlock_bh(&xprt->transport_lock); return retval; } +static bool __xprt_lock_write_func(struct rpc_task *task, void *data) +{ + struct rpc_xprt *xprt = data; + struct rpc_rqst *req; + + req = task->tk_rqstp; + xprt->snd_task = task; + if (req) + req->rq_ntrans++; + return true; +} -static void -__xprt_lock_write_next(struct rpc_xprt *xprt) +static void __xprt_lock_write_next(struct rpc_xprt *xprt) { - struct rpc_task *task; + if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) + return; - if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) + if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) return; - if (!xprt->nocong && RPCXPRT_CONGESTED(xprt)) - goto out_unlock; - task = rpc_wake_up_next(&xprt->resend); - if (!task) { - task = rpc_wake_up_next(&xprt->sending); - if (!task) - goto out_unlock; + xprt_clear_locked(xprt); +} + +static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data) +{ + struct rpc_xprt *xprt = data; + struct rpc_rqst *req; + + req = task->tk_rqstp; + if (req == NULL) { + xprt->snd_task = task; + return true; } - if (xprt->nocong || __xprt_get_cong(xprt, task)) { - struct rpc_rqst *req = task->tk_rqstp; + if (__xprt_get_cong(xprt, task)) { xprt->snd_task = task; - if (req) { - req->rq_bytes_sent = 0; - req->rq_ntrans++; - } - return; + req->rq_ntrans++; + return true; } + return false; +} + +static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) +{ + if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) + return; + if (RPCXPRT_CONGESTED(xprt)) + goto out_unlock; + if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt)) + return; out_unlock: - smp_mb__before_clear_bit(); - clear_bit(XPRT_LOCKED, &xprt->sockstate); - smp_mb__after_clear_bit(); + xprt_clear_locked(xprt); } -/* - * Releases the socket for use by other requests. +/** + * xprt_release_xprt - allow other requests to use a transport + * @xprt: transport with other tasks potentially waiting + * @task: task that is releasing access to the transport + * + * Note that "task" can be NULL. No congestion control is provided. */ -static void -__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) +void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) { if (xprt->snd_task == task) { - xprt->snd_task = NULL; - smp_mb__before_clear_bit(); - clear_bit(XPRT_LOCKED, &xprt->sockstate); - smp_mb__after_clear_bit(); + if (task != NULL) { + struct rpc_rqst *req = task->tk_rqstp; + if (req != NULL) + req->rq_bytes_sent = 0; + } + xprt_clear_locked(xprt); __xprt_lock_write_next(xprt); } } +EXPORT_SYMBOL_GPL(xprt_release_xprt); -static inline void -xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) -{ - spin_lock_bh(&xprt->sock_lock); - __xprt_release_write(xprt, task); - spin_unlock_bh(&xprt->sock_lock); -} - -/* - * Write data to socket. +/** + * xprt_release_xprt_cong - allow other requests to use a transport + * @xprt: transport with other tasks potentially waiting + * @task: task that is releasing access to the transport + * + * Note that "task" can be NULL. Another task is awoken to use the + * transport if the transport's congestion window allows it. */ -static inline int -xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) +void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) { - struct socket *sock = xprt->sock; - struct xdr_buf *xdr = &req->rq_snd_buf; - struct sockaddr *addr = NULL; - int addrlen = 0; - unsigned int skip; - int result; - - if (!sock) - return -ENOTCONN; - - xprt_pktdump("packet data:", - req->rq_svec->iov_base, - req->rq_svec->iov_len); - - /* For UDP, we need to provide an address */ - if (!xprt->stream) { - addr = (struct sockaddr *) &xprt->addr; - addrlen = sizeof(xprt->addr); + if (xprt->snd_task == task) { + if (task != NULL) { + struct rpc_rqst *req = task->tk_rqstp; + if (req != NULL) + req->rq_bytes_sent = 0; + } + xprt_clear_locked(xprt); + __xprt_lock_write_next_cong(xprt); } - /* Dont repeat bytes */ - skip = req->rq_bytes_sent; - - clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); - result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT); - - dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result); - - if (result >= 0) - return result; +} +EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); - switch (result) { - case -ECONNREFUSED: - /* When the server has died, an ICMP port unreachable message - * prompts ECONNREFUSED. - */ - case -EAGAIN: - break; - case -ECONNRESET: - case -ENOTCONN: - case -EPIPE: - /* connection broken */ - if (xprt->stream) - result = -ENOTCONN; - break; - default: - printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); - } - return result; +static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) +{ + spin_lock_bh(&xprt->transport_lock); + xprt->ops->release_xprt(xprt, task); + spin_unlock_bh(&xprt->transport_lock); } /* @@ -301,7 +385,7 @@ __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) if (req->rq_cong) return 1; - dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n", + dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", task->tk_pid, xprt->cong, xprt->cwnd); if (RPCXPRT_CONGESTED(xprt)) return 0; @@ -321,42 +405,153 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) return; req->rq_cong = 0; xprt->cong -= RPC_CWNDSCALE; - __xprt_lock_write_next(xprt); + __xprt_lock_write_next_cong(xprt); } -/* - * Adjust RPC congestion window - * We use a time-smoothed congestion estimator to avoid heavy oscillation. +/** + * xprt_release_rqst_cong - housekeeping when request is complete + * @task: RPC request that recently completed + * + * Useful for transports that require congestion control. */ -static void -xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) +void xprt_release_rqst_cong(struct rpc_task *task) { - unsigned long cwnd; + struct rpc_rqst *req = task->tk_rqstp; + + __xprt_put_cong(req->rq_xprt, req); +} +EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); + +/** + * xprt_adjust_cwnd - adjust transport congestion window + * @xprt: pointer to xprt + * @task: recently completed RPC request used to adjust window + * @result: result code of completed RPC request + * + * The transport code maintains an estimate on the maximum number of out- + * standing RPC requests, using a smoothed version of the congestion + * avoidance implemented in 44BSD. This is basically the Van Jacobson + * congestion algorithm: If a retransmit occurs, the congestion window is + * halved; otherwise, it is incremented by 1/cwnd when + * + * - a reply is received and + * - a full number of requests are outstanding and + * - the congestion window hasn't been updated recently. + */ +void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) +{ + struct rpc_rqst *req = task->tk_rqstp; + unsigned long cwnd = xprt->cwnd; - cwnd = xprt->cwnd; if (result >= 0 && cwnd <= xprt->cong) { /* The (cwnd >> 1) term makes sure * the result gets rounded properly. */ cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; if (cwnd > RPC_MAXCWND(xprt)) cwnd = RPC_MAXCWND(xprt); - __xprt_lock_write_next(xprt); + __xprt_lock_write_next_cong(xprt); } else if (result == -ETIMEDOUT) { cwnd >>= 1; if (cwnd < RPC_CWNDSCALE) cwnd = RPC_CWNDSCALE; } - dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", + dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", xprt->cong, xprt->cwnd, cwnd); xprt->cwnd = cwnd; + __xprt_put_cong(xprt, req); } +EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); -/* - * Reset the major timeout value +/** + * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue + * @xprt: transport with waiting tasks + * @status: result code to plant in each task before waking it + * */ +void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) +{ + if (status < 0) + rpc_wake_up_status(&xprt->pending, status); + else + rpc_wake_up(&xprt->pending); +} +EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); + +/** + * xprt_wait_for_buffer_space - wait for transport output buffer to clear + * @task: task to be put to sleep + * @action: function pointer to be executed after wait + * + * Note that we only set the timer for the case of RPC_IS_SOFT(), since + * we don't in general want to force a socket disconnection due to + * an incomplete RPC call transmission. + */ +void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; + rpc_sleep_on(&xprt->pending, task, action); +} +EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); + +/** + * xprt_write_space - wake the task waiting for transport output buffer space + * @xprt: transport with waiting tasks + * + * Can be called in a soft IRQ context, so xprt_write_space never sleeps. + */ +void xprt_write_space(struct rpc_xprt *xprt) +{ + spin_lock_bh(&xprt->transport_lock); + if (xprt->snd_task) { + dprintk("RPC: write space: waking waiting task on " + "xprt %p\n", xprt); + rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task); + } + spin_unlock_bh(&xprt->transport_lock); +} +EXPORT_SYMBOL_GPL(xprt_write_space); + +/** + * xprt_set_retrans_timeout_def - set a request's retransmit timeout + * @task: task whose timeout is to be set + * + * Set a request's retransmit timeout based on the transport's + * default timeout parameters. Used by transports that don't adjust + * the retransmit timeout based on round-trip time estimation. + */ +void xprt_set_retrans_timeout_def(struct rpc_task *task) +{ + task->tk_timeout = task->tk_rqstp->rq_timeout; +} +EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def); + +/** + * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout + * @task: task whose timeout is to be set + * + * Set a request's retransmit timeout using the RTT estimator. + */ +void xprt_set_retrans_timeout_rtt(struct rpc_task *task) +{ + int timer = task->tk_msg.rpc_proc->p_timer; + struct rpc_clnt *clnt = task->tk_client; + struct rpc_rtt *rtt = clnt->cl_rtt; + struct rpc_rqst *req = task->tk_rqstp; + unsigned long max_timeout = clnt->cl_timeout->to_maxval; + + task->tk_timeout = rpc_calc_rto(rtt, timer); + task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; + if (task->tk_timeout > max_timeout || task->tk_timeout == 0) + task->tk_timeout = max_timeout; +} +EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt); + static void xprt_reset_majortimeo(struct rpc_rqst *req) { - struct rpc_timeout *to = &req->rq_xprt->timeout; + const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; req->rq_majortimeo = req->rq_timeout; if (to->to_exponential) @@ -368,13 +563,15 @@ static void xprt_reset_majortimeo(struct rpc_rqst *req) req->rq_majortimeo += jiffies; } -/* - * Adjust timeout values etc for next retransmit +/** + * xprt_adjust_timeout - adjust timeout values for next retransmit + * @req: RPC request containing parameters to use for the adjustment + * */ int xprt_adjust_timeout(struct rpc_rqst *req) { struct rpc_xprt *xprt = req->rq_xprt; - struct rpc_timeout *to = &xprt->timeout; + const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; int status = 0; if (time_before(jiffies, req->rq_majortimeo)) { @@ -385,16 +582,14 @@ int xprt_adjust_timeout(struct rpc_rqst *req) if (to->to_maxval && req->rq_timeout >= to->to_maxval) req->rq_timeout = to->to_maxval; req->rq_retries++; - pprintk("RPC: %lu retrans\n", jiffies); } else { req->rq_timeout = to->to_initval; req->rq_retries = 0; xprt_reset_majortimeo(req); /* Reset the RTT counters == "slow start" */ - spin_lock_bh(&xprt->sock_lock); + spin_lock_bh(&xprt->transport_lock); rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); - spin_unlock_bh(&xprt->sock_lock); - pprintk("RPC: %lu timeout\n", jiffies); + spin_unlock_bh(&xprt->transport_lock); status = -ETIMEDOUT; } @@ -405,1315 +600,776 @@ int xprt_adjust_timeout(struct rpc_rqst *req) return status; } -/* - * Close down a transport socket - */ -static void -xprt_close(struct rpc_xprt *xprt) +static void xprt_autoclose(struct work_struct *work) { - struct socket *sock = xprt->sock; - struct sock *sk = xprt->inet; - - if (!sk) - return; - - write_lock_bh(&sk->sk_callback_lock); - xprt->inet = NULL; - xprt->sock = NULL; - - sk->sk_user_data = NULL; - sk->sk_data_ready = xprt->old_data_ready; - sk->sk_state_change = xprt->old_state_change; - sk->sk_write_space = xprt->old_write_space; - write_unlock_bh(&sk->sk_callback_lock); - - sk->sk_no_check = 0; + struct rpc_xprt *xprt = + container_of(work, struct rpc_xprt, task_cleanup); - sock_release(sock); + xprt->ops->close(xprt); + clear_bit(XPRT_CLOSE_WAIT, &xprt->state); + xprt_release_write(xprt, NULL); } -static void -xprt_socket_autoclose(void *args) +/** + * xprt_disconnect_done - mark a transport as disconnected + * @xprt: transport to flag for disconnect + * + */ +void xprt_disconnect_done(struct rpc_xprt *xprt) { - struct rpc_xprt *xprt = (struct rpc_xprt *)args; - - xprt_disconnect(xprt); - xprt_close(xprt); - xprt_release_write(xprt, NULL); + dprintk("RPC: disconnected transport %p\n", xprt); + spin_lock_bh(&xprt->transport_lock); + xprt_clear_connected(xprt); + xprt_wake_pending_tasks(xprt, -EAGAIN); + spin_unlock_bh(&xprt->transport_lock); } +EXPORT_SYMBOL_GPL(xprt_disconnect_done); -/* - * Mark a transport as disconnected +/** + * xprt_force_disconnect - force a transport to disconnect + * @xprt: transport to disconnect + * */ -static void -xprt_disconnect(struct rpc_xprt *xprt) +void xprt_force_disconnect(struct rpc_xprt *xprt) { - dprintk("RPC: disconnected transport %p\n", xprt); - spin_lock_bh(&xprt->sock_lock); - xprt_clear_connected(xprt); - rpc_wake_up_status(&xprt->pending, -ENOTCONN); - spin_unlock_bh(&xprt->sock_lock); + /* Don't race with the test_bit() in xprt_clear_locked() */ + spin_lock_bh(&xprt->transport_lock); + set_bit(XPRT_CLOSE_WAIT, &xprt->state); + /* Try to schedule an autoclose RPC call */ + if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) + queue_work(rpciod_workqueue, &xprt->task_cleanup); + xprt_wake_pending_tasks(xprt, -EAGAIN); + spin_unlock_bh(&xprt->transport_lock); } -/* - * Used to allow disconnection when we've been idle +/** + * xprt_conditional_disconnect - force a transport to disconnect + * @xprt: transport to disconnect + * @cookie: 'connection cookie' + * + * This attempts to break the connection if and only if 'cookie' matches + * the current transport 'connection cookie'. It ensures that we don't + * try to break the connection more than once when we need to retransmit + * a batch of RPC requests. + * */ +void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) +{ + /* Don't race with the test_bit() in xprt_clear_locked() */ + spin_lock_bh(&xprt->transport_lock); + if (cookie != xprt->connect_cookie) + goto out; + if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt)) + goto out; + set_bit(XPRT_CLOSE_WAIT, &xprt->state); + /* Try to schedule an autoclose RPC call */ + if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) + queue_work(rpciod_workqueue, &xprt->task_cleanup); + xprt_wake_pending_tasks(xprt, -EAGAIN); +out: + spin_unlock_bh(&xprt->transport_lock); +} + static void xprt_init_autodisconnect(unsigned long data) { struct rpc_xprt *xprt = (struct rpc_xprt *)data; - spin_lock(&xprt->sock_lock); - if (!list_empty(&xprt->recv) || xprt->shutdown) + spin_lock(&xprt->transport_lock); + if (!list_empty(&xprt->recv)) goto out_abort; - if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) + if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) goto out_abort; - spin_unlock(&xprt->sock_lock); - /* Let keventd close the socket */ - if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0) - xprt_release_write(xprt, NULL); - else - schedule_work(&xprt->task_cleanup); + spin_unlock(&xprt->transport_lock); + set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); + queue_work(rpciod_workqueue, &xprt->task_cleanup); return; out_abort: - spin_unlock(&xprt->sock_lock); + spin_unlock(&xprt->transport_lock); } -static void xprt_socket_connect(void *args) -{ - struct rpc_xprt *xprt = (struct rpc_xprt *)args; - struct socket *sock = xprt->sock; - int status = -EIO; - - if (xprt->shutdown || xprt->addr.sin_port == 0) - goto out; - - /* - * Start by resetting any existing state - */ - xprt_close(xprt); - sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); - if (sock == NULL) { - /* couldn't create socket or bind to reserved port; - * this is likely a permanent error, so cause an abort */ - goto out; - } - xprt_bind_socket(xprt, sock); - xprt_sock_setbufsize(xprt); - - status = 0; - if (!xprt->stream) - goto out; - - /* - * Tell the socket layer to start connecting... - */ - status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, - sizeof(xprt->addr), O_NONBLOCK); - dprintk("RPC: %p connect status %d connected %d sock state %d\n", - xprt, -status, xprt_connected(xprt), sock->sk->sk_state); - if (status < 0) { - switch (status) { - case -EINPROGRESS: - case -EALREADY: - goto out_clear; - } - } -out: - if (status < 0) - rpc_wake_up_status(&xprt->pending, status); - else - rpc_wake_up(&xprt->pending); -out_clear: - smp_mb__before_clear_bit(); - clear_bit(XPRT_CONNECTING, &xprt->sockstate); - smp_mb__after_clear_bit(); -} - -/* - * Attempt to connect a TCP socket. +/** + * xprt_connect - schedule a transport connect operation + * @task: RPC task that is requesting the connect * */ void xprt_connect(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid, + dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, xprt, (xprt_connected(xprt) ? "is" : "is not")); - if (xprt->shutdown) { - task->tk_status = -EIO; - return; - } - if (!xprt->addr.sin_port) { - task->tk_status = -EIO; + if (!xprt_bound(xprt)) { + task->tk_status = -EAGAIN; return; } if (!xprt_lock_write(xprt, task)) return; - if (xprt_connected(xprt)) - goto out_write; - if (task->tk_rqstp) + if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) + xprt->ops->close(xprt); + + if (xprt_connected(xprt)) + xprt_release_write(xprt, task); + else { task->tk_rqstp->rq_bytes_sent = 0; + task->tk_timeout = task->tk_rqstp->rq_timeout; + rpc_sleep_on(&xprt->pending, task, xprt_connect_status); - task->tk_timeout = RPC_CONNECT_TIMEOUT; - rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); - if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { - /* Note: if we are here due to a dropped connection - * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ - * seconds - */ - if (xprt->sock != NULL) - schedule_delayed_work(&xprt->sock_connect, - RPC_REESTABLISH_TIMEOUT); - else { - schedule_work(&xprt->sock_connect); - if (!RPC_IS_ASYNC(task)) - flush_scheduled_work(); - } + if (test_bit(XPRT_CLOSING, &xprt->state)) + return; + if (xprt_test_and_set_connecting(xprt)) + return; + xprt->stat.connect_start = jiffies; + xprt->ops->connect(xprt, task); } - return; - out_write: - xprt_release_write(xprt, task); } -/* - * We arrive here when awoken from waiting on connection establishment. - */ -static void -xprt_connect_status(struct rpc_task *task) +static void xprt_connect_status(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - if (task->tk_status >= 0) { - dprintk("RPC: %4d xprt_connect_status: connection established\n", + if (task->tk_status == 0) { + xprt->stat.connect_count++; + xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; + dprintk("RPC: %5u xprt_connect_status: connection established\n", task->tk_pid); return; } - /* if soft mounted, just cause this RPC to fail */ - if (RPC_IS_SOFT(task)) - task->tk_status = -EIO; - switch (task->tk_status) { case -ECONNREFUSED: case -ECONNRESET: - case -ENOTCONN: - return; + case -ECONNABORTED: + case -ENETUNREACH: + case -EHOSTUNREACH: + case -EAGAIN: + dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid); + break; case -ETIMEDOUT: - dprintk("RPC: %4d xprt_connect_status: timed out\n", - task->tk_pid); + dprintk("RPC: %5u xprt_connect_status: connect attempt timed " + "out\n", task->tk_pid); break; default: - printk(KERN_ERR "RPC: error %d connecting to server %s\n", - -task->tk_status, task->tk_client->cl_server); + dprintk("RPC: %5u xprt_connect_status: error %d connecting to " + "server %s\n", task->tk_pid, -task->tk_status, + xprt->servername); + xprt_release_write(xprt, task); + task->tk_status = -EIO; } - xprt_release_write(xprt, task); } -/* - * Look up the RPC request corresponding to a reply, and then lock it. +/** + * xprt_lookup_rqst - find an RPC request corresponding to an XID + * @xprt: transport on which the original request was transmitted + * @xid: RPC XID of incoming reply + * */ -static inline struct rpc_rqst * -xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) +struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) { - struct list_head *pos; - struct rpc_rqst *req = NULL; - - list_for_each(pos, &xprt->recv) { - struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); - if (entry->rq_xid == xid) { - req = entry; - break; - } + struct rpc_rqst *entry; + + list_for_each_entry(entry, &xprt->recv, rq_list) + if (entry->rq_xid == xid) + return entry; + + dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", + ntohl(xid)); + xprt->stat.bad_xids++; + return NULL; +} +EXPORT_SYMBOL_GPL(xprt_lookup_rqst); + +static void xprt_update_rtt(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_rtt *rtt = task->tk_client->cl_rtt; + unsigned int timer = task->tk_msg.rpc_proc->p_timer; + long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); + + if (timer) { + if (req->rq_ntrans == 1) + rpc_update_rtt(rtt, timer, m); + rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); } - return req; } -/* - * Complete reply received. - * The TCP code relies on us to remove the request from xprt->pending. +/** + * xprt_complete_rqst - called when reply processing is complete + * @task: RPC request that recently completed + * @copied: actual number of bytes received from the transport + * + * Caller holds transport lock. */ -static void -xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) +void xprt_complete_rqst(struct rpc_task *task, int copied) { - struct rpc_task *task = req->rq_task; - struct rpc_clnt *clnt = task->tk_client; + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; - /* Adjust congestion window */ - if (!xprt->nocong) { - unsigned timer = task->tk_msg.rpc_proc->p_timer; - xprt_adjust_cwnd(xprt, copied); - __xprt_put_cong(xprt, req); - if (timer) { - if (req->rq_ntrans == 1) - rpc_update_rtt(clnt->cl_rtt, timer, - (long)jiffies - req->rq_xtime); - rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1); - } - } + dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", + task->tk_pid, ntohl(req->rq_xid), copied); -#ifdef RPC_PROFILE - /* Profile only reads for now */ - if (copied > 1024) { - static unsigned long nextstat; - static unsigned long pkt_rtt, pkt_len, pkt_cnt; - - pkt_cnt++; - pkt_len += req->rq_slen + copied; - pkt_rtt += jiffies - req->rq_xtime; - if (time_before(nextstat, jiffies)) { - printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd); - printk("RPC: %ld %ld %ld %ld stat\n", - jiffies, pkt_cnt, pkt_len, pkt_rtt); - pkt_rtt = pkt_len = pkt_cnt = 0; - nextstat = jiffies + 5 * HZ; - } - } -#endif + xprt->stat.recvs++; + req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime); + if (xprt->ops->timer != NULL) + xprt_update_rtt(task); - dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); list_del_init(&req->rq_list); - req->rq_received = req->rq_private_buf.len = copied; - - /* ... and wake up the process. */ - rpc_wake_up_task(task); - return; + req->rq_private_buf.len = copied; + /* Ensure all writes are done before we update */ + /* req->rq_reply_bytes_recvd */ + smp_wmb(); + req->rq_reply_bytes_recvd = copied; + rpc_wake_up_queued_task(&xprt->pending, task); } +EXPORT_SYMBOL_GPL(xprt_complete_rqst); -static size_t -skb_read_bits(skb_reader_t *desc, void *to, size_t len) +static void xprt_timer(struct rpc_task *task) { - if (len > desc->count) - len = desc->count; - if (skb_copy_bits(desc->skb, desc->offset, to, len)) - return 0; - desc->count -= len; - desc->offset += len; - return len; -} + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; -static size_t -skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len) -{ - unsigned int csum2, pos; - - if (len > desc->count) - len = desc->count; - pos = desc->offset; - csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0); - desc->csum = csum_block_add(desc->csum, csum2, pos); - desc->count -= len; - desc->offset += len; - return len; + if (task->tk_status != -ETIMEDOUT) + return; + dprintk("RPC: %5u xprt_timer\n", task->tk_pid); + + spin_lock_bh(&xprt->transport_lock); + if (!req->rq_reply_bytes_recvd) { + if (xprt->ops->timer) + xprt->ops->timer(xprt, task); + } else + task->tk_status = 0; + spin_unlock_bh(&xprt->transport_lock); } -/* - * We have set things up such that we perform the checksum of the UDP - * packet in parallel with the copies into the RPC client iovec. -DaveM - */ -int -csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) +static inline int xprt_has_timer(struct rpc_xprt *xprt) { - skb_reader_t desc; - - desc.skb = skb; - desc.offset = sizeof(struct udphdr); - desc.count = skb->len - desc.offset; - - if (skb->ip_summed == CHECKSUM_UNNECESSARY) - goto no_checksum; - - desc.csum = csum_partial(skb->data, desc.offset, skb->csum); - if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits) < 0) - return -1; - if (desc.offset != skb->len) { - unsigned int csum2; - csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0); - desc.csum = csum_block_add(desc.csum, csum2, desc.offset); - } - if (desc.count) - return -1; - if ((unsigned short)csum_fold(desc.csum)) - return -1; - return 0; -no_checksum: - if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits) < 0) - return -1; - if (desc.count) - return -1; - return 0; + return xprt->idle_timeout != 0; } -/* - * Input handler for RPC replies. Called from a bottom half and hence - * atomic. +/** + * xprt_prepare_transmit - reserve the transport before sending a request + * @task: RPC task about to send a request + * */ -static void -udp_data_ready(struct sock *sk, int len) +bool xprt_prepare_transmit(struct rpc_task *task) { - struct rpc_task *task; - struct rpc_xprt *xprt; - struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; - u32 _xid, *xp; - - read_lock(&sk->sk_callback_lock); - dprintk("RPC: udp_data_ready...\n"); - if (!(xprt = xprt_from_sock(sk))) { - printk("RPC: udp_data_ready request not found!\n"); - goto out; - } - - dprintk("RPC: udp_data_ready client %p\n", xprt); - - if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) - goto out; + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + bool ret = false; - if (xprt->shutdown) - goto dropit; + dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); - repsize = skb->len - sizeof(struct udphdr); - if (repsize < 4) { - printk("RPC: impossible RPC reply size %d!\n", repsize); - goto dropit; + spin_lock_bh(&xprt->transport_lock); + if (!req->rq_bytes_sent) { + if (req->rq_reply_bytes_recvd) { + task->tk_status = req->rq_reply_bytes_recvd; + goto out_unlock; + } + if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) + && xprt_connected(xprt) + && req->rq_connect_cookie == xprt->connect_cookie) { + xprt->ops->set_retrans_timeout(task); + rpc_sleep_on(&xprt->pending, task, xprt_timer); + goto out_unlock; + } } - - /* Copy the XID from the skb... */ - xp = skb_header_pointer(skb, sizeof(struct udphdr), - sizeof(_xid), &_xid); - if (xp == NULL) - goto dropit; - - /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->sock_lock); - rovr = xprt_lookup_rqst(xprt, *xp); - if (!rovr) - goto out_unlock; - task = rovr->rq_task; - - dprintk("RPC: %4d received reply\n", task->tk_pid); - - if ((copied = rovr->rq_private_buf.buflen) > repsize) - copied = repsize; - - /* Suck it into the iovec, verify checksum if not done by hw. */ - if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) + if (!xprt->ops->reserve_xprt(xprt, task)) { + task->tk_status = -EAGAIN; goto out_unlock; - - /* Something worked... */ - dst_confirm(skb->dst); - - xprt_complete_rqst(xprt, rovr, copied); - - out_unlock: - spin_unlock(&xprt->sock_lock); - dropit: - skb_free_datagram(sk, skb); - out: - read_unlock(&sk->sk_callback_lock); + } + ret = true; +out_unlock: + spin_unlock_bh(&xprt->transport_lock); + return ret; } -/* - * Copy from an skb into memory and shrink the skb. - */ -static inline size_t -tcp_copy_data(skb_reader_t *desc, void *p, size_t len) +void xprt_end_transmit(struct rpc_task *task) { - if (len > desc->count) - len = desc->count; - if (skb_copy_bits(desc->skb, desc->offset, p, len)) { - dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n", - len, desc->count); - return 0; - } - desc->offset += len; - desc->count -= len; - dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n", - len, desc->count); - return len; + xprt_release_write(task->tk_rqstp->rq_xprt, task); } -/* - * TCP read fragment marker +/** + * xprt_transmit - send an RPC request on a transport + * @task: controlling RPC task + * + * We have to copy the iovec because sendmsg fiddles with its contents. */ -static inline void -tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) +void xprt_transmit(struct rpc_task *task) { - size_t len, used; - char *p; - - p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; - len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; - used = tcp_copy_data(desc, p, len); - xprt->tcp_offset += used; - if (used != len) - return; - xprt->tcp_reclen = ntohl(xprt->tcp_recm); - if (xprt->tcp_reclen & 0x80000000) - xprt->tcp_flags |= XPRT_LAST_FRAG; - else - xprt->tcp_flags &= ~XPRT_LAST_FRAG; - xprt->tcp_reclen &= 0x7fffffff; - xprt->tcp_flags &= ~XPRT_COPY_RECM; - xprt->tcp_offset = 0; - /* Sanity check of the record length */ - if (xprt->tcp_reclen < 4) { - printk(KERN_ERR "RPC: Invalid TCP record fragment length\n"); - xprt_disconnect(xprt); - } - dprintk("RPC: reading TCP record fragment of length %d\n", - xprt->tcp_reclen); -} + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + int status, numreqs; -static void -tcp_check_recm(struct rpc_xprt *xprt) -{ - dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", - xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); - if (xprt->tcp_offset == xprt->tcp_reclen) { - xprt->tcp_flags |= XPRT_COPY_RECM; - xprt->tcp_offset = 0; - if (xprt->tcp_flags & XPRT_LAST_FRAG) { - xprt->tcp_flags &= ~XPRT_COPY_DATA; - xprt->tcp_flags |= XPRT_COPY_XID; - xprt->tcp_copied = 0; - } - } -} + dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); -/* - * TCP read xid - */ -static inline void -tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) -{ - size_t len, used; - char *p; - - len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; - dprintk("RPC: reading XID (%Zu bytes)\n", len); - p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; - used = tcp_copy_data(desc, p, len); - xprt->tcp_offset += used; - if (used != len) + if (!req->rq_reply_bytes_recvd) { + if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { + /* + * Add to the list only if we're expecting a reply + */ + spin_lock_bh(&xprt->transport_lock); + /* Update the softirq receive buffer */ + memcpy(&req->rq_private_buf, &req->rq_rcv_buf, + sizeof(req->rq_private_buf)); + /* Add request to the receive list */ + list_add_tail(&req->rq_list, &xprt->recv); + spin_unlock_bh(&xprt->transport_lock); + xprt_reset_majortimeo(req); + /* Turn off autodisconnect */ + del_singleshot_timer_sync(&xprt->timer); + } + } else if (!req->rq_bytes_sent) return; - xprt->tcp_flags &= ~XPRT_COPY_XID; - xprt->tcp_flags |= XPRT_COPY_DATA; - xprt->tcp_copied = 4; - dprintk("RPC: reading reply for XID %08x\n", - ntohl(xprt->tcp_xid)); - tcp_check_recm(xprt); -} -/* - * TCP read and complete request - */ -static inline void -tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) -{ - struct rpc_rqst *req; - struct xdr_buf *rcvbuf; - size_t len; - ssize_t r; - - /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->sock_lock); - req = xprt_lookup_rqst(xprt, xprt->tcp_xid); - if (!req) { - xprt->tcp_flags &= ~XPRT_COPY_DATA; - dprintk("RPC: XID %08x request not found!\n", - ntohl(xprt->tcp_xid)); - spin_unlock(&xprt->sock_lock); + req->rq_xtime = ktime_get(); + status = xprt->ops->send_request(task); + if (status != 0) { + task->tk_status = status; return; } - rcvbuf = &req->rq_private_buf; - len = desc->count; - if (len > xprt->tcp_reclen - xprt->tcp_offset) { - skb_reader_t my_desc; - - len = xprt->tcp_reclen - xprt->tcp_offset; - memcpy(&my_desc, desc, sizeof(my_desc)); - my_desc.count = len; - r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, - &my_desc, tcp_copy_data); - desc->count -= r; - desc->offset += r; - } else - r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, - desc, tcp_copy_data); - - if (r > 0) { - xprt->tcp_copied += r; - xprt->tcp_offset += r; - } - if (r != len) { - /* Error when copying to the receive buffer, - * usually because we weren't able to allocate - * additional buffer pages. All we can do now - * is turn off XPRT_COPY_DATA, so the request - * will not receive any additional updates, - * and time out. - * Any remaining data from this record will - * be discarded. - */ - xprt->tcp_flags &= ~XPRT_COPY_DATA; - dprintk("RPC: XID %08x truncated request\n", - ntohl(xprt->tcp_xid)); - dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", - xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); - goto out; - } + dprintk("RPC: %5u xmit complete\n", task->tk_pid); + task->tk_flags |= RPC_TASK_SENT; + spin_lock_bh(&xprt->transport_lock); - dprintk("RPC: XID %08x read %Zd bytes\n", - ntohl(xprt->tcp_xid), r); - dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", - xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); + xprt->ops->set_retrans_timeout(task); - if (xprt->tcp_copied == req->rq_private_buf.buflen) - xprt->tcp_flags &= ~XPRT_COPY_DATA; - else if (xprt->tcp_offset == xprt->tcp_reclen) { - if (xprt->tcp_flags & XPRT_LAST_FRAG) - xprt->tcp_flags &= ~XPRT_COPY_DATA; - } + numreqs = atomic_read(&xprt->num_reqs); + if (numreqs > xprt->stat.max_slots) + xprt->stat.max_slots = numreqs; + xprt->stat.sends++; + xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; + xprt->stat.bklog_u += xprt->backlog.qlen; + xprt->stat.sending_u += xprt->sending.qlen; + xprt->stat.pending_u += xprt->pending.qlen; -out: - if (!(xprt->tcp_flags & XPRT_COPY_DATA)) { - dprintk("RPC: %4d received reply complete\n", - req->rq_task->tk_pid); - xprt_complete_rqst(xprt, req, xprt->tcp_copied); + /* Don't race with disconnect */ + if (!xprt_connected(xprt)) + task->tk_status = -ENOTCONN; + else { + /* + * Sleep on the pending queue since + * we're expecting a reply. + */ + if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) + rpc_sleep_on(&xprt->pending, task, xprt_timer); + req->rq_connect_cookie = xprt->connect_cookie; } - spin_unlock(&xprt->sock_lock); - tcp_check_recm(xprt); + spin_unlock_bh(&xprt->transport_lock); } -/* - * TCP discard extra bytes from a short read - */ -static inline void -tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) +static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) { - size_t len; - - len = xprt->tcp_reclen - xprt->tcp_offset; - if (len > desc->count) - len = desc->count; - desc->count -= len; - desc->offset += len; - xprt->tcp_offset += len; - dprintk("RPC: discarded %Zu bytes\n", len); - tcp_check_recm(xprt); + set_bit(XPRT_CONGESTED, &xprt->state); + rpc_sleep_on(&xprt->backlog, task, NULL); } -/* - * TCP record receive routine - * We first have to grab the record marker, then the XID, then the data. - */ -static int -tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, - unsigned int offset, size_t len) +static void xprt_wake_up_backlog(struct rpc_xprt *xprt) { - struct rpc_xprt *xprt = rd_desc->arg.data; - skb_reader_t desc = { - .skb = skb, - .offset = offset, - .count = len, - .csum = 0 - }; - - dprintk("RPC: tcp_data_recv\n"); - do { - /* Read in a new fragment marker if necessary */ - /* Can we ever really expect to get completely empty fragments? */ - if (xprt->tcp_flags & XPRT_COPY_RECM) { - tcp_read_fraghdr(xprt, &desc); - continue; - } - /* Read in the xid if necessary */ - if (xprt->tcp_flags & XPRT_COPY_XID) { - tcp_read_xid(xprt, &desc); - continue; - } - /* Read in the request data */ - if (xprt->tcp_flags & XPRT_COPY_DATA) { - tcp_read_request(xprt, &desc); - continue; - } - /* Skip over any trailing bytes on short reads */ - tcp_read_discard(xprt, &desc); - } while (desc.count); - dprintk("RPC: tcp_data_recv done\n"); - return len - desc.count; + if (rpc_wake_up_next(&xprt->backlog) == NULL) + clear_bit(XPRT_CONGESTED, &xprt->state); } -static void tcp_data_ready(struct sock *sk, int bytes) +static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) { - struct rpc_xprt *xprt; - read_descriptor_t rd_desc; + bool ret = false; - read_lock(&sk->sk_callback_lock); - dprintk("RPC: tcp_data_ready...\n"); - if (!(xprt = xprt_from_sock(sk))) { - printk("RPC: tcp_data_ready socket info not found!\n"); + if (!test_bit(XPRT_CONGESTED, &xprt->state)) goto out; + spin_lock(&xprt->reserve_lock); + if (test_bit(XPRT_CONGESTED, &xprt->state)) { + rpc_sleep_on(&xprt->backlog, task, NULL); + ret = true; } - if (xprt->shutdown) - goto out; - - /* We use rd_desc to pass struct xprt to tcp_data_recv */ - rd_desc.arg.data = xprt; - rd_desc.count = 65536; - tcp_read_sock(sk, &rd_desc, tcp_data_recv); + spin_unlock(&xprt->reserve_lock); out: - read_unlock(&sk->sk_callback_lock); + return ret; } -static void -tcp_state_change(struct sock *sk) +static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags) { - struct rpc_xprt *xprt; + struct rpc_rqst *req = ERR_PTR(-EAGAIN); - read_lock(&sk->sk_callback_lock); - if (!(xprt = xprt_from_sock(sk))) + if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs)) goto out; - dprintk("RPC: tcp_state_change client %p...\n", xprt); - dprintk("RPC: state %x conn %d dead %d zapped %d\n", - sk->sk_state, xprt_connected(xprt), - sock_flag(sk, SOCK_DEAD), - sock_flag(sk, SOCK_ZAPPED)); - - switch (sk->sk_state) { - case TCP_ESTABLISHED: - spin_lock_bh(&xprt->sock_lock); - if (!xprt_test_and_set_connected(xprt)) { - /* Reset TCP record info */ - xprt->tcp_offset = 0; - xprt->tcp_reclen = 0; - xprt->tcp_copied = 0; - xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; - rpc_wake_up(&xprt->pending); - } - spin_unlock_bh(&xprt->sock_lock); - break; - case TCP_SYN_SENT: - case TCP_SYN_RECV: - break; - default: - xprt_disconnect(xprt); - break; - } - out: - read_unlock(&sk->sk_callback_lock); -} - -/* - * Called when more output buffer space is available for this socket. - * We try not to wake our writers until they can make "significant" - * progress, otherwise we'll waste resources thrashing sock_sendmsg - * with a bunch of small requests. - */ -static void -xprt_write_space(struct sock *sk) -{ - struct rpc_xprt *xprt; - struct socket *sock; - - read_lock(&sk->sk_callback_lock); - if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) - goto out; - if (xprt->shutdown) + req = kzalloc(sizeof(struct rpc_rqst), gfp_flags); + if (req != NULL) goto out; - - /* Wait until we have enough socket memory */ - if (xprt->stream) { - /* from net/core/stream.c:sk_stream_write_space */ - if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk)) - goto out; - } else { - /* from net/core/sock.c:sock_def_write_space */ - if (!sock_writeable(sk)) - goto out; - } - - if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) - goto out; - - spin_lock_bh(&xprt->sock_lock); - if (xprt->snd_task) - rpc_wake_up_task(xprt->snd_task); - spin_unlock_bh(&xprt->sock_lock); + atomic_dec(&xprt->num_reqs); + req = ERR_PTR(-ENOMEM); out: - read_unlock(&sk->sk_callback_lock); + return req; } -/* - * RPC receive timeout handler. - */ -static void -xprt_timer(struct rpc_task *task) +static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) { - struct rpc_rqst *req = task->tk_rqstp; - struct rpc_xprt *xprt = req->rq_xprt; - - spin_lock(&xprt->sock_lock); - if (req->rq_received) - goto out; - - xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT); - __xprt_put_cong(xprt, req); - - dprintk("RPC: %4d xprt_timer (%s request)\n", - task->tk_pid, req ? "pending" : "backlogged"); - - task->tk_status = -ETIMEDOUT; -out: - task->tk_timeout = 0; - rpc_wake_up_task(task); - spin_unlock(&xprt->sock_lock); + if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) { + kfree(req); + return true; + } + return false; } -/* - * Place the actual RPC call. - * We have to copy the iovec because sendmsg fiddles with its contents. - */ -int -xprt_prepare_transmit(struct rpc_task *task) +void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) { - struct rpc_rqst *req = task->tk_rqstp; - struct rpc_xprt *xprt = req->rq_xprt; - int err = 0; - - dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid); - - if (xprt->shutdown) - return -EIO; + struct rpc_rqst *req; - spin_lock_bh(&xprt->sock_lock); - if (req->rq_received && !req->rq_bytes_sent) { - err = req->rq_received; - goto out_unlock; + spin_lock(&xprt->reserve_lock); + if (!list_empty(&xprt->free)) { + req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); + list_del(&req->rq_list); + goto out_init_req; } - if (!__xprt_lock_write(xprt, task)) { - err = -EAGAIN; - goto out_unlock; + req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT|__GFP_NOWARN); + if (!IS_ERR(req)) + goto out_init_req; + switch (PTR_ERR(req)) { + case -ENOMEM: + dprintk("RPC: dynamic allocation of request slot " + "failed! Retrying\n"); + task->tk_status = -ENOMEM; + break; + case -EAGAIN: + xprt_add_backlog(xprt, task); + dprintk("RPC: waiting for request slot\n"); + default: + task->tk_status = -EAGAIN; } + spin_unlock(&xprt->reserve_lock); + return; +out_init_req: + task->tk_status = 0; + task->tk_rqstp = req; + xprt_request_init(task, xprt); + spin_unlock(&xprt->reserve_lock); +} +EXPORT_SYMBOL_GPL(xprt_alloc_slot); - if (!xprt_connected(xprt)) { - err = -ENOTCONN; - goto out_unlock; +void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) +{ + /* Note: grabbing the xprt_lock_write() ensures that we throttle + * new slot allocation if the transport is congested (i.e. when + * reconnecting a stream transport or when out of socket write + * buffer space). + */ + if (xprt_lock_write(xprt, task)) { + xprt_alloc_slot(xprt, task); + xprt_release_write(xprt, task); } -out_unlock: - spin_unlock_bh(&xprt->sock_lock); - return err; } +EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot); -void -xprt_transmit(struct rpc_task *task) +static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) { - struct rpc_clnt *clnt = task->tk_client; - struct rpc_rqst *req = task->tk_rqstp; - struct rpc_xprt *xprt = req->rq_xprt; - int status, retry = 0; - - - dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); - - /* set up everything as needed. */ - /* Write the record marker */ - if (xprt->stream) { - u32 *marker = req->rq_svec[0].iov_base; - - *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); + spin_lock(&xprt->reserve_lock); + if (!xprt_dynamic_free_slot(xprt, req)) { + memset(req, 0, sizeof(*req)); /* mark unused */ + list_add(&req->rq_list, &xprt->free); } + xprt_wake_up_backlog(xprt); + spin_unlock(&xprt->reserve_lock); +} - smp_rmb(); - if (!req->rq_received) { - if (list_empty(&req->rq_list)) { - spin_lock_bh(&xprt->sock_lock); - /* Update the softirq receive buffer */ - memcpy(&req->rq_private_buf, &req->rq_rcv_buf, - sizeof(req->rq_private_buf)); - /* Add request to the receive list */ - list_add_tail(&req->rq_list, &xprt->recv); - spin_unlock_bh(&xprt->sock_lock); - xprt_reset_majortimeo(req); - /* Turn off autodisconnect */ - del_singleshot_timer_sync(&xprt->timer); - } - } else if (!req->rq_bytes_sent) - return; - - /* Continue transmitting the packet/record. We must be careful - * to cope with writespace callbacks arriving _after_ we have - * called xprt_sendmsg(). - */ - while (1) { - req->rq_xtime = jiffies; - status = xprt_sendmsg(xprt, req); - - if (status < 0) - break; +static void xprt_free_all_slots(struct rpc_xprt *xprt) +{ + struct rpc_rqst *req; + while (!list_empty(&xprt->free)) { + req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); + list_del(&req->rq_list); + kfree(req); + } +} - if (xprt->stream) { - req->rq_bytes_sent += status; +struct rpc_xprt *xprt_alloc(struct net *net, size_t size, + unsigned int num_prealloc, + unsigned int max_alloc) +{ + struct rpc_xprt *xprt; + struct rpc_rqst *req; + int i; - /* If we've sent the entire packet, immediately - * reset the count of bytes sent. */ - if (req->rq_bytes_sent >= req->rq_slen) { - req->rq_bytes_sent = 0; - goto out_receive; - } - } else { - if (status >= req->rq_slen) - goto out_receive; - status = -EAGAIN; - break; - } + xprt = kzalloc(size, GFP_KERNEL); + if (xprt == NULL) + goto out; - dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", - task->tk_pid, req->rq_slen - req->rq_bytes_sent, - req->rq_slen); + xprt_init(xprt, net); - status = -EAGAIN; - if (retry++ > 50) - break; + for (i = 0; i < num_prealloc; i++) { + req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); + if (!req) + goto out_free; + list_add(&req->rq_list, &xprt->free); } + if (max_alloc > num_prealloc) + xprt->max_reqs = max_alloc; + else + xprt->max_reqs = num_prealloc; + xprt->min_reqs = num_prealloc; + atomic_set(&xprt->num_reqs, num_prealloc); - /* Note: at this point, task->tk_sleeping has not yet been set, - * hence there is no danger of the waking up task being put on - * schedq, and being picked up by a parallel run of rpciod(). - */ - task->tk_status = status; + return xprt; - switch (status) { - case -EAGAIN: - if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { - /* Protect against races with xprt_write_space */ - spin_lock_bh(&xprt->sock_lock); - /* Don't race with disconnect */ - if (!xprt_connected(xprt)) - task->tk_status = -ENOTCONN; - else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) { - task->tk_timeout = req->rq_timeout; - rpc_sleep_on(&xprt->pending, task, NULL, NULL); - } - spin_unlock_bh(&xprt->sock_lock); - return; - } - /* Keep holding the socket if it is blocked */ - rpc_delay(task, HZ>>4); - return; - case -ECONNREFUSED: - task->tk_timeout = RPC_REESTABLISH_TIMEOUT; - rpc_sleep_on(&xprt->sending, task, NULL, NULL); - case -ENOTCONN: - return; - default: - if (xprt->stream) - xprt_disconnect(xprt); - } - xprt_release_write(xprt, task); - return; - out_receive: - dprintk("RPC: %4d xmit complete\n", task->tk_pid); - /* Set the task's receive timeout value */ - spin_lock_bh(&xprt->sock_lock); - if (!xprt->nocong) { - int timer = task->tk_msg.rpc_proc->p_timer; - task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer); - task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries; - if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0) - task->tk_timeout = xprt->timeout.to_maxval; - } else - task->tk_timeout = req->rq_timeout; - /* Don't race with disconnect */ - if (!xprt_connected(xprt)) - task->tk_status = -ENOTCONN; - else if (!req->rq_received) - rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); - __xprt_release_write(xprt, task); - spin_unlock_bh(&xprt->sock_lock); +out_free: + xprt_free(xprt); +out: + return NULL; } +EXPORT_SYMBOL_GPL(xprt_alloc); -/* - * Reserve an RPC call slot. +void xprt_free(struct rpc_xprt *xprt) +{ + put_net(xprt->xprt_net); + xprt_free_all_slots(xprt); + kfree(xprt); +} +EXPORT_SYMBOL_GPL(xprt_free); + +/** + * xprt_reserve - allocate an RPC request slot + * @task: RPC task requesting a slot allocation + * + * If the transport is marked as being congested, or if no more + * slots are available, place the task on the transport's + * backlog queue. */ -static inline void -do_xprt_reserve(struct rpc_task *task) +void xprt_reserve(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt; task->tk_status = 0; - if (task->tk_rqstp) + if (task->tk_rqstp != NULL) return; - if (!list_empty(&xprt->free)) { - struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); - list_del_init(&req->rq_list); - task->tk_rqstp = req; - xprt_request_init(task, xprt); - return; - } - dprintk("RPC: waiting for request slot\n"); - task->tk_status = -EAGAIN; + task->tk_timeout = 0; - rpc_sleep_on(&xprt->backlog, task, NULL, NULL); + task->tk_status = -EAGAIN; + rcu_read_lock(); + xprt = rcu_dereference(task->tk_client->cl_xprt); + if (!xprt_throttle_congested(xprt, task)) + xprt->ops->alloc_slot(xprt, task); + rcu_read_unlock(); } -void -xprt_reserve(struct rpc_task *task) +/** + * xprt_retry_reserve - allocate an RPC request slot + * @task: RPC task requesting a slot allocation + * + * If no more slots are available, place the task on the transport's + * backlog queue. + * Note that the only difference with xprt_reserve is that we now + * ignore the value of the XPRT_CONGESTED flag. + */ +void xprt_retry_reserve(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt; - task->tk_status = -EIO; - if (!xprt->shutdown) { - spin_lock(&xprt->xprt_lock); - do_xprt_reserve(task); - spin_unlock(&xprt->xprt_lock); - } + task->tk_status = 0; + if (task->tk_rqstp != NULL) + return; + + task->tk_timeout = 0; + task->tk_status = -EAGAIN; + rcu_read_lock(); + xprt = rcu_dereference(task->tk_client->cl_xprt); + xprt->ops->alloc_slot(xprt, task); + rcu_read_unlock(); } -/* - * Allocate a 'unique' XID - */ -static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) +static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) { - return xprt->xid++; + return (__force __be32)xprt->xid++; } static inline void xprt_init_xid(struct rpc_xprt *xprt) { - get_random_bytes(&xprt->xid, sizeof(xprt->xid)); + xprt->xid = prandom_u32(); } -/* - * Initialize RPC request - */ -static void -xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) +static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) { struct rpc_rqst *req = task->tk_rqstp; - req->rq_timeout = xprt->timeout.to_initval; + INIT_LIST_HEAD(&req->rq_list); + req->rq_timeout = task->tk_client->cl_timeout->to_initval; req->rq_task = task; req->rq_xprt = xprt; + req->rq_buffer = NULL; req->rq_xid = xprt_alloc_xid(xprt); - dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, + req->rq_connect_cookie = xprt->connect_cookie - 1; + req->rq_bytes_sent = 0; + req->rq_snd_buf.len = 0; + req->rq_snd_buf.buflen = 0; + req->rq_rcv_buf.len = 0; + req->rq_rcv_buf.buflen = 0; + req->rq_release_snd_buf = NULL; + xprt_reset_majortimeo(req); + dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, req, ntohl(req->rq_xid)); } -/* - * Release an RPC call slot +/** + * xprt_release - release an RPC request slot + * @task: task which is finished with the slot + * */ -void -xprt_release(struct rpc_task *task) +void xprt_release(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; - struct rpc_rqst *req; + struct rpc_xprt *xprt; + struct rpc_rqst *req = task->tk_rqstp; - if (!(req = task->tk_rqstp)) + if (req == NULL) { + if (task->tk_client) { + rcu_read_lock(); + xprt = rcu_dereference(task->tk_client->cl_xprt); + if (xprt->snd_task == task) + xprt_release_write(xprt, task); + rcu_read_unlock(); + } return; - spin_lock_bh(&xprt->sock_lock); - __xprt_release_write(xprt, task); - __xprt_put_cong(xprt, req); + } + + xprt = req->rq_xprt; + if (task->tk_ops->rpc_count_stats != NULL) + task->tk_ops->rpc_count_stats(task, task->tk_calldata); + else if (task->tk_client) + rpc_count_iostats(task, task->tk_client->cl_metrics); + spin_lock_bh(&xprt->transport_lock); + xprt->ops->release_xprt(xprt, task); + if (xprt->ops->release_request) + xprt->ops->release_request(task); if (!list_empty(&req->rq_list)) list_del(&req->rq_list); xprt->last_used = jiffies; - if (list_empty(&xprt->recv) && !xprt->shutdown) - mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT); - spin_unlock_bh(&xprt->sock_lock); + if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) + mod_timer(&xprt->timer, + xprt->last_used + xprt->idle_timeout); + spin_unlock_bh(&xprt->transport_lock); + if (req->rq_buffer) + xprt->ops->buf_free(req->rq_buffer); + if (req->rq_cred != NULL) + put_rpccred(req->rq_cred); task->tk_rqstp = NULL; - memset(req, 0, sizeof(*req)); /* mark unused */ - - dprintk("RPC: %4d release request %p\n", task->tk_pid, req); + if (req->rq_release_snd_buf) + req->rq_release_snd_buf(req); - spin_lock(&xprt->xprt_lock); - list_add(&req->rq_list, &xprt->free); - xprt_clear_backlog(xprt); - spin_unlock(&xprt->xprt_lock); -} - -/* - * Set default timeout parameters - */ -static void -xprt_default_timeout(struct rpc_timeout *to, int proto) -{ - if (proto == IPPROTO_UDP) - xprt_set_timeout(to, 5, 5 * HZ); + dprintk("RPC: %5u release request %p\n", task->tk_pid, req); + if (likely(!bc_prealloc(req))) + xprt_free_slot(xprt, req); else - xprt_set_timeout(to, 5, 60 * HZ); -} - -/* - * Set constant timeout - */ -void -xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) -{ - to->to_initval = - to->to_increment = incr; - to->to_maxval = incr * retr; - to->to_retries = retr; - to->to_exponential = 0; + xprt_free_bc_request(req); } -unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; -unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; - -/* - * Initialize an RPC client - */ -static struct rpc_xprt * -xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) +static void xprt_init(struct rpc_xprt *xprt, struct net *net) { - struct rpc_xprt *xprt; - unsigned int entries; - size_t slot_table_size; - struct rpc_rqst *req; - - dprintk("RPC: setting up %s transport...\n", - proto == IPPROTO_UDP? "UDP" : "TCP"); + atomic_set(&xprt->count, 1); - entries = (proto == IPPROTO_TCP)? - xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries; - - if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) - return ERR_PTR(-ENOMEM); - memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ - xprt->max_reqs = entries; - slot_table_size = entries * sizeof(xprt->slot[0]); - xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); - if (xprt->slot == NULL) { - kfree(xprt); - return ERR_PTR(-ENOMEM); - } - memset(xprt->slot, 0, slot_table_size); - - xprt->addr = *ap; - xprt->prot = proto; - xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; - if (xprt->stream) { - xprt->cwnd = RPC_MAXCWND(xprt); - xprt->nocong = 1; - xprt->max_payload = (1U << 31) - 1; - } else { - xprt->cwnd = RPC_INITCWND; - xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); - } - spin_lock_init(&xprt->sock_lock); - spin_lock_init(&xprt->xprt_lock); - init_waitqueue_head(&xprt->cong_wait); + spin_lock_init(&xprt->transport_lock); + spin_lock_init(&xprt->reserve_lock); INIT_LIST_HEAD(&xprt->free); INIT_LIST_HEAD(&xprt->recv); - INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); - INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); - init_timer(&xprt->timer); - xprt->timer.function = xprt_init_autodisconnect; - xprt->timer.data = (unsigned long) xprt; - xprt->last_used = jiffies; - xprt->port = XPRT_MAX_RESVPORT; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + spin_lock_init(&xprt->bc_pa_lock); + INIT_LIST_HEAD(&xprt->bc_pa_list); +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ - /* Set timeout parameters */ - if (to) { - xprt->timeout = *to; - } else - xprt_default_timeout(&xprt->timeout, xprt->prot); + xprt->last_used = jiffies; + xprt->cwnd = RPC_INITCWND; + xprt->bind_index = 0; + rpc_init_wait_queue(&xprt->binding, "xprt_binding"); rpc_init_wait_queue(&xprt->pending, "xprt_pending"); - rpc_init_wait_queue(&xprt->sending, "xprt_sending"); - rpc_init_wait_queue(&xprt->resend, "xprt_resend"); + rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending"); rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); - /* initialize free list */ - for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--) - list_add(&req->rq_list, &xprt->free); - xprt_init_xid(xprt); - /* Check whether we want to use a reserved port */ - xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; - - dprintk("RPC: created transport %p with %u slots\n", xprt, - xprt->max_reqs); - - return xprt; -} - -/* - * Bind to a reserved port - */ -static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) -{ - struct sockaddr_in myaddr = { - .sin_family = AF_INET, - }; - int err, port; - - /* Were we already bound to a given port? Try to reuse it */ - port = xprt->port; - do { - myaddr.sin_port = htons(port); - err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, - sizeof(myaddr)); - if (err == 0) { - xprt->port = port; - return 0; - } - if (--port == 0) - port = XPRT_MAX_RESVPORT; - } while (err == -EADDRINUSE && port != xprt->port); - - printk("RPC: Can't bind to reserved port (%d).\n", -err); - return err; + xprt->xprt_net = get_net(net); } -static void -xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) -{ - struct sock *sk = sock->sk; - - if (xprt->inet) - return; - - write_lock_bh(&sk->sk_callback_lock); - sk->sk_user_data = xprt; - xprt->old_data_ready = sk->sk_data_ready; - xprt->old_state_change = sk->sk_state_change; - xprt->old_write_space = sk->sk_write_space; - if (xprt->prot == IPPROTO_UDP) { - sk->sk_data_ready = udp_data_ready; - sk->sk_no_check = UDP_CSUM_NORCV; - xprt_set_connected(xprt); - } else { - tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ - sk->sk_data_ready = tcp_data_ready; - sk->sk_state_change = tcp_state_change; - xprt_clear_connected(xprt); - } - sk->sk_write_space = xprt_write_space; - - /* Reset to new socket */ - xprt->sock = sock; - xprt->inet = sk; - write_unlock_bh(&sk->sk_callback_lock); - - return; -} - -/* - * Set socket buffer length +/** + * xprt_create_transport - create an RPC transport + * @args: rpc transport creation arguments + * */ -void -xprt_sock_setbufsize(struct rpc_xprt *xprt) +struct rpc_xprt *xprt_create_transport(struct xprt_create *args) { - struct sock *sk = xprt->inet; + struct rpc_xprt *xprt; + struct xprt_class *t; - if (xprt->stream) - return; - if (xprt->rcvsize) { - sk->sk_userlocks |= SOCK_RCVBUF_LOCK; - sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; + spin_lock(&xprt_list_lock); + list_for_each_entry(t, &xprt_list, list) { + if (t->ident == args->ident) { + spin_unlock(&xprt_list_lock); + goto found; + } } - if (xprt->sndsize) { - sk->sk_userlocks |= SOCK_SNDBUF_LOCK; - sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; - sk->sk_write_space(sk); + spin_unlock(&xprt_list_lock); + printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident); + return ERR_PTR(-EIO); + +found: + xprt = t->setup(args); + if (IS_ERR(xprt)) { + dprintk("RPC: xprt_create_transport: failed, %ld\n", + -PTR_ERR(xprt)); + goto out; } -} - -/* - * Datastream sockets are created here, but xprt_connect will create - * and connect stream sockets. - */ -static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) -{ - struct socket *sock; - int type, err; - - dprintk("RPC: xprt_create_socket(%s %d)\n", - (proto == IPPROTO_UDP)? "udp" : "tcp", proto); - - type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; + if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) + xprt->idle_timeout = 0; + INIT_WORK(&xprt->task_cleanup, xprt_autoclose); + if (xprt_has_timer(xprt)) + setup_timer(&xprt->timer, xprt_init_autodisconnect, + (unsigned long)xprt); + else + init_timer(&xprt->timer); - if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { - printk("RPC: can't create socket (%d).\n", -err); - return NULL; + if (strlen(args->servername) > RPC_MAXNETNAMELEN) { + xprt_destroy(xprt); + return ERR_PTR(-EINVAL); } - - /* If the caller has the capability, bind to a reserved port */ - if (resvport && xprt_bindresvport(xprt, sock) < 0) { - printk("RPC: can't bind to reserved port.\n"); - goto failed; + xprt->servername = kstrdup(args->servername, GFP_KERNEL); + if (xprt->servername == NULL) { + xprt_destroy(xprt); + return ERR_PTR(-ENOMEM); } - return sock; - -failed: - sock_release(sock); - return NULL; -} - -/* - * Create an RPC client transport given the protocol and peer address. - */ -struct rpc_xprt * -xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) -{ - struct rpc_xprt *xprt; - - xprt = xprt_setup(proto, sap, to); - if (IS_ERR(xprt)) - dprintk("RPC: xprt_create_proto failed\n"); - else - dprintk("RPC: xprt_create_proto created xprt %p\n", xprt); + dprintk("RPC: created transport %p with %u slots\n", xprt, + xprt->max_reqs); +out: return xprt; } -/* - * Prepare for transport shutdown. +/** + * xprt_destroy - destroy an RPC transport, killing off all requests. + * @xprt: transport to destroy + * */ -static void -xprt_shutdown(struct rpc_xprt *xprt) +static void xprt_destroy(struct rpc_xprt *xprt) { - xprt->shutdown = 1; - rpc_wake_up(&xprt->sending); - rpc_wake_up(&xprt->resend); - rpc_wake_up(&xprt->pending); - rpc_wake_up(&xprt->backlog); - wake_up(&xprt->cong_wait); + dprintk("RPC: destroying transport %p\n", xprt); del_timer_sync(&xprt->timer); - /* synchronously wait for connect worker to finish */ - cancel_delayed_work(&xprt->sock_connect); - flush_scheduled_work(); -} - -/* - * Clear the xprt backlog queue - */ -static int -xprt_clear_backlog(struct rpc_xprt *xprt) { - rpc_wake_up_next(&xprt->backlog); - wake_up(&xprt->cong_wait); - return 1; + rpc_destroy_wait_queue(&xprt->binding); + rpc_destroy_wait_queue(&xprt->pending); + rpc_destroy_wait_queue(&xprt->sending); + rpc_destroy_wait_queue(&xprt->backlog); + cancel_work_sync(&xprt->task_cleanup); + kfree(xprt->servername); + /* + * Tear down transport state and free the rpc_xprt + */ + xprt->ops->destroy(xprt); } -/* - * Destroy an RPC transport, killing off all requests. +/** + * xprt_put - release a reference to an RPC transport. + * @xprt: pointer to the transport + * */ -int -xprt_destroy(struct rpc_xprt *xprt) +void xprt_put(struct rpc_xprt *xprt) { - dprintk("RPC: destroying transport %p\n", xprt); - xprt_shutdown(xprt); - xprt_disconnect(xprt); - xprt_close(xprt); - kfree(xprt->slot); - kfree(xprt); - - return 0; + if (atomic_dec_and_test(&xprt->count)) + xprt_destroy(xprt); } |
