diff options
Diffstat (limited to 'fs/ceph/caps.c')
| -rw-r--r-- | fs/ceph/caps.c | 554 | 
1 files changed, 345 insertions, 209 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 13976c33332..1fde164b74b 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -221,8 +221,8 @@ int ceph_unreserve_caps(struct ceph_mds_client *mdsc,  	return 0;  } -static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc, -				struct ceph_cap_reservation *ctx) +struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc, +			      struct ceph_cap_reservation *ctx)  {  	struct ceph_cap *cap = NULL; @@ -508,15 +508,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,   * it is < 0.  (This is so we can atomically add the cap and add an   * open file reference to it.)   */ -int ceph_add_cap(struct inode *inode, -		 struct ceph_mds_session *session, u64 cap_id, -		 int fmode, unsigned issued, unsigned wanted, -		 unsigned seq, unsigned mseq, u64 realmino, int flags, -		 struct ceph_cap_reservation *caps_reservation) +void ceph_add_cap(struct inode *inode, +		  struct ceph_mds_session *session, u64 cap_id, +		  int fmode, unsigned issued, unsigned wanted, +		  unsigned seq, unsigned mseq, u64 realmino, int flags, +		  struct ceph_cap **new_cap)  {  	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;  	struct ceph_inode_info *ci = ceph_inode(inode); -	struct ceph_cap *new_cap = NULL;  	struct ceph_cap *cap;  	int mds = session->s_mds;  	int actual_wanted; @@ -531,20 +530,10 @@ int ceph_add_cap(struct inode *inode,  	if (fmode >= 0)  		wanted |= ceph_caps_for_mode(fmode); -retry: -	spin_lock(&ci->i_ceph_lock);  	cap = __get_cap_for_mds(ci, mds);  	if (!cap) { -		if (new_cap) { -			cap = new_cap; -			new_cap = NULL; -		} else { -			spin_unlock(&ci->i_ceph_lock); -			new_cap = get_cap(mdsc, caps_reservation); -			if (new_cap == NULL) -				return -ENOMEM; -			goto retry; -		} +		cap = *new_cap; +		*new_cap = NULL;  		cap->issued = 0;  		cap->implemented = 0; @@ -555,21 +544,31 @@ retry:  		cap->ci = ci;  		__insert_cap_node(ci, cap); -		/* clear out old exporting info?  (i.e. 
on cap import) */ -		if (ci->i_cap_exporting_mds == mds) { -			ci->i_cap_exporting_issued = 0; -			ci->i_cap_exporting_mseq = 0; -			ci->i_cap_exporting_mds = -1; -		} -  		/* add to session cap list */  		cap->session = session;  		spin_lock(&session->s_cap_lock);  		list_add_tail(&cap->session_caps, &session->s_caps);  		session->s_nr_caps++;  		spin_unlock(&session->s_cap_lock); -	} else if (new_cap) -		ceph_put_cap(mdsc, new_cap); +	} else { +		/* +		 * auth mds of the inode changed. we received the cap export +		 * message, but still haven't received the cap import message. +		 * handle_cap_export() updated the new auth MDS' cap. +		 * +		 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing +		 * a message that was sent before the cap import message. So +		 * don't remove caps. +		 */ +		if (ceph_seq_cmp(seq, cap->seq) <= 0) { +			WARN_ON(cap != ci->i_auth_cap); +			WARN_ON(cap->cap_id != cap_id); +			seq = cap->seq; +			mseq = cap->mseq; +			issued |= cap->issued; +			flags |= CEPH_CAP_FLAG_AUTH; +		} +	}  	if (!ci->i_snap_realm) {  		/* @@ -609,17 +608,12 @@ retry:  	if (flags & CEPH_CAP_FLAG_AUTH) {  		if (ci->i_auth_cap == NULL || -		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) +		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {  			ci->i_auth_cap = cap; -	} else if (ci->i_auth_cap == cap) { -		ci->i_auth_cap = NULL; -		spin_lock(&mdsc->cap_dirty_lock); -		if (!list_empty(&ci->i_dirty_item)) { -			dout(" moving %p to cap_dirty_migrating\n", inode); -			list_move(&ci->i_dirty_item, -				  &mdsc->cap_dirty_migrating); +			cap->mds_wanted = wanted;  		} -		spin_unlock(&mdsc->cap_dirty_lock); +	} else { +		WARN_ON(ci->i_auth_cap == cap);  	}  	dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", @@ -628,7 +622,7 @@ retry:  	cap->cap_id = cap_id;  	cap->issued = issued;  	cap->implemented |= issued; -	if (mseq > cap->mseq) +	if (ceph_seq_cmp(mseq, cap->mseq) > 0)  		cap->mds_wanted = wanted;  	else  		cap->mds_wanted |= wanted; 
@@ -639,9 +633,6 @@ retry:  	if (fmode >= 0)  		__ceph_get_fmode(ci, fmode); -	spin_unlock(&ci->i_ceph_lock); -	wake_up_all(&ci->i_cap_wq); -	return 0;  }  /* @@ -676,7 +667,7 @@ static int __cap_is_valid(struct ceph_cap *cap)   */  int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)  { -	int have = ci->i_snap_caps | ci->i_cap_exporting_issued; +	int have = ci->i_snap_caps;  	struct ceph_cap *cap;  	struct rb_node *p; @@ -816,7 +807,7 @@ int __ceph_caps_revoking_other(struct ceph_inode_info *ci,  	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {  		cap = rb_entry(p, struct ceph_cap, ci_node); -		if (cap != ocap && __cap_is_valid(cap) && +		if (cap != ocap &&  		    (cap->implemented & ~cap->issued & mask))  			return 1;  	} @@ -878,7 +869,10 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)  		cap = rb_entry(p, struct ceph_cap, ci_node);  		if (!__cap_is_valid(cap))  			continue; -		mds_wanted |= cap->mds_wanted; +		if (cap == ci->i_auth_cap) +			mds_wanted |= cap->mds_wanted; +		else +			mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);  	}  	return mds_wanted;  } @@ -888,7 +882,19 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)   */  static int __ceph_is_any_caps(struct ceph_inode_info *ci)  { -	return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0; +	return !RB_EMPTY_ROOT(&ci->i_caps); +} + +int ceph_is_any_caps(struct inode *inode) +{ +	struct ceph_inode_info *ci = ceph_inode(inode); +	int ret; + +	spin_lock(&ci->i_ceph_lock); +	ret = __ceph_is_any_caps(ci); +	spin_unlock(&ci->i_ceph_lock); + +	return ret;  }  /* @@ -897,7 +903,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)   * caller should hold i_ceph_lock.   * caller will not hold session s_mutex if called from destroy_inode.   
*/ -void __ceph_remove_cap(struct ceph_cap *cap) +void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)  {  	struct ceph_mds_session *session = cap->session;  	struct ceph_inode_info *ci = cap->ci; @@ -909,6 +915,16 @@ void __ceph_remove_cap(struct ceph_cap *cap)  	/* remove from session list */  	spin_lock(&session->s_cap_lock); +	/* +	 * s_cap_reconnect is protected by s_cap_lock. no one changes +	 * s_cap_gen while session is in the reconnect state. +	 */ +	if (queue_release && +	    (!session->s_cap_reconnect || +	     cap->cap_gen == session->s_cap_gen)) +		__queue_cap_release(session, ci->i_vino.ino, cap->cap_id, +				    cap->mseq, cap->issue_seq); +  	if (session->s_cap_iterator == cap) {  		/* not yet, we are iterating over this very cap */  		dout("__ceph_remove_cap  delaying %p removal from session %p\n", @@ -1023,7 +1039,6 @@ void __queue_cap_release(struct ceph_mds_session *session,  	struct ceph_mds_cap_release *head;  	struct ceph_mds_cap_item *item; -	spin_lock(&session->s_cap_lock);  	BUG_ON(!session->s_num_cap_releases);  	msg = list_first_entry(&session->s_cap_releases,  			       struct ceph_msg, list_head); @@ -1052,7 +1067,6 @@ void __queue_cap_release(struct ceph_mds_session *session,  		     (int)CEPH_CAPS_PER_RELEASE,  		     (int)msg->front.iov_len);  	} -	spin_unlock(&session->s_cap_lock);  }  /* @@ -1067,12 +1081,8 @@ void ceph_queue_caps_release(struct inode *inode)  	p = rb_first(&ci->i_caps);  	while (p) {  		struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); -		struct ceph_mds_session *session = cap->session; - -		__queue_cap_release(session, ceph_ino(inode), cap->cap_id, -				    cap->mseq, cap->issue_seq);  		p = rb_next(p); -		__ceph_remove_cap(cap); +		__ceph_remove_cap(cap, true);  	}  } @@ -1379,13 +1389,10 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)  				ci->i_snap_realm->cached_context);  		dout(" inode %p now dirty snapc %p auth cap %p\n",  		     &ci->vfs_inode, 
ci->i_head_snapc, ci->i_auth_cap); +		WARN_ON(!ci->i_auth_cap);  		BUG_ON(!list_empty(&ci->i_dirty_item));  		spin_lock(&mdsc->cap_dirty_lock); -		if (ci->i_auth_cap) -			list_add(&ci->i_dirty_item, &mdsc->cap_dirty); -		else -			list_add(&ci->i_dirty_item, -				 &mdsc->cap_dirty_migrating); +		list_add(&ci->i_dirty_item, &mdsc->cap_dirty);  		spin_unlock(&mdsc->cap_dirty_lock);  		if (ci->i_flushing_caps == 0) {  			ihold(inode); @@ -1731,13 +1738,12 @@ ack:  /*   * Try to flush dirty caps back to the auth mds.   */ -static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, -			  unsigned *flush_tid) +static int try_flush_caps(struct inode *inode, unsigned *flush_tid)  {  	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;  	struct ceph_inode_info *ci = ceph_inode(inode); -	int unlock_session = session ? 0 : 1;  	int flushing = 0; +	struct ceph_mds_session *session = NULL;  retry:  	spin_lock(&ci->i_ceph_lock); @@ -1751,13 +1757,14 @@ retry:  		int want = __ceph_caps_wanted(ci);  		int delayed; -		if (!session) { +		if (!session || session != cap->session) {  			spin_unlock(&ci->i_ceph_lock); +			if (session) +				mutex_unlock(&session->s_mutex);  			session = cap->session;  			mutex_lock(&session->s_mutex);  			goto retry;  		} -		BUG_ON(session != cap->session);  		if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)  			goto out; @@ -1776,7 +1783,7 @@ retry:  out:  	spin_unlock(&ci->i_ceph_lock);  out_unlocked: -	if (session && unlock_session) +	if (session)  		mutex_unlock(&session->s_mutex);  	return flushing;  } @@ -1861,7 +1868,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)  		return ret;  	mutex_lock(&inode->i_mutex); -	dirty = try_flush_caps(inode, NULL, &flush_tid); +	dirty = try_flush_caps(inode, &flush_tid);  	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));  	/* @@ -1896,7 +1903,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)  	
dout("write_inode %p wait=%d\n", inode, wait);  	if (wait) { -		dirty = try_flush_caps(inode, NULL, &flush_tid); +		dirty = try_flush_caps(inode, &flush_tid);  		if (dirty)  			err = wait_event_interruptible(ci->i_cap_wq,  				       caps_are_flushed(inode, flush_tid)); @@ -2346,11 +2353,11 @@ static void invalidate_aliases(struct inode *inode)  	d_prune_aliases(inode);  	/*  	 * For non-directory inode, d_find_alias() only returns -	 * connected dentry. After calling d_invalidate(), the -	 * dentry become disconnected. +	 * hashed dentry. After calling d_invalidate(), the +	 * dentry becomes unhashed.  	 *  	 * For directory inode, d_find_alias() can return -	 * disconnected dentry. But directory inode should have +	 * unhashed dentry. But directory inode should have  	 * one alias at most.  	 */  	while ((dn = d_find_alias(inode))) { @@ -2372,38 +2379,52 @@ static void invalidate_aliases(struct inode *inode)   * actually be a revocation if it specifies a smaller cap set.)   *   * caller holds s_mutex and i_ceph_lock, we drop both. 
- * - * return value: - *  0 - ok - *  1 - check_caps on auth cap only (writeback) - *  2 - check_caps (ack revoke)   */ -static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, +static void handle_cap_grant(struct ceph_mds_client *mdsc, +			     struct inode *inode, struct ceph_mds_caps *grant, +			     void *snaptrace, int snaptrace_len, +			     struct ceph_buffer *xattr_buf,  			     struct ceph_mds_session *session, -			     struct ceph_cap *cap, -			     struct ceph_buffer *xattr_buf) -		__releases(ci->i_ceph_lock) +			     struct ceph_cap *cap, int issued) +	__releases(ci->i_ceph_lock)  {  	struct ceph_inode_info *ci = ceph_inode(inode);  	int mds = session->s_mds;  	int seq = le32_to_cpu(grant->seq);  	int newcaps = le32_to_cpu(grant->caps); -	int issued, implemented, used, wanted, dirty; +	int used, wanted, dirty;  	u64 size = le64_to_cpu(grant->size);  	u64 max_size = le64_to_cpu(grant->max_size);  	struct timespec mtime, atime, ctime;  	int check_caps = 0; -	int wake = 0; -	int writeback = 0; -	int queue_invalidate = 0; -	int deleted_inode = 0; -	int queue_revalidate = 0; +	bool wake = 0; +	bool writeback = 0; +	bool queue_trunc = 0; +	bool queue_invalidate = 0; +	bool queue_revalidate = 0; +	bool deleted_inode = 0;  	dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",  	     inode, cap, mds, seq, ceph_cap_string(newcaps));  	dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,  		inode->i_size); + +	/* +	 * auth mds of the inode changed. we received the cap export message, +	 * but still haven't received the cap import message. handle_cap_export +	 * updated the new auth MDS' cap. +	 * +	 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message +	 * that was sent before the cap import message. So don't remove caps. 
+	 */ +	if (ceph_seq_cmp(seq, cap->seq) <= 0) { +		WARN_ON(cap != ci->i_auth_cap); +		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id)); +		seq = cap->seq; +		newcaps |= cap->issued; +	} +  	/*  	 * If CACHE is being revoked, and we have no dirty buffers,  	 * try to invalidate (once).  (If there are dirty buffers, we @@ -2425,15 +2446,13 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,  	}  	/* side effects now are allowed */ - -	issued = __ceph_caps_issued(ci, &implemented); -	issued |= implemented | __ceph_caps_dirty(ci); -  	cap->cap_gen = session->s_cap_gen; +	cap->seq = seq;  	__check_cap_issue(ci, cap, newcaps); -	if ((issued & CEPH_CAP_AUTH_EXCL) == 0) { +	if ((newcaps & CEPH_CAP_AUTH_SHARED) && +	    (issued & CEPH_CAP_AUTH_EXCL) == 0) {  		inode->i_mode = le32_to_cpu(grant->mode);  		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));  		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); @@ -2442,7 +2461,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,  		     from_kgid(&init_user_ns, inode->i_gid));  	} -	if ((issued & CEPH_CAP_LINK_EXCL) == 0) { +	if ((newcaps & CEPH_CAP_AUTH_SHARED) && +	    (issued & CEPH_CAP_LINK_EXCL) == 0) {  		set_nlink(inode, le32_to_cpu(grant->nlink));  		if (inode->i_nlink == 0 &&  		    (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) @@ -2460,6 +2480,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,  				ceph_buffer_put(ci->i_xattrs.blob);  			ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);  			ci->i_xattrs.version = version; +			ceph_forget_all_cached_acls(inode);  		}  	} @@ -2468,26 +2489,35 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,  	if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)  		queue_revalidate = 1; -	/* size/ctime/mtime/atime? 
*/ -	ceph_fill_file_size(inode, issued, -			    le32_to_cpu(grant->truncate_seq), -			    le64_to_cpu(grant->truncate_size), size); -	ceph_decode_timespec(&mtime, &grant->mtime); -	ceph_decode_timespec(&atime, &grant->atime); -	ceph_decode_timespec(&ctime, &grant->ctime); -	ceph_fill_file_time(inode, issued, -			    le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, -			    &atime); - -	/* max size increase? */ -	if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { -		dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); -		ci->i_max_size = max_size; -		if (max_size >= ci->i_wanted_max_size) { -			ci->i_wanted_max_size = 0;  /* reset */ -			ci->i_requested_max_size = 0; +	if (newcaps & CEPH_CAP_ANY_RD) { +		/* ctime/mtime/atime? */ +		ceph_decode_timespec(&mtime, &grant->mtime); +		ceph_decode_timespec(&atime, &grant->atime); +		ceph_decode_timespec(&ctime, &grant->ctime); +		ceph_fill_file_time(inode, issued, +				    le32_to_cpu(grant->time_warp_seq), +				    &ctime, &mtime, &atime); +	} + +	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { +		/* file layout may have changed */ +		ci->i_layout = grant->layout; +		/* size/truncate_seq? */ +		queue_trunc = ceph_fill_file_size(inode, issued, +					le32_to_cpu(grant->truncate_seq), +					le64_to_cpu(grant->truncate_size), +					size); +		/* max size increase? */ +		if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { +			dout("max_size %lld -> %llu\n", +			     ci->i_max_size, max_size); +			ci->i_max_size = max_size; +			if (max_size >= ci->i_wanted_max_size) { +				ci->i_wanted_max_size = 0;  /* reset */ +				ci->i_requested_max_size = 0; +			} +			wake = 1;  		} -		wake = 1;  	}  	/* check cap bits */ @@ -2507,11 +2537,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,  			check_caps = 1;  	} -	cap->seq = seq; - -	/* file layout may have changed */ -	ci->i_layout = grant->layout; -  	/* revocation, grant, or no-op? 
*/  	if (cap->issued & ~newcaps) {  		int revoking = cap->issued & ~newcaps; @@ -2553,6 +2578,23 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,  	spin_unlock(&ci->i_ceph_lock); +	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { +		down_write(&mdsc->snap_rwsem); +		ceph_update_snap_trace(mdsc, snaptrace, +				       snaptrace + snaptrace_len, false); +		downgrade_write(&mdsc->snap_rwsem); +		kick_flushing_inode_caps(mdsc, session, inode); +		up_read(&mdsc->snap_rwsem); +		if (newcaps & ~issued) +			wake = 1; +	} + +	if (queue_trunc) { +		ceph_queue_vmtruncate(inode); +		ceph_queue_revalidate(inode); +	} else if (queue_revalidate) +		ceph_queue_revalidate(inode); +  	if (writeback)  		/*  		 * queue inode for writeback: we can't actually call @@ -2564,8 +2606,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,  		ceph_queue_invalidate(inode);  	if (deleted_inode)  		invalidate_aliases(inode); -	if (queue_revalidate) -		ceph_queue_revalidate(inode);  	if (wake)  		wake_up_all(&ci->i_cap_wq); @@ -2737,123 +2777,200 @@ static void handle_cap_trunc(struct inode *inode,   * caller holds s_mutex   */  static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, -			      struct ceph_mds_session *session, -			      int *open_target_sessions) +			      struct ceph_mds_cap_peer *ph, +			      struct ceph_mds_session *session)  {  	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; +	struct ceph_mds_session *tsession = NULL; +	struct ceph_cap *cap, *tcap, *new_cap = NULL;  	struct ceph_inode_info *ci = ceph_inode(inode); -	int mds = session->s_mds; +	u64 t_cap_id;  	unsigned mseq = le32_to_cpu(ex->migrate_seq); -	struct ceph_cap *cap = NULL, *t; -	struct rb_node *p; -	int remember = 1; +	unsigned t_seq, t_mseq; +	int target, issued; +	int mds = session->s_mds; -	dout("handle_cap_export inode %p ci %p mds%d mseq %d\n", -	     inode, ci, mds, mseq); +	if (ph) { +		t_cap_id = 
le64_to_cpu(ph->cap_id); +		t_seq = le32_to_cpu(ph->seq); +		t_mseq = le32_to_cpu(ph->mseq); +		target = le32_to_cpu(ph->mds); +	} else { +		t_cap_id = t_seq = t_mseq = 0; +		target = -1; +	} +	dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n", +	     inode, ci, mds, mseq, target); +retry:  	spin_lock(&ci->i_ceph_lock); +	cap = __get_cap_for_mds(ci, mds); +	if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id)) +		goto out_unlock; -	/* make sure we haven't seen a higher mseq */ -	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { -		t = rb_entry(p, struct ceph_cap, ci_node); -		if (ceph_seq_cmp(t->mseq, mseq) > 0) { -			dout(" higher mseq on cap from mds%d\n", -			     t->session->s_mds); -			remember = 0; -		} -		if (t->session->s_mds == mds) -			cap = t; +	if (target < 0) { +		__ceph_remove_cap(cap, false); +		goto out_unlock;  	} -	if (cap) { -		if (remember) { -			/* make note */ -			ci->i_cap_exporting_mds = mds; -			ci->i_cap_exporting_mseq = mseq; -			ci->i_cap_exporting_issued = cap->issued; - -			/* -			 * make sure we have open sessions with all possible -			 * export targets, so that we get the matching IMPORT -			 */ -			*open_target_sessions = 1; +	/* +	 * now we know we haven't received the cap import message yet +	 * because the exported cap still exists. 
+	 */ -			/* -			 * we can't flush dirty caps that we've seen the -			 * EXPORT but no IMPORT for -			 */ -			spin_lock(&mdsc->cap_dirty_lock); -			if (!list_empty(&ci->i_dirty_item)) { -				dout(" moving %p to cap_dirty_migrating\n", -				     inode); -				list_move(&ci->i_dirty_item, -					  &mdsc->cap_dirty_migrating); +	issued = cap->issued; +	WARN_ON(issued != cap->implemented); + +	tcap = __get_cap_for_mds(ci, target); +	if (tcap) { +		/* already have caps from the target */ +		if (tcap->cap_id != t_cap_id || +		    ceph_seq_cmp(tcap->seq, t_seq) < 0) { +			dout(" updating import cap %p mds%d\n", tcap, target); +			tcap->cap_id = t_cap_id; +			tcap->seq = t_seq - 1; +			tcap->issue_seq = t_seq - 1; +			tcap->mseq = t_mseq; +			tcap->issued |= issued; +			tcap->implemented |= issued; +			if (cap == ci->i_auth_cap) +				ci->i_auth_cap = tcap; +			if (ci->i_flushing_caps && ci->i_auth_cap == tcap) { +				spin_lock(&mdsc->cap_dirty_lock); +				list_move_tail(&ci->i_flushing_item, +					       &tcap->session->s_cap_flushing); +				spin_unlock(&mdsc->cap_dirty_lock);  			} -			spin_unlock(&mdsc->cap_dirty_lock);  		} -		__ceph_remove_cap(cap); +		__ceph_remove_cap(cap, false); +		goto out_unlock; +	} else if (tsession) { +		/* add placeholder for the export target */ +		int flag = (cap == ci->i_auth_cap) ? 
CEPH_CAP_FLAG_AUTH : 0; +		ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, +			     t_seq - 1, t_mseq, (u64)-1, flag, &new_cap); + +		__ceph_remove_cap(cap, false); +		goto out_unlock;  	} -	/* else, we already released it */  	spin_unlock(&ci->i_ceph_lock); +	mutex_unlock(&session->s_mutex); + +	/* open target session */ +	tsession = ceph_mdsc_open_export_target_session(mdsc, target); +	if (!IS_ERR(tsession)) { +		if (mds > target) { +			mutex_lock(&session->s_mutex); +			mutex_lock_nested(&tsession->s_mutex, +					  SINGLE_DEPTH_NESTING); +		} else { +			mutex_lock(&tsession->s_mutex); +			mutex_lock_nested(&session->s_mutex, +					  SINGLE_DEPTH_NESTING); +		} +		ceph_add_cap_releases(mdsc, tsession); +		new_cap = ceph_get_cap(mdsc, NULL); +	} else { +		WARN_ON(1); +		tsession = NULL; +		target = -1; +	} +	goto retry; + +out_unlock: +	spin_unlock(&ci->i_ceph_lock); +	mutex_unlock(&session->s_mutex); +	if (tsession) { +		mutex_unlock(&tsession->s_mutex); +		ceph_put_mds_session(tsession); +	} +	if (new_cap) +		ceph_put_cap(mdsc, new_cap);  }  /* - * Handle cap IMPORT.  If there are temp bits from an older EXPORT, - * clean them up. + * Handle cap IMPORT.   * - * caller holds s_mutex. + * caller holds s_mutex. 
acquires i_ceph_lock   */  static void handle_cap_import(struct ceph_mds_client *mdsc,  			      struct inode *inode, struct ceph_mds_caps *im, +			      struct ceph_mds_cap_peer *ph,  			      struct ceph_mds_session *session, -			      void *snaptrace, int snaptrace_len) +			      struct ceph_cap **target_cap, int *old_issued) +	__acquires(ci->i_ceph_lock)  {  	struct ceph_inode_info *ci = ceph_inode(inode); +	struct ceph_cap *cap, *ocap, *new_cap = NULL;  	int mds = session->s_mds; -	unsigned issued = le32_to_cpu(im->caps); +	int issued; +	unsigned caps = le32_to_cpu(im->caps);  	unsigned wanted = le32_to_cpu(im->wanted);  	unsigned seq = le32_to_cpu(im->seq);  	unsigned mseq = le32_to_cpu(im->migrate_seq);  	u64 realmino = le64_to_cpu(im->realm);  	u64 cap_id = le64_to_cpu(im->cap_id); +	u64 p_cap_id; +	int peer; -	if (ci->i_cap_exporting_mds >= 0 && -	    ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) { -		dout("handle_cap_import inode %p ci %p mds%d mseq %d" -		     " - cleared exporting from mds%d\n", -		     inode, ci, mds, mseq, -		     ci->i_cap_exporting_mds); -		ci->i_cap_exporting_issued = 0; -		ci->i_cap_exporting_mseq = 0; -		ci->i_cap_exporting_mds = -1; +	if (ph) { +		p_cap_id = le64_to_cpu(ph->cap_id); +		peer = le32_to_cpu(ph->mds); +	} else { +		p_cap_id = 0; +		peer = -1; +	} -		spin_lock(&mdsc->cap_dirty_lock); -		if (!list_empty(&ci->i_dirty_item)) { -			dout(" moving %p back to cap_dirty\n", inode); -			list_move(&ci->i_dirty_item, &mdsc->cap_dirty); +	dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", +	     inode, ci, mds, mseq, peer); + +retry: +	spin_lock(&ci->i_ceph_lock); +	cap = __get_cap_for_mds(ci, mds); +	if (!cap) { +		if (!new_cap) { +			spin_unlock(&ci->i_ceph_lock); +			new_cap = ceph_get_cap(mdsc, NULL); +			goto retry;  		} -		spin_unlock(&mdsc->cap_dirty_lock); +		cap = new_cap;  	} else { -		dout("handle_cap_import inode %p ci %p mds%d mseq %d\n", -		     inode, ci, mds, mseq); +		if (new_cap) { +			
ceph_put_cap(mdsc, new_cap); +			new_cap = NULL; +		}  	} -	down_write(&mdsc->snap_rwsem); -	ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, -			       false); -	downgrade_write(&mdsc->snap_rwsem); -	ceph_add_cap(inode, session, cap_id, -1, -		     issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH, -		     NULL /* no caps context */); -	kick_flushing_inode_caps(mdsc, session, inode); -	up_read(&mdsc->snap_rwsem); +	__ceph_caps_issued(ci, &issued); +	issued |= __ceph_caps_dirty(ci); + +	ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq, +		     realmino, CEPH_CAP_FLAG_AUTH, &new_cap); + +	ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL; +	if (ocap && ocap->cap_id == p_cap_id) { +		dout(" remove export cap %p mds%d flags %d\n", +		     ocap, peer, ph->flags); +		if ((ph->flags & CEPH_CAP_FLAG_AUTH) && +		    (ocap->seq != le32_to_cpu(ph->seq) || +		     ocap->mseq != le32_to_cpu(ph->mseq))) { +			pr_err("handle_cap_import: mismatched seq/mseq: " +			       "ino (%llx.%llx) mds%d seq %d mseq %d " +			       "importer mds%d has peer seq %d mseq %d\n", +			       ceph_vinop(inode), peer, ocap->seq, +			       ocap->mseq, mds, le32_to_cpu(ph->seq), +			       le32_to_cpu(ph->mseq)); +		} +		__ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); +	}  	/* make sure we re-request max_size, if necessary */ -	spin_lock(&ci->i_ceph_lock); -	ci->i_wanted_max_size = 0;  /* reset */ +	ci->i_wanted_max_size = 0;  	ci->i_requested_max_size = 0; -	spin_unlock(&ci->i_ceph_lock); + +	*old_issued = issued; +	*target_cap = cap;  }  /* @@ -2871,8 +2988,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,  	struct ceph_inode_info *ci;  	struct ceph_cap *cap;  	struct ceph_mds_caps *h; +	struct ceph_mds_cap_peer *peer = NULL;  	int mds = session->s_mds; -	int op; +	int op, issued;  	u32 seq, mseq;  	struct ceph_vino vino;  	u64 cap_id; @@ -2881,12 +2999,13 @@ void ceph_handle_caps(struct ceph_mds_session *session,  	void 
*snaptrace;  	size_t snaptrace_len;  	void *flock; +	void *end;  	u32 flock_len; -	int open_target_sessions = 0;  	dout("handle_caps from mds%d\n", mds);  	/* decode */ +	end = msg->front.iov_base + msg->front.iov_len;  	tid = le64_to_cpu(msg->hdr.tid);  	if (msg->front.iov_len < sizeof(*h))  		goto bad; @@ -2904,17 +3023,28 @@ void ceph_handle_caps(struct ceph_mds_session *session,  	snaptrace_len = le32_to_cpu(h->snap_trace_len);  	if (le16_to_cpu(msg->hdr.version) >= 2) { -		void *p, *end; - -		p = snaptrace + snaptrace_len; -		end = msg->front.iov_base + msg->front.iov_len; +		void *p = snaptrace + snaptrace_len;  		ceph_decode_32_safe(&p, end, flock_len, bad); +		if (p + flock_len > end) +			goto bad;  		flock = p;  	} else {  		flock = NULL;  		flock_len = 0;  	} +	if (le16_to_cpu(msg->hdr.version) >= 3) { +		if (op == CEPH_CAP_OP_IMPORT) { +			void *p = flock + flock_len; +			if (p + sizeof(*peer) > end) +				goto bad; +			peer = p; +		} else if (op == CEPH_CAP_OP_EXPORT) { +			/* recorded in unused fields */ +			peer = (void *)&h->size; +		} +	} +  	mutex_lock(&session->s_mutex);  	session->s_seq++;  	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, @@ -2931,9 +3061,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,  	if (!inode) {  		dout(" i don't have ino %llx\n", vino.ino); -		if (op == CEPH_CAP_OP_IMPORT) +		if (op == CEPH_CAP_OP_IMPORT) { +			spin_lock(&session->s_cap_lock);  			__queue_cap_release(session, vino.ino, cap_id,  					    mseq, seq); +			spin_unlock(&session->s_cap_lock); +		}  		goto flush_cap_releases;  	} @@ -2944,12 +3077,15 @@ void ceph_handle_caps(struct ceph_mds_session *session,  		goto done;  	case CEPH_CAP_OP_EXPORT: -		handle_cap_export(inode, h, session, &open_target_sessions); -		goto done; +		handle_cap_export(inode, h, peer, session); +		goto done_unlocked;  	case CEPH_CAP_OP_IMPORT: -		handle_cap_import(mdsc, inode, h, session, -				  snaptrace, snaptrace_len); +		handle_cap_import(mdsc, 
inode, h, peer, session, +				  &cap, &issued); +		handle_cap_grant(mdsc, inode, h,  snaptrace, snaptrace_len, +				 msg->middle, session, cap, issued); +		goto done_unlocked;  	}  	/* the rest require a cap */ @@ -2966,8 +3102,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,  	switch (op) {  	case CEPH_CAP_OP_REVOKE:  	case CEPH_CAP_OP_GRANT: -	case CEPH_CAP_OP_IMPORT: -		handle_cap_grant(inode, h, session, cap, msg->middle); +		__ceph_caps_issued(ci, &issued); +		issued |= __ceph_caps_dirty(ci); +		handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle, +				 session, cap, issued);  		goto done_unlocked;  	case CEPH_CAP_OP_FLUSH_ACK: @@ -3000,8 +3138,6 @@ done:  done_unlocked:  	if (inode)  		iput(inode); -	if (open_target_sessions) -		ceph_mdsc_open_export_target_sessions(mdsc, session);  	return;  bad: @@ -3143,7 +3279,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,  			rel->seq = cpu_to_le32(cap->seq);  			rel->issue_seq = cpu_to_le32(cap->issue_seq),  			rel->mseq = cpu_to_le32(cap->mseq); -			rel->caps = cpu_to_le32(cap->issued); +			rel->caps = cpu_to_le32(cap->implemented);  			rel->wanted = cpu_to_le32(cap->mds_wanted);  			rel->dname_len = 0;  			rel->dname_seq = 0;  | 
