aboutsummaryrefslogtreecommitdiff
path: root/fs/ceph/dir.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/dir.c')
-rw-r--r--fs/ceph/dir.c86
1 files changed, 46 insertions, 40 deletions
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 45eda6d7a40..c29d6ae6887 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -119,7 +119,8 @@ static int fpos_cmp(loff_t l, loff_t r)
* defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
* the MDS if/when the directory is modified).
*/
-static int __dcache_readdir(struct file *file, struct dir_context *ctx)
+static int __dcache_readdir(struct file *file, struct dir_context *ctx,
+ u32 shared_gen)
{
struct ceph_file_info *fi = file->private_data;
struct dentry *parent = file->f_dentry;
@@ -133,14 +134,14 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx)
last = fi->dentry;
fi->dentry = NULL;
- dout("__dcache_readdir %p at %llu (last %p)\n", dir, ctx->pos,
- last);
+ dout("__dcache_readdir %p v%u at %llu (last %p)\n",
+ dir, shared_gen, ctx->pos, last);
spin_lock(&parent->d_lock);
/* start at beginning? */
if (ctx->pos == 2 || last == NULL ||
- ctx->pos < ceph_dentry(last)->offset) {
+ fpos_cmp(ctx->pos, ceph_dentry(last)->offset) < 0) {
if (list_empty(&parent->d_subdirs))
goto out_unlock;
p = parent->d_subdirs.prev;
@@ -161,7 +162,8 @@ more:
goto out_unlock;
}
spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
- if (!d_unhashed(dentry) && dentry->d_inode &&
+ if (di->lease_shared_gen == shared_gen &&
+ !d_unhashed(dentry) && dentry->d_inode &&
ceph_snap(dentry->d_inode) != CEPH_SNAPDIR &&
ceph_ino(dentry->d_inode) != CEPH_INO_CEPH &&
fpos_cmp(ctx->pos, di->offset) <= 0)
@@ -180,9 +182,16 @@ more:
spin_unlock(&dentry->d_lock);
spin_unlock(&parent->d_lock);
+ /* make sure a dentry wasn't dropped while we didn't have parent lock */
+ if (!ceph_dir_is_complete(dir)) {
+ dout(" lost dir complete on %p; falling back to mds\n", dir);
+ dput(dentry);
+ err = -EAGAIN;
+ goto out;
+ }
+
dout(" %llu (%llu) dentry %p %.*s %p\n", di->offset, ctx->pos,
dentry, dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
- ctx->pos = di->offset;
if (!dir_emit(ctx, dentry->d_name.name,
dentry->d_name.len,
ceph_translate_ino(dentry->d_sb, dentry->d_inode->i_ino),
@@ -190,25 +199,18 @@ more:
if (last) {
/* remember our position */
fi->dentry = last;
- fi->next_offset = di->offset;
+ fi->next_offset = fpos_off(di->offset);
}
dput(dentry);
return 0;
}
+ ctx->pos = di->offset + 1;
+
if (last)
dput(last);
last = dentry;
- ctx->pos++;
-
- /* make sure a dentry wasn't dropped while we didn't have parent lock */
- if (!ceph_dir_is_complete(dir)) {
- dout(" lost dir complete on %p; falling back to mds\n", dir);
- err = -EAGAIN;
- goto out;
- }
-
spin_lock(&parent->d_lock);
p = p->prev; /* advance to next dentry */
goto more;
@@ -252,8 +254,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
int err;
u32 ftype;
struct ceph_mds_reply_info_parsed *rinfo;
- const int max_entries = fsc->mount_options->max_readdir;
- const int max_bytes = fsc->mount_options->max_readdir_bytes;
dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
if (fi->flags & CEPH_F_ATEND)
@@ -291,10 +291,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
ceph_snap(inode) != CEPH_SNAPDIR &&
__ceph_dir_is_complete(ci) &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
+ u32 shared_gen = ci->i_shared_gen;
spin_unlock(&ci->i_ceph_lock);
- err = __dcache_readdir(file, ctx);
+ err = __dcache_readdir(file, ctx, shared_gen);
if (err != -EAGAIN)
return err;
+ frag = fpos_frag(ctx->pos);
+ off = fpos_off(ctx->pos);
} else {
spin_unlock(&ci->i_ceph_lock);
}
@@ -322,14 +325,16 @@ more:
fi->last_readdir = NULL;
}
- /* requery frag tree, as the frag topology may have changed */
- frag = ceph_choose_frag(ceph_inode(inode), frag, NULL, NULL);
-
dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
ceph_vinop(inode), frag, fi->last_name);
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
+ err = ceph_alloc_readdir_reply_buffer(req, inode);
+ if (err) {
+ ceph_mdsc_put_request(req);
+ return err;
+ }
req->r_inode = inode;
ihold(inode);
req->r_dentry = dget(file->f_dentry);
@@ -340,9 +345,6 @@ more:
req->r_path2 = kstrdup(fi->last_name, GFP_NOFS);
req->r_readdir_offset = fi->next_offset;
req->r_args.readdir.frag = cpu_to_le32(frag);
- req->r_args.readdir.max_entries = cpu_to_le32(max_entries);
- req->r_args.readdir.max_bytes = cpu_to_le32(max_bytes);
- req->r_num_caps = max_entries + 1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
if (err < 0) {
ceph_mdsc_put_request(req);
@@ -369,9 +371,9 @@ more:
fi->next_offset = 0;
off = fi->next_offset;
}
+ fi->frag = frag;
fi->offset = fi->next_offset;
fi->last_readdir = req;
- fi->frag = frag;
if (req->r_reply_info.dir_end) {
kfree(fi->last_name);
@@ -446,7 +448,6 @@ more:
if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
dout(" marking %p complete\n", inode);
__ceph_dir_set_complete(ci, fi->dir_release_count);
- ci->i_max_offset = ctx->pos;
}
spin_unlock(&ci->i_ceph_lock);
@@ -454,7 +455,7 @@ more:
return 0;
}
-static void reset_readdir(struct ceph_file_info *fi)
+static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
{
if (fi->last_readdir) {
ceph_mdsc_put_request(fi->last_readdir);
@@ -462,7 +463,10 @@ static void reset_readdir(struct ceph_file_info *fi)
}
kfree(fi->last_name);
fi->last_name = NULL;
- fi->next_offset = 2; /* compensate for . and .. */
+ if (ceph_frag_is_leftmost(frag))
+ fi->next_offset = 2; /* compensate for . and .. */
+ else
+ fi->next_offset = 0;
if (fi->dentry) {
dput(fi->dentry);
fi->dentry = NULL;
@@ -474,7 +478,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
{
struct ceph_file_info *fi = file->private_data;
struct inode *inode = file->f_mapping->host;
- loff_t old_offset = offset;
+ loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
loff_t retval;
mutex_lock(&inode->i_mutex);
@@ -491,7 +495,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
goto out;
}
- if (offset >= 0 && offset <= inode->i_sb->s_maxbytes) {
+ if (offset >= 0) {
if (offset != file->f_pos) {
file->f_pos = offset;
file->f_version = 0;
@@ -504,14 +508,14 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
* seek to new frag, or seek prior to current chunk.
*/
if (offset == 0 ||
- fpos_frag(offset) != fpos_frag(old_offset) ||
+ fpos_frag(offset) != fi->frag ||
fpos_off(offset) < fi->offset) {
dout("dir_llseek dropping %p content\n", file);
- reset_readdir(fi);
+ reset_readdir(fi, fpos_frag(offset));
}
/* bump dir_release_count if we did a forward seek */
- if (offset > old_offset)
+ if (fpos_cmp(offset, old_offset) > 0)
fi->dir_release_count--;
}
out:
@@ -812,8 +816,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
}
req->r_dentry = dget(dentry);
req->r_num_caps = 2;
- req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */
- req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
+ req->r_old_dentry = dget(old_dentry);
req->r_locked_dir = dir;
req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -911,10 +914,11 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
+ ihold(old_dir);
req->r_dentry = dget(new_dentry);
req->r_num_caps = 2;
req->r_old_dentry = dget(old_dentry);
- req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
+ req->r_old_dentry_dir = old_dir;
req->r_locked_dir = new_dir;
req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -932,14 +936,16 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
* to do it here.
*/
- /* d_move screws up d_subdirs order */
- ceph_dir_clear_complete(new_dir);
-
d_move(old_dentry, new_dentry);
/* ensure target dentry is invalidated, despite
rehashing bug in vfs_rename_dir */
ceph_invalidate_dentry_lease(new_dentry);
+
+ /* d_move screws up sibling dentries' offsets */
+ ceph_dir_clear_complete(old_dir);
+ ceph_dir_clear_complete(new_dir);
+
}
ceph_mdsc_put_request(req);
return err;