aboutsummaryrefslogtreecommitdiff
path: root/fs/ceph/dir.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/dir.c')
-rw-r--r--fs/ceph/dir.c131
1 files changed, 84 insertions, 47 deletions
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 868b61d56ca..c29d6ae6887 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -100,6 +100,14 @@ static unsigned fpos_off(loff_t p)
return p & 0xffffffff;
}
+static int fpos_cmp(loff_t l, loff_t r)
+{
+ int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r));
+ if (v)
+ return v;
+ return (int)(fpos_off(l) - fpos_off(r));
+}
+
/*
* When possible, we try to satisfy a readdir by peeking at the
* dcache. We make this work by carefully ordering dentries on
@@ -111,7 +119,8 @@ static unsigned fpos_off(loff_t p)
* defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
* the MDS if/when the directory is modified).
*/
-static int __dcache_readdir(struct file *file, struct dir_context *ctx)
+static int __dcache_readdir(struct file *file, struct dir_context *ctx,
+ u32 shared_gen)
{
struct ceph_file_info *fi = file->private_data;
struct dentry *parent = file->f_dentry;
@@ -125,14 +134,14 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx)
last = fi->dentry;
fi->dentry = NULL;
- dout("__dcache_readdir %p at %llu (last %p)\n", dir, ctx->pos,
- last);
+ dout("__dcache_readdir %p v%u at %llu (last %p)\n",
+ dir, shared_gen, ctx->pos, last);
spin_lock(&parent->d_lock);
/* start at beginning? */
if (ctx->pos == 2 || last == NULL ||
- ctx->pos < ceph_dentry(last)->offset) {
+ fpos_cmp(ctx->pos, ceph_dentry(last)->offset) < 0) {
if (list_empty(&parent->d_subdirs))
goto out_unlock;
p = parent->d_subdirs.prev;
@@ -153,10 +162,11 @@ more:
goto out_unlock;
}
spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
- if (!d_unhashed(dentry) && dentry->d_inode &&
+ if (di->lease_shared_gen == shared_gen &&
+ !d_unhashed(dentry) && dentry->d_inode &&
ceph_snap(dentry->d_inode) != CEPH_SNAPDIR &&
ceph_ino(dentry->d_inode) != CEPH_INO_CEPH &&
- ctx->pos <= di->offset)
+ fpos_cmp(ctx->pos, di->offset) <= 0)
break;
dout(" skipping %p %.*s at %llu (%llu)%s%s\n", dentry,
dentry->d_name.len, dentry->d_name.name, di->offset,
@@ -172,9 +182,16 @@ more:
spin_unlock(&dentry->d_lock);
spin_unlock(&parent->d_lock);
+ /* make sure a dentry wasn't dropped while we didn't have parent lock */
+ if (!ceph_dir_is_complete(dir)) {
+ dout(" lost dir complete on %p; falling back to mds\n", dir);
+ dput(dentry);
+ err = -EAGAIN;
+ goto out;
+ }
+
dout(" %llu (%llu) dentry %p %.*s %p\n", di->offset, ctx->pos,
dentry, dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
- ctx->pos = di->offset;
if (!dir_emit(ctx, dentry->d_name.name,
dentry->d_name.len,
ceph_translate_ino(dentry->d_sb, dentry->d_inode->i_ino),
@@ -182,25 +199,18 @@ more:
if (last) {
/* remember our position */
fi->dentry = last;
- fi->next_offset = di->offset;
+ fi->next_offset = fpos_off(di->offset);
}
dput(dentry);
return 0;
}
+ ctx->pos = di->offset + 1;
+
if (last)
dput(last);
last = dentry;
- ctx->pos++;
-
- /* make sure a dentry wasn't dropped while we didn't have parent lock */
- if (!ceph_dir_is_complete(dir)) {
- dout(" lost dir complete on %p; falling back to mds\n", dir);
- err = -EAGAIN;
- goto out;
- }
-
spin_lock(&parent->d_lock);
p = p->prev; /* advance to next dentry */
goto more;
@@ -244,8 +254,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
int err;
u32 ftype;
struct ceph_mds_reply_info_parsed *rinfo;
- const int max_entries = fsc->mount_options->max_readdir;
- const int max_bytes = fsc->mount_options->max_readdir_bytes;
dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
if (fi->flags & CEPH_F_ATEND)
@@ -283,10 +291,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
ceph_snap(inode) != CEPH_SNAPDIR &&
__ceph_dir_is_complete(ci) &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
+ u32 shared_gen = ci->i_shared_gen;
spin_unlock(&ci->i_ceph_lock);
- err = __dcache_readdir(file, ctx);
+ err = __dcache_readdir(file, ctx, shared_gen);
if (err != -EAGAIN)
return err;
+ frag = fpos_frag(ctx->pos);
+ off = fpos_off(ctx->pos);
} else {
spin_unlock(&ci->i_ceph_lock);
}
@@ -314,14 +325,16 @@ more:
fi->last_readdir = NULL;
}
- /* requery frag tree, as the frag topology may have changed */
- frag = ceph_choose_frag(ceph_inode(inode), frag, NULL, NULL);
-
dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
ceph_vinop(inode), frag, fi->last_name);
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
+ err = ceph_alloc_readdir_reply_buffer(req, inode);
+ if (err) {
+ ceph_mdsc_put_request(req);
+ return err;
+ }
req->r_inode = inode;
ihold(inode);
req->r_dentry = dget(file->f_dentry);
@@ -332,9 +345,6 @@ more:
req->r_path2 = kstrdup(fi->last_name, GFP_NOFS);
req->r_readdir_offset = fi->next_offset;
req->r_args.readdir.frag = cpu_to_le32(frag);
- req->r_args.readdir.max_entries = cpu_to_le32(max_entries);
- req->r_args.readdir.max_bytes = cpu_to_le32(max_bytes);
- req->r_num_caps = max_entries + 1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
if (err < 0) {
ceph_mdsc_put_request(req);
@@ -352,6 +362,16 @@ more:
}
/* note next offset and last dentry name */
+ rinfo = &req->r_reply_info;
+ if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
+ frag = le32_to_cpu(rinfo->dir_dir->frag);
+ if (ceph_frag_is_leftmost(frag))
+ fi->next_offset = 2;
+ else
+ fi->next_offset = 0;
+ off = fi->next_offset;
+ }
+ fi->frag = frag;
fi->offset = fi->next_offset;
fi->last_readdir = req;
@@ -363,7 +383,6 @@ more:
else
fi->next_offset = 0;
} else {
- rinfo = &req->r_reply_info;
err = note_last_dentry(fi,
rinfo->dir_dname[rinfo->dir_nr-1],
rinfo->dir_dname_len[rinfo->dir_nr-1]);
@@ -429,7 +448,6 @@ more:
if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
dout(" marking %p complete\n", inode);
__ceph_dir_set_complete(ci, fi->dir_release_count);
- ci->i_max_offset = ctx->pos;
}
spin_unlock(&ci->i_ceph_lock);
@@ -437,7 +455,7 @@ more:
return 0;
}
-static void reset_readdir(struct ceph_file_info *fi)
+static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
{
if (fi->last_readdir) {
ceph_mdsc_put_request(fi->last_readdir);
@@ -445,7 +463,10 @@ static void reset_readdir(struct ceph_file_info *fi)
}
kfree(fi->last_name);
fi->last_name = NULL;
- fi->next_offset = 2; /* compensate for . and .. */
+ if (ceph_frag_is_leftmost(frag))
+ fi->next_offset = 2; /* compensate for . and .. */
+ else
+ fi->next_offset = 0;
if (fi->dentry) {
dput(fi->dentry);
fi->dentry = NULL;
@@ -457,7 +478,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
{
struct ceph_file_info *fi = file->private_data;
struct inode *inode = file->f_mapping->host;
- loff_t old_offset = offset;
+ loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
loff_t retval;
mutex_lock(&inode->i_mutex);
@@ -474,7 +495,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
goto out;
}
- if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
+ if (offset >= 0) {
if (offset != file->f_pos) {
file->f_pos = offset;
file->f_version = 0;
@@ -487,14 +508,14 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
* seek to new frag, or seek prior to current chunk.
*/
if (offset == 0 ||
- fpos_frag(offset) != fpos_frag(old_offset) ||
+ fpos_frag(offset) != fi->frag ||
fpos_off(offset) < fi->offset) {
dout("dir_llseek dropping %p content\n", file);
- reset_readdir(fi);
+ reset_readdir(fi, fpos_frag(offset));
}
/* bump dir_release_count if we did a forward seek */
- if (offset > old_offset)
+ if (fpos_cmp(offset, old_offset) > 0)
fi->dir_release_count--;
}
out:
@@ -684,7 +705,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
if (!err && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
ceph_mdsc_put_request(req);
- if (err)
+
+ if (!err)
+ ceph_init_acl(dentry, dentry->d_inode, dir);
+ else
d_drop(dentry);
return err;
}
@@ -722,7 +746,9 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
if (!err && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
ceph_mdsc_put_request(req);
- if (err)
+ if (!err)
+ ceph_init_acl(dentry, dentry->d_inode, dir);
+ else
d_drop(dentry);
return err;
}
@@ -763,7 +789,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
err = ceph_handle_notrace_create(dir, dentry);
ceph_mdsc_put_request(req);
out:
- if (err < 0)
+ if (!err)
+ ceph_init_acl(dentry, dentry->d_inode, dir);
+ else
d_drop(dentry);
return err;
}
@@ -788,8 +816,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
}
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
- req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */
- req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
+ req->r_old_dentry = dget(old_dentry);
req->r_locked_dir = dir;
req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -887,10 +914,11 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
+ ihold(old_dir);
req->r_dentry = dget(new_dentry);
req->r_num_caps = 2;
req->r_old_dentry = dget(old_dentry);
- req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
+ req->r_old_dentry_dir = old_dir;
req->r_locked_dir = new_dir;
req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -908,14 +936,16 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
* to do it here.
*/
- /* d_move screws up d_subdirs order */
- ceph_dir_clear_complete(new_dir);
-
d_move(old_dentry, new_dentry);
/* ensure target dentry is invalidated, despite
rehashing bug in vfs_rename_dir */
ceph_invalidate_dentry_lease(new_dentry);
+
+ /* d_move screws up sibling dentries' offsets */
+ ceph_dir_clear_complete(old_dir);
+ ceph_dir_clear_complete(new_dir);
+
}
ceph_mdsc_put_request(req);
return err;
@@ -1028,14 +1058,19 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
valid = 1;
} else if (dentry_lease_is_valid(dentry) ||
dir_lease_is_valid(dir, dentry)) {
- valid = 1;
+ if (dentry->d_inode)
+ valid = ceph_is_any_caps(dentry->d_inode);
+ else
+ valid = 1;
}
dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
- if (valid)
+ if (valid) {
ceph_dentry_lru_touch(dentry);
- else
+ } else {
+ ceph_dir_clear_complete(dir);
d_drop(dentry);
+ }
iput(dir);
return valid;
}
@@ -1284,6 +1319,8 @@ const struct inode_operations ceph_dir_iops = {
.getxattr = ceph_getxattr,
.listxattr = ceph_listxattr,
.removexattr = ceph_removexattr,
+ .get_acl = ceph_get_acl,
+ .set_acl = ceph_set_acl,
.mknod = ceph_mknod,
.symlink = ceph_symlink,
.mkdir = ceph_mkdir,