Diffstat (limited to 'fs/ceph/inode.c')
-rw-r--r--  fs/ceph/inode.c | 892
1 file changed, 541 insertions(+), 351 deletions(-)
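One recurring change in the diff below is the reference handling around the writeback, invalidate, and truncate work queues: the old code called igrab() only after queue_work() succeeded, whereas the new code takes the reference up front with ihold() and drops it with iput() when queue_work() reports the work was already pending. A minimal sketch of that pattern follows; it is an illustration only, not code from the patch, and the example_* names are hypothetical.

/*
 * Minimal sketch of the ihold()/iput() pattern the patch applies to the
 * ceph writeback/invalidate/truncate work queues (hypothetical names).
 */
#include <linux/fs.h>
#include <linux/workqueue.h>

static void example_queue_inode_work(struct inode *inode,
				     struct workqueue_struct *wq,
				     struct work_struct *work)
{
	ihold(inode);			/* pin the inode for the work item */
	if (!queue_work(wq, work))
		iput(inode);		/* work already pending: drop our ref */
}

Taking the reference before queuing avoids the window in which the work could run and drop a reference the caller had not yet taken.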
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 1d6a45b5a04..04c89c266ce 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -2,7 +2,6 @@  #include <linux/module.h>  #include <linux/fs.h> -#include <linux/smp_lock.h>  #include <linux/slab.h>  #include <linux/string.h>  #include <linux/uaccess.h> @@ -10,10 +9,12 @@  #include <linux/namei.h>  #include <linux/writeback.h>  #include <linux/vmalloc.h> -#include <linux/pagevec.h> +#include <linux/posix_acl.h> +#include <linux/random.h>  #include "super.h"  #include "mds_client.h" +#include "cache.h"  #include <linux/ceph/decode.h>  /* @@ -37,6 +38,13 @@ static void ceph_vmtruncate_work(struct work_struct *work);  /*   * find or create an inode, given the ceph ino number   */ +static int ceph_set_ino_cb(struct inode *inode, void *data) +{ +	ceph_inode(inode)->i_vino = *(struct ceph_vino *)data; +	inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data); +	return 0; +} +  struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)  {  	struct inode *inode; @@ -89,6 +97,8 @@ const struct inode_operations ceph_file_iops = {  	.getxattr = ceph_getxattr,  	.listxattr = ceph_listxattr,  	.removexattr = ceph_removexattr, +	.get_acl = ceph_get_acl, +	.set_acl = ceph_set_acl,  }; @@ -170,9 +180,8 @@ struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)   * specified, copy the frag delegation info to the caller if   * it is present.   */ -u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v, -		     struct ceph_inode_frag *pfrag, -		     int *found) +static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v, +			      struct ceph_inode_frag *pfrag, int *found)  {  	u32 t = ceph_frag_make(0, 0);  	struct ceph_inode_frag *frag; @@ -182,7 +191,6 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,  	if (found)  		*found = 0; -	mutex_lock(&ci->i_fragtree_mutex);  	while (1) {  		WARN_ON(!ceph_frag_contains_value(t, v));  		frag = __ceph_find_frag(ci, t); @@ -211,10 +219,19 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,  	}  	dout("choose_frag(%x) = %x\n", v, t); -	mutex_unlock(&ci->i_fragtree_mutex);  	return t;  } +u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v, +		     struct ceph_inode_frag *pfrag, int *found) +{ +	u32 ret; +	mutex_lock(&ci->i_fragtree_mutex); +	ret = __ceph_choose_frag(ci, v, pfrag, found); +	mutex_unlock(&ci->i_fragtree_mutex); +	return ret; +} +  /*   * Process dirfrag (delegation) info from the mds.  Include leaf   * fragment in tree ONLY if ndist > 0.  Otherwise, only @@ -228,11 +245,17 @@ static int ceph_fill_dirfrag(struct inode *inode,  	u32 id = le32_to_cpu(dirinfo->frag);  	int mds = le32_to_cpu(dirinfo->auth);  	int ndist = le32_to_cpu(dirinfo->ndist); +	int diri_auth = -1;  	int i;  	int err = 0; +	spin_lock(&ci->i_ceph_lock); +	if (ci->i_auth_cap) +		diri_auth = ci->i_auth_cap->mds; +	spin_unlock(&ci->i_ceph_lock); +  	mutex_lock(&ci->i_fragtree_mutex); -	if (ndist == 0) { +	if (ndist == 0 && mds == diri_auth) {  		/* no delegation info needed. 
*/  		frag = __ceph_find_frag(ci, id);  		if (!frag) @@ -277,6 +300,75 @@ out:  	return err;  } +static int ceph_fill_fragtree(struct inode *inode, +			      struct ceph_frag_tree_head *fragtree, +			      struct ceph_mds_reply_dirfrag *dirinfo) +{ +	struct ceph_inode_info *ci = ceph_inode(inode); +	struct ceph_inode_frag *frag; +	struct rb_node *rb_node; +	int i; +	u32 id, nsplits; +	bool update = false; + +	mutex_lock(&ci->i_fragtree_mutex); +	nsplits = le32_to_cpu(fragtree->nsplits); +	if (nsplits) { +		i = prandom_u32() % nsplits; +		id = le32_to_cpu(fragtree->splits[i].frag); +		if (!__ceph_find_frag(ci, id)) +			update = true; +	} else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) { +		rb_node = rb_first(&ci->i_fragtree); +		frag = rb_entry(rb_node, struct ceph_inode_frag, node); +		if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node)) +			update = true; +	} +	if (!update && dirinfo) { +		id = le32_to_cpu(dirinfo->frag); +		if (id != __ceph_choose_frag(ci, id, NULL, NULL)) +			update = true; +	} +	if (!update) +		goto out_unlock; + +	dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode)); +	rb_node = rb_first(&ci->i_fragtree); +	for (i = 0; i < nsplits; i++) { +		id = le32_to_cpu(fragtree->splits[i].frag); +		frag = NULL; +		while (rb_node) { +			frag = rb_entry(rb_node, struct ceph_inode_frag, node); +			if (ceph_frag_compare(frag->frag, id) >= 0) { +				if (frag->frag != id) +					frag = NULL; +				else +					rb_node = rb_next(rb_node); +				break; +			} +			rb_node = rb_next(rb_node); +			rb_erase(&frag->node, &ci->i_fragtree); +			kfree(frag); +			frag = NULL; +		} +		if (!frag) { +			frag = __get_or_create_frag(ci, id); +			if (IS_ERR(frag)) +				continue; +		} +		frag->split_by = le32_to_cpu(fragtree->splits[i].by); +		dout(" frag %x split by %d\n", frag->frag, frag->split_by); +	} +	while (rb_node) { +		frag = rb_entry(rb_node, struct ceph_inode_frag, node); +		rb_node = rb_next(rb_node); +		rb_erase(&frag->node, &ci->i_fragtree); +		kfree(frag); +	} +out_unlock: +	mutex_unlock(&ci->i_fragtree_mutex); +	return 0; +}  /*   * initialize a newly allocated inode. 
@@ -292,12 +384,17 @@ struct inode *ceph_alloc_inode(struct super_block *sb)  	dout("alloc_inode %p\n", &ci->vfs_inode); +	spin_lock_init(&ci->i_ceph_lock); +  	ci->i_version = 0;  	ci->i_time_warp_seq = 0;  	ci->i_ceph_flags = 0; -	ci->i_release_count = 0; +	atomic_set(&ci->i_release_count, 1); +	atomic_set(&ci->i_complete_count, 0);  	ci->i_symlink = NULL; +	memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); +  	ci->i_fragtree = RB_ROOT;  	mutex_init(&ci->i_fragtree_mutex); @@ -324,9 +421,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)  	ci->i_hold_caps_min = 0;  	ci->i_hold_caps_max = 0;  	INIT_LIST_HEAD(&ci->i_cap_delay_list); -	ci->i_cap_exporting_mds = 0; -	ci->i_cap_exporting_mseq = 0; -	ci->i_cap_exporting_issued = 0;  	INIT_LIST_HEAD(&ci->i_cap_snaps);  	ci->i_head_snapc = NULL;  	ci->i_snap_caps = 0; @@ -334,6 +428,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)  	for (i = 0; i < CEPH_FILE_MODE_NUM; i++)  		ci->i_nr_by_mode[i] = 0; +	mutex_init(&ci->i_truncate_mutex);  	ci->i_truncate_seq = 0;  	ci->i_truncate_size = 0;  	ci->i_truncate_pending = 0; @@ -347,6 +442,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)  	ci->i_rd_ref = 0;  	ci->i_rdcache_ref = 0;  	ci->i_wr_ref = 0; +	ci->i_wb_ref = 0;  	ci->i_wrbuffer_ref = 0;  	ci->i_wrbuffer_ref_head = 0;  	ci->i_shared_gen = 0; @@ -366,9 +462,19 @@ struct inode *ceph_alloc_inode(struct super_block *sb)  	INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work); +	ceph_fscache_inode_init(ci); +  	return &ci->vfs_inode;  } +static void ceph_i_callback(struct rcu_head *head) +{ +	struct inode *inode = container_of(head, struct inode, i_rcu); +	struct ceph_inode_info *ci = ceph_inode(inode); + +	kmem_cache_free(ceph_inode_cachep, ci); +} +  void ceph_destroy_inode(struct inode *inode)  {  	struct ceph_inode_info *ci = ceph_inode(inode); @@ -377,11 +483,13 @@ void ceph_destroy_inode(struct inode *inode)  	dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode)); +	ceph_fscache_unregister_inode_cookie(ci); +  	ceph_queue_caps_release(inode);  	/*  	 * we may still have a snap_realm reference if there are stray -	 * caps in i_cap_exporting_issued or i_snap_caps. +	 * caps in i_snap_caps.  	 */  	if (ci->i_snap_realm) {  		struct ceph_mds_client *mdsc = @@ -408,9 +516,18 @@ void ceph_destroy_inode(struct inode *inode)  	if (ci->i_xattrs.prealloc_blob)  		ceph_buffer_put(ci->i_xattrs.prealloc_blob); -	kmem_cache_free(ceph_inode_cachep, ci); +	call_rcu(&inode->i_rcu, ceph_i_callback);  } +int ceph_drop_inode(struct inode *inode) +{ +	/* +	 * Positve dentry and corresponding inode are always accompanied +	 * in MDS reply. So no need to keep inode in the cache after +	 * dropping all its aliases. +	 */ +	return 1; +}  /*   * Helpers to fill in size, ctime, mtime, and atime.  
We have to be @@ -436,16 +553,20 @@ int ceph_fill_file_size(struct inode *inode, int issued,  			dout("truncate_seq %u -> %u\n",  			     ci->i_truncate_seq, truncate_seq);  			ci->i_truncate_seq = truncate_seq; + +			/* the MDS should have revoked these caps */ +			WARN_ON_ONCE(issued & (CEPH_CAP_FILE_EXCL | +					       CEPH_CAP_FILE_RD | +					       CEPH_CAP_FILE_WR | +					       CEPH_CAP_FILE_LAZYIO));  			/*  			 * If we hold relevant caps, or in the case where we're  			 * not the only client referencing this file and we  			 * don't hold those caps, then we need to check whether  			 * the file is either opened or mmaped  			 */ -			if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD| -				       CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER| -				       CEPH_CAP_FILE_EXCL| -				       CEPH_CAP_FILE_LAZYIO)) || +			if ((issued & (CEPH_CAP_FILE_CACHE| +				       CEPH_CAP_FILE_BUFFER)) ||  			    mapping_mapped(inode->i_mapping) ||  			    __ceph_caps_file_wanted(ci)) {  				ci->i_truncate_pending++; @@ -459,6 +580,10 @@ int ceph_fill_file_size(struct inode *inode, int issued,  		     truncate_size);  		ci->i_truncate_size = truncate_size;  	} + +	if (queue_trunc) +		ceph_fscache_invalidate(inode); +  	return queue_trunc;  } @@ -471,7 +596,9 @@ void ceph_fill_file_time(struct inode *inode, int issued,  	if (issued & (CEPH_CAP_FILE_EXCL|  		      CEPH_CAP_FILE_WR| -		      CEPH_CAP_FILE_BUFFER)) { +		      CEPH_CAP_FILE_BUFFER| +		      CEPH_CAP_AUTH_EXCL| +		      CEPH_CAP_XATTR_EXCL)) {  		if (timespec_compare(ctime, &inode->i_ctime) > 0) {  			dout("ctime %ld.%09ld -> %ld.%09ld inc w/ cap\n",  			     inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec, @@ -511,7 +638,7 @@ void ceph_fill_file_time(struct inode *inode, int issued,  			warn = 1;  		}  	} else { -		/* we have no write caps; whatever the MDS says is true */ +		/* we have no write|excl caps; whatever the MDS says is true */  		if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) >= 0) {  			inode->i_ctime = *ctime;  			inode->i_mtime = *mtime; @@ -537,20 +664,26 @@ static int fill_inode(struct inode *inode,  		      unsigned long ttl_from, int cap_fmode,  		      struct ceph_cap_reservation *caps_reservation)  { +	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;  	struct ceph_mds_reply_inode *info = iinfo->in;  	struct ceph_inode_info *ci = ceph_inode(inode); -	int i; -	int issued, implemented; +	int issued = 0, implemented, new_issued;  	struct timespec mtime, atime, ctime; -	u32 nsplits;  	struct ceph_buffer *xattr_blob = NULL; +	struct ceph_cap *new_cap = NULL;  	int err = 0; -	int queue_trunc = 0; +	bool wake = false; +	bool queue_trunc = false; +	bool new_version = false;  	dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",  	     inode, ceph_vinop(inode), le64_to_cpu(info->version),  	     ci->i_version); +	/* prealloc new cap struct */ +	if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP) +		new_cap = ceph_get_cap(mdsc, caps_reservation); +  	/*  	 * prealloc xattr data, if it looks like we'll need it.  only  	 * if len > 4 (meaning there are actually xattrs; the first 4 @@ -563,52 +696,73 @@ static int fill_inode(struct inode *inode,  			       iinfo->xattr_len);  	} -	spin_lock(&inode->i_lock); +	spin_lock(&ci->i_ceph_lock);  	/*  	 * provided version will be odd if inode value is projected, -	 * even if stable.  skip the update if we have a newer info -	 * (e.g., due to inode info racing form multiple MDSs), or if -	 * we are getting projected (unstable) inode info. +	 * even if stable.  
skip the update if we have newer stable +	 * info (ours>=theirs, e.g. due to racing mds replies), unless +	 * we are getting projected (unstable) info (in which case the +	 * version is odd, and we want ours>theirs). +	 *   us   them +	 *   2    2     skip +	 *   3    2     skip +	 *   3    3     update  	 */ -	if (le64_to_cpu(info->version) > 0 && -	    (ci->i_version & ~1) > le64_to_cpu(info->version)) -		goto no_change; +	if (ci->i_version == 0 || +	    ((info->cap.flags & CEPH_CAP_FLAG_AUTH) && +	     le64_to_cpu(info->version) > (ci->i_version & ~1))) +		new_version = true;  	issued = __ceph_caps_issued(ci, &implemented);  	issued |= implemented | __ceph_caps_dirty(ci); +	new_issued = ~issued & le32_to_cpu(info->cap.caps);  	/* update inode */  	ci->i_version = le64_to_cpu(info->version);  	inode->i_version++;  	inode->i_rdev = le32_to_cpu(info->rdev); +	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1; -	if ((issued & CEPH_CAP_AUTH_EXCL) == 0) { +	if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) && +	    (issued & CEPH_CAP_AUTH_EXCL) == 0) {  		inode->i_mode = le32_to_cpu(info->mode); -		inode->i_uid = le32_to_cpu(info->uid); -		inode->i_gid = le32_to_cpu(info->gid); +		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid)); +		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));  		dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode, -		     inode->i_uid, inode->i_gid); +		     from_kuid(&init_user_ns, inode->i_uid), +		     from_kgid(&init_user_ns, inode->i_gid));  	} -	if ((issued & CEPH_CAP_LINK_EXCL) == 0) -		inode->i_nlink = le32_to_cpu(info->nlink); - -	/* be careful with mtime, atime, size */ -	ceph_decode_timespec(&atime, &info->atime); -	ceph_decode_timespec(&mtime, &info->mtime); -	ceph_decode_timespec(&ctime, &info->ctime); -	queue_trunc = ceph_fill_file_size(inode, issued, -					  le32_to_cpu(info->truncate_seq), -					  le64_to_cpu(info->truncate_size), -					  le64_to_cpu(info->size)); -	ceph_fill_file_time(inode, issued, -			    le32_to_cpu(info->time_warp_seq), -			    &ctime, &mtime, &atime); - -	ci->i_max_size = le64_to_cpu(info->max_size); -	ci->i_layout = info->layout; -	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1; +	if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) && +	    (issued & CEPH_CAP_LINK_EXCL) == 0) +		set_nlink(inode, le32_to_cpu(info->nlink)); + +	if (new_version || (new_issued & CEPH_CAP_ANY_RD)) { +		/* be careful with mtime, atime, size */ +		ceph_decode_timespec(&atime, &info->atime); +		ceph_decode_timespec(&mtime, &info->mtime); +		ceph_decode_timespec(&ctime, &info->ctime); +		ceph_fill_file_time(inode, issued, +				le32_to_cpu(info->time_warp_seq), +				&ctime, &mtime, &atime); +	} + +	if (new_version || +	    (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { +		ci->i_layout = info->layout; +		queue_trunc = ceph_fill_file_size(inode, issued, +					le32_to_cpu(info->truncate_seq), +					le64_to_cpu(info->truncate_size), +					le64_to_cpu(info->size)); +		/* only update max_size on auth cap */ +		if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) && +		    ci->i_max_size != le64_to_cpu(info->max_size)) { +			dout("max_size %lld -> %llu\n", ci->i_max_size, +					le64_to_cpu(info->max_size)); +			ci->i_max_size = le64_to_cpu(info->max_size); +		} +	}  	/* xattrs */  	/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. 
*/ @@ -621,6 +775,7 @@ static int fill_inode(struct inode *inode,  			memcpy(ci->i_xattrs.blob->vec.iov_base,  			       iinfo->xattr_data, iinfo->xattr_len);  		ci->i_xattrs.version = le64_to_cpu(info->xattr_version); +		ceph_forget_all_cached_acls(inode);  		xattr_blob = NULL;  	} @@ -643,20 +798,21 @@ static int fill_inode(struct inode *inode,  	case S_IFLNK:  		inode->i_op = &ceph_symlink_iops;  		if (!ci->i_symlink) { -			int symlen = iinfo->symlink_len; +			u32 symlen = iinfo->symlink_len;  			char *sym; -			BUG_ON(symlen != inode->i_size); -			spin_unlock(&inode->i_lock); +			spin_unlock(&ci->i_ceph_lock); + +			err = -EINVAL; +			if (WARN_ON(symlen != inode->i_size)) +				goto out;  			err = -ENOMEM; -			sym = kmalloc(symlen+1, GFP_NOFS); +			sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);  			if (!sym)  				goto out; -			memcpy(sym, iinfo->symlink, symlen); -			sym[symlen] = 0; -			spin_lock(&inode->i_lock); +			spin_lock(&ci->i_ceph_lock);  			if (!ci->i_symlink)  				ci->i_symlink = sym;  			else @@ -667,54 +823,30 @@ static int fill_inode(struct inode *inode,  		inode->i_op = &ceph_dir_iops;  		inode->i_fop = &ceph_dir_fops; +		ci->i_dir_layout = iinfo->dir_layout; +  		ci->i_files = le64_to_cpu(info->files);  		ci->i_subdirs = le64_to_cpu(info->subdirs);  		ci->i_rbytes = le64_to_cpu(info->rbytes);  		ci->i_rfiles = le64_to_cpu(info->rfiles);  		ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);  		ceph_decode_timespec(&ci->i_rctime, &info->rctime); - -		/* set dir completion flag? */ -		if (ci->i_files == 0 && ci->i_subdirs == 0 && -		    ceph_snap(inode) == CEPH_NOSNAP && -		    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && -		    (issued & CEPH_CAP_FILE_EXCL) == 0 && -		    (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) { -			dout(" marking %p complete (empty)\n", inode); -			ci->i_ceph_flags |= CEPH_I_COMPLETE; -			ci->i_max_offset = 2; -		} - -		/* it may be better to set st_size in getattr instead? */ -		if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), RBYTES)) -			inode->i_size = ci->i_rbytes;  		break;  	default:  		pr_err("fill_inode %llx.%llx BAD mode 0%o\n",  		       ceph_vinop(inode), inode->i_mode);  	} -no_change: -	spin_unlock(&inode->i_lock); - -	/* queue truncate if we saw i_size decrease */ -	if (queue_trunc) -		ceph_queue_vmtruncate(inode); - -	/* populate frag tree */ -	/* FIXME: move me up, if/when version reflects fragtree changes */ -	nsplits = le32_to_cpu(info->fragtree.nsplits); -	mutex_lock(&ci->i_fragtree_mutex); -	for (i = 0; i < nsplits; i++) { -		u32 id = le32_to_cpu(info->fragtree.splits[i].frag); -		struct ceph_inode_frag *frag = __get_or_create_frag(ci, id); - -		if (IS_ERR(frag)) -			continue; -		frag->split_by = le32_to_cpu(info->fragtree.splits[i].by); -		dout(" frag %x split by %d\n", frag->frag, frag->split_by); +	/* set dir completion flag? */ +	if (S_ISDIR(inode->i_mode) && +	    ci->i_files == 0 && ci->i_subdirs == 0 && +	    ceph_snap(inode) == CEPH_NOSNAP && +	    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) && +	    (issued & CEPH_CAP_FILE_EXCL) == 0 && +	    !__ceph_dir_is_complete(ci)) { +		dout(" marking %p complete (empty)\n", inode); +		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));  	} -	mutex_unlock(&ci->i_fragtree_mutex);  	/* were we issued a capability? 
*/  	if (info->cap.caps) { @@ -727,30 +859,41 @@ no_change:  				     le32_to_cpu(info->cap.seq),  				     le32_to_cpu(info->cap.mseq),  				     le64_to_cpu(info->cap.realm), -				     info->cap.flags, -				     caps_reservation); +				     info->cap.flags, &new_cap); +			wake = true;  		} else { -			spin_lock(&inode->i_lock);  			dout(" %p got snap_caps %s\n", inode,  			     ceph_cap_string(le32_to_cpu(info->cap.caps)));  			ci->i_snap_caps |= le32_to_cpu(info->cap.caps);  			if (cap_fmode >= 0)  				__ceph_get_fmode(ci, cap_fmode); -			spin_unlock(&inode->i_lock);  		}  	} else if (cap_fmode >= 0) { -		pr_warning("mds issued no caps on %llx.%llx\n", +		pr_warn("mds issued no caps on %llx.%llx\n",  			   ceph_vinop(inode));  		__ceph_get_fmode(ci, cap_fmode);  	} +	spin_unlock(&ci->i_ceph_lock); + +	if (wake) +		wake_up_all(&ci->i_cap_wq); + +	/* queue truncate if we saw i_size decrease */ +	if (queue_trunc) +		ceph_queue_vmtruncate(inode); + +	/* populate frag tree */ +	if (S_ISDIR(inode->i_mode)) +		ceph_fill_fragtree(inode, &info->fragtree, dirinfo);  	/* update delegation info? */  	if (dirinfo)  		ceph_fill_dirfrag(inode, dirinfo);  	err = 0; -  out: +	if (new_cap) +		ceph_put_cap(mdsc, new_cap);  	if (xattr_blob)  		ceph_buffer_put(xattr_blob);  	return err; @@ -775,14 +918,14 @@ static void update_dentry_lease(struct dentry *dentry,  		return;  	spin_lock(&dentry->d_lock); -	dout("update_dentry_lease %p mask %d duration %lu ms ttl %lu\n", -	     dentry, le16_to_cpu(lease->mask), duration, ttl); +	dout("update_dentry_lease %p duration %lu ms ttl %lu\n", +	     dentry, duration, ttl);  	/* make lease_rdcache_gen match directory */  	dir = dentry->d_parent->d_inode;  	di->lease_shared_gen = ceph_inode(dir)->i_shared_gen; -	if (lease->mask == 0) +	if (duration == 0)  		goto out_unlock;  	if (di->lease_gen == session->s_cap_gen && @@ -807,37 +950,6 @@ out_unlock:  }  /* - * Set dentry's directory position based on the current dir's max, and - * order it in d_subdirs, so that dcache_readdir behaves. - */ -static void ceph_set_dentry_offset(struct dentry *dn) -{ -	struct dentry *dir = dn->d_parent; -	struct inode *inode = dn->d_parent->d_inode; -	struct ceph_dentry_info *di; - -	BUG_ON(!inode); - -	di = ceph_dentry(dn); - -	spin_lock(&inode->i_lock); -	if ((ceph_inode(inode)->i_ceph_flags & CEPH_I_COMPLETE) == 0) { -		spin_unlock(&inode->i_lock); -		return; -	} -	di->offset = ceph_inode(inode)->i_max_offset++; -	spin_unlock(&inode->i_lock); - -	spin_lock(&dcache_lock); -	spin_lock(&dn->d_lock); -	list_move(&dn->d_u.d_child, &dir->d_subdirs); -	dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset, -	     dn->d_u.d_child.prev, dn->d_u.d_child.next); -	spin_unlock(&dn->d_lock); -	spin_unlock(&dcache_lock); -} - -/*   * splice a dentry to an inode.   * caller must hold directory i_mutex for this to be safe.   * @@ -846,7 +958,7 @@ static void ceph_set_dentry_offset(struct dentry *dn)   * the caller) if we fail.   
*/  static struct dentry *splice_dentry(struct dentry *dn, struct inode *in, -				    bool *prehash, bool set_offset) +				    bool *prehash)  {  	struct dentry *realdn; @@ -866,8 +978,8 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,  	} else if (realdn) {  		dout("dn %p (%d) spliced with %p (%d) "  		     "inode %p ino %llx.%llx\n", -		     dn, atomic_read(&dn->d_count), -		     realdn, atomic_read(&realdn->d_count), +		     dn, d_count(dn), +		     realdn, d_count(realdn),  		     realdn->d_inode, ceph_vinop(realdn->d_inode));  		dput(dn);  		dn = realdn; @@ -878,8 +990,6 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,  	}  	if ((!prehash || *prehash) && d_unhashed(dn))  		d_rehash(dn); -	if (set_offset) -		ceph_set_dentry_offset(dn);  out:  	return dn;  } @@ -900,10 +1010,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,  {  	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;  	struct inode *in = NULL; -	struct ceph_mds_reply_inode *ininfo;  	struct ceph_vino vino;  	struct ceph_fs_client *fsc = ceph_sb_to_client(sb); -	int i = 0;  	int err = 0;  	dout("fill_trace %p is_dentry %d is_target %d\n", req, @@ -953,11 +1061,87 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,  	if (rinfo->head->is_dentry) {  		struct inode *dir = req->r_locked_dir; -		err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag, -				 session, req->r_request_started, -1, -				 &req->r_caps_reservation); -		if (err < 0) -			return err; +		if (dir) { +			err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag, +					 session, req->r_request_started, -1, +					 &req->r_caps_reservation); +			if (err < 0) +				goto done; +		} else { +			WARN_ON_ONCE(1); +		} + +		if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) { +			struct qstr dname; +			struct dentry *dn, *parent; + +			BUG_ON(!rinfo->head->is_target); +			BUG_ON(req->r_dentry); + +			parent = d_find_any_alias(dir); +			BUG_ON(!parent); + +			dname.name = rinfo->dname; +			dname.len = rinfo->dname_len; +			dname.hash = full_name_hash(dname.name, dname.len); +			vino.ino = le64_to_cpu(rinfo->targeti.in->ino); +			vino.snap = le64_to_cpu(rinfo->targeti.in->snapid); +retry_lookup: +			dn = d_lookup(parent, &dname); +			dout("d_lookup on parent=%p name=%.*s got %p\n", +			     parent, dname.len, dname.name, dn); + +			if (!dn) { +				dn = d_alloc(parent, &dname); +				dout("d_alloc %p '%.*s' = %p\n", parent, +				     dname.len, dname.name, dn); +				if (dn == NULL) { +					dput(parent); +					err = -ENOMEM; +					goto done; +				} +				err = ceph_init_dentry(dn); +				if (err < 0) { +					dput(dn); +					dput(parent); +					goto done; +				} +			} else if (dn->d_inode && +				   (ceph_ino(dn->d_inode) != vino.ino || +				    ceph_snap(dn->d_inode) != vino.snap)) { +				dout(" dn %p points to wrong inode %p\n", +				     dn, dn->d_inode); +				d_delete(dn); +				dput(dn); +				goto retry_lookup; +			} + +			req->r_dentry = dn; +			dput(parent); +		} +	} + +	if (rinfo->head->is_target) { +		vino.ino = le64_to_cpu(rinfo->targeti.in->ino); +		vino.snap = le64_to_cpu(rinfo->targeti.in->snapid); + +		in = ceph_get_inode(sb, vino); +		if (IS_ERR(in)) { +			err = PTR_ERR(in); +			goto done; +		} +		req->r_target_inode = in; + +		err = fill_inode(in, &rinfo->targeti, NULL, +				session, req->r_request_started, +				(!req->r_aborted && rinfo->head->result == 0) ? 
+				req->r_fmode : -1, +				&req->r_caps_reservation); +		if (err < 0) { +			pr_err("fill_inode badness %p %llx.%llx\n", +				in, ceph_vinop(in)); +			goto done; +		}  	}  	/* @@ -965,6 +1149,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,  	 * will have trouble splicing in the virtual snapdir later  	 */  	if (rinfo->head->is_dentry && !req->r_aborted && +	    req->r_locked_dir &&  	    (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,  					       fsc->mount_options->snapdir_name,  					       req->r_dentry->d_name.len))) { @@ -992,14 +1177,15 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,  		/* do we have a dn lease? */  		have_lease = have_dir_cap || -			(le16_to_cpu(rinfo->dlease->mask) & -			 CEPH_LOCK_DN); - +			le32_to_cpu(rinfo->dlease->duration_ms);  		if (!have_lease)  			dout("fill_trace  no dentry lease or dir cap\n");  		/* rename? */  		if (req->r_old_dentry && req->r_op == CEPH_MDS_OP_RENAME) { +			struct inode *olddir = req->r_old_dentry_dir; +			BUG_ON(!olddir); +  			dout(" src %p '%.*s' dst %p '%.*s'\n",  			     req->r_old_dentry,  			     req->r_old_dentry->d_name.len, @@ -1008,9 +1194,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,  			dout("fill_trace doing d_move %p -> %p\n",  			     req->r_old_dentry, dn); -			/* d_move screws up d_subdirs order */ -			ceph_i_clear(dir, CEPH_I_COMPLETE); -  			d_move(req->r_old_dentry, dn);  			dout(" src %p '%.*s' dst %p '%.*s'\n",  			     req->r_old_dentry, @@ -1022,15 +1205,14 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,  			   rehashing bug in vfs_rename_dir */  			ceph_invalidate_dentry_lease(dn); -			/* take overwritten dentry's readdir offset */ -			dout("dn %p gets %p offset %lld (old offset %lld)\n", -			     req->r_old_dentry, dn, ceph_dentry(dn)->offset, +			/* d_move screws up sibling dentries' offsets */ +			ceph_dir_clear_complete(dir); +			ceph_dir_clear_complete(olddir); + +			dout("dn %p gets new offset %lld\n", req->r_old_dentry,  			     ceph_dentry(req->r_old_dentry)->offset); -			ceph_dentry(req->r_old_dentry)->offset = -				ceph_dentry(dn)->offset;  			dn = req->r_old_dentry;  /* use old_dentry */ -			in = dn->d_inode;  		}  		/* null dentry? 
*/ @@ -1052,106 +1234,87 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,  		}  		/* attach proper inode */ -		ininfo = rinfo->targeti.in; -		vino.ino = le64_to_cpu(ininfo->ino); -		vino.snap = le64_to_cpu(ininfo->snapid);  		if (!dn->d_inode) { -			in = ceph_get_inode(sb, vino); -			if (IS_ERR(in)) { -				pr_err("fill_trace bad get_inode " -				       "%llx.%llx\n", vino.ino, vino.snap); -				err = PTR_ERR(in); -				d_delete(dn); -				goto done; -			} -			dn = splice_dentry(dn, in, &have_lease, true); +			ceph_dir_clear_complete(dir); +			ihold(in); +			dn = splice_dentry(dn, in, &have_lease);  			if (IS_ERR(dn)) {  				err = PTR_ERR(dn);  				goto done;  			}  			req->r_dentry = dn;  /* may have spliced */ -			igrab(in); -		} else if (ceph_ino(in) == vino.ino && -			   ceph_snap(in) == vino.snap) { -			igrab(in); -		} else { +		} else if (dn->d_inode && dn->d_inode != in) {  			dout(" %p links to %p %llx.%llx, not %llx.%llx\n", -			     dn, in, ceph_ino(in), ceph_snap(in), -			     vino.ino, vino.snap); +			     dn, dn->d_inode, ceph_vinop(dn->d_inode), +			     ceph_vinop(in));  			have_lease = false; -			in = NULL;  		}  		if (have_lease)  			update_dentry_lease(dn, rinfo->dlease, session,  					    req->r_request_started);  		dout(" final dn %p\n", dn); -		i++; -	} else if (req->r_op == CEPH_MDS_OP_LOOKUPSNAP || -		   req->r_op == CEPH_MDS_OP_MKSNAP) { +	} else if (!req->r_aborted && +		   (req->r_op == CEPH_MDS_OP_LOOKUPSNAP || +		    req->r_op == CEPH_MDS_OP_MKSNAP)) {  		struct dentry *dn = req->r_dentry; +		struct inode *dir = req->r_locked_dir;  		/* fill out a snapdir LOOKUPSNAP dentry */  		BUG_ON(!dn); -		BUG_ON(!req->r_locked_dir); -		BUG_ON(ceph_snap(req->r_locked_dir) != CEPH_SNAPDIR); -		ininfo = rinfo->targeti.in; -		vino.ino = le64_to_cpu(ininfo->ino); -		vino.snap = le64_to_cpu(ininfo->snapid); -		in = ceph_get_inode(sb, vino); -		if (IS_ERR(in)) { -			pr_err("fill_inode get_inode badness %llx.%llx\n", -			       vino.ino, vino.snap); -			err = PTR_ERR(in); -			d_delete(dn); -			goto done; -		} +		BUG_ON(!dir); +		BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);  		dout(" linking snapped dir %p to dn %p\n", in, dn); -		dn = splice_dentry(dn, in, NULL, true); +		ceph_dir_clear_complete(dir); +		ihold(in); +		dn = splice_dentry(dn, in, NULL);  		if (IS_ERR(dn)) {  			err = PTR_ERR(dn);  			goto done;  		}  		req->r_dentry = dn;  /* may have spliced */ -		igrab(in); -		rinfo->head->is_dentry = 1;  /* fool notrace handlers */  	} +done: +	dout("fill_trace done err=%d\n", err); +	return err; +} -	if (rinfo->head->is_target) { -		vino.ino = le64_to_cpu(rinfo->targeti.in->ino); -		vino.snap = le64_to_cpu(rinfo->targeti.in->snapid); +/* + * Prepopulate our cache with readdir results, leases, etc. + */ +static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, +					   struct ceph_mds_session *session) +{ +	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; +	int i, err = 0; -		if (in == NULL || ceph_ino(in) != vino.ino || -		    ceph_snap(in) != vino.snap) { -			in = ceph_get_inode(sb, vino); -			if (IS_ERR(in)) { -				err = PTR_ERR(in); -				goto done; -			} -		} -		req->r_target_inode = in; +	for (i = 0; i < rinfo->dir_nr; i++) { +		struct ceph_vino vino; +		struct inode *in; +		int rc; -		err = fill_inode(in, -				 &rinfo->targeti, NULL, -				 session, req->r_request_started, -				 (le32_to_cpu(rinfo->head->result) == 0) ? 
-				 req->r_fmode : -1, -				 &req->r_caps_reservation); -		if (err < 0) { -			pr_err("fill_inode badness %p %llx.%llx\n", -			       in, ceph_vinop(in)); -			goto done; +		vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino); +		vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid); + +		in = ceph_get_inode(req->r_dentry->d_sb, vino); +		if (IS_ERR(in)) { +			err = PTR_ERR(in); +			dout("new_inode badness got %d\n", err); +			continue; +		} +		rc = fill_inode(in, &rinfo->dir_in[i], NULL, session, +				req->r_request_started, -1, +				&req->r_caps_reservation); +		if (rc < 0) { +			pr_err("fill_inode badness on %p got %d\n", in, rc); +			err = rc; +			continue;  		}  	} -done: -	dout("fill_trace done err=%d\n", err);  	return err;  } -/* - * Prepopulate our cache with readdir results, leases, etc. - */  int ceph_readdir_prepopulate(struct ceph_mds_request *req,  			     struct ceph_mds_session *session)  { @@ -1160,11 +1323,26 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,  	struct qstr dname;  	struct dentry *dn;  	struct inode *in; -	int err = 0, i; +	int err = 0, ret, i;  	struct inode *snapdir = NULL;  	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; -	u64 frag = le32_to_cpu(rhead->args.readdir.frag);  	struct ceph_dentry_info *di; +	u64 r_readdir_offset = req->r_readdir_offset; +	u32 frag = le32_to_cpu(rhead->args.readdir.frag); + +	if (rinfo->dir_dir && +	    le32_to_cpu(rinfo->dir_dir->frag) != frag) { +		dout("readdir_prepopulate got new frag %x -> %x\n", +		     frag, le32_to_cpu(rinfo->dir_dir->frag)); +		frag = le32_to_cpu(rinfo->dir_dir->frag); +		if (ceph_frag_is_leftmost(frag)) +			r_readdir_offset = 2; +		else +			r_readdir_offset = 0; +	} + +	if (req->r_aborted) +		return readdir_prepopulate_inodes_only(req, session);  	if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {  		snapdir = ceph_get_snapdir(parent->d_inode); @@ -1178,6 +1356,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,  			ceph_fill_dirfrag(parent->d_inode, rinfo->dir_dir);  	} +	/* FIXME: release caps/leases if error occurs */  	for (i = 0; i < rinfo->dir_nr; i++) {  		struct ceph_vino vino; @@ -1202,9 +1381,10 @@ retry_lookup:  				err = -ENOMEM;  				goto out;  			} -			err = ceph_init_dentry(dn); -			if (err < 0) { +			ret = ceph_init_dentry(dn); +			if (ret < 0) {  				dput(dn); +				err = ret;  				goto out;  			}  		} else if (dn->d_inode && @@ -1217,16 +1397,13 @@ retry_lookup:  			goto retry_lookup;  		} else {  			/* reorder parent's d_subdirs */ -			spin_lock(&dcache_lock); -			spin_lock(&dn->d_lock); +			spin_lock(&parent->d_lock); +			spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED);  			list_move(&dn->d_u.d_child, &parent->d_subdirs);  			spin_unlock(&dn->d_lock); -			spin_unlock(&dcache_lock); +			spin_unlock(&parent->d_lock);  		} -		di = dn->d_fsdata; -		di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset); -  		/* inode */  		if (dn->d_inode) {  			in = dn->d_inode; @@ -1234,31 +1411,44 @@ retry_lookup:  			in = ceph_get_inode(parent->d_sb, vino);  			if (IS_ERR(in)) {  				dout("new_inode badness\n"); -				d_delete(dn); +				d_drop(dn);  				dput(dn);  				err = PTR_ERR(in);  				goto out;  			} -			dn = splice_dentry(dn, in, NULL, false); -			if (IS_ERR(dn)) -				dn = NULL;  		}  		if (fill_inode(in, &rinfo->dir_in[i], NULL, session,  			       req->r_request_started, -1,  			       &req->r_caps_reservation) < 0) {  			pr_err("fill_inode badness on %p\n", in); +			if (!dn->d_inode) +				iput(in); +			d_drop(dn);  			goto 
next_item;  		} -		if (dn) -			update_dentry_lease(dn, rinfo->dir_dlease[i], -					    req->r_session, -					    req->r_request_started); + +		if (!dn->d_inode) { +			dn = splice_dentry(dn, in, NULL); +			if (IS_ERR(dn)) { +				err = PTR_ERR(dn); +				dn = NULL; +				goto next_item; +			} +		} + +		di = dn->d_fsdata; +		di->offset = ceph_make_fpos(frag, i + r_readdir_offset); + +		update_dentry_lease(dn, rinfo->dir_dlease[i], +				    req->r_session, +				    req->r_request_started);  next_item:  		if (dn)  			dput(dn);  	} -	req->r_did_prepopulate = true; +	if (err == 0) +		req->r_did_prepopulate = true;  out:  	if (snapdir) { @@ -1274,7 +1464,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)  	struct ceph_inode_info *ci = ceph_inode(inode);  	int ret = 0; -	spin_lock(&inode->i_lock); +	spin_lock(&ci->i_ceph_lock);  	dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);  	inode->i_size = size;  	inode->i_blocks = (size + (1 << 9) - 1) >> 9; @@ -1284,7 +1474,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)  	    (ci->i_reported_size << 1) < ci->i_max_size)  		ret = 1; -	spin_unlock(&inode->i_lock); +	spin_unlock(&ci->i_ceph_lock);  	return ret;  } @@ -1294,12 +1484,13 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)   */  void ceph_queue_writeback(struct inode *inode)  { +	ihold(inode);  	if (queue_work(ceph_inode_to_client(inode)->wb_wq,  		       &ceph_inode(inode)->i_wb_work)) {  		dout("ceph_queue_writeback %p\n", inode); -		igrab(inode);  	} else {  		dout("ceph_queue_writeback %p failed\n", inode); +		iput(inode);  	}  } @@ -1319,55 +1510,13 @@ static void ceph_writeback_work(struct work_struct *work)   */  void ceph_queue_invalidate(struct inode *inode)  { +	ihold(inode);  	if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,  		       &ceph_inode(inode)->i_pg_inv_work)) {  		dout("ceph_queue_invalidate %p\n", inode); -		igrab(inode);  	} else {  		dout("ceph_queue_invalidate %p failed\n", inode); -	} -} - -/* - * invalidate any pages that are not dirty or under writeback.  this - * includes pages that are clean and mapped. - */ -static void ceph_invalidate_nondirty_pages(struct address_space *mapping) -{ -	struct pagevec pvec; -	pgoff_t next = 0; -	int i; - -	pagevec_init(&pvec, 0); -	while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) { -		for (i = 0; i < pagevec_count(&pvec); i++) { -			struct page *page = pvec.pages[i]; -			pgoff_t index; -			int skip_page = -				(PageDirty(page) || PageWriteback(page)); - -			if (!skip_page) -				skip_page = !trylock_page(page); - -			/* -			 * We really shouldn't be looking at the ->index of an -			 * unlocked page.  But we're not allowed to lock these -			 * pages.  So we rely upon nobody altering the ->index -			 * of this (pinned-by-us) page. -			 */ -			index = page->index; -			if (index > next) -				next = index; -			next++; - -			if (skip_page) -				continue; - -			generic_error_remove_page(mapping, page); -			unlock_page(page); -		} -		pagevec_release(&pvec); -		cond_resched(); +		iput(inode);  	}  } @@ -1383,44 +1532,47 @@ static void ceph_invalidate_work(struct work_struct *work)  	u32 orig_gen;  	int check = 0; -	spin_lock(&inode->i_lock); +	mutex_lock(&ci->i_truncate_mutex); +	spin_lock(&ci->i_ceph_lock);  	dout("invalidate_pages %p gen %d revoking %d\n", inode,  	     ci->i_rdcache_gen, ci->i_rdcache_revoking); -	if (ci->i_rdcache_gen == 0 || -	    ci->i_rdcache_revoking != ci->i_rdcache_gen) { -		BUG_ON(ci->i_rdcache_revoking > ci->i_rdcache_gen); -		/* nevermind! 
*/ -		ci->i_rdcache_revoking = 0; -		spin_unlock(&inode->i_lock); +	if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { +		if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE)) +			check = 1; +		spin_unlock(&ci->i_ceph_lock); +		mutex_unlock(&ci->i_truncate_mutex);  		goto out;  	}  	orig_gen = ci->i_rdcache_gen; -	spin_unlock(&inode->i_lock); +	spin_unlock(&ci->i_ceph_lock); -	ceph_invalidate_nondirty_pages(inode->i_mapping); +	truncate_pagecache(inode, 0); -	spin_lock(&inode->i_lock); -	if (orig_gen == ci->i_rdcache_gen) { +	spin_lock(&ci->i_ceph_lock); +	if (orig_gen == ci->i_rdcache_gen && +	    orig_gen == ci->i_rdcache_revoking) {  		dout("invalidate_pages %p gen %d successful\n", inode,  		     ci->i_rdcache_gen); -		ci->i_rdcache_gen = 0; -		ci->i_rdcache_revoking = 0; +		ci->i_rdcache_revoking--;  		check = 1;  	} else { -		dout("invalidate_pages %p gen %d raced, gen now %d\n", -		     inode, orig_gen, ci->i_rdcache_gen); +		dout("invalidate_pages %p gen %d raced, now %d revoking %d\n", +		     inode, orig_gen, ci->i_rdcache_gen, +		     ci->i_rdcache_revoking); +		if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE)) +			check = 1;  	} -	spin_unlock(&inode->i_lock); - +	spin_unlock(&ci->i_ceph_lock); +	mutex_unlock(&ci->i_truncate_mutex); +out:  	if (check)  		ceph_check_caps(ci, 0, NULL); -out:  	iput(inode);  }  /* - * called by trunc_wq; take i_mutex ourselves + * called by trunc_wq;   *   * We also truncate in a separate thread as well.   */ @@ -1431,9 +1583,7 @@ static void ceph_vmtruncate_work(struct work_struct *work)  	struct inode *inode = &ci->vfs_inode;  	dout("vmtruncate_work %p\n", inode); -	mutex_lock(&inode->i_mutex);  	__ceph_do_pending_vmtruncate(inode); -	mutex_unlock(&inode->i_mutex);  	iput(inode);  } @@ -1445,19 +1595,19 @@ void ceph_queue_vmtruncate(struct inode *inode)  {  	struct ceph_inode_info *ci = ceph_inode(inode); +	ihold(inode); +  	if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,  		       &ci->i_vmtruncate_work)) {  		dout("ceph_queue_vmtruncate %p\n", inode); -		igrab(inode);  	} else {  		dout("ceph_queue_vmtruncate %p failed, pending=%d\n",  		     inode, ci->i_truncate_pending); +		iput(inode);  	}  }  /* - * called with i_mutex held. - *   * Make sure any pending truncation is applied before doing anything   * that may depend on it.   
*/ @@ -1465,13 +1615,15 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)  {  	struct ceph_inode_info *ci = ceph_inode(inode);  	u64 to; -	int wrbuffer_refs, wake = 0; +	int wrbuffer_refs, finish = 0; +	mutex_lock(&ci->i_truncate_mutex);  retry: -	spin_lock(&inode->i_lock); +	spin_lock(&ci->i_ceph_lock);  	if (ci->i_truncate_pending == 0) {  		dout("__do_pending_vmtruncate %p none pending\n", inode); -		spin_unlock(&inode->i_lock); +		spin_unlock(&ci->i_ceph_lock); +		mutex_unlock(&ci->i_truncate_mutex);  		return;  	} @@ -1482,32 +1634,39 @@ retry:  	if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) {  		dout("__do_pending_vmtruncate %p flushing snaps first\n",  		     inode); -		spin_unlock(&inode->i_lock); +		spin_unlock(&ci->i_ceph_lock);  		filemap_write_and_wait_range(&inode->i_data, 0,  					     inode->i_sb->s_maxbytes);  		goto retry;  	} +	/* there should be no reader or writer */ +	WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref); +  	to = ci->i_truncate_size;  	wrbuffer_refs = ci->i_wrbuffer_ref;  	dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode,  	     ci->i_truncate_pending, to); -	spin_unlock(&inode->i_lock); +	spin_unlock(&ci->i_ceph_lock); + +	truncate_pagecache(inode, to); -	truncate_inode_pages(inode->i_mapping, to); +	spin_lock(&ci->i_ceph_lock); +	if (to == ci->i_truncate_size) { +		ci->i_truncate_pending = 0; +		finish = 1; +	} +	spin_unlock(&ci->i_ceph_lock); +	if (!finish) +		goto retry; -	spin_lock(&inode->i_lock); -	ci->i_truncate_pending--; -	if (ci->i_truncate_pending == 0) -		wake = 1; -	spin_unlock(&inode->i_lock); +	mutex_unlock(&ci->i_truncate_mutex);  	if (wrbuffer_refs == 0)  		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); -	if (wake) -		wake_up_all(&ci->i_cap_wq); -} +	wake_up_all(&ci->i_cap_wq); +}  /*   * symlinks @@ -1522,6 +1681,12 @@ static void *ceph_sym_follow_link(struct dentry *dentry, struct nameidata *nd)  static const struct inode_operations ceph_symlink_iops = {  	.readlink = generic_readlink,  	.follow_link = ceph_sym_follow_link, +	.setattr = ceph_setattr, +	.getattr = ceph_getattr, +	.setxattr = ceph_setxattr, +	.getxattr = ceph_getxattr, +	.listxattr = ceph_listxattr, +	.removexattr = ceph_removexattr,  };  /* @@ -1531,7 +1696,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)  {  	struct inode *inode = dentry->d_inode;  	struct ceph_inode_info *ci = ceph_inode(inode); -	struct inode *parent_inode = dentry->d_parent->d_inode;  	const unsigned int ia_valid = attr->ia_valid;  	struct ceph_mds_request *req;  	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; @@ -1539,12 +1703,11 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)  	int release = 0, dirtied = 0;  	int mask = 0;  	int err = 0; +	int inode_dirty_flags = 0;  	if (ceph_snap(inode) != CEPH_NOSNAP)  		return -EROFS; -	__ceph_do_pending_vmtruncate(inode); -  	err = inode_change_ok(inode, attr);  	if (err != 0)  		return err; @@ -1554,32 +1717,36 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)  	if (IS_ERR(req))  		return PTR_ERR(req); -	spin_lock(&inode->i_lock); +	spin_lock(&ci->i_ceph_lock);  	issued = __ceph_caps_issued(ci, NULL);  	dout("setattr %p issued %s\n", inode, ceph_cap_string(issued));  	if (ia_valid & ATTR_UID) {  		dout("setattr %p uid %d -> %d\n", inode, -		     inode->i_uid, attr->ia_uid); +		     from_kuid(&init_user_ns, inode->i_uid), +		     from_kuid(&init_user_ns, attr->ia_uid));  		if (issued & CEPH_CAP_AUTH_EXCL) {  			inode->i_uid = attr->ia_uid;  			dirtied |= CEPH_CAP_AUTH_EXCL;  		} 
else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 || -			   attr->ia_uid != inode->i_uid) { -			req->r_args.setattr.uid = cpu_to_le32(attr->ia_uid); +			   !uid_eq(attr->ia_uid, inode->i_uid)) { +			req->r_args.setattr.uid = cpu_to_le32( +				from_kuid(&init_user_ns, attr->ia_uid));  			mask |= CEPH_SETATTR_UID;  			release |= CEPH_CAP_AUTH_SHARED;  		}  	}  	if (ia_valid & ATTR_GID) {  		dout("setattr %p gid %d -> %d\n", inode, -		     inode->i_gid, attr->ia_gid); +		     from_kgid(&init_user_ns, inode->i_gid), +		     from_kgid(&init_user_ns, attr->ia_gid));  		if (issued & CEPH_CAP_AUTH_EXCL) {  			inode->i_gid = attr->ia_gid;  			dirtied |= CEPH_CAP_AUTH_EXCL;  		} else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 || -			   attr->ia_gid != inode->i_gid) { -			req->r_args.setattr.gid = cpu_to_le32(attr->ia_gid); +			   !gid_eq(attr->ia_gid, inode->i_gid)) { +			req->r_args.setattr.gid = cpu_to_le32( +				from_kgid(&init_user_ns, attr->ia_gid));  			mask |= CEPH_SETATTR_GID;  			release |= CEPH_CAP_AUTH_SHARED;  		} @@ -1592,6 +1759,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)  			dirtied |= CEPH_CAP_AUTH_EXCL;  		} else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 ||  			   attr->ia_mode != inode->i_mode) { +			inode->i_mode = attr->ia_mode;  			req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode);  			mask |= CEPH_SETATTR_MODE;  			release |= CEPH_CAP_AUTH_SHARED; @@ -1697,28 +1865,40 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)  		dout("setattr %p ATTR_FILE ... hrm!\n", inode);  	if (dirtied) { -		__ceph_mark_dirty_caps(ci, dirtied); +		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied);  		inode->i_ctime = CURRENT_TIME;  	}  	release &= issued; -	spin_unlock(&inode->i_lock); +	spin_unlock(&ci->i_ceph_lock); + +	if (inode_dirty_flags) +		__mark_inode_dirty(inode, inode_dirty_flags); + +	if (ia_valid & ATTR_MODE) { +		err = posix_acl_chmod(inode, attr->ia_mode); +		if (err) +			goto out_put; +	}  	if (mask) { -		req->r_inode = igrab(inode); +		req->r_inode = inode; +		ihold(inode);  		req->r_inode_drop = release;  		req->r_args.setattr.mask = cpu_to_le32(mask);  		req->r_num_caps = 1; -		err = ceph_mdsc_do_request(mdsc, parent_inode, req); +		err = ceph_mdsc_do_request(mdsc, NULL, req);  	}  	dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err,  	     ceph_cap_string(dirtied), mask);  	ceph_mdsc_put_request(req); -	__ceph_do_pending_vmtruncate(inode); +	if (mask & CEPH_SETATTR_SIZE) +		__ceph_do_pending_vmtruncate(inode);  	return err;  out: -	spin_unlock(&inode->i_lock); +	spin_unlock(&ci->i_ceph_lock); +out_put:  	ceph_mdsc_put_request(req);  	return err;  } @@ -1739,14 +1919,15 @@ int ceph_do_getattr(struct inode *inode, int mask)  		return 0;  	} -	dout("do_getattr inode %p mask %s\n", inode, ceph_cap_string(mask)); +	dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);  	if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))  		return 0;  	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);  	if (IS_ERR(req))  		return PTR_ERR(req); -	req->r_inode = igrab(inode); +	req->r_inode = inode; +	ihold(inode);  	req->r_num_caps = 1;  	req->r_args.getattr.mask = cpu_to_le32(mask);  	err = ceph_mdsc_do_request(mdsc, NULL, req); @@ -1762,10 +1943,15 @@ int ceph_do_getattr(struct inode *inode, int mask)   */  int ceph_permission(struct inode *inode, int mask)  { -	int err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED); +	int err; + +	if (mask & MAY_NOT_BLOCK) +		return -ECHILD; + +	err = 
ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED);  	if (!err) -		err = generic_permission(inode, mask, NULL); +		err = generic_permission(inode, mask);  	return err;  } @@ -1783,13 +1969,17 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,  	err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL);  	if (!err) {  		generic_fillattr(inode, stat); -		stat->ino = inode->i_ino; +		stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);  		if (ceph_snap(inode) != CEPH_NOSNAP)  			stat->dev = ceph_snap(inode);  		else  			stat->dev = 0;  		if (S_ISDIR(inode->i_mode)) { -			stat->size = ci->i_rbytes; +			if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), +						RBYTES)) +				stat->size = ci->i_rbytes; +			else +				stat->size = ci->i_files + ci->i_subdirs;  			stat->blocks = 0;  			stat->blksize = 65536;  		}  | 
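On the teardown side, the patch switches ceph_destroy_inode() from freeing the inode cache object directly to deferring the free through call_rcu(), the usual pattern that keeps RCU-walk path lookup from dereferencing a freed inode. A rough sketch of that pattern, again with hypothetical example_* names rather than the patch's own symbols:

/*
 * Sketch of RCU-deferred inode freeing (hypothetical example_* names):
 * the cache object is released only after an RCU grace period.
 */
#include <linux/fs.h>
#include <linux/slab.h>
#include <linux/rcupdate.h>

static struct kmem_cache *example_inode_cachep;

struct example_inode_info {
	/* filesystem-private fields would precede this */
	struct inode vfs_inode;
};

static void example_i_callback(struct rcu_head *head)
{
	struct inode *inode = container_of(head, struct inode, i_rcu);

	kmem_cache_free(example_inode_cachep,
			container_of(inode, struct example_inode_info,
				     vfs_inode));
}

static void example_destroy_inode(struct inode *inode)
{
	/* drop filesystem-private state first, then defer the free */
	call_rcu(&inode->i_rcu, example_i_callback);
}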
