diff options
Diffstat (limited to 'net/sunrpc/xprt.c')
| -rw-r--r-- | net/sunrpc/xprt.c | 821 |
1 files changed, 558 insertions, 263 deletions
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index d5553b8179f..c3b2b3369e5 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -12,8 +12,9 @@ * - Next, the caller puts together the RPC message, stuffs it into * the request struct, and calls xprt_transmit(). * - xprt_transmit sends the message and installs the caller on the - * transport's wait list. At the same time, it installs a timer that - * is run after the packet's timeout has expired. + * transport's wait list. At the same time, if a reply is expected, + * it installs a timer that is run after the packet's timeout has + * expired. * - When a packet arrives, the data_ready handler walks the list of * pending requests for that transport. If a matching XID is found, the * caller is woken up, and the timer removed. @@ -42,9 +43,13 @@ #include <linux/interrupt.h> #include <linux/workqueue.h> #include <linux/net.h> +#include <linux/ktime.h> #include <linux/sunrpc/clnt.h> #include <linux/sunrpc/metrics.h> +#include <linux/sunrpc/bc_xprt.h> + +#include "sunrpc.h" /* * Local variables @@ -57,32 +62,15 @@ /* * Local functions */ +static void xprt_init(struct rpc_xprt *xprt, struct net *net); static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); -static inline void do_xprt_reserve(struct rpc_task *); static void xprt_connect_status(struct rpc_task *task); static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); +static void xprt_destroy(struct rpc_xprt *xprt); static DEFINE_SPINLOCK(xprt_list_lock); static LIST_HEAD(xprt_list); -/* - * The transport code maintains an estimate on the maximum number of out- - * standing RPC requests, using a smoothed version of the congestion - * avoidance implemented in 44BSD. This is basically the Van Jacobson - * congestion algorithm: If a retransmit occurs, the congestion window is - * halved; otherwise, it is incremented by 1/cwnd when - * - * - a reply is received and - * - a full number of requests are outstanding and - * - the congestion window hasn't been updated recently. - */ -#define RPC_CWNDSHIFT (8U) -#define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT) -#define RPC_INITCWND RPC_CWNDSCALE -#define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT) - -#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) - /** * xprt_register_transport - register a transport implementation * @transport: transport to register @@ -108,13 +96,10 @@ int xprt_register_transport(struct xprt_class *transport) goto out; } - result = -EINVAL; - if (try_module_get(THIS_MODULE)) { - list_add_tail(&transport->list, &xprt_list); - printk(KERN_INFO "RPC: Registered %s transport module.\n", - transport->name); - result = 0; - } + list_add_tail(&transport->list, &xprt_list); + printk(KERN_INFO "RPC: Registered %s transport module.\n", + transport->name); + result = 0; out: spin_unlock(&xprt_list_lock); @@ -143,7 +128,6 @@ int xprt_unregister_transport(struct xprt_class *transport) "RPC: Unregistered %s transport module.\n", transport->name); list_del_init(&transport->list); - module_put(THIS_MODULE); goto out; } } @@ -156,30 +140,56 @@ out: EXPORT_SYMBOL_GPL(xprt_unregister_transport); /** + * xprt_load_transport - load a transport implementation + * @transport_name: transport to load + * + * Returns: + * 0: transport successfully loaded + * -ENOENT: transport module not available + */ +int xprt_load_transport(const char *transport_name) +{ + struct xprt_class *t; + int result; + + result = 0; + spin_lock(&xprt_list_lock); + list_for_each_entry(t, &xprt_list, list) { + if (strcmp(t->name, transport_name) == 0) { + spin_unlock(&xprt_list_lock); + goto out; + } + } + spin_unlock(&xprt_list_lock); + result = request_module("xprt%s", transport_name); +out: + return result; +} +EXPORT_SYMBOL_GPL(xprt_load_transport); + +/** * xprt_reserve_xprt - serialize write access to transports * @task: task that is requesting access to the transport + * @xprt: pointer to the target transport * * This prevents mixing the payload of separate requests, and prevents * transport connects from colliding with writes. No congestion control * is provided. */ -int xprt_reserve_xprt(struct rpc_task *task) +int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req = task->tk_rqstp; + int priority; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) return 1; - if (task == NULL) - return 0; goto out_sleep; } xprt->snd_task = task; - if (req) { - req->rq_bytes_sent = 0; + if (req != NULL) req->rq_ntrans++; - } + return 1; out_sleep: @@ -187,10 +197,13 @@ out_sleep: task->tk_pid, xprt); task->tk_timeout = 0; task->tk_status = -EAGAIN; - if (req && req->rq_ntrans) - rpc_sleep_on(&xprt->resend, task, NULL, NULL); + if (req == NULL) + priority = RPC_PRIORITY_LOW; + else if (!req->rq_ntrans) + priority = RPC_PRIORITY_NORMAL; else - rpc_sleep_on(&xprt->sending, task, NULL, NULL); + priority = RPC_PRIORITY_HIGH; + rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); return 0; } EXPORT_SYMBOL_GPL(xprt_reserve_xprt); @@ -198,10 +211,10 @@ EXPORT_SYMBOL_GPL(xprt_reserve_xprt); static void xprt_clear_locked(struct rpc_xprt *xprt) { xprt->snd_task = NULL; - if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) { - smp_mb__before_clear_bit(); + if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { + smp_mb__before_atomic(); clear_bit(XPRT_LOCKED, &xprt->state); - smp_mb__after_clear_bit(); + smp_mb__after_atomic(); } else queue_work(rpciod_workqueue, &xprt->task_cleanup); } @@ -214,22 +227,23 @@ static void xprt_clear_locked(struct rpc_xprt *xprt) * integrated into the decision of whether a request is allowed to be * woken up and given access to the transport. */ -int xprt_reserve_xprt_cong(struct rpc_task *task) +int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req = task->tk_rqstp; + int priority; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) return 1; goto out_sleep; } + if (req == NULL) { + xprt->snd_task = task; + return 1; + } if (__xprt_get_cong(xprt, task)) { xprt->snd_task = task; - if (req) { - req->rq_bytes_sent = 0; - req->rq_ntrans++; - } + req->rq_ntrans++; return 1; } xprt_clear_locked(xprt); @@ -237,10 +251,13 @@ out_sleep: dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); task->tk_timeout = 0; task->tk_status = -EAGAIN; - if (req && req->rq_ntrans) - rpc_sleep_on(&xprt->resend, task, NULL, NULL); + if (req == NULL) + priority = RPC_PRIORITY_LOW; + else if (!req->rq_ntrans) + priority = RPC_PRIORITY_NORMAL; else - rpc_sleep_on(&xprt->sending, task, NULL, NULL); + priority = RPC_PRIORITY_HIGH; + rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); return 0; } EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); @@ -250,61 +267,59 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) int retval; spin_lock_bh(&xprt->transport_lock); - retval = xprt->ops->reserve_xprt(task); + retval = xprt->ops->reserve_xprt(xprt, task); spin_unlock_bh(&xprt->transport_lock); return retval; } -static void __xprt_lock_write_next(struct rpc_xprt *xprt) +static bool __xprt_lock_write_func(struct rpc_task *task, void *data) { - struct rpc_task *task; + struct rpc_xprt *xprt = data; struct rpc_rqst *req; + req = task->tk_rqstp; + xprt->snd_task = task; + if (req) + req->rq_ntrans++; + return true; +} + +static void __xprt_lock_write_next(struct rpc_xprt *xprt) +{ if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; - task = rpc_wake_up_next(&xprt->resend); - if (!task) { - task = rpc_wake_up_next(&xprt->sending); - if (!task) - goto out_unlock; - } + if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) + return; + xprt_clear_locked(xprt); +} + +static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data) +{ + struct rpc_xprt *xprt = data; + struct rpc_rqst *req; req = task->tk_rqstp; - xprt->snd_task = task; - if (req) { - req->rq_bytes_sent = 0; + if (req == NULL) { + xprt->snd_task = task; + return true; + } + if (__xprt_get_cong(xprt, task)) { + xprt->snd_task = task; req->rq_ntrans++; + return true; } - return; - -out_unlock: - xprt_clear_locked(xprt); + return false; } static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) { - struct rpc_task *task; - if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; if (RPCXPRT_CONGESTED(xprt)) goto out_unlock; - task = rpc_wake_up_next(&xprt->resend); - if (!task) { - task = rpc_wake_up_next(&xprt->sending); - if (!task) - goto out_unlock; - } - if (__xprt_get_cong(xprt, task)) { - struct rpc_rqst *req = task->tk_rqstp; - xprt->snd_task = task; - if (req) { - req->rq_bytes_sent = 0; - req->rq_ntrans++; - } + if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt)) return; - } out_unlock: xprt_clear_locked(xprt); } @@ -319,6 +334,11 @@ out_unlock: void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) { if (xprt->snd_task == task) { + if (task != NULL) { + struct rpc_rqst *req = task->tk_rqstp; + if (req != NULL) + req->rq_bytes_sent = 0; + } xprt_clear_locked(xprt); __xprt_lock_write_next(xprt); } @@ -336,6 +356,11 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt); void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) { if (xprt->snd_task == task) { + if (task != NULL) { + struct rpc_rqst *req = task->tk_rqstp; + if (req != NULL) + req->rq_bytes_sent = 0; + } xprt_clear_locked(xprt); __xprt_lock_write_next_cong(xprt); } @@ -391,21 +416,31 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) */ void xprt_release_rqst_cong(struct rpc_task *task) { - __xprt_put_cong(task->tk_xprt, task->tk_rqstp); + struct rpc_rqst *req = task->tk_rqstp; + + __xprt_put_cong(req->rq_xprt, req); } EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); /** * xprt_adjust_cwnd - adjust transport congestion window + * @xprt: pointer to xprt * @task: recently completed RPC request used to adjust window * @result: result code of completed RPC request * - * We use a time-smoothed congestion estimator to avoid heavy oscillation. + * The transport code maintains an estimate on the maximum number of out- + * standing RPC requests, using a smoothed version of the congestion + * avoidance implemented in 44BSD. This is basically the Van Jacobson + * congestion algorithm: If a retransmit occurs, the congestion window is + * halved; otherwise, it is incremented by 1/cwnd when + * + * - a reply is received and + * - a full number of requests are outstanding and + * - the congestion window hasn't been updated recently. */ -void xprt_adjust_cwnd(struct rpc_task *task, int result) +void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) { struct rpc_rqst *req = task->tk_rqstp; - struct rpc_xprt *xprt = task->tk_xprt; unsigned long cwnd = xprt->cwnd; if (result >= 0 && cwnd <= xprt->cong) { @@ -445,15 +480,19 @@ EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); /** * xprt_wait_for_buffer_space - wait for transport output buffer to clear * @task: task to be put to sleep + * @action: function pointer to be executed after wait * + * Note that we only set the timer for the case of RPC_IS_SOFT(), since + * we don't in general want to force a socket disconnection due to + * an incomplete RPC call transmission. */ -void xprt_wait_for_buffer_space(struct rpc_task *task) +void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action) { struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; - task->tk_timeout = req->rq_timeout; - rpc_sleep_on(&xprt->pending, task, NULL, NULL); + task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; + rpc_sleep_on(&xprt->pending, task, action); } EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); @@ -465,14 +504,11 @@ EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); */ void xprt_write_space(struct rpc_xprt *xprt) { - if (unlikely(xprt->shutdown)) - return; - spin_lock_bh(&xprt->transport_lock); if (xprt->snd_task) { dprintk("RPC: write space: waking waiting task on " "xprt %p\n", xprt); - rpc_wake_up_task(xprt->snd_task); + rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task); } spin_unlock_bh(&xprt->transport_lock); } @@ -492,7 +528,7 @@ void xprt_set_retrans_timeout_def(struct rpc_task *task) } EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def); -/* +/** * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout * @task: task whose timeout is to be set * @@ -584,7 +620,7 @@ void xprt_disconnect_done(struct rpc_xprt *xprt) dprintk("RPC: disconnected transport %p\n", xprt); spin_lock_bh(&xprt->transport_lock); xprt_clear_connected(xprt); - xprt_wake_pending_tasks(xprt, -ENOTCONN); + xprt_wake_pending_tasks(xprt, -EAGAIN); spin_unlock_bh(&xprt->transport_lock); } EXPORT_SYMBOL_GPL(xprt_disconnect_done); @@ -602,11 +638,37 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) /* Try to schedule an autoclose RPC call */ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) queue_work(rpciod_workqueue, &xprt->task_cleanup); - else if (xprt->snd_task != NULL) - rpc_wake_up_task(xprt->snd_task); + xprt_wake_pending_tasks(xprt, -EAGAIN); + spin_unlock_bh(&xprt->transport_lock); +} + +/** + * xprt_conditional_disconnect - force a transport to disconnect + * @xprt: transport to disconnect + * @cookie: 'connection cookie' + * + * This attempts to break the connection if and only if 'cookie' matches + * the current transport 'connection cookie'. It ensures that we don't + * try to break the connection more than once when we need to retransmit + * a batch of RPC requests. + * + */ +void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) +{ + /* Don't race with the test_bit() in xprt_clear_locked() */ + spin_lock_bh(&xprt->transport_lock); + if (cookie != xprt->connect_cookie) + goto out; + if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt)) + goto out; + set_bit(XPRT_CLOSE_WAIT, &xprt->state); + /* Try to schedule an autoclose RPC call */ + if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) + queue_work(rpciod_workqueue, &xprt->task_cleanup); + xprt_wake_pending_tasks(xprt, -EAGAIN); +out: spin_unlock_bh(&xprt->transport_lock); } -EXPORT_SYMBOL_GPL(xprt_force_disconnect); static void xprt_init_autodisconnect(unsigned long data) @@ -614,15 +676,13 @@ xprt_init_autodisconnect(unsigned long data) struct rpc_xprt *xprt = (struct rpc_xprt *)data; spin_lock(&xprt->transport_lock); - if (!list_empty(&xprt->recv) || xprt->shutdown) + if (!list_empty(&xprt->recv)) goto out_abort; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) goto out_abort; spin_unlock(&xprt->transport_lock); - if (xprt_connecting(xprt)) - xprt_release_write(xprt, NULL); - else - queue_work(rpciod_workqueue, &xprt->task_cleanup); + set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); + queue_work(rpciod_workqueue, &xprt->task_cleanup); return; out_abort: spin_unlock(&xprt->transport_lock); @@ -635,36 +695,42 @@ out_abort: */ void xprt_connect(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, xprt, (xprt_connected(xprt) ? "is" : "is not")); if (!xprt_bound(xprt)) { - task->tk_status = -EIO; + task->tk_status = -EAGAIN; return; } if (!xprt_lock_write(xprt, task)) return; + + if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) + xprt->ops->close(xprt); + if (xprt_connected(xprt)) xprt_release_write(xprt, task); else { - if (task->tk_rqstp) - task->tk_rqstp->rq_bytes_sent = 0; - - task->tk_timeout = xprt->connect_timeout; - rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); + task->tk_rqstp->rq_bytes_sent = 0; + task->tk_timeout = task->tk_rqstp->rq_timeout; + rpc_sleep_on(&xprt->pending, task, xprt_connect_status); + + if (test_bit(XPRT_CLOSING, &xprt->state)) + return; + if (xprt_test_and_set_connecting(xprt)) + return; xprt->stat.connect_start = jiffies; - xprt->ops->connect(task); + xprt->ops->connect(xprt, task); } - return; } static void xprt_connect_status(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - if (task->tk_status >= 0) { + if (task->tk_status == 0) { xprt->stat.connect_count++; xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; dprintk("RPC: %5u xprt_connect_status: connection established\n", @@ -675,13 +741,11 @@ static void xprt_connect_status(struct rpc_task *task) switch (task->tk_status) { case -ECONNREFUSED: case -ECONNRESET: - dprintk("RPC: %5u xprt_connect_status: server %s refused " - "connection\n", task->tk_pid, - task->tk_client->cl_server); - break; - case -ENOTCONN: - dprintk("RPC: %5u xprt_connect_status: connection broken\n", - task->tk_pid); + case -ECONNABORTED: + case -ENETUNREACH: + case -EHOSTUNREACH: + case -EAGAIN: + dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid); break; case -ETIMEDOUT: dprintk("RPC: %5u xprt_connect_status: connect attempt timed " @@ -690,7 +754,7 @@ static void xprt_connect_status(struct rpc_task *task) default: dprintk("RPC: %5u xprt_connect_status: error %d connecting to " "server %s\n", task->tk_pid, -task->tk_status, - task->tk_client->cl_server); + xprt->servername); xprt_release_write(xprt, task); task->tk_status = -EIO; } @@ -704,13 +768,11 @@ static void xprt_connect_status(struct rpc_task *task) */ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) { - struct list_head *pos; + struct rpc_rqst *entry; - list_for_each(pos, &xprt->recv) { - struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); + list_for_each_entry(entry, &xprt->recv, rq_list) if (entry->rq_xid == xid) return entry; - } dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", ntohl(xid)); @@ -719,25 +781,19 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) } EXPORT_SYMBOL_GPL(xprt_lookup_rqst); -/** - * xprt_update_rtt - update an RPC client's RTT state after receiving a reply - * @task: RPC request that recently completed - * - */ -void xprt_update_rtt(struct rpc_task *task) +static void xprt_update_rtt(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; struct rpc_rtt *rtt = task->tk_client->cl_rtt; - unsigned timer = task->tk_msg.rpc_proc->p_timer; + unsigned int timer = task->tk_msg.rpc_proc->p_timer; + long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); if (timer) { if (req->rq_ntrans == 1) - rpc_update_rtt(rtt, timer, - (long)jiffies - req->rq_xtime); + rpc_update_rtt(rtt, timer, m); rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); } } -EXPORT_SYMBOL_GPL(xprt_update_rtt); /** * xprt_complete_rqst - called when reply processing is complete @@ -749,18 +805,23 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt); void xprt_complete_rqst(struct rpc_task *task, int copied) { struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", task->tk_pid, ntohl(req->rq_xid), copied); - task->tk_xprt->stat.recvs++; - task->tk_rtt = (long)jiffies - req->rq_xtime; + xprt->stat.recvs++; + req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime); + if (xprt->ops->timer != NULL) + xprt_update_rtt(task); list_del_init(&req->rq_list); - /* Ensure all writes are done before we update req->rq_received */ + req->rq_private_buf.len = copied; + /* Ensure all writes are done before we update */ + /* req->rq_reply_bytes_recvd */ smp_wmb(); - req->rq_received = req->rq_private_buf.len = copied; - rpc_wake_up_task(task); + req->rq_reply_bytes_recvd = copied; + rpc_wake_up_queued_task(&xprt->pending, task); } EXPORT_SYMBOL_GPL(xprt_complete_rqst); @@ -769,17 +830,22 @@ static void xprt_timer(struct rpc_task *task) struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; + if (task->tk_status != -ETIMEDOUT) + return; dprintk("RPC: %5u xprt_timer\n", task->tk_pid); - spin_lock(&xprt->transport_lock); - if (!req->rq_received) { + spin_lock_bh(&xprt->transport_lock); + if (!req->rq_reply_bytes_recvd) { if (xprt->ops->timer) - xprt->ops->timer(task); - task->tk_status = -ETIMEDOUT; - } - task->tk_timeout = 0; - rpc_wake_up_task(task); - spin_unlock(&xprt->transport_lock); + xprt->ops->timer(xprt, task); + } else + task->tk_status = 0; + spin_unlock_bh(&xprt->transport_lock); +} + +static inline int xprt_has_timer(struct rpc_xprt *xprt) +{ + return xprt->idle_timeout != 0; } /** @@ -787,36 +853,41 @@ static void xprt_timer(struct rpc_task *task) * @task: RPC task about to send a request * */ -int xprt_prepare_transmit(struct rpc_task *task) +bool xprt_prepare_transmit(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; - int err = 0; + bool ret = false; dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); spin_lock_bh(&xprt->transport_lock); - if (req->rq_received && !req->rq_bytes_sent) { - err = req->rq_received; - goto out_unlock; - } - if (!xprt->ops->reserve_xprt(task)) { - err = -EAGAIN; - goto out_unlock; + if (!req->rq_bytes_sent) { + if (req->rq_reply_bytes_recvd) { + task->tk_status = req->rq_reply_bytes_recvd; + goto out_unlock; + } + if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) + && xprt_connected(xprt) + && req->rq_connect_cookie == xprt->connect_cookie) { + xprt->ops->set_retrans_timeout(task); + rpc_sleep_on(&xprt->pending, task, xprt_timer); + goto out_unlock; + } } - - if (!xprt_connected(xprt)) { - err = -ENOTCONN; + if (!xprt->ops->reserve_xprt(xprt, task)) { + task->tk_status = -EAGAIN; goto out_unlock; } + ret = true; out_unlock: spin_unlock_bh(&xprt->transport_lock); - return err; + return ret; } void xprt_end_transmit(struct rpc_task *task) { - xprt_release_write(task->tk_xprt, task); + xprt_release_write(task->tk_rqstp->rq_xprt, task); } /** @@ -829,12 +900,15 @@ void xprt_transmit(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; - int status; + int status, numreqs; dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); - if (!req->rq_received) { - if (list_empty(&req->rq_list)) { + if (!req->rq_reply_bytes_recvd) { + if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { + /* + * Add to the list only if we're expecting a reply + */ spin_lock_bh(&xprt->transport_lock); /* Update the softirq receive buffer */ memcpy(&req->rq_private_buf, &req->rq_rcv_buf, @@ -849,91 +923,285 @@ void xprt_transmit(struct rpc_task *task) } else if (!req->rq_bytes_sent) return; + req->rq_xtime = ktime_get(); status = xprt->ops->send_request(task); - if (status == 0) { - dprintk("RPC: %5u xmit complete\n", task->tk_pid); - spin_lock_bh(&xprt->transport_lock); + if (status != 0) { + task->tk_status = status; + return; + } - xprt->ops->set_retrans_timeout(task); + dprintk("RPC: %5u xmit complete\n", task->tk_pid); + task->tk_flags |= RPC_TASK_SENT; + spin_lock_bh(&xprt->transport_lock); - xprt->stat.sends++; - xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; - xprt->stat.bklog_u += xprt->backlog.qlen; + xprt->ops->set_retrans_timeout(task); - /* Don't race with disconnect */ - if (!xprt_connected(xprt)) - task->tk_status = -ENOTCONN; - else if (!req->rq_received) - rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); - spin_unlock_bh(&xprt->transport_lock); - return; + numreqs = atomic_read(&xprt->num_reqs); + if (numreqs > xprt->stat.max_slots) + xprt->stat.max_slots = numreqs; + xprt->stat.sends++; + xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; + xprt->stat.bklog_u += xprt->backlog.qlen; + xprt->stat.sending_u += xprt->sending.qlen; + xprt->stat.pending_u += xprt->pending.qlen; + + /* Don't race with disconnect */ + if (!xprt_connected(xprt)) + task->tk_status = -ENOTCONN; + else { + /* + * Sleep on the pending queue since + * we're expecting a reply. + */ + if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) + rpc_sleep_on(&xprt->pending, task, xprt_timer); + req->rq_connect_cookie = xprt->connect_cookie; } + spin_unlock_bh(&xprt->transport_lock); +} - /* Note: at this point, task->tk_sleeping has not yet been set, - * hence there is no danger of the waking up task being put on - * schedq, and being picked up by a parallel run of rpciod(). +static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) +{ + set_bit(XPRT_CONGESTED, &xprt->state); + rpc_sleep_on(&xprt->backlog, task, NULL); +} + +static void xprt_wake_up_backlog(struct rpc_xprt *xprt) +{ + if (rpc_wake_up_next(&xprt->backlog) == NULL) + clear_bit(XPRT_CONGESTED, &xprt->state); +} + +static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) +{ + bool ret = false; + + if (!test_bit(XPRT_CONGESTED, &xprt->state)) + goto out; + spin_lock(&xprt->reserve_lock); + if (test_bit(XPRT_CONGESTED, &xprt->state)) { + rpc_sleep_on(&xprt->backlog, task, NULL); + ret = true; + } + spin_unlock(&xprt->reserve_lock); +out: + return ret; +} + +static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags) +{ + struct rpc_rqst *req = ERR_PTR(-EAGAIN); + + if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs)) + goto out; + req = kzalloc(sizeof(struct rpc_rqst), gfp_flags); + if (req != NULL) + goto out; + atomic_dec(&xprt->num_reqs); + req = ERR_PTR(-ENOMEM); +out: + return req; +} + +static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) +{ + if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) { + kfree(req); + return true; + } + return false; +} + +void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) +{ + struct rpc_rqst *req; + + spin_lock(&xprt->reserve_lock); + if (!list_empty(&xprt->free)) { + req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); + list_del(&req->rq_list); + goto out_init_req; + } + req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT|__GFP_NOWARN); + if (!IS_ERR(req)) + goto out_init_req; + switch (PTR_ERR(req)) { + case -ENOMEM: + dprintk("RPC: dynamic allocation of request slot " + "failed! Retrying\n"); + task->tk_status = -ENOMEM; + break; + case -EAGAIN: + xprt_add_backlog(xprt, task); + dprintk("RPC: waiting for request slot\n"); + default: + task->tk_status = -EAGAIN; + } + spin_unlock(&xprt->reserve_lock); + return; +out_init_req: + task->tk_status = 0; + task->tk_rqstp = req; + xprt_request_init(task, xprt); + spin_unlock(&xprt->reserve_lock); +} +EXPORT_SYMBOL_GPL(xprt_alloc_slot); + +void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) +{ + /* Note: grabbing the xprt_lock_write() ensures that we throttle + * new slot allocation if the transport is congested (i.e. when + * reconnecting a stream transport or when out of socket write + * buffer space). */ - task->tk_status = status; - if (status == -ECONNREFUSED) - rpc_sleep_on(&xprt->sending, task, NULL, NULL); + if (xprt_lock_write(xprt, task)) { + xprt_alloc_slot(xprt, task); + xprt_release_write(xprt, task); + } +} +EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot); + +static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) +{ + spin_lock(&xprt->reserve_lock); + if (!xprt_dynamic_free_slot(xprt, req)) { + memset(req, 0, sizeof(*req)); /* mark unused */ + list_add(&req->rq_list, &xprt->free); + } + xprt_wake_up_backlog(xprt); + spin_unlock(&xprt->reserve_lock); +} + +static void xprt_free_all_slots(struct rpc_xprt *xprt) +{ + struct rpc_rqst *req; + while (!list_empty(&xprt->free)) { + req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); + list_del(&req->rq_list); + kfree(req); + } +} + +struct rpc_xprt *xprt_alloc(struct net *net, size_t size, + unsigned int num_prealloc, + unsigned int max_alloc) +{ + struct rpc_xprt *xprt; + struct rpc_rqst *req; + int i; + + xprt = kzalloc(size, GFP_KERNEL); + if (xprt == NULL) + goto out; + + xprt_init(xprt, net); + + for (i = 0; i < num_prealloc; i++) { + req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); + if (!req) + goto out_free; + list_add(&req->rq_list, &xprt->free); + } + if (max_alloc > num_prealloc) + xprt->max_reqs = max_alloc; + else + xprt->max_reqs = num_prealloc; + xprt->min_reqs = num_prealloc; + atomic_set(&xprt->num_reqs, num_prealloc); + + return xprt; + +out_free: + xprt_free(xprt); +out: + return NULL; } +EXPORT_SYMBOL_GPL(xprt_alloc); -static inline void do_xprt_reserve(struct rpc_task *task) +void xprt_free(struct rpc_xprt *xprt) +{ + put_net(xprt->xprt_net); + xprt_free_all_slots(xprt); + kfree(xprt); +} +EXPORT_SYMBOL_GPL(xprt_free); + +/** + * xprt_reserve - allocate an RPC request slot + * @task: RPC task requesting a slot allocation + * + * If the transport is marked as being congested, or if no more + * slots are available, place the task on the transport's + * backlog queue. + */ +void xprt_reserve(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt; task->tk_status = 0; - if (task->tk_rqstp) - return; - if (!list_empty(&xprt->free)) { - struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); - list_del_init(&req->rq_list); - task->tk_rqstp = req; - xprt_request_init(task, xprt); + if (task->tk_rqstp != NULL) return; - } - dprintk("RPC: waiting for request slot\n"); - task->tk_status = -EAGAIN; + task->tk_timeout = 0; - rpc_sleep_on(&xprt->backlog, task, NULL, NULL); + task->tk_status = -EAGAIN; + rcu_read_lock(); + xprt = rcu_dereference(task->tk_client->cl_xprt); + if (!xprt_throttle_congested(xprt, task)) + xprt->ops->alloc_slot(xprt, task); + rcu_read_unlock(); } /** - * xprt_reserve - allocate an RPC request slot + * xprt_retry_reserve - allocate an RPC request slot * @task: RPC task requesting a slot allocation * * If no more slots are available, place the task on the transport's * backlog queue. + * Note that the only difference with xprt_reserve is that we now + * ignore the value of the XPRT_CONGESTED flag. */ -void xprt_reserve(struct rpc_task *task) +void xprt_retry_reserve(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt; - task->tk_status = -EIO; - spin_lock(&xprt->reserve_lock); - do_xprt_reserve(task); - spin_unlock(&xprt->reserve_lock); + task->tk_status = 0; + if (task->tk_rqstp != NULL) + return; + + task->tk_timeout = 0; + task->tk_status = -EAGAIN; + rcu_read_lock(); + xprt = rcu_dereference(task->tk_client->cl_xprt); + xprt->ops->alloc_slot(xprt, task); + rcu_read_unlock(); } static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) { - return xprt->xid++; + return (__force __be32)xprt->xid++; } static inline void xprt_init_xid(struct rpc_xprt *xprt) { - xprt->xid = net_random(); + xprt->xid = prandom_u32(); } static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) { struct rpc_rqst *req = task->tk_rqstp; + INIT_LIST_HEAD(&req->rq_list); req->rq_timeout = task->tk_client->cl_timeout->to_initval; req->rq_task = task; req->rq_xprt = xprt; req->rq_buffer = NULL; req->rq_xid = xprt_alloc_xid(xprt); + req->rq_connect_cookie = xprt->connect_cookie - 1; + req->rq_bytes_sent = 0; + req->rq_snd_buf.len = 0; + req->rq_snd_buf.buflen = 0; + req->rq_rcv_buf.len = 0; + req->rq_rcv_buf.buflen = 0; req->rq_release_snd_buf = NULL; xprt_reset_majortimeo(req); dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, @@ -947,12 +1215,25 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) */ void xprt_release(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; - struct rpc_rqst *req; + struct rpc_xprt *xprt; + struct rpc_rqst *req = task->tk_rqstp; - if (!(req = task->tk_rqstp)) + if (req == NULL) { + if (task->tk_client) { + rcu_read_lock(); + xprt = rcu_dereference(task->tk_client->cl_xprt); + if (xprt->snd_task == task) + xprt_release_write(xprt, task); + rcu_read_unlock(); + } return; - rpc_count_iostats(task); + } + + xprt = req->rq_xprt; + if (task->tk_ops->rpc_count_stats != NULL) + task->tk_ops->rpc_count_stats(task, task->tk_calldata); + else if (task->tk_client) + rpc_count_iostats(task, task->tk_client->cl_metrics); spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); if (xprt->ops->release_request) @@ -960,22 +1241,51 @@ void xprt_release(struct rpc_task *task) if (!list_empty(&req->rq_list)) list_del(&req->rq_list); xprt->last_used = jiffies; - if (list_empty(&xprt->recv)) + if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); spin_unlock_bh(&xprt->transport_lock); - xprt->ops->buf_free(req->rq_buffer); + if (req->rq_buffer) + xprt->ops->buf_free(req->rq_buffer); + if (req->rq_cred != NULL) + put_rpccred(req->rq_cred); task->tk_rqstp = NULL; if (req->rq_release_snd_buf) req->rq_release_snd_buf(req); - memset(req, 0, sizeof(*req)); /* mark unused */ dprintk("RPC: %5u release request %p\n", task->tk_pid, req); + if (likely(!bc_prealloc(req))) + xprt_free_slot(xprt, req); + else + xprt_free_bc_request(req); +} - spin_lock(&xprt->reserve_lock); - list_add(&req->rq_list, &xprt->free); - rpc_wake_up_next(&xprt->backlog); - spin_unlock(&xprt->reserve_lock); +static void xprt_init(struct rpc_xprt *xprt, struct net *net) +{ + atomic_set(&xprt->count, 1); + + spin_lock_init(&xprt->transport_lock); + spin_lock_init(&xprt->reserve_lock); + + INIT_LIST_HEAD(&xprt->free); + INIT_LIST_HEAD(&xprt->recv); +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + spin_lock_init(&xprt->bc_pa_lock); + INIT_LIST_HEAD(&xprt->bc_pa_list); +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + + xprt->last_used = jiffies; + xprt->cwnd = RPC_INITCWND; + xprt->bind_index = 0; + + rpc_init_wait_queue(&xprt->binding, "xprt_binding"); + rpc_init_wait_queue(&xprt->pending, "xprt_pending"); + rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending"); + rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); + + xprt_init_xid(xprt); + + xprt->xprt_net = get_net(net); } /** @@ -986,7 +1296,6 @@ void xprt_release(struct rpc_task *task) struct rpc_xprt *xprt_create_transport(struct xprt_create *args) { struct rpc_xprt *xprt; - struct rpc_rqst *req; struct xprt_class *t; spin_lock(&xprt_list_lock); @@ -1005,53 +1314,49 @@ found: if (IS_ERR(xprt)) { dprintk("RPC: xprt_create_transport: failed, %ld\n", -PTR_ERR(xprt)); - return xprt; + goto out; } - - kref_init(&xprt->kref); - spin_lock_init(&xprt->transport_lock); - spin_lock_init(&xprt->reserve_lock); - - INIT_LIST_HEAD(&xprt->free); - INIT_LIST_HEAD(&xprt->recv); + if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) + xprt->idle_timeout = 0; INIT_WORK(&xprt->task_cleanup, xprt_autoclose); - setup_timer(&xprt->timer, xprt_init_autodisconnect, - (unsigned long)xprt); - xprt->last_used = jiffies; - xprt->cwnd = RPC_INITCWND; - xprt->bind_index = 0; - - rpc_init_wait_queue(&xprt->binding, "xprt_binding"); - rpc_init_wait_queue(&xprt->pending, "xprt_pending"); - rpc_init_wait_queue(&xprt->sending, "xprt_sending"); - rpc_init_wait_queue(&xprt->resend, "xprt_resend"); - rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); - - /* initialize free list */ - for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) - list_add(&req->rq_list, &xprt->free); + if (xprt_has_timer(xprt)) + setup_timer(&xprt->timer, xprt_init_autodisconnect, + (unsigned long)xprt); + else + init_timer(&xprt->timer); - xprt_init_xid(xprt); + if (strlen(args->servername) > RPC_MAXNETNAMELEN) { + xprt_destroy(xprt); + return ERR_PTR(-EINVAL); + } + xprt->servername = kstrdup(args->servername, GFP_KERNEL); + if (xprt->servername == NULL) { + xprt_destroy(xprt); + return ERR_PTR(-ENOMEM); + } dprintk("RPC: created transport %p with %u slots\n", xprt, xprt->max_reqs); - +out: return xprt; } /** * xprt_destroy - destroy an RPC transport, killing off all requests. - * @kref: kref for the transport to destroy + * @xprt: transport to destroy * */ -static void xprt_destroy(struct kref *kref) +static void xprt_destroy(struct rpc_xprt *xprt) { - struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref); - dprintk("RPC: destroying transport %p\n", xprt); - xprt->shutdown = 1; del_timer_sync(&xprt->timer); + rpc_destroy_wait_queue(&xprt->binding); + rpc_destroy_wait_queue(&xprt->pending); + rpc_destroy_wait_queue(&xprt->sending); + rpc_destroy_wait_queue(&xprt->backlog); + cancel_work_sync(&xprt->task_cleanup); + kfree(xprt->servername); /* * Tear down transport state and free the rpc_xprt */ @@ -1065,16 +1370,6 @@ static void xprt_destroy(struct kref *kref) */ void xprt_put(struct rpc_xprt *xprt) { - kref_put(&xprt->kref, xprt_destroy); -} - -/** - * xprt_get - return a reference to an RPC transport. - * @xprt: pointer to the transport - * - */ -struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) -{ - kref_get(&xprt->kref); - return xprt; + if (atomic_dec_and_test(&xprt->count)) + xprt_destroy(xprt); } |
