diff options
Diffstat (limited to 'fs')
-rw-r--r-- | fs/ceph/Kconfig | 13 | ||||
-rw-r--r-- | fs/ceph/Makefile | 1 | ||||
-rw-r--r-- | fs/ceph/acl.c | 332 | ||||
-rw-r--r-- | fs/ceph/addr.c | 93 | ||||
-rw-r--r-- | fs/ceph/cache.h | 13 | ||||
-rw-r--r-- | fs/ceph/caps.c | 338 | ||||
-rw-r--r-- | fs/ceph/dir.c | 16 | ||||
-rw-r--r-- | fs/ceph/file.c | 437 | ||||
-rw-r--r-- | fs/ceph/inode.c | 33 | ||||
-rw-r--r-- | fs/ceph/ioctl.c | 8 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 132 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 2 | ||||
-rw-r--r-- | fs/ceph/strings.c | 2 | ||||
-rw-r--r-- | fs/ceph/super.c | 9 | ||||
-rw-r--r-- | fs/ceph/super.h | 45 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 60 |
16 files changed, 1202 insertions, 332 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig index ac9a2ef5bb9..264e9bf83ff 100644 --- a/fs/ceph/Kconfig +++ b/fs/ceph/Kconfig @@ -25,3 +25,16 @@ config CEPH_FSCACHE caching support for Ceph clients using FS-Cache endif + +config CEPH_FS_POSIX_ACL + bool "Ceph POSIX Access Control Lists" + depends on CEPH_FS + select FS_POSIX_ACL + help + POSIX Access Control Lists (ACLs) support permissions for users and + groups beyond the owner/group/world scheme. + + To learn more about Access Control Lists, visit the POSIX ACLs for + Linux website <http://acl.bestbits.at/>. + + If you don't know what Access Control Lists are, say N diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index 32e30106a2f..85a4230b9bf 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -10,3 +10,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \ debugfs.o ceph-$(CONFIG_CEPH_FSCACHE) += cache.o +ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c new file mode 100644 index 00000000000..64fddbc1d17 --- /dev/null +++ b/fs/ceph/acl.c @@ -0,0 +1,332 @@ +/* + * linux/fs/ceph/acl.c + * + * Copyright (C) 2013 Guangliang Zhao, <lucienchao@gmail.com> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License v2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 021110-1307, USA. + */ + +#include <linux/ceph/ceph_debug.h> +#include <linux/fs.h> +#include <linux/string.h> +#include <linux/xattr.h> +#include <linux/posix_acl_xattr.h> +#include <linux/posix_acl.h> +#include <linux/sched.h> +#include <linux/slab.h> + +#include "super.h" + +static inline void ceph_set_cached_acl(struct inode *inode, + int type, struct posix_acl *acl) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + + spin_lock(&ci->i_ceph_lock); + if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0)) + set_cached_acl(inode, type, acl); + spin_unlock(&ci->i_ceph_lock); +} + +static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode, + int type) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct posix_acl *acl = ACL_NOT_CACHED; + + spin_lock(&ci->i_ceph_lock); + if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0)) + acl = get_cached_acl(inode, type); + spin_unlock(&ci->i_ceph_lock); + + return acl; +} + +void ceph_forget_all_cached_acls(struct inode *inode) +{ + forget_all_cached_acls(inode); +} + +struct posix_acl *ceph_get_acl(struct inode *inode, int type) +{ + int size; + const char *name; + char *value = NULL; + struct posix_acl *acl; + + if (!IS_POSIXACL(inode)) + return NULL; + + acl = ceph_get_cached_acl(inode, type); + if (acl != ACL_NOT_CACHED) + return acl; + + switch (type) { + case ACL_TYPE_ACCESS: + name = POSIX_ACL_XATTR_ACCESS; + break; + case ACL_TYPE_DEFAULT: + name = POSIX_ACL_XATTR_DEFAULT; + break; + default: + BUG(); + } + + size = __ceph_getxattr(inode, name, "", 0); + if (size > 0) { + value = kzalloc(size, GFP_NOFS); + if (!value) + return ERR_PTR(-ENOMEM); + size = __ceph_getxattr(inode, name, value, size); + } + + if (size > 0) + acl = posix_acl_from_xattr(&init_user_ns, value, size); + else if (size == -ERANGE || size == -ENODATA || size == 0) + acl = NULL; + else + acl = ERR_PTR(-EIO); + + kfree(value); + + if (!IS_ERR(acl)) + ceph_set_cached_acl(inode, type, acl); + + return acl; +} + +static int ceph_set_acl(struct dentry *dentry, struct inode *inode, + struct posix_acl *acl, int type) +{ + int ret = 0, size = 0; + const char *name = NULL; + char *value = NULL; + struct iattr newattrs; + umode_t new_mode = inode->i_mode, old_mode = inode->i_mode; + + if (acl) { + ret = posix_acl_valid(acl); + if (ret < 0) + goto out; + } + + switch (type) { + case ACL_TYPE_ACCESS: + name = POSIX_ACL_XATTR_ACCESS; + if (acl) { + ret = posix_acl_equiv_mode(acl, &new_mode); + if (ret < 0) + goto out; + if (ret == 0) + acl = NULL; + } + break; + case ACL_TYPE_DEFAULT: + if (!S_ISDIR(inode->i_mode)) { + ret = acl ? -EINVAL : 0; + goto out; + } + name = POSIX_ACL_XATTR_DEFAULT; + break; + default: + ret = -EINVAL; + goto out; + } + + if (acl) { + size = posix_acl_xattr_size(acl->a_count); + value = kmalloc(size, GFP_NOFS); + if (!value) { + ret = -ENOMEM; + goto out; + } + + ret = posix_acl_to_xattr(&init_user_ns, acl, value, size); + if (ret < 0) + goto out_free; + } + + if (new_mode != old_mode) { + newattrs.ia_mode = new_mode; + newattrs.ia_valid = ATTR_MODE; + ret = ceph_setattr(dentry, &newattrs); + if (ret) + goto out_free; + } + + if (value) + ret = __ceph_setxattr(dentry, name, value, size, 0); + else + ret = __ceph_removexattr(dentry, name); + + if (ret) { + if (new_mode != old_mode) { + newattrs.ia_mode = old_mode; + newattrs.ia_valid = ATTR_MODE; + ceph_setattr(dentry, &newattrs); + } + goto out_free; + } + + ceph_set_cached_acl(inode, type, acl); + +out_free: + kfree(value); +out: + return ret; +} + +int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir) +{ + struct posix_acl *acl = NULL; + int ret = 0; + + if (!S_ISLNK(inode->i_mode)) { + if (IS_POSIXACL(dir)) { + acl = ceph_get_acl(dir, ACL_TYPE_DEFAULT); + if (IS_ERR(acl)) { + ret = PTR_ERR(acl); + goto out; + } + } + + if (!acl) + inode->i_mode &= ~current_umask(); + } + + if (IS_POSIXACL(dir) && acl) { + if (S_ISDIR(inode->i_mode)) { + ret = ceph_set_acl(dentry, inode, acl, + ACL_TYPE_DEFAULT); + if (ret) + goto out_release; + } + ret = posix_acl_create(&acl, GFP_NOFS, &inode->i_mode); + if (ret < 0) + goto out; + else if (ret > 0) + ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS); + else + cache_no_acl(inode); + } else { + cache_no_acl(inode); + } + +out_release: + posix_acl_release(acl); +out: + return ret; +} + +int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) +{ + struct posix_acl *acl; + int ret = 0; + + if (S_ISLNK(inode->i_mode)) { + ret = -EOPNOTSUPP; + goto out; + } + + if (!IS_POSIXACL(inode)) + goto out; + + acl = ceph_get_acl(inode, ACL_TYPE_ACCESS); + if (IS_ERR_OR_NULL(acl)) { + ret = PTR_ERR(acl); + goto out; + } + + ret = posix_acl_chmod(&acl, GFP_KERNEL, inode->i_mode); + if (ret) + goto out; + ret = ceph_set_acl(dentry, inode, acl, ACL_TYPE_ACCESS); + posix_acl_release(acl); +out: + return ret; +} + +static int ceph_xattr_acl_get(struct dentry *dentry, const char *name, + void *value, size_t size, int type) +{ + struct posix_acl *acl; + int ret = 0; + + if (!IS_POSIXACL(dentry->d_inode)) + return -EOPNOTSUPP; + + acl = ceph_get_acl(dentry->d_inode, type); + if (IS_ERR(acl)) + return PTR_ERR(acl); + if (acl == NULL) + return -ENODATA; + + ret = posix_acl_to_xattr(&init_user_ns, acl, value, size); + posix_acl_release(acl); + + return ret; +} + +static int ceph_xattr_acl_set(struct dentry *dentry, const char *name, + const void *value, size_t size, int flags, int type) +{ + int ret = 0; + struct posix_acl *acl = NULL; + + if (!inode_owner_or_capable(dentry->d_inode)) { + ret = -EPERM; + goto out; + } + + if (!IS_POSIXACL(dentry->d_inode)) { + ret = -EOPNOTSUPP; + goto out; + } + + if (value) { + acl = posix_acl_from_xattr(&init_user_ns, value, size); + if (IS_ERR(acl)) { + ret = PTR_ERR(acl); + goto out; + } + + if (acl) { + ret = posix_acl_valid(acl); + if (ret) + goto out_release; + } + } + + ret = ceph_set_acl(dentry, dentry->d_inode, acl, type); + +out_release: + posix_acl_release(acl); +out: + return ret; +} + +const struct xattr_handler ceph_xattr_acl_default_handler = { + .prefix = POSIX_ACL_XATTR_DEFAULT, + .flags = ACL_TYPE_DEFAULT, + .get = ceph_xattr_acl_get, + .set = ceph_xattr_acl_set, +}; + +const struct xattr_handler ceph_xattr_acl_access_handler = { + .prefix = POSIX_ACL_XATTR_ACCESS, + .flags = ACL_TYPE_ACCESS, + .get = ceph_xattr_acl_get, + .set = ceph_xattr_acl_set, +}; diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index ec3ba43b9fa..b53278c9fd9 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -209,6 +209,7 @@ static int readpage_nounlock(struct file *filp, struct page *page) err = 0; if (err < 0) { SetPageError(page); + ceph_fscache_readpage_cancel(inode, page); goto out; } else { if (err < PAGE_CACHE_SIZE) { @@ -256,6 +257,8 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) for (i = 0; i < num_pages; i++) { struct page *page = osd_data->pages[i]; + if (rc < 0) + goto unlock; if (bytes < (int)PAGE_CACHE_SIZE) { /* zero (remainder of) page */ int s = bytes < 0 ? 0 : bytes; @@ -266,6 +269,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) flush_dcache_page(page); SetPageUptodate(page); ceph_readpage_to_fscache(inode, page); +unlock: unlock_page(page); page_cache_release(page); bytes -= PAGE_CACHE_SIZE; @@ -1207,6 +1211,41 @@ const struct address_space_operations ceph_aops = { /* * vm ops */ +static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) +{ + struct inode *inode = file_inode(vma->vm_file); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_file_info *fi = vma->vm_file->private_data; + loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; + int want, got, ret; + + dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n", + inode, ceph_vinop(inode), off, (size_t)PAGE_CACHE_SIZE); + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_CACHE; + while (1) { + got = 0; + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); + if (ret == 0) + break; + if (ret != -ERESTARTSYS) { + WARN_ON(1); + return VM_FAULT_SIGBUS; + } + } + dout("filemap_fault %p %llu~%zd got cap refs on %s\n", + inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); + + ret = filemap_fault(vma, vmf); + + dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", + inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); + ceph_put_cap_refs(ci, got); + + return ret; +} /* * Reuse write_begin here for simplicity. @@ -1214,23 +1253,41 @@ const struct address_space_operations ceph_aops = { static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) { struct inode *inode = file_inode(vma->vm_file); - struct page *page = vmf->page; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_file_info *fi = vma->vm_file->private_data; struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct page *page = vmf->page; loff_t off = page_offset(page); - loff_t size, len; - int ret; - - /* Update time before taking page lock */ - file_update_time(vma->vm_file); + loff_t size = i_size_read(inode); + size_t len; + int want, got, ret; - size = i_size_read(inode); if (off + PAGE_CACHE_SIZE <= size) len = PAGE_CACHE_SIZE; else len = size & ~PAGE_CACHE_MASK; - dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode, - off, len, page, page->index); + dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n", + inode, ceph_vinop(inode), off, len, size); + if (fi->fmode & CEPH_FILE_MODE_LAZY) + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; + else + want = CEPH_CAP_FILE_BUFFER; + while (1) { + got = 0; + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); + if (ret == 0) + break; + if (ret != -ERESTARTSYS) { + WARN_ON(1); + return VM_FAULT_SIGBUS; + } + } + dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", + inode, off, len, ceph_cap_string(got)); + + /* Update time before taking page lock */ + file_update_time(vma->vm_file); lock_page(page); @@ -1252,14 +1309,26 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = VM_FAULT_SIGBUS; } out: - dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret); - if (ret != VM_FAULT_LOCKED) + if (ret != VM_FAULT_LOCKED) { unlock_page(page); + } else { + int dirty; + spin_lock(&ci->i_ceph_lock); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + spin_unlock(&ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(inode, dirty); + } + + dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n", + inode, off, len, ceph_cap_string(got), ret); + ceph_put_cap_refs(ci, got); + return ret; } static struct vm_operations_struct ceph_vmops = { - .fault = filemap_fault, + .fault = ceph_filemap_fault, .page_mkwrite = ceph_page_mkwrite, .remap_pages = generic_file_remap_pages, }; diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h index ba949408a33..da95f61b7a0 100644 --- a/fs/ceph/cache.h +++ b/fs/ceph/cache.h @@ -67,6 +67,14 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) return fscache_maybe_release_page(ci->fscache, page, gfp); } +static inline void ceph_fscache_readpage_cancel(struct inode *inode, + struct page *page) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + if (fscache_cookie_valid(ci->fscache) && PageFsCache(page)) + __fscache_uncache_page(ci->fscache, page); +} + static inline void ceph_fscache_readpages_cancel(struct inode *inode, struct list_head *pages) { @@ -145,6 +153,11 @@ static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp) return 1; } +static inline void ceph_fscache_readpage_cancel(struct inode *inode, + struct page *page) +{ +} + static inline void ceph_fscache_readpages_cancel(struct inode *inode, struct list_head *pages) { diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 3c0a4bd7499..17543383545 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -555,21 +555,34 @@ retry: cap->ci = ci; __insert_cap_node(ci, cap); - /* clear out old exporting info? (i.e. on cap import) */ - if (ci->i_cap_exporting_mds == mds) { - ci->i_cap_exporting_issued = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_mds = -1; - } - /* add to session cap list */ cap->session = session; spin_lock(&session->s_cap_lock); list_add_tail(&cap->session_caps, &session->s_caps); session->s_nr_caps++; spin_unlock(&session->s_cap_lock); - } else if (new_cap) - ceph_put_cap(mdsc, new_cap); + } else { + if (new_cap) + ceph_put_cap(mdsc, new_cap); + + /* + * auth mds of the inode changed. we received the cap export + * message, but still haven't received the cap import message. + * handle_cap_export() updated the new auth MDS' cap. + * + * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing + * a message that was send before the cap import message. So + * don't remove caps. + */ + if (ceph_seq_cmp(seq, cap->seq) <= 0) { + WARN_ON(cap != ci->i_auth_cap); + WARN_ON(cap->cap_id != cap_id); + seq = cap->seq; + mseq = cap->mseq; + issued |= cap->issued; + flags |= CEPH_CAP_FLAG_AUTH; + } + } if (!ci->i_snap_realm) { /* @@ -611,15 +624,9 @@ retry: if (ci->i_auth_cap == NULL || ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) ci->i_auth_cap = cap; - } else if (ci->i_auth_cap == cap) { - ci->i_auth_cap = NULL; - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p to cap_dirty_migrating\n", inode); - list_move(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); - } - spin_unlock(&mdsc->cap_dirty_lock); + ci->i_cap_exporting_issued = 0; + } else { + WARN_ON(ci->i_auth_cap == cap); } dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", @@ -628,7 +635,7 @@ retry: cap->cap_id = cap_id; cap->issued = issued; cap->implemented |= issued; - if (mseq > cap->mseq) + if (ceph_seq_cmp(mseq, cap->mseq) > 0) cap->mds_wanted = wanted; else cap->mds_wanted |= wanted; @@ -816,7 +823,7 @@ int __ceph_caps_revoking_other(struct ceph_inode_info *ci, for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); - if (cap != ocap && __cap_is_valid(cap) && + if (cap != ocap && (cap->implemented & ~cap->issued & mask)) return 1; } @@ -888,7 +895,19 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) */ static int __ceph_is_any_caps(struct ceph_inode_info *ci) { - return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0; + return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued; +} + +int ceph_is_any_caps(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + spin_lock(&ci->i_ceph_lock); + ret = __ceph_is_any_caps(ci); + spin_unlock(&ci->i_ceph_lock); + + return ret; } /* @@ -1383,13 +1402,10 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ci->i_snap_realm->cached_context); dout(" inode %p now dirty snapc %p auth cap %p\n", &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); + WARN_ON(!ci->i_auth_cap); BUG_ON(!list_empty(&ci->i_dirty_item)); spin_lock(&mdsc->cap_dirty_lock); - if (ci->i_auth_cap) - list_add(&ci->i_dirty_item, &mdsc->cap_dirty); - else - list_add(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); + list_add(&ci->i_dirty_item, &mdsc->cap_dirty); spin_unlock(&mdsc->cap_dirty_lock); if (ci->i_flushing_caps == 0) { ihold(inode); @@ -1735,13 +1751,12 @@ ack: /* * Try to flush dirty caps back to the auth mds. */ -static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session, - unsigned *flush_tid) +static int try_flush_caps(struct inode *inode, unsigned *flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - int unlock_session = session ? 0 : 1; int flushing = 0; + struct ceph_mds_session *session = NULL; retry: spin_lock(&ci->i_ceph_lock); @@ -1755,13 +1770,14 @@ retry: int want = __ceph_caps_wanted(ci); int delayed; - if (!session) { + if (!session || session != cap->session) { spin_unlock(&ci->i_ceph_lock); + if (session) + mutex_unlock(&session->s_mutex); session = cap->session; mutex_lock(&session->s_mutex); goto retry; } - BUG_ON(session != cap->session); if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; @@ -1780,7 +1796,7 @@ retry: out: spin_unlock(&ci->i_ceph_lock); out_unlocked: - if (session && unlock_session) + if (session) mutex_unlock(&session->s_mutex); return flushing; } @@ -1865,7 +1881,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) return ret; mutex_lock(&inode->i_mutex); - dirty = try_flush_caps(inode, NULL, &flush_tid); + dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); /* @@ -1900,7 +1916,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) dout("write_inode %p wait=%d\n", inode, wait); if (wait) { - dirty = try_flush_caps(inode, NULL, &flush_tid); + dirty = try_flush_caps(inode, &flush_tid); if (dirty) err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); @@ -2350,11 +2366,11 @@ static void invalidate_aliases(struct inode *inode) d_prune_aliases(inode); /* * For non-directory inode, d_find_alias() only returns - * connected dentry. After calling d_invalidate(), the - * dentry become disconnected. + * hashed dentry. After calling d_invalidate(), the + * dentry becomes unhashed. * * For directory inode, d_find_alias() can return - * disconnected dentry. But directory inode should have + * unhashed dentry. But directory inode should have * one alias at most. */ while ((dn = d_find_alias(inode))) { @@ -2408,6 +2424,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, inode->i_size); + + /* + * auth mds of the inode changed. we received the cap export message, + * but still haven't received the cap import message. handle_cap_export + * updated the new auth MDS' cap. + * + * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message + * that was sent before the cap import message. So don't remove caps. + */ + if (ceph_seq_cmp(seq, cap->seq) <= 0) { + WARN_ON(cap != ci->i_auth_cap); + WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id)); + seq = cap->seq; + newcaps |= cap->issued; + } + /* * If CACHE is being revoked, and we have no dirty buffers, * try to invalidate (once). (If there are dirty buffers, we @@ -2434,6 +2466,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, issued |= implemented | __ceph_caps_dirty(ci); cap->cap_gen = session->s_cap_gen; + cap->seq = seq; __check_cap_issue(ci, cap, newcaps); @@ -2464,6 +2497,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ceph_buffer_put(ci->i_xattrs.blob); ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); ci->i_xattrs.version = version; + ceph_forget_all_cached_acls(inode); } } @@ -2483,6 +2517,10 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, &atime); + + /* file layout may have changed */ + ci->i_layout = grant->layout; + /* max size increase? */ if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); @@ -2511,11 +2549,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, check_caps = 1; } - cap->seq = seq; - - /* file layout may have changed */ - ci->i_layout = grant->layout; - /* revocation, grant, or no-op? */ if (cap->issued & ~newcaps) { int revoking = cap->issued & ~newcaps; @@ -2741,65 +2774,114 @@ static void handle_cap_trunc(struct inode *inode, * caller holds s_mutex */ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, - struct ceph_mds_session *session, - int *open_target_sessions) + struct ceph_mds_cap_peer *ph, + struct ceph_mds_session *session) { struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; + struct ceph_mds_session *tsession = NULL; + struct ceph_cap *cap, *tcap; struct ceph_inode_info *ci = ceph_inode(inode); - int mds = session->s_mds; + u64 t_cap_id; unsigned mseq = le32_to_cpu(ex->migrate_seq); - struct ceph_cap *cap = NULL, *t; - struct rb_node *p; - int remember = 1; + unsigned t_seq, t_mseq; + int target, issued; + int mds = session->s_mds; - dout("handle_cap_export inode %p ci %p mds%d mseq %d\n", - inode, ci, mds, mseq); + if (ph) { + t_cap_id = le64_to_cpu(ph->cap_id); + t_seq = le32_to_cpu(ph->seq); + t_mseq = le32_to_cpu(ph->mseq); + target = le32_to_cpu(ph->mds); + } else { + t_cap_id = t_seq = t_mseq = 0; + target = -1; + } + dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n", + inode, ci, mds, mseq, target); +retry: spin_lock(&ci->i_ceph_lock); + cap = __get_cap_for_mds(ci, mds); + if (!cap) + goto out_unlock; - /* make sure we haven't seen a higher mseq */ - for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { - t = rb_entry(p, struct ceph_cap, ci_node); - if (ceph_seq_cmp(t->mseq, mseq) > 0) { - dout(" higher mseq on cap from mds%d\n", - t->session->s_mds); - remember = 0; - } - if (t->session->s_mds == mds) - cap = t; + if (target < 0) { + __ceph_remove_cap(cap, false); + goto out_unlock; } - if (cap) { - if (remember) { - /* make note */ - ci->i_cap_exporting_mds = mds; - ci->i_cap_exporting_mseq = mseq; - ci->i_cap_exporting_issued = cap->issued; - - /* - * make sure we have open sessions with all possible - * export targets, so that we get the matching IMPORT - */ - *open_target_sessions = 1; + /* + * now we know we haven't received the cap import message yet + * because the exported cap still exist. + */ - /* - * we can't flush dirty caps that we've seen the - * EXPORT but no IMPORT for - */ - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p to cap_dirty_migrating\n", - inode); - list_move(&ci->i_dirty_item, - &mdsc->cap_dirty_migrating); + issued = cap->issued; + WARN_ON(issued != cap->implemented); + + tcap = __get_cap_for_mds(ci, target); + if (tcap) { + /* already have caps from the target */ + if (tcap->cap_id != t_cap_id || + ceph_seq_cmp(tcap->seq, t_seq) < 0) { + dout(" updating import cap %p mds%d\n", tcap, target); + tcap->cap_id = t_cap_id; + tcap->seq = t_seq - 1; + tcap->issue_seq = t_seq - 1; + tcap->mseq = t_mseq; + tcap->issued |= issued; + tcap->implemented |= issued; + if (cap == ci->i_auth_cap) + ci->i_auth_cap = tcap; + if (ci->i_flushing_caps && ci->i_auth_cap == tcap) { + spin_lock(&mdsc->cap_dirty_lock); + list_move_tail(&ci->i_flushing_item, + &tcap->session->s_cap_flushing); + spin_unlock(&mdsc->cap_dirty_lock); } - spin_unlock(&mdsc->cap_dirty_lock); } __ceph_remove_cap(cap, false); + goto out_unlock; } - /* else, we already released it */ + if (tsession) { + int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0; + spin_unlock(&ci->i_ceph_lock); + /* add placeholder for the export tagert */ + ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, + t_seq - 1, t_mseq, (u64)-1, flag, NULL); + goto retry; + } + + spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&session->s_mutex); + + /* open target session */ + tsession = ceph_mdsc_open_export_target_session(mdsc, target); + if (!IS_ERR(tsession)) { + if (mds > target) { + mutex_lock(&session->s_mutex); + mutex_lock_nested(&tsession->s_mutex, + SINGLE_DEPTH_NESTING); + } else { + mutex_lock(&tsession->s_mutex); + mutex_lock_nested(&session->s_mutex, + SINGLE_DEPTH_NESTING); + } + ceph_add_cap_releases(mdsc, tsession); + } else { + WARN_ON(1); + tsession = NULL; + target = -1; + } + goto retry; + +out_unlock: spin_unlock(&ci->i_ceph_lock); + mutex_unlock(&session->s_mutex); + if (tsession) { + mutex_unlock(&tsession->s_mutex); + ceph_put_mds_session(tsession); + } } /* @@ -2810,10 +2892,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, */ static void handle_cap_import(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *im, + struct ceph_mds_cap_peer *ph, struct ceph_mds_session *session, void *snaptrace, int snaptrace_len) { struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_cap *cap; int mds = session->s_mds; unsigned issued = le32_to_cpu(im->caps); unsigned wanted = le32_to_cpu(im->wanted); @@ -2821,28 +2905,44 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, unsigned mseq = le32_to_cpu(im->migrate_seq); u64 realmino = le64_to_cpu(im->realm); u64 cap_id = le64_to_cpu(im->cap_id); + u64 p_cap_id; + int peer; - if (ci->i_cap_exporting_mds >= 0 && - ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) { - dout("handle_cap_import inode %p ci %p mds%d mseq %d" - " - cleared exporting from mds%d\n", - inode, ci, mds, mseq, - ci->i_cap_exporting_mds); - ci->i_cap_exporting_issued = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_mds = -1; + if (ph) { + p_cap_id = le64_to_cpu(ph->cap_id); + peer = le32_to_cpu(ph->mds); + } else { + p_cap_id = 0; + peer = -1; + } - spin_lock(&mdsc->cap_dirty_lock); - if (!list_empty(&ci->i_dirty_item)) { - dout(" moving %p back to cap_dirty\n", inode); - list_move(&ci->i_dirty_item, &mdsc->cap_dirty); + dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", + inode, ci, mds, mseq, peer); + + spin_lock(&ci->i_ceph_lock); + cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL; + if (cap && cap->cap_id == p_cap_id) { + dout(" remove export cap %p mds%d flags %d\n", + cap, peer, ph->flags); + if ((ph->flags & CEPH_CAP_FLAG_AUTH) && + (cap->seq != le32_to_cpu(ph->seq) || + cap->mseq != le32_to_cpu(ph->mseq))) { + pr_err("handle_cap_import: mismatched seq/mseq: " + "ino (%llx.%llx) mds%d seq %d mseq %d " + "importer mds%d has peer seq %d mseq %d\n", + ceph_vinop(inode), peer, cap->seq, + cap->mseq, mds, le32_to_cpu(ph->seq), + le32_to_cpu(ph->mseq)); } - spin_unlock(&mdsc->cap_dirty_lock); - } else { - dout("handle_cap_import inode %p ci %p mds%d mseq %d\n", - inode, ci, mds, mseq); + ci->i_cap_exporting_issued = cap->issued; + __ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); } + /* make sure we re-request max_size, if necessary */ + ci->i_wanted_max_size = 0; + ci->i_requested_max_size = 0; + spin_unlock(&ci->i_ceph_lock); + down_write(&mdsc->snap_rwsem); ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len, false); @@ -2853,11 +2953,6 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, kick_flushing_inode_caps(mdsc, session, inode); up_read(&mdsc->snap_rwsem); - /* make sure we re-request max_size, if necessary */ - spin_lock(&ci->i_ceph_lock); - ci->i_wanted_max_size = 0; /* reset */ - ci->i_requested_max_size = 0; - spin_unlock(&ci->i_ceph_lock); } /* @@ -2875,6 +2970,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_inode_info *ci; struct ceph_cap *cap; struct ceph_mds_caps *h; + struct ceph_mds_cap_peer *peer = NULL; int mds = session->s_mds; int op; u32 seq, mseq; @@ -2885,12 +2981,13 @@ void ceph_handle_caps(struct ceph_mds_session *session, void *snaptrace; size_t snaptrace_len; void *flock; + void *end; u32 flock_len; - int open_target_sessions = 0; dout("handle_caps from mds%d\n", mds); /* decode */ + end = msg->front.iov_base + msg->front.iov_len; tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len < sizeof(*h)) goto bad; @@ -2908,17 +3005,28 @@ void ceph_handle_caps(struct ceph_mds_session *session, snaptrace_len = le32_to_cpu(h->snap_trace_len); if (le16_to_cpu(msg->hdr.version) >= 2) { - void *p, *end; - - p = snaptrace + snaptrace_len; - end = msg->front.iov_base + msg->front.iov_len; + void *p = snaptrace + snaptrace_len; ceph_decode_32_safe(&p, end, flock_len, bad); + if (p + flock_len > end) + goto bad; flock = p; } else { flock = NULL; flock_len = 0; } + if (le16_to_cpu(msg->hdr.version) >= 3) { + if (op == CEPH_CAP_OP_IMPORT) { + void *p = flock + flock_len; + if (p + sizeof(*peer) > end) + goto bad; + peer = p; + } else if (op == CEPH_CAP_OP_EXPORT) { + /* recorded in unused fields */ + peer = (void *)&h->size; + } + } + mutex_lock(&session->s_mutex); session->s_seq++; dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, @@ -2951,11 +3059,11 @@ void ceph_handle_caps(struct ceph_mds_session *session, goto done; case CEPH_CAP_OP_EXPORT: - handle_cap_export(inode, h, session, &open_target_sessions); - goto done; + handle_cap_export(inode, h, peer, session); + goto done_unlocked; case CEPH_CAP_OP_IMPORT: - handle_cap_import(mdsc, inode, h, session, + handle_cap_import(mdsc, inode, h, peer, session, snaptrace, snaptrace_len); } @@ -3007,8 +3115,6 @@ done: done_unlocked: if (inode) iput(inode); - if (open_target_sessions) - ceph_mdsc_open_export_target_sessions(mdsc, session); return; bad: diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 2a0bcaeb189..619616d585b 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -693,6 +693,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); + + if (!err) + err = ceph_init_acl(dentry, dentry->d_inode, dir); + if (err) d_drop(dentry); return err; @@ -1037,14 +1041,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) valid = 1; } else if (dentry_lease_is_valid(dentry) || dir_lease_is_valid(dir, dentry)) { - valid = 1; + if (dentry->d_inode) + valid = ceph_is_any_caps(dentry->d_inode); + else + valid = 1; } dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); - if (valid) + if (valid) { ceph_dentry_lru_touch(dentry); - else + } else { + ceph_dir_clear_complete(dir); d_drop(dentry); + } iput(dir); return valid; } @@ -1293,6 +1302,7 @@ const struct inode_operations ceph_dir_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, + .get_acl = ceph_get_acl, .mknod = ceph_mknod, .symlink = ceph_symlink, .mkdir = ceph_mkdir, diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3de89829e2a..dfd2ce3419f 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -408,51 +408,92 @@ more: * * If the read spans object boundary, just do multiple reads. */ -static ssize_t ceph_sync_read(struct file *file, char __user *data, - unsigned len, loff_t *poff, int *checkeof) +static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, + int *checkeof) { + struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct page **pages; - u64 off = *poff; + u64 off = iocb->ki_pos; int num_pages, ret; + size_t len = i->count; - dout("sync_read on file %p %llu~%u %s\n", file, off, len, + dout("sync_read on file %p %llu~%u %s\n", file, off, + (unsigned)len, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); - - if (file->f_flags & O_DIRECT) { - num_pages = calc_pages_for((unsigned long)data, len); - pages = ceph_get_direct_page_vector(data, num_pages, true); - } else { - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); - } - if (IS_ERR(pages)) - return PTR_ERR(pages); - /* * flush any page cache pages in this range. this * will make concurrent normal and sync io slow, * but it will at least behave sensibly when they are * in sequence. */ - ret = filemap_write_and_wait(inode->i_mapping); + ret = filemap_write_and_wait_range(inode->i_mapping, off, + off + len); if (ret < 0) - goto done; + return ret; - ret = striped_read(inode, off, len, pages, num_pages, checkeof, - file->f_flags & O_DIRECT, - (unsigned long)data & ~PAGE_MASK); + if (file->f_flags & O_DIRECT) { + while (iov_iter_count(i)) { + void __user *data = i->iov[0].iov_base + i->iov_offset; + size_t len = i->iov[0].iov_len - i->iov_offset; + + num_pages = calc_pages_for((unsigned long)data, len); + pages = ceph_get_direct_page_vector(data, + num_pages, true); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = striped_read(inode, off, len, + pages, num_pages, checkeof, + 1, (unsigned long)data & ~PAGE_MASK); + ceph_put_page_vector(pages, num_pages, true); + + if (ret <= 0) + break; + off += ret; + iov_iter_advance(i, ret); + if (ret < len) + break; + } + } else { + num_pages = calc_pages_for(off, len); + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); + if (IS_ERR(pages)) + return PTR_ERR(pages); + ret = striped_read(inode, off, len, pages, + num_pages, checkeof, 0, 0); + if (ret > 0) { + int l, k = 0; + size_t left = len = ret; + + while (left) { + void __user *data = i->iov[0].iov_base + + i->iov_offset; + l = min(i->iov[0].iov_len - i->iov_offset, + left); + + ret = ceph_copy_page_vector_to_user(&pages[k], + data, off, + l); + if (ret > 0) { + iov_iter_advance(i, ret); + left -= ret; + off += ret; + k = calc_pages_for(iocb->ki_pos, + len - left + 1) - 1; + BUG_ON(k >= num_pages && left); + } else + break; + } + } + ceph_release_page_vector(pages, num_pages); + } - if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) - ret = ceph_copy_page_vector_to_user(pages, data, off, ret); - if (ret >= 0) - *poff = off + ret; + if (off > iocb->ki_pos) { + ret = off - iocb->ki_pos; + iocb->ki_pos = off; + } -done: - if (file->f_flags & O_DIRECT) - ceph_put_page_vector(pages, num_pages, true); - else - ceph_release_page_vector(pages, num_pages); dout("sync_read result %d\n", ret); return ret; } @@ -489,83 +530,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) } } + /* - * Synchronous write, straight from __user pointer or user pages (if - * O_DIRECT). + * Synchronous write, straight from __user pointer or user pages. * * If write spans object boundary, just do multiple writes. (For a * correct atomic write, we should e.g. take write locks on all * objects, rollback on failure, etc.) */ -static ssize_t ceph_sync_write(struct file *file, const char __user *data, - size_t left, loff_t pos, loff_t *ppos) +static ssize_t +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) { + struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_snap_context *snapc; struct ceph_vino vino; struct ceph_osd_request *req; - int num_ops = 1; struct page **pages; int num_pages; - u64 len; int written = 0; int flags; int check_caps = 0; - int page_align, io_align; - unsigned long buf_align; + int page_align; int ret; struct timespec mtime = CURRENT_TIME; - bool own_pages = false; + loff_t pos = iocb->ki_pos; + struct iov_iter i; if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; - dout("sync_write on file %p %lld~%u %s\n", file, pos, - (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); + dout("sync_direct_write on file %p %lld~%u\n", file, pos, + (unsigned)count); - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); if (ret < 0) return ret; ret = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_CACHE_SHIFT, - (pos + left) >> PAGE_CACHE_SHIFT); + (pos + count) >> PAGE_CACHE_SHIFT); if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE; - if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) - flags |= CEPH_OSD_FLAG_ACK; - else - num_ops++; /* Also include a 'startsync' command. */ - /* - * we may need to do multiple writes here if we span an object - * boundary. this isn't atomic, unfortunately. :( - */ -more: - io_align = pos & ~PAGE_MASK; - buf_align = (unsigned long)data & ~PAGE_MASK; - len = left; - - snapc = ci->i_snap_realm->cached_context; - vino = ceph_vino(inode); - req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, num_ops, - CEPH_OSD_OP_WRITE, flags, snapc, - ci->i_truncate_seq, ci->i_truncate_size, - false); - if (IS_ERR(req)) - return PTR_ERR(req); + iov_iter_init(&i, iov, nr_segs, count, 0); + + while (iov_iter_count(&i) > 0) { + void __user *data = i.iov->iov_base + i.iov_offset; + u64 len = i.iov->iov_len - i.iov_offset; + + page_align = (unsigned long)data & ~PAGE_MASK; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, + 2,/*include a 'startsync' command*/ + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } - /* write from beginning of first page, regardless of io alignment */ - page_align = file->f_flags & O_DIRECT ? buf_align : io_align; - num_pages = calc_pages_for(page_align, len); - if (file->f_flags & O_DIRECT) { + num_pages = calc_pages_for(page_align, len); pages = ceph_get_direct_page_vector(data, num_pages, false); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -577,60 +614,175 @@ more: * may block. */ truncate_inode_pages_range(inode->i_mapping, pos, - (pos+len) | (PAGE_CACHE_SIZE-1)); - } else { + (pos+len) | (PAGE_CACHE_SIZE-1)); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, + false, false); + + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + + ceph_put_page_vector(pages, num_pages, false); + +out: + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + iov_iter_advance(&i, (size_t)len); + + if (pos > i_size_read(inode)) { + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } + } else + break; + } + + if (ret != -EOLDSNAPC && written > 0) { + iocb->ki_pos = pos; + ret = written; + } + return ret; +} + + +/* + * Synchronous write, straight from __user pointer or user pages. + * + * If write spans object boundary, just do multiple writes. (For a + * correct atomic write, we should e.g. take write locks on all + * objects, rollback on failure, etc.) + */ +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) +{ + struct file *file = iocb->ki_filp; + struct inode *inode = file_inode(file); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_snap_context *snapc; + struct ceph_vino vino; + struct ceph_osd_request *req; + struct page **pages; + u64 len; + int num_pages; + int written = 0; + int flags; + int check_caps = 0; + int ret; + struct timespec mtime = CURRENT_TIME; + loff_t pos = iocb->ki_pos; + struct iov_iter i; + + if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) + return -EROFS; + + dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count); + + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); + if (ret < 0) + return ret; + + ret = invalidate_inode_pages2_range(inode->i_mapping, + pos >> PAGE_CACHE_SHIFT, + (pos + count) >> PAGE_CACHE_SHIFT); + if (ret < 0) + dout("invalidate_inode_pages2_range returned %d\n", ret); + + flags = CEPH_OSD_FLAG_ORDERSNAP | + CEPH_OSD_FLAG_ONDISK | + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ACK; + + iov_iter_init(&i, iov, nr_segs, count, 0); + + while ((len = iov_iter_count(&i)) > 0) { + size_t left; + int n; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, 1, + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } + + /* + * write from beginning of first page, + * regardless of io alignment + */ + num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out; } - ret = ceph_copy_user_to_page_vector(pages, data, pos, len); + + left = len; + for (n = 0; n < num_pages; n++) { + size_t plen = min_t(size_t, left, PAGE_SIZE); + ret = iov_iter_copy_from_user(pages[n], &i, 0, plen); + if (ret != plen) { + ret = -EFAULT; + break; + } + left -= ret; + iov_iter_advance(&i, ret); + } + if (ret < 0) { ceph_release_page_vector(pages, num_pages); goto out; } - if ((file->f_flags & O_SYNC) == 0) { - /* get a second commit callback */ - req->r_unsafe_callback = ceph_sync_write_unsafe; - req->r_inode = inode; - own_pages = true; - } - } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, - false, own_pages); + /* get a second commit callback */ + req->r_unsafe_callback = ceph_sync_write_unsafe; + req->r_inode = inode; - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, + false, true); - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); - if (!ret) - ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - if (file->f_flags & O_DIRECT) - ceph_put_page_vector(pages, num_pages, false); - else if (file->f_flags & O_SYNC) - ceph_release_page_vector(pages, num_pages); + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); out: - ceph_osdc_put_request(req); - if (ret == 0) { - pos += len; - written += len; - left -= len; - data += len; - if (left) - goto more; + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + + if (pos > i_size_read(inode)) { + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } + } else + break; + } + if (ret != -EOLDSNAPC && written > 0) { ret = written; - *ppos = pos; - if (pos > i_size_read(inode)) - check_caps = ceph_inode_set_size(inode, pos); - if (check_caps) - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, - NULL); - } else if (ret != -EOLDSNAPC && written > 0) { - ret = written; + iocb->ki_pos = pos; } return ret; } @@ -647,55 +799,84 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, { struct file *filp = iocb->ki_filp; struct ceph_file_info *fi = filp->private_data; - loff_t *ppos = &iocb->ki_pos; - size_t len = iov->iov_len; + size_t len = iocb->ki_nbytes; struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); - void __user *base = iov->iov_base; ssize_t ret; int want, got = 0; int checkeof = 0, read = 0; - dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", - inode, ceph_vinop(inode), pos, (unsigned)len, inode); again: + dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", + inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode); + if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; else want = CEPH_CAP_FILE_CACHE; ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); if (ret < 0) - goto out; - dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", - inode, ceph_vinop(inode), pos, (unsigned)len, - ceph_cap_string(got)); + return ret; if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_filp->f_flags & O_DIRECT) || - (fi->flags & CEPH_F_SYNC)) + (fi->flags & CEPH_F_SYNC)) { + struct iov_iter i; + + dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n", + inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, + ceph_cap_string(got)); + + if (!read) { + ret = generic_segment_checks(iov, &nr_segs, + &len, VERIFY_WRITE); + if (ret) + goto out; + } + + iov_iter_init(&i, iov, nr_segs, len, read); + /* hmm, this isn't really async... */ - ret = ceph_sync_read(filp, base, len, ppos, &checkeof); - else - ret = generic_file_aio_read(iocb, iov, nr_segs, pos); + ret = ceph_sync_read(iocb, &i, &checkeof); + } else { + /* + * We can't modify the content of iov, + * so we only read from beginning. + */ + if (read) { + iocb->ki_pos = pos; + len = iocb->ki_nbytes; + read = 0; + } + dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", + inode, ceph_vinop(inode), pos, (unsigned)len, + ceph_cap_string(got)); + ret = generic_file_aio_read(iocb, iov, nr_segs, pos); + } out: dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); ceph_put_cap_refs(ci, got); if (checkeof && ret >= 0) { - int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); + int statret = ceph_do_getattr(inode, + CEPH_STAT_CAP_SIZE); /* hit EOF or hole? */ - if (statret == 0 && *ppos < inode->i_size) { - dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size); + if (statret == 0 && iocb->ki_pos < inode->i_size && + ret < len) { + dout("sync_read hit hole, ppos %lld < size %lld" + ", reading more\n", iocb->ki_pos, + inode->i_size); + read += ret; - base += ret; len -= ret; checkeof = 0; goto again; } } + if (ret >= 0) ret += read; @@ -772,11 +953,13 @@ retry_snap: inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || - (iocb->ki_filp->f_flags & O_DIRECT) || - (fi->flags & CEPH_F_SYNC)) { + (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) { mutex_unlock(&inode->i_mutex); - written = ceph_sync_write(file, iov->iov_base, count, - pos, &iocb->ki_pos); + if (file->f_flags & O_DIRECT) + written = ceph_sync_direct_write(iocb, iov, + nr_segs, count); + else + written = ceph_sync_write(iocb, iov, nr_segs, count); if (written == -EOLDSNAPC) { dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n", @@ -1018,7 +1201,7 @@ static long ceph_fallocate(struct file *file, int mode, loff_t offset, loff_t length) { struct ceph_file_info *fi = file->private_data; - struct inode *inode = file->f_dentry->d_inode; + struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 278fd289128..6fc10a7d7c5 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -95,6 +95,7 @@ const struct inode_operations ceph_file_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, + .get_acl = ceph_get_acl, }; @@ -335,12 +336,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_hold_caps_min = 0; ci->i_hold_caps_max = 0; INIT_LIST_HEAD(&ci->i_cap_delay_list); - ci->i_cap_exporting_mds = 0; - ci->i_cap_exporting_mseq = 0; - ci->i_cap_exporting_issued = 0; INIT_LIST_HEAD(&ci->i_cap_snaps); ci->i_head_snapc = NULL; ci->i_snap_caps = 0; + ci->i_cap_exporting_issued = 0; for (i = 0; i < CEPH_FILE_MODE_NUM; i++) ci->i_nr_by_mode[i] = 0; @@ -436,6 +435,16 @@ void ceph_destroy_inode(struct inode *inode) call_rcu(&inode->i_rcu, ceph_i_callback); } +int ceph_drop_inode(struct inode *inode) +{ + /* + * Positve dentry and corresponding inode are always accompanied + * in MDS reply. So no need to keep inode in the cache after + * dropping all its aliases. + */ + return 1; +} + /* * Helpers to fill in size, ctime, mtime, and atime. We have to be * careful because either the client or MDS may have more up to date @@ -670,6 +679,7 @@ static int fill_inode(struct inode *inode, memcpy(ci->i_xattrs.blob->vec.iov_base, iinfo->xattr_data, iinfo->xattr_len); ci->i_xattrs.version = le64_to_cpu(info->xattr_version); + ceph_forget_all_cached_acls(inode); xattr_blob = NULL; } @@ -1454,7 +1464,8 @@ static void ceph_invalidate_work(struct work_struct *work) dout("invalidate_pages %p gen %d revoking %d\n", inode, ci->i_rdcache_gen, ci->i_rdcache_revoking); if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { - /* nevermind! */ + if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE)) + check = 1; spin_unlock(&ci->i_ceph_lock); mutex_unlock(&ci->i_truncate_mutex); goto out; @@ -1475,13 +1486,14 @@ static void ceph_invalidate_work(struct work_struct *work) dout("invalidate_pages %p gen %d raced, now %d revoking %d\n", inode, orig_gen, ci->i_rdcache_gen, ci->i_rdcache_revoking); + if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE)) + check = 1; } spin_unlock(&ci->i_ceph_lock); mutex_unlock(&ci->i_truncate_mutex); - +out: if (check) ceph_check_caps(ci, 0, NULL); -out: iput(inode); } @@ -1602,6 +1614,7 @@ static const struct inode_operations ceph_symlink_iops = { .getxattr = ceph_getxattr, .listxattr = ceph_listxattr, .removexattr = ceph_removexattr, + .get_acl = ceph_get_acl, }; /* @@ -1675,6 +1688,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) dirtied |= CEPH_CAP_AUTH_EXCL; } else if ((issued & CEPH_CAP_AUTH_SHARED) == 0 || attr->ia_mode != inode->i_mode) { + inode->i_mode = attr->ia_mode; req->r_args.setattr.mode = cpu_to_le32(attr->ia_mode); mask |= CEPH_SETATTR_MODE; release |= CEPH_CAP_AUTH_SHARED; @@ -1790,6 +1804,12 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (inode_dirty_flags) __mark_inode_dirty(inode, inode_dirty_flags); + if (ia_valid & ATTR_MODE) { + err = ceph_acl_chmod(dentry, inode); + if (err) + goto out_put; + } + if (mask) { req->r_inode = inode; ihold(inode); @@ -1809,6 +1829,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) return err; out: spin_unlock(&ci->i_ceph_lock); +out_put: ceph_mdsc_put_request(req); return err; } diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 669622fd1ae..dc66c9e023e 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -183,6 +183,8 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->client->osdc; + struct ceph_object_locator oloc; + struct ceph_object_id oid; u64 len = 1, olen; u64 tmp; struct ceph_pg pgid; @@ -211,8 +213,10 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx", ceph_ino(inode), dl.object_no); - r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap, - ceph_file_layout_pg_pool(ci->i_layout)); + oloc.pool = ceph_file_layout_pg_pool(ci->i_layout); + ceph_oid_set_name(&oid, dl.object_name); + + r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid); if (r < 0) { up_read(&osdc->map_sem); return r; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index d90861f4521..f4f050a69a4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -63,7 +63,7 @@ static const struct ceph_connection_operations mds_con_ops; */ static int parse_reply_info_in(void **p, void *end, struct ceph_mds_reply_info_in *info, - int features) + u64 features) { int err = -EIO; @@ -98,7 +98,7 @@ bad: */ static int parse_reply_info_trace(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { int err; @@ -145,7 +145,7 @@ out_bad: */ static int parse_reply_info_dir(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { u32 num, i = 0; int err; @@ -217,7 +217,7 @@ out_bad: */ static int parse_reply_info_filelock(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (*p + sizeof(*info->filelock_reply) > end) goto bad; @@ -238,7 +238,7 @@ bad: */ static int parse_reply_info_create(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { if (*p == end) { @@ -262,7 +262,7 @@ bad: */ static int parse_reply_info_extra(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { if (info->head->op == CEPH_MDS_OP_GETFILELOCK) return parse_reply_info_filelock(p, end, info, features); @@ -280,7 +280,7 @@ static int parse_reply_info_extra(void **p, void *end, */ static int parse_reply_info(struct ceph_msg *msg, struct ceph_mds_reply_info_parsed *info, - int features) + u64 features) { void *p, *end; u32 len; @@ -713,14 +713,15 @@ static int __choose_mds(struct ceph_mds_client *mdsc, struct dentry *dn = get_nonsnap_parent(parent); inode = dn->d_inode; dout("__choose_mds using nonsnap parent %p\n", inode); - } else if (req->r_dentry->d_inode) { + } else { /* dentry target */ inode = req->r_dentry->d_inode; - } else { - /* dir + name */ - inode = dir; - hash = ceph_dentry_hash(dir, req->r_dentry); - is_hash = true; + if (!inode || mode == USE_AUTH_MDS) { + /* dir + name */ + inode = dir; + hash = ceph_dentry_hash(dir, req->r_dentry); + is_hash = true; + } } } @@ -846,35 +847,56 @@ static int __open_session(struct ceph_mds_client *mdsc, * * called under mdsc->mutex */ +static struct ceph_mds_session * +__open_export_target_session(struct ceph_mds_client *mdsc, int target) +{ + struct ceph_mds_session *session; + + session = __ceph_lookup_mds_session(mdsc, target); + if (!session) { + session = register_session(mdsc, target); + if (IS_ERR(session)) + return session; + } + if (session->s_state == CEPH_MDS_SESSION_NEW || + session->s_state == CEPH_MDS_SESSION_CLOSING) + __open_session(mdsc, session); + + return session; +} + +struct ceph_mds_session * +ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) +{ + struct ceph_mds_session *session; + + dout("open_export_target_session to mds%d\n", target); + + mutex_lock(&mdsc->mutex); + session = __open_export_target_session(mdsc, target); + mutex_unlock(&mdsc->mutex); + + return session; +} + static void __open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { struct ceph_mds_info *mi; struct ceph_mds_session *ts; int i, mds = session->s_mds; - int target; if (mds >= mdsc->mdsmap->m_max_mds) return; + mi = &mdsc->mdsmap->m_info[mds]; dout("open_export_target_sessions for mds%d (%d targets)\n", session->s_mds, mi->num_export_targets); for (i = 0; i < mi->num_export_targets; i++) { - target = mi->export_targets[i]; - ts = __ceph_lookup_mds_session(mdsc, target); - if (!ts) { - ts = register_session(mdsc, target); - if (IS_ERR(ts)) - return; - } - if (session->s_state == CEPH_MDS_SESSION_NEW || - session->s_state == CEPH_MDS_SESSION_CLOSING) - __open_session(mdsc, session); - else - dout(" mds%d target mds%d %p is %s\n", session->s_mds, - i, ts, session_state_name(ts->s_state)); - ceph_put_mds_session(ts); + ts = __open_export_target_session(mdsc, mi->export_targets[i]); + if (!IS_ERR(ts)) + ceph_put_mds_session(ts); } } @@ -1136,6 +1158,21 @@ static int send_renew_caps(struct ceph_mds_client *mdsc, return 0; } +static int send_flushmsg_ack(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, u64 seq) +{ + struct ceph_msg *msg; + + dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", + session->s_mds, session_state_name(session->s_state), seq); + msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); + if (!msg) + return -ENOMEM; + ceph_con_send(&session->s_con, msg); + return 0; +} + + /* * Note new cap ttl, and any transition from stale -> not stale (fresh?). * @@ -1214,7 +1251,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) { struct ceph_mds_session *session = arg; struct ceph_inode_info *ci = ceph_inode(inode); - int used, oissued, mine; + int used, wanted, oissued, mine; if (session->s_trim_caps <= 0) return -1; @@ -1222,14 +1259,19 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) spin_lock(&ci->i_ceph_lock); mine = cap->issued | cap->implemented; used = __ceph_caps_used(ci); + wanted = __ceph_caps_file_wanted(ci); oissued = __ceph_caps_issued_other(ci, cap); - dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", + dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), - ceph_cap_string(used)); - if (ci->i_dirty_caps) - goto out; /* dirty caps */ - if ((used & ~oissued) & mine) + ceph_cap_string(used), ceph_cap_string(wanted)); + if (cap == ci->i_auth_cap) { + if (ci->i_dirty_caps | ci->i_flushing_caps) + goto out; + if ((used | wanted) & CEPH_CAP_ANY_WR) + goto out; + } + if ((used | wanted) & ~oissued & mine) goto out; /* we need these caps */ session->s_trim_caps--; @@ -2156,26 +2198,16 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) */ if (result == -ESTALE) { dout("got ESTALE on request %llu", req->r_tid); - if (!req->r_inode) { - /* do nothing; not an authority problem */ - } else if (req->r_direct_mode != USE_AUTH_MDS) { + if (req->r_direct_mode != USE_AUTH_MDS) { dout("not using auth, setting for that now"); req->r_direct_mode = USE_AUTH_MDS; __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; } else { - struct ceph_inode_info *ci = ceph_inode(req->r_inode); - struct ceph_cap *cap = NULL; - - if (req->r_session) - cap = ceph_get_cap_for_mds(ci, - req->r_session->s_mds); - - dout("already using auth"); - if ((!cap || cap != ci->i_auth_cap) || - (cap->mseq != req->r_sent_on_mseq)) { - dout("but cap changed, so resending"); + int mds = __choose_mds(mdsc, req); + if (mds >= 0 && mds != req->r_session->s_mds) { + dout("but auth changed, so resending"); __do_request(mdsc, req); mutex_unlock(&mdsc->mutex); goto out; @@ -2400,6 +2432,10 @@ static void handle_session(struct ceph_mds_session *session, trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); break; + case CEPH_SESSION_FLUSHMSG: + send_flushmsg_ack(mdsc, session, seq); + break; + default: pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); WARN_ON(1); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 4c053d099ae..68288917c73 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -383,6 +383,8 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg); +extern struct ceph_mds_session * +ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target); extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c index 89fa4a940a0..4440f447fd3 100644 --- a/fs/ceph/strings.c +++ b/fs/ceph/strings.c @@ -41,6 +41,8 @@ const char *ceph_session_op_name(int op) case CEPH_SESSION_RENEWCAPS: return "renewcaps"; case CEPH_SESSION_STALE: return "stale"; case CEPH_SESSION_RECALL_STATE: return "recall_state"; + case CEPH_SESSION_FLUSHMSG: return "flushmsg"; + case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack"; } return "???"; } diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 6a0951e4304..2df963f1cf5 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -490,10 +490,10 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, struct ceph_options *opt) { struct ceph_fs_client *fsc; - const unsigned supported_features = + const u64 supported_features = CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH; - const unsigned required_features = 0; + const u64 required_features = 0; int page_count; size_t size; int err = -ENOMEM; @@ -686,6 +686,7 @@ static const struct super_operations ceph_super_ops = { .alloc_inode = ceph_alloc_inode, .destroy_inode = ceph_destroy_inode, .write_inode = ceph_write_inode, + .drop_inode = ceph_drop_inode, .sync_fs = ceph_sync_fs, .put_super = ceph_put_super, .show_options = ceph_show_options, @@ -818,7 +819,11 @@ static int ceph_set_super(struct super_block *s, void *data) s->s_flags = fsc->mount_options->sb_flags; s->s_maxbytes = 1ULL << 40; /* temp value until we get mdsmap */ +#ifdef CONFIG_CEPH_FS_POSIX_ACL + s->s_flags |= MS_POSIXACL; +#endif + s->s_xattr = ceph_xattr_handlers; s->s_fs_info = fsc; fsc->sb = s; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index ef4ac38bb61..c299f7d19bf 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -287,14 +287,12 @@ struct ceph_inode_info { unsigned long i_hold_caps_min; /* jiffies */ unsigned long i_hold_caps_max; /* jiffies */ struct list_head i_cap_delay_list; /* for delayed cap release to mds */ - int i_cap_exporting_mds; /* to handle cap migration between */ - unsigned i_cap_exporting_mseq; /* mds's. */ - unsigned i_cap_exporting_issued; struct ceph_cap_reservation i_cap_migration_resv; struct list_head i_cap_snaps; /* snapped state pending flush to mds */ struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or dirty|flushing caps */ unsigned i_snap_caps; /* cap bits for snapped files */ + unsigned i_cap_exporting_issued; int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ @@ -335,7 +333,6 @@ struct ceph_inode_info { u32 i_fscache_gen; /* sequence, for delayed fscache validate */ struct work_struct i_revalidate_work; #endif - struct inode vfs_inode; /* at end */ }; @@ -529,6 +526,8 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci) } extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask); +extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci, + struct ceph_cap *ocap, int mask); extern int ceph_caps_revoking(struct ceph_inode_info *ci, int mask); extern int __ceph_caps_used(struct ceph_inode_info *ci); @@ -691,6 +690,7 @@ extern const struct inode_operations ceph_file_iops; extern struct inode *ceph_alloc_inode(struct super_block *sb); extern void ceph_destroy_inode(struct inode *inode); +extern int ceph_drop_inode(struct inode *inode); extern struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino); @@ -724,6 +724,9 @@ extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, /* xattr.c */ extern int ceph_setxattr(struct dentry *, const char *, const void *, size_t, int); +int __ceph_setxattr(struct dentry *, const char *, const void *, size_t, int); +ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t); +int __ceph_removexattr(struct dentry *, const char *); extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t); extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); extern int ceph_removexattr(struct dentry *, const char *); @@ -732,6 +735,39 @@ extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); extern void __init ceph_xattr_init(void); extern void ceph_xattr_exit(void); +/* acl.c */ +extern const struct xattr_handler ceph_xattr_acl_access_handler; +extern const struct xattr_handler ceph_xattr_acl_default_handler; +extern const struct xattr_handler *ceph_xattr_handlers[]; + +#ifdef CONFIG_CEPH_FS_POSIX_ACL + +struct posix_acl *ceph_get_acl(struct inode *, int); +int ceph_init_acl(struct dentry *, struct inode *, struct inode *); +int ceph_acl_chmod(struct dentry *, struct inode *); +void ceph_forget_all_cached_acls(struct inode *inode); + +#else + +#define ceph_get_acl NULL + +static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode, + struct inode *dir) +{ + return 0; +} + +static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode) +{ + return 0; +} + +static inline void ceph_forget_all_cached_acls(struct inode *inode) +{ +} + +#endif + /* caps.c */ extern const char *ceph_cap_string(int c); extern void ceph_handle_caps(struct ceph_mds_session *session, @@ -744,6 +780,7 @@ extern int ceph_add_cap(struct inode *inode, extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); extern void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap); +extern int ceph_is_any_caps(struct inode *inode); extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino, u64 cap_id, u32 migrate_seq, u32 issue_seq); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index be661d8f532..c7581f3733c 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -11,11 +11,24 @@ #define XATTR_CEPH_PREFIX "ceph." #define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1) +/* + * List of handlers for synthetic system.* attributes. Other + * attributes are handled directly. + */ +const struct xattr_handler *ceph_xattr_handlers[] = { +#ifdef CONFIG_CEPH_FS_POSIX_ACL + &ceph_xattr_acl_access_handler, + &ceph_xattr_acl_default_handler, +#endif + NULL, +}; + static bool ceph_is_valid_xattr(const char *name) { return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) || !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN) || + !strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN) || !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) || !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN); } @@ -663,10 +676,9 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci) } } -ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, +ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, size_t size) { - struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); int err; struct ceph_inode_xattr *xattr; @@ -675,7 +687,6 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, if (!ceph_is_valid_xattr(name)) return -ENODATA; - /* let's see if a virtual xattr was requested */ vxattr = ceph_match_vxattr(inode, name); if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) { @@ -725,6 +736,15 @@ out: return err; } +ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value, + size_t size) +{ + if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) + return generic_getxattr(dentry, name, value, size); + + return __ceph_getxattr(dentry->d_inode, name, value, size); +} + ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) { struct inode *inode = dentry->d_inode; @@ -863,8 +883,8 @@ out: return err; } -int ceph_setxattr(struct dentry *dentry, const char *name, - const void *value, size_t size, int flags) +int __ceph_setxattr(struct dentry *dentry, const char *name, + const void *value, size_t size, int flags) { struct inode *inode = dentry->d_inode; struct ceph_vxattr *vxattr; @@ -879,9 +899,6 @@ int ceph_setxattr(struct dentry *dentry, const char *name, struct ceph_inode_xattr *xattr = NULL; int required_blob_size; - if (ceph_snap(inode) != CEPH_NOSNAP) - return -EROFS; - if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -958,6 +975,18 @@ out: return err; } +int ceph_setxattr(struct dentry *dentry, const char *name, + const void *value, size_t size, int flags) +{ + if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP) + return -EROFS; + + if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) + return generic_setxattr(dentry, name, value, size, flags); + + return __ceph_setxattr(dentry, name, value, size, flags); +} + static int ceph_send_removexattr(struct dentry *dentry, const char *name) { struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); @@ -984,7 +1013,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name) return err; } -int ceph_removexattr(struct dentry *dentry, const char *name) +int __ceph_removexattr(struct dentry *dentry, const char *name) { struct inode *inode = dentry->d_inode; struct ceph_vxattr *vxattr; @@ -994,9 +1023,6 @@ int ceph_removexattr(struct dentry *dentry, const char *name) int required_blob_size; int dirty; - if (ceph_snap(inode) != CEPH_NOSNAP) - return -EROFS; - if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -1053,3 +1079,13 @@ out: return err; } +int ceph_removexattr(struct dentry *dentry, const char *name) +{ + if (ceph_snap(dentry->d_inode) != CEPH_NOSNAP) + return -EROFS; + + if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) + return generic_removexattr(dentry, name); + + return __ceph_removexattr(dentry, name); +} |