Diffstat (limited to 'net/sunrpc/xprt.c')
 -rw-r--r--   net/sunrpc/xprt.c | 552
 1 files changed, 370 insertions, 182 deletions
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 4c8f18aff7c..c3b2b3369e5 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -62,31 +62,15 @@  /*   * Local functions   */ +static void	 xprt_init(struct rpc_xprt *xprt, struct net *net);  static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);  static void	xprt_connect_status(struct rpc_task *task);  static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); +static void	 xprt_destroy(struct rpc_xprt *xprt);  static DEFINE_SPINLOCK(xprt_list_lock);  static LIST_HEAD(xprt_list); -/* - * The transport code maintains an estimate on the maximum number of out- - * standing RPC requests, using a smoothed version of the congestion - * avoidance implemented in 44BSD. This is basically the Van Jacobson - * congestion algorithm: If a retransmit occurs, the congestion window is - * halved; otherwise, it is incremented by 1/cwnd when - * - *	-	a reply is received and - *	-	a full number of requests are outstanding and - *	-	the congestion window hasn't been updated recently. - */ -#define RPC_CWNDSHIFT		(8U) -#define RPC_CWNDSCALE		(1U << RPC_CWNDSHIFT) -#define RPC_INITCWND		RPC_CWNDSCALE -#define RPC_MAXCWND(xprt)	((xprt)->max_reqs << RPC_CWNDSHIFT) - -#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) -  /**   * xprt_register_transport - register a transport implementation   * @transport: transport to register @@ -186,15 +170,16 @@ EXPORT_SYMBOL_GPL(xprt_load_transport);  /**   * xprt_reserve_xprt - serialize write access to transports   * @task: task that is requesting access to the transport + * @xprt: pointer to the target transport   *   * This prevents mixing the payload of separate requests, and prevents   * transport connects from colliding with writes.  No congestion control   * is provided.   */ -int xprt_reserve_xprt(struct rpc_task *task) +int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)  {  	struct rpc_rqst *req = task->tk_rqstp; -	struct rpc_xprt	*xprt = req->rq_xprt; +	int priority;  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {  		if (task == xprt->snd_task) @@ -202,10 +187,9 @@ int xprt_reserve_xprt(struct rpc_task *task)  		goto out_sleep;  	}  	xprt->snd_task = task; -	if (req) { -		req->rq_bytes_sent = 0; +	if (req != NULL)  		req->rq_ntrans++; -	} +  	return 1;  out_sleep: @@ -213,10 +197,13 @@ out_sleep:  			task->tk_pid, xprt);  	task->tk_timeout = 0;  	task->tk_status = -EAGAIN; -	if (req && req->rq_ntrans) -		rpc_sleep_on(&xprt->resend, task, NULL); +	if (req == NULL) +		priority = RPC_PRIORITY_LOW; +	else if (!req->rq_ntrans) +		priority = RPC_PRIORITY_NORMAL;  	else -		rpc_sleep_on(&xprt->sending, task, NULL); +		priority = RPC_PRIORITY_HIGH; +	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);  	return 0;  }  EXPORT_SYMBOL_GPL(xprt_reserve_xprt); @@ -224,10 +211,10 @@ EXPORT_SYMBOL_GPL(xprt_reserve_xprt);  static void xprt_clear_locked(struct rpc_xprt *xprt)  {  	xprt->snd_task = NULL; -	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) { -		smp_mb__before_clear_bit(); +	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { +		smp_mb__before_atomic();  		clear_bit(XPRT_LOCKED, &xprt->state); -		smp_mb__after_clear_bit(); +		smp_mb__after_atomic();  	} else  		queue_work(rpciod_workqueue, &xprt->task_cleanup);  } @@ -240,22 +227,23 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)   * integrated into the decision of whether a request is allowed to be   * woken up and given access to the transport.   
*/ -int xprt_reserve_xprt_cong(struct rpc_task *task) +int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)  { -	struct rpc_xprt	*xprt = task->tk_xprt;  	struct rpc_rqst *req = task->tk_rqstp; +	int priority;  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {  		if (task == xprt->snd_task)  			return 1;  		goto out_sleep;  	} +	if (req == NULL) { +		xprt->snd_task = task; +		return 1; +	}  	if (__xprt_get_cong(xprt, task)) {  		xprt->snd_task = task; -		if (req) { -			req->rq_bytes_sent = 0; -			req->rq_ntrans++; -		} +		req->rq_ntrans++;  		return 1;  	}  	xprt_clear_locked(xprt); @@ -263,10 +251,13 @@ out_sleep:  	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);  	task->tk_timeout = 0;  	task->tk_status = -EAGAIN; -	if (req && req->rq_ntrans) -		rpc_sleep_on(&xprt->resend, task, NULL); +	if (req == NULL) +		priority = RPC_PRIORITY_LOW; +	else if (!req->rq_ntrans) +		priority = RPC_PRIORITY_NORMAL;  	else -		rpc_sleep_on(&xprt->sending, task, NULL); +		priority = RPC_PRIORITY_HIGH; +	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);  	return 0;  }  EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); @@ -276,61 +267,59 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)  	int retval;  	spin_lock_bh(&xprt->transport_lock); -	retval = xprt->ops->reserve_xprt(task); +	retval = xprt->ops->reserve_xprt(xprt, task);  	spin_unlock_bh(&xprt->transport_lock);  	return retval;  } -static void __xprt_lock_write_next(struct rpc_xprt *xprt) +static bool __xprt_lock_write_func(struct rpc_task *task, void *data)  { -	struct rpc_task *task; +	struct rpc_xprt *xprt = data;  	struct rpc_rqst *req; +	req = task->tk_rqstp; +	xprt->snd_task = task; +	if (req) +		req->rq_ntrans++; +	return true; +} + +static void __xprt_lock_write_next(struct rpc_xprt *xprt) +{  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))  		return; -	task = rpc_wake_up_next(&xprt->resend); -	if (!task) { -		task = rpc_wake_up_next(&xprt->sending); -		if (!task) -			goto out_unlock; -	} +	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) +		return; +	xprt_clear_locked(xprt); +} + +static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data) +{ +	struct rpc_xprt *xprt = data; +	struct rpc_rqst *req;  	req = task->tk_rqstp; -	xprt->snd_task = task; -	if (req) { -		req->rq_bytes_sent = 0; +	if (req == NULL) { +		xprt->snd_task = task; +		return true; +	} +	if (__xprt_get_cong(xprt, task)) { +		xprt->snd_task = task;  		req->rq_ntrans++; +		return true;  	} -	return; - -out_unlock: -	xprt_clear_locked(xprt); +	return false;  }  static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)  { -	struct rpc_task *task; -  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))  		return;  	if (RPCXPRT_CONGESTED(xprt))  		goto out_unlock; -	task = rpc_wake_up_next(&xprt->resend); -	if (!task) { -		task = rpc_wake_up_next(&xprt->sending); -		if (!task) -			goto out_unlock; -	} -	if (__xprt_get_cong(xprt, task)) { -		struct rpc_rqst *req = task->tk_rqstp; -		xprt->snd_task = task; -		if (req) { -			req->rq_bytes_sent = 0; -			req->rq_ntrans++; -		} +	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))  		return; -	}  out_unlock:  	xprt_clear_locked(xprt);  } @@ -345,6 +334,11 @@ out_unlock:  void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)  {  	if (xprt->snd_task == task) { +		if (task != NULL) { +			struct rpc_rqst *req = task->tk_rqstp; +			if (req != NULL) +				req->rq_bytes_sent = 0; +		}  		
xprt_clear_locked(xprt);  		__xprt_lock_write_next(xprt);  	} @@ -362,6 +356,11 @@ EXPORT_SYMBOL_GPL(xprt_release_xprt);  void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)  {  	if (xprt->snd_task == task) { +		if (task != NULL) { +			struct rpc_rqst *req = task->tk_rqstp; +			if (req != NULL) +				req->rq_bytes_sent = 0; +		}  		xprt_clear_locked(xprt);  		__xprt_lock_write_next_cong(xprt);  	} @@ -417,21 +416,31 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)   */  void xprt_release_rqst_cong(struct rpc_task *task)  { -	__xprt_put_cong(task->tk_xprt, task->tk_rqstp); +	struct rpc_rqst *req = task->tk_rqstp; + +	__xprt_put_cong(req->rq_xprt, req);  }  EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);  /**   * xprt_adjust_cwnd - adjust transport congestion window + * @xprt: pointer to xprt   * @task: recently completed RPC request used to adjust window   * @result: result code of completed RPC request   * - * We use a time-smoothed congestion estimator to avoid heavy oscillation. + * The transport code maintains an estimate on the maximum number of out- + * standing RPC requests, using a smoothed version of the congestion + * avoidance implemented in 44BSD. This is basically the Van Jacobson + * congestion algorithm: If a retransmit occurs, the congestion window is + * halved; otherwise, it is incremented by 1/cwnd when + * + *	-	a reply is received and + *	-	a full number of requests are outstanding and + *	-	the congestion window hasn't been updated recently.   */ -void xprt_adjust_cwnd(struct rpc_task *task, int result) +void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)  {  	struct rpc_rqst *req = task->tk_rqstp; -	struct rpc_xprt *xprt = task->tk_xprt;  	unsigned long cwnd = xprt->cwnd;  	if (result >= 0 && cwnd <= xprt->cong) { @@ -472,13 +481,17 @@ EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);   * xprt_wait_for_buffer_space - wait for transport output buffer to clear   * @task: task to be put to sleep   * @action: function pointer to be executed after wait + * + * Note that we only set the timer for the case of RPC_IS_SOFT(), since + * we don't in general want to force a socket disconnection due to + * an incomplete RPC call transmission.   */  void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)  {  	struct rpc_rqst *req = task->tk_rqstp;  	struct rpc_xprt *xprt = req->rq_xprt; -	task->tk_timeout = req->rq_timeout; +	task->tk_timeout = RPC_IS_SOFT(task) ? 
req->rq_timeout : 0;  	rpc_sleep_on(&xprt->pending, task, action);  }  EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); @@ -491,9 +504,6 @@ EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);   */  void xprt_write_space(struct rpc_xprt *xprt)  { -	if (unlikely(xprt->shutdown)) -		return; -  	spin_lock_bh(&xprt->transport_lock);  	if (xprt->snd_task) {  		dprintk("RPC:       write space: waking waiting task on " @@ -518,7 +528,7 @@ void xprt_set_retrans_timeout_def(struct rpc_task *task)  }  EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def); -/* +/**   * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout   * @task: task whose timeout is to be set   * @@ -666,7 +676,7 @@ xprt_init_autodisconnect(unsigned long data)  	struct rpc_xprt *xprt = (struct rpc_xprt *)data;  	spin_lock(&xprt->transport_lock); -	if (!list_empty(&xprt->recv) || xprt->shutdown) +	if (!list_empty(&xprt->recv))  		goto out_abort;  	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))  		goto out_abort; @@ -685,7 +695,7 @@ out_abort:   */  void xprt_connect(struct rpc_task *task)  { -	struct rpc_xprt	*xprt = task->tk_xprt; +	struct rpc_xprt	*xprt = task->tk_rqstp->rq_xprt;  	dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,  			xprt, (xprt_connected(xprt) ? "is" : "is not")); @@ -703,9 +713,7 @@ void xprt_connect(struct rpc_task *task)  	if (xprt_connected(xprt))  		xprt_release_write(xprt, task);  	else { -		if (task->tk_rqstp) -			task->tk_rqstp->rq_bytes_sent = 0; - +		task->tk_rqstp->rq_bytes_sent = 0;  		task->tk_timeout = task->tk_rqstp->rq_timeout;  		rpc_sleep_on(&xprt->pending, task, xprt_connect_status); @@ -714,13 +722,13 @@ void xprt_connect(struct rpc_task *task)  		if (xprt_test_and_set_connecting(xprt))  			return;  		xprt->stat.connect_start = jiffies; -		xprt->ops->connect(task); +		xprt->ops->connect(xprt, task);  	}  }  static void xprt_connect_status(struct rpc_task *task)  { -	struct rpc_xprt	*xprt = task->tk_xprt; +	struct rpc_xprt	*xprt = task->tk_rqstp->rq_xprt;  	if (task->tk_status == 0) {  		xprt->stat.connect_count++; @@ -731,6 +739,11 @@ static void xprt_connect_status(struct rpc_task *task)  	}  	switch (task->tk_status) { +	case -ECONNREFUSED: +	case -ECONNRESET: +	case -ECONNABORTED: +	case -ENETUNREACH: +	case -EHOSTUNREACH:  	case -EAGAIN:  		dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);  		break; @@ -741,7 +754,7 @@ static void xprt_connect_status(struct rpc_task *task)  	default:  		dprintk("RPC: %5u xprt_connect_status: error %d connecting to "  				"server %s\n", task->tk_pid, -task->tk_status, -				task->tk_client->cl_server); +				xprt->servername);  		xprt_release_write(xprt, task);  		task->tk_status = -EIO;  	} @@ -772,7 +785,7 @@ static void xprt_update_rtt(struct rpc_task *task)  {  	struct rpc_rqst *req = task->tk_rqstp;  	struct rpc_rtt *rtt = task->tk_client->cl_rtt; -	unsigned timer = task->tk_msg.rpc_proc->p_timer; +	unsigned int timer = task->tk_msg.rpc_proc->p_timer;  	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));  	if (timer) { @@ -824,7 +837,7 @@ static void xprt_timer(struct rpc_task *task)  	spin_lock_bh(&xprt->transport_lock);  	if (!req->rq_reply_bytes_recvd) {  		if (xprt->ops->timer) -			xprt->ops->timer(task); +			xprt->ops->timer(xprt, task);  	} else  		task->tk_status = 0;  	spin_unlock_bh(&xprt->transport_lock); @@ -840,24 +853,36 @@ static inline int xprt_has_timer(struct rpc_xprt *xprt)   * @task: RPC task about to send a request   *   */ -int xprt_prepare_transmit(struct rpc_task *task) +bool 
xprt_prepare_transmit(struct rpc_task *task)  {  	struct rpc_rqst	*req = task->tk_rqstp;  	struct rpc_xprt	*xprt = req->rq_xprt; -	int err = 0; +	bool ret = false;  	dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);  	spin_lock_bh(&xprt->transport_lock); -	if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) { -		err = req->rq_reply_bytes_recvd; +	if (!req->rq_bytes_sent) { +		if (req->rq_reply_bytes_recvd) { +			task->tk_status = req->rq_reply_bytes_recvd; +			goto out_unlock; +		} +		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) +		    && xprt_connected(xprt) +		    && req->rq_connect_cookie == xprt->connect_cookie) { +			xprt->ops->set_retrans_timeout(task); +			rpc_sleep_on(&xprt->pending, task, xprt_timer); +			goto out_unlock; +		} +	} +	if (!xprt->ops->reserve_xprt(xprt, task)) { +		task->tk_status = -EAGAIN;  		goto out_unlock;  	} -	if (!xprt->ops->reserve_xprt(task)) -		err = -EAGAIN; +	ret = true;  out_unlock:  	spin_unlock_bh(&xprt->transport_lock); -	return err; +	return ret;  }  void xprt_end_transmit(struct rpc_task *task) @@ -875,7 +900,7 @@ void xprt_transmit(struct rpc_task *task)  {  	struct rpc_rqst	*req = task->tk_rqstp;  	struct rpc_xprt	*xprt = req->rq_xprt; -	int status; +	int status, numreqs;  	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); @@ -898,7 +923,6 @@ void xprt_transmit(struct rpc_task *task)  	} else if (!req->rq_bytes_sent)  		return; -	req->rq_connect_cookie = xprt->connect_cookie;  	req->rq_xtime = ktime_get();  	status = xprt->ops->send_request(task);  	if (status != 0) { @@ -907,75 +931,188 @@ void xprt_transmit(struct rpc_task *task)  	}  	dprintk("RPC: %5u xmit complete\n", task->tk_pid); +	task->tk_flags |= RPC_TASK_SENT;  	spin_lock_bh(&xprt->transport_lock);  	xprt->ops->set_retrans_timeout(task); +	numreqs = atomic_read(&xprt->num_reqs); +	if (numreqs > xprt->stat.max_slots) +		xprt->stat.max_slots = numreqs;  	xprt->stat.sends++;  	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;  	xprt->stat.bklog_u += xprt->backlog.qlen; +	xprt->stat.sending_u += xprt->sending.qlen; +	xprt->stat.pending_u += xprt->pending.qlen;  	/* Don't race with disconnect */  	if (!xprt_connected(xprt))  		task->tk_status = -ENOTCONN; -	else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) { +	else {  		/*  		 * Sleep on the pending queue since  		 * we're expecting a reply.  		 
*/ -		rpc_sleep_on(&xprt->pending, task, xprt_timer); +		if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) +			rpc_sleep_on(&xprt->pending, task, xprt_timer); +		req->rq_connect_cookie = xprt->connect_cookie;  	}  	spin_unlock_bh(&xprt->transport_lock);  } -static void xprt_alloc_slot(struct rpc_task *task) +static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) +{ +	set_bit(XPRT_CONGESTED, &xprt->state); +	rpc_sleep_on(&xprt->backlog, task, NULL); +} + +static void xprt_wake_up_backlog(struct rpc_xprt *xprt)  { -	struct rpc_xprt	*xprt = task->tk_xprt; +	if (rpc_wake_up_next(&xprt->backlog) == NULL) +		clear_bit(XPRT_CONGESTED, &xprt->state); +} -	task->tk_status = 0; -	if (task->tk_rqstp) -		return; +static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) +{ +	bool ret = false; + +	if (!test_bit(XPRT_CONGESTED, &xprt->state)) +		goto out; +	spin_lock(&xprt->reserve_lock); +	if (test_bit(XPRT_CONGESTED, &xprt->state)) { +		rpc_sleep_on(&xprt->backlog, task, NULL); +		ret = true; +	} +	spin_unlock(&xprt->reserve_lock); +out: +	return ret; +} + +static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags) +{ +	struct rpc_rqst *req = ERR_PTR(-EAGAIN); + +	if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs)) +		goto out; +	req = kzalloc(sizeof(struct rpc_rqst), gfp_flags); +	if (req != NULL) +		goto out; +	atomic_dec(&xprt->num_reqs); +	req = ERR_PTR(-ENOMEM); +out: +	return req; +} + +static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) +{ +	if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) { +		kfree(req); +		return true; +	} +	return false; +} + +void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) +{ +	struct rpc_rqst *req; + +	spin_lock(&xprt->reserve_lock);  	if (!list_empty(&xprt->free)) { -		struct rpc_rqst	*req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); -		list_del_init(&req->rq_list); -		task->tk_rqstp = req; -		xprt_request_init(task, xprt); -		return; +		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); +		list_del(&req->rq_list); +		goto out_init_req;  	} -	dprintk("RPC:       waiting for request slot\n"); -	task->tk_status = -EAGAIN; -	task->tk_timeout = 0; -	rpc_sleep_on(&xprt->backlog, task, NULL); +	req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT|__GFP_NOWARN); +	if (!IS_ERR(req)) +		goto out_init_req; +	switch (PTR_ERR(req)) { +	case -ENOMEM: +		dprintk("RPC:       dynamic allocation of request slot " +				"failed! Retrying\n"); +		task->tk_status = -ENOMEM; +		break; +	case -EAGAIN: +		xprt_add_backlog(xprt, task); +		dprintk("RPC:       waiting for request slot\n"); +	default: +		task->tk_status = -EAGAIN; +	} +	spin_unlock(&xprt->reserve_lock); +	return; +out_init_req: +	task->tk_status = 0; +	task->tk_rqstp = req; +	xprt_request_init(task, xprt); +	spin_unlock(&xprt->reserve_lock);  } +EXPORT_SYMBOL_GPL(xprt_alloc_slot); -static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) +void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)  { -	memset(req, 0, sizeof(*req));	/* mark unused */ +	/* Note: grabbing the xprt_lock_write() ensures that we throttle +	 * new slot allocation if the transport is congested (i.e. when +	 * reconnecting a stream transport or when out of socket write +	 * buffer space). 
+	 */ +	if (xprt_lock_write(xprt, task)) { +		xprt_alloc_slot(xprt, task); +		xprt_release_write(xprt, task); +	} +} +EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot); +static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) +{  	spin_lock(&xprt->reserve_lock); -	list_add(&req->rq_list, &xprt->free); -	rpc_wake_up_next(&xprt->backlog); +	if (!xprt_dynamic_free_slot(xprt, req)) { +		memset(req, 0, sizeof(*req));	/* mark unused */ +		list_add(&req->rq_list, &xprt->free); +	} +	xprt_wake_up_backlog(xprt);  	spin_unlock(&xprt->reserve_lock);  } -struct rpc_xprt *xprt_alloc(struct net *net, int size, int max_req) +static void xprt_free_all_slots(struct rpc_xprt *xprt) +{ +	struct rpc_rqst *req; +	while (!list_empty(&xprt->free)) { +		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); +		list_del(&req->rq_list); +		kfree(req); +	} +} + +struct rpc_xprt *xprt_alloc(struct net *net, size_t size, +		unsigned int num_prealloc, +		unsigned int max_alloc)  {  	struct rpc_xprt *xprt; +	struct rpc_rqst *req; +	int i;  	xprt = kzalloc(size, GFP_KERNEL);  	if (xprt == NULL)  		goto out; -	xprt->max_reqs = max_req; -	xprt->slot = kcalloc(max_req, sizeof(struct rpc_rqst), GFP_KERNEL); -	if (xprt->slot == NULL) -		goto out_free; +	xprt_init(xprt, net); + +	for (i = 0; i < num_prealloc; i++) { +		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); +		if (!req) +			goto out_free; +		list_add(&req->rq_list, &xprt->free); +	} +	if (max_alloc > num_prealloc) +		xprt->max_reqs = max_alloc; +	else +		xprt->max_reqs = num_prealloc; +	xprt->min_reqs = num_prealloc; +	atomic_set(&xprt->num_reqs, num_prealloc); -	xprt->xprt_net = get_net(net);  	return xprt;  out_free: -	kfree(xprt); +	xprt_free(xprt);  out:  	return NULL;  } @@ -984,7 +1121,7 @@ EXPORT_SYMBOL_GPL(xprt_alloc);  void xprt_free(struct rpc_xprt *xprt)  {  	put_net(xprt->xprt_net); -	kfree(xprt->slot); +	xprt_free_all_slots(xprt);  	kfree(xprt);  }  EXPORT_SYMBOL_GPL(xprt_free); @@ -993,17 +1130,50 @@ EXPORT_SYMBOL_GPL(xprt_free);   * xprt_reserve - allocate an RPC request slot   * @task: RPC task requesting a slot allocation   * - * If no more slots are available, place the task on the transport's + * If the transport is marked as being congested, or if no more + * slots are available, place the task on the transport's   * backlog queue.   */  void xprt_reserve(struct rpc_task *task)  { -	struct rpc_xprt	*xprt = task->tk_xprt; +	struct rpc_xprt	*xprt; -	task->tk_status = -EIO; -	spin_lock(&xprt->reserve_lock); -	xprt_alloc_slot(task); -	spin_unlock(&xprt->reserve_lock); +	task->tk_status = 0; +	if (task->tk_rqstp != NULL) +		return; + +	task->tk_timeout = 0; +	task->tk_status = -EAGAIN; +	rcu_read_lock(); +	xprt = rcu_dereference(task->tk_client->cl_xprt); +	if (!xprt_throttle_congested(xprt, task)) +		xprt->ops->alloc_slot(xprt, task); +	rcu_read_unlock(); +} + +/** + * xprt_retry_reserve - allocate an RPC request slot + * @task: RPC task requesting a slot allocation + * + * If no more slots are available, place the task on the transport's + * backlog queue. + * Note that the only difference with xprt_reserve is that we now + * ignore the value of the XPRT_CONGESTED flag. 
+ */ +void xprt_retry_reserve(struct rpc_task *task) +{ +	struct rpc_xprt	*xprt; + +	task->tk_status = 0; +	if (task->tk_rqstp != NULL) +		return; + +	task->tk_timeout = 0; +	task->tk_status = -EAGAIN; +	rcu_read_lock(); +	xprt = rcu_dereference(task->tk_client->cl_xprt); +	xprt->ops->alloc_slot(xprt, task); +	rcu_read_unlock();  }  static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) @@ -1013,18 +1183,25 @@ static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)  static inline void xprt_init_xid(struct rpc_xprt *xprt)  { -	xprt->xid = net_random(); +	xprt->xid = prandom_u32();  }  static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)  {  	struct rpc_rqst	*req = task->tk_rqstp; +	INIT_LIST_HEAD(&req->rq_list);  	req->rq_timeout = task->tk_client->cl_timeout->to_initval;  	req->rq_task	= task;  	req->rq_xprt    = xprt;  	req->rq_buffer  = NULL;  	req->rq_xid     = xprt_alloc_xid(xprt); +	req->rq_connect_cookie = xprt->connect_cookie - 1; +	req->rq_bytes_sent = 0; +	req->rq_snd_buf.len = 0; +	req->rq_snd_buf.buflen = 0; +	req->rq_rcv_buf.len = 0; +	req->rq_rcv_buf.buflen = 0;  	req->rq_release_snd_buf = NULL;  	xprt_reset_majortimeo(req);  	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, @@ -1039,13 +1216,24 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)  void xprt_release(struct rpc_task *task)  {  	struct rpc_xprt	*xprt; -	struct rpc_rqst	*req; +	struct rpc_rqst	*req = task->tk_rqstp; -	if (!(req = task->tk_rqstp)) +	if (req == NULL) { +		if (task->tk_client) { +			rcu_read_lock(); +			xprt = rcu_dereference(task->tk_client->cl_xprt); +			if (xprt->snd_task == task) +				xprt_release_write(xprt, task); +			rcu_read_unlock(); +		}  		return; +	}  	xprt = req->rq_xprt; -	rpc_count_iostats(task); +	if (task->tk_ops->rpc_count_stats != NULL) +		task->tk_ops->rpc_count_stats(task, task->tk_calldata); +	else if (task->tk_client) +		rpc_count_iostats(task, task->tk_client->cl_metrics);  	spin_lock_bh(&xprt->transport_lock);  	xprt->ops->release_xprt(xprt, task);  	if (xprt->ops->release_request) @@ -1072,6 +1260,34 @@ void xprt_release(struct rpc_task *task)  		xprt_free_bc_request(req);  } +static void xprt_init(struct rpc_xprt *xprt, struct net *net) +{ +	atomic_set(&xprt->count, 1); + +	spin_lock_init(&xprt->transport_lock); +	spin_lock_init(&xprt->reserve_lock); + +	INIT_LIST_HEAD(&xprt->free); +	INIT_LIST_HEAD(&xprt->recv); +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +	spin_lock_init(&xprt->bc_pa_lock); +	INIT_LIST_HEAD(&xprt->bc_pa_list); +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + +	xprt->last_used = jiffies; +	xprt->cwnd = RPC_INITCWND; +	xprt->bind_index = 0; + +	rpc_init_wait_queue(&xprt->binding, "xprt_binding"); +	rpc_init_wait_queue(&xprt->pending, "xprt_pending"); +	rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending"); +	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); + +	xprt_init_xid(xprt); + +	xprt->xprt_net = get_net(net); +} +  /**   * xprt_create_transport - create an RPC transport   * @args: rpc transport creation arguments @@ -1080,7 +1296,6 @@ void xprt_release(struct rpc_task *task)  struct rpc_xprt *xprt_create_transport(struct xprt_create *args)  {  	struct rpc_xprt	*xprt; -	struct rpc_rqst	*req;  	struct xprt_class *t;  	spin_lock(&xprt_list_lock); @@ -1099,66 +1314,49 @@ found:  	if (IS_ERR(xprt)) {  		dprintk("RPC:       xprt_create_transport: failed, %ld\n",  				-PTR_ERR(xprt)); -		return xprt; +		goto out;  	} - -	kref_init(&xprt->kref); -	
spin_lock_init(&xprt->transport_lock); -	spin_lock_init(&xprt->reserve_lock); - -	INIT_LIST_HEAD(&xprt->free); -	INIT_LIST_HEAD(&xprt->recv); -#if defined(CONFIG_NFS_V4_1) -	spin_lock_init(&xprt->bc_pa_lock); -	INIT_LIST_HEAD(&xprt->bc_pa_list); -#endif /* CONFIG_NFS_V4_1 */ - +	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) +		xprt->idle_timeout = 0;  	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);  	if (xprt_has_timer(xprt))  		setup_timer(&xprt->timer, xprt_init_autodisconnect,  			    (unsigned long)xprt);  	else  		init_timer(&xprt->timer); -	xprt->last_used = jiffies; -	xprt->cwnd = RPC_INITCWND; -	xprt->bind_index = 0; - -	rpc_init_wait_queue(&xprt->binding, "xprt_binding"); -	rpc_init_wait_queue(&xprt->pending, "xprt_pending"); -	rpc_init_wait_queue(&xprt->sending, "xprt_sending"); -	rpc_init_wait_queue(&xprt->resend, "xprt_resend"); -	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); - -	/* initialize free list */ -	for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) -		list_add(&req->rq_list, &xprt->free); -	xprt_init_xid(xprt); +	if (strlen(args->servername) > RPC_MAXNETNAMELEN) { +		xprt_destroy(xprt); +		return ERR_PTR(-EINVAL); +	} +	xprt->servername = kstrdup(args->servername, GFP_KERNEL); +	if (xprt->servername == NULL) { +		xprt_destroy(xprt); +		return ERR_PTR(-ENOMEM); +	}  	dprintk("RPC:       created transport %p with %u slots\n", xprt,  			xprt->max_reqs); +out:  	return xprt;  }  /**   * xprt_destroy - destroy an RPC transport, killing off all requests. - * @kref: kref for the transport to destroy + * @xprt: transport to destroy   *   */ -static void xprt_destroy(struct kref *kref) +static void xprt_destroy(struct rpc_xprt *xprt)  { -	struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref); -  	dprintk("RPC:       destroying transport %p\n", xprt); -	xprt->shutdown = 1;  	del_timer_sync(&xprt->timer);  	rpc_destroy_wait_queue(&xprt->binding);  	rpc_destroy_wait_queue(&xprt->pending);  	rpc_destroy_wait_queue(&xprt->sending); -	rpc_destroy_wait_queue(&xprt->resend);  	rpc_destroy_wait_queue(&xprt->backlog);  	cancel_work_sync(&xprt->task_cleanup); +	kfree(xprt->servername);  	/*  	 * Tear down transport state and free the rpc_xprt  	 */ @@ -1172,16 +1370,6 @@ static void xprt_destroy(struct kref *kref)   */  void xprt_put(struct rpc_xprt *xprt)  { -	kref_put(&xprt->kref, xprt_destroy); -} - -/** - * xprt_get - return a reference to an RPC transport. - * @xprt: pointer to the transport - * - */ -struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) -{ -	kref_get(&xprt->kref); -	return xprt; +	if (atomic_dec_and_test(&xprt->count)) +		xprt_destroy(xprt);  }  | 
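
The separate resend wait queue is removed: both xprt_reserve_xprt() and xprt_reserve_xprt_cong() now park blocked writers on the single sending queue via rpc_sleep_on_priority(), choosing the priority from how far the request has progressed. A minimal stand-alone sketch of that decision follows; the enum and struct names are local stand-ins, not the kernel's RPC_PRIORITY_* or struct rpc_rqst definitions.

#include <stddef.h>

enum prio { PRIO_LOW, PRIO_NORMAL, PRIO_HIGH };

struct rqst { unsigned int rq_ntrans; };        /* stand-in for struct rpc_rqst */

/* Mirrors the priority choice in xprt_reserve_xprt(): tasks that have
 * already transmitted part of a request jump the queue, tasks without
 * a slot go to the back. */
static enum prio sending_queue_priority(const struct rqst *req)
{
        if (req == NULL)
                return PRIO_LOW;        /* no slot yet: cheapest to defer  */
        if (req->rq_ntrans == 0)
                return PRIO_NORMAL;     /* has a slot, never transmitted   */
        return PRIO_HIGH;               /* partially sent: retransmit first */
}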
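
Lock hand-off in __xprt_lock_write_next() and __xprt_lock_write_next_cong() likewise moves from rpc_wake_up_next() on two queues to rpc_wake_up_first() with a predicate (__xprt_lock_write_func, __xprt_lock_write_cong_func) that grants the transport lock as a side effect before the task is woken. Below is a rough userspace sketch of that "offer the head waiter to a predicate, wake it only if accepted" shape over a plain linked list; it is not the kernel's rpc_wait_queue implementation.

#include <stdbool.h>
#include <stddef.h>

struct waiter {
        struct waiter *next;
        int id;         /* placeholder for per-task state the predicate inspects */
};

struct waitq {
        struct waiter *head;
};

typedef bool (*grant_fn)(struct waiter *w, void *data);

/* Look only at the first queued waiter; if the predicate accepts it
 * (e.g. the transport lock or a congestion slot could be reserved for
 * it), dequeue and return it, otherwise leave the queue untouched and
 * return NULL so the caller can drop the lock again. */
static struct waiter *wake_up_first(struct waitq *q, grant_fn fn, void *data)
{
        struct waiter *w = q->head;

        if (w == NULL || !fn(w, data))
                return NULL;
        q->head = w->next;
        w->next = NULL;
        return w;
}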
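
The Van Jacobson window description moves onto xprt_adjust_cwnd() itself, which now takes the xprt explicitly instead of reading task->tk_xprt. As a worked example of the rule the comment states (halve on a retransmission timeout, grow by roughly 1/cwnd per reply while the window is full), here is a hedged arithmetic sketch in RPC_CWNDSCALE fixed point; the limits follow the macros that used to sit at the top of the file, and the exact rounding in the kernel may differ.

#include <errno.h>

#define RPC_CWNDSHIFT   (8U)
#define RPC_CWNDSCALE   (1U << RPC_CWNDSHIFT)   /* 1.0 in fixed point */
#define RPC_INITCWND    RPC_CWNDSCALE

static unsigned long cwnd_update(unsigned long cwnd, unsigned long cong,
                                 unsigned long maxcwnd, int result)
{
        if (result >= 0 && cwnd <= cong) {
                /* Reply arrived with a full window: cwnd += ~1/cwnd;
                 * the (cwnd >> 1) term rounds to nearest. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > maxcwnd)
                        cwnd = maxcwnd;
        } else if (result == -ETIMEDOUT) {
                /* Retransmit: halve, but never below one request. */
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        return cwnd;
}

With RPC_CWNDSCALE standing for 1.0, RPC_INITCWND is a one-request window, and a timeout can never shrink the window below a single outstanding request.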
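
xprt_prepare_transmit() now returns bool and, for RPC_TASK_NO_RETRANS_TIMEOUT tasks, declines to retransmit a request that already went out on the still-live connection: if the transport is connected and req->rq_connect_cookie matches xprt->connect_cookie, it just re-arms the retransmit timer and sleeps on the pending queue. That test works because xprt_request_init() seeds the cookie with xprt->connect_cookie - 1 and xprt_transmit() records the live cookie only after a successful send. A tiny sketch of the cookie bookkeeping, using hypothetical stub names:

#include <stdbool.h>

struct xprt_stub { unsigned int connect_cookie; bool connected; };
struct rqst_stub { unsigned int rq_connect_cookie; };

/* New requests start one behind the live cookie, so they can never look
 * like they were already sent on the current connection. */
static void rqst_init(struct rqst_stub *req, const struct xprt_stub *xprt)
{
        req->rq_connect_cookie = xprt->connect_cookie - 1;
}

/* Recorded only after a successful transmit, matching how the diff moves
 * the assignment from before ->send_request() to after it. */
static void rqst_mark_sent(struct rqst_stub *req, const struct xprt_stub *xprt)
{
        req->rq_connect_cookie = xprt->connect_cookie;
}

static bool sent_on_current_connection(const struct rqst_stub *req,
                                       const struct xprt_stub *xprt)
{
        return xprt->connected &&
               req->rq_connect_cookie == xprt->connect_cookie;
}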
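
The fixed xprt->slot array is gone: xprt_alloc() preallocates num_prealloc request structures onto the free list, and when that list is empty xprt_alloc_slot() tries xprt_dynamic_alloc_slot(), which uses atomic_add_unless() to keep the live slot count at or below max_reqs and backs the count out if kzalloc() fails. A userspace sketch of the same bounded-pool pattern, with C11 atomics in place of the kernel's atomic_t helpers:

#include <errno.h>
#include <stdatomic.h>
#include <stdlib.h>

struct slot { char payload[64]; };              /* stand-in for struct rpc_rqst */

static atomic_uint num_slots;                   /* like xprt->num_reqs */
static const unsigned int max_slots = 16;       /* like xprt->max_reqs */

/* Reserve a count below the ceiling before allocating; undo the count
 * if the allocation itself fails.  errno distinguishes "pool full"
 * (EAGAIN) from "out of memory" (ENOMEM), as the diff does with
 * ERR_PTR() values. */
static struct slot *dynamic_alloc_slot(void)
{
        unsigned int n = atomic_load(&num_slots);
        struct slot *s;

        do {
                if (n >= max_slots) {
                        errno = EAGAIN;
                        return NULL;
                }
        } while (!atomic_compare_exchange_weak(&num_slots, &n, n + 1));

        s = calloc(1, sizeof(*s));
        if (s == NULL) {
                atomic_fetch_sub(&num_slots, 1);
                errno = ENOMEM;
        }
        return s;
}

xprt_free_slot() is the mirror image: xprt_dynamic_free_slot() uses atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs), so dynamically added slots are freed but the pool never shrinks below the preallocated minimum.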
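
xprt_reserve() also consults a new XPRT_CONGESTED state bit: xprt_add_backlog() sets it when a task is queued on the backlog and xprt_wake_up_backlog() clears it once the backlog drains, so xprt_throttle_congested() can skip reserve_lock entirely on the common uncongested path and only re-checks the bit under the lock before sleeping. A minimal sketch of that check/lock/re-check shape, with a pthread mutex standing in for the spinlock and a callback standing in for rpc_sleep_on():

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool congested;                   /* like XPRT_CONGESTED */
static pthread_mutex_t reserve_lock = PTHREAD_MUTEX_INITIALIZER;

/* Returns true if the caller should sleep on the backlog instead of
 * trying to grab a slot right now. */
static bool throttle_congested(void (*sleep_on_backlog)(void))
{
        bool ret = false;

        if (!atomic_load(&congested))           /* fast path, lock-free */
                return false;

        pthread_mutex_lock(&reserve_lock);
        if (atomic_load(&congested)) {          /* re-check under the lock */
                sleep_on_backlog();
                ret = true;
        }
        pthread_mutex_unlock(&reserve_lock);
        return ret;
}

The new xprt_retry_reserve() is the same as xprt_reserve() except that, as its comment notes, it deliberately ignores the XPRT_CONGESTED flag and goes straight to ->alloc_slot().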
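
Finally, transport lifetime tracking drops the struct kref: xprt_init() starts a plain atomic count at 1, xprt_put() calls the now-static xprt_destroy() when atomic_dec_and_test() sees the count reach zero, and the old kref-based xprt_get() helper disappears from this file. A short sketch of the same idiom with C11 atomics:

#include <stdatomic.h>
#include <stdlib.h>

struct xprt_stub {
        atomic_int count;
        /* ... transport state ... */
};

static void xprt_stub_init(struct xprt_stub *x)
{
        atomic_init(&x->count, 1);      /* creator holds the first reference */
}

static void xprt_stub_put(struct xprt_stub *x)
{
        /* fetch_sub returns the previous value; 1 means this was the last
         * reference, so tear the object down, like xprt_destroy(). */
        if (atomic_fetch_sub(&x->count, 1) == 1)
                free(x);
}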
