diff options
Diffstat (limited to 'fs/ceph/caps.c')
| -rw-r--r-- | fs/ceph/caps.c | 777 |
1 files changed, 510 insertions, 267 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 620daad201d..1fde164b74b 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -10,6 +10,7 @@ #include "super.h" #include "mds_client.h" +#include "cache.h" #include <linux/ceph/decode.h> #include <linux/ceph/messenger.h> @@ -147,7 +148,7 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) spin_unlock(&mdsc->caps_list_lock); } -int ceph_reserve_caps(struct ceph_mds_client *mdsc, +void ceph_reserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx, int need) { int i; @@ -155,7 +156,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc, int have; int alloc = 0; LIST_HEAD(newcaps); - int ret = 0; dout("reserve caps ctx=%p need=%d\n", ctx, need); @@ -174,14 +174,15 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc, for (i = have; i < need; i++) { cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); - if (!cap) { - ret = -ENOMEM; - goto out_alloc_count; - } + if (!cap) + break; list_add(&cap->caps_item, &newcaps); alloc++; } - BUG_ON(have + alloc != need); + /* we didn't manage to reserve as much as we needed */ + if (have + alloc != need) + pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n", + ctx, need, have + alloc); spin_lock(&mdsc->caps_list_lock); mdsc->caps_total_count += alloc; @@ -197,13 +198,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc, dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", ctx, mdsc->caps_total_count, mdsc->caps_use_count, mdsc->caps_reserve_count, mdsc->caps_avail_count); - return 0; - -out_alloc_count: - /* we didn't manage to reserve as much as we needed */ - pr_warning("reserve caps ctx=%p ENOMEM need=%d got=%d\n", - ctx, need, have); - return ret; } int ceph_unreserve_caps(struct ceph_mds_client *mdsc, @@ -227,8 +221,8 @@ int ceph_unreserve_caps(struct ceph_mds_client *mdsc, return 0; } -static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc, - struct ceph_cap_reservation *ctx) +struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc, + struct ceph_cap_reservation *ctx) { struct ceph_cap *cap = NULL; @@ -236,8 +230,10 @@ static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc, if (!ctx) { cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); if (cap) { + spin_lock(&mdsc->caps_list_lock); mdsc->caps_use_count++; mdsc->caps_total_count++; + spin_unlock(&mdsc->caps_list_lock); } return cap; } @@ -484,19 +480,22 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, * i_rdcache_gen. */ if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && - (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) + (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) { ci->i_rdcache_gen++; + } /* - * if we are newly issued FILE_SHARED, clear D_COMPLETE; we + * if we are newly issued FILE_SHARED, mark dir not complete; we * don't know what happened to this directory while we didn't * have the cap. */ if ((issued & CEPH_CAP_FILE_SHARED) && (had & CEPH_CAP_FILE_SHARED) == 0) { ci->i_shared_gen++; - if (S_ISDIR(ci->vfs_inode.i_mode)) - ceph_dir_clear_complete(&ci->vfs_inode); + if (S_ISDIR(ci->vfs_inode.i_mode)) { + dout(" marking %p NOT complete\n", &ci->vfs_inode); + __ceph_dir_clear_complete(ci); + } } } @@ -509,15 +508,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, * it is < 0. (This is so we can atomically add the cap and add an * open file reference to it.) */ -int ceph_add_cap(struct inode *inode, - struct ceph_mds_session *session, u64 cap_id, - int fmode, unsigned issued, unsigned wanted, - unsigned seq, unsigned mseq, u64 realmino, int flags, - struct ceph_cap_reservation *caps_reservation) +void ceph_add_cap(struct inode *inode, + struct ceph_mds_session *session, u64 cap_id, + int fmode, unsigned issued, unsigned wanted, + unsigned seq, unsigned mseq, u64 realmino, int flags, + struct ceph_cap **new_cap) { struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_cap *new_cap = NULL; struct ceph_cap *cap; int mds = session->s_mds; int actual_wanted; @@ -532,44 +530,45 @@ int ceph_add_cap(struct inode *inode, if (fmode >= 0) wanted |= ceph_caps_for_mode(fmode); -retry: - spin_lock(&ci->i_ceph_lock); cap = __get_cap_for_mds(ci, mds); if (!cap) { - if (new_cap) { - cap = new_cap; - new_cap = NULL; - } else { - spin_unlock(&ci->i_ceph_lock); - new_cap = get_cap(mdsc, caps_reservation); - if (new_cap == NULL) - return -ENOMEM; - goto retry; - } + cap = *new_cap; + *new_cap = NULL; cap->issued = 0; cap->implemented = 0; cap->mds = mds; cap->mds_wanted = 0; + cap->mseq = 0; cap->ci = ci; __insert_cap_node(ci, cap); - /* clear out old exporting info? (i.e. on cap import) */ - if (ci->i_cap_exporting_mds == mds) { - ci->i_cap_exporting_issued = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_mds = -1; - } - /* add to session cap list */ cap->session = session; spin_lock(&session->s_cap_lock); list_add_tail(&cap->session_caps, &session->s_caps); session->s_nr_caps++; spin_unlock(&session->s_cap_lock); - } else if (new_cap) - ceph_put_cap(mdsc, new_cap); + } else { + /* + * auth mds of the inode changed. we received the cap export + * message, but still haven't received the cap import message. + * handle_cap_export() updated the new auth MDS' cap. + * + * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing + * a message that was send before the cap import message. So + * don't remove caps. + */ + if (ceph_seq_cmp(seq, cap->seq) <= 0) { + WARN_ON(cap != ci->i_auth_cap); + WARN_ON(cap->cap_id != cap_id); + seq = cap->seq; + mseq = cap->mseq; + issued |= cap->issued; + flags |= CEPH_CAP_FLAG_AUTH; + } + } if (!ci->i_snap_realm) { /* @@ -607,10 +606,15 @@ retry: __cap_delay_requeue(mdsc, ci); } - if (flags & CEPH_CAP_FLAG_AUTH) - ci->i_auth_cap = cap; - else if (ci->i_auth_cap == cap) - ci->i_auth_cap = NULL; + if (flags & CEPH_CAP_FLAG_AUTH) { + if (ci->i_auth_cap == NULL || + ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) { + ci->i_auth_cap = cap; + cap->mds_wanted = wanted; + } + } else { + WARN_ON(ci->i_auth_cap == cap); + } dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", inode, ceph_vinop(inode), cap, ceph_cap_string(issued), @@ -618,7 +622,10 @@ retry: cap->cap_id = cap_id; cap->issued = issued; cap->implemented |= issued; - cap->mds_wanted |= wanted; + if (ceph_seq_cmp(mseq, cap->mseq) > 0) + cap->mds_wanted = wanted; + else + cap->mds_wanted |= wanted; cap->seq = seq; cap->issue_seq = seq; cap->mseq = mseq; @@ -626,9 +633,6 @@ retry: if (fmode >= 0) __ceph_get_fmode(ci, fmode); - spin_unlock(&ci->i_ceph_lock); - wake_up_all(&ci->i_cap_wq); - return 0; } /* @@ -663,7 +667,7 @@ static int __cap_is_valid(struct ceph_cap *cap) */ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) { - int have = ci->i_snap_caps | ci->i_cap_exporting_issued; + int have = ci->i_snap_caps; struct ceph_cap *cap; struct rb_node *p; @@ -679,6 +683,15 @@ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) if (implemented) *implemented |= cap->implemented; } + /* + * exclude caps issued by non-auth MDS, but are been revoking + * by the auth MDS. The non-auth MDS should be revoking/exporting + * these caps, but the message is delayed. + */ + if (ci->i_auth_cap) { + cap = ci->i_auth_cap; + have &= ~cap->implemented | cap->issued; + } return have; } @@ -786,22 +799,28 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch) /* * Return true if mask caps are currently being revoked by an MDS. */ -int ceph_caps_revoking(struct ceph_inode_info *ci, int mask) +int __ceph_caps_revoking_other(struct ceph_inode_info *ci, + struct ceph_cap *ocap, int mask) { - struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; struct rb_node *p; - int ret = 0; - spin_lock(&ci->i_ceph_lock); for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); - if (__cap_is_valid(cap) && - (cap->implemented & ~cap->issued & mask)) { - ret = 1; - break; - } + if (cap != ocap && + (cap->implemented & ~cap->issued & mask)) + return 1; } + return 0; +} + +int ceph_caps_revoking(struct ceph_inode_info *ci, int mask) +{ + struct inode *inode = &ci->vfs_inode; + int ret; + + spin_lock(&ci->i_ceph_lock); + ret = __ceph_caps_revoking_other(ci, NULL, mask); spin_unlock(&ci->i_ceph_lock); dout("ceph_caps_revoking %p %s = %d\n", inode, ceph_cap_string(mask), ret); @@ -850,7 +869,10 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) cap = rb_entry(p, struct ceph_cap, ci_node); if (!__cap_is_valid(cap)) continue; - mds_wanted |= cap->mds_wanted; + if (cap == ci->i_auth_cap) + mds_wanted |= cap->mds_wanted; + else + mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR); } return mds_wanted; } @@ -860,7 +882,19 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) */ static int __ceph_is_any_caps(struct ceph_inode_info *ci) { - return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0; + return !RB_EMPTY_ROOT(&ci->i_caps); +} + +int ceph_is_any_caps(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + spin_lock(&ci->i_ceph_lock); + ret = __ceph_is_any_caps(ci); + spin_unlock(&ci->i_ceph_lock); + + return ret; } /* @@ -869,7 +903,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci) * caller should hold i_ceph_lock. * caller will not hold session s_mutex if called from destroy_inode. */ -void __ceph_remove_cap(struct ceph_cap *cap) +void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) { struct ceph_mds_session *session = cap->session; struct ceph_inode_info *ci = cap->ci; @@ -881,6 +915,16 @@ void __ceph_remove_cap(struct ceph_cap *cap) /* remove from session list */ spin_lock(&session->s_cap_lock); + /* + * s_cap_reconnect is protected by s_cap_lock. no one changes + * s_cap_gen while session is in the reconnect state. + */ + if (queue_release && + (!session->s_cap_reconnect || + cap->cap_gen == session->s_cap_gen)) + __queue_cap_release(session, ci->i_vino.ino, cap->cap_id, + cap->mseq, cap->issue_seq); + if (session->s_cap_iterator == cap) { /* not yet, we are iterating over this very cap */ dout("__ceph_remove_cap delaying %p removal from session %p\n", @@ -928,7 +972,7 @@ static int send_cap_msg(struct ceph_mds_session *session, u64 size, u64 max_size, struct timespec *mtime, struct timespec *atime, u64 time_warp_seq, - uid_t uid, gid_t gid, umode_t mode, + kuid_t uid, kgid_t gid, umode_t mode, u64 xattr_version, struct ceph_buffer *xattrs_buf, u64 follows) @@ -972,8 +1016,8 @@ static int send_cap_msg(struct ceph_mds_session *session, ceph_encode_timespec(&fc->atime, atime); fc->time_warp_seq = cpu_to_le32(time_warp_seq); - fc->uid = cpu_to_le32(uid); - fc->gid = cpu_to_le32(gid); + fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid)); + fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid)); fc->mode = cpu_to_le32(mode); fc->xattr_version = cpu_to_le64(xattr_version); @@ -987,15 +1031,14 @@ static int send_cap_msg(struct ceph_mds_session *session, return 0; } -static void __queue_cap_release(struct ceph_mds_session *session, - u64 ino, u64 cap_id, u32 migrate_seq, - u32 issue_seq) +void __queue_cap_release(struct ceph_mds_session *session, + u64 ino, u64 cap_id, u32 migrate_seq, + u32 issue_seq) { struct ceph_msg *msg; struct ceph_mds_cap_release *head; struct ceph_mds_cap_item *item; - spin_lock(&session->s_cap_lock); BUG_ON(!session->s_num_cap_releases); msg = list_first_entry(&session->s_cap_releases, struct ceph_msg, list_head); @@ -1005,7 +1048,7 @@ static void __queue_cap_release(struct ceph_mds_session *session, BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE); head = msg->front.iov_base; - head->num = cpu_to_le32(le32_to_cpu(head->num) + 1); + le32_add_cpu(&head->num, 1); item = msg->front.iov_base + msg->front.iov_len; item->ino = cpu_to_le64(ino); item->cap_id = cpu_to_le64(cap_id); @@ -1024,7 +1067,6 @@ static void __queue_cap_release(struct ceph_mds_session *session, (int)CEPH_CAPS_PER_RELEASE, (int)msg->front.iov_len); } - spin_unlock(&session->s_cap_lock); } /* @@ -1039,12 +1081,8 @@ void ceph_queue_caps_release(struct inode *inode) p = rb_first(&ci->i_caps); while (p) { struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); - struct ceph_mds_session *session = cap->session; - - __queue_cap_release(session, ceph_ino(inode), cap->cap_id, - cap->mseq, cap->issue_seq); p = rb_next(p); - __ceph_remove_cap(cap); + __ceph_remove_cap(cap, true); } } @@ -1079,8 +1117,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, struct timespec mtime, atime; int wake = 0; umode_t mode; - uid_t uid; - gid_t gid; + kuid_t uid; + kgid_t gid; struct ceph_mds_session *session; u64 xattr_version = 0; struct ceph_buffer *xattr_blob = NULL; @@ -1349,8 +1387,9 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) if (!ci->i_head_snapc) ci->i_head_snapc = ceph_get_snap_context( ci->i_snap_realm->cached_context); - dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode, - ci->i_head_snapc); + dout(" inode %p now dirty snapc %p auth cap %p\n", + &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); + WARN_ON(!ci->i_auth_cap); BUG_ON(!list_empty(&ci->i_dirty_item)); spin_lock(&mdsc->cap_dirty_lock); list_add(&ci->i_dirty_item, &mdsc->cap_dirty); @@ -1454,7 +1493,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; - int file_wanted, used; + int file_wanted, used, cap_used; int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int issued, implemented, want, retain, revoking, flushing = 0; int mds = -1; /* keep track of how far we've gone through i_caps list @@ -1557,9 +1596,14 @@ retry_locked: /* NOTE: no side-effects allowed, until we take s_mutex */ + cap_used = used; + if (ci->i_auth_cap && cap != ci->i_auth_cap) + cap_used &= ~ci->i_auth_cap->issued; + revoking = cap->implemented & ~cap->issued; - dout(" mds%d cap %p issued %s implemented %s revoking %s\n", + dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n", cap->mds, cap, ceph_cap_string(cap->issued), + ceph_cap_string(cap_used), ceph_cap_string(cap->implemented), ceph_cap_string(revoking)); @@ -1587,7 +1631,7 @@ retry_locked: } /* completed revocation? going down and there are no caps? */ - if (revoking && (revoking & used) == 0) { + if (revoking && (revoking & cap_used) == 0) { dout("completed revocation of %s\n", ceph_cap_string(cap->implemented & ~cap->issued)); goto ack; @@ -1664,8 +1708,8 @@ ack: sent++; /* __send_cap drops i_ceph_lock */ - delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want, - retain, flushing, NULL); + delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, + want, retain, flushing, NULL); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1694,13 +1738,12 @@ ack: /* * Try to flush dirty caps back to the auth mds. */ -static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, - unsigned *flush_tid) +static int try_flush_caps(struct inode *inode, unsigned *flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - int unlock_session = session ? 0 : 1; int flushing = 0; + struct ceph_mds_session *session = NULL; retry: spin_lock(&ci->i_ceph_lock); @@ -1714,13 +1757,14 @@ retry: int want = __ceph_caps_wanted(ci); int delayed; - if (!session) { + if (!session || session != cap->session) { spin_unlock(&ci->i_ceph_lock); + if (session) + mutex_unlock(&session->s_mutex); session = cap->session; mutex_lock(&session->s_mutex); goto retry; } - BUG_ON(session != cap->session); if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; @@ -1739,7 +1783,7 @@ retry: out: spin_unlock(&ci->i_ceph_lock); out_unlocked: - if (session && unlock_session) + if (session) mutex_unlock(&session->s_mutex); return flushing; } @@ -1824,7 +1868,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) return ret; mutex_lock(&inode->i_mutex); - dirty = try_flush_caps(inode, NULL, &flush_tid); + dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); /* @@ -1859,7 +1903,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) dout("write_inode %p wait=%d\n", inode, wait); if (wait) { - dirty = try_flush_caps(inode, NULL, &flush_tid); + dirty = try_flush_caps(inode, &flush_tid); if (dirty) err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); @@ -1955,8 +1999,15 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, cap = ci->i_auth_cap; dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode, ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq); + __ceph_flush_snaps(ci, &session, 1); + if (ci->i_flushing_caps) { + spin_lock(&mdsc->cap_dirty_lock); + list_move_tail(&ci->i_flushing_item, + &cap->session->s_cap_flushing); + spin_unlock(&mdsc->cap_dirty_lock); + delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, __ceph_caps_used(ci), __ceph_caps_wanted(ci), @@ -2027,11 +2078,20 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, goto out; } - if (need & CEPH_CAP_FILE_WR) { + /* finish pending truncate */ + while (ci->i_truncate_pending) { + spin_unlock(&ci->i_ceph_lock); + __ceph_do_pending_vmtruncate(inode); + spin_lock(&ci->i_ceph_lock); + } + + have = __ceph_caps_issued(ci, &implemented); + + if (have & need & CEPH_CAP_FILE_WR) { if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { dout("get_cap_refs %p endoff %llu > maxsize %llu\n", inode, endoff, ci->i_max_size); - if (endoff > ci->i_wanted_max_size) { + if (endoff > ci->i_requested_max_size) { *check_max = 1; ret = 1; } @@ -2046,13 +2106,6 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, goto out; } } - have = __ceph_caps_issued(ci, &implemented); - - /* - * disallow writes while a truncate is pending - */ - if (ci->i_truncate_pending) - have &= ~CEPH_CAP_FILE_WR; if ((have & need) == need) { /* @@ -2094,14 +2147,17 @@ static void check_max_size(struct inode *inode, loff_t endoff) /* do we need to explicitly request a larger max_size? */ spin_lock(&ci->i_ceph_lock); - if ((endoff >= ci->i_max_size || - endoff > (inode->i_size << 1)) && - endoff > ci->i_wanted_max_size) { + if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) { dout("write %p at large endoff %llu, req max_size\n", inode, endoff); ci->i_wanted_max_size = endoff; - check = 1; } + /* duplicate ceph_check_caps()'s logic */ + if (ci->i_auth_cap && + (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) && + ci->i_wanted_max_size > ci->i_max_size && + ci->i_wanted_max_size > ci->i_requested_max_size) + check = 1; spin_unlock(&ci->i_ceph_lock); if (check) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); @@ -2287,41 +2343,88 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, } /* + * Invalidate unlinked inode's aliases, so we can drop the inode ASAP. + */ +static void invalidate_aliases(struct inode *inode) +{ + struct dentry *dn, *prev = NULL; + + dout("invalidate_aliases inode %p\n", inode); + d_prune_aliases(inode); + /* + * For non-directory inode, d_find_alias() only returns + * hashed dentry. After calling d_invalidate(), the + * dentry becomes unhashed. + * + * For directory inode, d_find_alias() can return + * unhashed dentry. But directory inode should have + * one alias at most. + */ + while ((dn = d_find_alias(inode))) { + if (dn == prev) { + dput(dn); + break; + } + d_invalidate(dn); + if (prev) + dput(prev); + prev = dn; + } + if (prev) + dput(prev); +} + +/* * Handle a cap GRANT message from the MDS. (Note that a GRANT may * actually be a revocation if it specifies a smaller cap set.) * * caller holds s_mutex and i_ceph_lock, we drop both. - * - * return value: - * 0 - ok - * 1 - check_caps on auth cap only (writeback) - * 2 - check_caps (ack revoke) */ -static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, +static void handle_cap_grant(struct ceph_mds_client *mdsc, + struct inode *inode, struct ceph_mds_caps *grant, + void *snaptrace, int snaptrace_len, + struct ceph_buffer *xattr_buf, struct ceph_mds_session *session, - struct ceph_cap *cap, - struct ceph_buffer *xattr_buf) - __releases(ci->i_ceph_lock) + struct ceph_cap *cap, int issued) + __releases(ci->i_ceph_lock) { struct ceph_inode_info *ci = ceph_inode(inode); int mds = session->s_mds; int seq = le32_to_cpu(grant->seq); int newcaps = le32_to_cpu(grant->caps); - int issued, implemented, used, wanted, dirty; + int used, wanted, dirty; u64 size = le64_to_cpu(grant->size); u64 max_size = le64_to_cpu(grant->max_size); struct timespec mtime, atime, ctime; int check_caps = 0; - int wake = 0; - int writeback = 0; - int revoked_rdcache = 0; - int queue_invalidate = 0; + bool wake = 0; + bool writeback = 0; + bool queue_trunc = 0; + bool queue_invalidate = 0; + bool queue_revalidate = 0; + bool deleted_inode = 0; dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", inode, cap, mds, seq, ceph_cap_string(newcaps)); dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, inode->i_size); + + /* + * auth mds of the inode changed. we received the cap export message, + * but still haven't received the cap import message. handle_cap_export + * updated the new auth MDS' cap. + * + * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message + * that was sent before the cap import message. So don't remove caps. + */ + if (ceph_seq_cmp(seq, cap->seq) <= 0) { + WARN_ON(cap != ci->i_auth_cap); + WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id)); + seq = cap->seq; + newcaps |= cap->issued; + } + /* * If CACHE is being revoked, and we have no dirty buffers, * try to invalidate (once). (If there are dirty buffers, we @@ -2330,9 +2433,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && !ci->i_wrbuffer_ref) { - if (try_nonblocking_invalidate(inode) == 0) { - revoked_rdcache = 1; - } else { + if (try_nonblocking_invalidate(inode)) { /* there were locked pages.. invalidate later in a separate thread. */ if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { @@ -2340,27 +2441,33 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ci->i_rdcache_revoking = ci->i_rdcache_gen; } } + + ceph_fscache_invalidate(inode); } /* side effects now are allowed */ - - issued = __ceph_caps_issued(ci, &implemented); - issued |= implemented | __ceph_caps_dirty(ci); - cap->cap_gen = session->s_cap_gen; + cap->seq = seq; __check_cap_issue(ci, cap, newcaps); - if ((issued & CEPH_CAP_AUTH_EXCL) == 0) { + if ((newcaps & CEPH_CAP_AUTH_SHARED) && + (issued & CEPH_CAP_AUTH_EXCL) == 0) { inode->i_mode = le32_to_cpu(grant->mode); - inode->i_uid = le32_to_cpu(grant->uid); - inode->i_gid = le32_to_cpu(grant->gid); + inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); + inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode, - inode->i_uid, inode->i_gid); + from_kuid(&init_user_ns, inode->i_uid), + from_kgid(&init_user_ns, inode->i_gid)); } - if ((issued & CEPH_CAP_LINK_EXCL) == 0) + if ((newcaps & CEPH_CAP_AUTH_SHARED) && + (issued & CEPH_CAP_LINK_EXCL) == 0) { set_nlink(inode, le32_to_cpu(grant->nlink)); + if (inode->i_nlink == 0 && + (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) + deleted_inode = 1; + } if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { int len = le32_to_cpu(grant->xattr_len); @@ -2373,29 +2480,44 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ceph_buffer_put(ci->i_xattrs.blob); ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); ci->i_xattrs.version = version; + ceph_forget_all_cached_acls(inode); } } - /* size/ctime/mtime/atime? */ - ceph_fill_file_size(inode, issued, - le32_to_cpu(grant->truncate_seq), - le64_to_cpu(grant->truncate_size), size); - ceph_decode_timespec(&mtime, &grant->mtime); - ceph_decode_timespec(&atime, &grant->atime); - ceph_decode_timespec(&ctime, &grant->ctime); - ceph_fill_file_time(inode, issued, - le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, - &atime); - - /* max size increase? */ - if (max_size != ci->i_max_size) { - dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); - ci->i_max_size = max_size; - if (max_size >= ci->i_wanted_max_size) { - ci->i_wanted_max_size = 0; /* reset */ - ci->i_requested_max_size = 0; + /* Do we need to revalidate our fscache cookie. Don't bother on the + * first cache cap as we already validate at cookie creation time. */ + if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1) + queue_revalidate = 1; + + if (newcaps & CEPH_CAP_ANY_RD) { + /* ctime/mtime/atime? */ + ceph_decode_timespec(&mtime, &grant->mtime); + ceph_decode_timespec(&atime, &grant->atime); + ceph_decode_timespec(&ctime, &grant->ctime); + ceph_fill_file_time(inode, issued, + le32_to_cpu(grant->time_warp_seq), + &ctime, &mtime, &atime); + } + + if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { + /* file layout may have changed */ + ci->i_layout = grant->layout; + /* size/truncate_seq? */ + queue_trunc = ceph_fill_file_size(inode, issued, + le32_to_cpu(grant->truncate_seq), + le64_to_cpu(grant->truncate_size), + size); + /* max size increase? */ + if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { + dout("max_size %lld -> %llu\n", + ci->i_max_size, max_size); + ci->i_max_size = max_size; + if (max_size >= ci->i_wanted_max_size) { + ci->i_wanted_max_size = 0; /* reset */ + ci->i_requested_max_size = 0; + } + wake = 1; } - wake = 1; } /* check cap bits */ @@ -2410,14 +2532,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, dout("mds wanted %s -> %s\n", ceph_cap_string(le32_to_cpu(grant->wanted)), ceph_cap_string(wanted)); - grant->wanted = cpu_to_le32(wanted); + /* imported cap may not have correct mds_wanted */ + if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) + check_caps = 1; } - cap->seq = seq; - - /* file layout may have changed */ - ci->i_layout = grant->layout; - /* revocation, grant, or no-op? */ if (cap->issued & ~newcaps) { int revoking = cap->issued & ~newcaps; @@ -2444,6 +2563,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, } else { dout("grant: %s -> %s\n", ceph_cap_string(cap->issued), ceph_cap_string(newcaps)); + /* non-auth MDS is revoking the newly grant caps ? */ + if (cap == ci->i_auth_cap && + __ceph_caps_revoking_other(ci, cap, newcaps)) + check_caps = 2; + cap->issued = newcaps; cap->implemented |= newcaps; /* add bits only, to * avoid stepping on a @@ -2453,6 +2577,24 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, BUG_ON(cap->issued & ~cap->implemented); spin_unlock(&ci->i_ceph_lock); + + if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { + down_write(&mdsc->snap_rwsem); + ceph_update_snap_trace(mdsc, snaptrace, + snaptrace + snaptrace_len, false); + downgrade_write(&mdsc->snap_rwsem); + kick_flushing_inode_caps(mdsc, session, inode); + up_read(&mdsc->snap_rwsem); + if (newcaps & ~issued) + wake = 1; + } + + if (queue_trunc) { + ceph_queue_vmtruncate(inode); + ceph_queue_revalidate(inode); + } else if (queue_revalidate) + ceph_queue_revalidate(inode); + if (writeback) /* * queue inode for writeback: we can't actually call @@ -2462,6 +2604,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ceph_queue_writeback(inode); if (queue_invalidate) ceph_queue_invalidate(inode); + if (deleted_inode) + invalidate_aliases(inode); if (wake) wake_up_all(&ci->i_cap_wq); @@ -2618,8 +2762,10 @@ static void handle_cap_trunc(struct inode *inode, truncate_seq, truncate_size, size); spin_unlock(&ci->i_ceph_lock); - if (queue_trunc) + if (queue_trunc) { ceph_queue_vmtruncate(inode); + ceph_fscache_invalidate(inode); + } } /* @@ -2631,122 +2777,200 @@ static void handle_cap_trunc(struct inode *inode, * caller holds s_mutex */ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, - struct ceph_mds_session *session, - int *open_target_sessions) + struct ceph_mds_cap_peer *ph, + struct ceph_mds_session *session) { struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct ceph_mds_session *tsession = NULL; + struct ceph_cap *cap, *tcap, *new_cap = NULL; struct ceph_inode_info *ci = ceph_inode(inode); - int mds = session->s_mds; + u64 t_cap_id; unsigned mseq = le32_to_cpu(ex->migrate_seq); - struct ceph_cap *cap = NULL, *t; - struct rb_node *p; - int remember = 1; + unsigned t_seq, t_mseq; + int target, issued; + int mds = session->s_mds; - dout("handle_cap_export inode %p ci %p mds%d mseq %d\n", - inode, ci, mds, mseq); + if (ph) { + t_cap_id = le64_to_cpu(ph->cap_id); + t_seq = le32_to_cpu(ph->seq); + t_mseq = le32_to_cpu(ph->mseq); + target = le32_to_cpu(ph->mds); + } else { + t_cap_id = t_seq = t_mseq = 0; + target = -1; + } + dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n", + inode, ci, mds, mseq, target); +retry: spin_lock(&ci->i_ceph_lock); + cap = __get_cap_for_mds(ci, mds); + if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id)) + goto out_unlock; - /* make sure we haven't seen a higher mseq */ - for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { - t = rb_entry(p, struct ceph_cap, ci_node); - if (ceph_seq_cmp(t->mseq, mseq) > 0) { - dout(" higher mseq on cap from mds%d\n", - t->session->s_mds); - remember = 0; - } - if (t->session->s_mds == mds) - cap = t; + if (target < 0) { + __ceph_remove_cap(cap, false); + goto out_unlock; } - if (cap) { - if (remember) { - /* make note */ - ci->i_cap_exporting_mds = mds; - ci->i_cap_exporting_mseq = mseq; - ci->i_cap_exporting_issued = cap->issued; - - /* - * make sure we have open sessions with all possible - * export targets, so that we get the matching IMPORT - */ - *open_target_sessions = 1; + /* + * now we know we haven't received the cap import message yet + * because the exported cap still exist. + */ - /* - * we can't flush dirty caps that we've seen the - * EXPORT but no IMPORT for - */ - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p to cap_dirty_migrating\n", - inode); - list_move(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); + issued = cap->issued; + WARN_ON(issued != cap->implemented); + + tcap = __get_cap_for_mds(ci, target); + if (tcap) { + /* already have caps from the target */ + if (tcap->cap_id != t_cap_id || + ceph_seq_cmp(tcap->seq, t_seq) < 0) { + dout(" updating import cap %p mds%d\n", tcap, target); + tcap->cap_id = t_cap_id; + tcap->seq = t_seq - 1; + tcap->issue_seq = t_seq - 1; + tcap->mseq = t_mseq; + tcap->issued |= issued; + tcap->implemented |= issued; + if (cap == ci->i_auth_cap) + ci->i_auth_cap = tcap; + if (ci->i_flushing_caps && ci->i_auth_cap == tcap) { + spin_lock(&mdsc->cap_dirty_lock); + list_move_tail(&ci->i_flushing_item, + &tcap->session->s_cap_flushing); + spin_unlock(&mdsc->cap_dirty_lock); } - spin_unlock(&mdsc->cap_dirty_lock); } - __ceph_remove_cap(cap); + __ceph_remove_cap(cap, false); + goto out_unlock; + } else if (tsession) { + /* add placeholder for the export tagert */ + int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0; + ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, + t_seq - 1, t_mseq, (u64)-1, flag, &new_cap); + + __ceph_remove_cap(cap, false); + goto out_unlock; } - /* else, we already released it */ spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&session->s_mutex); + + /* open target session */ + tsession = ceph_mdsc_open_export_target_session(mdsc, target); + if (!IS_ERR(tsession)) { + if (mds > target) { + mutex_lock(&session->s_mutex); + mutex_lock_nested(&tsession->s_mutex, + SINGLE_DEPTH_NESTING); + } else { + mutex_lock(&tsession->s_mutex); + mutex_lock_nested(&session->s_mutex, + SINGLE_DEPTH_NESTING); + } + ceph_add_cap_releases(mdsc, tsession); + new_cap = ceph_get_cap(mdsc, NULL); + } else { + WARN_ON(1); + tsession = NULL; + target = -1; + } + goto retry; + +out_unlock: + spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&session->s_mutex); + if (tsession) { + mutex_unlock(&tsession->s_mutex); + ceph_put_mds_session(tsession); + } + if (new_cap) + ceph_put_cap(mdsc, new_cap); } /* - * Handle cap IMPORT. If there are temp bits from an older EXPORT, - * clean them up. + * Handle cap IMPORT. * - * caller holds s_mutex. + * caller holds s_mutex. acquires i_ceph_lock */ static void handle_cap_import(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *im, + struct ceph_mds_cap_peer *ph, struct ceph_mds_session *session, - void *snaptrace, int snaptrace_len) + struct ceph_cap **target_cap, int *old_issued) + __acquires(ci->i_ceph_lock) { struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_cap *cap, *ocap, *new_cap = NULL; int mds = session->s_mds; - unsigned issued = le32_to_cpu(im->caps); + int issued; + unsigned caps = le32_to_cpu(im->caps); unsigned wanted = le32_to_cpu(im->wanted); unsigned seq = le32_to_cpu(im->seq); unsigned mseq = le32_to_cpu(im->migrate_seq); u64 realmino = le64_to_cpu(im->realm); u64 cap_id = le64_to_cpu(im->cap_id); + u64 p_cap_id; + int peer; + + if (ph) { + p_cap_id = le64_to_cpu(ph->cap_id); + peer = le32_to_cpu(ph->mds); + } else { + p_cap_id = 0; + peer = -1; + } - if (ci->i_cap_exporting_mds >= 0 && - ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) { - dout("handle_cap_import inode %p ci %p mds%d mseq %d" - " - cleared exporting from mds%d\n", - inode, ci, mds, mseq, - ci->i_cap_exporting_mds); - ci->i_cap_exporting_issued = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_mds = -1; + dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", + inode, ci, mds, mseq, peer); - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p back to cap_dirty\n", inode); - list_move(&ci->i_dirty_item, &mdsc->cap_dirty); +retry: + spin_lock(&ci->i_ceph_lock); + cap = __get_cap_for_mds(ci, mds); + if (!cap) { + if (!new_cap) { + spin_unlock(&ci->i_ceph_lock); + new_cap = ceph_get_cap(mdsc, NULL); + goto retry; } - spin_unlock(&mdsc->cap_dirty_lock); + cap = new_cap; } else { - dout("handle_cap_import inode %p ci %p mds%d mseq %d\n", - inode, ci, mds, mseq); + if (new_cap) { + ceph_put_cap(mdsc, new_cap); + new_cap = NULL; + } } - down_write(&mdsc->snap_rwsem); - ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, - false); - downgrade_write(&mdsc->snap_rwsem); - ceph_add_cap(inode, session, cap_id, -1, - issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH, - NULL /* no caps context */); - kick_flushing_inode_caps(mdsc, session, inode); - up_read(&mdsc->snap_rwsem); + __ceph_caps_issued(ci, &issued); + issued |= __ceph_caps_dirty(ci); + + ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq, + realmino, CEPH_CAP_FLAG_AUTH, &new_cap); + + ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL; + if (ocap && ocap->cap_id == p_cap_id) { + dout(" remove export cap %p mds%d flags %d\n", + ocap, peer, ph->flags); + if ((ph->flags & CEPH_CAP_FLAG_AUTH) && + (ocap->seq != le32_to_cpu(ph->seq) || + ocap->mseq != le32_to_cpu(ph->mseq))) { + pr_err("handle_cap_import: mismatched seq/mseq: " + "ino (%llx.%llx) mds%d seq %d mseq %d " + "importer mds%d has peer seq %d mseq %d\n", + ceph_vinop(inode), peer, ocap->seq, + ocap->mseq, mds, le32_to_cpu(ph->seq), + le32_to_cpu(ph->mseq)); + } + __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); + } /* make sure we re-request max_size, if necessary */ - spin_lock(&ci->i_ceph_lock); + ci->i_wanted_max_size = 0; ci->i_requested_max_size = 0; - spin_unlock(&ci->i_ceph_lock); + + *old_issued = issued; + *target_cap = cap; } /* @@ -2764,8 +2988,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_inode_info *ci; struct ceph_cap *cap; struct ceph_mds_caps *h; + struct ceph_mds_cap_peer *peer = NULL; int mds = session->s_mds; - int op; + int op, issued; u32 seq, mseq; struct ceph_vino vino; u64 cap_id; @@ -2774,12 +2999,13 @@ void ceph_handle_caps(struct ceph_mds_session *session, void *snaptrace; size_t snaptrace_len; void *flock; + void *end; u32 flock_len; - int open_target_sessions = 0; dout("handle_caps from mds%d\n", mds); /* decode */ + end = msg->front.iov_base + msg->front.iov_len; tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len < sizeof(*h)) goto bad; @@ -2797,22 +3023,36 @@ void ceph_handle_caps(struct ceph_mds_session *session, snaptrace_len = le32_to_cpu(h->snap_trace_len); if (le16_to_cpu(msg->hdr.version) >= 2) { - void *p, *end; - - p = snaptrace + snaptrace_len; - end = msg->front.iov_base + msg->front.iov_len; + void *p = snaptrace + snaptrace_len; ceph_decode_32_safe(&p, end, flock_len, bad); + if (p + flock_len > end) + goto bad; flock = p; } else { flock = NULL; flock_len = 0; } + if (le16_to_cpu(msg->hdr.version) >= 3) { + if (op == CEPH_CAP_OP_IMPORT) { + void *p = flock + flock_len; + if (p + sizeof(*peer) > end) + goto bad; + peer = p; + } else if (op == CEPH_CAP_OP_EXPORT) { + /* recorded in unused fields */ + peer = (void *)&h->size; + } + } + mutex_lock(&session->s_mutex); session->s_seq++; dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, (unsigned)seq); + if (op == CEPH_CAP_OP_IMPORT) + ceph_add_cap_releases(mdsc, session); + /* lookup ino */ inode = ceph_find_inode(sb, vino); ci = ceph_inode(inode); @@ -2821,9 +3061,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, if (!inode) { dout(" i don't have ino %llx\n", vino.ino); - if (op == CEPH_CAP_OP_IMPORT) + if (op == CEPH_CAP_OP_IMPORT) { + spin_lock(&session->s_cap_lock); __queue_cap_release(session, vino.ino, cap_id, mseq, seq); + spin_unlock(&session->s_cap_lock); + } goto flush_cap_releases; } @@ -2834,13 +3077,14 @@ void ceph_handle_caps(struct ceph_mds_session *session, goto done; case CEPH_CAP_OP_EXPORT: - handle_cap_export(inode, h, session, &open_target_sessions); - goto done; + handle_cap_export(inode, h, peer, session); + goto done_unlocked; case CEPH_CAP_OP_IMPORT: - handle_cap_import(mdsc, inode, h, session, - snaptrace, snaptrace_len); - ceph_check_caps(ceph_inode(inode), 0, session); + handle_cap_import(mdsc, inode, h, peer, session, + &cap, &issued); + handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, + msg->middle, session, cap, issued); goto done_unlocked; } @@ -2858,7 +3102,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, switch (op) { case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_GRANT: - handle_cap_grant(inode, h, session, cap, msg->middle); + __ceph_caps_issued(ci, &issued); + issued |= __ceph_caps_dirty(ci); + handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle, + session, cap, issued); goto done_unlocked; case CEPH_CAP_OP_FLUSH_ACK: @@ -2891,8 +3138,6 @@ done: done_unlocked: if (inode) iput(inode); - if (open_target_sessions) - ceph_mdsc_open_export_target_sessions(mdsc, session); return; bad: @@ -3010,21 +3255,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode, (cap->issued & unless) == 0)) { if ((cap->issued & drop) && (cap->issued & unless) == 0) { - dout("encode_inode_release %p cap %p %s -> " - "%s\n", inode, cap, + int wanted = __ceph_caps_wanted(ci); + if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0) + wanted |= cap->mds_wanted; + dout("encode_inode_release %p cap %p " + "%s -> %s, wanted %s -> %s\n", inode, cap, ceph_cap_string(cap->issued), - ceph_cap_string(cap->issued & ~drop)); + ceph_cap_string(cap->issued & ~drop), + ceph_cap_string(cap->mds_wanted), + ceph_cap_string(wanted)); + cap->issued &= ~drop; cap->implemented &= ~drop; - if (ci->i_ceph_flags & CEPH_I_NODELAY) { - int wanted = __ceph_caps_wanted(ci); - dout(" wanted %s -> %s (act %s)\n", - ceph_cap_string(cap->mds_wanted), - ceph_cap_string(cap->mds_wanted & - ~wanted), - ceph_cap_string(wanted)); - cap->mds_wanted &= wanted; - } + cap->mds_wanted = wanted; } else { dout("encode_inode_release %p cap %p %s" " (force)\n", inode, cap, @@ -3036,7 +3279,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode, rel->seq = cpu_to_le32(cap->seq); rel->issue_seq = cpu_to_le32(cap->issue_seq), rel->mseq = cpu_to_le32(cap->mseq); - rel->caps = cpu_to_le32(cap->issued); + rel->caps = cpu_to_le32(cap->implemented); rel->wanted = cpu_to_le32(cap->mds_wanted); rel->dname_len = 0; rel->dname_seq = 0; |
