diff options
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- | net/ceph/messenger.c | 456 |
1 files changed, 235 insertions, 221 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index ad5b70801f3..f0993af2ae4 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -38,48 +38,54 @@ static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; static struct lock_class_key socket_class; #endif +/* + * When skipping (ignoring) a block of input we read it into a "skip + * buffer," which is this many bytes in size. + */ +#define SKIP_BUF_SIZE 1024 static void queue_con(struct ceph_connection *con); static void con_work(struct work_struct *); static void ceph_fault(struct ceph_connection *con); /* - * nicely render a sockaddr as a string. + * Nicely render a sockaddr as a string. An array of formatted + * strings is used, to approximate reentrancy. */ -#define MAX_ADDR_STR 20 -#define MAX_ADDR_STR_LEN 60 -static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN]; -static DEFINE_SPINLOCK(addr_str_lock); -static int last_addr_str; +#define ADDR_STR_COUNT_LOG 5 /* log2(# address strings in array) */ +#define ADDR_STR_COUNT (1 << ADDR_STR_COUNT_LOG) +#define ADDR_STR_COUNT_MASK (ADDR_STR_COUNT - 1) +#define MAX_ADDR_STR_LEN 64 /* 54 is enough */ + +static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN]; +static atomic_t addr_str_seq = ATOMIC_INIT(0); + +static struct page *zero_page; /* used in certain error cases */ const char *ceph_pr_addr(const struct sockaddr_storage *ss) { int i; char *s; - struct sockaddr_in *in4 = (void *)ss; - struct sockaddr_in6 *in6 = (void *)ss; - - spin_lock(&addr_str_lock); - i = last_addr_str++; - if (last_addr_str == MAX_ADDR_STR) - last_addr_str = 0; - spin_unlock(&addr_str_lock); + struct sockaddr_in *in4 = (struct sockaddr_in *) ss; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss; + + i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK; s = addr_str[i]; switch (ss->ss_family) { case AF_INET: - snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr, - (unsigned int)ntohs(in4->sin_port)); + snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr, + ntohs(in4->sin_port)); break; case AF_INET6: - snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr, - (unsigned int)ntohs(in6->sin6_port)); + snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr, + ntohs(in6->sin6_port)); break; default: - snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)", - (int)ss->ss_family); + snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)", + ss->ss_family); } return s; @@ -95,22 +101,43 @@ static void encode_my_addr(struct ceph_messenger *msgr) /* * work queue for all reading and writing to/from the socket. */ -struct workqueue_struct *ceph_msgr_wq; +static struct workqueue_struct *ceph_msgr_wq; + +void _ceph_msgr_exit(void) +{ + if (ceph_msgr_wq) { + destroy_workqueue(ceph_msgr_wq); + ceph_msgr_wq = NULL; + } + + BUG_ON(zero_page == NULL); + kunmap(zero_page); + page_cache_release(zero_page); + zero_page = NULL; +} int ceph_msgr_init(void) { + BUG_ON(zero_page != NULL); + zero_page = ZERO_PAGE(0); + page_cache_get(zero_page); + ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); - if (!ceph_msgr_wq) { - pr_err("msgr_init failed to create workqueue\n"); - return -ENOMEM; - } - return 0; + if (ceph_msgr_wq) + return 0; + + pr_err("msgr_init failed to create workqueue\n"); + _ceph_msgr_exit(); + + return -ENOMEM; } EXPORT_SYMBOL(ceph_msgr_init); void ceph_msgr_exit(void) { - destroy_workqueue(ceph_msgr_wq); + BUG_ON(ceph_msgr_wq == NULL); + + _ceph_msgr_exit(); } EXPORT_SYMBOL(ceph_msgr_exit); @@ -128,8 +155,8 @@ EXPORT_SYMBOL(ceph_msgr_flush); /* data available on socket, or listen socket received a connect */ static void ceph_data_ready(struct sock *sk, int count_unused) { - struct ceph_connection *con = - (struct ceph_connection *)sk->sk_user_data; + struct ceph_connection *con = sk->sk_user_data; + if (sk->sk_state != TCP_CLOSE_WAIT) { dout("ceph_data_ready on %p state = %lu, queueing work\n", con, con->state); @@ -140,26 +167,30 @@ static void ceph_data_ready(struct sock *sk, int count_unused) /* socket has buffer space for writing */ static void ceph_write_space(struct sock *sk) { - struct ceph_connection *con = - (struct ceph_connection *)sk->sk_user_data; + struct ceph_connection *con = sk->sk_user_data; - /* only queue to workqueue if there is data we want to write. */ + /* only queue to workqueue if there is data we want to write, + * and there is sufficient space in the socket buffer to accept + * more data. clear SOCK_NOSPACE so that ceph_write_space() + * doesn't get called again until try_write() fills the socket + * buffer. See net/ipv4/tcp_input.c:tcp_check_space() + * and net/core/stream.c:sk_stream_write_space(). + */ if (test_bit(WRITE_PENDING, &con->state)) { - dout("ceph_write_space %p queueing write work\n", con); - queue_con(con); + if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { + dout("ceph_write_space %p queueing write work\n", con); + clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); + queue_con(con); + } } else { dout("ceph_write_space %p nothing to write\n", con); } - - /* since we have our own write_space, clear the SOCK_NOSPACE flag */ - clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); } /* socket's state has changed */ static void ceph_state_change(struct sock *sk) { - struct ceph_connection *con = - (struct ceph_connection *)sk->sk_user_data; + struct ceph_connection *con = sk->sk_user_data; dout("ceph_state_change %p state = %lu sk_state = %u\n", con, con->state, sk->sk_state); @@ -184,6 +215,8 @@ static void ceph_state_change(struct sock *sk) dout("ceph_state_change TCP_ESTABLISHED\n"); queue_con(con); break; + default: /* Everything else is uninteresting */ + break; } } @@ -194,7 +227,7 @@ static void set_sock_callbacks(struct socket *sock, struct ceph_connection *con) { struct sock *sk = sock->sk; - sk->sk_user_data = (void *)con; + sk->sk_user_data = con; sk->sk_data_ready = ceph_data_ready; sk->sk_write_space = ceph_write_space; sk->sk_state_change = ceph_state_change; @@ -208,7 +241,7 @@ static void set_sock_callbacks(struct socket *sock, /* * initiate connection to a remote socket. */ -static struct socket *ceph_tcp_connect(struct ceph_connection *con) +static int ceph_tcp_connect(struct ceph_connection *con) { struct sockaddr_storage *paddr = &con->peer_addr.in_addr; struct socket *sock; @@ -218,8 +251,7 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con) ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret) - return ERR_PTR(ret); - con->sock = sock; + return ret; sock->sk->sk_allocation = GFP_NOFS; #ifdef CONFIG_LOCKDEP @@ -236,19 +268,17 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con) dout("connect %s EINPROGRESS sk_state = %u\n", ceph_pr_addr(&con->peer_addr.in_addr), sock->sk->sk_state); - ret = 0; - } - if (ret < 0) { + } else if (ret < 0) { pr_err("connect %s error %d\n", ceph_pr_addr(&con->peer_addr.in_addr), ret); sock_release(sock); - con->sock = NULL; con->error_msg = "connect error"; + + return ret; } + con->sock = sock; - if (ret < 0) - return ERR_PTR(ret); - return sock; + return 0; } static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) @@ -284,6 +314,19 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, return r; } +static int ceph_tcp_sendpage(struct socket *sock, struct page *page, + int offset, size_t size, int more) +{ + int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR); + int ret; + + ret = kernel_sendpage(sock, page, offset, size, flags); + if (ret == -EAGAIN) + ret = 0; + + return ret; +} + /* * Shutdown/close the socket for the given connection. @@ -391,22 +434,23 @@ bool ceph_con_opened(struct ceph_connection *con) */ struct ceph_connection *ceph_con_get(struct ceph_connection *con) { - dout("con_get %p nref = %d -> %d\n", con, - atomic_read(&con->nref), atomic_read(&con->nref) + 1); - if (atomic_inc_not_zero(&con->nref)) - return con; - return NULL; + int nref = __atomic_add_unless(&con->nref, 1, 0); + + dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1); + + return nref ? con : NULL; } void ceph_con_put(struct ceph_connection *con) { - dout("con_put %p nref = %d -> %d\n", con, - atomic_read(&con->nref), atomic_read(&con->nref) - 1); - BUG_ON(atomic_read(&con->nref) == 0); - if (atomic_dec_and_test(&con->nref)) { + int nref = atomic_dec_return(&con->nref); + + BUG_ON(nref < 0); + if (nref == 0) { BUG_ON(con->sock); kfree(con); } + dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref); } /* @@ -442,14 +486,35 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) return ret; } +static void ceph_con_out_kvec_reset(struct ceph_connection *con) +{ + con->out_kvec_left = 0; + con->out_kvec_bytes = 0; + con->out_kvec_cur = &con->out_kvec[0]; +} + +static void ceph_con_out_kvec_add(struct ceph_connection *con, + size_t size, void *data) +{ + int index; + + index = con->out_kvec_left; + BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); + + con->out_kvec[index].iov_len = size; + con->out_kvec[index].iov_base = data; + con->out_kvec_left++; + con->out_kvec_bytes += size; +} /* * Prepare footer for currently outgoing message, and finish things * off. Assumes out_kvec* are already valid.. we just add on to the end. */ -static void prepare_write_message_footer(struct ceph_connection *con, int v) +static void prepare_write_message_footer(struct ceph_connection *con) { struct ceph_msg *m = con->out_msg; + int v = con->out_kvec_left; dout("prepare_write_message_footer %p\n", con); con->out_kvec_is_msg = true; @@ -467,9 +532,9 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v) static void prepare_write_message(struct ceph_connection *con) { struct ceph_msg *m; - int v = 0; + u32 crc; - con->out_kvec_bytes = 0; + ceph_con_out_kvec_reset(con); con->out_kvec_is_msg = true; con->out_msg_done = false; @@ -477,16 +542,13 @@ static void prepare_write_message(struct ceph_connection *con) * TCP packet that's a good thing. */ if (con->in_seq > con->in_seq_acked) { con->in_seq_acked = con->in_seq; - con->out_kvec[v].iov_base = &tag_ack; - con->out_kvec[v++].iov_len = 1; + ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con->out_kvec[v].iov_base = &con->out_temp_ack; - con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack); - con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack); + ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), + &con->out_temp_ack); } - m = list_first_entry(&con->out_queue, - struct ceph_msg, list_head); + m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); con->out_msg = m; /* put message on sent list */ @@ -510,30 +572,26 @@ static void prepare_write_message(struct ceph_connection *con) BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); /* tag + hdr + front + middle */ - con->out_kvec[v].iov_base = &tag_msg; - con->out_kvec[v++].iov_len = 1; - con->out_kvec[v].iov_base = &m->hdr; - con->out_kvec[v++].iov_len = sizeof(m->hdr); - con->out_kvec[v++] = m->front; + ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); + ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); + ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); + if (m->middle) - con->out_kvec[v++] = m->middle->vec; - con->out_kvec_left = v; - con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len + - (m->middle ? m->middle->vec.iov_len : 0); - con->out_kvec_cur = con->out_kvec; + ceph_con_out_kvec_add(con, m->middle->vec.iov_len, + m->middle->vec.iov_base); /* fill in crc (except data pages), footer */ - con->out_msg->hdr.crc = - cpu_to_le32(crc32c(0, (void *)&m->hdr, - sizeof(m->hdr) - sizeof(m->hdr.crc))); + crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); + con->out_msg->hdr.crc = cpu_to_le32(crc); con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; - con->out_msg->footer.front_crc = - cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len)); - if (m->middle) - con->out_msg->footer.middle_crc = - cpu_to_le32(crc32c(0, m->middle->vec.iov_base, - m->middle->vec.iov_len)); - else + + crc = crc32c(0, m->front.iov_base, m->front.iov_len); + con->out_msg->footer.front_crc = cpu_to_le32(crc); + if (m->middle) { + crc = crc32c(0, m->middle->vec.iov_base, + m->middle->vec.iov_len); + con->out_msg->footer.middle_crc = cpu_to_le32(crc); + } else con->out_msg->footer.middle_crc = 0; con->out_msg->footer.data_crc = 0; dout("prepare_write_message front_crc %u data_crc %u\n", @@ -549,11 +607,11 @@ static void prepare_write_message(struct ceph_connection *con) else con->out_msg_pos.page_pos = 0; con->out_msg_pos.data_pos = 0; - con->out_msg_pos.did_page_crc = 0; + con->out_msg_pos.did_page_crc = false; con->out_more = 1; /* data + footer will follow */ } else { /* no, queue up footer too and be done */ - prepare_write_message_footer(con, v); + prepare_write_message_footer(con); } set_bit(WRITE_PENDING, &con->state); @@ -568,14 +626,14 @@ static void prepare_write_ack(struct ceph_connection *con) con->in_seq_acked, con->in_seq); con->in_seq_acked = con->in_seq; - con->out_kvec[0].iov_base = &tag_ack; - con->out_kvec[0].iov_len = 1; + ceph_con_out_kvec_reset(con); + + ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); + con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con->out_kvec[1].iov_base = &con->out_temp_ack; - con->out_kvec[1].iov_len = sizeof(con->out_temp_ack); - con->out_kvec_left = 2; - con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack); - con->out_kvec_cur = con->out_kvec; + ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), + &con->out_temp_ack); + con->out_more = 1; /* more will follow.. eventually.. */ set_bit(WRITE_PENDING, &con->state); } @@ -586,11 +644,8 @@ static void prepare_write_ack(struct ceph_connection *con) static void prepare_write_keepalive(struct ceph_connection *con) { dout("prepare_write_keepalive %p\n", con); - con->out_kvec[0].iov_base = &tag_keepalive; - con->out_kvec[0].iov_len = 1; - con->out_kvec_left = 1; - con->out_kvec_bytes = 1; - con->out_kvec_cur = con->out_kvec; + ceph_con_out_kvec_reset(con); + ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); set_bit(WRITE_PENDING, &con->state); } @@ -619,12 +674,9 @@ static int prepare_connect_authorizer(struct ceph_connection *con) con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); con->out_connect.authorizer_len = cpu_to_le32(auth_len); - if (auth_len) { - con->out_kvec[con->out_kvec_left].iov_base = auth_buf; - con->out_kvec[con->out_kvec_left].iov_len = auth_len; - con->out_kvec_left++; - con->out_kvec_bytes += auth_len; - } + if (auth_len) + ceph_con_out_kvec_add(con, auth_len, auth_buf); + return 0; } @@ -634,22 +686,18 @@ static int prepare_connect_authorizer(struct ceph_connection *con) static void prepare_write_banner(struct ceph_messenger *msgr, struct ceph_connection *con) { - int len = strlen(CEPH_BANNER); + ceph_con_out_kvec_reset(con); + ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); + ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr), + &msgr->my_enc_addr); - con->out_kvec[0].iov_base = CEPH_BANNER; - con->out_kvec[0].iov_len = len; - con->out_kvec[1].iov_base = &msgr->my_enc_addr; - con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr); - con->out_kvec_left = 2; - con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr); - con->out_kvec_cur = con->out_kvec; con->out_more = 0; set_bit(WRITE_PENDING, &con->state); } static int prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con, - int after_banner) + int include_banner) { unsigned global_seq = get_global_seq(con->msgr, 0); int proto; @@ -678,22 +726,18 @@ static int prepare_write_connect(struct ceph_messenger *msgr, con->out_connect.protocol_version = cpu_to_le32(proto); con->out_connect.flags = 0; - if (!after_banner) { - con->out_kvec_left = 0; - con->out_kvec_bytes = 0; - } - con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect; - con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect); - con->out_kvec_left++; - con->out_kvec_bytes += sizeof(con->out_connect); - con->out_kvec_cur = con->out_kvec; + if (include_banner) + prepare_write_banner(msgr, con); + else + ceph_con_out_kvec_reset(con); + ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect); + con->out_more = 0; set_bit(WRITE_PENDING, &con->state); return prepare_connect_authorizer(con); } - /* * write as much of pending kvecs to the socket as we can. * 1 -> done @@ -714,17 +758,18 @@ static int write_partial_kvec(struct ceph_connection *con) con->out_kvec_bytes -= ret; if (con->out_kvec_bytes == 0) break; /* done */ - while (ret > 0) { - if (ret >= con->out_kvec_cur->iov_len) { - ret -= con->out_kvec_cur->iov_len; - con->out_kvec_cur++; - con->out_kvec_left--; - } else { - con->out_kvec_cur->iov_len -= ret; - con->out_kvec_cur->iov_base += ret; - ret = 0; - break; - } + + /* account for full iov entries consumed */ + while (ret >= con->out_kvec_cur->iov_len) { + BUG_ON(!con->out_kvec_left); + ret -= con->out_kvec_cur->iov_len; + con->out_kvec_cur++; + con->out_kvec_left--; + } + /* and for a partially-consumed entry */ + if (ret) { + con->out_kvec_cur->iov_len -= ret; + con->out_kvec_cur->iov_base += ret; } } con->out_kvec_left = 0; @@ -773,7 +818,7 @@ static int write_partial_msg_pages(struct ceph_connection *con) struct ceph_msg *msg = con->out_msg; unsigned data_len = le32_to_cpu(msg->hdr.data_len); size_t len; - int crc = con->msgr->nocrc; + bool do_datacrc = !con->msgr->nocrc; int ret; int total_max_write; int in_trail = 0; @@ -790,9 +835,8 @@ static int write_partial_msg_pages(struct ceph_connection *con) while (data_len > con->out_msg_pos.data_pos) { struct page *page = NULL; - void *kaddr = NULL; int max_write = PAGE_SIZE; - int page_shift = 0; + int bio_offset = 0; total_max_write = data_len - trail_len - con->out_msg_pos.data_pos; @@ -811,58 +855,47 @@ static int write_partial_msg_pages(struct ceph_connection *con) page = list_first_entry(&msg->trail->head, struct page, lru); - if (crc) - kaddr = kmap(page); max_write = PAGE_SIZE; } else if (msg->pages) { page = msg->pages[con->out_msg_pos.page]; - if (crc) - kaddr = kmap(page); } else if (msg->pagelist) { page = list_first_entry(&msg->pagelist->head, struct page, lru); - if (crc) - kaddr = kmap(page); #ifdef CONFIG_BLOCK } else if (msg->bio) { struct bio_vec *bv; bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); page = bv->bv_page; - page_shift = bv->bv_offset; - if (crc) - kaddr = kmap(page) + page_shift; + bio_offset = bv->bv_offset; max_write = bv->bv_len; #endif } else { - page = con->msgr->zero_page; - if (crc) - kaddr = page_address(con->msgr->zero_page); + page = zero_page; } len = min_t(int, max_write - con->out_msg_pos.page_pos, total_max_write); - if (crc && !con->out_msg_pos.did_page_crc) { - void *base = kaddr + con->out_msg_pos.page_pos; + if (do_datacrc && !con->out_msg_pos.did_page_crc) { + void *base; + u32 crc; u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); + char *kaddr; + kaddr = kmap(page); BUG_ON(kaddr == NULL); - con->out_msg->footer.data_crc = - cpu_to_le32(crc32c(tmpcrc, base, len)); - con->out_msg_pos.did_page_crc = 1; + base = kaddr + con->out_msg_pos.page_pos + bio_offset; + crc = crc32c(tmpcrc, base, len); + con->out_msg->footer.data_crc = cpu_to_le32(crc); + con->out_msg_pos.did_page_crc = true; } - ret = kernel_sendpage(con->sock, page, - con->out_msg_pos.page_pos + page_shift, - len, - MSG_DONTWAIT | MSG_NOSIGNAL | - MSG_MORE); - - if (crc && - (msg->pages || msg->pagelist || msg->bio || in_trail)) + ret = ceph_tcp_sendpage(con->sock, page, + con->out_msg_pos.page_pos + bio_offset, + len, 1); + + if (do_datacrc) kunmap(page); - if (ret == -EAGAIN) - ret = 0; if (ret <= 0) goto out; @@ -871,7 +904,7 @@ static int write_partial_msg_pages(struct ceph_connection *con) if (ret == len) { con->out_msg_pos.page_pos = 0; con->out_msg_pos.page++; - con->out_msg_pos.did_page_crc = 0; + con->out_msg_pos.did_page_crc = false; if (in_trail) list_move_tail(&page->lru, &msg->trail->head); @@ -888,12 +921,10 @@ static int write_partial_msg_pages(struct ceph_connection *con) dout("write_partial_msg_pages %p msg %p done\n", con, msg); /* prepare and queue up footer, too */ - if (!crc) + if (!do_datacrc) con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; - con->out_kvec_bytes = 0; - con->out_kvec_left = 0; - con->out_kvec_cur = con->out_kvec; - prepare_write_message_footer(con, 0); + ceph_con_out_kvec_reset(con); + prepare_write_message_footer(con); ret = 1; out: return ret; @@ -907,12 +938,9 @@ static int write_partial_skip(struct ceph_connection *con) int ret; while (con->out_skip > 0) { - struct kvec iov = { - .iov_base = page_address(con->msgr->zero_page), - .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE) - }; + size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); - ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1); + ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1); if (ret <= 0) goto out; con->out_skip -= ret; @@ -1085,8 +1113,8 @@ static void addr_set_port(struct sockaddr_storage *ss, int p) static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss, char delim, const char **ipend) { - struct sockaddr_in *in4 = (void *)ss; - struct sockaddr_in6 *in6 = (void *)ss; + struct sockaddr_in *in4 = (struct sockaddr_in *) ss; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss; memset(ss, 0, sizeof(*ss)); @@ -1512,10 +1540,9 @@ static int read_partial_message_section(struct ceph_connection *con, if (ret <= 0) return ret; section->iov_len += ret; - if (section->iov_len == sec_len) - *crc = crc32c(0, section->iov_base, - section->iov_len); } + if (section->iov_len == sec_len) + *crc = crc32c(0, section->iov_base, section->iov_len); return 1; } @@ -1527,7 +1554,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, static int read_partial_message_pages(struct ceph_connection *con, struct page **pages, - unsigned data_len, int datacrc) + unsigned data_len, bool do_datacrc) { void *p; int ret; @@ -1540,7 +1567,7 @@ static int read_partial_message_pages(struct ceph_connection *con, p = kmap(pages[con->in_msg_pos.page]); ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left); - if (ret > 0 && datacrc) + if (ret > 0 && do_datacrc) con->in_data_crc = crc32c(con->in_data_crc, p + con->in_msg_pos.page_pos, ret); @@ -1560,7 +1587,7 @@ static int read_partial_message_pages(struct ceph_connection *con, #ifdef CONFIG_BLOCK static int read_partial_message_bio(struct ceph_connection *con, struct bio **bio_iter, int *bio_seg, - unsigned data_len, int datacrc) + unsigned data_len, bool do_datacrc) { struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg); void *p; @@ -1576,7 +1603,7 @@ static int read_partial_message_bio(struct ceph_connection *con, ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left); - if (ret > 0 && datacrc) + if (ret > 0 && do_datacrc) con->in_data_crc = crc32c(con->in_data_crc, p + con->in_msg_pos.page_pos, ret); @@ -1603,9 +1630,10 @@ static int read_partial_message(struct ceph_connection *con) int ret; int to, left; unsigned front_len, middle_len, data_len; - int datacrc = con->msgr->nocrc; + bool do_datacrc = !con->msgr->nocrc; int skip; u64 seq; + u32 crc; dout("read_partial_message con %p msg %p\n", con, m); @@ -1618,17 +1646,16 @@ static int read_partial_message(struct ceph_connection *con) if (ret <= 0) return ret; con->in_base_pos += ret; - if (con->in_base_pos == sizeof(con->in_hdr)) { - u32 crc = crc32c(0, (void *)&con->in_hdr, - sizeof(con->in_hdr) - sizeof(con->in_hdr.crc)); - if (crc != le32_to_cpu(con->in_hdr.crc)) { - pr_err("read_partial_message bad hdr " - " crc %u != expected %u\n", - crc, con->in_hdr.crc); - return -EBADMSG; - } - } } + + crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); + if (cpu_to_le32(crc) != con->in_hdr.crc) { + pr_err("read_partial_message bad hdr " + " crc %u != expected %u\n", + crc, con->in_hdr.crc); + return -EBADMSG; + } + front_len = le32_to_cpu(con->in_hdr.front_len); if (front_len > CEPH_MSG_MAX_FRONT_LEN) return -EIO; @@ -1714,7 +1741,7 @@ static int read_partial_message(struct ceph_connection *con) while (con->in_msg_pos.data_pos < data_len) { if (m->pages) { ret = read_partial_message_pages(con, m->pages, - data_len, datacrc); + data_len, do_datacrc); if (ret <= 0) return ret; #ifdef CONFIG_BLOCK @@ -1722,7 +1749,7 @@ static int read_partial_message(struct ceph_connection *con) ret = read_partial_message_bio(con, &m->bio_iter, &m->bio_seg, - data_len, datacrc); + data_len, do_datacrc); if (ret <= 0) return ret; #endif @@ -1757,7 +1784,7 @@ static int read_partial_message(struct ceph_connection *con) m, con->in_middle_crc, m->footer.middle_crc); return -EBADMSG; } - if (datacrc && + if (do_datacrc && (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { pr_err("read_partial_message %p data crc %u != exp. %u\n", m, @@ -1819,7 +1846,6 @@ more: /* open the socket first? */ if (con->sock == NULL) { - prepare_write_banner(msgr, con); prepare_write_connect(msgr, con, 1); prepare_read_banner(con); set_bit(CONNECTING, &con->state); @@ -1829,11 +1855,9 @@ more: con->in_tag = CEPH_MSGR_TAG_READY; dout("try_write initiating connect on %p new state %lu\n", con, con->state); - con->sock = ceph_tcp_connect(con); - if (IS_ERR(con->sock)) { - con->sock = NULL; + ret = ceph_tcp_connect(con); + if (ret < 0) { con->error_msg = "connect error"; - ret = -1; goto out; } } @@ -1953,8 +1977,9 @@ more: * * FIXME: there must be a better way to do this! */ - static char buf[1024]; - int skip = min(1024, -con->in_base_pos); + static char buf[SKIP_BUF_SIZE]; + int skip = min((int) sizeof (buf), -con->in_base_pos); + dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); ret = ceph_tcp_recvmsg(con->sock, buf, skip); if (ret <= 0) @@ -2216,15 +2241,6 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, spin_lock_init(&msgr->global_seq_lock); - /* the zero page is needed if a request is "canceled" while the message - * is being written over the socket */ - msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO); - if (!msgr->zero_page) { - kfree(msgr); - return ERR_PTR(-ENOMEM); - } - kmap(msgr->zero_page); - if (myaddr) msgr->inst.addr = *myaddr; @@ -2241,8 +2257,6 @@ EXPORT_SYMBOL(ceph_messenger_create); void ceph_messenger_destroy(struct ceph_messenger *msgr) { dout("destroy %p\n", msgr); - kunmap(msgr->zero_page); - __free_page(msgr->zero_page); kfree(msgr); dout("destroyed messenger %p\n", msgr); } |