diff options
Diffstat (limited to 'net/sunrpc/xprtsock.c')
| -rw-r--r-- | net/sunrpc/xprtsock.c | 290 |
1 files changed, 190 insertions, 100 deletions
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 68b0a81c31d..be8bbd5d65e 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -33,6 +33,7 @@ #include <linux/udp.h> #include <linux/tcp.h> #include <linux/sunrpc/clnt.h> +#include <linux/sunrpc/addr.h> #include <linux/sunrpc/sched.h> #include <linux/sunrpc/svcsock.h> #include <linux/sunrpc/xprtsock.h> @@ -46,6 +47,8 @@ #include <net/udp.h> #include <net/tcp.h> +#include <trace/events/sunrpc.h> + #include "sunrpc.h" static void xs_close(struct rpc_xprt *xprt); @@ -86,7 +89,7 @@ static struct ctl_table_header *sunrpc_table_header; * FIXME: changing the UDP slot table size should also resize the UDP * socket buffers for existing UDP transports */ -static ctl_table xs_tunables_table[] = { +static struct ctl_table xs_tunables_table[] = { { .procname = "udp_slot_table_entries", .data = &xprt_udp_slot_table_entries, @@ -142,7 +145,7 @@ static ctl_table xs_tunables_table[] = { { }, }; -static ctl_table sunrpc_table[] = { +static struct ctl_table sunrpc_table[] = { { .procname = "sunrpc", .mode = 0555, @@ -251,9 +254,10 @@ struct sock_xprt { /* * Saved socket callback addresses */ - void (*old_data_ready)(struct sock *, int); + void (*old_data_ready)(struct sock *); void (*old_state_change)(struct sock *); void (*old_write_space)(struct sock *); + void (*old_error_report)(struct sock *); }; /* @@ -271,6 +275,11 @@ struct sock_xprt { */ #define TCP_RPC_REPLY (1UL << 6) +static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) +{ + return (struct rpc_xprt *) sk->sk_user_data; +} + static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) { return (struct sockaddr *) &xprt->addr; @@ -390,8 +399,10 @@ static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, return kernel_sendmsg(sock, &msg, NULL, 0, 0); } -static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more) +static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy) { + ssize_t (*do_sendpage)(struct socket *sock, struct page *page, + int offset, size_t size, int flags); struct page **ppage; unsigned int remainder; int err, sent = 0; @@ -400,6 +411,9 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i base += xdr->page_base; ppage = xdr->pages + (base >> PAGE_SHIFT); base &= ~PAGE_MASK; + do_sendpage = sock->ops->sendpage; + if (!zerocopy) + do_sendpage = sock_no_sendpage; for(;;) { unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder); int flags = XS_SENDMSG_FLAGS; @@ -407,7 +421,7 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i remainder -= len; if (remainder != 0 || more) flags |= MSG_MORE; - err = sock->ops->sendpage(sock, *ppage, base, len, flags); + err = do_sendpage(sock, *ppage, base, len, flags); if (remainder == 0 || err != len) break; sent += err; @@ -428,9 +442,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i * @addrlen: UDP only -- length of destination address * @xdr: buffer containing this request * @base: starting position in the buffer + * @zerocopy: true if it is safe to use sendpage() * */ -static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) +static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy) { unsigned int remainder = xdr->len - base; int err, sent = 0; @@ -458,7 +473,7 @@ static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, if (base < xdr->page_len) { unsigned int len = xdr->page_len - base; remainder -= len; - err = xs_send_pagedata(sock, xdr, base, remainder != 0); + err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy); if (remainder == 0 || err != len) goto out; sent += err; @@ -495,6 +510,7 @@ static int xs_nospace(struct rpc_task *task) struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + struct sock *sk = transport->inet; int ret = -EAGAIN; dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", @@ -512,7 +528,7 @@ static int xs_nospace(struct rpc_task *task) * window size */ set_bit(SOCK_NOSPACE, &transport->sock->flags); - transport->inet->sk_write_pending++; + sk->sk_write_pending++; /* ...and wait for more buffer space */ xprt_wait_for_buffer_space(task, xs_nospace_callback); } @@ -522,6 +538,9 @@ static int xs_nospace(struct rpc_task *task) } spin_unlock_bh(&xprt->transport_lock); + + /* Race breaker in case memory is freed before above code is called */ + sk->sk_write_space(sk); return ret; } @@ -561,7 +580,7 @@ static int xs_local_send_request(struct rpc_task *task) req->rq_svec->iov_base, req->rq_svec->iov_len); status = xs_sendpages(transport->sock, NULL, 0, - xdr, req->rq_bytes_sent); + xdr, req->rq_bytes_sent, true); dprintk("RPC: %s(%u) = %d\n", __func__, xdr->len - req->rq_bytes_sent, status); if (likely(status >= 0)) { @@ -617,7 +636,7 @@ static int xs_udp_send_request(struct rpc_task *task) status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen, xdr, - req->rq_bytes_sent); + req->rq_bytes_sent, true); dprintk("RPC: xs_udp_send_request(%u) = %d\n", xdr->len - req->rq_bytes_sent, status); @@ -664,8 +683,10 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt) struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct socket *sock = transport->sock; - if (sock != NULL) + if (sock != NULL) { kernel_sock_shutdown(sock, SHUT_WR); + trace_rpc_socket_shutdown(xprt, sock); + } } /** @@ -688,6 +709,7 @@ static int xs_tcp_send_request(struct rpc_task *task) struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; + bool zerocopy = true; int status; xs_encode_stream_record_marker(&req->rq_snd_buf); @@ -695,13 +717,20 @@ static int xs_tcp_send_request(struct rpc_task *task) xs_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); + /* Don't use zero copy if this is a resend. If the RPC call + * completes while the socket holds a reference to the pages, + * then we may end up resending corrupted data. + */ + if (task->tk_flags & RPC_TASK_SENT) + zerocopy = false; /* Continue transmitting the packet/record. We must be careful * to cope with writespace callbacks arriving _after_ we have * called sendmsg(). */ while (1) { status = xs_sendpages(transport->sock, - NULL, 0, xdr, req->rq_bytes_sent); + NULL, 0, xdr, req->rq_bytes_sent, + zerocopy); dprintk("RPC: xs_tcp_send_request(%u) = %d\n", xdr->len - req->rq_bytes_sent, status); @@ -770,7 +799,7 @@ static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) goto out_release; if (req->rq_bytes_sent == req->rq_snd_buf.len) goto out_release; - set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state); + set_bit(XPRT_CLOSE_WAIT, &xprt->state); out_release: xprt_release_xprt(xprt, task); } @@ -780,6 +809,7 @@ static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk) transport->old_data_ready = sk->sk_data_ready; transport->old_state_change = sk->sk_state_change; transport->old_write_space = sk->sk_write_space; + transport->old_error_report = sk->sk_error_report; } static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk) @@ -787,6 +817,34 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s sk->sk_data_ready = transport->old_data_ready; sk->sk_state_change = transport->old_state_change; sk->sk_write_space = transport->old_write_space; + sk->sk_error_report = transport->old_error_report; +} + +/** + * xs_error_report - callback to handle TCP socket state errors + * @sk: socket + * + * Note: we don't call sock_error() since there may be a rpc_task + * using the socket, and so we don't want to clear sk->sk_err. + */ +static void xs_error_report(struct sock *sk) +{ + struct rpc_xprt *xprt; + int err; + + read_lock_bh(&sk->sk_callback_lock); + if (!(xprt = xprt_from_sock(sk))) + goto out; + + err = -sk->sk_err; + if (err == 0) + goto out; + dprintk("RPC: xs_error_report client %p, error=%d...\n", + xprt, -err); + trace_rpc_socket_error(xprt, sk->sk_socket, err); + xprt_wake_pending_tasks(xprt, err); + out: + read_unlock_bh(&sk->sk_callback_lock); } static void xs_reset_transport(struct sock_xprt *transport) @@ -808,8 +866,7 @@ static void xs_reset_transport(struct sock_xprt *transport) xs_restore_old_callbacks(transport, sk); write_unlock_bh(&sk->sk_callback_lock); - sk->sk_no_check = 0; - + trace_rpc_socket_close(&transport->xprt, sock); sock_release(sock); } @@ -829,14 +886,16 @@ static void xs_close(struct rpc_xprt *xprt) dprintk("RPC: xs_close xprt %p\n", xprt); + cancel_delayed_work_sync(&transport->connect_worker); + xs_reset_transport(transport); xprt->reestablish_timeout = 0; - smp_mb__before_clear_bit(); + smp_mb__before_atomic(); clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); clear_bit(XPRT_CLOSING, &xprt->state); - smp_mb__after_clear_bit(); + smp_mb__after_atomic(); xprt_disconnect_done(xprt); } @@ -848,6 +907,12 @@ static void xs_tcp_close(struct rpc_xprt *xprt) xs_tcp_shutdown(xprt); } +static void xs_xprt_free(struct rpc_xprt *xprt) +{ + xs_free_peer_addresses(xprt); + xprt_free(xprt); +} + /** * xs_destroy - prepare to shutdown a transport * @xprt: doomed transport @@ -855,23 +920,13 @@ static void xs_tcp_close(struct rpc_xprt *xprt) */ static void xs_destroy(struct rpc_xprt *xprt) { - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - dprintk("RPC: xs_destroy xprt %p\n", xprt); - cancel_delayed_work_sync(&transport->connect_worker); - xs_close(xprt); - xs_free_peer_addresses(xprt); - xprt_free(xprt); + xs_xprt_free(xprt); module_put(THIS_MODULE); } -static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) -{ - return (struct rpc_xprt *) sk->sk_user_data; -} - static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) { struct xdr_skb_reader desc = { @@ -894,7 +949,7 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) * * Currently this assumes we can read the whole reply in a single gulp. */ -static void xs_local_data_ready(struct sock *sk, int len) +static void xs_local_data_ready(struct sock *sk) { struct rpc_task *task; struct rpc_xprt *xprt; @@ -957,7 +1012,7 @@ static void xs_local_data_ready(struct sock *sk, int len) * @len: how much data to read * */ -static void xs_udp_data_ready(struct sock *sk, int len) +static void xs_udp_data_ready(struct sock *sk) { struct rpc_task *task; struct rpc_xprt *xprt; @@ -1005,7 +1060,7 @@ static void xs_udp_data_ready(struct sock *sk, int len) UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS); - xprt_adjust_cwnd(task, copied); + xprt_adjust_cwnd(xprt, task, copied); xprt_complete_rqst(task, copied); out_unlock: @@ -1254,41 +1309,29 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, * If we're unable to obtain the rpc_rqst we schedule the closing of the * connection and return -1. */ -static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, +static int xs_tcp_read_callback(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct rpc_rqst *req; - req = xprt_alloc_bc_request(xprt); + /* Look up and lock the request corresponding to the given XID */ + spin_lock(&xprt->transport_lock); + req = xprt_lookup_bc_request(xprt, transport->tcp_xid); if (req == NULL) { + spin_unlock(&xprt->transport_lock); printk(KERN_WARNING "Callback slot table overflowed\n"); xprt_force_disconnect(xprt); return -1; } - req->rq_xid = transport->tcp_xid; dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); xs_tcp_read_common(xprt, desc, req); - if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { - struct svc_serv *bc_serv = xprt->bc_serv; - - /* - * Add callback request to callback list. The callback - * service sleeps on the sv_cb_waitq waiting for new - * requests. Wake it up after adding enqueing the - * request. - */ - dprintk("RPC: add callback request to list\n"); - spin_lock(&bc_serv->sv_cb_lock); - list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); - spin_unlock(&bc_serv->sv_cb_lock); - wake_up(&bc_serv->sv_cb_waitq); - } - - req->rq_private_buf.len = transport->tcp_copied; + if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) + xprt_complete_bc_request(req, transport->tcp_copied); + spin_unlock(&xprt->transport_lock); return 0; } @@ -1392,7 +1435,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns * @bytes: how much data to read * */ -static void xs_tcp_data_ready(struct sock *sk, int bytes) +static void xs_tcp_data_ready(struct sock *sk) { struct rpc_xprt *xprt; read_descriptor_t rd_desc; @@ -1452,12 +1495,12 @@ static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt) static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) { - smp_mb__before_clear_bit(); + smp_mb__before_atomic(); clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); clear_bit(XPRT_CLOSING, &xprt->state); - smp_mb__after_clear_bit(); + smp_mb__after_atomic(); } static void xs_sock_mark_closed(struct rpc_xprt *xprt) @@ -1486,6 +1529,7 @@ static void xs_tcp_state_change(struct sock *sk) sock_flag(sk, SOCK_ZAPPED), sk->sk_shutdown); + trace_rpc_socket_state_change(xprt, sk->sk_socket); switch (sk->sk_state) { case TCP_ESTABLISHED: spin_lock(&xprt->transport_lock); @@ -1499,6 +1543,7 @@ static void xs_tcp_state_change(struct sock *sk) transport->tcp_copied = 0; transport->tcp_flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; + xprt->connect_cookie++; xprt_wake_pending_tasks(xprt, -EAGAIN); } @@ -1509,10 +1554,10 @@ static void xs_tcp_state_change(struct sock *sk) xprt->connect_cookie++; xprt->reestablish_timeout = 0; set_bit(XPRT_CLOSING, &xprt->state); - smp_mb__before_clear_bit(); + smp_mb__before_atomic(); clear_bit(XPRT_CONNECTED, &xprt->state); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); - smp_mb__after_clear_bit(); + smp_mb__after_atomic(); xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); break; case TCP_CLOSE_WAIT: @@ -1531,9 +1576,9 @@ static void xs_tcp_state_change(struct sock *sk) case TCP_LAST_ACK: set_bit(XPRT_CLOSING, &xprt->state); xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); - smp_mb__before_clear_bit(); + smp_mb__before_atomic(); clear_bit(XPRT_CONNECTED, &xprt->state); - smp_mb__after_clear_bit(); + smp_mb__after_atomic(); break; case TCP_CLOSE: xs_tcp_cancel_linger_timeout(xprt); @@ -1596,7 +1641,7 @@ static void xs_tcp_write_space(struct sock *sk) read_lock_bh(&sk->sk_callback_lock); /* from net/core/stream.c:sk_stream_write_space */ - if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) + if (sk_stream_is_writeable(sk)) xs_write_space(sk); read_unlock_bh(&sk->sk_callback_lock); @@ -1646,15 +1691,15 @@ static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t * * Adjust the congestion window after a retransmit timeout has occurred. */ -static void xs_udp_timer(struct rpc_task *task) +static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) { - xprt_adjust_cwnd(task, -ETIMEDOUT); + xprt_adjust_cwnd(xprt, task, -ETIMEDOUT); } static unsigned short xs_get_random_port(void) { unsigned short range = xprt_max_resvport - xprt_min_resvport; - unsigned short rand = (unsigned short) net_random() % range; + unsigned short rand = (unsigned short) prandom_u32() % range; return rand + xprt_min_resvport; } @@ -1731,7 +1776,9 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock) */ static void xs_local_rpcbind(struct rpc_task *task) { - xprt_set_bound(task->tk_xprt); + rcu_read_lock(); + xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt)); + rcu_read_unlock(); } static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port) @@ -1802,6 +1849,10 @@ static inline void xs_reclassify_socket(int family, struct socket *sock) } #endif +static void xs_dummy_setup_socket(struct work_struct *work) +{ +} + static struct socket *xs_create_sock(struct rpc_xprt *xprt, struct sock_xprt *transport, int family, int type, int protocol) { @@ -1843,6 +1894,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, sk->sk_user_data = xprt; sk->sk_data_ready = xs_local_data_ready; sk->sk_write_space = xs_udp_write_space; + sk->sk_error_report = xs_error_report; sk->sk_allocation = GFP_ATOMIC; xprt_clear_connected(xprt); @@ -1865,13 +1917,9 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, * @xprt: RPC transport to connect * @transport: socket transport to connect * @create_sock: function to create a socket of the correct type - * - * Invoked by a work queue tasklet. */ -static void xs_local_setup_socket(struct work_struct *work) +static int xs_local_setup_socket(struct sock_xprt *transport) { - struct sock_xprt *transport = - container_of(work, struct sock_xprt, connect_worker.work); struct rpc_xprt *xprt = &transport->xprt; struct socket *sock; int status = -EIO; @@ -1892,6 +1940,7 @@ static void xs_local_setup_socket(struct work_struct *work) xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); status = xs_local_finish_connecting(xprt, sock); + trace_rpc_socket_connect(xprt, sock, status); switch (status) { case 0: dprintk("RPC: xprt %p connected to %s\n", @@ -1916,6 +1965,30 @@ out: xprt_clear_connecting(xprt); xprt_wake_pending_tasks(xprt, status); current->flags &= ~PF_FSTRANS; + return status; +} + +static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task) +{ + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + int ret; + + if (RPC_IS_ASYNC(task)) { + /* + * We want the AF_LOCAL connect to be resolved in the + * filesystem namespace of the process making the rpc + * call. Thus we connect synchronously. + * + * If we want to support asynchronous AF_LOCAL calls, + * we'll need to figure out how to pass a namespace to + * connect. + */ + rpc_exit(task, -ENOTCONN); + return; + } + ret = xs_local_setup_socket(transport); + if (ret && !RPC_IS_SOFTCONN(task)) + msleep_interruptible(15000); } #ifdef CONFIG_SUNRPC_SWAP @@ -1971,7 +2044,6 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_user_data = xprt; sk->sk_data_ready = xs_udp_data_ready; sk->sk_write_space = xs_udp_write_space; - sk->sk_no_check = UDP_CSUM_NORCV; sk->sk_allocation = GFP_ATOMIC; xprt_set_connected(xprt); @@ -2011,6 +2083,7 @@ static void xs_udp_setup_socket(struct work_struct *work) xprt->address_strings[RPC_DISPLAY_PORT]); xs_udp_finish_connecting(xprt, sock); + trace_rpc_socket_connect(xprt, sock, 0); status = 0; out: xprt_clear_connecting(xprt); @@ -2036,6 +2109,8 @@ static void xs_abort_connection(struct sock_xprt *transport) memset(&any, 0, sizeof(any)); any.sa_family = AF_UNSPEC; result = kernel_connect(transport->sock, &any, sizeof(any), 0); + trace_rpc_socket_reset_connection(&transport->xprt, + transport->sock, result); if (!result) xs_sock_reset_connection_flags(&transport->xprt); dprintk("RPC: AF_UNSPEC connect return code %d\n", result); @@ -2074,6 +2149,19 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) if (!transport->inet) { struct sock *sk = sock->sk; + unsigned int keepidle = xprt->timeout->to_initval / HZ; + unsigned int keepcnt = xprt->timeout->to_retries + 1; + unsigned int opt_on = 1; + + /* TCP Keepalive options */ + kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, + (char *)&opt_on, sizeof(opt_on)); + kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, + (char *)&keepidle, sizeof(keepidle)); + kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, + (char *)&keepidle, sizeof(keepidle)); + kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT, + (char *)&keepcnt, sizeof(keepcnt)); write_lock_bh(&sk->sk_callback_lock); @@ -2083,6 +2171,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_data_ready = xs_tcp_data_ready; sk->sk_state_change = xs_tcp_state_change; sk->sk_write_space = xs_tcp_write_space; + sk->sk_error_report = xs_error_report; sk->sk_allocation = GFP_ATOMIC; /* socket options */ @@ -2113,7 +2202,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) case 0: case -EINPROGRESS: /* SYN_SENT! */ - xprt->connect_cookie++; if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; } @@ -2166,6 +2254,7 @@ static void xs_tcp_setup_socket(struct work_struct *work) xprt->address_strings[RPC_DISPLAY_PORT]); status = xs_tcp_finish_connecting(xprt, sock); + trace_rpc_socket_connect(xprt, sock, status); dprintk("RPC: %p connect status %d connected %d sock state %d\n", xprt, -status, xprt_connected(xprt), sock->sk->sk_state); @@ -2179,10 +2268,6 @@ static void xs_tcp_setup_socket(struct work_struct *work) */ xs_tcp_force_close(xprt); break; - case -ECONNREFUSED: - case -ECONNRESET: - case -ENETUNREACH: - /* retry with existing socket, after a delay */ case 0: case -EINPROGRESS: case -EALREADY: @@ -2193,6 +2278,10 @@ static void xs_tcp_setup_socket(struct work_struct *work) /* Happens, for instance, if the user specified a link * local IPv6 address without a scope-id. */ + case -ECONNREFUSED: + case -ECONNRESET: + case -ENETUNREACH: + /* retry with existing socket, after a delay */ goto out; } out_eagain: @@ -2205,6 +2294,7 @@ out: /** * xs_connect - connect a socket to a remote endpoint + * @xprt: pointer to transport structure * @task: address of RPC task that manages state of connect request * * TCP: If the remote end dropped the connection, delay reconnecting. @@ -2216,9 +2306,8 @@ out: * If a UDP socket connect fails, the delay behavior here prevents * retry floods (hard mounts). */ -static void xs_connect(struct rpc_task *task) +static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) { @@ -2445,6 +2534,10 @@ static void bc_close(struct rpc_xprt *xprt) static void bc_destroy(struct rpc_xprt *xprt) { + dprintk("RPC: bc_destroy xprt %p\n", xprt); + + xs_xprt_free(xprt); + module_put(THIS_MODULE); } static struct rpc_xprt_ops xs_local_ops = { @@ -2453,7 +2546,7 @@ static struct rpc_xprt_ops xs_local_ops = { .alloc_slot = xprt_alloc_slot, .rpcbind = xs_local_rpcbind, .set_port = xs_local_set_port, - .connect = xs_connect, + .connect = xs_local_connect, .buf_alloc = rpc_malloc, .buf_free = rpc_free, .send_request = xs_local_send_request, @@ -2506,7 +2599,6 @@ static struct rpc_xprt_ops bc_tcp_ops = { .reserve_xprt = xprt_reserve_xprt, .release_xprt = xprt_release_xprt, .alloc_slot = xprt_alloc_slot, - .rpcbind = xs_local_rpcbind, .buf_alloc = bc_malloc, .buf_free = bc_free, .send_request = bc_send_request, @@ -2617,6 +2709,9 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; + INIT_DELAYED_WORK(&transport->connect_worker, + xs_dummy_setup_socket); + switch (sun->sun_family) { case AF_LOCAL: if (sun->sun_path[0] != '/') { @@ -2626,9 +2721,10 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) goto out_err; } xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_local_setup_socket); xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL); + ret = ERR_PTR(xs_local_setup_socket(transport)); + if (ret) + goto out_err; break; default: ret = ERR_PTR(-EAFNOSUPPORT); @@ -2642,7 +2738,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) return xprt; ret = ERR_PTR(-EINVAL); out_err: - xprt_free(xprt); + xs_xprt_free(xprt); return ret; } @@ -2720,7 +2816,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) return xprt; ret = ERR_PTR(-EINVAL); out_err: - xprt_free(xprt); + xs_xprt_free(xprt); return ret; } @@ -2741,9 +2837,13 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) struct rpc_xprt *xprt; struct sock_xprt *transport; struct rpc_xprt *ret; + unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries; + + if (args->flags & XPRT_CREATE_INFINITE_SLOTS) + max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT; xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, - xprt_max_tcp_slot_table_entries); + max_slot_table_size); if (IS_ERR(xprt)) return xprt; transport = container_of(xprt, struct sock_xprt, xprt); @@ -2791,12 +2891,11 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) xprt->address_strings[RPC_DISPLAY_ADDR], xprt->address_strings[RPC_DISPLAY_PROTO]); - if (try_module_get(THIS_MODULE)) return xprt; ret = ERR_PTR(-EINVAL); out_err: - xprt_free(xprt); + xs_xprt_free(xprt); return ret; } @@ -2813,15 +2912,6 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args) struct svc_sock *bc_sock; struct rpc_xprt *ret; - if (args->bc_xprt->xpt_bc_xprt) { - /* - * This server connection already has a backchannel - * export; we can't create a new one, as we wouldn't be - * able to match replies based on xid any more. So, - * reuse the already-existing one: - */ - return args->bc_xprt->xpt_bc_xprt; - } xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, xprt_tcp_slot_table_entries); if (IS_ERR(xprt)) @@ -2862,10 +2952,9 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args) /* * Once we've associated a backchannel xprt with a connection, - * we want to keep it around as long as long as the connection - * lasts, in case we need to start using it for a backchannel - * again; this reference won't be dropped until bc_xprt is - * destroyed. + * we want to keep it around as long as the connection lasts, + * in case we need to start using it for a backchannel again; + * this reference won't be dropped until bc_xprt is destroyed. */ xprt_get(xprt); args->bc_xprt->xpt_bc_xprt = xprt; @@ -2880,13 +2969,14 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args) */ xprt_set_connected(xprt); - if (try_module_get(THIS_MODULE)) return xprt; + + args->bc_xprt->xpt_bc_xprt = NULL; xprt_put(xprt); ret = ERR_PTR(-EINVAL); out_err: - xprt_free(xprt); + xs_xprt_free(xprt); return ret; } |
