aboutsummaryrefslogtreecommitdiff
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c730
1 files changed, 485 insertions, 245 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a1ee8fa3a8e..92a2548278f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3,6 +3,7 @@
#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
+#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
@@ -10,6 +11,7 @@
#include "super.h"
#include "mds_client.h"
+#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
@@ -42,6 +44,7 @@
*/
struct ceph_reconnect_state {
+ int nr_caps;
struct ceph_pagelist *pagelist;
bool flock;
};
@@ -61,7 +64,7 @@ static const struct ceph_connection_operations mds_con_ops;
*/
static int parse_reply_info_in(void **p, void *end,
struct ceph_mds_reply_info_in *info,
- int features)
+ u64 features)
{
int err = -EIO;
@@ -96,7 +99,7 @@ bad:
*/
static int parse_reply_info_trace(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
int err;
@@ -143,7 +146,7 @@ out_bad:
*/
static int parse_reply_info_dir(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
u32 num, i = 0;
int err;
@@ -163,21 +166,18 @@ static int parse_reply_info_dir(void **p, void *end,
if (num == 0)
goto done;
- /* alloc large array */
- info->dir_nr = num;
- info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
- sizeof(*info->dir_dname) +
- sizeof(*info->dir_dname_len) +
- sizeof(*info->dir_dlease),
- GFP_NOFS);
- if (info->dir_in == NULL) {
- err = -ENOMEM;
- goto out_bad;
- }
+ BUG_ON(!info->dir_in);
info->dir_dname = (void *)(info->dir_in + num);
info->dir_dname_len = (void *)(info->dir_dname + num);
info->dir_dlease = (void *)(info->dir_dname_len + num);
+ if ((unsigned long)(info->dir_dlease + num) >
+ (unsigned long)info->dir_in + info->dir_buf_size) {
+ pr_err("dir contents are larger than expected\n");
+ WARN_ON(1);
+ goto bad;
+ }
+ info->dir_nr = num;
while (num) {
/* dentry */
ceph_decode_need(p, end, sizeof(u32)*2, bad);
@@ -215,7 +215,7 @@ out_bad:
*/
static int parse_reply_info_filelock(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
if (*p + sizeof(*info->filelock_reply) > end)
goto bad;
@@ -232,16 +232,45 @@ bad:
}
/*
+ * parse create results
+ */
+static int parse_reply_info_create(void **p, void *end,
+ struct ceph_mds_reply_info_parsed *info,
+ u64 features)
+{
+ if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
+ if (*p == end) {
+ info->has_create_ino = false;
+ } else {
+ info->has_create_ino = true;
+ info->ino = ceph_decode_64(p);
+ }
+ }
+
+ if (unlikely(*p != end))
+ goto bad;
+ return 0;
+
+bad:
+ return -EIO;
+}
+
+/*
* parse extra results
*/
static int parse_reply_info_extra(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
return parse_reply_info_filelock(p, end, info, features);
- else
+ else if (info->head->op == CEPH_MDS_OP_READDIR ||
+ info->head->op == CEPH_MDS_OP_LSSNAP)
return parse_reply_info_dir(p, end, info, features);
+ else if (info->head->op == CEPH_MDS_OP_CREATE)
+ return parse_reply_info_create(p, end, info, features);
+ else
+ return -EIO;
}
/*
@@ -249,7 +278,7 @@ static int parse_reply_info_extra(void **p, void *end,
*/
static int parse_reply_info(struct ceph_msg *msg,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
void *p, *end;
u32 len;
@@ -262,6 +291,7 @@ static int parse_reply_info(struct ceph_msg *msg,
/* trace */
ceph_decode_32_safe(&p, end, len, bad);
if (len > 0) {
+ ceph_decode_need(&p, end, len, bad);
err = parse_reply_info_trace(&p, p+len, info, features);
if (err < 0)
goto out_bad;
@@ -270,6 +300,7 @@ static int parse_reply_info(struct ceph_msg *msg,
/* extra */
ceph_decode_32_safe(&p, end, len, bad);
if (len > 0) {
+ ceph_decode_need(&p, end, len, bad);
err = parse_reply_info_extra(&p, p+len, info, features);
if (err < 0)
goto out_bad;
@@ -294,7 +325,9 @@ out_bad:
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
- kfree(info->dir_in);
+ if (!info->dir_in)
+ return;
+ free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
}
@@ -332,10 +365,10 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
dout("mdsc put_session %p %d -> %d\n", s,
atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
if (atomic_dec_and_test(&s->s_ref)) {
- if (s->s_authorizer)
- s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
- s->s_mdsc->fsc->client->monc.auth,
- s->s_authorizer);
+ if (s->s_auth.authorizer)
+ ceph_auth_destroy_authorizer(
+ s->s_mdsc->fsc->client->monc.auth,
+ s->s_auth.authorizer);
kfree(s);
}
}
@@ -382,6 +415,9 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
{
struct ceph_mds_session *s;
+ if (mds >= mdsc->mdsmap->m_max_mds)
+ return ERR_PTR(-EINVAL);
+
s = kzalloc(sizeof(*s), GFP_NOFS);
if (!s)
return ERR_PTR(-ENOMEM);
@@ -392,15 +428,13 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s->s_seq = 0;
mutex_init(&s->s_mutex);
- ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
- s->s_con.private = s;
- s->s_con.ops = &mds_con_ops;
- s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
- s->s_con.peer_name.num = cpu_to_le64(mds);
+ ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
- spin_lock_init(&s->s_cap_lock);
+ spin_lock_init(&s->s_gen_ttl_lock);
s->s_cap_gen = 0;
- s->s_cap_ttl = 0;
+ s->s_cap_ttl = jiffies - 1;
+
+ spin_lock_init(&s->s_cap_lock);
s->s_renew_requested = 0;
s->s_renew_seq = 0;
INIT_LIST_HEAD(&s->s_caps);
@@ -410,6 +444,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
s->s_num_cap_releases = 0;
+ s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_LIST_HEAD(&s->s_cap_releases_done);
@@ -436,7 +471,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
mdsc->sessions[mds] = s;
atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
- ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
+ ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
+ ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
return s;
@@ -476,29 +512,33 @@ void ceph_mdsc_release_request(struct kref *kref)
struct ceph_mds_request *req = container_of(kref,
struct ceph_mds_request,
r_kref);
+ destroy_reply_info(&req->r_reply_info);
if (req->r_request)
ceph_msg_put(req->r_request);
- if (req->r_reply) {
+ if (req->r_reply)
ceph_msg_put(req->r_reply);
- destroy_reply_info(&req->r_reply_info);
- }
if (req->r_inode) {
- ceph_put_cap_refs(ceph_inode(req->r_inode),
- CEPH_CAP_PIN);
+ ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
iput(req->r_inode);
}
if (req->r_locked_dir)
- ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
- CEPH_CAP_PIN);
+ ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
if (req->r_target_inode)
iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
- if (req->r_old_dentry) {
- ceph_put_cap_refs(
- ceph_inode(req->r_old_dentry->d_parent->d_inode),
- CEPH_CAP_PIN);
+ if (req->r_old_dentry)
dput(req->r_old_dentry);
+ if (req->r_old_dentry_dir) {
+ /*
+ * track (and drop pins for) r_old_dentry_dir
+ * separately, since r_old_dentry's d_parent may have
+ * changed between the dir mutex being dropped and
+ * this request being freed.
+ */
+ ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
+ CEPH_CAP_PIN);
+ iput(req->r_old_dentry_dir);
}
kfree(req->r_path1);
kfree(req->r_path2);
@@ -578,6 +618,7 @@ static void __register_request(struct ceph_mds_client *mdsc,
if (dir) {
struct ceph_inode_info *ci = ceph_inode(dir);
+ ihold(dir);
spin_lock(&ci->i_unsafe_lock);
req->r_unsafe_dir = dir;
list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
@@ -598,8 +639,13 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
spin_lock(&ci->i_unsafe_lock);
list_del_init(&req->r_unsafe_dir_item);
spin_unlock(&ci->i_unsafe_lock);
+
+ iput(req->r_unsafe_dir);
+ req->r_unsafe_dir = NULL;
}
+ complete_all(&req->r_safe_completion);
+
ceph_mdsc_put_request(req);
}
@@ -611,8 +657,14 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
*
* Called under mdsc->mutex.
*/
-struct dentry *get_nonsnap_parent(struct dentry *dentry)
+static struct dentry *get_nonsnap_parent(struct dentry *dentry)
{
+ /*
+ * we don't need to worry about protecting the d_parent access
+ * here because we never renaming inside the snapped namespace
+ * except to resplice to another snapdir, and either the old or new
+ * result is a valid result.
+ */
while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
dentry = dentry->d_parent;
return dentry;
@@ -648,7 +700,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
if (req->r_inode) {
inode = req->r_inode;
} else if (req->r_dentry) {
- struct inode *dir = req->r_dentry->d_parent->d_inode;
+ /* ignore race with rename; old or new d_parent is okay */
+ struct dentry *parent = req->r_dentry->d_parent;
+ struct inode *dir = parent->d_inode;
if (dir->i_sb != mdsc->fsc->sb) {
/* not this fs! */
@@ -656,18 +710,18 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
} else if (ceph_snap(dir) != CEPH_NOSNAP) {
/* direct snapped/virtual snapdir requests
* based on parent dir inode */
- struct dentry *dn =
- get_nonsnap_parent(req->r_dentry->d_parent);
+ struct dentry *dn = get_nonsnap_parent(parent);
inode = dn->d_inode;
dout("__choose_mds using nonsnap parent %p\n", inode);
- } else if (req->r_dentry->d_inode) {
+ } else {
/* dentry target */
inode = req->r_dentry->d_inode;
- } else {
- /* dir + name */
- inode = dir;
- hash = ceph_dentry_hash(req->r_dentry);
- is_hash = true;
+ if (!inode || mode == USE_AUTH_MDS) {
+ /* dir + name */
+ inode = dir;
+ hash = ceph_dentry_hash(dir, req->r_dentry);
+ is_hash = true;
+ }
}
}
@@ -717,21 +771,21 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
}
}
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap = NULL;
if (mode == USE_AUTH_MDS)
cap = ci->i_auth_cap;
if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
if (!cap) {
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
goto random;
}
mds = cap->session->s_mds;
dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
inode, ceph_vinop(inode), mds,
cap == ci->i_auth_cap ? "auth " : "", cap);
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return mds;
random:
@@ -749,7 +803,8 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
struct ceph_msg *msg;
struct ceph_mds_session_head *h;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
+ false);
if (!msg) {
pr_err("create_session_msg ENOMEM creating msg\n");
return NULL;
@@ -792,35 +847,56 @@ static int __open_session(struct ceph_mds_client *mdsc,
*
* called under mdsc->mutex
*/
+static struct ceph_mds_session *
+__open_export_target_session(struct ceph_mds_client *mdsc, int target)
+{
+ struct ceph_mds_session *session;
+
+ session = __ceph_lookup_mds_session(mdsc, target);
+ if (!session) {
+ session = register_session(mdsc, target);
+ if (IS_ERR(session))
+ return session;
+ }
+ if (session->s_state == CEPH_MDS_SESSION_NEW ||
+ session->s_state == CEPH_MDS_SESSION_CLOSING)
+ __open_session(mdsc, session);
+
+ return session;
+}
+
+struct ceph_mds_session *
+ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
+{
+ struct ceph_mds_session *session;
+
+ dout("open_export_target_session to mds%d\n", target);
+
+ mutex_lock(&mdsc->mutex);
+ session = __open_export_target_session(mdsc, target);
+ mutex_unlock(&mdsc->mutex);
+
+ return session;
+}
+
static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_mds_info *mi;
struct ceph_mds_session *ts;
int i, mds = session->s_mds;
- int target;
if (mds >= mdsc->mdsmap->m_max_mds)
return;
+
mi = &mdsc->mdsmap->m_info[mds];
dout("open_export_target_sessions for mds%d (%d targets)\n",
session->s_mds, mi->num_export_targets);
for (i = 0; i < mi->num_export_targets; i++) {
- target = mi->export_targets[i];
- ts = __ceph_lookup_mds_session(mdsc, target);
- if (!ts) {
- ts = register_session(mdsc, target);
- if (IS_ERR(ts))
- return;
- }
- if (session->s_state == CEPH_MDS_SESSION_NEW ||
- session->s_state == CEPH_MDS_SESSION_CLOSING)
- __open_session(mdsc, session);
- else
- dout(" mds%d target mds%d %p is %s\n", session->s_mds,
- i, ts, session_state_name(ts->s_state));
- ceph_put_mds_session(ts);
+ ts = __open_export_target_session(mdsc, mi->export_targets[i]);
+ if (!IS_ERR(ts))
+ ceph_put_mds_session(ts);
}
}
@@ -935,8 +1011,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
- spin_lock(&inode->i_lock);
- __ceph_remove_cap(cap);
+ spin_lock(&ci->i_ceph_lock);
+ __ceph_remove_cap(cap, false);
if (!__ceph_is_any_real_caps(ci)) {
struct ceph_mds_client *mdsc =
ceph_sb_to_client(inode->i_sb)->mdsc;
@@ -968,7 +1044,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
}
spin_unlock(&mdsc->cap_dirty_lock);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
while (drop--)
iput(inode);
return 0;
@@ -981,6 +1057,37 @@ static void remove_session_caps(struct ceph_mds_session *session)
{
dout("remove_session_caps on %p\n", session);
iterate_session_caps(session, remove_session_caps_cb, NULL);
+
+ spin_lock(&session->s_cap_lock);
+ if (session->s_nr_caps > 0) {
+ struct super_block *sb = session->s_mdsc->fsc->sb;
+ struct inode *inode;
+ struct ceph_cap *cap, *prev = NULL;
+ struct ceph_vino vino;
+ /*
+ * iterate_session_caps() skips inodes that are being
+ * deleted, we need to wait until deletions are complete.
+ * __wait_on_freeing_inode() is designed for the job,
+ * but it is not exported, so use lookup inode function
+ * to access it.
+ */
+ while (!list_empty(&session->s_caps)) {
+ cap = list_entry(session->s_caps.next,
+ struct ceph_cap, session_caps);
+ if (cap == prev)
+ break;
+ prev = cap;
+ vino = cap->ci->i_vino;
+ spin_unlock(&session->s_cap_lock);
+
+ inode = ceph_find_inode(sb, vino);
+ iput(inode);
+
+ spin_lock(&session->s_cap_lock);
+ }
+ }
+ spin_unlock(&session->s_cap_lock);
+
BUG_ON(session->s_nr_caps > 0);
BUG_ON(!list_empty(&session->s_cap_flushing));
cleanup_cap_releases(session);
@@ -999,10 +1106,10 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
wake_up_all(&ci->i_cap_wq);
if (arg) {
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0;
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
return 0;
}
@@ -1051,6 +1158,21 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
return 0;
}
+static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session, u64 seq)
+{
+ struct ceph_msg *msg;
+
+ dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
+ session->s_mds, session_state_name(session->s_state), seq);
+ msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
+ if (!msg)
+ return -ENOMEM;
+ ceph_con_send(&session->s_con, msg);
+ return 0;
+}
+
+
/*
* Note new cap ttl, and any transition from stale -> not stale (fresh?).
*
@@ -1063,8 +1185,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
int wake = 0;
spin_lock(&session->s_cap_lock);
- was_stale = is_renew && (session->s_cap_ttl == 0 ||
- time_after_eq(jiffies, session->s_cap_ttl));
+ was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
session->s_cap_ttl = session->s_renew_requested +
mdsc->mdsmap->m_session_timeout*HZ;
@@ -1130,31 +1251,36 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
struct ceph_mds_session *session = arg;
struct ceph_inode_info *ci = ceph_inode(inode);
- int used, oissued, mine;
+ int used, wanted, oissued, mine;
if (session->s_trim_caps <= 0)
return -1;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
mine = cap->issued | cap->implemented;
used = __ceph_caps_used(ci);
+ wanted = __ceph_caps_file_wanted(ci);
oissued = __ceph_caps_issued_other(ci, cap);
- dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
+ dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
- ceph_cap_string(used));
- if (ci->i_dirty_caps)
- goto out; /* dirty caps */
- if ((used & ~oissued) & mine)
+ ceph_cap_string(used), ceph_cap_string(wanted));
+ if (cap == ci->i_auth_cap) {
+ if (ci->i_dirty_caps | ci->i_flushing_caps)
+ goto out;
+ if ((used | wanted) & CEPH_CAP_ANY_WR)
+ goto out;
+ }
+ if ((used | wanted) & ~oissued & mine)
goto out; /* we need these caps */
session->s_trim_caps--;
if (oissued) {
/* we aren't the only cap.. just remove us */
- __ceph_remove_cap(cap);
+ __ceph_remove_cap(cap, true);
} else {
/* try to drop referring dentries */
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
d_prune_aliases(inode);
dout("trim_caps_cb %p cap %p pruned, count now %d\n",
inode, cap, atomic_read(&inode->i_count));
@@ -1162,7 +1288,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
}
out:
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
return 0;
}
@@ -1185,6 +1311,9 @@ static int trim_caps(struct ceph_mds_client *mdsc,
trim_caps - session->s_trim_caps);
session->s_trim_caps = 0;
}
+
+ ceph_add_cap_releases(mdsc, session);
+ ceph_send_cap_releases(mdsc, session);
return 0;
}
@@ -1225,7 +1354,7 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
while (session->s_num_cap_releases < session->s_nr_caps + extra) {
spin_unlock(&session->s_cap_lock);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
- GFP_NOFS);
+ GFP_NOFS, false);
if (!msg)
goto out_unlocked;
dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1280,7 +1409,7 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
i_flushing_item);
struct inode *inode = &ci->vfs_inode;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
if (ci->i_cap_flush_seq <= want_flush_seq) {
dout("check_cap_flush still flushing %p "
"seq %lld <= %lld to mds%d\n", inode,
@@ -1288,7 +1417,7 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
session->s_mds);
ret = 0;
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
}
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
@@ -1334,16 +1463,19 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
unsigned num;
dout("discard_cap_releases mds%d\n", session->s_mds);
- spin_lock(&session->s_cap_lock);
- /* zero out the in-progress message */
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
- head = msg->front.iov_base;
- num = le32_to_cpu(head->num);
- dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
- head->num = cpu_to_le32(0);
- session->s_num_cap_releases += num;
+ if (!list_empty(&session->s_cap_releases)) {
+ /* zero out the in-progress message */
+ msg = list_first_entry(&session->s_cap_releases,
+ struct ceph_msg, list_head);
+ head = msg->front.iov_base;
+ num = le32_to_cpu(head->num);
+ dout("discard_cap_releases mds%d %p %u\n",
+ session->s_mds, msg, num);
+ head->num = cpu_to_le32(0);
+ msg->front.iov_len = sizeof(*head);
+ session->s_num_cap_releases += num;
+ }
/* requeue completed messages */
while (!list_empty(&session->s_cap_releases_done)) {
@@ -1360,14 +1492,49 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
msg->front.iov_len = sizeof(*head);
list_add(&msg->list_head, &session->s_cap_releases);
}
-
- spin_unlock(&session->s_cap_lock);
}
/*
* requests
*/
+int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
+ struct inode *dir)
+{
+ struct ceph_inode_info *ci = ceph_inode(dir);
+ struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
+ struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
+ size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
+ sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
+ int order, num_entries;
+
+ spin_lock(&ci->i_ceph_lock);
+ num_entries = ci->i_files + ci->i_subdirs;
+ spin_unlock(&ci->i_ceph_lock);
+ num_entries = max(num_entries, 1);
+ num_entries = min(num_entries, opt->max_readdir);
+
+ order = get_order(size * num_entries);
+ while (order >= 0) {
+ rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN,
+ order);
+ if (rinfo->dir_in)
+ break;
+ order--;
+ }
+ if (!rinfo->dir_in)
+ return -ENOMEM;
+
+ num_entries = (PAGE_SIZE << order) / size;
+ num_entries = min(num_entries, opt->max_readdir);
+
+ rinfo->dir_buf_size = PAGE_SIZE << order;
+ req->r_num_caps = num_entries + 1;
+ req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
+ req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
+ return 0;
+}
+
/*
* Create an mds request.
*/
@@ -1391,6 +1558,8 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
+ req->r_stamp = CURRENT_TIME;
+
req->r_op = op;
req->r_direct_mode = mode;
return req;
@@ -1434,12 +1603,15 @@ char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
struct dentry *temp;
char *path;
int len, pos;
+ unsigned seq;
if (dentry == NULL)
return ERR_PTR(-EINVAL);
retry:
len = 0;
+ seq = read_seqbegin(&rename_lock);
+ rcu_read_lock();
for (temp = dentry; !IS_ROOT(temp);) {
struct inode *inode = temp->d_inode;
if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
@@ -1450,11 +1622,8 @@ retry:
else
len += 1 + temp->d_name.len;
temp = temp->d_parent;
- if (temp == NULL) {
- pr_err("build_path corrupt dentry %p\n", dentry);
- return ERR_PTR(-EINVAL);
- }
}
+ rcu_read_unlock();
if (len)
len--; /* no leading '/' */
@@ -1463,32 +1632,35 @@ retry:
return ERR_PTR(-ENOMEM);
pos = len;
path[pos] = 0; /* trailing null */
+ rcu_read_lock();
for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
- struct inode *inode = temp->d_inode;
+ struct inode *inode;
+ spin_lock(&temp->d_lock);
+ inode = temp->d_inode;
if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
dout("build_path path+%d: %p SNAPDIR\n",
pos, temp);
} else if (stop_on_nosnap && inode &&
ceph_snap(inode) == CEPH_NOSNAP) {
+ spin_unlock(&temp->d_lock);
break;
} else {
pos -= temp->d_name.len;
- if (pos < 0)
+ if (pos < 0) {
+ spin_unlock(&temp->d_lock);
break;
+ }
strncpy(path + pos, temp->d_name.name,
temp->d_name.len);
}
+ spin_unlock(&temp->d_lock);
if (pos)
path[--pos] = '/';
temp = temp->d_parent;
- if (temp == NULL) {
- pr_err("build_path corrupt dentry\n");
- kfree(path);
- return ERR_PTR(-EINVAL);
- }
}
- if (pos != 0) {
+ rcu_read_unlock();
+ if (pos != 0 || read_seqretry(&rename_lock, seq)) {
pr_err("build_path did not end path lookup where "
"expected, namelen is %d, pos is %d\n", len, pos);
/* presumably this is only possible if racing with a
@@ -1502,7 +1674,7 @@ retry:
*base = ceph_ino(temp->d_inode);
*plen = len;
dout("build_path on %p %d built %llx '%.*s'\n",
- dentry, dentry->d_count, *base, len, path);
+ dentry, d_count(dentry), *base, len, path);
return path;
}
@@ -1567,10 +1739,10 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
*ppath);
- } else if (rpath) {
+ } else if (rpath || rino) {
*ino = rino;
*ppath = rpath;
- *pathlen = strlen(rpath);
+ *pathlen = rpath ? strlen(rpath) : 0;
dout(" path %.*s\n", *pathlen, rpath);
}
@@ -1613,7 +1785,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
}
len = sizeof(*head) +
- pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
+ pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
+ sizeof(struct timespec);
/* calculate (max) length for cap releases */
len += sizeof(struct ceph_mds_request_release) *
@@ -1624,12 +1797,13 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
if (req->r_old_dentry_drop)
len += req->r_old_dentry->d_name.len;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
if (!msg) {
msg = ERR_PTR(-ENOMEM);
goto out_free2;
}
+ msg->hdr.version = 2;
msg->hdr.tid = cpu_to_le64(req->r_tid);
head = msg->front.iov_base;
@@ -1638,8 +1812,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
head->op = cpu_to_le32(req->r_op);
- head->caller_uid = cpu_to_le32(req->r_uid);
- head->caller_gid = cpu_to_le32(req->r_gid);
+ head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
+ head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
head->args = req->r_args;
ceph_encode_filepath(&p, end, ino1, path1);
@@ -1666,12 +1840,19 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
head->num_releases = cpu_to_le16(releases);
+ /* time stamp */
+ ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+
BUG_ON(p > end);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- msg->pages = req->r_pages;
- msg->nr_pages = req->r_num_pages;
+ if (req->r_data_len) {
+ /* outbound data set only by ceph_sync_setxattr() */
+ BUG_ON(!req->r_pages);
+ ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0);
+ }
+
msg->hdr.data_len = cpu_to_le32(req->r_data_len);
msg->hdr.data_off = cpu_to_le16(0);
@@ -1785,8 +1966,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
int mds = -1;
int err = -EAGAIN;
- if (req->r_err || req->r_got_result)
+ if (req->r_err || req->r_got_result) {
+ if (req->r_aborted)
+ __unregister_request(mdsc, req);
goto out;
+ }
if (req->r_timeout &&
time_after_eq(jiffies, req->r_started + req->r_timeout)) {
@@ -1856,10 +2040,16 @@ finish:
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head)
{
- struct ceph_mds_request *req, *nreq;
+ struct ceph_mds_request *req;
+ LIST_HEAD(tmp_list);
- list_for_each_entry_safe(req, nreq, head, r_wait) {
+ list_splice_init(head, &tmp_list);
+
+ while (!list_empty(&tmp_list)) {
+ req = list_entry(tmp_list.next,
+ struct ceph_mds_request, r_wait);
list_del_init(&req->r_wait);
+ dout(" wake request %p tid %llu\n", req, req->r_tid);
__do_request(mdsc, req);
}
}
@@ -1913,10 +2103,9 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
if (req->r_locked_dir)
ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
- if (req->r_old_dentry)
- ceph_get_cap_refs(
- ceph_inode(req->r_old_dentry->d_parent->d_inode),
- CEPH_CAP_PIN);
+ if (req->r_old_dentry_dir)
+ ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
+ CEPH_CAP_PIN);
/* issue */
mutex_lock(&mdsc->mutex);
@@ -1974,20 +2163,16 @@ out:
}
/*
- * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
+ * Invalidate dir's completeness, dentry lease state on an aborted MDS
* namespace request.
*/
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
struct inode *inode = req->r_locked_dir;
- struct ceph_inode_info *ci = ceph_inode(inode);
- dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
- spin_lock(&inode->i_lock);
- ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
- ci->i_release_count++;
- spin_unlock(&inode->i_lock);
+ dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
+ ceph_dir_clear_complete(inode);
if (req->r_dentry)
ceph_invalidate_dentry_lease(req->r_dentry);
if (req->r_old_dentry)
@@ -2040,13 +2225,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* dup? */
if ((req->r_got_unsafe && !head->safe) ||
(req->r_got_safe && head->safe)) {
- pr_warning("got a dup %s reply on %llu from mds%d\n",
+ pr_warn("got a dup %s reply on %llu from mds%d\n",
head->safe ? "safe" : "unsafe", tid, mds);
mutex_unlock(&mdsc->mutex);
goto out;
}
if (req->r_got_safe && !head->safe) {
- pr_warning("got unsafe after safe on %llu from mds%d\n",
+ pr_warn("got unsafe after safe on %llu from mds%d\n",
tid, mds);
mutex_unlock(&mdsc->mutex);
goto out;
@@ -2063,26 +2248,16 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
*/
if (result == -ESTALE) {
dout("got ESTALE on request %llu", req->r_tid);
- if (!req->r_inode) {
- /* do nothing; not an authority problem */
- } else if (req->r_direct_mode != USE_AUTH_MDS) {
+ if (req->r_direct_mode != USE_AUTH_MDS) {
dout("not using auth, setting for that now");
req->r_direct_mode = USE_AUTH_MDS;
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
} else {
- struct ceph_inode_info *ci = ceph_inode(req->r_inode);
- struct ceph_cap *cap = NULL;
-
- if (req->r_session)
- cap = ceph_get_cap_for_mds(ci,
- req->r_session->s_mds);
-
- dout("already using auth");
- if ((!cap || cap != ci->i_auth_cap) ||
- (cap->mseq != req->r_sent_on_mseq)) {
- dout("but cap changed, so resending");
+ int mds = __choose_mds(mdsc, req);
+ if (mds >= 0 && mds != req->r_session->s_mds) {
+ dout("but auth changed, so resending");
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
@@ -2095,7 +2270,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (head->safe) {
req->r_got_safe = true;
__unregister_request(mdsc, req);
- complete_all(&req->r_safe_completion);
if (req->r_got_unsafe) {
/*
@@ -2146,8 +2320,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_lock(&req->r_fill_mutex);
err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
if (err == 0) {
- if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
- rinfo->dir_nr)
+ if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
+ req->r_op == CEPH_MDS_OP_LSSNAP))
ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
@@ -2297,10 +2471,10 @@ static void handle_session(struct ceph_mds_session *session,
case CEPH_SESSION_STALE:
pr_info("mds%d caps went stale, renewing\n",
session->s_mds);
- spin_lock(&session->s_cap_lock);
+ spin_lock(&session->s_gen_ttl_lock);
session->s_cap_gen++;
- session->s_cap_ttl = 0;
- spin_unlock(&session->s_cap_lock);
+ session->s_cap_ttl = jiffies - 1;
+ spin_unlock(&session->s_gen_ttl_lock);
send_renew_caps(mdsc, session);
break;
@@ -2308,6 +2482,10 @@ static void handle_session(struct ceph_mds_session *session,
trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
break;
+ case CEPH_SESSION_FLUSHMSG:
+ send_flushmsg_ack(mdsc, session, seq);
+ break;
+
default:
pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
WARN_ON(1);
@@ -2394,9 +2572,11 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
if (err)
goto out_free;
- spin_lock(&inode->i_lock);
+ spin_lock(&ci->i_ceph_lock);
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
+ cap->mseq = 0; /* and migrate_seq */
+ cap->cap_gen = cap->session->s_cap_gen;
if (recon_state->flock) {
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
@@ -2417,43 +2597,50 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
rec.v1.pathbase = cpu_to_le64(pathbase);
reclen = sizeof(rec.v1);
}
- spin_unlock(&inode->i_lock);
+ spin_unlock(&ci->i_ceph_lock);
if (recon_state->flock) {
int num_fcntl_locks, num_flock_locks;
- struct ceph_pagelist_cursor trunc_point;
-
- ceph_pagelist_set_cursor(pagelist, &trunc_point);
- do {
- lock_flocks();
- ceph_count_locks(inode, &num_fcntl_locks,
- &num_flock_locks);
- rec.v2.flock_len = (2*sizeof(u32) +
- (num_fcntl_locks+num_flock_locks) *
- sizeof(struct ceph_filelock));
- unlock_flocks();
-
- /* pre-alloc pagelist */
- ceph_pagelist_truncate(pagelist, &trunc_point);
- err = ceph_pagelist_append(pagelist, &rec, reclen);
- if (!err)
- err = ceph_pagelist_reserve(pagelist,
- rec.v2.flock_len);
-
- /* encode locks */
- if (!err) {
- lock_flocks();
- err = ceph_encode_locks(inode,
- pagelist,
- num_fcntl_locks,
- num_flock_locks);
- unlock_flocks();
- }
- } while (err == -ENOSPC);
+ struct ceph_filelock *flocks;
+
+encode_again:
+ spin_lock(&inode->i_lock);
+ ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+ spin_unlock(&inode->i_lock);
+ flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
+ sizeof(struct ceph_filelock), GFP_NOFS);
+ if (!flocks) {
+ err = -ENOMEM;
+ goto out_free;
+ }
+ spin_lock(&inode->i_lock);
+ err = ceph_encode_locks_to_buffer(inode, flocks,
+ num_fcntl_locks,
+ num_flock_locks);
+ spin_unlock(&inode->i_lock);
+ if (err) {
+ kfree(flocks);
+ if (err == -ENOSPC)
+ goto encode_again;
+ goto out_free;
+ }
+ /*
+ * number of encoded locks is stable, so copy to pagelist
+ */
+ rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
+ (num_fcntl_locks+num_flock_locks) *
+ sizeof(struct ceph_filelock));
+ err = ceph_pagelist_append(pagelist, &rec, reclen);
+ if (!err)
+ err = ceph_locks_to_pagelist(flocks, pagelist,
+ num_fcntl_locks,
+ num_flock_locks);
+ kfree(flocks);
} else {
err = ceph_pagelist_append(pagelist, &rec, reclen);
}
+ recon_state->nr_caps++;
out_free:
kfree(path);
out_dput:
@@ -2481,6 +2668,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
struct rb_node *p;
int mds = session->s_mds;
int err = -ENOMEM;
+ int s_nr_caps;
struct ceph_pagelist *pagelist;
struct ceph_reconnect_state recon_state;
@@ -2491,7 +2679,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
goto fail_nopagelist;
ceph_pagelist_init(pagelist);
- reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
+ reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
if (!reply)
goto fail_nomsg;
@@ -2499,7 +2687,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
session->s_state = CEPH_MDS_SESSION_RECONNECTING;
session->s_seq = 0;
+ ceph_con_close(&session->s_con);
ceph_con_open(&session->s_con,
+ CEPH_ENTITY_TYPE_MDS, mds,
ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
/* replay unsafe requests */
@@ -2510,20 +2700,38 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
dout("session %p state %s\n", session,
session_state_name(session->s_state));
+ spin_lock(&session->s_gen_ttl_lock);
+ session->s_cap_gen++;
+ spin_unlock(&session->s_gen_ttl_lock);
+
+ spin_lock(&session->s_cap_lock);
+ /*
+ * notify __ceph_remove_cap() that we are composing cap reconnect.
+ * If a cap get released before being added to the cap reconnect,
+ * __ceph_remove_cap() should skip queuing cap release.
+ */
+ session->s_cap_reconnect = 1;
/* drop old cap expires; we're about to reestablish that state */
discard_cap_releases(mdsc, session);
+ spin_unlock(&session->s_cap_lock);
/* traverse this session's caps */
- err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
+ s_nr_caps = session->s_nr_caps;
+ err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
if (err)
goto fail;
+ recon_state.nr_caps = 0;
recon_state.pagelist = pagelist;
recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
+ spin_lock(&session->s_cap_lock);
+ session->s_cap_reconnect = 0;
+ spin_unlock(&session->s_cap_lock);
+
/*
* snaprealms. we provide mds with the ino, seq (version), and
* parent for all of our realms. If the mds has any newer info,
@@ -2544,11 +2752,20 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
goto fail;
}
- reply->pagelist = pagelist;
if (recon_state.flock)
reply->hdr.version = cpu_to_le16(2);
+
+ /* raced with cap release? */
+ if (s_nr_caps != recon_state.nr_caps) {
+ struct page *page = list_first_entry(&pagelist->head,
+ struct page, lru);
+ __le32 *addr = kmap_atomic(page);
+ *addr = cpu_to_le32(recon_state.nr_caps);
+ kunmap_atomic(addr);
+ }
+
reply->hdr.data_len = cpu_to_le32(pagelist->length);
- reply->nr_pages = calc_pages_for(0, pagelist->length);
+ ceph_msg_data_add_pagelist(reply, pagelist);
ceph_con_send(&session->s_con, reply);
mutex_unlock(&session->s_mutex);
@@ -2604,7 +2821,8 @@ static void check_new_map(struct ceph_mds_client *mdsc,
ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
session_state_name(s->s_state));
- if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
+ if (i >= newmap->m_max_mds ||
+ memcmp(ceph_mdsmap_get_addr(oldmap, i),
ceph_mdsmap_get_addr(newmap, i),
sizeof(struct ceph_entity_addr))) {
if (s->s_state == CEPH_MDS_SESSION_OPENING) {
@@ -2691,14 +2909,12 @@ static void handle_lease(struct ceph_mds_client *mdsc,
{
struct super_block *sb = mdsc->fsc->sb;
struct inode *inode;
- struct ceph_inode_info *ci;
struct dentry *parent, *dentry;
struct ceph_dentry_info *di;
int mds = session->s_mds;
struct ceph_mds_lease *h = msg->front.iov_base;
u32 seq;
struct ceph_vino vino;
- int mask;
struct qstr dname;
int release = 0;
@@ -2709,7 +2925,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
goto bad;
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
- mask = le16_to_cpu(h->mask);
seq = le32_to_cpu(h->seq);
dname.name = (void *)h + sizeof(*h) + sizeof(u32);
dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
@@ -2721,14 +2936,13 @@ static void handle_lease(struct ceph_mds_client *mdsc,
/* lookup inode */
inode = ceph_find_inode(sb, vino);
- dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
- ceph_lease_op_name(h->action), mask, vino.ino, inode,
+ dout("handle_lease %s, ino %llx %p %.*s\n",
+ ceph_lease_op_name(h->action), vino.ino, inode,
dname.len, dname.name);
if (inode == NULL) {
dout("handle_lease no inode %llx\n", vino.ino);
goto release;
}
- ci = ceph_inode(inode);
/* dentry */
parent = d_find_alias(inode);
@@ -2747,7 +2961,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
di = ceph_dentry(dentry);
switch (h->action) {
case CEPH_MDS_LEASE_REVOKE:
- if (di && di->lease_session == session) {
+ if (di->lease_session == session) {
if (ceph_seq_cmp(di->lease_seq, seq) > 0)
h->seq = cpu_to_le32(di->lease_seq);
__ceph_mdsc_drop_dentry_lease(dentry);
@@ -2756,7 +2970,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
break;
case CEPH_MDS_LEASE_RENEW:
- if (di && di->lease_session == session &&
+ if (di->lease_session == session &&
di->lease_gen == session->s_cap_gen &&
di->lease_renew_from &&
di->lease_renew_after == 0) {
@@ -2808,12 +3022,11 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
dnamelen = dentry->d_name.len;
len += dnamelen;
- msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
if (!msg)
return;
lease = msg->front.iov_base;
lease->action = action;
- lease->mask = cpu_to_le16(1);
lease->ino = cpu_to_le64(ceph_vino(inode).ino);
lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
lease->seq = cpu_to_le32(seq);
@@ -2835,7 +3048,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
* Pass @inode always, @dentry is optional.
*/
void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
- struct dentry *dentry, int mask)
+ struct dentry *dentry)
{
struct ceph_dentry_info *di;
struct ceph_mds_session *session;
@@ -2843,7 +3056,6 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
BUG_ON(inode == NULL);
BUG_ON(dentry == NULL);
- BUG_ON(mask == 0);
/* is dentry lease valid? */
spin_lock(&dentry->d_lock);
@@ -2853,8 +3065,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
di->lease_gen != di->lease_session->s_cap_gen ||
!time_before(jiffies, dentry->d_time)) {
dout("lease_release inode %p dentry %p -- "
- "no lease on %d\n",
- inode, dentry, mask);
+ "no lease\n",
+ inode, dentry);
spin_unlock(&dentry->d_lock);
return;
}
@@ -2865,8 +3077,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
__ceph_mdsc_drop_dentry_lease(dentry);
spin_unlock(&dentry->d_lock);
- dout("lease_release inode %p dentry %p mask %d to mds%d\n",
- inode, dentry, mask, session->s_mds);
+ dout("lease_release inode %p dentry %p to mds%d\n",
+ inode, dentry, session->s_mds);
ceph_mdsc_lease_send_msg(session, inode, dentry,
CEPH_MDS_LEASE_RELEASE, seq);
ceph_put_mds_session(session);
@@ -2979,8 +3191,10 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
fsc->mdsc = mdsc;
mutex_init(&mdsc->mutex);
mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
- if (mdsc->mdsmap == NULL)
+ if (mdsc->mdsmap == NULL) {
+ kfree(mdsc);
return -ENOMEM;
+ }
init_completion(&mdsc->safe_umount_waiters);
init_waitqueue_head(&mdsc->session_close_wq);
@@ -3002,6 +3216,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
spin_lock_init(&mdsc->snap_flush_lock);
mdsc->cap_flush_seq = 0;
INIT_LIST_HEAD(&mdsc->cap_dirty);
+ INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
mdsc->num_cap_flushing = 0;
spin_lock_init(&mdsc->cap_dirty_lock);
init_waitqueue_head(&mdsc->cap_flushing_wq);
@@ -3131,7 +3346,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
/*
* true if all sessions are closed, or we force unmount
*/
-bool done_closing_sessions(struct ceph_mds_client *mdsc)
+static bool done_closing_sessions(struct ceph_mds_client *mdsc)
{
int i, n = 0;
@@ -3215,9 +3430,15 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
struct ceph_mds_client *mdsc = fsc->mdsc;
+ dout("mdsc_destroy %p\n", mdsc);
ceph_mdsc_stop(mdsc);
+
+ /* flush out any connection work with references to us */
+ ceph_msgr_flush();
+
fsc->mdsc = NULL;
kfree(mdsc);
+ dout("mdsc_destroy %p done\n", mdsc);
}
@@ -3298,8 +3519,8 @@ static void con_put(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
+ dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
ceph_put_mds_session(s);
- dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
}
/*
@@ -3311,7 +3532,7 @@ static void peer_reset(struct ceph_connection *con)
struct ceph_mds_session *s = con->private;
struct ceph_mds_client *mdsc = s->s_mdsc;
- pr_warning("mds%d closed our session\n", s->s_mds);
+ pr_warn("mds%d closed our session\n", s->s_mds);
send_mds_reconnect(mdsc, s);
}
@@ -3362,39 +3583,37 @@ out:
/*
* authentication
*/
-static int get_authorizer(struct ceph_connection *con,
- void **buf, int *len, int *proto,
- void **reply_buf, int *reply_len, int force_new)
+
+/*
+ * Note: returned pointer is the address of a structure that's
+ * managed separately. Caller must *not* attempt to free it.
+ */
+static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
+ int *proto, int force_new)
{
struct ceph_mds_session *s = con->private;
struct ceph_mds_client *mdsc = s->s_mdsc;
struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
- int ret = 0;
-
- if (force_new && s->s_authorizer) {
- ac->ops->destroy_authorizer(ac, s->s_authorizer);
- s->s_authorizer = NULL;
- }
- if (s->s_authorizer == NULL) {
- if (ac->ops->create_authorizer) {
- ret = ac->ops->create_authorizer(
- ac, CEPH_ENTITY_TYPE_MDS,
- &s->s_authorizer,
- &s->s_authorizer_buf,
- &s->s_authorizer_buf_len,
- &s->s_authorizer_reply_buf,
- &s->s_authorizer_reply_buf_len);
- if (ret)
- return ret;
- }
- }
+ struct ceph_auth_handshake *auth = &s->s_auth;
+ if (force_new && auth->authorizer) {
+ ceph_auth_destroy_authorizer(ac, auth->authorizer);
+ auth->authorizer = NULL;
+ }
+ if (!auth->authorizer) {
+ int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
+ auth);
+ if (ret)
+ return ERR_PTR(ret);
+ } else {
+ int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
+ auth);
+ if (ret)
+ return ERR_PTR(ret);
+ }
*proto = ac->protocol;
- *buf = s->s_authorizer_buf;
- *len = s->s_authorizer_buf_len;
- *reply_buf = s->s_authorizer_reply_buf;
- *reply_len = s->s_authorizer_reply_buf_len;
- return 0;
+
+ return auth;
}
@@ -3404,7 +3623,7 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
struct ceph_mds_client *mdsc = s->s_mdsc;
struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
- return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
+ return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
}
static int invalidate_authorizer(struct ceph_connection *con)
@@ -3413,12 +3632,32 @@ static int invalidate_authorizer(struct ceph_connection *con)
struct ceph_mds_client *mdsc = s->s_mdsc;
struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
- if (ac->ops->invalidate_authorizer)
- ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
+ ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}
+static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr, int *skip)
+{
+ struct ceph_msg *msg;
+ int type = (int) le16_to_cpu(hdr->type);
+ int front_len = (int) le32_to_cpu(hdr->front_len);
+
+ if (con->in_msg)
+ return con->in_msg;
+
+ *skip = 0;
+ msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
+ if (!msg) {
+ pr_err("unable to allocate msg type %d len %d\n",
+ type, front_len);
+ return NULL;
+ }
+
+ return msg;
+}
+
static const struct ceph_connection_operations mds_con_ops = {
.get = con_get,
.put = con_put,
@@ -3427,6 +3666,7 @@ static const struct ceph_connection_operations mds_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply,
.invalidate_authorizer = invalidate_authorizer,
.peer_reset = peer_reset,
+ .alloc_msg = mds_alloc_msg,
};
/* eof */