diff options
author | Lars Ellenberg <lars.ellenberg@linbit.com> | 2010-05-14 17:10:48 +0200 |
---|---|---|
committer | Philipp Reisner <philipp.reisner@linbit.com> | 2010-05-18 02:01:23 +0200 |
commit | 45bb912bd5ea4d2b3a270a93cbdf767a0e2df6f5 (patch) | |
tree | d95d27ea8e945fcda3427c50a5bc062c804c6eff /drivers/block/drbd/drbd_receiver.c | |
parent | 708d740ed8242b84eefc63df144313a7308c7de5 (diff) |
drbd: Allow drbd_epoch_entries to use multiple bios.
This should allow for better performance if the lower level IO stack
of the peers differs in limits exposed either via the queue,
or via some merge_bvec_fn.
Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Diffstat (limited to 'drivers/block/drbd/drbd_receiver.c')
-rw-r--r-- | drivers/block/drbd/drbd_receiver.c | 483 |
1 files changed, 274 insertions, 209 deletions
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c index fee0d249adf..388a3e8bb0d 100644 --- a/drivers/block/drbd/drbd_receiver.c +++ b/drivers/block/drbd/drbd_receiver.c @@ -80,30 +80,124 @@ static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epo #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) -static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev) +/* + * some helper functions to deal with single linked page lists, + * page->private being our "next" pointer. + */ + +/* If at least n pages are linked at head, get n pages off. + * Otherwise, don't modify head, and return NULL. + * Locking is the responsibility of the caller. + */ +static struct page *page_chain_del(struct page **head, int n) +{ + struct page *page; + struct page *tmp; + + BUG_ON(!n); + BUG_ON(!head); + + page = *head; + while (page) { + tmp = page_chain_next(page); + if (--n == 0) + break; /* found sufficient pages */ + if (tmp == NULL) + /* insufficient pages, don't use any of them. */ + return NULL; + page = tmp; + } + + /* add end of list marker for the returned list */ + set_page_private(page, 0); + /* actual return value, and adjustment of head */ + page = *head; + *head = tmp; + return page; +} + +/* may be used outside of locks to find the tail of a (usually short) + * "private" page chain, before adding it back to a global chain head + * with page_chain_add() under a spinlock. */ +static struct page *page_chain_tail(struct page *page, int *len) +{ + struct page *tmp; + int i = 1; + while ((tmp = page_chain_next(page))) + ++i, page = tmp; + if (len) + *len = i; + return page; +} + +static int page_chain_free(struct page *page) +{ + struct page *tmp; + int i = 0; + page_chain_for_each_safe(page, tmp) { + put_page(page); + ++i; + } + return i; +} + +static void page_chain_add(struct page **head, + struct page *chain_first, struct page *chain_last) +{ +#if 1 + struct page *tmp; + tmp = page_chain_tail(chain_first, NULL); + BUG_ON(tmp != chain_last); +#endif + + /* add chain to head */ + set_page_private(chain_last, (unsigned long)*head); + *head = chain_first; +} + +static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number) { struct page *page = NULL; + struct page *tmp = NULL; + int i = 0; /* Yes, testing drbd_pp_vacant outside the lock is racy. * So what. It saves a spin_lock. */ - if (drbd_pp_vacant > 0) { + if (drbd_pp_vacant >= number) { spin_lock(&drbd_pp_lock); - page = drbd_pp_pool; - if (page) { - drbd_pp_pool = (struct page *)page_private(page); - set_page_private(page, 0); /* just to be polite */ - drbd_pp_vacant--; - } + page = page_chain_del(&drbd_pp_pool, number); + if (page) + drbd_pp_vacant -= number; spin_unlock(&drbd_pp_lock); + if (page) + return page; } + /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD * "criss-cross" setup, that might cause write-out on some other DRBD, * which in turn might block on the other node at this very place. */ - if (!page) - page = alloc_page(GFP_TRY); - if (page) - atomic_inc(&mdev->pp_in_use); - return page; + for (i = 0; i < number; i++) { + tmp = alloc_page(GFP_TRY); + if (!tmp) + break; + set_page_private(tmp, (unsigned long)page); + page = tmp; + } + + if (i == number) + return page; + + /* Not enough pages immediately available this time. + * No need to jump around here, drbd_pp_alloc will retry this + * function "soon". */ + if (page) { + tmp = page_chain_tail(page, NULL); + spin_lock(&drbd_pp_lock); + page_chain_add(&drbd_pp_pool, page, tmp); + drbd_pp_vacant += i; + spin_unlock(&drbd_pp_lock); + } + return NULL; } /* kick lower level device, if we have more than (arbitrary number) @@ -127,7 +221,7 @@ static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed list_for_each_safe(le, tle, &mdev->net_ee) { e = list_entry(le, struct drbd_epoch_entry, w.list); - if (drbd_bio_has_active_page(e->private_bio)) + if (drbd_ee_has_active_page(e)) break; list_move(le, to_be_freed); } @@ -148,32 +242,34 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev) } /** - * drbd_pp_alloc() - Returns a page, fails only if a signal comes in + * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled) * @mdev: DRBD device. - * @retry: whether or not to retry allocation forever (or until signalled) + * @number: number of pages requested + * @retry: whether to retry, if not enough pages are available right now + * + * Tries to allocate number pages, first from our own page pool, then from + * the kernel, unless this allocation would exceed the max_buffers setting. + * Possibly retry until DRBD frees sufficient pages somewhere else. * - * Tries to allocate a page, first from our own page pool, then from the - * kernel, unless this allocation would exceed the max_buffers setting. - * If @retry is non-zero, retry until DRBD frees a page somewhere else. + * Returns a page chain linked via page->private. */ -static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) +static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry) { struct page *page = NULL; DEFINE_WAIT(wait); - if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { - page = drbd_pp_first_page_or_try_alloc(mdev); - if (page) - return page; - } + /* Yes, we may run up to @number over max_buffers. If we + * follow it strictly, the admin will get it wrong anyways. */ + if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) + page = drbd_pp_first_pages_or_try_alloc(mdev, number); - for (;;) { + while (page == NULL) { prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE); drbd_kick_lo_and_reclaim_net(mdev); if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) { - page = drbd_pp_first_page_or_try_alloc(mdev); + page = drbd_pp_first_pages_or_try_alloc(mdev, number); if (page) break; } @@ -190,62 +286,32 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry) } finish_wait(&drbd_pp_wait, &wait); + if (page) + atomic_add(number, &mdev->pp_in_use); return page; } /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc. - * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */ + * Is also used from inside an other spin_lock_irq(&mdev->req_lock); + * Either links the page chain back to the global pool, + * or returns all pages to the system. */ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page) { - int free_it; - - spin_lock(&drbd_pp_lock); - if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { - free_it = 1; - } else { - set_page_private(page, (unsigned long)drbd_pp_pool); - drbd_pp_pool = page; - drbd_pp_vacant++; - free_it = 0; - } - spin_unlock(&drbd_pp_lock); - - atomic_dec(&mdev->pp_in_use); - - if (free_it) - __free_page(page); - - wake_up(&drbd_pp_wait); -} - -static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio) -{ - struct page *p_to_be_freed = NULL; - struct page *page; - struct bio_vec *bvec; int i; - - spin_lock(&drbd_pp_lock); - __bio_for_each_segment(bvec, bio, i, 0) { - if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) { - set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed); - p_to_be_freed = bvec->bv_page; - } else { - set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool); - drbd_pp_pool = bvec->bv_page; - drbd_pp_vacant++; - } - } - spin_unlock(&drbd_pp_lock); - atomic_sub(bio->bi_vcnt, &mdev->pp_in_use); - - while (p_to_be_freed) { - page = p_to_be_freed; - p_to_be_freed = (struct page *)page_private(page); - set_page_private(page, 0); /* just to be polite */ - put_page(page); + if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) + i = page_chain_free(page); + else { + struct page *tmp; + tmp = page_chain_tail(page, &i); + spin_lock(&drbd_pp_lock); + page_chain_add(&drbd_pp_pool, page, tmp); + drbd_pp_vacant += i; + spin_unlock(&drbd_pp_lock); } - + atomic_sub(i, &mdev->pp_in_use); + i = atomic_read(&mdev->pp_in_use); + if (i < 0) + dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i); wake_up(&drbd_pp_wait); } @@ -270,11 +336,9 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, unsigned int data_size, gfp_t gfp_mask) __must_hold(local) { - struct request_queue *q; struct drbd_epoch_entry *e; struct page *page; - struct bio *bio; - unsigned int ds; + unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT; if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE)) return NULL; @@ -286,84 +350,32 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev, return NULL; } - bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE)); - if (!bio) { - if (!(gfp_mask & __GFP_NOWARN)) - dev_err(DEV, "alloc_ee: Allocation of a bio failed\n"); - goto fail1; - } - - bio->bi_bdev = mdev->ldev->backing_bdev; - bio->bi_sector = sector; - - ds = data_size; - while (ds) { - page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT)); - if (!page) { - if (!(gfp_mask & __GFP_NOWARN)) - dev_err(DEV, "alloc_ee: Allocation of a page failed\n"); - goto fail2; - } - if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) { - drbd_pp_free(mdev, page); - dev_err(DEV, "alloc_ee: bio_add_page(s=%llu," - "data_size=%u,ds=%u) failed\n", - (unsigned long long)sector, data_size, ds); - - q = bdev_get_queue(bio->bi_bdev); - if (q->merge_bvec_fn) { - struct bvec_merge_data bvm = { - .bi_bdev = bio->bi_bdev, - .bi_sector = bio->bi_sector, - .bi_size = bio->bi_size, - .bi_rw = bio->bi_rw, - }; - int l = q->merge_bvec_fn(q, &bvm, - &bio->bi_io_vec[bio->bi_vcnt]); - dev_err(DEV, "merge_bvec_fn() = %d\n", l); - } - - /* dump more of the bio. */ - dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs); - dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt); - dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size); - dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments); - - goto fail2; - break; - } - ds -= min_t(int, ds, PAGE_SIZE); - } - - D_ASSERT(data_size == bio->bi_size); - - bio->bi_private = e; - e->mdev = mdev; - e->sector = sector; - e->size = bio->bi_size; + page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT)); + if (!page) + goto fail; - e->private_bio = bio; - e->block_id = id; INIT_HLIST_NODE(&e->colision); e->epoch = NULL; + e->mdev = mdev; + e->pages = page; + atomic_set(&e->pending_bios, 0); + e->size = data_size; e->flags = 0; + e->sector = sector; + e->sector = sector; + e->block_id = id; return e; - fail2: - drbd_pp_free_bio_pages(mdev, bio); - bio_put(bio); - fail1: + fail: mempool_free(e, drbd_ee_mempool); - return NULL; } void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e) { - struct bio *bio = e->private_bio; - drbd_pp_free_bio_pages(mdev, bio); - bio_put(bio); + drbd_pp_free(mdev, e->pages); + D_ASSERT(atomic_read(&e->pending_bios) == 0); D_ASSERT(hlist_unhashed(&e->colision)); mempool_free(e, drbd_ee_mempool); } @@ -1121,6 +1133,90 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) } /** + * drbd_submit_ee() + * @mdev: DRBD device. + * @e: epoch entry + * @rw: flag field, see bio->bi_rw + */ +/* TODO allocate from our own bio_set. */ +int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, + const unsigned rw, const int fault_type) +{ + struct bio *bios = NULL; + struct bio *bio; + struct page *page = e->pages; + sector_t sector = e->sector; + unsigned ds = e->size; + unsigned n_bios = 0; + unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT; + + /* In most cases, we will only need one bio. But in case the lower + * level restrictions happen to be different at this offset on this + * side than those of the sending peer, we may need to submit the + * request in more than one bio. */ +next_bio: + bio = bio_alloc(GFP_NOIO, nr_pages); + if (!bio) { + dev_err(DEV, "submit_ee: Allocation of a bio failed\n"); + goto fail; + } + /* > e->sector, unless this is the first bio */ + bio->bi_sector = sector; + bio->bi_bdev = mdev->ldev->backing_bdev; + /* we special case some flags in the multi-bio case, see below + * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */ + bio->bi_rw = rw; + bio->bi_private = e; + bio->bi_end_io = drbd_endio_sec; + + bio->bi_next = bios; + bios = bio; + ++n_bios; + + page_chain_for_each(page) { + unsigned len = min_t(unsigned, ds, PAGE_SIZE); + if (!bio_add_page(bio, page, len, 0)) { + /* a single page must always be possible! */ + BUG_ON(bio->bi_vcnt == 0); + goto next_bio; + } + ds -= len; + sector += len >> 9; + --nr_pages; + } + D_ASSERT(page == NULL); + D_ASSERT(ds == 0); + + atomic_set(&e->pending_bios, n_bios); + do { + bio = bios; + bios = bios->bi_next; + bio->bi_next = NULL; + + /* strip off BIO_RW_UNPLUG unless it is the last bio */ + if (bios) + bio->bi_rw &= ~(1<<BIO_RW_UNPLUG); + + drbd_generic_make_request(mdev, fault_type, bio); + + /* strip off BIO_RW_BARRIER, + * unless it is the first or last bio */ + if (bios && bios->bi_next) + bios->bi_rw &= ~(1<<BIO_RW_BARRIER); + } while (bios); + maybe_kick_lo(mdev); + return 0; + +fail: + while (bios) { + bio = bios; + bios = bios->bi_next; + bio_put(bio); + } + return -ENOMEM; +} + +/** * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set * @mdev: DRBD device. * @w: work object. @@ -1129,8 +1225,6 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local) { struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w; - struct bio *bio = e->private_bio; - /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place, (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch) so that we can finish that epoch in drbd_may_finish_epoch(). @@ -1144,33 +1238,17 @@ int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __relea if (previous_epoch(mdev, e->epoch)) dev_warn(DEV, "Write ordering was not enforced (one time event)\n"); - /* prepare bio for re-submit, - * re-init volatile members */ /* we still have a local reference, * get_ldev was done in receive_Data. */ - bio->bi_bdev = mdev->ldev->backing_bdev; - bio->bi_sector = e->sector; - bio->bi_size = e->size; - bio->bi_idx = 0; - - bio->bi_flags &= ~(BIO_POOL_MASK - 1); - bio->bi_flags |= 1 << BIO_UPTODATE; - - /* don't know whether this is necessary: */ - bio->bi_phys_segments = 0; - bio->bi_next = NULL; - - /* these should be unchanged: */ - /* bio->bi_end_io = drbd_endio_write_sec; */ - /* bio->bi_vcnt = whatever; */ e->w.cb = e_end_block; - - /* This is no longer a barrier request. */ - bio->bi_rw &= ~(1UL << BIO_RW_BARRIER); - - drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio); - + if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) { + /* drbd_submit_ee fails for one reason only: + * if was not able to allocate sufficient bios. + * requeue, try again later. */ + e->w.cb = w_e_reissue; + drbd_queue_work(&mdev->data.work, &e->w); + } return 1; } @@ -1264,10 +1342,8 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __ { const sector_t capacity = drbd_get_capacity(mdev->this_bdev); struct drbd_epoch_entry *e; - struct bio_vec *bvec; struct page *page; - struct bio *bio; - int dgs, ds, i, rr; + int dgs, ds, rr; void *dig_in = mdev->int_dig_in; void *dig_vv = mdev->int_dig_vv; unsigned long *data; @@ -1304,28 +1380,29 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __ e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO); if (!e) return NULL; - bio = e->private_bio; + ds = data_size; - bio_for_each_segment(bvec, bio, i) { - page = bvec->bv_page; + page = e->pages; + page_chain_for_each(page) { + unsigned len = min_t(int, ds, PAGE_SIZE); data = kmap(page); - rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE)); + rr = drbd_recv(mdev, data, len); if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) { dev_err(DEV, "Fault injection: Corrupting data on receive\n"); data[0] = data[0] ^ (unsigned long)-1; } kunmap(page); - if (rr != min_t(int, ds, PAGE_SIZE)) { + if (rr != len) { drbd_free_ee(mdev, e); dev_warn(DEV, "short read receiving data: read %d expected %d\n", - rr, min_t(int, ds, PAGE_SIZE)); + rr, len); return NULL; } ds -= rr; } if (dgs) { - drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); + drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv); if (memcmp(dig_in, dig_vv, dgs)) { dev_err(DEV, "Digest integrity check FAILED.\n"); drbd_bcast_ee(mdev, "digest failed", @@ -1350,7 +1427,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size) if (!data_size) return TRUE; - page = drbd_pp_alloc(mdev, 1); + page = drbd_pp_alloc(mdev, 1, 1); data = kmap(page); while (data_size) { @@ -1414,7 +1491,7 @@ static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req, } if (dgs) { - drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv); + drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv); if (memcmp(dig_in, dig_vv, dgs)) { dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n"); return 0; @@ -1435,7 +1512,7 @@ static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int u D_ASSERT(hlist_unhashed(&e->colision)); - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { drbd_set_in_sync(mdev, sector, e->size); ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e); } else { @@ -1454,30 +1531,28 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si struct drbd_epoch_entry *e; e = read_in_block(mdev, ID_SYNCER, sector, data_size); - if (!e) { - put_ldev(mdev); - return FALSE; - } + if (!e) + goto fail; dec_rs_pending(mdev); - e->private_bio->bi_end_io = drbd_endio_write_sec; - e->private_bio->bi_rw = WRITE; - e->w.cb = e_end_resync_block; - inc_unacked(mdev); /* corresponding dec_unacked() in e_end_resync_block() * respective _drbd_clear_done_ee */ + e->w.cb = e_end_resync_block; + spin_lock_irq(&mdev->req_lock); list_add(&e->w.list, &mdev->sync_ee); spin_unlock_irq(&mdev->req_lock); - drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio); - /* accounting done in endio */ + if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0) + return TRUE; - maybe_kick_lo(mdev); - return TRUE; + drbd_free_ee(mdev, e); +fail: + put_ldev(mdev); + return FALSE; } static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h) @@ -1572,7 +1647,7 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel) } if (mdev->net_conf->wire_protocol == DRBD_PROT_C) { - if (likely(drbd_bio_uptodate(e->private_bio))) { + if (likely((e->flags & EE_WAS_ERROR) == 0)) { pcmd = (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn <= C_PAUSED_SYNC_T && e->flags & EE_MAY_SET_IN_SYNC) ? @@ -1718,7 +1793,6 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - e->private_bio->bi_end_io = drbd_endio_write_sec; e->w.cb = e_end_block; spin_lock(&mdev->epoch_lock); @@ -1914,12 +1988,8 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h) drbd_al_begin_io(mdev, e->sector); } - e->private_bio->bi_rw = rw; - drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio); - /* accounting done in endio */ - - maybe_kick_lo(mdev); - return TRUE; + if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0) + return TRUE; out_interrupted: /* yes, the epoch_size now is imbalanced. @@ -1977,9 +2047,6 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) return FALSE; } - e->private_bio->bi_rw = READ; - e->private_bio->bi_end_io = drbd_endio_read_sec; - switch (h->command) { case P_DATA_REQUEST: e->w.cb = w_e_end_data_req; @@ -2073,10 +2140,8 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h) inc_unacked(mdev); - drbd_generic_make_request(mdev, fault_type, e->private_bio); - maybe_kick_lo(mdev); - - return TRUE; + if (drbd_submit_ee(mdev, e, READ, fault_type) == 0) + return TRUE; out_free_e: kfree(di); @@ -3837,7 +3902,7 @@ static void drbd_disconnect(struct drbd_conf *mdev) dev_info(DEV, "net_ee not empty, killed %u entries\n", i); i = atomic_read(&mdev->pp_in_use); if (i) - dev_info(DEV, "pp_in_use = %u, expected 0\n", i); + dev_info(DEV, "pp_in_use = %d, expected 0\n", i); D_ASSERT(list_empty(&mdev->read_ee)); D_ASSERT(list_empty(&mdev->active_ee)); |