diff options
Diffstat (limited to 'fs/ocfs2/cluster/tcp.c')
| -rw-r--r-- | fs/ocfs2/cluster/tcp.c | 123 |
1 files changed, 67 insertions, 56 deletions
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c index 2cd2406b414..681691bc233 100644 --- a/fs/ocfs2/cluster/tcp.c +++ b/fs/ocfs2/cluster/tcp.c @@ -108,7 +108,7 @@ static struct rb_root o2net_handler_tree = RB_ROOT; static struct o2net_node o2net_nodes[O2NM_MAX_NODES]; /* XXX someday we'll need better accounting */ -static struct socket *o2net_listen_sock = NULL; +static struct socket *o2net_listen_sock; /* * listen work is only queued by the listening socket callbacks on the @@ -137,7 +137,7 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] = static void o2net_sc_connect_completed(struct work_struct *work); static void o2net_rx_until_empty(struct work_struct *work); static void o2net_shutdown_sc(struct work_struct *work); -static void o2net_listen_data_ready(struct sock *sk, int bytes); +static void o2net_listen_data_ready(struct sock *sk); static void o2net_sc_send_keep_req(struct work_struct *work); static void o2net_idle_timer(unsigned long data); static void o2net_sc_postpone_idle(struct o2net_sock_container *sc); @@ -262,17 +262,17 @@ static void o2net_update_recv_stats(struct o2net_sock_container *sc) #endif /* CONFIG_OCFS2_FS_STATS */ -static inline int o2net_reconnect_delay(void) +static inline unsigned int o2net_reconnect_delay(void) { return o2nm_single_cluster->cl_reconnect_delay_ms; } -static inline int o2net_keepalive_delay(void) +static inline unsigned int o2net_keepalive_delay(void) { return o2nm_single_cluster->cl_keepalive_delay_ms; } -static inline int o2net_idle_timeout(void) +static inline unsigned int o2net_idle_timeout(void) { return o2nm_single_cluster->cl_idle_timeout_ms; } @@ -597,9 +597,9 @@ static void o2net_set_nn_state(struct o2net_node *nn, } /* see o2net_register_callbacks() */ -static void o2net_data_ready(struct sock *sk, int bytes) +static void o2net_data_ready(struct sock *sk) { - void (*ready)(struct sock *sk, int bytes); + void (*ready)(struct sock *sk); read_lock(&sk->sk_callback_lock); if (sk->sk_user_data) { @@ -613,7 +613,7 @@ static void o2net_data_ready(struct sock *sk, int bytes) } read_unlock(&sk->sk_callback_lock); - ready(sk, bytes); + ready(sk); } /* see o2net_register_callbacks() */ @@ -916,57 +916,30 @@ static struct o2net_msg_handler *o2net_handler_get(u32 msg_type, u32 key) static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len) { - int ret; - mm_segment_t oldfs; - struct kvec vec = { - .iov_len = len, - .iov_base = data, - }; - struct msghdr msg = { - .msg_iovlen = 1, - .msg_iov = (struct iovec *)&vec, - .msg_flags = MSG_DONTWAIT, - }; - - oldfs = get_fs(); - set_fs(get_ds()); - ret = sock_recvmsg(sock, &msg, len, msg.msg_flags); - set_fs(oldfs); - - return ret; + struct kvec vec = { .iov_len = len, .iov_base = data, }; + struct msghdr msg = { .msg_flags = MSG_DONTWAIT, }; + return kernel_recvmsg(sock, &msg, &vec, 1, len, msg.msg_flags); } static int o2net_send_tcp_msg(struct socket *sock, struct kvec *vec, size_t veclen, size_t total) { int ret; - mm_segment_t oldfs; - struct msghdr msg = { - .msg_iov = (struct iovec *)vec, - .msg_iovlen = veclen, - }; + struct msghdr msg; if (sock == NULL) { ret = -EINVAL; goto out; } - oldfs = get_fs(); - set_fs(get_ds()); - ret = sock_sendmsg(sock, &msg, total); - set_fs(oldfs); - if (ret != total) { - mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, - total); - if (ret >= 0) - ret = -EPIPE; /* should be smarter, I bet */ - goto out; - } - - ret = 0; + ret = kernel_sendmsg(sock, &msg, vec, veclen, total); + if (likely(ret == total)) + return 0; + mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret, total); + if (ret >= 0) + ret = -EPIPE; /* should be smarter, I bet */ out: - if (ret < 0) - mlog(0, "returning error: %d\n", ret); + mlog(0, "returning error: %d\n", ret); return ret; } @@ -1826,7 +1799,7 @@ int o2net_register_hb_callbacks(void) /* ------------------------------------------------------------ */ -static int o2net_accept_one(struct socket *sock) +static int o2net_accept_one(struct socket *sock, int *more) { int ret, slen; struct sockaddr_in sin; @@ -1837,6 +1810,7 @@ static int o2net_accept_one(struct socket *sock) struct o2net_node *nn; BUG_ON(sock == NULL); + *more = 0; ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type, sock->sk->sk_protocol, &new_sock); if (ret) @@ -1848,6 +1822,7 @@ static int o2net_accept_one(struct socket *sock) if (ret < 0) goto out; + *more = 1; new_sock->sk->sk_allocation = GFP_ATOMIC; ret = o2net_set_nodelay(new_sock); @@ -1946,16 +1921,41 @@ out: return ret; } +/* + * This function is invoked in response to one or more + * pending accepts at softIRQ level. We must drain the + * entire que before returning. + */ + static void o2net_accept_many(struct work_struct *work) { struct socket *sock = o2net_listen_sock; - while (o2net_accept_one(sock) == 0) + int more; + int err; + + /* + * It is critical to note that due to interrupt moderation + * at the network driver level, we can't assume to get a + * softIRQ for every single conn since tcp SYN packets + * can arrive back-to-back, and therefore many pending + * accepts may result in just 1 softIRQ. If we terminate + * the o2net_accept_one() loop upon seeing an err, what happens + * to the rest of the conns in the queue? If no new SYN + * arrives for hours, no softIRQ will be delivered, + * and the connections will just sit in the queue. + */ + + for (;;) { + err = o2net_accept_one(sock, &more); + if (!more) + break; cond_resched(); + } } -static void o2net_listen_data_ready(struct sock *sk, int bytes) +static void o2net_listen_data_ready(struct sock *sk) { - void (*ready)(struct sock *sk, int bytes); + void (*ready)(struct sock *sk); read_lock(&sk->sk_callback_lock); ready = sk->sk_user_data; @@ -1964,18 +1964,29 @@ static void o2net_listen_data_ready(struct sock *sk, int bytes) goto out; } - /* ->sk_data_ready is also called for a newly established child socket - * before it has been accepted and the acceptor has set up their - * data_ready.. we only want to queue listen work for our listening - * socket */ + /* This callback may called twice when a new connection + * is being established as a child socket inherits everything + * from a parent LISTEN socket, including the data_ready cb of + * the parent. This leads to a hazard. In o2net_accept_one() + * we are still initializing the child socket but have not + * changed the inherited data_ready callback yet when + * data starts arriving. + * We avoid this hazard by checking the state. + * For the listening socket, the state will be TCP_LISTEN; for the new + * socket, will be TCP_ESTABLISHED. Also, in this case, + * sk->sk_user_data is not a valid function pointer. + */ + if (sk->sk_state == TCP_LISTEN) { - mlog(ML_TCP, "bytes: %d\n", bytes); queue_work(o2net_wq, &o2net_listen_work); + } else { + ready = NULL; } out: read_unlock(&sk->sk_callback_lock); - ready(sk, bytes); + if (ready != NULL) + ready(sk); } static int o2net_open_listening_sock(__be32 addr, __be16 port) |
