Diffstat (limited to 'fs/ceph/addr.c')
-rw-r--r--   fs/ceph/addr.c   673
1 file changed, 420 insertions, 253 deletions
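
Note on the readahead rework in the diff below: ceph_readpages() no longer performs a single synchronous ceph_osdc_readpages() call for the contiguous prefix of the page list. It now loops over start_read(), which submits one asynchronous OSD read per contiguous run of pages, capped by the rsize mount option, and sizes the page vector with calc_pages_for(). The short userspace program that follows only models that sizing arithmetic; the 4 KiB page size and the 512 KiB rsize are assumptions for the example, and calc_pages_for() is reproduced on the assumption that it keeps its usual libceph.h definition, so read it as a sketch rather than kernel code.

/*
 * Standalone model of the readahead sizing in the rewritten
 * ceph_readpages()/start_read() path.  Assumes 4 KiB pages and a
 * hypothetical rsize; calc_pages_for() is reproduced here on the
 * assumption that it keeps its usual libceph.h definition.
 */
#include <stdio.h>
#include <stdint.h>

#define PAGE_CACHE_SHIFT 12
#define PAGE_CACHE_SIZE  (1UL << PAGE_CACHE_SHIFT)

/* pages spanned by a byte range of 'len' bytes starting at offset 'off' */
static int calc_pages_for(uint64_t off, uint64_t len)
{
	return ((off + len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT) -
	       (off >> PAGE_CACHE_SHIFT);
}

int main(void)
{
	uint64_t rsize = 512 * 1024;	/* hypothetical rsize mount option */
	uint64_t run = 200;		/* contiguous pages found in page_list */
	int max = 0;

	/* ceph_readpages(): turn rsize (bytes) into a per-request page cap */
	if (rsize >= PAGE_CACHE_SIZE)
		max = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;

	/* start_read(): stop counting pages once the cap is reached */
	if (max && run > (uint64_t)max)
		run = max;

	uint64_t len = run << PAGE_CACHE_SHIFT;
	printf("cap %d pages/request; this request: %llu bytes, %d pages\n",
	       max, (unsigned long long)len, calc_pages_for(0, len));
	return 0;
}

Under those assumptions a 512 KiB rsize caps each OSD read at 128 pages, which is the page array that finish_read() later walks, zero-fills past the returned byte count, and unlocks.
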
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index e9c874abc9e..90b3954d48e 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -11,6 +11,7 @@  #include "super.h"  #include "mds_client.h" +#include "cache.h"  #include <linux/ceph/osd_client.h>  /* @@ -24,7 +25,7 @@   * context needs to be associated with the osd write during writeback.   *   * Similarly, struct ceph_inode_info maintains a set of counters to - * count dirty pages on the inode.  In the absense of snapshots, + * count dirty pages on the inode.  In the absence of snapshots,   * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.   *   * When a snapshot is taken (that is, when the client receives @@ -54,7 +55,12 @@  	(CONGESTION_ON_THRESH(congestion_kb) -				\  	 (CONGESTION_ON_THRESH(congestion_kb) >> 2)) - +static inline struct ceph_snap_context *page_snap_context(struct page *page) +{ +	if (PagePrivate(page)) +		return (void *)page->private; +	return NULL; +}  /*   * Dirty a page.  Optimistically adjust accounting, on the assumption @@ -65,15 +71,16 @@ static int ceph_set_page_dirty(struct page *page)  	struct address_space *mapping = page->mapping;  	struct inode *inode;  	struct ceph_inode_info *ci; -	int undo = 0;  	struct ceph_snap_context *snapc; +	int ret;  	if (unlikely(!mapping))  		return !TestSetPageDirty(page); -	if (TestSetPageDirty(page)) { +	if (PageDirty(page)) {  		dout("%p set_page_dirty %p idx %lu -- already dirty\n",  		     mapping->host, page, page->index); +		BUG_ON(!PagePrivate(page));  		return 0;  	} @@ -87,12 +94,12 @@ static int ceph_set_page_dirty(struct page *page)  	snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);  	/* dirty the head */ -	spin_lock(&inode->i_lock); +	spin_lock(&ci->i_ceph_lock);  	if (ci->i_head_snapc == NULL)  		ci->i_head_snapc = ceph_get_snap_context(snapc);  	++ci->i_wrbuffer_ref_head;  	if (ci->i_wrbuffer_ref == 0) -		igrab(inode); +		ihold(inode);  	++ci->i_wrbuffer_ref;  	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "  	     "snapc %p seq %lld (%d snaps)\n", @@ -100,37 +107,21 @@ static int ceph_set_page_dirty(struct page *page)  	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,  	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,  	     snapc, snapc->seq, snapc->num_snaps); -	spin_unlock(&inode->i_lock); - -	/* now adjust page */ -	spin_lock_irq(&mapping->tree_lock); -	if (page->mapping) {	/* Race with truncate? */ -		WARN_ON_ONCE(!PageUptodate(page)); -		account_page_dirtied(page, page->mapping); -		radix_tree_tag_set(&mapping->page_tree, -				page_index(page), PAGECACHE_TAG_DIRTY); - -		/* -		 * Reference snap context in page->private.  Also set -		 * PagePrivate so that we get invalidatepage callback. -		 */ -		page->private = (unsigned long)snapc; -		SetPagePrivate(page); -	} else { -		dout("ANON set_page_dirty %p (raced truncate?)\n", page); -		undo = 1; -	} - -	spin_unlock_irq(&mapping->tree_lock); +	spin_unlock(&ci->i_ceph_lock); -	if (undo) -		/* whoops, we failed to dirty the page */ -		ceph_put_wrbuffer_cap_refs(ci, 1, snapc); +	/* +	 * Reference snap context in page->private.  Also set +	 * PagePrivate so that we get invalidatepage callback. 
+	 */ +	BUG_ON(PagePrivate(page)); +	page->private = (unsigned long)snapc; +	SetPagePrivate(page); -	__mark_inode_dirty(mapping->host, I_DIRTY_PAGES); +	ret = __set_page_dirty_nobuffers(page); +	WARN_ON(!PageLocked(page)); +	WARN_ON(!page->mapping); -	BUG_ON(!PageDirty(page)); -	return 1; +	return ret;  }  /* @@ -138,18 +129,26 @@ static int ceph_set_page_dirty(struct page *page)   * dirty page counters appropriately.  Only called if there is private   * data on the page.   */ -static void ceph_invalidatepage(struct page *page, unsigned long offset) +static void ceph_invalidatepage(struct page *page, unsigned int offset, +				unsigned int length)  {  	struct inode *inode;  	struct ceph_inode_info *ci; -	struct ceph_snap_context *snapc = (void *)page->private; - -	BUG_ON(!PageLocked(page)); -	BUG_ON(!page->private); -	BUG_ON(!PagePrivate(page)); -	BUG_ON(!page->mapping); +	struct ceph_snap_context *snapc = page_snap_context(page);  	inode = page->mapping->host; +	ci = ceph_inode(inode); + +	if (offset != 0 || length != PAGE_CACHE_SIZE) { +		dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n", +		     inode, page, page->index, offset, length); +		return; +	} + +	ceph_invalidate_fscache_page(inode, page); + +	if (!PagePrivate(page)) +		return;  	/*  	 * We can get non-dirty pages here due to races between @@ -159,32 +158,28 @@ static void ceph_invalidatepage(struct page *page, unsigned long offset)  	if (!PageDirty(page))  		pr_err("%p invalidatepage %p page not dirty\n", inode, page); -	if (offset == 0) -		ClearPageChecked(page); +	ClearPageChecked(page); -	ci = ceph_inode(inode); -	if (offset == 0) { -		dout("%p invalidatepage %p idx %lu full dirty page %lu\n", -		     inode, page, page->index, offset); -		ceph_put_wrbuffer_cap_refs(ci, 1, snapc); -		ceph_put_snap_context(snapc); -		page->private = 0; -		ClearPagePrivate(page); -	} else { -		dout("%p invalidatepage %p idx %lu partial dirty page\n", -		     inode, page, page->index); -	} +	dout("%p invalidatepage %p idx %lu full dirty page\n", +	     inode, page, page->index); + +	ceph_put_wrbuffer_cap_refs(ci, 1, snapc); +	ceph_put_snap_context(snapc); +	page->private = 0; +	ClearPagePrivate(page);  } -/* just a sanity check */  static int ceph_releasepage(struct page *page, gfp_t g)  {  	struct inode *inode = page->mapping ? page->mapping->host : NULL;  	dout("%p releasepage %p idx %lu\n", inode, page, page->index);  	WARN_ON(PageDirty(page)); -	WARN_ON(page->private); -	WARN_ON(PagePrivate(page)); -	return 0; + +	/* Can we release the page from the cache? 
*/ +	if (!ceph_release_fscache_page(page, g)) +		return 0; + +	return !PagePrivate(page);  }  /* @@ -192,29 +187,39 @@ static int ceph_releasepage(struct page *page, gfp_t g)   */  static int readpage_nounlock(struct file *filp, struct page *page)  { -	struct inode *inode = filp->f_dentry->d_inode; +	struct inode *inode = file_inode(filp);  	struct ceph_inode_info *ci = ceph_inode(inode); -	struct ceph_osd_client *osdc =  +	struct ceph_osd_client *osdc =  		&ceph_inode_to_client(inode)->client->osdc;  	int err = 0;  	u64 len = PAGE_CACHE_SIZE; +	err = ceph_readpage_from_fscache(inode, page); + +	if (err == 0) +		goto out; +  	dout("readpage inode %p file %p page %p index %lu\n",  	     inode, filp, page, page->index);  	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, -				  page->index << PAGE_CACHE_SHIFT, &len, +				  (u64) page_offset(page), &len,  				  ci->i_truncate_seq, ci->i_truncate_size, -				  &page, 1); +				  &page, 1, 0);  	if (err == -ENOENT)  		err = 0;  	if (err < 0) {  		SetPageError(page); +		ceph_fscache_readpage_cancel(inode, page);  		goto out; -	} else if (err < PAGE_CACHE_SIZE) { +	} +	if (err < PAGE_CACHE_SIZE)  		/* zero fill remainder of page */  		zero_user_segment(page, err, PAGE_CACHE_SIZE); -	} +	else +		flush_dcache_page(page); +  	SetPageUptodate(page); +	ceph_readpage_to_fscache(inode, page);  out:  	return err < 0 ? err : 0; @@ -228,102 +233,180 @@ static int ceph_readpage(struct file *filp, struct page *page)  }  /* - * Build a vector of contiguous pages from the provided page list. + * Finish an async read(ahead) op.   */ -static struct page **page_vector_from_list(struct list_head *page_list, -					   unsigned *nr_pages) +static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)  { -	struct page **pages; -	struct page *page; -	int next_index, contig_pages = 0; +	struct inode *inode = req->r_inode; +	struct ceph_osd_data *osd_data; +	int rc = req->r_result; +	int bytes = le32_to_cpu(msg->hdr.data_len); +	int num_pages; +	int i; -	/* build page vector */ -	pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS); -	if (!pages) -		return ERR_PTR(-ENOMEM); +	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); -	BUG_ON(list_empty(page_list)); -	next_index = list_entry(page_list->prev, struct page, lru)->index; -	list_for_each_entry_reverse(page, page_list, lru) { -		if (page->index == next_index) { -			dout("readpages page %d %p\n", contig_pages, page); -			pages[contig_pages] = page; -			contig_pages++; -			next_index++; -		} else { -			break; +	/* unlock all pages, zeroing any data we didn't read */ +	osd_data = osd_req_op_extent_osd_data(req, 0); +	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); +	num_pages = calc_pages_for((u64)osd_data->alignment, +					(u64)osd_data->length); +	for (i = 0; i < num_pages; i++) { +		struct page *page = osd_data->pages[i]; + +		if (rc < 0) +			goto unlock; +		if (bytes < (int)PAGE_CACHE_SIZE) { +			/* zero (remainder of) page */ +			int s = bytes < 0 ? 
0 : bytes; +			zero_user_segment(page, s, PAGE_CACHE_SIZE);  		} + 		dout("finish_read %p uptodate %p idx %lu\n", inode, page, +		     page->index); +		flush_dcache_page(page); +		SetPageUptodate(page); +		ceph_readpage_to_fscache(inode, page); +unlock: +		unlock_page(page); +		page_cache_release(page); +		bytes -= PAGE_CACHE_SIZE;  	} -	*nr_pages = contig_pages; -	return pages; +	kfree(osd_data->pages); +} + +static void ceph_unlock_page_vector(struct page **pages, int num_pages) +{ +	int i; + +	for (i = 0; i < num_pages; i++) +		unlock_page(pages[i]);  }  /* - * Read multiple pages.  Leave pages we don't read + unlock in page_list; - * the caller (VM) cleans them up. + * start an async read(ahead) operation.  return nr_pages we submitted + * a read for on success, or negative error code.   */ -static int ceph_readpages(struct file *file, struct address_space *mapping, -			  struct list_head *page_list, unsigned nr_pages) +static int start_read(struct inode *inode, struct list_head *page_list, int max)  { -	struct inode *inode = file->f_dentry->d_inode; -	struct ceph_inode_info *ci = ceph_inode(inode);  	struct ceph_osd_client *osdc =  		&ceph_inode_to_client(inode)->client->osdc; -	int rc = 0; -	struct page **pages; -	loff_t offset; +	struct ceph_inode_info *ci = ceph_inode(inode); +	struct page *page = list_entry(page_list->prev, struct page, lru); +	struct ceph_vino vino; +	struct ceph_osd_request *req; +	u64 off;  	u64 len; +	int i; +	struct page **pages; +	pgoff_t next_index; +	int nr_pages = 0; +	int ret; -	dout("readpages %p file %p nr_pages %d\n", -	     inode, file, nr_pages); - -	pages = page_vector_from_list(page_list, &nr_pages); -	if (IS_ERR(pages)) -		return PTR_ERR(pages); +	off = (u64) page_offset(page); -	/* guess read extent */ -	offset = pages[0]->index << PAGE_CACHE_SHIFT; +	/* count pages */ +	next_index = page->index; +	list_for_each_entry_reverse(page, page_list, lru) { +		if (page->index != next_index) +			break; +		nr_pages++; +		next_index++; +		if (max && nr_pages == max) +			break; +	}  	len = nr_pages << PAGE_CACHE_SHIFT; -	rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, -				 offset, &len, -				 ci->i_truncate_seq, ci->i_truncate_size, -				 pages, nr_pages); -	if (rc == -ENOENT) -		rc = 0; -	if (rc < 0) -		goto out; - -	for (; !list_empty(page_list) && len > 0; -	     rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) { -		struct page *page = -			list_entry(page_list->prev, struct page, lru); +	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages, +	     off, len); +	vino = ceph_vino(inode); +	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, +				    1, CEPH_OSD_OP_READ, +				    CEPH_OSD_FLAG_READ, NULL, +				    ci->i_truncate_seq, ci->i_truncate_size, +				    false); +	if (IS_ERR(req)) +		return PTR_ERR(req); +	/* build page vector */ +	nr_pages = calc_pages_for(0, len); +	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); +	ret = -ENOMEM; +	if (!pages) +		goto out; +	for (i = 0; i < nr_pages; ++i) { +		page = list_entry(page_list->prev, struct page, lru); +		BUG_ON(PageLocked(page));  		list_del(&page->lru); -		if (rc < (int)PAGE_CACHE_SIZE) { -			/* zero (remainder of) page */ -			int s = rc < 0 ? 
0 : rc; -			zero_user_segment(page, s, PAGE_CACHE_SIZE); -		} - -		if (add_to_page_cache_lru(page, mapping, page->index, + 		dout("start_read %p adding %p idx %lu\n", inode, page, +		     page->index); +		if (add_to_page_cache_lru(page, &inode->i_data, page->index,  					  GFP_NOFS)) { +			ceph_fscache_uncache_page(inode, page);  			page_cache_release(page); -			dout("readpages %p add_to_page_cache failed %p\n", +			dout("start_read %p add_to_page_cache failed %p\n",  			     inode, page); -			continue; +			nr_pages = i; +			goto out_pages;  		} -		dout("readpages %p adding %p idx %lu\n", inode, page, -		     page->index); -		flush_dcache_page(page); -		SetPageUptodate(page); -		unlock_page(page); -		page_cache_release(page); +		pages[i] = page;  	} -	rc = 0; +	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false); +	req->r_callback = finish_read; +	req->r_inode = inode; + +	ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); + +	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); +	ret = ceph_osdc_start_request(osdc, req, false); +	if (ret < 0) +		goto out_pages; +	ceph_osdc_put_request(req); +	return nr_pages; +out_pages: +	ceph_unlock_page_vector(pages, nr_pages); +	ceph_release_page_vector(pages, nr_pages);  out: -	kfree(pages); +	ceph_osdc_put_request(req); +	return ret; +} + + +/* + * Read multiple pages.  Leave pages we don't read + unlock in page_list; + * the caller (VM) cleans them up. + */ +static int ceph_readpages(struct file *file, struct address_space *mapping, +			  struct list_head *page_list, unsigned nr_pages) +{ +	struct inode *inode = file_inode(file); +	struct ceph_fs_client *fsc = ceph_inode_to_client(inode); +	int rc = 0; +	int max = 0; + +	rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list, +					 &nr_pages); + +	if (rc == 0) +		goto out; + +	if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE) +		max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) +			>> PAGE_SHIFT; + +	dout("readpages %p file %p nr_pages %d max %d\n", inode, +		file, nr_pages, +	     max); +	while (!list_empty(page_list)) { +		rc = start_read(inode, page_list, max); +		if (rc < 0) +			goto out; +		BUG_ON(rc == 0); +	} +out: +	ceph_fscache_readpages_cancel(inode, page_list); + +	dout("readpages %p file %p ret %d\n", inode, file, rc);  	return rc;  } @@ -338,7 +421,7 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,  	struct ceph_snap_context *snapc = NULL;  	struct ceph_cap_snap *capsnap = NULL; -	spin_lock(&inode->i_lock); +	spin_lock(&ci->i_ceph_lock);  	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {  		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,  		     capsnap->context, capsnap->dirty_pages); @@ -354,7 +437,7 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,  		dout(" head snapc %p has %d dirty pages\n",  		     snapc, ci->i_wrbuffer_ref_head);  	} -	spin_unlock(&inode->i_lock); +	spin_unlock(&ci->i_ceph_lock);  	return snapc;  } @@ -370,13 +453,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)  	struct ceph_inode_info *ci;  	struct ceph_fs_client *fsc;  	struct ceph_osd_client *osdc; -	loff_t page_off = page->index << PAGE_CACHE_SHIFT; -	int len = PAGE_CACHE_SIZE; -	loff_t i_size; -	int err = 0;  	struct ceph_snap_context *snapc, *oldest; -	u64 snap_size = 0; +	loff_t page_off = page_offset(page);  	long writeback_stat; +	u64 truncate_size, snap_size = 0; +	u32 truncate_seq; +	int err = 0, len = PAGE_CACHE_SIZE;  	
dout("writepage %p idx %lu\n", page, page->index); @@ -390,7 +472,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)  	osdc = &fsc->client->osdc;  	/* verify this is a writeable snap context */ -	snapc = (void *)page->private; +	snapc = page_snap_context(page);  	if (snapc == NULL) {  		dout("writepage %p page %p not dirty?\n", inode, page);  		goto out; @@ -398,7 +480,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)  	oldest = get_oldest_context(inode, &snap_size);  	if (snapc->seq > oldest->seq) {  		dout("writepage %p page %p snapc %p not writeable - noop\n", -		     inode, page, (void *)page->private); +		     inode, page, snapc);  		/* we should only noop if called by kswapd */  		WARN_ON((current->flags & PF_MEMALLOC) == 0);  		ceph_put_snap_context(oldest); @@ -406,13 +488,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)  	}  	ceph_put_snap_context(oldest); +	spin_lock(&ci->i_ceph_lock); +	truncate_seq = ci->i_truncate_seq; +	truncate_size = ci->i_truncate_size; +	if (!snap_size) +		snap_size = i_size_read(inode); +	spin_unlock(&ci->i_ceph_lock); +  	/* is this a partial page at end of file? */ -	if (snap_size) -		i_size = snap_size; -	else -		i_size = i_size_read(inode); -	if (i_size < page_off + len) -		len = i_size - page_off; +	if (page_off >= snap_size) { +		dout("%p page eof %llu\n", page, snap_size); +		goto out; +	} +	if (snap_size < page_off + len) +		len = snap_size - page_off;  	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",  	     inode, page, page->index, page_off, len, snapc); @@ -422,13 +511,14 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)  	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))  		set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC); +	ceph_readpage_to_fscache(inode, page); +  	set_page_writeback(page);  	err = ceph_osdc_writepages(osdc, ceph_vino(inode),  				   &ci->i_layout, snapc,  				   page_off, len, -				   ci->i_truncate_seq, ci->i_truncate_size, -				   &inode->i_mtime, -				   &page, 1, 0, 0, true); +				   truncate_seq, truncate_size, +				   &inode->i_mtime, &page, 1);  	if (err < 0) {  		dout("writepage setting page/mapping error %d %p\n", err, page);  		SetPageError(page); @@ -453,7 +543,7 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)  	int err;  	struct inode *inode = page->mapping->host;  	BUG_ON(!inode); -	igrab(inode); +	ihold(inode);  	err = writepage_nounlock(page, wbc);  	unlock_page(page);  	iput(inode); @@ -478,7 +568,6 @@ static void ceph_release_pages(struct page **pages, int num)  	pagevec_release(&pvec);  } -  /*   * async writeback completion handler.   
* @@ -489,27 +578,24 @@ static void writepages_finish(struct ceph_osd_request *req,  			      struct ceph_msg *msg)  {  	struct inode *inode = req->r_inode; -	struct ceph_osd_reply_head *replyhead; -	struct ceph_osd_op *op;  	struct ceph_inode_info *ci = ceph_inode(inode); +	struct ceph_osd_data *osd_data;  	unsigned wrote;  	struct page *page; +	int num_pages;  	int i;  	struct ceph_snap_context *snapc = req->r_snapc;  	struct address_space *mapping = inode->i_mapping; -	__s32 rc = -EIO; -	u64 bytes = 0; +	int rc = req->r_result; +	u64 bytes = req->r_ops[0].extent.length;  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);  	long writeback_stat;  	unsigned issued = ceph_caps_issued(ci); -	/* parse reply */ -	replyhead = msg->front.iov_base; -	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); -	op = (void *)(replyhead + 1); -	rc = le32_to_cpu(replyhead->result); -	bytes = le64_to_cpu(op->extent.length); - +	osd_data = osd_req_op_extent_osd_data(req, 0); +	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); +	num_pages = calc_pages_for((u64)osd_data->alignment, +					(u64)osd_data->length);  	if (rc >= 0) {  		/*  		 * Assume we wrote the pages we originally sent.  The @@ -517,7 +603,7 @@ static void writepages_finish(struct ceph_osd_request *req,  		 * raced with a truncation and was adjusted at the osd,  		 * so don't believe the reply.  		 */ -		wrote = req->r_num_pages; +		wrote = num_pages;  	} else {  		wrote = 0;  		mapping_set_error(mapping, rc); @@ -526,8 +612,8 @@ static void writepages_finish(struct ceph_osd_request *req,  	     inode, rc, bytes, wrote);  	/* clean all pages */ -	for (i = 0; i < req->r_num_pages; i++) { -		page = req->r_pages[i]; +	for (i = 0; i < num_pages; i++) { +		page = osd_data->pages[i];  		BUG_ON(!page);  		WARN_ON(!PageUptodate(page)); @@ -538,7 +624,7 @@ static void writepages_finish(struct ceph_osd_request *req,  			clear_bdi_congested(&fsc->backing_dev_info,  					    BLK_RW_ASYNC); -		ceph_put_snap_context((void *)page->private); +		ceph_put_snap_context(page_snap_context(page));  		page->private = 0;  		ClearPagePrivate(page);  		dout("unlocking %d %p\n", i, page); @@ -556,35 +642,18 @@ static void writepages_finish(struct ceph_osd_request *req,  		unlock_page(page);  	}  	dout("%p wrote+cleaned %d pages\n", inode, wrote); -	ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc); +	ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc); -	ceph_release_pages(req->r_pages, req->r_num_pages); -	if (req->r_pages_from_pool) -		mempool_free(req->r_pages, +	ceph_release_pages(osd_data->pages, num_pages); +	if (osd_data->pages_from_pool) +		mempool_free(osd_data->pages,  			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);  	else -		kfree(req->r_pages); +		kfree(osd_data->pages);  	ceph_osdc_put_request(req);  }  /* - * allocate a page vec, either directly, or if necessary, via a the - * mempool.  we avoid the mempool if we can because req->r_num_pages - * may be less than the maximum write size. 
- */ -static void alloc_page_vec(struct ceph_fs_client *fsc, -			   struct ceph_osd_request *req) -{ -	req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages, -			       GFP_NOFS); -	if (!req->r_pages) { -		req->r_pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS); -		req->r_pages_from_pool = 1; -		WARN_ON(!req->r_pages); -	} -} - -/*   * initiate async writeback   */  static int ceph_writepages_start(struct address_space *mapping, @@ -592,7 +661,8 @@ static int ceph_writepages_start(struct address_space *mapping,  {  	struct inode *inode = mapping->host;  	struct ceph_inode_info *ci = ceph_inode(inode); -	struct ceph_fs_client *fsc; +	struct ceph_fs_client *fsc = ceph_inode_to_client(inode); +	struct ceph_vino vino = ceph_vino(inode);  	pgoff_t index, start, end;  	int range_whole = 0;  	int should_loop = 1; @@ -604,24 +674,24 @@ static int ceph_writepages_start(struct address_space *mapping,  	unsigned wsize = 1 << inode->i_blkbits;  	struct ceph_osd_request *req = NULL;  	int do_sync; -	u64 snap_size = 0; +	u64 truncate_size, snap_size; +	u32 truncate_seq;  	/*  	 * Include a 'sync' in the OSD request if this is a data  	 * integrity write (e.g., O_SYNC write or fsync()), or if our  	 * cap is being revoked.  	 */ -	do_sync = wbc->sync_mode == WB_SYNC_ALL; -	if (ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER)) +	if ((wbc->sync_mode == WB_SYNC_ALL) || +		ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))  		do_sync = 1;  	dout("writepages_start %p dosync=%d (mode=%s)\n",  	     inode, do_sync,  	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :  	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); -	fsc = ceph_inode_to_client(inode);  	if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) { -		pr_warning("writepage_start %p on forced umount\n", inode); +		pr_warn("writepage_start %p on forced umount\n", inode);  		return -EIO; /* we're in a forced umount, don't write! */  	}  	if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize) @@ -650,6 +720,7 @@ static int ceph_writepages_start(struct address_space *mapping,  retry:  	/* find oldest snap context with dirty data */  	ceph_put_snap_context(snapc); +	snap_size = 0;  	snapc = get_oldest_context(inode, &snap_size);  	if (!snapc) {  		/* hmm, why does writepages get called when there @@ -657,8 +728,18 @@ retry:  		dout(" no snap context with dirty data?\n");  		goto out;  	} +	if (snap_size == 0) +		snap_size = i_size_read(inode);  	dout(" oldest snapc is %p seq %lld (%d snaps)\n",  	     snapc, snapc->seq, snapc->num_snaps); + +	spin_lock(&ci->i_ceph_lock); +	truncate_seq = ci->i_truncate_seq; +	truncate_size = ci->i_truncate_size; +	if (!snap_size) +		snap_size = i_size_read(inode); +	spin_unlock(&ci->i_ceph_lock); +  	if (last_snapc && snapc != last_snapc) {  		/* if we switched to a newer snapc, restart our scan at the  		 * start of the original file range. */ @@ -669,15 +750,16 @@ retry:  	last_snapc = snapc;  	while (!done && index <= end) { +		int num_ops = do_sync ? 
2 : 1;  		unsigned i;  		int first;  		pgoff_t next;  		int pvec_pages, locked_pages; +		struct page **pages = NULL; +		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */  		struct page *page;  		int want;  		u64 offset, len; -		struct ceph_osd_request_head *reqhead; -		struct ceph_osd_op *op;  		long writeback_stat;  		next = 0; @@ -726,11 +808,8 @@ get_more_pages:  				dout("waiting on writeback %p\n", page);  				wait_on_page_writeback(page);  			} -			if ((snap_size && page_offset(page) > snap_size) || -			    (!snap_size && -			     page_offset(page) > i_size_read(inode))) { -				dout("%p page eof %llu\n", page, snap_size ? -				     snap_size : i_size_read(inode)); +			if (page_offset(page) >= snap_size) { +				dout("%p page eof %llu\n", page, snap_size);  				done = 1;  				unlock_page(page);  				break; @@ -742,7 +821,7 @@ get_more_pages:  			}  			/* only if matching snap context */ -			pgsnapc = (void *)page->private; +			pgsnapc = page_snap_context(page);  			if (pgsnapc->seq > snapc->seq) {  				dout("page snapc %p %lld > oldest %p %lld\n",  				     pgsnapc, pgsnapc->seq, snapc, snapc->seq); @@ -758,28 +837,42 @@ get_more_pages:  				break;  			} -			/* ok */ +			/* +			 * We have something to write.  If this is +			 * the first locked page this time through, +			 * allocate an osd request and a page array +			 * that it will use. +			 */  			if (locked_pages == 0) { +				BUG_ON(pages);  				/* prepare async write request */ -				offset = (unsigned long long)page->index -					<< PAGE_CACHE_SHIFT; +				offset = (u64)page_offset(page);  				len = wsize;  				req = ceph_osdc_new_request(&fsc->client->osdc, -					    &ci->i_layout, -					    ceph_vino(inode), -					    offset, &len, -					    CEPH_OSD_OP_WRITE, -					    CEPH_OSD_FLAG_WRITE | -						    CEPH_OSD_FLAG_ONDISK, -					    snapc, do_sync, -					    ci->i_truncate_seq, -					    ci->i_truncate_size, -					    &inode->i_mtime, true, 1); -				max_pages = req->r_num_pages; - -				alloc_page_vec(fsc, req); +							&ci->i_layout, vino, +							offset, &len, num_ops, +							CEPH_OSD_OP_WRITE, +							CEPH_OSD_FLAG_WRITE | +							CEPH_OSD_FLAG_ONDISK, +							snapc, truncate_seq, +							truncate_size, true); +				if (IS_ERR(req)) { +					rc = PTR_ERR(req); +					unlock_page(page); +					break; +				} +  				req->r_callback = writepages_finish;  				req->r_inode = inode; + +				max_pages = calc_pages_for(0, (u64)len); +				pages = kmalloc(max_pages * sizeof (*pages), +						GFP_NOFS); +				if (!pages) { +					pool = fsc->wb_pagevec_pool; +					pages = mempool_alloc(pool, GFP_NOFS); +					BUG_ON(!pages); +				}  			}  			/* note position of first page in pvec */ @@ -797,7 +890,7 @@ get_more_pages:  			}  			set_page_writeback(page); -			req->r_pages[locked_pages] = page; +			pages[locked_pages] = page;  			locked_pages++;  			next = page->index + 1;  		} @@ -826,22 +919,30 @@ get_more_pages:  			pvec.nr -= i-first;  		} -		/* submit the write */ -		offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT; -		len = min((snap_size ? 
snap_size : i_size_read(inode)) - offset, +		/* Format the osd request message and submit the write */ + +		offset = page_offset(pages[0]); +		len = min(snap_size - offset,  			  (u64)locked_pages << PAGE_CACHE_SHIFT);  		dout("writepages got %d pages at %llu~%llu\n",  		     locked_pages, offset, len); -		/* revise final length, page count */ -		req->r_num_pages = locked_pages; -		reqhead = req->r_request->front.iov_base; -		op = (void *)(reqhead + 1); -		op->extent.length = cpu_to_le64(len); -		op->payload_len = cpu_to_le32(len); -		req->r_request->hdr.data_len = cpu_to_le32(len); +		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, +							!!pool, false); + +		pages = NULL;	/* request message now owns the pages array */ +		pool = NULL; + +		/* Update the write op length in case we changed it */ -		ceph_osdc_start_request(&fsc->client->osdc, req, true); +		osd_req_op_extent_update(req, 0, len); + +		vino = ceph_vino(inode); +		ceph_osdc_build_request(req, offset, snapc, vino.snap, +					&inode->i_mtime); + +		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true); +		BUG_ON(rc);  		req = NULL;  		/* continue? */ @@ -873,8 +974,6 @@ release_pvec_pages:  out:  	if (req)  		ceph_osdc_put_request(req); -	if (rc > 0) -		rc = 0;  /* vfs expects us to return 0 */  	ceph_put_snap_context(snapc);  	dout("writepages done, rc = %d\n", rc);  	return rc; @@ -907,7 +1006,7 @@ static int ceph_update_writeable_page(struct file *file,  			    loff_t pos, unsigned len,  			    struct page *page)  { -	struct inode *inode = file->f_dentry->d_inode; +	struct inode *inode = file_inode(file);  	struct ceph_inode_info *ci = ceph_inode(inode);  	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;  	loff_t page_off = pos & PAGE_CACHE_MASK; @@ -925,7 +1024,7 @@ retry_locked:  	BUG_ON(!ci->i_snap_realm);  	down_read(&mdsc->snap_rwsem);  	BUG_ON(!ci->i_snap_realm->cached_context); -	snapc = (void *)page->private; +	snapc = page_snap_context(page);  	if (snapc && snapc != ci->i_head_snapc) {  		/*  		 * this page is already dirty in another (older) snap @@ -1016,7 +1115,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,  			    loff_t pos, unsigned len, unsigned flags,  			    struct page **pagep, void **fsdata)  { -	struct inode *inode = file->f_dentry->d_inode; +	struct inode *inode = file_inode(file);  	struct page *page;  	pgoff_t index = pos >> PAGE_CACHE_SHIFT;  	int r; @@ -1046,7 +1145,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,  			  loff_t pos, unsigned len, unsigned copied,  			  struct page *page, void *fsdata)  { -	struct inode *inode = file->f_dentry->d_inode; +	struct inode *inode = file_inode(file);  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);  	struct ceph_mds_client *mdsc = fsc->mdsc;  	unsigned from = pos & (PAGE_CACHE_SIZE - 1); @@ -1085,8 +1184,8 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,   * never get called.   
*/  static ssize_t ceph_direct_io(int rw, struct kiocb *iocb, -			      const struct iovec *iov, -			      loff_t pos, unsigned long nr_segs) +			      struct iov_iter *iter, +			      loff_t pos)  {  	WARN_ON(1);  	return -EINVAL; @@ -1109,27 +1208,83 @@ const struct address_space_operations ceph_aops = {  /*   * vm ops   */ +static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) +{ +	struct inode *inode = file_inode(vma->vm_file); +	struct ceph_inode_info *ci = ceph_inode(inode); +	struct ceph_file_info *fi = vma->vm_file->private_data; +	loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; +	int want, got, ret; + +	dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n", +	     inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE); +	if (fi->fmode & CEPH_FILE_MODE_LAZY) +		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; +	else +		want = CEPH_CAP_FILE_CACHE; +	while (1) { +		got = 0; +		ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); +		if (ret == 0) +			break; +		if (ret != -ERESTARTSYS) { +			WARN_ON(1); +			return VM_FAULT_SIGBUS; +		} +	} +	dout("filemap_fault %p %llu~%zd got cap refs on %s\n", +	     inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); + +	ret = filemap_fault(vma, vmf); + +	dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", +	     inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); +	ceph_put_cap_refs(ci, got); + +	return ret; +}  /*   * Reuse write_begin here for simplicity.   */  static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)  { -	struct inode *inode = vma->vm_file->f_dentry->d_inode; -	struct page *page = vmf->page; +	struct inode *inode = file_inode(vma->vm_file); +	struct ceph_inode_info *ci = ceph_inode(inode); +	struct ceph_file_info *fi = vma->vm_file->private_data;  	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; -	loff_t off = page->index << PAGE_CACHE_SHIFT; -	loff_t size, len; -	int ret; +	struct page *page = vmf->page; +	loff_t off = page_offset(page); +	loff_t size = i_size_read(inode); +	size_t len; +	int want, got, ret; -	size = i_size_read(inode);  	if (off + PAGE_CACHE_SIZE <= size)  		len = PAGE_CACHE_SIZE;  	else  		len = size & ~PAGE_CACHE_MASK; -	dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode, -	     off, len, page, page->index); +	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n", +	     inode, ceph_vinop(inode), off, len, size); +	if (fi->fmode & CEPH_FILE_MODE_LAZY) +		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; +	else +		want = CEPH_CAP_FILE_BUFFER; +	while (1) { +		got = 0; +		ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); +		if (ret == 0) +			break; +		if (ret != -ERESTARTSYS) { +			WARN_ON(1); +			return VM_FAULT_SIGBUS; +		} +	} +	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", +	     inode, off, len, ceph_cap_string(got)); + +	/* Update time before taking page lock */ +	file_update_time(vma->vm_file);  	lock_page(page); @@ -1151,15 +1306,28 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)  			ret = VM_FAULT_SIGBUS;  	}  out: -	dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret); -	if (ret != VM_FAULT_LOCKED) +	if (ret != VM_FAULT_LOCKED) {  		unlock_page(page); +	} else { +		int dirty; +		spin_lock(&ci->i_ceph_lock); +		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); +		spin_unlock(&ci->i_ceph_lock); +		if (dirty) +			__mark_inode_dirty(inode, dirty); +	} + +	dout("page_mkwrite %p %llu~%zd dropping 
cap refs on %s ret %d\n", +	     inode, off, len, ceph_cap_string(got), ret); +	ceph_put_cap_refs(ci, got); +  	return ret;  }  static struct vm_operations_struct ceph_vmops = { -	.fault		= filemap_fault, +	.fault		= ceph_filemap_fault,  	.page_mkwrite	= ceph_page_mkwrite, +	.remap_pages	= generic_file_remap_pages,  };  int ceph_mmap(struct file *file, struct vm_area_struct *vma) @@ -1170,6 +1338,5 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma)  		return -ENOEXEC;  	file_accessed(file);  	vma->vm_ops = &ceph_vmops; -	vma->vm_flags |= VM_CAN_NONLINEAR;  	return 0;  }  | 
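
Note on the fault-handler hunks above: the new ceph_filemap_fault() and the reworked ceph_page_mkwrite() both bracket the generic fault work with capability references. They loop on ceph_get_caps() until it returns 0, treat -ERESTARTSYS as a signal to retry, treat any other error as a bug (WARN_ON plus VM_FAULT_SIGBUS), and drop the references with ceph_put_cap_refs() once the fault has been serviced. The program below is a self-contained userspace model of that retry shape only; try_get_caps() and do_fault() are stand-ins invented for the example, and ERESTARTSYS and VM_FAULT_SIGBUS are hard-coded with their usual kernel values.

/*
 * Userspace model of the cap-acquisition loop that this diff wraps
 * around filemap_fault()/page_mkwrite().  try_get_caps() and do_fault()
 * are stand-ins, not real Ceph interfaces.
 */
#include <stdio.h>

#define ERESTARTSYS	512	/* kernel-internal errno returned on interruption */
#define VM_FAULT_SIGBUS	0x0002	/* kernel vm_fault return code */

static int attempts;

/* stand-in: pretend we are interrupted twice before the caps are granted */
static int try_get_caps(int *got)
{
	if (attempts++ < 2)
		return -ERESTARTSYS;
	*got = 1;		/* in the kernel, 'got' names the granted caps */
	return 0;
}

/* stand-in for filemap_fault(vma, vmf) or the write_begin work */
static int do_fault(void)
{
	return 0;
}

int main(void)
{
	int got, ret;

	for (;;) {
		got = 0;
		ret = try_get_caps(&got);
		if (ret == 0)
			break;			/* caps held, safe to fault */
		if (ret != -ERESTARTSYS) {
			fprintf(stderr, "unexpected get_caps error %d\n", ret);
			return VM_FAULT_SIGBUS;	/* kernel code WARNs and returns SIGBUS */
		}
		/* -ERESTARTSYS: loop and ask again, as the kernel code does */
	}

	ret = do_fault();
	/* the kernel code drops the references here via ceph_put_cap_refs() */
	printf("fault serviced after %d get_caps attempts, ret %d (got %d)\n",
	       attempts, ret, got);
	return ret;
}

The same shape appears twice in the diff: once wanting CEPH_CAP_FILE_CACHE with a need of CEPH_CAP_FILE_RD for the read fault, and once wanting CEPH_CAP_FILE_BUFFER with a need of CEPH_CAP_FILE_WR for page_mkwrite.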
