diff options
Diffstat (limited to 'net/sunrpc/xprtsock.c')
| -rw-r--r-- | net/sunrpc/xprtsock.c | 203 | 
1 files changed, 125 insertions, 78 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index ee03d35677d..be8bbd5d65e 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -254,9 +254,10 @@ struct sock_xprt {  	/*  	 * Saved socket callback addresses  	 */ -	void			(*old_data_ready)(struct sock *, int); +	void			(*old_data_ready)(struct sock *);  	void			(*old_state_change)(struct sock *);  	void			(*old_write_space)(struct sock *); +	void			(*old_error_report)(struct sock *);  };  /* @@ -274,6 +275,11 @@ struct sock_xprt {   */  #define TCP_RPC_REPLY		(1UL << 6) +static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) +{ +	return (struct rpc_xprt *) sk->sk_user_data; +} +  static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)  {  	return (struct sockaddr *) &xprt->addr; @@ -393,8 +399,10 @@ static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen,  	return kernel_sendmsg(sock, &msg, NULL, 0, 0);  } -static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more) +static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy)  { +	ssize_t (*do_sendpage)(struct socket *sock, struct page *page, +			int offset, size_t size, int flags);  	struct page **ppage;  	unsigned int remainder;  	int err, sent = 0; @@ -403,6 +411,9 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i  	base += xdr->page_base;  	ppage = xdr->pages + (base >> PAGE_SHIFT);  	base &= ~PAGE_MASK; +	do_sendpage = sock->ops->sendpage; +	if (!zerocopy) +		do_sendpage = sock_no_sendpage;  	for(;;) {  		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);  		int flags = XS_SENDMSG_FLAGS; @@ -410,7 +421,7 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i  		remainder -= len;  		if (remainder != 0 || more)  			flags |= MSG_MORE; -		err = sock->ops->sendpage(sock, *ppage, base, len, flags); +		err = do_sendpage(sock, *ppage, base, len, flags);  		if (remainder == 0 || err != len)  			break;  		sent += err; @@ -431,9 +442,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i   * @addrlen: UDP only -- length of destination address   * @xdr: buffer containing this request   * @base: starting position in the buffer + * @zerocopy: true if it is safe to use sendpage()   *   */ -static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) +static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy)  {  	unsigned int remainder = xdr->len - base;  	int err, sent = 0; @@ -461,7 +473,7 @@ static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,  	if (base < xdr->page_len) {  		unsigned int len = xdr->page_len - base;  		remainder -= len; -		err = xs_send_pagedata(sock, xdr, base, remainder != 0); +		err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy);  		if (remainder == 0 || err != len)  			goto out;  		sent += err; @@ -498,6 +510,7 @@ static int xs_nospace(struct rpc_task *task)  	struct rpc_rqst *req = task->tk_rqstp;  	struct rpc_xprt *xprt = req->rq_xprt;  	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); +	struct sock *sk = transport->inet;  	int ret = -EAGAIN;  	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", @@ -515,7 +528,7 @@ static int xs_nospace(struct rpc_task *task)  			 * window size  			 */  			set_bit(SOCK_NOSPACE, &transport->sock->flags); -			transport->inet->sk_write_pending++; +			sk->sk_write_pending++;  			/* ...and wait for more buffer space */  			xprt_wait_for_buffer_space(task, xs_nospace_callback);  		} @@ -525,6 +538,9 @@ static int xs_nospace(struct rpc_task *task)  	}  	spin_unlock_bh(&xprt->transport_lock); + +	/* Race breaker in case memory is freed before above code is called */ +	sk->sk_write_space(sk);  	return ret;  } @@ -564,7 +580,7 @@ static int xs_local_send_request(struct rpc_task *task)  			req->rq_svec->iov_base, req->rq_svec->iov_len);  	status = xs_sendpages(transport->sock, NULL, 0, -						xdr, req->rq_bytes_sent); +						xdr, req->rq_bytes_sent, true);  	dprintk("RPC:       %s(%u) = %d\n",  			__func__, xdr->len - req->rq_bytes_sent, status);  	if (likely(status >= 0)) { @@ -620,7 +636,7 @@ static int xs_udp_send_request(struct rpc_task *task)  	status = xs_sendpages(transport->sock,  			      xs_addr(xprt),  			      xprt->addrlen, xdr, -			      req->rq_bytes_sent); +			      req->rq_bytes_sent, true);  	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",  			xdr->len - req->rq_bytes_sent, status); @@ -693,6 +709,7 @@ static int xs_tcp_send_request(struct rpc_task *task)  	struct rpc_xprt *xprt = req->rq_xprt;  	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);  	struct xdr_buf *xdr = &req->rq_snd_buf; +	bool zerocopy = true;  	int status;  	xs_encode_stream_record_marker(&req->rq_snd_buf); @@ -700,13 +717,20 @@ static int xs_tcp_send_request(struct rpc_task *task)  	xs_pktdump("packet data:",  				req->rq_svec->iov_base,  				req->rq_svec->iov_len); +	/* Don't use zero copy if this is a resend. If the RPC call +	 * completes while the socket holds a reference to the pages, +	 * then we may end up resending corrupted data. +	 */ +	if (task->tk_flags & RPC_TASK_SENT) +		zerocopy = false;  	/* Continue transmitting the packet/record. We must be careful  	 * to cope with writespace callbacks arriving _after_ we have  	 * called sendmsg(). */  	while (1) {  		status = xs_sendpages(transport->sock, -					NULL, 0, xdr, req->rq_bytes_sent); +					NULL, 0, xdr, req->rq_bytes_sent, +					zerocopy);  		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",  				xdr->len - req->rq_bytes_sent, status); @@ -785,6 +809,7 @@ static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)  	transport->old_data_ready = sk->sk_data_ready;  	transport->old_state_change = sk->sk_state_change;  	transport->old_write_space = sk->sk_write_space; +	transport->old_error_report = sk->sk_error_report;  }  static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk) @@ -792,6 +817,34 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s  	sk->sk_data_ready = transport->old_data_ready;  	sk->sk_state_change = transport->old_state_change;  	sk->sk_write_space = transport->old_write_space; +	sk->sk_error_report = transport->old_error_report; +} + +/** + * xs_error_report - callback to handle TCP socket state errors + * @sk: socket + * + * Note: we don't call sock_error() since there may be a rpc_task + * using the socket, and so we don't want to clear sk->sk_err. + */ +static void xs_error_report(struct sock *sk) +{ +	struct rpc_xprt *xprt; +	int err; + +	read_lock_bh(&sk->sk_callback_lock); +	if (!(xprt = xprt_from_sock(sk))) +		goto out; + +	err = -sk->sk_err; +	if (err == 0) +		goto out; +	dprintk("RPC:       xs_error_report client %p, error=%d...\n", +			xprt, -err); +	trace_rpc_socket_error(xprt, sk->sk_socket, err); +	xprt_wake_pending_tasks(xprt, err); + out: +	read_unlock_bh(&sk->sk_callback_lock);  }  static void xs_reset_transport(struct sock_xprt *transport) @@ -813,8 +866,6 @@ static void xs_reset_transport(struct sock_xprt *transport)  	xs_restore_old_callbacks(transport, sk);  	write_unlock_bh(&sk->sk_callback_lock); -	sk->sk_no_check = 0; -  	trace_rpc_socket_close(&transport->xprt, sock);  	sock_release(sock);  } @@ -835,14 +886,16 @@ static void xs_close(struct rpc_xprt *xprt)  	dprintk("RPC:       xs_close xprt %p\n", xprt); +	cancel_delayed_work_sync(&transport->connect_worker); +  	xs_reset_transport(transport);  	xprt->reestablish_timeout = 0; -	smp_mb__before_clear_bit(); +	smp_mb__before_atomic();  	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);  	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);  	clear_bit(XPRT_CLOSING, &xprt->state); -	smp_mb__after_clear_bit(); +	smp_mb__after_atomic();  	xprt_disconnect_done(xprt);  } @@ -854,12 +907,10 @@ static void xs_tcp_close(struct rpc_xprt *xprt)  		xs_tcp_shutdown(xprt);  } -static void xs_local_destroy(struct rpc_xprt *xprt) +static void xs_xprt_free(struct rpc_xprt *xprt)  { -	xs_close(xprt);  	xs_free_peer_addresses(xprt);  	xprt_free(xprt); -	module_put(THIS_MODULE);  }  /** @@ -869,18 +920,11 @@ static void xs_local_destroy(struct rpc_xprt *xprt)   */  static void xs_destroy(struct rpc_xprt *xprt)  { -	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); -  	dprintk("RPC:       xs_destroy xprt %p\n", xprt); -	cancel_delayed_work_sync(&transport->connect_worker); - -	xs_local_destroy(xprt); -} - -static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) -{ -	return (struct rpc_xprt *) sk->sk_user_data; +	xs_close(xprt); +	xs_xprt_free(xprt); +	module_put(THIS_MODULE);  }  static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) @@ -905,7 +949,7 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)   *   * Currently this assumes we can read the whole reply in a single gulp.   */ -static void xs_local_data_ready(struct sock *sk, int len) +static void xs_local_data_ready(struct sock *sk)  {  	struct rpc_task *task;  	struct rpc_xprt *xprt; @@ -968,7 +1012,7 @@ static void xs_local_data_ready(struct sock *sk, int len)   * @len: how much data to read   *   */ -static void xs_udp_data_ready(struct sock *sk, int len) +static void xs_udp_data_ready(struct sock *sk)  {  	struct rpc_task *task;  	struct rpc_xprt *xprt; @@ -1265,41 +1309,29 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,   * If we're unable to obtain the rpc_rqst we schedule the closing of the   * connection and return -1.   */ -static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, +static int xs_tcp_read_callback(struct rpc_xprt *xprt,  				       struct xdr_skb_reader *desc)  {  	struct sock_xprt *transport =  				container_of(xprt, struct sock_xprt, xprt);  	struct rpc_rqst *req; -	req = xprt_alloc_bc_request(xprt); +	/* Look up and lock the request corresponding to the given XID */ +	spin_lock(&xprt->transport_lock); +	req = xprt_lookup_bc_request(xprt, transport->tcp_xid);  	if (req == NULL) { +		spin_unlock(&xprt->transport_lock);  		printk(KERN_WARNING "Callback slot table overflowed\n");  		xprt_force_disconnect(xprt);  		return -1;  	} -	req->rq_xid = transport->tcp_xid;  	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));  	xs_tcp_read_common(xprt, desc, req); -	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { -		struct svc_serv *bc_serv = xprt->bc_serv; - -		/* -		 * Add callback request to callback list.  The callback -		 * service sleeps on the sv_cb_waitq waiting for new -		 * requests.  Wake it up after adding enqueing the -		 * request. -		 */ -		dprintk("RPC:       add callback request to list\n"); -		spin_lock(&bc_serv->sv_cb_lock); -		list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); -		spin_unlock(&bc_serv->sv_cb_lock); -		wake_up(&bc_serv->sv_cb_waitq); -	} - -	req->rq_private_buf.len = transport->tcp_copied; +	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) +		xprt_complete_bc_request(req, transport->tcp_copied); +	spin_unlock(&xprt->transport_lock);  	return 0;  } @@ -1403,7 +1435,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns   * @bytes: how much data to read   *   */ -static void xs_tcp_data_ready(struct sock *sk, int bytes) +static void xs_tcp_data_ready(struct sock *sk)  {  	struct rpc_xprt *xprt;  	read_descriptor_t rd_desc; @@ -1463,12 +1495,12 @@ static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)  static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)  { -	smp_mb__before_clear_bit(); +	smp_mb__before_atomic();  	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);  	clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state);  	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);  	clear_bit(XPRT_CLOSING, &xprt->state); -	smp_mb__after_clear_bit(); +	smp_mb__after_atomic();  }  static void xs_sock_mark_closed(struct rpc_xprt *xprt) @@ -1511,6 +1543,7 @@ static void xs_tcp_state_change(struct sock *sk)  			transport->tcp_copied = 0;  			transport->tcp_flags =  				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; +			xprt->connect_cookie++;  			xprt_wake_pending_tasks(xprt, -EAGAIN);  		} @@ -1521,10 +1554,10 @@ static void xs_tcp_state_change(struct sock *sk)  		xprt->connect_cookie++;  		xprt->reestablish_timeout = 0;  		set_bit(XPRT_CLOSING, &xprt->state); -		smp_mb__before_clear_bit(); +		smp_mb__before_atomic();  		clear_bit(XPRT_CONNECTED, &xprt->state);  		clear_bit(XPRT_CLOSE_WAIT, &xprt->state); -		smp_mb__after_clear_bit(); +		smp_mb__after_atomic();  		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);  		break;  	case TCP_CLOSE_WAIT: @@ -1543,9 +1576,9 @@ static void xs_tcp_state_change(struct sock *sk)  	case TCP_LAST_ACK:  		set_bit(XPRT_CLOSING, &xprt->state);  		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); -		smp_mb__before_clear_bit(); +		smp_mb__before_atomic();  		clear_bit(XPRT_CONNECTED, &xprt->state); -		smp_mb__after_clear_bit(); +		smp_mb__after_atomic();  		break;  	case TCP_CLOSE:  		xs_tcp_cancel_linger_timeout(xprt); @@ -1666,7 +1699,7 @@ static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)  static unsigned short xs_get_random_port(void)  {  	unsigned short range = xprt_max_resvport - xprt_min_resvport; -	unsigned short rand = (unsigned short) net_random() % range; +	unsigned short rand = (unsigned short) prandom_u32() % range;  	return rand + xprt_min_resvport;  } @@ -1816,6 +1849,10 @@ static inline void xs_reclassify_socket(int family, struct socket *sock)  }  #endif +static void xs_dummy_setup_socket(struct work_struct *work) +{ +} +  static struct socket *xs_create_sock(struct rpc_xprt *xprt,  		struct sock_xprt *transport, int family, int type, int protocol)  { @@ -1857,6 +1894,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,  		sk->sk_user_data = xprt;  		sk->sk_data_ready = xs_local_data_ready;  		sk->sk_write_space = xs_udp_write_space; +		sk->sk_error_report = xs_error_report;  		sk->sk_allocation = GFP_ATOMIC;  		xprt_clear_connected(xprt); @@ -2006,7 +2044,6 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)  		sk->sk_user_data = xprt;  		sk->sk_data_ready = xs_udp_data_ready;  		sk->sk_write_space = xs_udp_write_space; -		sk->sk_no_check = UDP_CSUM_NORCV;  		sk->sk_allocation = GFP_ATOMIC;  		xprt_set_connected(xprt); @@ -2112,6 +2149,19 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)  	if (!transport->inet) {  		struct sock *sk = sock->sk; +		unsigned int keepidle = xprt->timeout->to_initval / HZ; +		unsigned int keepcnt = xprt->timeout->to_retries + 1; +		unsigned int opt_on = 1; + +		/* TCP Keepalive options */ +		kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, +				(char *)&opt_on, sizeof(opt_on)); +		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, +				(char *)&keepidle, sizeof(keepidle)); +		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, +				(char *)&keepidle, sizeof(keepidle)); +		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT, +				(char *)&keepcnt, sizeof(keepcnt));  		write_lock_bh(&sk->sk_callback_lock); @@ -2121,6 +2171,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)  		sk->sk_data_ready = xs_tcp_data_ready;  		sk->sk_state_change = xs_tcp_state_change;  		sk->sk_write_space = xs_tcp_write_space; +		sk->sk_error_report = xs_error_report;  		sk->sk_allocation = GFP_ATOMIC;  		/* socket options */ @@ -2151,7 +2202,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)  	case 0:  	case -EINPROGRESS:  		/* SYN_SENT! */ -		xprt->connect_cookie++;  		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)  			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;  	} @@ -2484,6 +2534,10 @@ static void bc_close(struct rpc_xprt *xprt)  static void bc_destroy(struct rpc_xprt *xprt)  { +	dprintk("RPC:       bc_destroy xprt %p\n", xprt); + +	xs_xprt_free(xprt); +	module_put(THIS_MODULE);  }  static struct rpc_xprt_ops xs_local_ops = { @@ -2498,7 +2552,7 @@ static struct rpc_xprt_ops xs_local_ops = {  	.send_request		= xs_local_send_request,  	.set_retrans_timeout	= xprt_set_retrans_timeout_def,  	.close			= xs_close, -	.destroy		= xs_local_destroy, +	.destroy		= xs_destroy,  	.print_stats		= xs_local_print_stats,  }; @@ -2655,6 +2709,9 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)  	xprt->ops = &xs_local_ops;  	xprt->timeout = &xs_local_default_timeout; +	INIT_DELAYED_WORK(&transport->connect_worker, +			xs_dummy_setup_socket); +  	switch (sun->sun_family) {  	case AF_LOCAL:  		if (sun->sun_path[0] != '/') { @@ -2681,7 +2738,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)  		return xprt;  	ret = ERR_PTR(-EINVAL);  out_err: -	xprt_free(xprt); +	xs_xprt_free(xprt);  	return ret;  } @@ -2759,7 +2816,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)  		return xprt;  	ret = ERR_PTR(-EINVAL);  out_err: -	xprt_free(xprt); +	xs_xprt_free(xprt);  	return ret;  } @@ -2834,12 +2891,11 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)  				xprt->address_strings[RPC_DISPLAY_ADDR],  				xprt->address_strings[RPC_DISPLAY_PROTO]); -  	if (try_module_get(THIS_MODULE))  		return xprt;  	ret = ERR_PTR(-EINVAL);  out_err: -	xprt_free(xprt); +	xs_xprt_free(xprt);  	return ret;  } @@ -2856,15 +2912,6 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)  	struct svc_sock *bc_sock;  	struct rpc_xprt *ret; -	if (args->bc_xprt->xpt_bc_xprt) { -		/* -		 * This server connection already has a backchannel -		 * export; we can't create a new one, as we wouldn't be -		 * able to match replies based on xid any more.  So, -		 * reuse the already-existing one: -		 */ -		 return args->bc_xprt->xpt_bc_xprt; -	}  	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,  			xprt_tcp_slot_table_entries);  	if (IS_ERR(xprt)) @@ -2905,10 +2952,9 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)  	/*  	 * Once we've associated a backchannel xprt with a connection, -	 * we want to keep it around as long as long as the connection -	 * lasts, in case we need to start using it for a backchannel -	 * again; this reference won't be dropped until bc_xprt is -	 * destroyed. +	 * we want to keep it around as long as the connection lasts, +	 * in case we need to start using it for a backchannel again; +	 * this reference won't be dropped until bc_xprt is destroyed.  	 */  	xprt_get(xprt);  	args->bc_xprt->xpt_bc_xprt = xprt; @@ -2923,13 +2969,14 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)  	 */  	xprt_set_connected(xprt); -  	if (try_module_get(THIS_MODULE))  		return xprt; + +	args->bc_xprt->xpt_bc_xprt = NULL;  	xprt_put(xprt);  	ret = ERR_PTR(-EINVAL);  out_err: -	xprt_free(xprt); +	xs_xprt_free(xprt);  	return ret;  }  | 
