aboutsummaryrefslogtreecommitdiff
path: root/fs/ceph/mds_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r--fs/ceph/mds_client.c305
1 files changed, 210 insertions, 95 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index b7bda5d9611..92a2548278f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3,6 +3,7 @@
#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
+#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
@@ -43,6 +44,7 @@
*/
struct ceph_reconnect_state {
+ int nr_caps;
struct ceph_pagelist *pagelist;
bool flock;
};
@@ -62,7 +64,7 @@ static const struct ceph_connection_operations mds_con_ops;
*/
static int parse_reply_info_in(void **p, void *end,
struct ceph_mds_reply_info_in *info,
- int features)
+ u64 features)
{
int err = -EIO;
@@ -97,7 +99,7 @@ bad:
*/
static int parse_reply_info_trace(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
int err;
@@ -144,7 +146,7 @@ out_bad:
*/
static int parse_reply_info_dir(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
u32 num, i = 0;
int err;
@@ -164,21 +166,18 @@ static int parse_reply_info_dir(void **p, void *end,
if (num == 0)
goto done;
- /* alloc large array */
- info->dir_nr = num;
- info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
- sizeof(*info->dir_dname) +
- sizeof(*info->dir_dname_len) +
- sizeof(*info->dir_dlease),
- GFP_NOFS);
- if (info->dir_in == NULL) {
- err = -ENOMEM;
- goto out_bad;
- }
+ BUG_ON(!info->dir_in);
info->dir_dname = (void *)(info->dir_in + num);
info->dir_dname_len = (void *)(info->dir_dname + num);
info->dir_dlease = (void *)(info->dir_dname_len + num);
+ if ((unsigned long)(info->dir_dlease + num) >
+ (unsigned long)info->dir_in + info->dir_buf_size) {
+ pr_err("dir contents are larger than expected\n");
+ WARN_ON(1);
+ goto bad;
+ }
+ info->dir_nr = num;
while (num) {
/* dentry */
ceph_decode_need(p, end, sizeof(u32)*2, bad);
@@ -216,7 +215,7 @@ out_bad:
*/
static int parse_reply_info_filelock(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
if (*p + sizeof(*info->filelock_reply) > end)
goto bad;
@@ -237,7 +236,7 @@ bad:
*/
static int parse_reply_info_create(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
if (*p == end) {
@@ -261,7 +260,7 @@ bad:
*/
static int parse_reply_info_extra(void **p, void *end,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
return parse_reply_info_filelock(p, end, info, features);
@@ -279,7 +278,7 @@ static int parse_reply_info_extra(void **p, void *end,
*/
static int parse_reply_info(struct ceph_msg *msg,
struct ceph_mds_reply_info_parsed *info,
- int features)
+ u64 features)
{
void *p, *end;
u32 len;
@@ -326,7 +325,9 @@ out_bad:
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
- kfree(info->dir_in);
+ if (!info->dir_in)
+ return;
+ free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
}
@@ -443,6 +444,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
INIT_LIST_HEAD(&s->s_waiting);
INIT_LIST_HEAD(&s->s_unsafe);
s->s_num_cap_releases = 0;
+ s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_LIST_HEAD(&s->s_cap_releases_done);
@@ -510,12 +512,11 @@ void ceph_mdsc_release_request(struct kref *kref)
struct ceph_mds_request *req = container_of(kref,
struct ceph_mds_request,
r_kref);
+ destroy_reply_info(&req->r_reply_info);
if (req->r_request)
ceph_msg_put(req->r_request);
- if (req->r_reply) {
+ if (req->r_reply)
ceph_msg_put(req->r_reply);
- destroy_reply_info(&req->r_reply_info);
- }
if (req->r_inode) {
ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
iput(req->r_inode);
@@ -526,7 +527,9 @@ void ceph_mdsc_release_request(struct kref *kref)
iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
- if (req->r_old_dentry) {
+ if (req->r_old_dentry)
+ dput(req->r_old_dentry);
+ if (req->r_old_dentry_dir) {
/*
* track (and drop pins for) r_old_dentry_dir
* separately, since r_old_dentry's d_parent may have
@@ -535,7 +538,6 @@ void ceph_mdsc_release_request(struct kref *kref)
*/
ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
- dput(req->r_old_dentry);
iput(req->r_old_dentry_dir);
}
kfree(req->r_path1);
@@ -642,6 +644,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
req->r_unsafe_dir = NULL;
}
+ complete_all(&req->r_safe_completion);
+
ceph_mdsc_put_request(req);
}
@@ -709,14 +713,15 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
struct dentry *dn = get_nonsnap_parent(parent);
inode = dn->d_inode;
dout("__choose_mds using nonsnap parent %p\n", inode);
- } else if (req->r_dentry->d_inode) {
+ } else {
/* dentry target */
inode = req->r_dentry->d_inode;
- } else {
- /* dir + name */
- inode = dir;
- hash = ceph_dentry_hash(dir, req->r_dentry);
- is_hash = true;
+ if (!inode || mode == USE_AUTH_MDS) {
+ /* dir + name */
+ inode = dir;
+ hash = ceph_dentry_hash(dir, req->r_dentry);
+ is_hash = true;
+ }
}
}
@@ -842,35 +847,56 @@ static int __open_session(struct ceph_mds_client *mdsc,
*
* called under mdsc->mutex
*/
+static struct ceph_mds_session *
+__open_export_target_session(struct ceph_mds_client *mdsc, int target)
+{
+ struct ceph_mds_session *session;
+
+ session = __ceph_lookup_mds_session(mdsc, target);
+ if (!session) {
+ session = register_session(mdsc, target);
+ if (IS_ERR(session))
+ return session;
+ }
+ if (session->s_state == CEPH_MDS_SESSION_NEW ||
+ session->s_state == CEPH_MDS_SESSION_CLOSING)
+ __open_session(mdsc, session);
+
+ return session;
+}
+
+struct ceph_mds_session *
+ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
+{
+ struct ceph_mds_session *session;
+
+ dout("open_export_target_session to mds%d\n", target);
+
+ mutex_lock(&mdsc->mutex);
+ session = __open_export_target_session(mdsc, target);
+ mutex_unlock(&mdsc->mutex);
+
+ return session;
+}
+
static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_mds_info *mi;
struct ceph_mds_session *ts;
int i, mds = session->s_mds;
- int target;
if (mds >= mdsc->mdsmap->m_max_mds)
return;
+
mi = &mdsc->mdsmap->m_info[mds];
dout("open_export_target_sessions for mds%d (%d targets)\n",
session->s_mds, mi->num_export_targets);
for (i = 0; i < mi->num_export_targets; i++) {
- target = mi->export_targets[i];
- ts = __ceph_lookup_mds_session(mdsc, target);
- if (!ts) {
- ts = register_session(mdsc, target);
- if (IS_ERR(ts))
- return;
- }
- if (session->s_state == CEPH_MDS_SESSION_NEW ||
- session->s_state == CEPH_MDS_SESSION_CLOSING)
- __open_session(mdsc, session);
- else
- dout(" mds%d target mds%d %p is %s\n", session->s_mds,
- i, ts, session_state_name(ts->s_state));
- ceph_put_mds_session(ts);
+ ts = __open_export_target_session(mdsc, mi->export_targets[i]);
+ if (!IS_ERR(ts))
+ ceph_put_mds_session(ts);
}
}
@@ -986,7 +1012,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
spin_lock(&ci->i_ceph_lock);
- __ceph_remove_cap(cap);
+ __ceph_remove_cap(cap, false);
if (!__ceph_is_any_real_caps(ci)) {
struct ceph_mds_client *mdsc =
ceph_sb_to_client(inode->i_sb)->mdsc;
@@ -1132,6 +1158,21 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
return 0;
}
+static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session, u64 seq)
+{
+ struct ceph_msg *msg;
+
+ dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
+ session->s_mds, session_state_name(session->s_state), seq);
+ msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
+ if (!msg)
+ return -ENOMEM;
+ ceph_con_send(&session->s_con, msg);
+ return 0;
+}
+
+
/*
* Note new cap ttl, and any transition from stale -> not stale (fresh?).
*
@@ -1210,7 +1251,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
struct ceph_mds_session *session = arg;
struct ceph_inode_info *ci = ceph_inode(inode);
- int used, oissued, mine;
+ int used, wanted, oissued, mine;
if (session->s_trim_caps <= 0)
return -1;
@@ -1218,22 +1259,25 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
spin_lock(&ci->i_ceph_lock);
mine = cap->issued | cap->implemented;
used = __ceph_caps_used(ci);
+ wanted = __ceph_caps_file_wanted(ci);
oissued = __ceph_caps_issued_other(ci, cap);
- dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
+ dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
- ceph_cap_string(used));
- if (ci->i_dirty_caps)
- goto out; /* dirty caps */
- if ((used & ~oissued) & mine)
+ ceph_cap_string(used), ceph_cap_string(wanted));
+ if (cap == ci->i_auth_cap) {
+ if (ci->i_dirty_caps | ci->i_flushing_caps)
+ goto out;
+ if ((used | wanted) & CEPH_CAP_ANY_WR)
+ goto out;
+ }
+ if ((used | wanted) & ~oissued & mine)
goto out; /* we need these caps */
session->s_trim_caps--;
if (oissued) {
/* we aren't the only cap.. just remove us */
- __queue_cap_release(session, ceph_ino(inode), cap->cap_id,
- cap->mseq, cap->issue_seq);
- __ceph_remove_cap(cap);
+ __ceph_remove_cap(cap, true);
} else {
/* try to drop referring dentries */
spin_unlock(&ci->i_ceph_lock);
@@ -1267,6 +1311,9 @@ static int trim_caps(struct ceph_mds_client *mdsc,
trim_caps - session->s_trim_caps);
session->s_trim_caps = 0;
}
+
+ ceph_add_cap_releases(mdsc, session);
+ ceph_send_cap_releases(mdsc, session);
return 0;
}
@@ -1416,17 +1463,19 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
unsigned num;
dout("discard_cap_releases mds%d\n", session->s_mds);
- spin_lock(&session->s_cap_lock);
- /* zero out the in-progress message */
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
- head = msg->front.iov_base;
- num = le32_to_cpu(head->num);
- dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
- head->num = cpu_to_le32(0);
- msg->front.iov_len = sizeof(*head);
- session->s_num_cap_releases += num;
+ if (!list_empty(&session->s_cap_releases)) {
+ /* zero out the in-progress message */
+ msg = list_first_entry(&session->s_cap_releases,
+ struct ceph_msg, list_head);
+ head = msg->front.iov_base;
+ num = le32_to_cpu(head->num);
+ dout("discard_cap_releases mds%d %p %u\n",
+ session->s_mds, msg, num);
+ head->num = cpu_to_le32(0);
+ msg->front.iov_len = sizeof(*head);
+ session->s_num_cap_releases += num;
+ }
/* requeue completed messages */
while (!list_empty(&session->s_cap_releases_done)) {
@@ -1443,14 +1492,49 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
msg->front.iov_len = sizeof(*head);
list_add(&msg->list_head, &session->s_cap_releases);
}
-
- spin_unlock(&session->s_cap_lock);
}
/*
* requests
*/
+int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
+ struct inode *dir)
+{
+ struct ceph_inode_info *ci = ceph_inode(dir);
+ struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
+ struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
+ size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
+ sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
+ int order, num_entries;
+
+ spin_lock(&ci->i_ceph_lock);
+ num_entries = ci->i_files + ci->i_subdirs;
+ spin_unlock(&ci->i_ceph_lock);
+ num_entries = max(num_entries, 1);
+ num_entries = min(num_entries, opt->max_readdir);
+
+ order = get_order(size * num_entries);
+ while (order >= 0) {
+ rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN,
+ order);
+ if (rinfo->dir_in)
+ break;
+ order--;
+ }
+ if (!rinfo->dir_in)
+ return -ENOMEM;
+
+ num_entries = (PAGE_SIZE << order) / size;
+ num_entries = min(num_entries, opt->max_readdir);
+
+ rinfo->dir_buf_size = PAGE_SIZE << order;
+ req->r_num_caps = num_entries + 1;
+ req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
+ req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
+ return 0;
+}
+
/*
* Create an mds request.
*/
@@ -1474,6 +1558,8 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
+ req->r_stamp = CURRENT_TIME;
+
req->r_op = op;
req->r_direct_mode = mode;
return req;
@@ -1699,7 +1785,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
}
len = sizeof(*head) +
- pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
+ pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
+ sizeof(struct timespec);
/* calculate (max) length for cap releases */
len += sizeof(struct ceph_mds_request_release) *
@@ -1716,6 +1803,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
goto out_free2;
}
+ msg->hdr.version = 2;
msg->hdr.tid = cpu_to_le64(req->r_tid);
head = msg->front.iov_base;
@@ -1752,6 +1840,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
head->num_releases = cpu_to_le16(releases);
+ /* time stamp */
+ ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+
BUG_ON(p > end);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -1875,8 +1966,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
int mds = -1;
int err = -EAGAIN;
- if (req->r_err || req->r_got_result)
+ if (req->r_err || req->r_got_result) {
+ if (req->r_aborted)
+ __unregister_request(mdsc, req);
goto out;
+ }
if (req->r_timeout &&
time_after_eq(jiffies, req->r_started + req->r_timeout)) {
@@ -2009,7 +2103,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
if (req->r_locked_dir)
ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
- if (req->r_old_dentry)
+ if (req->r_old_dentry_dir)
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
@@ -2131,13 +2225,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
/* dup? */
if ((req->r_got_unsafe && !head->safe) ||
(req->r_got_safe && head->safe)) {
- pr_warning("got a dup %s reply on %llu from mds%d\n",
+ pr_warn("got a dup %s reply on %llu from mds%d\n",
head->safe ? "safe" : "unsafe", tid, mds);
mutex_unlock(&mdsc->mutex);
goto out;
}
if (req->r_got_safe && !head->safe) {
- pr_warning("got unsafe after safe on %llu from mds%d\n",
+ pr_warn("got unsafe after safe on %llu from mds%d\n",
tid, mds);
mutex_unlock(&mdsc->mutex);
goto out;
@@ -2154,26 +2248,16 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
*/
if (result == -ESTALE) {
dout("got ESTALE on request %llu", req->r_tid);
- if (!req->r_inode) {
- /* do nothing; not an authority problem */
- } else if (req->r_direct_mode != USE_AUTH_MDS) {
+ if (req->r_direct_mode != USE_AUTH_MDS) {
dout("not using auth, setting for that now");
req->r_direct_mode = USE_AUTH_MDS;
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
} else {
- struct ceph_inode_info *ci = ceph_inode(req->r_inode);
- struct ceph_cap *cap = NULL;
-
- if (req->r_session)
- cap = ceph_get_cap_for_mds(ci,
- req->r_session->s_mds);
-
- dout("already using auth");
- if ((!cap || cap != ci->i_auth_cap) ||
- (cap->mseq != req->r_sent_on_mseq)) {
- dout("but cap changed, so resending");
+ int mds = __choose_mds(mdsc, req);
+ if (mds >= 0 && mds != req->r_session->s_mds) {
+ dout("but auth changed, so resending");
__do_request(mdsc, req);
mutex_unlock(&mdsc->mutex);
goto out;
@@ -2186,7 +2270,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (head->safe) {
req->r_got_safe = true;
__unregister_request(mdsc, req);
- complete_all(&req->r_safe_completion);
if (req->r_got_unsafe) {
/*
@@ -2238,8 +2321,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
if (err == 0) {
if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
- req->r_op == CEPH_MDS_OP_LSSNAP) &&
- rinfo->dir_nr)
+ req->r_op == CEPH_MDS_OP_LSSNAP))
ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
@@ -2400,6 +2482,10 @@ static void handle_session(struct ceph_mds_session *session,
trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
break;
+ case CEPH_SESSION_FLUSHMSG:
+ send_flushmsg_ack(mdsc, session, seq);
+ break;
+
default:
pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
WARN_ON(1);
@@ -2490,6 +2576,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
cap->mseq = 0; /* and migrate_seq */
+ cap->cap_gen = cap->session->s_cap_gen;
if (recon_state->flock) {
rec.v2.cap_id = cpu_to_le64(cap->cap_id);
@@ -2552,6 +2639,8 @@ encode_again:
} else {
err = ceph_pagelist_append(pagelist, &rec, reclen);
}
+
+ recon_state->nr_caps++;
out_free:
kfree(path);
out_dput:
@@ -2579,6 +2668,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
struct rb_node *p;
int mds = session->s_mds;
int err = -ENOMEM;
+ int s_nr_caps;
struct ceph_pagelist *pagelist;
struct ceph_reconnect_state recon_state;
@@ -2610,20 +2700,38 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
dout("session %p state %s\n", session,
session_state_name(session->s_state));
+ spin_lock(&session->s_gen_ttl_lock);
+ session->s_cap_gen++;
+ spin_unlock(&session->s_gen_ttl_lock);
+
+ spin_lock(&session->s_cap_lock);
+ /*
+ * notify __ceph_remove_cap() that we are composing cap reconnect.
+ * If a cap get released before being added to the cap reconnect,
+ * __ceph_remove_cap() should skip queuing cap release.
+ */
+ session->s_cap_reconnect = 1;
/* drop old cap expires; we're about to reestablish that state */
discard_cap_releases(mdsc, session);
+ spin_unlock(&session->s_cap_lock);
/* traverse this session's caps */
- err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
+ s_nr_caps = session->s_nr_caps;
+ err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
if (err)
goto fail;
+ recon_state.nr_caps = 0;
recon_state.pagelist = pagelist;
recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
err = iterate_session_caps(session, encode_caps_cb, &recon_state);
if (err < 0)
goto fail;
+ spin_lock(&session->s_cap_lock);
+ session->s_cap_reconnect = 0;
+ spin_unlock(&session->s_cap_lock);
+
/*
* snaprealms. we provide mds with the ino, seq (version), and
* parent for all of our realms. If the mds has any newer info,
@@ -2646,11 +2754,18 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
if (recon_state.flock)
reply->hdr.version = cpu_to_le16(2);
- if (pagelist->length) {
- /* set up outbound data if we have any */
- reply->hdr.data_len = cpu_to_le32(pagelist->length);
- ceph_msg_data_add_pagelist(reply, pagelist);
+
+ /* raced with cap release? */
+ if (s_nr_caps != recon_state.nr_caps) {
+ struct page *page = list_first_entry(&pagelist->head,
+ struct page, lru);
+ __le32 *addr = kmap_atomic(page);
+ *addr = cpu_to_le32(recon_state.nr_caps);
+ kunmap_atomic(addr);
}
+
+ reply->hdr.data_len = cpu_to_le32(pagelist->length);
+ ceph_msg_data_add_pagelist(reply, pagelist);
ceph_con_send(&session->s_con, reply);
mutex_unlock(&session->s_mutex);
@@ -3417,7 +3532,7 @@ static void peer_reset(struct ceph_connection *con)
struct ceph_mds_session *s = con->private;
struct ceph_mds_client *mdsc = s->s_mdsc;
- pr_warning("mds%d closed our session\n", s->s_mds);
+ pr_warn("mds%d closed our session\n", s->s_mds);
send_mds_reconnect(mdsc, s);
}