aboutsummaryrefslogtreecommitdiff
path: root/net/sunrpc/xprt.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprt.c')
-rw-r--r--net/sunrpc/xprt.c552
1 files changed, 370 insertions, 182 deletions
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 4c8f18aff7c..c3b2b3369e5 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -62,31 +62,15 @@
/*
* Local functions
*/
+static void xprt_init(struct rpc_xprt *xprt, struct net *net);
static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void xprt_connect_status(struct rpc_task *task);
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
+static void xprt_destroy(struct rpc_xprt *xprt);
static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);
-/*
- * The transport code maintains an estimate on the maximum number of out-
- * standing RPC requests, using a smoothed version of the congestion
- * avoidance implemented in 44BSD. This is basically the Van Jacobson
- * congestion algorithm: If a retransmit occurs, the congestion window is
- * halved; otherwise, it is incremented by 1/cwnd when
- *
- * - a reply is received and
- * - a full number of requests are outstanding and
- * - the congestion window hasn't been updated recently.
- */
-#define RPC_CWNDSHIFT (8U)
-#define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT)
-#define RPC_INITCWND RPC_CWNDSCALE
-#define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT)
-
-#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
-
/**
* xprt_register_transport - register a transport implementation
* @transport: transport to register
@@ -186,15 +170,16 @@ EXPORT_SYMBOL_GPL(xprt_load_transport);
/**
* xprt_reserve_xprt - serialize write access to transports
* @task: task that is requesting access to the transport
+ * @xprt: pointer to the target transport
*
* This prevents mixing the payload of separate requests, and prevents
* transport connects from colliding with writes. No congestion control
* is provided.
*/
-int xprt_reserve_xprt(struct rpc_task *task)
+int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
- struct rpc_xprt *xprt = req->rq_xprt;
+ int priority;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
if (task == xprt->snd_task)
@@ -202,10 +187,9 @@ int xprt_reserve_xprt(struct rpc_task *task)
goto out_sleep;
}
xprt->snd_task = task;
- if (req) {
- req->rq_bytes_sent = 0;
+ if (req != NULL)
req->rq_ntrans++;
- }
+
return 1;
out_sleep:
@@ -213,10 +197,13 @@ out_sleep:
task->tk_pid, xprt);
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- if (req && req->rq_ntrans)
- rpc_sleep_on(&xprt->resend, task, NULL);
+ if (req == NULL)
+ priority = RPC_PRIORITY_LOW;
+ else if (!req->rq_ntrans)
+ priority = RPC_PRIORITY_NORMAL;
else
- rpc_sleep_on(&xprt->sending, task, NULL);
+ priority = RPC_PRIORITY_HIGH;
+ rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
@@ -224,10 +211,10 @@ EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
static void xprt_clear_locked(struct rpc_xprt *xprt)
{
xprt->snd_task = NULL;
- if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
- smp_mb__before_clear_bit();
+ if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
+ smp_mb__before_atomic();
clear_bit(XPRT_LOCKED, &xprt->state);
- smp_mb__after_clear_bit();
+ smp_mb__after_atomic();
} else
queue_work(rpciod_workqueue, &xprt->task_cleanup);
}
@@ -240,22 +227,23 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
* integrated into the decision of whether a request is allowed to be
* woken up and given access to the transport.
*/
-int xprt_reserve_xprt_cong(struct rpc_task *task)
+int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
- struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req = task->tk_rqstp;
+ int priority;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
if (task == xprt->snd_task)
return 1;
goto out_sleep;
}
+ if (req == NULL) {
+ xprt->snd_task = task;
+ return 1;
+ }
if (__xprt_get_cong(xprt, task)) {
xprt->snd_task = task;
- if (req) {
- req->rq_bytes_sent = 0;
- req->rq_ntrans++;
- }
+ req->rq_ntrans++;
return 1;
}
xprt_clear_locked(xprt);
@@ -263,10 +251,13 @@ out_sleep:
dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- if (req && req->rq_ntrans)
- rpc_sleep_on(&xprt->resend, task, NULL);
+ if (req == NULL)
+ priority = RPC_PRIORITY_LOW;
+ else if (!req->rq_ntrans)
+ priority = RPC_PRIORITY_NORMAL;
else
- rpc_sleep_on(&xprt->sending, task, NULL);
+ priority = RPC_PRIORITY_HIGH;
+ rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -276,61 +267,59 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
int retval;
spin_lock_bh(&xprt->transport_lock);
- retval = xprt->ops->reserve_xprt(task);
+ retval = xprt->ops->reserve_xprt(xprt, task);
spin_unlock_bh(&xprt->transport_lock);
return retval;
}
-static void __xprt_lock_write_next(struct rpc_xprt *xprt)
+static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
- struct rpc_task *task;
+ struct rpc_xprt *xprt = data;
struct rpc_rqst *req;
+ req = task->tk_rqstp;
+ xprt->snd_task = task;
+ if (req)
+ req->rq_ntrans++;
+ return true;
+}
+
+static void __xprt_lock_write_next(struct rpc_xprt *xprt)
+{
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;
- task = rpc_wake_up_next(&xprt->resend);
- if (!task) {
- task = rpc_wake_up_next(&xprt->sending);
- if (!task)
- goto out_unlock;
- }
+ if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
+ return;
+ xprt_clear_locked(xprt);
+}
+
+static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)
+{
+ struct rpc_xprt *xprt = data;
+ struct rpc_rqst *req;
req = task->tk_rqstp;
- xprt->snd_task = task;
- if (req) {
- req->rq_bytes_sent = 0;
+ if (req == NULL) {
+ xprt->snd_task = task;
+ return true;
+ }
+ if (__xprt_get_cong(xprt, task)) {
+ xprt->snd_task = task;
req->rq_ntrans++;
+ return true;
}
- return;
-
-out_unlock:
- xprt_clear_locked(xprt);
+ return false;
}
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
- struct rpc_task *task;
-
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return;
if (RPCXPRT_CONGESTED(xprt))
goto out_unlock;
- task = rpc_wake_up_next(&xprt->resend);
- if (!task) {
- task = rpc_wake_up_next(&xprt->sending);
- if (!task)
- goto out_unlock;
- }
- if (__xprt_get_cong(xprt, task)) {
- struct rpc_rqst *req = task->tk_rqstp;
- xprt->snd_task = task;
- if (req) {
- req->rq_bytes_sent = 0;
- req->rq_ntrans++;
- }
+ if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
return;
- }
out_unlock:
xprt_clear_locked(xprt);
}
@@ -345,6 +334,11 @@ out_unlock:
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
if (xprt->snd_task == task) {
+ if (task != NULL) {
+ struct rpc_rqst *req = task->tk_rqstp;
+ if (req != NULL)
+ req->rq_bytes_sent = 0;
+ }
xprt_clear_locked(xprt);
__xprt_lock_write_next(xprt);
}
@@ -362,6 +356,11 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt);
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
if (xprt->snd_task == task) {
+ if (task != NULL) {
+ struct rpc_rqst *req = task->tk_rqstp;
+ if (req != NULL)
+ req->rq_bytes_sent = 0;
+ }
xprt_clear_locked(xprt);
__xprt_lock_write_next_cong(xprt);
}
@@ -417,21 +416,31 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
*/
void xprt_release_rqst_cong(struct rpc_task *task)
{
- __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
+ struct rpc_rqst *req = task->tk_rqstp;
+
+ __xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
/**
* xprt_adjust_cwnd - adjust transport congestion window
+ * @xprt: pointer to xprt
* @task: recently completed RPC request used to adjust window
* @result: result code of completed RPC request
*
- * We use a time-smoothed congestion estimator to avoid heavy oscillation.
+ * The transport code maintains an estimate on the maximum number of out-
+ * standing RPC requests, using a smoothed version of the congestion
+ * avoidance implemented in 44BSD. This is basically the Van Jacobson
+ * congestion algorithm: If a retransmit occurs, the congestion window is
+ * halved; otherwise, it is incremented by 1/cwnd when
+ *
+ * - a reply is received and
+ * - a full number of requests are outstanding and
+ * - the congestion window hasn't been updated recently.
*/
-void xprt_adjust_cwnd(struct rpc_task *task, int result)
+void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
struct rpc_rqst *req = task->tk_rqstp;
- struct rpc_xprt *xprt = task->tk_xprt;
unsigned long cwnd = xprt->cwnd;
if (result >= 0 && cwnd <= xprt->cong) {
@@ -472,13 +481,17 @@ EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
* xprt_wait_for_buffer_space - wait for transport output buffer to clear
* @task: task to be put to sleep
* @action: function pointer to be executed after wait
+ *
+ * Note that we only set the timer for the case of RPC_IS_SOFT(), since
+ * we don't in general want to force a socket disconnection due to
+ * an incomplete RPC call transmission.
*/
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
- task->tk_timeout = req->rq_timeout;
+ task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
rpc_sleep_on(&xprt->pending, task, action);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
@@ -491,9 +504,6 @@ EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
*/
void xprt_write_space(struct rpc_xprt *xprt)
{
- if (unlikely(xprt->shutdown))
- return;
-
spin_lock_bh(&xprt->transport_lock);
if (xprt->snd_task) {
dprintk("RPC: write space: waking waiting task on "
@@ -518,7 +528,7 @@ void xprt_set_retrans_timeout_def(struct rpc_task *task)
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);
-/*
+/**
* xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
* @task: task whose timeout is to be set
*
@@ -666,7 +676,7 @@ xprt_init_autodisconnect(unsigned long data)
struct rpc_xprt *xprt = (struct rpc_xprt *)data;
spin_lock(&xprt->transport_lock);
- if (!list_empty(&xprt->recv) || xprt->shutdown)
+ if (!list_empty(&xprt->recv))
goto out_abort;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
goto out_abort;
@@ -685,7 +695,7 @@ out_abort:
*/
void xprt_connect(struct rpc_task *task)
{
- struct rpc_xprt *xprt = task->tk_xprt;
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
xprt, (xprt_connected(xprt) ? "is" : "is not"));
@@ -703,9 +713,7 @@ void xprt_connect(struct rpc_task *task)
if (xprt_connected(xprt))
xprt_release_write(xprt, task);
else {
- if (task->tk_rqstp)
- task->tk_rqstp->rq_bytes_sent = 0;
-
+ task->tk_rqstp->rq_bytes_sent = 0;
task->tk_timeout = task->tk_rqstp->rq_timeout;
rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
@@ -714,13 +722,13 @@ void xprt_connect(struct rpc_task *task)
if (xprt_test_and_set_connecting(xprt))
return;
xprt->stat.connect_start = jiffies;
- xprt->ops->connect(task);
+ xprt->ops->connect(xprt, task);
}
}
static void xprt_connect_status(struct rpc_task *task)
{
- struct rpc_xprt *xprt = task->tk_xprt;
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
if (task->tk_status == 0) {
xprt->stat.connect_count++;
@@ -731,6 +739,11 @@ static void xprt_connect_status(struct rpc_task *task)
}
switch (task->tk_status) {
+ case -ECONNREFUSED:
+ case -ECONNRESET:
+ case -ECONNABORTED:
+ case -ENETUNREACH:
+ case -EHOSTUNREACH:
case -EAGAIN:
dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
break;
@@ -741,7 +754,7 @@ static void xprt_connect_status(struct rpc_task *task)
default:
dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
"server %s\n", task->tk_pid, -task->tk_status,
- task->tk_client->cl_server);
+ xprt->servername);
xprt_release_write(xprt, task);
task->tk_status = -EIO;
}
@@ -772,7 +785,7 @@ static void xprt_update_rtt(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_rtt *rtt = task->tk_client->cl_rtt;
- unsigned timer = task->tk_msg.rpc_proc->p_timer;
+ unsigned int timer = task->tk_msg.rpc_proc->p_timer;
long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
if (timer) {
@@ -824,7 +837,7 @@ static void xprt_timer(struct rpc_task *task)
spin_lock_bh(&xprt->transport_lock);
if (!req->rq_reply_bytes_recvd) {
if (xprt->ops->timer)
- xprt->ops->timer(task);
+ xprt->ops->timer(xprt, task);
} else
task->tk_status = 0;
spin_unlock_bh(&xprt->transport_lock);
@@ -840,24 +853,36 @@ static inline int xprt_has_timer(struct rpc_xprt *xprt)
* @task: RPC task about to send a request
*
*/
-int xprt_prepare_transmit(struct rpc_task *task)
+bool xprt_prepare_transmit(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
- int err = 0;
+ bool ret = false;
dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
spin_lock_bh(&xprt->transport_lock);
- if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) {
- err = req->rq_reply_bytes_recvd;
+ if (!req->rq_bytes_sent) {
+ if (req->rq_reply_bytes_recvd) {
+ task->tk_status = req->rq_reply_bytes_recvd;
+ goto out_unlock;
+ }
+ if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
+ && xprt_connected(xprt)
+ && req->rq_connect_cookie == xprt->connect_cookie) {
+ xprt->ops->set_retrans_timeout(task);
+ rpc_sleep_on(&xprt->pending, task, xprt_timer);
+ goto out_unlock;
+ }
+ }
+ if (!xprt->ops->reserve_xprt(xprt, task)) {
+ task->tk_status = -EAGAIN;
goto out_unlock;
}
- if (!xprt->ops->reserve_xprt(task))
- err = -EAGAIN;
+ ret = true;
out_unlock:
spin_unlock_bh(&xprt->transport_lock);
- return err;
+ return ret;
}
void xprt_end_transmit(struct rpc_task *task)
@@ -875,7 +900,7 @@ void xprt_transmit(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
- int status;
+ int status, numreqs;
dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
@@ -898,7 +923,6 @@ void xprt_transmit(struct rpc_task *task)
} else if (!req->rq_bytes_sent)
return;
- req->rq_connect_cookie = xprt->connect_cookie;
req->rq_xtime = ktime_get();
status = xprt->ops->send_request(task);
if (status != 0) {
@@ -907,75 +931,188 @@ void xprt_transmit(struct rpc_task *task)
}
dprintk("RPC: %5u xmit complete\n", task->tk_pid);
+ task->tk_flags |= RPC_TASK_SENT;
spin_lock_bh(&xprt->transport_lock);
xprt->ops->set_retrans_timeout(task);
+ numreqs = atomic_read(&xprt->num_reqs);
+ if (numreqs > xprt->stat.max_slots)
+ xprt->stat.max_slots = numreqs;
xprt->stat.sends++;
xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
xprt->stat.bklog_u += xprt->backlog.qlen;
+ xprt->stat.sending_u += xprt->sending.qlen;
+ xprt->stat.pending_u += xprt->pending.qlen;
/* Don't race with disconnect */
if (!xprt_connected(xprt))
task->tk_status = -ENOTCONN;
- else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) {
+ else {
/*
* Sleep on the pending queue since
* we're expecting a reply.
*/
- rpc_sleep_on(&xprt->pending, task, xprt_timer);
+ if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task))
+ rpc_sleep_on(&xprt->pending, task, xprt_timer);
+ req->rq_connect_cookie = xprt->connect_cookie;
}
spin_unlock_bh(&xprt->transport_lock);
}
-static void xprt_alloc_slot(struct rpc_task *task)
+static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ set_bit(XPRT_CONGESTED, &xprt->state);
+ rpc_sleep_on(&xprt->backlog, task, NULL);
+}
+
+static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
{
- struct rpc_xprt *xprt = task->tk_xprt;
+ if (rpc_wake_up_next(&xprt->backlog) == NULL)
+ clear_bit(XPRT_CONGESTED, &xprt->state);
+}
- task->tk_status = 0;
- if (task->tk_rqstp)
- return;
+static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ bool ret = false;
+
+ if (!test_bit(XPRT_CONGESTED, &xprt->state))
+ goto out;
+ spin_lock(&xprt->reserve_lock);
+ if (test_bit(XPRT_CONGESTED, &xprt->state)) {
+ rpc_sleep_on(&xprt->backlog, task, NULL);
+ ret = true;
+ }
+ spin_unlock(&xprt->reserve_lock);
+out:
+ return ret;
+}
+
+static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
+{
+ struct rpc_rqst *req = ERR_PTR(-EAGAIN);
+
+ if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
+ goto out;
+ req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
+ if (req != NULL)
+ goto out;
+ atomic_dec(&xprt->num_reqs);
+ req = ERR_PTR(-ENOMEM);
+out:
+ return req;
+}
+
+static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+ if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
+ kfree(req);
+ return true;
+ }
+ return false;
+}
+
+void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ struct rpc_rqst *req;
+
+ spin_lock(&xprt->reserve_lock);
if (!list_empty(&xprt->free)) {
- struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
- list_del_init(&req->rq_list);
- task->tk_rqstp = req;
- xprt_request_init(task, xprt);
- return;
+ req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
+ list_del(&req->rq_list);
+ goto out_init_req;
}
- dprintk("RPC: waiting for request slot\n");
- task->tk_status = -EAGAIN;
- task->tk_timeout = 0;
- rpc_sleep_on(&xprt->backlog, task, NULL);
+ req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT|__GFP_NOWARN);
+ if (!IS_ERR(req))
+ goto out_init_req;
+ switch (PTR_ERR(req)) {
+ case -ENOMEM:
+ dprintk("RPC: dynamic allocation of request slot "
+ "failed! Retrying\n");
+ task->tk_status = -ENOMEM;
+ break;
+ case -EAGAIN:
+ xprt_add_backlog(xprt, task);
+ dprintk("RPC: waiting for request slot\n");
+ default:
+ task->tk_status = -EAGAIN;
+ }
+ spin_unlock(&xprt->reserve_lock);
+ return;
+out_init_req:
+ task->tk_status = 0;
+ task->tk_rqstp = req;
+ xprt_request_init(task, xprt);
+ spin_unlock(&xprt->reserve_lock);
}
+EXPORT_SYMBOL_GPL(xprt_alloc_slot);
-static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
+void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
- memset(req, 0, sizeof(*req)); /* mark unused */
+ /* Note: grabbing the xprt_lock_write() ensures that we throttle
+ * new slot allocation if the transport is congested (i.e. when
+ * reconnecting a stream transport or when out of socket write
+ * buffer space).
+ */
+ if (xprt_lock_write(xprt, task)) {
+ xprt_alloc_slot(xprt, task);
+ xprt_release_write(xprt, task);
+ }
+}
+EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot);
+static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
spin_lock(&xprt->reserve_lock);
- list_add(&req->rq_list, &xprt->free);
- rpc_wake_up_next(&xprt->backlog);
+ if (!xprt_dynamic_free_slot(xprt, req)) {
+ memset(req, 0, sizeof(*req)); /* mark unused */
+ list_add(&req->rq_list, &xprt->free);
+ }
+ xprt_wake_up_backlog(xprt);
spin_unlock(&xprt->reserve_lock);
}
-struct rpc_xprt *xprt_alloc(struct net *net, int size, int max_req)
+static void xprt_free_all_slots(struct rpc_xprt *xprt)
+{
+ struct rpc_rqst *req;
+ while (!list_empty(&xprt->free)) {
+ req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
+ list_del(&req->rq_list);
+ kfree(req);
+ }
+}
+
+struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
+ unsigned int num_prealloc,
+ unsigned int max_alloc)
{
struct rpc_xprt *xprt;
+ struct rpc_rqst *req;
+ int i;
xprt = kzalloc(size, GFP_KERNEL);
if (xprt == NULL)
goto out;
- xprt->max_reqs = max_req;
- xprt->slot = kcalloc(max_req, sizeof(struct rpc_rqst), GFP_KERNEL);
- if (xprt->slot == NULL)
- goto out_free;
+ xprt_init(xprt, net);
+
+ for (i = 0; i < num_prealloc; i++) {
+ req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
+ if (!req)
+ goto out_free;
+ list_add(&req->rq_list, &xprt->free);
+ }
+ if (max_alloc > num_prealloc)
+ xprt->max_reqs = max_alloc;
+ else
+ xprt->max_reqs = num_prealloc;
+ xprt->min_reqs = num_prealloc;
+ atomic_set(&xprt->num_reqs, num_prealloc);
- xprt->xprt_net = get_net(net);
return xprt;
out_free:
- kfree(xprt);
+ xprt_free(xprt);
out:
return NULL;
}
@@ -984,7 +1121,7 @@ EXPORT_SYMBOL_GPL(xprt_alloc);
void xprt_free(struct rpc_xprt *xprt)
{
put_net(xprt->xprt_net);
- kfree(xprt->slot);
+ xprt_free_all_slots(xprt);
kfree(xprt);
}
EXPORT_SYMBOL_GPL(xprt_free);
@@ -993,17 +1130,50 @@ EXPORT_SYMBOL_GPL(xprt_free);
* xprt_reserve - allocate an RPC request slot
* @task: RPC task requesting a slot allocation
*
- * If no more slots are available, place the task on the transport's
+ * If the transport is marked as being congested, or if no more
+ * slots are available, place the task on the transport's
* backlog queue.
*/
void xprt_reserve(struct rpc_task *task)
{
- struct rpc_xprt *xprt = task->tk_xprt;
+ struct rpc_xprt *xprt;
- task->tk_status = -EIO;
- spin_lock(&xprt->reserve_lock);
- xprt_alloc_slot(task);
- spin_unlock(&xprt->reserve_lock);
+ task->tk_status = 0;
+ if (task->tk_rqstp != NULL)
+ return;
+
+ task->tk_timeout = 0;
+ task->tk_status = -EAGAIN;
+ rcu_read_lock();
+ xprt = rcu_dereference(task->tk_client->cl_xprt);
+ if (!xprt_throttle_congested(xprt, task))
+ xprt->ops->alloc_slot(xprt, task);
+ rcu_read_unlock();
+}
+
+/**
+ * xprt_retry_reserve - allocate an RPC request slot
+ * @task: RPC task requesting a slot allocation
+ *
+ * If no more slots are available, place the task on the transport's
+ * backlog queue.
+ * Note that the only difference with xprt_reserve is that we now
+ * ignore the value of the XPRT_CONGESTED flag.
+ */
+void xprt_retry_reserve(struct rpc_task *task)
+{
+ struct rpc_xprt *xprt;
+
+ task->tk_status = 0;
+ if (task->tk_rqstp != NULL)
+ return;
+
+ task->tk_timeout = 0;
+ task->tk_status = -EAGAIN;
+ rcu_read_lock();
+ xprt = rcu_dereference(task->tk_client->cl_xprt);
+ xprt->ops->alloc_slot(xprt, task);
+ rcu_read_unlock();
}
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
@@ -1013,18 +1183,25 @@ static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
- xprt->xid = net_random();
+ xprt->xid = prandom_u32();
}
static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
struct rpc_rqst *req = task->tk_rqstp;
+ INIT_LIST_HEAD(&req->rq_list);
req->rq_timeout = task->tk_client->cl_timeout->to_initval;
req->rq_task = task;
req->rq_xprt = xprt;
req->rq_buffer = NULL;
req->rq_xid = xprt_alloc_xid(xprt);
+ req->rq_connect_cookie = xprt->connect_cookie - 1;
+ req->rq_bytes_sent = 0;
+ req->rq_snd_buf.len = 0;
+ req->rq_snd_buf.buflen = 0;
+ req->rq_rcv_buf.len = 0;
+ req->rq_rcv_buf.buflen = 0;
req->rq_release_snd_buf = NULL;
xprt_reset_majortimeo(req);
dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
@@ -1039,13 +1216,24 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
void xprt_release(struct rpc_task *task)
{
struct rpc_xprt *xprt;
- struct rpc_rqst *req;
+ struct rpc_rqst *req = task->tk_rqstp;
- if (!(req = task->tk_rqstp))
+ if (req == NULL) {
+ if (task->tk_client) {
+ rcu_read_lock();
+ xprt = rcu_dereference(task->tk_client->cl_xprt);
+ if (xprt->snd_task == task)
+ xprt_release_write(xprt, task);
+ rcu_read_unlock();
+ }
return;
+ }
xprt = req->rq_xprt;
- rpc_count_iostats(task);
+ if (task->tk_ops->rpc_count_stats != NULL)
+ task->tk_ops->rpc_count_stats(task, task->tk_calldata);
+ else if (task->tk_client)
+ rpc_count_iostats(task, task->tk_client->cl_metrics);
spin_lock_bh(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task);
if (xprt->ops->release_request)
@@ -1072,6 +1260,34 @@ void xprt_release(struct rpc_task *task)
xprt_free_bc_request(req);
}
+static void xprt_init(struct rpc_xprt *xprt, struct net *net)
+{
+ atomic_set(&xprt->count, 1);
+
+ spin_lock_init(&xprt->transport_lock);
+ spin_lock_init(&xprt->reserve_lock);
+
+ INIT_LIST_HEAD(&xprt->free);
+ INIT_LIST_HEAD(&xprt->recv);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ spin_lock_init(&xprt->bc_pa_lock);
+ INIT_LIST_HEAD(&xprt->bc_pa_list);
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+ xprt->last_used = jiffies;
+ xprt->cwnd = RPC_INITCWND;
+ xprt->bind_index = 0;
+
+ rpc_init_wait_queue(&xprt->binding, "xprt_binding");
+ rpc_init_wait_queue(&xprt->pending, "xprt_pending");
+ rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
+ rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
+
+ xprt_init_xid(xprt);
+
+ xprt->xprt_net = get_net(net);
+}
+
/**
* xprt_create_transport - create an RPC transport
* @args: rpc transport creation arguments
@@ -1080,7 +1296,6 @@ void xprt_release(struct rpc_task *task)
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
struct rpc_xprt *xprt;
- struct rpc_rqst *req;
struct xprt_class *t;
spin_lock(&xprt_list_lock);
@@ -1099,66 +1314,49 @@ found:
if (IS_ERR(xprt)) {
dprintk("RPC: xprt_create_transport: failed, %ld\n",
-PTR_ERR(xprt));
- return xprt;
+ goto out;
}
-
- kref_init(&xprt->kref);
- spin_lock_init(&xprt->transport_lock);
- spin_lock_init(&xprt->reserve_lock);
-
- INIT_LIST_HEAD(&xprt->free);
- INIT_LIST_HEAD(&xprt->recv);
-#if defined(CONFIG_NFS_V4_1)
- spin_lock_init(&xprt->bc_pa_lock);
- INIT_LIST_HEAD(&xprt->bc_pa_list);
-#endif /* CONFIG_NFS_V4_1 */
-
+ if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
+ xprt->idle_timeout = 0;
INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
if (xprt_has_timer(xprt))
setup_timer(&xprt->timer, xprt_init_autodisconnect,
(unsigned long)xprt);
else
init_timer(&xprt->timer);
- xprt->last_used = jiffies;
- xprt->cwnd = RPC_INITCWND;
- xprt->bind_index = 0;
-
- rpc_init_wait_queue(&xprt->binding, "xprt_binding");
- rpc_init_wait_queue(&xprt->pending, "xprt_pending");
- rpc_init_wait_queue(&xprt->sending, "xprt_sending");
- rpc_init_wait_queue(&xprt->resend, "xprt_resend");
- rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
-
- /* initialize free list */
- for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
- list_add(&req->rq_list, &xprt->free);
- xprt_init_xid(xprt);
+ if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
+ xprt_destroy(xprt);
+ return ERR_PTR(-EINVAL);
+ }
+ xprt->servername = kstrdup(args->servername, GFP_KERNEL);
+ if (xprt->servername == NULL) {
+ xprt_destroy(xprt);
+ return ERR_PTR(-ENOMEM);
+ }
dprintk("RPC: created transport %p with %u slots\n", xprt,
xprt->max_reqs);
+out:
return xprt;
}
/**
* xprt_destroy - destroy an RPC transport, killing off all requests.
- * @kref: kref for the transport to destroy
+ * @xprt: transport to destroy
*
*/
-static void xprt_destroy(struct kref *kref)
+static void xprt_destroy(struct rpc_xprt *xprt)
{
- struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);
-
dprintk("RPC: destroying transport %p\n", xprt);
- xprt->shutdown = 1;
del_timer_sync(&xprt->timer);
rpc_destroy_wait_queue(&xprt->binding);
rpc_destroy_wait_queue(&xprt->pending);
rpc_destroy_wait_queue(&xprt->sending);
- rpc_destroy_wait_queue(&xprt->resend);
rpc_destroy_wait_queue(&xprt->backlog);
cancel_work_sync(&xprt->task_cleanup);
+ kfree(xprt->servername);
/*
* Tear down transport state and free the rpc_xprt
*/
@@ -1172,16 +1370,6 @@ static void xprt_destroy(struct kref *kref)
*/
void xprt_put(struct rpc_xprt *xprt)
{
- kref_put(&xprt->kref, xprt_destroy);
-}
-
-/**
- * xprt_get - return a reference to an RPC transport.
- * @xprt: pointer to the transport
- *
- */
-struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
-{
- kref_get(&xprt->kref);
- return xprt;
+ if (atomic_dec_and_test(&xprt->count))
+ xprt_destroy(xprt);
}