aboutsummaryrefslogtreecommitdiff
path: root/fs/dlm/rcom.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/rcom.c')
-rw-r--r--fs/dlm/rcom.c353
1 files changed, 242 insertions, 111 deletions
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 188b91c027e..9d61947d473 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -2,7 +2,7 @@
*******************************************************************************
**
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
-** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
+** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
**
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
@@ -24,7 +24,6 @@
#include "lock.h"
#include "util.h"
-
static int rcom_response(struct dlm_ls *ls)
{
return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
@@ -38,7 +37,7 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
char *mb;
int mb_len = sizeof(struct dlm_rcom) + len;
- mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
+ mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
if (!mh) {
log_print("create_rcom to %d type %d len %d ENOBUFS",
to_nodeid, type, len);
@@ -72,17 +71,28 @@ static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
dlm_lowcomms_commit_buffer(mh);
}
+static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
+ uint32_t flags)
+{
+ rs->rs_flags = cpu_to_le32(flags);
+}
+
/* When replying to a status request, a node also sends back its
configuration values. The requesting node then checks that the remote
node is configured the same way as itself. */
-static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
+static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
+ uint32_t num_slots)
{
- rf->rf_lvblen = ls->ls_lvblen;
- rf->rf_lsflags = ls->ls_exflags;
+ rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
+ rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
+
+ rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
+ rf->rf_num_slots = cpu_to_le16(num_slots);
+ rf->rf_generation = cpu_to_le32(ls->ls_generation);
}
-static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
+static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
@@ -93,11 +103,12 @@ static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
return -EPROTO;
}
- if (rf->rf_lvblen != ls->ls_lvblen ||
- rf->rf_lsflags != ls->ls_exflags) {
+ if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
+ le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
- ls->ls_lvblen, ls->ls_exflags,
- nodeid, rf->rf_lvblen, rf->rf_lsflags);
+ ls->ls_lvblen, ls->ls_exflags, nodeid,
+ le32_to_cpu(rf->rf_lvblen),
+ le32_to_cpu(rf->rf_lsflags));
return -EPROTO;
}
return 0;
@@ -119,7 +130,18 @@ static void disallow_sync_reply(struct dlm_ls *ls)
spin_unlock(&ls->ls_rcom_spin);
}
-int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
+/*
+ * low nodeid gathers one slot value at a time from each node.
+ * it sets need_slots=0, and saves rf_our_slot returned from each
+ * rcom_config.
+ *
+ * other nodes gather all slot values at once from the low nodeid.
+ * they set need_slots=1, and ignore the rf_our_slot returned from each
+ * rcom_config. they use the rf_num_slots returned from the low
+ * node's rcom_config.
+ */
+
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
@@ -128,15 +150,18 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
ls->ls_recover_nodeid = nodeid;
if (nodeid == dlm_our_nodeid()) {
- rc = (struct dlm_rcom *) ls->ls_recover_buf;
+ rc = ls->ls_recover_buf;
rc->rc_result = dlm_recover_status(ls);
goto out;
}
- error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
+ error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
+ sizeof(struct rcom_status), &rc, &mh);
if (error)
goto out;
+ set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
+
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
@@ -147,14 +172,17 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
if (error)
goto out;
- rc = (struct dlm_rcom *) ls->ls_recover_buf;
+ rc = ls->ls_recover_buf;
if (rc->rc_result == -ESRCH) {
/* we pretend the remote lockspace exists with 0 status */
log_debug(ls, "remote node %d not ready", nodeid);
rc->rc_result = 0;
- } else
- error = check_config(ls, rc, nodeid);
+ error = 0;
+ } else {
+ error = check_rcom_config(ls, rc, nodeid);
+ }
+
/* the caller looks at rc_result for the remote recovery status */
out:
return error;
@@ -164,17 +192,60 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
- int error, nodeid = rc_in->rc_header.h_nodeid;
+ struct rcom_status *rs;
+ uint32_t status;
+ int nodeid = rc_in->rc_header.h_nodeid;
+ int len = sizeof(struct rcom_config);
+ int num_slots = 0;
+ int error;
+
+ if (!dlm_slots_version(&rc_in->rc_header)) {
+ status = dlm_recover_status(ls);
+ goto do_create;
+ }
+ rs = (struct rcom_status *)rc_in->rc_buf;
+
+ if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) {
+ status = dlm_recover_status(ls);
+ goto do_create;
+ }
+
+ spin_lock(&ls->ls_recover_lock);
+ status = ls->ls_recover_status;
+ num_slots = ls->ls_num_slots;
+ spin_unlock(&ls->ls_recover_lock);
+ len += num_slots * sizeof(struct rcom_slot);
+
+ do_create:
error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
- sizeof(struct rcom_config), &rc, &mh);
+ len, &rc, &mh);
if (error)
return;
+
rc->rc_id = rc_in->rc_id;
rc->rc_seq_reply = rc_in->rc_seq;
- rc->rc_result = dlm_recover_status(ls);
- make_config(ls, (struct rcom_config *) rc->rc_buf);
+ rc->rc_result = status;
+
+ set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
+
+ if (!num_slots)
+ goto do_send;
+ spin_lock(&ls->ls_recover_lock);
+ if (ls->ls_num_slots != num_slots) {
+ spin_unlock(&ls->ls_recover_lock);
+ log_debug(ls, "receive_rcom_status num_slots %d to %d",
+ num_slots, ls->ls_num_slots);
+ rc->rc_result = 0;
+ set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
+ goto do_send;
+ }
+
+ dlm_slots_copy_out(ls, rc);
+ spin_unlock(&ls->ls_recover_lock);
+
+ do_send:
send_rcom(ls, mh, rc);
}
@@ -197,26 +268,14 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
spin_unlock(&ls->ls_rcom_spin);
}
-static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
-{
- receive_sync_reply(ls, rc_in);
-}
-
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
- int error = 0, len = sizeof(struct dlm_rcom);
+ int error = 0;
ls->ls_recover_nodeid = nodeid;
- if (nodeid == dlm_our_nodeid()) {
- dlm_copy_master_names(ls, last_name, last_len,
- ls->ls_recover_buf + len,
- dlm_config.ci_buffer_size - len, nodeid);
- goto out;
- }
-
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
if (error)
goto out;
@@ -254,24 +313,38 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
send_rcom(ls, mh, rc);
}
-static void receive_rcom_names_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
+int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
{
- receive_sync_reply(ls, rc_in);
+ struct dlm_rcom *rc;
+ struct dlm_mhandle *mh;
+ struct dlm_ls *ls = r->res_ls;
+ int error;
+
+ error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
+ &rc, &mh);
+ if (error)
+ goto out;
+ memcpy(rc->rc_buf, r->res_name, r->res_length);
+ rc->rc_id = (unsigned long) r->res_id;
+
+ send_rcom(ls, mh, rc);
+ out:
+ return error;
}
-int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
+int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
struct dlm_ls *ls = r->res_ls;
int error;
- error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
+ error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
&rc, &mh);
if (error)
goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length);
- rc->rc_id = (unsigned long) r;
+ rc->rc_id = 0xFFFFFFFF;
send_rcom(ls, mh, rc);
out:
@@ -289,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
if (error)
return;
- error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
+ if (rc_in->rc_id == 0xFFFFFFFF) {
+ log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
+ dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
+ return;
+ }
+
+ error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
+ DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
if (error)
ret_nodeid = error;
rc->rc_result = ret_nodeid;
@@ -309,22 +389,22 @@ static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
{
memset(rl, 0, sizeof(*rl));
- rl->rl_ownpid = lkb->lkb_ownpid;
- rl->rl_lkid = lkb->lkb_id;
- rl->rl_exflags = lkb->lkb_exflags;
- rl->rl_flags = lkb->lkb_flags;
- rl->rl_lvbseq = lkb->lkb_lvbseq;
+ rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
+ rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
+ rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
+ rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
+ rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
rl->rl_rqmode = lkb->lkb_rqmode;
rl->rl_grmode = lkb->lkb_grmode;
rl->rl_status = lkb->lkb_status;
- rl->rl_wait_type = lkb->lkb_wait_type;
+ rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
- if (lkb->lkb_bastaddr)
- rl->rl_asts |= AST_BAST;
- if (lkb->lkb_astaddr)
- rl->rl_asts |= AST_COMP;
+ if (lkb->lkb_bastfn)
+ rl->rl_asts |= DLM_CB_BAST;
+ if (lkb->lkb_astfn)
+ rl->rl_asts |= DLM_CB_CAST;
- rl->rl_namelen = r->res_length;
+ rl->rl_namelen = cpu_to_le16(r->res_length);
memcpy(rl->rl_name, r->res_name, r->res_length);
/* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
@@ -358,6 +438,7 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
return error;
}
+/* needs at least dlm_rcom + rcom_lock */
static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
@@ -381,12 +462,10 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
send_rcom(ls, mh, rc);
}
-static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
-{
- dlm_recover_process_copy(ls, rc_in);
-}
+/* If the lockspace doesn't exist then still send a status message
+ back; it's possible that it just doesn't have its global_id yet. */
-static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
+int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
struct rcom_config *rf;
@@ -413,7 +492,7 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
rc->rc_result = -ESRCH;
rf = (struct rcom_config *) rc->rc_buf;
- rf->rf_lvblen = -1;
+ rf->rf_lvblen = cpu_to_le32(~0U);
dlm_rcom_out(rc);
dlm_lowcomms_commit_buffer(mh);
@@ -421,67 +500,102 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
return 0;
}
-static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
+/*
+ * Ignore messages for stage Y before we set
+ * recover_status bit for stage X:
+ *
+ * recover_status = 0
+ *
+ * dlm_recover_members()
+ * - send nothing
+ * - recv nothing
+ * - ignore NAMES, NAMES_REPLY
+ * - ignore LOOKUP, LOOKUP_REPLY
+ * - ignore LOCK, LOCK_REPLY
+ *
+ * recover_status |= NODES
+ *
+ * dlm_recover_members_wait()
+ *
+ * dlm_recover_directory()
+ * - send NAMES
+ * - recv NAMES_REPLY
+ * - ignore LOOKUP, LOOKUP_REPLY
+ * - ignore LOCK, LOCK_REPLY
+ *
+ * recover_status |= DIR
+ *
+ * dlm_recover_directory_wait()
+ *
+ * dlm_recover_masters()
+ * - send LOOKUP
+ * - recv LOOKUP_REPLY
+ *
+ * dlm_recover_locks()
+ * - send LOCKS
+ * - recv LOCKS_REPLY
+ *
+ * recover_status |= LOCKS
+ *
+ * dlm_recover_locks_wait()
+ *
+ * recover_status |= DONE
+ */
+
+/* Called by dlm_recv; corresponds to dlm_receive_message() but special
+ recovery-only comms are sent through here. */
+
+void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
+ int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
+ int stop, reply = 0, names = 0, lookup = 0, lock = 0;
+ uint32_t status;
uint64_t seq;
- int rv = 0;
switch (rc->rc_type) {
case DLM_RCOM_STATUS_REPLY:
+ reply = 1;
+ break;
+ case DLM_RCOM_NAMES:
+ names = 1;
+ break;
case DLM_RCOM_NAMES_REPLY:
+ names = 1;
+ reply = 1;
+ break;
+ case DLM_RCOM_LOOKUP:
+ lookup = 1;
+ break;
case DLM_RCOM_LOOKUP_REPLY:
+ lookup = 1;
+ reply = 1;
+ break;
+ case DLM_RCOM_LOCK:
+ lock = 1;
+ break;
case DLM_RCOM_LOCK_REPLY:
- spin_lock(&ls->ls_recover_lock);
- seq = ls->ls_recover_seq;
- spin_unlock(&ls->ls_recover_lock);
- if (rc->rc_seq_reply != seq) {
- log_debug(ls, "ignoring old reply %x from %d "
- "seq_reply %llx expect %llx",
- rc->rc_type, rc->rc_header.h_nodeid,
- (unsigned long long)rc->rc_seq_reply,
- (unsigned long long)seq);
- rv = 1;
- }
- }
- return rv;
-}
-
-/* Called by dlm_recvd; corresponds to dlm_receive_message() but special
- recovery-only comms are sent through here. */
-
-void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
-{
- struct dlm_rcom *rc = (struct dlm_rcom *) hd;
- struct dlm_ls *ls;
-
- dlm_rcom_in(rc);
+ lock = 1;
+ reply = 1;
+ break;
+ };
- /* If the lockspace doesn't exist then still send a status message
- back; it's possible that it just doesn't have its global_id yet. */
+ spin_lock(&ls->ls_recover_lock);
+ status = ls->ls_recover_status;
+ stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
+ seq = ls->ls_recover_seq;
+ spin_unlock(&ls->ls_recover_lock);
- ls = dlm_find_lockspace_global(hd->h_lockspace);
- if (!ls) {
- log_print("lockspace %x from %d type %x not found",
- hd->h_lockspace, nodeid, rc->rc_type);
- if (rc->rc_type == DLM_RCOM_STATUS)
- send_ls_not_ready(nodeid, rc);
- return;
- }
+ if (stop && (rc->rc_type != DLM_RCOM_STATUS))
+ goto ignore;
- if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
- log_debug(ls, "ignoring recovery message %x from %d",
- rc->rc_type, nodeid);
- goto out;
- }
+ if (reply && (rc->rc_seq_reply != seq))
+ goto ignore;
- if (is_old_reply(ls, rc))
- goto out;
+ if (!(status & DLM_RS_NODES) && (names || lookup || lock))
+ goto ignore;
- if (nodeid != rc->rc_header.h_nodeid) {
- log_error(ls, "bad rcom nodeid %d from %d",
- rc->rc_header.h_nodeid, nodeid);
- goto out;
- }
+ if (!(status & DLM_RS_DIR) && (lookup || lock))
+ goto ignore;
switch (rc->rc_type) {
case DLM_RCOM_STATUS:
@@ -497,15 +611,17 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
break;
case DLM_RCOM_LOCK:
+ if (rc->rc_header.h_length < lock_size)
+ goto Eshort;
receive_rcom_lock(ls, rc);
break;
case DLM_RCOM_STATUS_REPLY:
- receive_rcom_status_reply(ls, rc);
+ receive_sync_reply(ls, rc);
break;
case DLM_RCOM_NAMES_REPLY:
- receive_rcom_names_reply(ls, rc);
+ receive_sync_reply(ls, rc);
break;
case DLM_RCOM_LOOKUP_REPLY:
@@ -513,13 +629,28 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
break;
case DLM_RCOM_LOCK_REPLY:
- receive_rcom_lock_reply(ls, rc);
+ if (rc->rc_header.h_length < lock_size)
+ goto Eshort;
+ dlm_recover_process_copy(ls, rc);
break;
default:
- DLM_ASSERT(0, printk("rc_type=%x\n", rc->rc_type););
+ log_error(ls, "receive_rcom bad type %d", rc->rc_type);
}
- out:
- dlm_put_lockspace(ls);
+ return;
+
+ignore:
+ log_limit(ls, "dlm_receive_rcom ignore msg %d "
+ "from %d %llu %llu recover seq %llu sts %x gen %u",
+ rc->rc_type,
+ nodeid,
+ (unsigned long long)rc->rc_seq,
+ (unsigned long long)rc->rc_seq_reply,
+ (unsigned long long)seq,
+ status, ls->ls_generation);
+ return;
+Eshort:
+ log_error(ls, "recovery message %d from %d is too short",
+ rc->rc_type, nodeid);
}