aboutsummaryrefslogtreecommitdiff
path: root/fs/ceph/snap.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/snap.c')
-rw-r--r--fs/ceph/snap.c271
1 files changed, 148 insertions, 123 deletions
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index e6f9bc57d47..f01645a2775 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -1,10 +1,12 @@
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
#include <linux/sort.h>
#include <linux/slab.h>
#include "super.h"
-#include "decode.h"
+#include "mds_client.h"
+
+#include <linux/ceph/decode.h>
/*
* Snapshots in ceph are driven in large part by cooperation from the
@@ -119,6 +121,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
INIT_LIST_HEAD(&realm->children);
INIT_LIST_HEAD(&realm->child_item);
INIT_LIST_HEAD(&realm->empty_item);
+ INIT_LIST_HEAD(&realm->dirty_item);
INIT_LIST_HEAD(&realm->inodes_with_caps);
spin_lock_init(&realm->inodes_with_caps_lock);
__insert_snap_realm(&mdsc->snap_realms, realm);
@@ -203,7 +206,7 @@ void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
up_write(&mdsc->snap_rwsem);
} else {
spin_lock(&mdsc->snap_empty_lock);
- list_add(&mdsc->snap_empty, &realm->empty_item);
+ list_add(&realm->empty_item, &mdsc->snap_empty);
spin_unlock(&mdsc->snap_empty_lock);
}
}
@@ -293,8 +296,7 @@ static int build_snap_context(struct ceph_snap_realm *realm)
struct ceph_snap_realm *parent = realm->parent;
struct ceph_snap_context *snapc;
int err = 0;
- int i;
- int num = realm->num_prior_parent_snaps + realm->num_snaps;
+ u32 num = realm->num_prior_parent_snaps + realm->num_snaps;
/*
* build parent context, if it hasn't been built.
@@ -318,28 +320,29 @@ static int build_snap_context(struct ceph_snap_realm *realm)
realm->cached_context->seq == realm->seq &&
(!parent ||
realm->cached_context->seq >= parent->cached_context->seq)) {
- dout("build_snap_context %llx %p: %p seq %lld (%d snaps)"
+ dout("build_snap_context %llx %p: %p seq %lld (%u snaps)"
" (unchanged)\n",
realm->ino, realm, realm->cached_context,
realm->cached_context->seq,
- realm->cached_context->num_snaps);
+ (unsigned int) realm->cached_context->num_snaps);
return 0;
}
/* alloc new snap context */
err = -ENOMEM;
- if (num > ULONG_MAX / sizeof(u64) - sizeof(*snapc))
+ if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
goto fail;
- snapc = kzalloc(sizeof(*snapc) + num*sizeof(u64), GFP_NOFS);
+ snapc = ceph_create_snap_context(num, GFP_NOFS);
if (!snapc)
goto fail;
- atomic_set(&snapc->nref, 1);
/* build (reverse sorted) snap vector */
num = 0;
snapc->seq = realm->seq;
if (parent) {
- /* include any of parent's snaps occuring _after_ my
+ u32 i;
+
+ /* include any of parent's snaps occurring _after_ my
parent became my parent */
for (i = 0; i < parent->cached_context->num_snaps; i++)
if (parent->cached_context->snaps[i] >=
@@ -358,8 +361,9 @@ static int build_snap_context(struct ceph_snap_realm *realm)
sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
snapc->num_snaps = num;
- dout("build_snap_context %llx %p: %p seq %lld (%d snaps)\n",
- realm->ino, realm, snapc, snapc->seq, snapc->num_snaps);
+ dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n",
+ realm->ino, realm, snapc, snapc->seq,
+ (unsigned int) snapc->num_snaps);
if (realm->cached_context)
ceph_put_snap_context(realm->cached_context);
@@ -399,9 +403,9 @@ static void rebuild_snap_realms(struct ceph_snap_realm *realm)
* helper to allocate and decode an array of snapids. free prior
* instance, if any.
*/
-static int dup_array(u64 **dst, __le64 *src, int num)
+static int dup_array(u64 **dst, __le64 *src, u32 num)
{
- int i;
+ u32 i;
kfree(*dst);
if (num) {
@@ -431,12 +435,11 @@ static int dup_array(u64 **dst, __le64 *src, int num)
* Caller must hold snap_rwsem for read (i.e., the realm topology won't
* change).
*/
-void ceph_queue_cap_snap(struct ceph_inode_info *ci,
- struct ceph_snap_context *snapc)
+void ceph_queue_cap_snap(struct ceph_inode_info *ci)
{
struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap;
- int used;
+ int used, dirty;
capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
if (!capsnap) {
@@ -444,44 +447,72 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
return;
}
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
used = __ceph_caps_used(ci);
+ dirty = __ceph_caps_dirty(ci);
+
+ /*
+ * If there is a write in progress, treat that as a dirty Fw,
+ * even though it hasn't completed yet; by the time we finish
+ * up this capsnap it will be.
+ */
+ if (used & CEPH_CAP_FILE_WR)
+ dirty |= CEPH_CAP_FILE_WR;
+
if (__ceph_have_pending_cap_snap(ci)) {
/* there is no point in queuing multiple "pending" cap_snaps,
as no new writes are allowed to start when pending, so any
writes in progress now were started before the previous
cap_snap. lucky us. */
- dout("queue_cap_snap %p snapc %p seq %llu used %d"
- " already pending\n", inode, snapc, snapc->seq, used);
+ dout("queue_cap_snap %p already pending\n", inode);
kfree(capsnap);
- } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) {
- igrab(inode);
+ } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
+ CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
+ struct ceph_snap_context *snapc = ci->i_head_snapc;
+
+ /*
+ * if we are a sync write, we may need to go to the snaprealm
+ * to get the current snapc.
+ */
+ if (!snapc)
+ snapc = ci->i_snap_realm->cached_context;
+
+ dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
+ inode, capsnap, snapc, ceph_cap_string(dirty));
+ ihold(inode);
atomic_set(&capsnap->nref, 1);
capsnap->ci = ci;
INIT_LIST_HEAD(&capsnap->ci_item);
INIT_LIST_HEAD(&capsnap->flushing_item);
- capsnap->follows = snapc->seq - 1;
- capsnap->context = ceph_get_snap_context(snapc);
+ capsnap->follows = snapc->seq;
capsnap->issued = __ceph_caps_issued(ci, NULL);
- capsnap->dirty = __ceph_caps_dirty(ci);
+ capsnap->dirty = dirty;
capsnap->mode = inode->i_mode;
capsnap->uid = inode->i_uid;
capsnap->gid = inode->i_gid;
- /* fixme? */
- capsnap->xattr_blob = NULL;
- capsnap->xattr_len = 0;
+ if (dirty & CEPH_CAP_XATTR_EXCL) {
+ __ceph_build_xattrs_blob(ci);
+ capsnap->xattr_blob =
+ ceph_buffer_get(ci->i_xattrs.blob);
+ capsnap->xattr_version = ci->i_xattrs.version;
+ } else {
+ capsnap->xattr_blob = NULL;
+ capsnap->xattr_version = 0;
+ }
/* dirty page count moved from _head to this cap_snap;
all subsequent writes page dirties occur _after_ this
snapshot. */
capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
ci->i_wrbuffer_ref_head = 0;
- ceph_put_snap_context(ci->i_head_snapc);
- ci->i_head_snapc = NULL;
+ capsnap->context = snapc;
+ ci->i_head_snapc =
+ ceph_get_snap_context(ci->i_snap_realm->cached_context);
+ dout(" new snapc is %p\n", ci->i_head_snapc);
list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
if (used & CEPH_CAP_FILE_WR) {
@@ -498,7 +529,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
kfree(capsnap);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
/*
@@ -507,13 +538,13 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
*
* If capsnap can now be flushed, add to snap_flush list, and return 1.
*
- * Caller must hold i_lock.
+ * Caller must hold i_ceph_lock.
*/
int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap)
{
struct inode *inode = &ci->vfs_inode;
- struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
+ struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
BUG_ON(capsnap->writing);
capsnap->size = inode->i_size;
@@ -522,15 +553,17 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
capsnap->ctime = inode->i_ctime;
capsnap->time_warp_seq = ci->i_time_warp_seq;
if (capsnap->dirty_pages) {
- dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu "
+ dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
"still has %d dirty pages\n", inode, capsnap,
capsnap->context, capsnap->context->seq,
- capsnap->size, capsnap->dirty_pages);
+ ceph_cap_string(capsnap->dirty), capsnap->size,
+ capsnap->dirty_pages);
return 0;
}
- dout("finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu clean\n",
+ dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
inode, capsnap, capsnap->context,
- capsnap->context->seq, capsnap->size);
+ capsnap->context->seq, ceph_cap_string(capsnap->dirty),
+ capsnap->size);
spin_lock(&mdsc->snap_flush_lock);
list_add_tail(&ci->i_snap_flush_item, &mdsc->snap_flush_list);
@@ -538,6 +571,45 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
return 1; /* caller may want to ceph_flush_snaps */
}
+/*
+ * Queue cap_snaps for snap writeback for this realm and its children.
+ * Called under snap_rwsem, so realm topology won't change.
+ */
+static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
+{
+ struct ceph_inode_info *ci;
+ struct inode *lastinode = NULL;
+ struct ceph_snap_realm *child;
+
+ dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino);
+
+ spin_lock(&realm->inodes_with_caps_lock);
+ list_for_each_entry(ci, &realm->inodes_with_caps,
+ i_snap_realm_item) {
+ struct inode *inode = igrab(&ci->vfs_inode);
+ if (!inode)
+ continue;
+ spin_unlock(&realm->inodes_with_caps_lock);
+ if (lastinode)
+ iput(lastinode);
+ lastinode = inode;
+ ceph_queue_cap_snap(ci);
+ spin_lock(&realm->inodes_with_caps_lock);
+ }
+ spin_unlock(&realm->inodes_with_caps_lock);
+ if (lastinode)
+ iput(lastinode);
+
+ list_for_each_entry(child, &realm->children, child_item) {
+ dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
+ realm, realm->ino, child, child->ino);
+ list_del_init(&child->dirty_item);
+ list_add(&child->dirty_item, &realm->dirty_item);
+ }
+
+ list_del_init(&realm->dirty_item);
+ dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
+}
/*
* Parse and apply a snapblob "snap trace" from the MDS. This specifies
@@ -555,6 +627,7 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm;
int invalidate = 0;
int err = -ENOMEM;
+ LIST_HEAD(dirty_realms);
dout("update_snap_trace deletion=%d\n", deletion);
more:
@@ -577,45 +650,6 @@ more:
}
}
- if (le64_to_cpu(ri->seq) > realm->seq) {
- dout("update_snap_trace updating %llx %p %lld -> %lld\n",
- realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
- /*
- * if the realm seq has changed, queue a cap_snap for every
- * inode with open caps. we do this _before_ we update
- * the realm info so that we prepare for writeback under the
- * _previous_ snap context.
- *
- * ...unless it's a snap deletion!
- */
- if (!deletion) {
- struct ceph_inode_info *ci;
- struct inode *lastinode = NULL;
-
- spin_lock(&realm->inodes_with_caps_lock);
- list_for_each_entry(ci, &realm->inodes_with_caps,
- i_snap_realm_item) {
- struct inode *inode = igrab(&ci->vfs_inode);
- if (!inode)
- continue;
- spin_unlock(&realm->inodes_with_caps_lock);
- if (lastinode)
- iput(lastinode);
- lastinode = inode;
- ceph_queue_cap_snap(ci, realm->cached_context);
- spin_lock(&realm->inodes_with_caps_lock);
- }
- spin_unlock(&realm->inodes_with_caps_lock);
- if (lastinode)
- iput(lastinode);
- dout("update_snap_trace cap_snaps queued\n");
- }
-
- } else {
- dout("update_snap_trace %llx %p seq %lld unchanged\n",
- realm->ino, realm, realm->seq);
- }
-
/* ensure the parent is correct */
err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
if (err < 0)
@@ -623,6 +657,8 @@ more:
invalidate += err;
if (le64_to_cpu(ri->seq) > realm->seq) {
+ dout("update_snap_trace updating %llx %p %lld -> %lld\n",
+ realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
/* update realm parameters, snap lists */
realm->seq = le64_to_cpu(ri->seq);
realm->created = le64_to_cpu(ri->created);
@@ -640,9 +676,17 @@ more:
if (err < 0)
goto fail;
+ /* queue realm for cap_snap creation */
+ list_add(&realm->dirty_item, &dirty_realms);
+
invalidate = 1;
} else if (!realm->cached_context) {
+ dout("update_snap_trace %llx %p seq %lld new\n",
+ realm->ino, realm, realm->seq);
invalidate = 1;
+ } else {
+ dout("update_snap_trace %llx %p seq %lld unchanged\n",
+ realm->ino, realm, realm->seq);
}
dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
@@ -655,6 +699,16 @@ more:
if (invalidate)
rebuild_snap_realms(realm);
+ /*
+ * queue cap snaps _after_ we've built the new snap contexts,
+ * so that i_head_snapc can be set appropriately.
+ */
+ while (!list_empty(&dirty_realms)) {
+ realm = list_first_entry(&dirty_realms, struct ceph_snap_realm,
+ dirty_item);
+ queue_realm_cap_snaps(realm);
+ }
+
__cleanup_empty_realms(mdsc);
return 0;
@@ -684,11 +738,11 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
ci = list_first_entry(&mdsc->snap_flush_list,
struct ceph_inode_info, i_snap_flush_item);
inode = &ci->vfs_inode;
- igrab(inode);
+ ihold(inode);
spin_unlock(&mdsc->snap_flush_lock);
- spin_lock(&inode->i_lock);
- __ceph_flush_snaps(ci, &session);
- spin_unlock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
+ __ceph_flush_snaps(ci, &session, 0);
+ spin_unlock(&ci->i_ceph_lock);
iput(inode);
spin_lock(&mdsc->snap_flush_lock);
}
@@ -717,7 +771,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session,
struct ceph_msg *msg)
{
- struct super_block *sb = mdsc->client->sb;
+ struct super_block *sb = mdsc->fsc->sb;
int mds = session->s_mds;
u64 split;
int op;
@@ -788,12 +842,13 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
};
struct inode *inode = ceph_find_inode(sb, vino);
struct ceph_inode_info *ci;
+ struct ceph_snap_realm *oldrealm;
if (!inode)
continue;
ci = ceph_inode(inode);
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (!ci->i_snap_realm)
goto skip_inode;
/*
@@ -813,25 +868,25 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
dout(" will move %p to split realm %llx %p\n",
inode, realm->ino, realm);
/*
- * Remove the inode from the realm's inode
- * list, but don't add it to the new realm
- * yet. We don't want the cap_snap to be
- * queued (again) by ceph_update_snap_trace()
- * below. Queue it _now_, under the old context.
+ * Move the inode to the new realm
*/
spin_lock(&realm->inodes_with_caps_lock);
list_del_init(&ci->i_snap_realm_item);
+ list_add(&ci->i_snap_realm_item,
+ &realm->inodes_with_caps);
+ oldrealm = ci->i_snap_realm;
+ ci->i_snap_realm = realm;
spin_unlock(&realm->inodes_with_caps_lock);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
- ceph_queue_cap_snap(ci,
- ci->i_snap_realm->cached_context);
+ ceph_get_snap_realm(mdsc, realm);
+ ceph_put_snap_realm(mdsc, oldrealm);
iput(inode);
continue;
skip_inode:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
iput(inode);
}
@@ -853,39 +908,9 @@ skip_inode:
ceph_update_snap_trace(mdsc, p, e,
op == CEPH_SNAP_OP_DESTROY);
- if (op == CEPH_SNAP_OP_SPLIT) {
- /*
- * ok, _now_ add the inodes into the new realm.
- */
- for (i = 0; i < num_split_inos; i++) {
- struct ceph_vino vino = {
- .ino = le64_to_cpu(split_inos[i]),
- .snap = CEPH_NOSNAP,
- };
- struct inode *inode = ceph_find_inode(sb, vino);
- struct ceph_inode_info *ci;
-
- if (!inode)
- continue;
- ci = ceph_inode(inode);
- spin_lock(&inode->i_lock);
- if (!ci->i_snap_realm)
- goto split_skip_inode;
- ceph_put_snap_realm(mdsc, ci->i_snap_realm);
- spin_lock(&realm->inodes_with_caps_lock);
- list_add(&ci->i_snap_realm_item,
- &realm->inodes_with_caps);
- ci->i_snap_realm = realm;
- spin_unlock(&realm->inodes_with_caps_lock);
- ceph_get_snap_realm(mdsc, realm);
-split_skip_inode:
- spin_unlock(&inode->i_lock);
- iput(inode);
- }
-
+ if (op == CEPH_SNAP_OP_SPLIT)
/* we took a reference when we created the realm, above */
ceph_put_snap_realm(mdsc, realm);
- }
__cleanup_empty_realms(mdsc);