diff options
Diffstat (limited to 'drivers/block/drbd/drbd_receiver.c')
-rw-r--r-- | drivers/block/drbd/drbd_receiver.c | 4426 |
1 files changed, 4426 insertions, 0 deletions
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c new file mode 100644 index 00000000000..c548f24f54a --- /dev/null +++ b/drivers/block/drbd/drbd_receiver.c @@ -0,0 +1,4426 @@ +/* + drbd_receiver.c + + This file is part of DRBD by Philipp Reisner and Lars Ellenberg. + + Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. + Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. + Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. + + drbd is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2, or (at your option) + any later version. + + drbd is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with drbd; see the file COPYING. If not, write to + the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. + */ + + +#include <linux/module.h> + +#include <asm/uaccess.h> +#include <net/sock.h> + +#include <linux/version.h> +#include <linux/drbd.h> +#include <linux/fs.h> +#include <linux/file.h> +#include <linux/in.h> +#include <linux/mm.h> +#include <linux/memcontrol.h> +#include <linux/mm_inline.h> +#include <linux/slab.h> +#include <linux/smp_lock.h> +#include <linux/pkt_sched.h> +#define __KERNEL_SYSCALLS__ +#include <linux/unistd.h> +#include <linux/vmalloc.h> +#include <linux/random.h> +#include <linux/mm.h> +#include <linux/string.h> +#include <linux/scatterlist.h> +#include "drbd_int.h" +#include "drbd_req.h" + +#include "drbd_vli.h" + +struct flush_work { + struct drbd_work w; + struct drbd_epoch *epoch; +}; + +enum finish_epoch { + FE_STILL_LIVE, + FE_DESTROYED, + FE_RECYCLED, +}; + +static int drbd_do_handshake(struct drbd_conf *mdev); +static int drbd_do_auth(struct drbd_conf *mdev); + +static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event); +static int e_end_block(struct drbd_conf *, struct drbd_work *, int); + +static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch) +{ + struct drbd_epoch *prev; + spin_lock(&mdev->epoch_lock); + prev = list_entry(epoch->list.prev, struct drbd_epoch, list); + if (prev == epoch || prev == mdev->current_epoch) + prev = NULL; + spin_unlock(&mdev->epoch_lock); + return prev; +} + +#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) + +static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev) +{ + struct page *page = NULL; + + /* Yes, testing drbd_pp_vacant outside the lock is racy. + * So what. It saves a spin_lock. */ + if (drbd_pp_vacant > 0) { + spin_lock(&drbd_pp_lock); + page = drbd_pp_pool; + if (page) { + drbd_pp_pool = (struct page *)page_private(page); + set_page_private(page, 0); /* just to be polite */ + drbd_pp_vacant--; + } + spin_unlock(&drbd_pp_lock); + } + /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD + * "criss-cross" setup, that might cause write-out on some other DRBD, + * which in turn might block on the other node at this very place. */ + if (!page) + page = alloc_page(GFP_TRY); + if (page) + atomic_inc(&mdev->pp_in_use); + return page; +} + +/* kick lower level device, if we have more than (arbitrary number) + * reference counts on it, which typically are locally submitted io + * requests. don't use unacked_cnt, so we speed up proto A and B, too. */ +static void maybe_kick_lo(struct drbd_conf *mdev) +{ + if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark) + drbd_kick_lo(mdev); +} + +static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed) +{ + struct drbd_epoch_entry *e; + struct list_head *le, *tle; + + /* The EEs are always appended to the end of the list. Since + they are sent in order over the wire, they have to finish + in order. As soon as we see the first not finished we can + stop to examine the list... */ + + list_for_each_safe(le, tle, &mdev->net_ee) { + e = list_entry(le, struct drbd_epoch_entry, w.list); + if (drbd_bio_has_active_page(e->private_bio)) + break; + list_move(le, to_be_freed); + } +} + +static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev) +{ + LIST_HEAD(reclaimed); + struct drbd_epoch_entry *e, *t; + + maybe_kick_lo(mdev); + spin_lock_irq(&mdev->req_lock); + reclaim_net_ee(mdev, &reclaimed); + spin_unlock_irq(&mdev->req_lock); + + list_for_each_entry_safe(e, t, &reclaimed, w.list) + drbd_free_ee(mdev, e); +} + +/** + * drbd_pp_alloc() - Returns a page, fails only if a signal comes in + * @mdev: DRBD device. + * @retry: whether or not to retry allocation forever (or until signalled) + * + * Tries to allocate a page, first from our own page pool, then from the + * kernel, unless this allocation would exceed the max_buffers setting. + * If @retry is non-zero, retry until DRBD frees a page somewhere else. + */ +static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) +{ + struct page *page = NULL; + DEFINE_WAIT(wait); + + if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { + page = drbd_pp_first_page_or_try_alloc(mdev); + if (page) + return page; + } + + for (;;) { + prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE); + + drbd_kick_lo_and_reclaim_net(mdev); + + if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { + page = drbd_pp_first_page_or_try_alloc(mdev); + if (page) + break; + } + + if (!retry) + break; + + if (signal_pending(current)) { + dev_warn(DEV, "drbd_pp_alloc interrupted!\n"); + break; + } + + schedule(); + } + finish_wait(&drbd_pp_wait, &wait); + + return page; +} + +/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc. + * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */ +static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) +{ + int free_it; + + spin_lock(&drbd_pp_lock); + if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { + free_it = 1; + } else { + set_page_private(page, (unsigned long)drbd_pp_pool); + drbd_pp_pool = page; + drbd_pp_vacant++; + free_it = 0; + } + spin_unlock(&drbd_pp_lock); + + atomic_dec(&mdev->pp_in_use); + + if (free_it) + __free_page(page); + + wake_up(&drbd_pp_wait); +} + +static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio) +{ + struct page *p_to_be_freed = NULL; + struct page *page; + struct bio_vec *bvec; + int i; + + spin_lock(&drbd_pp_lock); + __bio_for_each_segment(bvec, bio, i, 0) { + if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { + set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed); + p_to_be_freed = bvec->bv_page; + } else { + set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool); + drbd_pp_pool = bvec->bv_page; + drbd_pp_vacant++; + } + } + spin_unlock(&drbd_pp_lock); + atomic_sub(bio->bi_vcnt, &mdev->pp_in_use); + + while (p_to_be_freed) { + page = p_to_be_freed; + p_to_be_freed = (struct page *)page_private(page); + set_page_private(page, 0); /* just to be polite */ + put_page(page); + } + + wake_up(&drbd_pp_wait); +} + +/* +You need to hold the req_lock: + _drbd_wait_ee_list_empty() + +You must not have the req_lock: + drbd_free_ee() + drbd_alloc_ee() + drbd_init_ee() + drbd_release_ee() + drbd_ee_fix_bhs() + drbd_process_done_ee() + drbd_clear_done_ee() + drbd_wait_ee_list_empty() +*/ + +struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, + u64 id, + sector_t sector, + unsigned int data_size, + gfp_t gfp_mask) __must_hold(local) +{ + struct request_queue *q; + struct drbd_epoch_entry *e; + struct page *page; + struct bio *bio; + unsigned int ds; + + if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE)) + return NULL; + + e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM); + if (!e) { + if (!(gfp_mask & __GFP_NOWARN)) + dev_err(DEV, "alloc_ee: Allocation of an EE failed\n"); + return NULL; + } + + bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE)); + if (!bio) { + if (!(gfp_mask & __GFP_NOWARN)) + dev_err(DEV, "alloc_ee: Allocation of a bio failed\n"); + goto fail1; + } + + bio->bi_bdev = mdev->ldev->backing_bdev; + bio->bi_sector = sector; + + ds = data_size; + while (ds) { + page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT)); + if (!page) { + if (!(gfp_mask & __GFP_NOWARN)) + dev_err(DEV, "alloc_ee: Allocation of a page failed\n"); + goto fail2; + } + if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) { + drbd_pp_free(mdev, page); + dev_err(DEV, "alloc_ee: bio_add_page(s=%llu," + "data_size=%u,ds=%u) failed\n", + (unsigned long long)sector, data_size, ds); + + q = bdev_get_queue(bio->bi_bdev); + if (q->merge_bvec_fn) { + struct bvec_merge_data bvm = { + .bi_bdev = bio->bi_bdev, + .bi_sector = bio->bi_sector, + .bi_size = bio->bi_size, + .bi_rw = bio->bi_rw, + }; + int l = q->merge_bvec_fn(q, &bvm, + &bio->bi_io_vec[bio->bi_vcnt]); + dev_err(DEV, "merge_bvec_fn() = %d\n", l); + } + + /* dump more of the bio. */ + dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs); + dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt); + dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size); + dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments); + + goto fail2; + break; + } + ds -= min_t(int, ds, PAGE_SIZE); + } + + D_ASSERT(data_size == bio->bi_size); + + bio->bi_private = e; + e->mdev = mdev; + e->sector = sector; + e->size = bio->bi_size; + + e->private_bio = bio; + e->block_id = id; + INIT_HLIST_NODE(&e->colision); + e->epoch = NULL; + e->flags = 0; + + return e; + + fail2: + drbd_pp_free_bio_pages(mdev, bio); + bio_put(bio); + fail1: + mempool_free(e, drbd_ee_mempool); + + return NULL; +} + +void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) +{ + struct bio *bio = e->private_bio; + drbd_pp_free_bio_pages(mdev, bio); + bio_put(bio); + D_ASSERT(hlist_unhashed(&e->colision)); + mempool_free(e, drbd_ee_mempool); +} + +int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list) +{ + LIST_HEAD(work_list); + struct drbd_epoch_entry *e, *t; + int count = 0; + + spin_lock_irq(&mdev->req_lock); + list_splice_init(list, &work_list); + spin_unlock_irq(&mdev->req_lock); + + list_for_each_entry_safe(e, t, &work_list, w.list) { + drbd_free_ee(mdev, e); + count++; + } + return count; +} + + +/* + * This function is called from _asender only_ + * but see also comments in _req_mod(,barrier_acked) + * and receive_Barrier. + * + * Move entries from net_ee to done_ee, if ready. + * Grab done_ee, call all callbacks, free the entries. + * The callbacks typically send out ACKs. + */ +static int drbd_process_done_ee(struct drbd_conf *mdev) +{ + LIST_HEAD(work_list); + LIST_HEAD(reclaimed); + struct drbd_epoch_entry *e, *t; + int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS); + + spin_lock_irq(&mdev->req_lock); + reclaim_net_ee(mdev, &reclaimed); + list_splice_init(&mdev->done_ee, &work_list); + spin_unlock_irq(&mdev->req_lock); + + list_for_each_entry_safe(e, t, &reclaimed, w.list) + drbd_free_ee(mdev, e); + + /* possible callbacks here: + * e_end_block, and e_end_resync_block, e_send_discard_ack. + * all ignore the last argument. + */ + list_for_each_entry_safe(e, t, &work_list, w.list) { + /* list_del not necessary, next/prev members not touched */ + ok = e->w.cb(mdev, &e->w, !ok) && ok; + drbd_free_ee(mdev, e); + } + wake_up(&mdev->ee_wait); + + return ok; +} + +void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head) +{ + DEFINE_WAIT(wait); + + /* avoids spin_lock/unlock + * and calling prepare_to_wait in the fast path */ + while (!list_empty(head)) { + prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE); + spin_unlock_irq(&mdev->req_lock); + drbd_kick_lo(mdev); + schedule(); + finish_wait(&mdev->ee_wait, &wait); + spin_lock_irq(&mdev->req_lock); + } +} + +void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head) +{ + spin_lock_irq(&mdev->req_lock); + _drbd_wait_ee_list_empty(mdev, head); + spin_unlock_irq(&mdev->req_lock); +} + +/* see also kernel_accept; which is only present since 2.6.18. + * also we want to log which part of it failed, exactly */ +static int drbd_accept(struct drbd_conf *mdev, const char **what, + struct socket *sock, struct socket **newsock) +{ + struct sock *sk = sock->sk; + int err = 0; + + *what = "listen"; + err = sock->ops->listen(sock, 5); + if (err < 0) + goto out; + + *what = "sock_create_lite"; + err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol, + newsock); + if (err < 0) + goto out; + + *what = "accept"; + err = sock->ops->accept(sock, *newsock, 0); + if (err < 0) { + sock_release(*newsock); + *newsock = NULL; + goto out; + } + (*newsock)->ops = sock->ops; + +out: + return err; +} + +static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock, + void *buf, size_t size, int flags) +{ + mm_segment_t oldfs; + struct kvec iov = { + .iov_base = buf, + .iov_len = size, + }; + struct msghdr msg = { + .msg_iovlen = 1, + .msg_iov = (struct iovec *)&iov, + .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL) + }; + int rv; + + oldfs = get_fs(); + set_fs(KERNEL_DS); + rv = sock_recvmsg(sock, &msg, size, msg.msg_flags); + set_fs(oldfs); + + return rv; +} + +static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size) +{ + mm_segment_t oldfs; + struct kvec iov = { + .iov_base = buf, + .iov_len = size, + }; + struct msghdr msg = { + .msg_iovlen = 1, + .msg_iov = (struct iovec *)&iov, + .msg_flags = MSG_WAITALL | MSG_NOSIGNAL + }; + int rv; + + oldfs = get_fs(); + set_fs(KERNEL_DS); + + for (;;) { + rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags); + if (rv == size) + break; + + /* Note: + * ECONNRESET other side closed the connection + * ERESTARTSYS (on sock) we got a signal + */ + + if (rv < 0) { + if (rv == -ECONNRESET) + dev_info(DEV, "sock was reset by peer\n"); + else if (rv != -ERESTARTSYS) + dev_err(DEV, "sock_recvmsg returned %d\n", rv); + break; + } else if (rv == 0) { + dev_info(DEV, "sock was shut down by peer\n"); + break; + } else { + /* signal came in, or peer/link went down, + * after we read a partial message + */ + /* D_ASSERT(signal_pending(current)); */ + break; + } + }; + + set_fs(oldfs); + + if (rv != size) + drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE)); + + return rv; +} + +static struct socket *drbd_try_connect(struct drbd_conf *mdev) +{ + const char *what; + struct socket *sock; + struct sockaddr_in6 src_in6; + int err; + int disconnect_on_error = 1; + + if (!get_net_conf(mdev)) + return NULL; + + what = "sock_create_kern"; + err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family, + SOCK_STREAM, IPPROTO_TCP, &sock); + if (err < 0) { + sock = NULL; + goto out; + } + + sock->sk->sk_rcvtimeo = + sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ; + + /* explicitly bind to the configured IP as source IP + * for the outgoing connections. + * This is needed for multihomed hosts and to be + * able to use lo: interfaces for drbd. + * Make sure to use 0 as port number, so linux selects + * a free one dynamically. + */ + memcpy(&src_in6, mdev->net_conf->my_addr, + min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6))); + if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6) + src_in6.sin6_port = 0; + else + ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */ + + what = "bind before connect"; + err = sock->ops->bind(sock, + (struct sockaddr *) &src_in6, + mdev->net_conf->my_addr_len); + if (err < 0) + goto out; + + /* connect may fail, peer not yet available. + * stay C_WF_CONNECTION, don't go Disconnecting! */ + disconnect_on_error = 0; + what = "connect"; + err = sock->ops->connect(sock, + (struct sockaddr *)mdev->net_conf->peer_addr, + mdev->net_conf->peer_addr_len, 0); + +out: + if (err < 0) { + if (sock) { + sock_release(sock); + sock = NULL; + } + switch (-err) { + /* timeout, busy, signal pending */ + case ETIMEDOUT: case EAGAIN: case EINPROGRESS: + case EINTR: case ERESTARTSYS: + /* peer not (yet) available, network problem */ + case ECONNREFUSED: case ENETUNREACH: + case EHOSTDOWN: case EHOSTUNREACH: + disconnect_on_error = 0; + break; + default: + dev_err(DEV, "%s failed, err = %d\n", what, err); + } + if (disconnect_on_error) + drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); + } + put_net_conf(mdev); + return sock; +} + +static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev) +{ + int timeo, err; + struct socket *s_estab = NULL, *s_listen; + const char *what; + + if (!get_net_conf(mdev)) + return NULL; + + what = "sock_create_kern"; + err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family, + SOCK_STREAM, IPPROTO_TCP, &s_listen); + if (err) { + s_listen = NULL; + goto out; + } + + timeo = mdev->net_conf->try_connect_int * HZ; + timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */ + + s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */ + s_listen->sk->sk_rcvtimeo = timeo; + s_listen->sk->sk_sndtimeo = timeo; + + what = "bind before listen"; + err = s_listen->ops->bind(s_listen, + (struct sockaddr *) mdev->net_conf->my_addr, + mdev->net_conf->my_addr_len); + if (err < 0) + goto out; + + err = drbd_accept(mdev, &what, s_listen, &s_estab); + +out: + if (s_listen) + sock_release(s_listen); + if (err < 0) { + if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { + dev_err(DEV, "%s failed, err = %d\n", what, err); + drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); + } + } + put_net_conf(mdev); + + return s_estab; +} + +static int drbd_send_fp(struct drbd_conf *mdev, + struct socket *sock, enum drbd_packets cmd) +{ + struct p_header *h = (struct p_header *) &mdev->data.sbuf.header; + + return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0); +} + +static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock) +{ + struct p_header *h = (struct p_header *) &mdev->data.sbuf.header; + int rr; + + rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0); + + if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC) + return be16_to_cpu(h->command); + + return 0xffff; +} + +/** + * drbd_socket_okay() - Free the socket if its connection is not okay + * @mdev: DRBD device. + * @sock: pointer to the pointer to the socket. + */ +static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock) +{ + int rr; + char tb[4]; + + if (!*sock) + return FALSE; + + rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK); + + if (rr > 0 || rr == -EAGAIN) { + return TRUE; + } else { + sock_release(*sock); + *sock = NULL; + return FALSE; + } +} + +/* + * return values: + * 1 yes, we have a valid connection + * 0 oops, did not work out, please try again + * -1 peer talks different language, + * no point in trying again, please go standalone. + * -2 We do not have a network config... + */ +static int drbd_connect(struct drbd_conf *mdev) +{ + struct socket *s, *sock, *msock; + int try, h, ok; + + D_ASSERT(!mdev->data.socket); + + if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) + dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n"); + + if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS) + return -2; + + clear_bit(DISCARD_CONCURRENT, &mdev->flags); + + sock = NULL; + msock = NULL; + + do { + for (try = 0;;) { + /* 3 tries, this should take less than a second! */ + s = drbd_try_connect(mdev); + if (s || ++try >= 3) + break; + /* give the other side time to call bind() & listen() */ + __set_current_state(TASK_INTERRUPTIBLE); + schedule_timeout(HZ / 10); + } + + if (s) { + if (!sock) { + drbd_send_fp(mdev, s, P_HAND_SHAKE_S); + sock = s; + s = NULL; + } else if (!msock) { + drbd_send_fp(mdev, s, P_HAND_SHAKE_M); + msock = s; + s = NULL; + } else { + dev_err(DEV, "Logic error in drbd_connect()\n"); + goto out_release_sockets; + } + } + + if (sock && msock) { + __set_current_state(TASK_INTERRUPTIBLE); + schedule_timeout(HZ / 10); + ok = drbd_socket_okay(mdev, &sock); + ok = drbd_socket_okay(mdev, &msock) && ok; + if (ok) + break; + } + +retry: + s = drbd_wait_for_connect(mdev); + if (s) { + try = drbd_recv_fp(mdev, s); + drbd_socket_okay(mdev, &sock); + drbd_socket_okay(mdev, &msock); + switch (try) { + case P_HAND_SHAKE_S: + if (sock) { + dev_warn(DEV, "initial packet S crossed\n"); + sock_release(sock); + } + sock = s; + break; + case P_HAND_SHAKE_M: + if (msock) { + dev_warn(DEV, "initial packet M crossed\n"); + sock_release(msock); + } + msock = s; + set_bit(DISCARD_CONCURRENT, &mdev->flags); + break; + default: + dev_warn(DEV, "Error receiving initial packet\n"); + sock_release(s); + if (random32() & 1) + goto retry; + } + } + + if (mdev->state.conn <= C_DISCONNECTING) + goto out_release_sockets; + if (signal_pending(current)) { + flush_signals(current); + smp_rmb(); + if (get_t_state(&mdev->receiver) == Exiting) + goto out_release_sockets; + } + + if (sock && msock) { + ok = drbd_socket_okay(mdev, &sock); + ok = drbd_socket_okay(mdev, &msock) && ok; + if (ok) + break; + } + } while (1); + + msock->sk->sk_reuse = 1; /* SO_REUSEADDR */ + sock->sk->sk_reuse = 1; /* SO_REUSEADDR */ + + sock->sk->sk_allocation = GFP_NOIO; + msock->sk->sk_allocation = GFP_NOIO; + + sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK; + msock->sk->sk_priority = TC_PRIO_INTERACTIVE; + + if (mdev->net_conf->sndbuf_size) { + sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size; + sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK; + } + + if (mdev->net_conf->rcvbuf_size) { + sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size; + sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK; + } + + /* NOT YET ... + * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10; + * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; + * first set it to the P_HAND_SHAKE timeout, + * which we set to 4x the configured ping_timeout. */ + sock->sk->sk_sndtimeo = + sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10; + + msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10; + msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ; + + /* we don't want delays. + * we use TCP_CORK where apropriate, though */ + drbd_tcp_nodelay(sock); + drbd_tcp_nodelay(msock); + + mdev->data.socket = sock; + mdev->meta.socket = msock; + mdev->last_received = jiffies; + + D_ASSERT(mdev->asender.task == NULL); + + h = drbd_do_handshake(mdev); + if (h <= 0) + return h; + + if (mdev->cram_hmac_tfm) { + /* drbd_request_state(mdev, NS(conn, WFAuth)); */ + if (!drbd_do_auth(mdev)) { + dev_err(DEV, "Authentication of peer failed\n"); + return -1; + } + } + + if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS) + return 0; + + sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10; + sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; + + atomic_set(&mdev->packet_seq, 0); + mdev->peer_seq = 0; + + drbd_thread_start(&mdev->asender); + + drbd_send_protocol(mdev); + drbd_send_sync_param(mdev, &mdev->sync_conf); + drbd_send_sizes(mdev, 0); + drbd_send_uuids(mdev); + drbd_send_state(mdev); + clear_bit(USE_DEGR_WFC_T, &mdev->flags); + clear_bit(RESIZE_PENDING, &mdev->flags); + + return 1; + +out_release_sockets: + if (sock) + sock_release(sock); + if (msock) + sock_release(msock); + return -1; +} + +static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h) +{ + int r; + + r = drbd_recv(mdev, h, sizeof(*h)); + + if (unlikely(r != sizeof(*h))) { + dev_err(DEV, "short read expecting header on sock: r=%d\n", r); + return FALSE; + }; + h->command = be16_to_cpu(h->command); + h->length = be16_to_cpu(h->length); + if (unlikely(h->magic != BE_DRBD_MAGIC)) { + dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n", + (long)be32_to_cpu(h->magic), + h->command, h->length); + return FALSE; + } + mdev->last_received = jiffies; + + return TRUE; +} + +static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch) +{ + int rv; + + if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) { + rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL); + if (rv) { + dev_err(DEV, "local disk flush failed with status %d\n", rv); + /* would rather check on EOPNOTSUPP, but that is not reliable. + * don't try again for ANY return value != 0 + * if (rv == -EOPNOTSUPP) */ + drbd_bump_write_ordering(mdev, WO_drain_io); + } + put_ldev(mdev); + } + + return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE); +} + +static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel) +{ + struct flush_work *fw = (struct flush_work *)w; + struct drbd_epoch *epoch = fw->epoch; + + kfree(w); + + if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags)) + drbd_flush_after_epoch(mdev, epoch); + + drbd_may_finish_epoch(mdev, epoch, EV_PUT | + (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0)); + + return 1; +} + +/** + * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it. + * @mdev: DRBD device. + * @epoch: Epoch object. + * @ev: Epoch event. + */ +static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev, + struct drbd_epoch *epoch, + enum epoch_event ev) +{ + int finish, epoch_size; + struct drbd_epoch *next_epoch; + int schedule_flush = 0; + enum finish_epoch rv = FE_STILL_LIVE; + + spin_lock(&mdev->epoch_lock); + do { + next_epoch = NULL; + finish = 0; + + epoch_size = atomic_read(&epoch->epoch_size); + + switch (ev & ~EV_CLEANUP) { + case EV_PUT: + atomic_dec(&epoch->active); + break; + case EV_GOT_BARRIER_NR: + set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags); + + /* Special case: If we just switched from WO_bio_barrier to + WO_bdev_flush we should not finish the current epoch */ + if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 && + mdev->write_ordering != WO_bio_barrier && + epoch == mdev->current_epoch) + clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags); + break; + case EV_BARRIER_DONE: + set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags); + break; + case EV_BECAME_LAST: + /* nothing to do*/ + break; + } + + if (epoch_size != 0 && + atomic_read(&epoch->active) == 0 && + test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) && + epoch->list.prev == &mdev->current_epoch->list && + !test_bit(DE_IS_FINISHING, &epoch->flags)) { + /* Nearly all conditions are met to finish that epoch... */ + if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) || + mdev->write_ordering == WO_none || + (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) || + ev & EV_CLEANUP) { + finish = 1; + set_bit(DE_IS_FINISHING, &epoch->flags); + } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) && + mdev->write_ordering == WO_bio_barrier) { + atomic_inc(&epoch->active); + schedule_flush = 1; + } + } + if (finish) { + if (!(ev & EV_CLEANUP)) { + spin_unlock(&mdev->epoch_lock); + drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size); + spin_lock(&mdev->epoch_lock); + } + dec_unacked(mdev); + + if (mdev->current_epoch != epoch) { + next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list); + list_del(&epoch->list); + ev = EV_BECAME_LAST | (ev & EV_CLEANUP); + mdev->epochs--; + kfree(epoch); + + if (rv == FE_STILL_LIVE) + rv = FE_DESTROYED; + } else { + epoch->flags = 0; + atomic_set(&epoch->epoch_size, 0); + /* atomic_set(&epoch->active, 0); is alrady zero */ + if (rv == FE_STILL_LIVE) + rv = FE_RECYCLED; + } + } + + if (!next_epoch) + break; + + epoch = next_epoch; + } while (1); + + spin_unlock(&mdev->epoch_lock); + + if (schedule_flush) { + struct flush_work *fw; + fw = kmalloc(sizeof(*fw), GFP_ATOMIC); + if (fw) { + fw->w.cb = w_flush; + fw->epoch = epoch; + drbd_queue_work(&mdev->data.work, &fw->w); + } else { + dev_warn(DEV, "Could not kmalloc a flush_work obj\n"); + set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags); + /* That is not a recursion, only one level */ + drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE); + drbd_may_finish_epoch(mdev, epoch, EV_PUT); + } + } + + return rv; +} + +/** + * drbd_bump_write_ordering() - Fall back to an other write ordering method + * @mdev: DRBD device. + * @wo: Write ordering method to try. + */ +void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local) +{ + enum write_ordering_e pwo; + static char *write_ordering_str[] = { + [WO_none] = "none", + [WO_drain_io] = "drain", + [WO_bdev_flush] = "flush", + [WO_bio_barrier] = "barrier", + }; + + pwo = mdev->write_ordering; + wo = min(pwo, wo); + if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier) + wo = WO_bdev_flush; + if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush) + wo = WO_drain_io; + if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain) + wo = WO_none; + mdev->write_ordering = wo; + if (pwo != mdev->write_ordering || wo == WO_bio_barrier) + dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]); +} + +/** + * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set + * @mdev: DRBD device. + * @w: work object. + * @cancel: The connection will be closed anyways (unused in this callback) + */ +int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local) +{ + struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; + struct bio *bio = e->private_bio; + + /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place, + (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch) + so that we can finish that epoch in drbd_may_finish_epoch(). + That is necessary if we already have a long chain of Epochs, before + we realize that BIO_RW_BARRIER is actually not supported */ + + /* As long as the -ENOTSUPP on the barrier is reported immediately + that will never trigger. If it is reported late, we will just + print that warning and continue correctly for all future requests + with WO_bdev_flush */ + if (previous_epoch(mdev, e->epoch)) + dev_warn(DEV, "Write ordering was not enforced (one time event)\n"); + + /* prepare bio for re-submit, + * re-init volatile members */ + /* we still have a local reference, + * get_ldev was done in receive_Data. */ + bio->bi_bdev = mdev->ldev->backing_bdev; + bio->bi_sector = e->sector; + bio->bi_size = e->size; + bio->bi_idx = 0; + + bio->bi_flags &= ~(BIO_POOL_MASK - 1); + bio->bi_flags |= 1 << BIO_UPTODATE; + + /* don't know whether this is necessary: */ + bio->bi_phys_segments = 0; + bio->bi_next = NULL; + + /* these should be unchanged: */ + /* bio->bi_end_io = drbd_endio_write_sec; */ + /* bio->bi_vcnt = whatever; */ + + e->w.cb = e_end_block; + + /* This is no longer a barrier request. */ + bio->bi_rw &= ~(1UL << BIO_RW_BARRIER); + + drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio); + + return 1; +} + +static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h) +{ + int rv, issue_flush; + struct p_barrier *p = (struct p_barrier *)h; + struct drbd_epoch *epoch; + + ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE; + + rv = drbd_recv(mdev, h->payload, h->length); + ERR_IF(rv != h->length) return FALSE; + + inc_unacked(mdev); |