aboutsummaryrefslogtreecommitdiff
path: root/fs/ocfs2/dlm/dlmrecovery.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmrecovery.c')
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c1262
1 files changed, 937 insertions, 325 deletions
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 186e9a76aa5..45067faf569 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -30,7 +30,6 @@
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
-#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
@@ -56,9 +55,6 @@
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
static int dlm_recovery_thread(void *data);
-void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
-int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
static int dlm_do_recovery(struct dlm_ctxt *dlm);
static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
@@ -78,15 +74,9 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
u8 send_to,
struct dlm_lock_resource *res,
int total_locks);
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 *real_master);
static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
struct dlm_migratable_lockres *mres);
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 nodenum, u8 *real_master);
static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
u8 dead_node, u8 send_to);
@@ -101,11 +91,14 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st);
static void dlm_request_all_locks_worker(struct dlm_work_item *item,
void *data);
static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data);
+static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ u8 *real_master);
static u64 dlm_get_next_mig_cookie(void);
-static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED;
-static spinlock_t dlm_mig_cookie_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(dlm_reco_state_lock);
+static DEFINE_SPINLOCK(dlm_mig_cookie_lock);
static u64 dlm_mig_cookie = 1;
static u64 dlm_get_next_mig_cookie(void)
@@ -121,30 +114,60 @@ static u64 dlm_get_next_mig_cookie(void)
return c;
}
+static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
+ u8 dead_node)
+{
+ assert_spin_locked(&dlm->spinlock);
+ if (dlm->reco.dead_node != dead_node)
+ mlog(0, "%s: changing dead_node from %u to %u\n",
+ dlm->name, dlm->reco.dead_node, dead_node);
+ dlm->reco.dead_node = dead_node;
+}
+
+static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
+ u8 master)
+{
+ assert_spin_locked(&dlm->spinlock);
+ mlog(0, "%s: changing new_master from %u to %u\n",
+ dlm->name, dlm->reco.new_master, master);
+ dlm->reco.new_master = master;
+}
+
+static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
+{
+ assert_spin_locked(&dlm->spinlock);
+ clear_bit(dlm->reco.dead_node, dlm->recovery_map);
+ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+ dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
+}
+
static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
{
spin_lock(&dlm->spinlock);
- clear_bit(dlm->reco.dead_node, dlm->recovery_map);
- dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
- dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
+ __dlm_reset_recovery(dlm);
spin_unlock(&dlm->spinlock);
}
/* Worker function used during recovery. */
-void dlm_dispatch_work(void *data)
+void dlm_dispatch_work(struct work_struct *work)
{
- struct dlm_ctxt *dlm = (struct dlm_ctxt *)data;
+ struct dlm_ctxt *dlm =
+ container_of(work, struct dlm_ctxt, dispatched_work);
LIST_HEAD(tmp_list);
- struct list_head *iter, *iter2;
- struct dlm_work_item *item;
+ struct dlm_work_item *item, *next;
dlm_workfunc_t *workfunc;
+ int tot=0;
spin_lock(&dlm->work_lock);
list_splice_init(&dlm->work_list, &tmp_list);
spin_unlock(&dlm->work_lock);
- list_for_each_safe(iter, iter2, &tmp_list) {
- item = list_entry(iter, struct dlm_work_item, list);
+ list_for_each_entry(item, &tmp_list, list) {
+ tot++;
+ }
+ mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);
+
+ list_for_each_entry_safe(item, next, &tmp_list, list) {
workfunc = item->func;
list_del_init(&item->list);
@@ -165,7 +188,7 @@ void dlm_dispatch_work(void *data)
* RECOVERY THREAD
*/
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
{
/* wake the recovery thread
* this will wake the reco thread in one of three places
@@ -226,6 +249,52 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
*
*/
+static void dlm_print_reco_node_status(struct dlm_ctxt *dlm)
+{
+ struct dlm_reco_node_data *ndata;
+ struct dlm_lock_resource *res;
+
+ mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n",
+ dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
+ dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
+ dlm->reco.dead_node, dlm->reco.new_master);
+
+ list_for_each_entry(ndata, &dlm->reco.node_data, list) {
+ char *st = "unknown";
+ switch (ndata->state) {
+ case DLM_RECO_NODE_DATA_INIT:
+ st = "init";
+ break;
+ case DLM_RECO_NODE_DATA_REQUESTING:
+ st = "requesting";
+ break;
+ case DLM_RECO_NODE_DATA_DEAD:
+ st = "dead";
+ break;
+ case DLM_RECO_NODE_DATA_RECEIVING:
+ st = "receiving";
+ break;
+ case DLM_RECO_NODE_DATA_REQUESTED:
+ st = "requested";
+ break;
+ case DLM_RECO_NODE_DATA_DONE:
+ st = "done";
+ break;
+ case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+ st = "finalize-sent";
+ break;
+ default:
+ st = "bad";
+ break;
+ }
+ mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n",
+ dlm->name, ndata->node_num, st);
+ }
+ list_for_each_entry(res, &dlm->reco.resources, recovering) {
+ mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
+ dlm->name, res->lockname.len, res->lockname.name);
+ }
+}
#define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)
@@ -238,7 +307,7 @@ static int dlm_recovery_thread(void *data)
mlog(0, "dlm thread running for %s...\n", dlm->name);
while (!kthread_should_stop()) {
- if (dlm_joined(dlm)) {
+ if (dlm_domain_fully_joined(dlm)) {
status = dlm_do_recovery(dlm);
if (status == -EAGAIN) {
/* do not sleep, recheck immediately. */
@@ -273,11 +342,57 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
{
int dead;
spin_lock(&dlm->spinlock);
- dead = test_bit(node, dlm->domain_map);
+ dead = !test_bit(node, dlm->domain_map);
spin_unlock(&dlm->spinlock);
return dead;
}
+/* returns true if node is no longer in the domain
+ * could be dead or just not joined */
+static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
+{
+ int recovered;
+ spin_lock(&dlm->spinlock);
+ recovered = !test_bit(node, dlm->recovery_map);
+ spin_unlock(&dlm->spinlock);
+ return recovered;
+}
+
+
+void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
+{
+ if (dlm_is_node_dead(dlm, node))
+ return;
+
+ printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in "
+ "domain %s\n", node, dlm->name);
+
+ if (timeout)
+ wait_event_timeout(dlm->dlm_reco_thread_wq,
+ dlm_is_node_dead(dlm, node),
+ msecs_to_jiffies(timeout));
+ else
+ wait_event(dlm->dlm_reco_thread_wq,
+ dlm_is_node_dead(dlm, node));
+}
+
+void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
+{
+ if (dlm_is_node_recovered(dlm, node))
+ return;
+
+ printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in "
+ "domain %s\n", node, dlm->name);
+
+ if (timeout)
+ wait_event_timeout(dlm->dlm_reco_thread_wq,
+ dlm_is_node_recovered(dlm, node),
+ msecs_to_jiffies(timeout));
+ else
+ wait_event(dlm->dlm_reco_thread_wq,
+ dlm_is_node_recovered(dlm, node));
+}
+
/* callers of the top-level api calls (dlmlock/dlmunlock) should
* block on the dlm->reco.event when recovery is in progress.
* the dlm recovery thread will set this state when it begins
@@ -296,6 +411,13 @@ static int dlm_in_recovery(struct dlm_ctxt *dlm)
void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
{
+ if (dlm_in_recovery(dlm)) {
+ mlog(0, "%s: reco thread %d in recovery: "
+ "state=%d, master=%u, dead=%u\n",
+ dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
+ dlm->reco.state, dlm->reco.new_master,
+ dlm->reco.dead_node);
+ }
wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
}
@@ -303,6 +425,8 @@ static void dlm_begin_recovery(struct dlm_ctxt *dlm)
{
spin_lock(&dlm->spinlock);
BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
+ printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n",
+ dlm->name, dlm->reco.dead_node);
dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
spin_unlock(&dlm->spinlock);
}
@@ -313,9 +437,18 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm)
BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));
dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;
spin_unlock(&dlm->spinlock);
+ printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name);
wake_up(&dlm->reco.event);
}
+static void dlm_print_recovery_master(struct dlm_ctxt *dlm)
+{
+ printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the "
+ "dead node %u in domain %s\n", dlm->reco.new_master,
+ (dlm->node_num == dlm->reco.new_master ? "me" : "he"),
+ dlm->reco.dead_node, dlm->name);
+}
+
static int dlm_do_recovery(struct dlm_ctxt *dlm)
{
int status = 0;
@@ -329,23 +462,23 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
mlog(0, "new master %u died while recovering %u!\n",
dlm->reco.new_master, dlm->reco.dead_node);
/* unset the new_master, leave dead_node */
- dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
+ dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
}
/* select a target to recover */
if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
int bit;
- bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);
+ bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES, 0);
if (bit >= O2NM_MAX_NODES || bit < 0)
- dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
else
- dlm->reco.dead_node = bit;
+ dlm_set_reco_dead_node(dlm, bit);
} else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
/* BUG? */
mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
dlm->reco.dead_node);
- dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+ dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
}
if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
@@ -354,7 +487,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
/* return to main thread loop and sleep. */
return 0;
}
- mlog(0, "recovery thread found node %u in the recovery map!\n",
+ mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n",
+ dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
dlm->reco.dead_node);
spin_unlock(&dlm->spinlock);
@@ -377,9 +511,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
}
mlog(0, "another node will master this recovery session.\n");
}
- mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n",
- dlm->name, dlm->reco.new_master,
- dlm->node_num, dlm->reco.dead_node);
+
+ dlm_print_recovery_master(dlm);
/* it is safe to start everything back up here
* because all of the dead node's lock resources
@@ -390,13 +523,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
return 0;
master_here:
- mlog(0, "mastering recovery of %s:%u here(this=%u)!\n",
- dlm->name, dlm->reco.dead_node, dlm->node_num);
+ dlm_print_recovery_master(dlm);
status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
if (status < 0) {
- mlog(ML_ERROR, "error %d remastering locks for node %u, "
- "retrying.\n", status, dlm->reco.dead_node);
+ /* we should never hit this anymore */
+ mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, "
+ "retrying.\n", dlm->name, status, dlm->reco.dead_node);
/* yield a bit to allow any final network messages
* to get handled on remaining nodes */
msleep(100);
@@ -404,7 +537,10 @@ master_here:
/* success! see if any other nodes need recovery */
mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
dlm->name, dlm->reco.dead_node, dlm->node_num);
- dlm_reset_recovery(dlm);
+ spin_lock(&dlm->spinlock);
+ __dlm_reset_recovery(dlm);
+ dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
+ spin_unlock(&dlm->spinlock);
}
dlm_end_recovery(dlm);
@@ -416,23 +552,28 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
{
int status = 0;
struct dlm_reco_node_data *ndata;
- struct list_head *iter;
int all_nodes_done;
int destroy = 0;
int pass = 0;
- status = dlm_init_recovery_area(dlm, dead_node);
- if (status < 0)
- goto leave;
+ do {
+ /* we have become recovery master. there is no escaping
+ * this, so just keep trying until we get it. */
+ status = dlm_init_recovery_area(dlm, dead_node);
+ if (status < 0) {
+ mlog(ML_ERROR, "%s: failed to alloc recovery area, "
+ "retrying\n", dlm->name);
+ msleep(1000);
+ }
+ } while (status != 0);
/* safe to access the node data list without a lock, since this
* process is the only one to change the list */
- list_for_each(iter, &dlm->reco.node_data) {
- ndata = list_entry (iter, struct dlm_reco_node_data, list);
+ list_for_each_entry(ndata, &dlm->reco.node_data, list) {
BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT);
ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
- mlog(0, "requesting lock info from node %u\n",
+ mlog(0, "%s: Requesting lock info from node %u\n", dlm->name,
ndata->node_num);
if (ndata->node_num == dlm->node_num) {
@@ -440,17 +581,38 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
continue;
}
- status = dlm_request_all_locks(dlm, ndata->node_num, dead_node);
- if (status < 0) {
- mlog_errno(status);
- if (dlm_is_host_down(status))
- ndata->state = DLM_RECO_NODE_DATA_DEAD;
- else {
- destroy = 1;
- goto leave;
+ do {
+ status = dlm_request_all_locks(dlm, ndata->node_num,
+ dead_node);
+ if (status < 0) {
+ mlog_errno(status);
+ if (dlm_is_host_down(status)) {
+ /* node died, ignore it for recovery */
+ status = 0;
+ ndata->state = DLM_RECO_NODE_DATA_DEAD;
+ /* wait for the domain map to catch up
+ * with the network state. */
+ wait_event_timeout(dlm->dlm_reco_thread_wq,
+ dlm_is_node_dead(dlm,
+ ndata->node_num),
+ msecs_to_jiffies(1000));
+ mlog(0, "waited 1 sec for %u, "
+ "dead? %s\n", ndata->node_num,
+ dlm_is_node_dead(dlm, ndata->node_num) ?
+ "yes" : "no");
+ } else {
+ /* -ENOMEM on the other node */
+ mlog(0, "%s: node %u returned "
+ "%d during recovery, retrying "
+ "after a short wait\n",
+ dlm->name, ndata->node_num,
+ status);
+ msleep(100);
+ }
}
- }
+ } while (status != 0);
+ spin_lock(&dlm_reco_state_lock);
switch (ndata->state) {
case DLM_RECO_NODE_DATA_INIT:
case DLM_RECO_NODE_DATA_FINALIZE_SENT:
@@ -461,10 +623,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
mlog(0, "node %u died after requesting "
"recovery info for node %u\n",
ndata->node_num, dead_node);
- // start all over
- destroy = 1;
- status = -EAGAIN;
- goto leave;
+ /* fine. don't need this node's info.
+ * continue without it. */
+ break;
case DLM_RECO_NODE_DATA_REQUESTING:
ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
mlog(0, "now receiving recovery data from "
@@ -482,9 +643,10 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
ndata->node_num, dead_node);
break;
}
+ spin_unlock(&dlm_reco_state_lock);
}
- mlog(0, "done requesting all lock info\n");
+ mlog(0, "%s: Done requesting all lock info\n", dlm->name);
/* nodes should be sending reco data now
* just need to wait */
@@ -494,9 +656,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
* done, or if anyone died */
all_nodes_done = 1;
spin_lock(&dlm_reco_state_lock);
- list_for_each(iter, &dlm->reco.node_data) {
- ndata = list_entry (iter, struct dlm_reco_node_data, list);
-
+ list_for_each_entry(ndata, &dlm->reco.node_data, list) {
mlog(0, "checking recovery state of node %u\n",
ndata->node_num);
switch (ndata->state) {
@@ -508,35 +668,26 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
BUG();
break;
case DLM_RECO_NODE_DATA_DEAD:
- mlog(ML_NOTICE, "node %u died after "
+ mlog(0, "node %u died after "
"requesting recovery info for "
"node %u\n", ndata->node_num,
dead_node);
- spin_unlock(&dlm_reco_state_lock);
- // start all over
- destroy = 1;
- status = -EAGAIN;
- /* instead of spinning like crazy here,
- * wait for the domain map to catch up
- * with the network state. otherwise this
- * can be hit hundreds of times before
- * the node is really seen as dead. */
- wait_event_timeout(dlm->dlm_reco_thread_wq,
- dlm_is_node_dead(dlm,
- ndata->node_num),
- msecs_to_jiffies(1000));
- mlog(0, "waited 1 sec for %u, "
- "dead? %s\n", ndata->node_num,
- dlm_is_node_dead(dlm, ndata->node_num) ?
- "yes" : "no");
- goto leave;
+ break;
case DLM_RECO_NODE_DATA_RECEIVING:
case DLM_RECO_NODE_DATA_REQUESTED:
+ mlog(0, "%s: node %u still in state %s\n",
+ dlm->name, ndata->node_num,
+ ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
+ "receiving" : "requested");
all_nodes_done = 0;
break;
case DLM_RECO_NODE_DATA_DONE:
+ mlog(0, "%s: node %u state is done\n",
+ dlm->name, ndata->node_num);
break;
case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+ mlog(0, "%s: node %u state is finalize\n",
+ dlm->name, ndata->node_num);
break;
}
}
@@ -547,6 +698,14 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
if (all_nodes_done) {
int ret;
+ /* Set this flag on recovery master to avoid
+ * a new recovery for another dead node start
+ * before the recovery is not done. That may
+ * cause recovery hung.*/
+ spin_lock(&dlm->spinlock);
+ dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
+ spin_unlock(&dlm->spinlock);
+
/* all nodes are now in DLM_RECO_NODE_DATA_DONE state
* just send a finalize message to everyone and
* clean up */
@@ -566,7 +725,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
jiffies, dlm->reco.dead_node,
dlm->node_num, dlm->reco.new_master);
destroy = 1;
- status = ret;
+ status = 0;
/* rescan everything marked dirty along the way */
dlm_kick_thread(dlm, NULL);
break;
@@ -579,11 +738,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
}
-leave:
if (destroy)
dlm_destroy_recovery_area(dlm, dead_node);
- mlog_exit(status);
return status;
}
@@ -605,7 +762,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
}
BUG_ON(num == dead_node);
- ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL);
+ ndata = kzalloc(sizeof(*ndata), GFP_NOFS);
if (!ndata) {
dlm_destroy_recovery_area(dlm, dead_node);
return -ENOMEM;
@@ -623,16 +780,14 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
{
- struct list_head *iter, *iter2;
- struct dlm_reco_node_data *ndata;
+ struct dlm_reco_node_data *ndata, *next;
LIST_HEAD(tmplist);
spin_lock(&dlm_reco_state_lock);
list_splice_init(&dlm->reco.node_data, &tmplist);
spin_unlock(&dlm_reco_state_lock);
- list_for_each_safe(iter, iter2, &tmplist) {
- ndata = list_entry (iter, struct dlm_reco_node_data, list);
+ list_for_each_entry_safe(ndata, next, &tmplist, list) {
list_del_init(&ndata->list);
kfree(ndata);
}
@@ -642,7 +797,8 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
u8 dead_node)
{
struct dlm_lock_request lr;
- enum dlm_status ret;
+ int ret;
+ int status;
mlog(0, "\n");
@@ -655,21 +811,24 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
lr.dead_node = dead_node;
// send message
- ret = DLM_NOLOCKMGR;
ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key,
- &lr, sizeof(lr), request_from, NULL);
+ &lr, sizeof(lr), request_from, &status);
/* negative status is handled by caller */
if (ret < 0)
- mlog_errno(ret);
-
+ mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u "
+ "to recover dead node %u\n", dlm->name, ret,
+ request_from, dead_node);
+ else
+ ret = status;
// return from here, then
// sleep until all received or error
return ret;
}
-int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
@@ -679,16 +838,25 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
if (!dlm_grab(dlm))
return -EINVAL;
+ if (lr->dead_node != dlm->reco.dead_node) {
+ mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
+ "dead_node is %u\n", dlm->name, lr->node_idx,
+ lr->dead_node, dlm->reco.dead_node);
+ dlm_print_reco_node_status(dlm);
+ /* this is a hack */
+ dlm_put(dlm);
+ return -ENOMEM;
+ }
BUG_ON(lr->dead_node != dlm->reco.dead_node);
- item = kcalloc(1, sizeof(*item), GFP_KERNEL);
+ item = kzalloc(sizeof(*item), GFP_NOFS);
if (!item) {
dlm_put(dlm);
return -ENOMEM;
}
/* this will get freed by dlm_request_all_locks_worker */
- buf = (char *) __get_free_page(GFP_KERNEL);
+ buf = (char *) __get_free_page(GFP_NOFS);
if (!buf) {
kfree(item);
dlm_put(dlm);
@@ -703,7 +871,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
spin_lock(&dlm->work_lock);
list_add_tail(&item->list, &dlm->work_list);
spin_unlock(&dlm->work_lock);
- schedule_work(&dlm->dispatched_work);
+ queue_work(dlm->dlm_worker, &dlm->dispatched_work);
dlm_put(dlm);
return 0;
@@ -715,33 +883,36 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
struct dlm_lock_resource *res;
struct dlm_ctxt *dlm;
LIST_HEAD(resources);
- struct list_head *iter;
int ret;
u8 dead_node, reco_master;
+ int skip_all_done = 0;
dlm = item->dlm;
dead_node = item->u.ral.dead_node;
reco_master = item->u.ral.reco_master;
mres = (struct dlm_migratable_lockres *)data;
+ mlog(0, "%s: recovery worker started, dead=%u, master=%u\n",
+ dlm->name, dead_node, reco_master);
+
if (dead_node != dlm->reco.dead_node ||
reco_master != dlm->reco.new_master) {
- /* show extra debug info if the recovery state is messed */
- mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), "
- "request(dead=%u, master=%u)\n",
- dlm->name, dlm->reco.dead_node, dlm->reco.new_master,
- dead_node, reco_master);
- mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "
- "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
- dlm->name, mres->lockname_len, mres->lockname, mres->master,
- mres->num_locks, mres->total_locks, mres->flags,
- mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags,
- mres->ml[0].type, mres->ml[0].convert_type,
- mres->ml[0].highest_blocked, mres->ml[0].node);
- BUG();
+ /* worker could have been created before the recovery master
+ * died. if so, do not continue, but do not error. */
+ if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
+ mlog(ML_NOTICE, "%s: will not send recovery state, "
+ "recovery master %u died, thread=(dead=%u,mas=%u)"
+ " current=(dead=%u,mas=%u)\n", dlm->name,
+ reco_master, dead_node, reco_master,
+ dlm->reco.dead_node, dlm->reco.new_master);
+ } else {
+ mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
+ "master=%u), request(dead=%u, master=%u)\n",
+ dlm->name, dlm->reco.dead_node,
+ dlm->reco.new_master, dead_node, reco_master);
+ }
+ goto leave;
}
- BUG_ON(dead_node != dlm->reco.dead_node);
- BUG_ON(reco_master != dlm->reco.new_master);
/* lock resources should have already been moved to the
* dlm->reco.resources list. now move items from that list
@@ -752,12 +923,19 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
/* now we can begin blasting lockreses without the dlm lock */
- list_for_each(iter, &resources) {
- res = list_entry (iter, struct dlm_lock_resource, recovering);
+
+ /* any errors returned will be due to the new_master dying,
+ * the dlm_reco_thread should detect this */
+ list_for_each_entry(res, &resources, recovering) {
ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
DLM_MRES_RECOVERY);
- if (ret < 0)
- mlog_errno(ret);
+ if (ret < 0) {
+ mlog(ML_ERROR, "%s: node %u went down while sending "
+ "recovery state for dead node %u, ret=%d\n", dlm->name,
+ reco_master, dead_node, ret);
+ skip_all_done = 1;
+ break;
+ }
}
/* move the resources back to the list */
@@ -765,10 +943,15 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
list_splice_init(&resources, &dlm->reco.resources);
spin_unlock(&dlm->spinlock);
- ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
- if (ret < 0)
- mlog_errno(ret);
-
+ if (!skip_all_done) {
+ ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
+ if (ret < 0) {
+ mlog(ML_ERROR, "%s: node %u went down while sending "
+ "recovery all-done for dead node %u, ret=%d\n",
+ dlm->name, reco_master, dead_node, ret);
+ }
+ }
+leave:
free_page((unsigned long)data);
}
@@ -787,18 +970,24 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
sizeof(done_msg), send_to, &tmpret);
- /* negative status is ignored by the caller */
- if (ret >= 0)
+ if (ret < 0) {
+ mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u "
+ "to recover dead node %u\n", dlm->name, ret, send_to,
+ dead_node);
+ if (!dlm_is_host_down(ret)) {
+ BUG();
+ }
+ } else
ret = tmpret;
return ret;
}
-int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
- struct list_head *iter;
struct dlm_reco_node_data *ndata = NULL;
int ret = -EINVAL;
@@ -808,11 +997,14 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
"node_idx=%u, this node=%u\n", done->dead_node,
dlm->reco.dead_node, done->node_idx, dlm->node_num);
- BUG_ON(done->dead_node != dlm->reco.dead_node);
+
+ mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),
+ "Got DATA DONE: dead_node=%u, reco.dead_node=%u, "
+ "node_idx=%u, this node=%u\n", done->dead_node,
+ dlm->reco.dead_node, done->node_idx, dlm->node_num);
spin_lock(&dlm_reco_state_lock);
- list_for_each(iter, &dlm->reco.node_data) {
- ndata = list_entry (iter, struct dlm_reco_node_data, list);
+ list_for_each_entry(ndata, &dlm->reco.node_data, list) {
if (ndata->node_num != done->node_idx)
continue;
@@ -860,13 +1052,11 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
struct list_head *list,
u8 dead_node)
{
- struct dlm_lock_resource *res;
- struct list_head *iter, *iter2;
+ struct dlm_lock_resource *res, *next;
struct dlm_lock *lock;
spin_lock(&dlm->spinlock);
- list_for_each_safe(iter, iter2, &dlm->reco.resources) {
- res = list_entry (iter, struct dlm_lock_resource, recovering);
+ list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
/* always prune any $RECOVERY entries for dead nodes,
* otherwise hangs can occur during later recovery */
if (dlm_is_recovery_lock(res->lockname.name,
@@ -876,7 +1066,7 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
if (lock->ml.node == dead_node) {
mlog(0, "AHA! there was "
"a $RECOVERY lock for dead "
- "node %u (%s)!\n",
+ "node %u (%s)!\n",
dead_node, dlm->name);
list_del_init(&lock->list);
dlm_lock_put(lock);
@@ -891,13 +1081,11 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
mlog(0, "found lockres owned by dead node while "
"doing recovery for node %u. sending it.\n",
dead_node);
- list_del_init(&res->recovering);
- list_add_tail(&res->recovering, list);
+ list_move_tail(&res->recovering, list);
} else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "found UNKNOWN owner while doing recovery "
"for node %u. sending it.\n", dead_node);
- list_del_init(&res->recovering);
- list_add_tail(&res->recovering, list);
+ list_move_tail(&res->recovering, list);
}
}
spin_unlock(&dlm->spinlock);
@@ -943,13 +1131,22 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
if (total_locks == mres_total_locks)
mres->flags |= DLM_MRES_ALL_DONE;
+ mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ orig_flags & DLM_MRES_MIGRATION ? "migration" : "recovery",
+ send_to);
+
/* send it */
ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
sz, send_to, &status);
if (ret < 0) {
/* XXX: negative status is not handled.
* this will end up killing this node. */
- mlog_errno(ret);
+ mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to "
+ "node %u (%s)\n", dlm->name, mres->lockname_len,
+ mres->lockname, ret, send_to,
+ (orig_flags & DLM_MRES_MIGRATION ?
+ "migration" : "recovery"));
} else {
/* might get an -ENOMEM back here */
ret = status;
@@ -977,7 +1174,7 @@ static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
u8 flags, u8 master)
{
/* mres here is one full page */
- memset(mres, 0, PAGE_SIZE);
+ clear_page(mres);
mres->lockname_len = namelen;
memcpy(mres->lockname, lockname, namelen);
mres->num_locks = 0;
@@ -987,6 +1184,39 @@ static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
mres->master = master;
}
+static void dlm_prepare_lvb_for_migration(struct dlm_lock *lock,
+ struct dlm_migratable_lockres *mres,
+ int queue)
+{
+ if (!lock->lksb)
+ return;
+
+ /* Ignore lvb in all locks in the blocked list */
+ if (queue == DLM_BLOCKED_LIST)
+ return;
+
+ /* Only consider lvbs in locks with granted EX or PR lock levels */
+ if (lock->ml.type != LKM_EXMODE && lock->ml.type != LKM_PRMODE)
+ return;
+
+ if (dlm_lvb_is_empty(mres->lvb)) {
+ memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
+ return;
+ }
+
+ /* Ensure the lvb copied for migration matches in other valid locks */
+ if (!memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))
+ return;
+
+ mlog(ML_ERROR, "Mismatched lvb in lock cookie=%u:%llu, name=%.*s, "
+ "node=%u\n",
+ dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
+ lock->lockres->lockname.len, lock->lockres->lockname.name,
+ lock->ml.node);
+ dlm_print_one_lock_resource(lock->lockres);
+ BUG();
+}
/* returns 1 if this lock fills the network structure,
* 0 otherwise */
@@ -1004,19 +1234,7 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
ml->list = queue;
if (lock->lksb) {
ml->flags = lock->lksb->flags;
- /* send our current lvb */
- if (ml->type == LKM_EXMODE ||
- ml->type == LKM_PRMODE) {
- /* if it is already set, this had better be a PR
- * and it has to match */
- if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
- memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
- mlog(ML_ERROR, "mismatched lvbs!\n");
- __dlm_print_one_lock_resource(lock->lockres);
- BUG();
- }
- memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
- }
+ dlm_prepare_lvb_for_migration(lock, mres, queue);
}
ml->node = lock->ml.node;
mres->num_locks++;
@@ -1026,12 +1244,40 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
return 0;
}
+static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
+ struct dlm_migratable_lockres *mres)
+{
+ struct dlm_lock dummy;
+ memset(&dummy, 0, sizeof(dummy));
+ dummy.ml.cookie = 0;
+ dummy.ml.type = LKM_IVMODE;
+ dummy.ml.convert_type = LKM_IVMODE;
+ dummy.ml.highest_blocked = LKM_IVMODE;
+ dummy.lksb = NULL;
+ dummy.ml.node = dlm->node_num;
+ dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
+}
+
+static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
+ struct dlm_migratable_lock *ml,
+ u8 *nodenum)
+{
+ if (unlikely(ml->cookie == 0 &&
+ ml->type == LKM_IVMODE &&
+ ml->convert_type == LKM_IVMODE &&
+ ml->highest_blocked == LKM_IVMODE &&
+ ml->list == DLM_BLOCKED_LIST)) {
+ *nodenum = ml->node;
+ return 1;
+ }
+ return 0;
+}
int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
struct dlm_migratable_lockres *mres,
u8 send_to, u8 flags)
{
- struct list_head *queue, *iter;
+ struct list_head *queue;
int total_locks, i;
u64 mig_cookie = 0;
struct dlm_lock *lock;
@@ -1057,9 +1303,7 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
total_locks = 0;
for (i=DLM_GRANTED_LIST; i<=DLM_BLOCKED_LIST; i++) {
queue = dlm_list_idx_to_ptr(res, i);
- list_for_each(iter, queue) {
- lock = list_entry (iter, struct dlm_lock, list);
-
+ list_for_each_entry(lock, queue, list) {
/* add another lock. */
total_locks++;
if (!dlm_add_lock_to_array(lock, mres, i))
@@ -1069,22 +1313,33 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
* we must send it immediately. */
ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
res, total_locks);
- if (ret < 0) {
- // TODO
- mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
- "returned %d, TODO\n", ret);
- BUG();
- }
+ if (ret < 0)
+ goto error;
}
}
+ if (total_locks == 0) {
+ /* send a dummy lock to indicate a mastery reference only */
+ mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
+ "migration");
+ dlm_add_dummy_lock(dlm, mres);
+ }
/* flush any remaining locks */
ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
- if (ret < 0) {
- // TODO
- mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, "
- "TODO\n", ret);
+ if (ret < 0)
+ goto error;
+ return ret;
+
+error:
+ mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
+ dlm->name, ret);
+ if (!dlm_is_host_down(ret))
BUG();
- }
+ mlog(0, "%s: node %u went down while sending %s "
+ "lockres %.*s\n", dlm->name, send_to,
+ flags & DLM_MRES_RECOVERY ? "recovery" : "migration",
+ res->lockname.len, res->lockname.name);
return ret;
}
@@ -1103,13 +1358,15 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
* do we spin? returning an error only delays the problem really
*/
-int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_migratable_lockres *mres =
(struct dlm_migratable_lockres *)msg->buf;
int ret = 0;
u8 real_master;
+ u8 extra_refs = 0;
char *buf = NULL;
struct dlm_work_item *item = NULL;
struct dlm_lock_resource *res = NULL;
@@ -1132,8 +1389,8 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "all done flag. all lockres data received!\n");
ret = -ENOMEM;
- buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL);
- item = kcalloc(1, sizeof(*item), GFP_KERNEL);
+ buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
+ item = kzalloc(sizeof(*item), GFP_NOFS);
if (!buf || !item)
goto leave;
@@ -1161,6 +1418,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
mres->lockname_len, mres->lockname);
ret = -EFAULT;
spin_unlock(&res->spinlock);
+ dlm_lockres_put(res);
goto leave;
}
res->state |= DLM_LOCK_RES_MIGRATING;
@@ -1187,22 +1445,38 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
__dlm_insert_lockres(dlm, res);
spin_unlock(&dlm->spinlock);
+ /* Add an extra ref for this lock-less lockres lest the
+ * dlm_thread purges it before we get the chance to add
+ * locks to it */
+ dlm_lockres_get(res);
+
+ /* There are three refs that need to be put.
+ * 1. Taken above.
+ * 2. kref_init in dlm_new_lockres()->dlm_init_lockres().
+ * 3. dlm_lookup_lockres()
+ * The first one is handled at the end of this function. The
+ * other two are handled in the worker thread after locks have
+ * been attached. Yes, we don't wait for purge time to match
+ * kref_init. The lockres will still have atleast one ref
+ * added because it is in the hash __dlm_insert_lockres() */
+ extra_refs++;
+
/* now that the new lockres is inserted,
* make it usable by other processes */
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
spin_unlock(&res->spinlock);
-
- /* add an extra ref for just-allocated lockres
- * otherwise the lockres will be purged immediately */
- dlm_lockres_get(res);
-
+ wake_up(&res->wq);
}
/* at this point we have allocated everything we need,
* and we have a hashed lockres with an extra ref and
* the proper res->state flags. */
ret = 0;
+ spin_lock(&res->spinlock);
+ /* drop this either when master requery finds a different master
+ * or when a lock is added by the recovery worker */
+ dlm_lockres_grab_inflight_ref(dlm, res);
if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
/* migration cannot have an unknown master */
BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1210,10 +1484,11 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
"unknown owner.. will need to requery: "
"%.*s\n", mres->lockname_len, mres->lockname);
} else {
- spin_lock(&res->spinlock);
+ /* take a reference now to pin the lockres, drop it
+ * when locks are added in the worker */
dlm_change_lockres_owner(dlm, res, dlm->node_num);
- spin_unlock(&res->spinlock);
}
+ spin_unlock(&res->spinlock);
/* queue up work for dlm_mig_lockres_worker */
dlm_grab(dlm); /* get an extra ref for the work item */
@@ -1221,38 +1496,43 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf);
item->u.ml.lockres = res; /* already have a ref */
item->u.ml.real_master = real_master;
+ item->u.ml.extra_ref = extra_refs;
spin_lock(&dlm->work_lock);
list_add_tail(&item->list, &dlm->work_list);
spin_unlock(&dlm->work_lock);
- schedule_work(&dlm->dispatched_work);
+ queue_work(dlm->dlm_worker, &dlm->dispatched_work);
leave:
+ /* One extra ref taken needs to be put here */
+ if (extra_refs)
+ dlm_lockres_put(res);
+
dlm_put(dlm);
if (ret < 0) {
- if (buf)
- kfree(buf);
- if (item)
- kfree(item);
+ kfree(buf);
+ kfree(item);
+ mlog_errno(ret);
}
- mlog_exit(ret);
return ret;
}
static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
{
- struct dlm_ctxt *dlm = data;
+ struct dlm_ctxt *dlm;
struct dlm_migratable_lockres *mres;
int ret = 0;
struct dlm_lock_resource *res;
u8 real_master;
+ u8 extra_ref;
dlm = item->dlm;
mres = (struct dlm_migratable_lockres *)data;
res = item->u.ml.lockres;
real_master = item->u.ml.real_master;
+ extra_ref = item->u.ml.extra_ref;
if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
/* this case is super-rare. only occurs if
@@ -1269,6 +1549,9 @@ again:
"this node will take it.\n",
res->lockname.len, res->lockname.name);
} else {
+ spin_lock(&res->spinlock);
+ dlm_lockres_drop_inflight_ref(dlm, res);
+ spin_unlock(&res->spinlock);
mlog(0, "master needs to respond to sender "
"that node %u still owns %.*s\n",
real_master, res->lockname.len,
@@ -1292,8 +1575,13 @@ again:
}
leave:
+ /* See comment in dlm_mig_lockres_handler() */
+ if (res) {
+ if (extra_ref)
+ dlm_lockres_put(res);
+ dlm_lockres_put(res);
+ }
kfree(data);
- mlog_exit(ret);
}
@@ -1342,8 +1630,10 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
if (ret < 0) {
mlog_errno(ret);
- BUG();
- /* TODO: need to figure a way to restart this */
+ if (!dlm_is_host_down(ret))
+ BUG();
+ /* host is down, so answer for that node would be
+ * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
}
if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "lock master is %u\n", *real_master);
@@ -1354,9 +1644,8 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
}
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 nodenum, u8 *real_master)
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+ u8 nodenum, u8 *real_master)
{
int ret = -EINVAL;
struct dlm_master_requery req;
@@ -1371,7 +1660,9 @@ static int dlm_do_master_requery(struct dlm_ctxt *dlm,
&req, sizeof(req), nodenum, &status);
/* XXX: negative status not handled properly here. */
if (ret < 0)
- mlog_errno(ret);
+ mlog(ML_ERROR, "Error %d when sending message %u (key "
+ "0x%x) to node %u\n", ret, DLM_MASTER_REQUERY_MSG,
+ dlm->key, nodenum);
else {
BUG_ON(status < 0);
BUG_ON(status > DLM_LOCK_RES_OWNER_UNKNOWN);
@@ -1387,11 +1678,13 @@ static int dlm_do_master_requery(struct dlm_ctxt *dlm,
/* this function cannot error, so unless the sending
* or receiving of the message failed, the owner can
* be trusted */
-int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
struct dlm_lock_resource *res = NULL;
+ unsigned int hash;
int master = DLM_LOCK_RES_OWNER_UNKNOWN;
u32 flags = DLM_ASSERT_MASTER_REQUERY;
@@ -1401,8 +1694,10 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
return master;
}
+ hash = dlm_lockid_hash(req->name, req->namelen);
+
spin_lock(&dlm->spinlock);
- res = __dlm_lookup_lockres(dlm, req->name, req->namelen);
+ res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
if (res) {
spin_lock(&res->spinlock);
master = res->owner;
@@ -1413,8 +1708,10 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
mlog_errno(-ENOMEM);
/* retry!? */
BUG();
- }
- }
+ } else
+ __dlm_lockres_grab_inflight_worker(dlm, res);
+ } else /* put.. incase we are not the master */
+ dlm_lockres_put(res);
spin_unlock(&res->spinlock);
}
spin_unlock(&dlm->spinlock);
@@ -1465,22 +1762,39 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
struct dlm_migratable_lockres *mres)
{
struct dlm_migratable_lock *ml;
- struct list_head *queue;
+ struct list_head *queue, *iter;
+ struct list_head *tmpq = NULL;
struct dlm_lock *newlock = NULL;
struct dlm_lockstatus *lksb = NULL;
int ret = 0;
- int i;
- struct list_head *iter;
- struct dlm_lock *lock = NULL;
+ int i, j, bad;
+ struct dlm_lock *lock;
+ u8 from = O2NM_MAX_NODES;
+ unsigned int added = 0;
+ __be64 c;
mlog(0, "running %d locks for this lockres\n", mres->num_locks);
for (i=0; i<mres->num_locks; i++) {
ml = &(mres->ml[i]);
+
+ if (dlm_is_dummy_lock(dlm, ml, &from)) {
+ /* placeholder, just need to set the refmap bit */
+ BUG_ON(mres->num_locks != 1);
+ mlog(0, "%s:%.*s: dummy lock for %u\n",
+ dlm->name, mres->lockname_len, mres->lockname,
+ from);
+ spin_lock(&res->spinlock);
+ dlm_lockres_set_refmap_bit(dlm, res, from);
+ spin_unlock(&res->spinlock);
+ added++;
+ break;
+ }
BUG_ON(ml->highest_blocked != LKM_IVMODE);
newlock = NULL;
lksb = NULL;
queue = dlm_list_num_to_pointer(res, ml->list);
+ tmpq = NULL;
/* if the lock is for the local node it needs to
* be moved to the proper location within the queue.
@@ -1489,33 +1803,78 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
/* MIGRATION ONLY! */
BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
+ lock = NULL;
spin_lock(&res->spinlock);
- list_for_each(iter, queue) {
- lock = list_entry (iter, struct dlm_lock, list);
- if (lock->ml.cookie != ml->cookie)
+ for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
+ tmpq = dlm_list_idx_to_ptr(res, j);
+ list_for_each(iter, tmpq) {
+ lock = list_entry(iter,
+ struct dlm_lock, list);
+ if (lock->ml.cookie == ml->cookie)
+ break;
lock = NULL;
- else
+ }
+ if (lock)
break;
}
/* lock is always created locally first, and
* destroyed locally last. it must be on the list */
if (!lock) {
- mlog(ML_ERROR, "could not find local lock "
- "with cookie %"MLFu64"!\n",
- ml->cookie);
+ c = ml->cookie;
+ mlog(ML_ERROR, "Could not find local lock "
+ "with cookie %u:%llu, node %u, "
+ "list %u, flags 0x%x, type %d, "
+ "conv %d, highest blocked %d\n",
+ dlm_get_lock_cookie_node(be64_to_cpu(c)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(c)),
+ ml->node, ml->list, ml->flags, ml->type,
+ ml->convert_type, ml->highest_blocked);
+ __dlm_print_one_lock_resource(res);
BUG();
}
- BUG_ON(lock->ml.node != ml->node);
+
+ if (lock->ml.node != ml->node) {
+ c = lock->ml.cookie;
+ mlog(ML_ERROR, "Mismatched node# in lock "
+ "cookie %u:%llu, name %.*s, node %u\n",
+ dlm_get_lock_cookie_node(be64_to_cpu(c)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(c)),
+ res->lockname.len, res->lockname.name,
+ lock->ml.node);
+ c = ml->cookie;
+ mlog(ML_ERROR, "Migrate lock cookie %u:%llu, "
+ "node %u, list %u, flags 0x%x, type %d, "
+ "conv %d, highest blocked %d\n",
+ dlm_get_lock_cookie_node(be64_to_cpu(c)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(c)),
+ ml->node, ml->list, ml->flags, ml->type,
+ ml->convert_type, ml->highest_blocked);
+ __dlm_print_one_lock_resource(res);
+ BUG();
+ }
+
+ if (tmpq != queue) {
+ c = ml->cookie;
+ mlog(0, "Lock cookie %u:%llu was on list %u "
+ "instead of list %u for %.*s\n",
+ dlm_get_lock_cookie_node(be64_to_cpu(c)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(c)),
+ j, ml->list, res->lockname.len,
+ res->lockname.name);
+ __dlm_print_one_lock_resource(res);
+ spin_unlock(&res->spinlock);
+ continue;
+ }
/* see NOTE above about why we do not update
* to match the master here */
/* move the lock to its proper place */
/* do not alter lock refcount. switching lists. */
- list_del_init(&lock->list);
- list_add_tail(&lock->list, queue);
+ list_move_tail(&lock->list, queue);
spin_unlock(&res->spinlock);
+ added++;
mlog(0, "just reordered a local lock!\n");
continue;
@@ -1537,28 +1896,55 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
}
lksb->flags |= (ml->flags &
(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
-
- if (mres->lvb[0]) {
+
+ if (ml->type == LKM_NLMODE)
+ goto skip_lvb;
+
+ /*
+ * If the lock is in the blocked list it can't have a valid lvb,
+ * so skip it
+ */
+ if (ml->list == DLM_BLOCKED_LIST)
+ goto skip_lvb;
+
+ if (!dlm_lvb_is_empty(mres->lvb)) {
if (lksb->flags & DLM_LKSB_PUT_LVB) {
/* other node was trying to update
* lvb when node died. recreate the
* lksb with the updated lvb. */
memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
+ /* the lock resource lvb update must happen
+ * NOW, before the spinlock is dropped.
+ * we no longer wait for the AST to update
+ * the lvb. */
+ memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
} else {
- /* otherwise, the node is sending its
+ /* otherwise, the node is sending its
* most recent valid lvb info */
BUG_ON(ml->type != LKM_EXMODE &&
ml->type != LKM_PRMODE);
- if (res->lvb[0] && (ml->type == LKM_EXMODE ||
- memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
- mlog(ML_ERROR, "received bad lvb!\n");
- __dlm_print_one_lock_resource(res);
- BUG();
+ if (!dlm_lvb_is_empty(res->lvb) &&
+ (ml->type == LKM_EXMODE ||
+ memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+ int i;
+ mlog(ML_ERROR, "%s:%.*s: received bad "
+ "lvb! type=%d\n", dlm->name,
+ res->lockname.len,
+ res->lockname.name, ml->type);
+ printk("lockres lvb=[");
+ for (i=0; i<DLM_LVB_LEN; i++)
+ printk("%02x", res->lvb[i]);
+ printk("]\nmigrated lvb=[");
+ for (i=0; i<DLM_LVB_LEN; i++)
+ printk("%02x", mres->lvb[i]);
+ printk("]\n");
+ dlm_print_one_lock_resource(res);
+ BUG();
}
memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
}
}
-
+skip_lvb:
/* NOTE:
* wrt lock queue ordering and recovery:
@@ -1576,21 +1962,62 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
* relative to each other, but clearly *not*
* preserved relative to locks from other nodes.
*/
+ bad = 0;
spin_lock(&res->spinlock);
- dlm_lock_get(newlock);
- list_add_tail(&newlock->list, queue);
+ list_for_each_entry(lock, queue, list) {
+ if (lock->ml.cookie == ml->cookie) {
+ c = lock->ml.cookie;
+ mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
+ "exists on this lockres!\n", dlm->name,
+ res->lockname.len, res->lockname.name,
+ dlm_get_lock_cookie_node(be64_to_cpu(c)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(c)));
+
+ mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
+ "node=%u, cookie=%u:%llu, queue=%d\n",
+ ml->type, ml->convert_type, ml->node,
+ dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
+ dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
+ ml->list);
+
+ __dlm_print_one_lock_resource(res);
+ bad = 1;
+ break;
+ }
+ }
+ if (!bad) {
+ dlm_lock_get(newlock);
+ if (mres->flags & DLM_MRES_RECOVERY &&
+ ml->list == DLM_CONVERTING_LIST &&
+ newlock->ml.type >
+ newlock->ml.convert_type) {
+ /* newlock is doing downconvert, add it to the
+ * head of converting list */
+ list_add(&newlock->list, queue);
+ } else
+ list_add_tail(&newlock->list, queue);
+ mlog(0, "%s:%.*s: added lock for node %u, "
+ "setting refmap bit\n", dlm->name,
+ res->lockname.len, res->lockname.name, ml->node);
+ dlm_lockres_set_refmap_bit(dlm, res, ml->node);
+ added++;
+ }
spin_unlock(&res->spinlock);
}
mlog(0, "done running all the locks\n");
leave:
+ /* balance the ref taken when the work was queued */
+ spin_lock(&res->spinlock);
+ dlm_lockres_drop_inflight_ref(dlm, res);
+ spin_unlock(&res->spinlock);
+
if (ret < 0) {
mlog_errno(ret);
if (newlock)
dlm_lock_put(newlock);
}
- mlog_exit(ret);
return ret;
}
@@ -1598,19 +2025,27 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
int i;
- struct list_head *queue, *iter, *iter2;
- struct dlm_lock *lock;
+ struct list_head *queue;
+ struct dlm_lock *lock, *next;
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&res->spinlock);
res->state |= DLM_LOCK_RES_RECOVERING;
- if (!list_empty(&res->recovering))
+ if (!list_empty(&res->recovering)) {
+ mlog(0,
+ "Recovering res %s:%.*s, is already on recovery list!\n",
+ dlm->name, res->lockname.len, res->lockname.name);
list_del_init(&res->recovering);
+ dlm_lockres_put(res);
+ }
+ /* We need to hold a reference while on the recovery list */
+ dlm_lockres_get(res);
list_add_tail(&res->recovering, &dlm->reco.resources);
/* find any pending locks and put them back on proper list */
for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) {
queue = dlm_list_idx_to_ptr(res, i);
- list_for_each_safe(iter, iter2, queue) {
- lock = list_entry (iter, struct dlm_lock, list);
+ list_for_each_entry_safe(lock, next, queue, list) {
dlm_lock_get(lock);
if (lock->convert_pending) {
/* move converting lock back to granted */
@@ -1675,23 +2110,27 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
u8 dead_node, u8 new_master)
{
int i;
- struct list_head *iter, *iter2, *bucket;
- struct dlm_lock_resource *res;
-
- mlog_entry_void();
+ struct hlist_head *bucket;
+ struct dlm_lock_resource *res, *next;
assert_spin_locked(&dlm->spinlock);
- list_for_each_safe(iter, iter2, &dlm->reco.resources) {
- res = list_entry (iter, struct dlm_lock_resource, recovering);
+ list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
if (res->owner == dead_node) {
+ mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ res->owner, new_master);
list_del_init(&res->recovering);
spin_lock(&res->spinlock);
+ /* new_master has our reference from
+ * the lock state sent during recovery */
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
- __dlm_dirty_lockres(dlm, res);
+ if (__dlm_lockres_has_locks(res))
+ __dlm_dirty_lockres(dlm, res);
spin_unlock(&res->spinlock);
wake_up(&res->wq);
+ dlm_lockres_put(res);
}
}
@@ -1699,33 +2138,33 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
* for now we need to run the whole hash, clear
* the RECOVERING state and set the owner
* if necessary */
- for (i=0; i<DLM_HASH_SIZE; i++) {
- bucket = &(dlm->resources[i]);
- list_for_each(iter, bucket) {
- res = list_entry (iter, struct dlm_lock_resource, list);
- if (res->state & DLM_LOCK_RES_RECOVERING) {
- if (res->owner == dead_node) {
- mlog(0, "(this=%u) res %.*s owner=%u "
- "was not on recovering list, but "
- "clearing state anyway\n",
- dlm->node_num, res->lockname.len,
- res->lockname.name, new_master);
- } else if (res->owner == dlm->node_num) {
- mlog(0, "(this=%u) res %.*s owner=%u "
- "was not on recovering list, "
- "owner is THIS node, clearing\n",
- dlm->node_num, res->lockname.len,
- res->lockname.name, new_master);
- } else
- continue;
+ for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+ bucket = dlm_lockres_hash(dlm, i);
+ hlist_for_each_entry(res, bucket, hash_node) {
+ if (!(res->state & DLM_LOCK_RES_RECOVERING))
+ continue;
- spin_lock(&res->spinlock);
- dlm_change_lockres_owner(dlm, res, new_master);
- res->state &= ~DLM_LOCK_RES_RECOVERING;
- __dlm_dirty_lockres(dlm, res);
- spin_unlock(&res->spinlock);
- wake_up(&res->wq);
+ if (res->owner != dead_node &&
+ res->owner != dlm->node_num)
+ continue;
+
+ if (!list_empty(&res->recovering)) {
+ list_del_init(&res->recovering);
+ dlm_lockres_put(res);
}
+
+ /* new_master has our reference from
+ * the lock state sent during recovery */
+ mlog(0, "%s: res %.*s, Changing owner from %u to %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ res->owner, new_master);
+ spin_lock(&res->spinlock);
+ dlm_change_lockres_owner(dlm, res, new_master);
+ res->state &= ~DLM_LOCK_RES_RECOVERING;
+ if (__dlm_lockres_has_locks(res))
+ __dlm_dirty_lockres(dlm, res);
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
}
}
}
@@ -1744,7 +2183,7 @@ static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, u8 dead_node)
{
- struct list_head *iter, *queue;
+ struct list_head *queue;
struct dlm_lock *lock;
int blank_lvb = 0, local = 0;
int i;
@@ -1754,7 +2193,7 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
assert_spin_locked(&res->spinlock);
if (res->owner == dlm->node_num)
- /* if this node owned the lockres, and if the dead node
+ /* if this node owned the lockres, and if the dead node
* had an EX when he died, blank out the lvb */
search_node = dead_node;
else {
@@ -1766,8 +2205,7 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) {
queue = dlm_list_idx_to_ptr(res, i);
- list_for_each(iter, queue) {
- lock = list_entry (iter, struct dlm_lock, list);
+ list_for_each_entry(lock, queue, list) {
if (lock->ml.node == search_node) {
if (dlm_lvb_needs_invalidation(lock, local)) {
/* zero the lksb lvb and lockres lvb */
@@ -1788,37 +2226,64 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, u8 dead_node)
{
- struct list_head *iter, *tmpiter;
- struct dlm_lock *lock;
+ struct dlm_lock *lock, *next;
+ unsigned int freed = 0;
/* this node is the lockres master:
* 1) remove any stale locks for the dead node
- * 2) if the dead node had an EX when he died, blank out the lvb
+ * 2) if the dead node had an EX when he died, blank out the lvb
*/
assert_spin_locked(&dlm->spinlock);
assert_spin_locked(&res->spinlock);
+ /* We do two dlm_lock_put(). One for removing from list and the other is
+ * to force the DLM_UNLOCK_FREE_LOCK action so as to free the locks */
+
/* TODO: check pending_asts, pending_basts here */
- list_for_each_safe(iter, tmpiter, &res->granted) {
- lock = list_entry (iter, struct dlm_lock, list);
+ list_for_each_entry_safe(lock, next, &res->granted, list) {
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
dlm_lock_put(lock);
+ /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
+ dlm_lock_put(lock);
+ freed++;
}
}
- list_for_each_safe(iter, tmpiter, &res->converting) {
- lock = list_entry (iter, struct dlm_lock, list);
+ list_for_each_entry_safe(lock, next, &res->converting, list) {
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
dlm_lock_put(lock);
+ /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
+ dlm_lock_put(lock);
+ freed++;
}
}
- list_for_each_safe(iter, tmpiter, &res->blocked) {
- lock = list_entry (iter, struct dlm_lock, list);
+ list_for_each_entry_safe(lock, next, &res->blocked, list) {
if (lock->ml.node == dead_node) {
list_del_init(&lock->list);
dlm_lock_put(lock);
+ /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
+ dlm_lock_put(lock);
+ freed++;
+ }
+ }
+
+ if (freed) {
+ mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
+ "dropping ref from lockres\n", dlm->name,
+ res->lockname.len, res->lockname.name, freed, dead_node);
+ if(!test_bit(dead_node, res->refmap)) {
+ mlog(ML_ERROR, "%s:%.*s: freed %u locks for dead node %u, "
+ "but ref was not set\n", dlm->name,
+ res->lockname.len, res->lockname.name, freed, dead_node);
+ __dlm_print_one_lock_resource(res);
}
+ dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
+ } else if (test_bit(dead_node, res->refmap)) {
+ mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+ "no locks and had not purged before dying\n", dlm->name,
+ res->lockname.len, res->lockname.name, dead_node);
+ dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
}
/* do not kick thread yet */
@@ -1834,10 +2299,9 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{
- struct list_head *iter;
struct dlm_lock_resource *res;
int i;
- struct list_head *bucket;
+ struct hlist_head *bucket;
struct dlm_lock *lock;
@@ -1858,10 +2322,9 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
* can be kicked again to see if any ASTs or BASTs
* need to be fired as a result.
*/
- for (i=0; i<DLM_HASH_SIZE; i++) {
- bucket = &(dlm->resources[i]);
- list_for_each(iter, bucket) {
- res = list_entry (iter, struct dlm_lock_resource, list);
+ for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+ bucket = dlm_lockres_hash(dlm, i);
+ hlist_for_each_entry(res, bucket, hash_node) {
/* always prune any $RECOVERY entries for dead nodes,
* otherwise hangs can occur during later recovery */
if (dlm_is_recovery_lock(res->lockname.name,
@@ -1880,15 +2343,31 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
}
spin_unlock(&res->spinlock);
continue;
- }
+ }
spin_lock(&res->spinlock);
/* zero the lvb if necessary */
dlm_revalidate_lvb(dlm, res, dead_node);
- if (res->owner == dead_node)
- dlm_move_lockres_to_recovery_list(dlm, res);
- else if (res->owner == dlm->node_num) {
+ if (res->owner == dead_node) {
+ if (res->state & DLM_LOCK_RES_DROPPING_REF) {
+ mlog(ML_NOTICE, "%s: res %.*s, Skip "
+ "recovery as it is being freed\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name);
+ } else
+ dlm_move_lockres_to_recovery_list(dlm,
+ res);
+
+ } else if (res->owner == dlm->node_num) {
dlm_free_dead_locks(dlm, res, dead_node);
__dlm_lockres_calc_usage(dlm, res);
+ } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
+ if (test_bit(dead_node, res->refmap)) {
+ mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+ "no locks and had not purged before dying\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, dead_node);
+ dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
+ }
}
spin_unlock(&res->spinlock);
}
@@ -1900,6 +2379,26 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
{
assert_spin_locked(&dlm->spinlock);
+ if (dlm->reco.new_master == idx) {
+ mlog(0, "%s: recovery master %d just died\n",
+ dlm->name, idx);
+ if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
+ /* finalize1 was reached, so it is safe to clear
+ * the new_master and dead_node. that recovery
+ * is complete. */
+ mlog(0, "%s: dead master %d had reached "
+ "finalize1 state, clearing\n", dlm->name, idx);
+ dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
+ __dlm_reset_recovery(dlm);
+ }
+ }
+
+ /* Clean up join state on node death. */
+ if (dlm->joining_node == idx) {
+ mlog(0, "Clearing join state for node %u\n", idx);
+ __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
+ }
+
/* check to see if the node is already considered dead */
if (!test_bit(idx, dlm->live_nodes_map)) {
mlog(0, "for domain %s, node %d is already dead. "
@@ -1918,12 +2417,6 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
clear_bit(idx, dlm->live_nodes_map);
- /* Clean up join state on node death. */
- if (dlm->joining_node == idx) {
- mlog(0, "Clearing join state for node %u\n", idx);
- __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
- }
-
/* make sure local cleanup occurs before the heartbeat events */
if (!test_bit(idx, dlm->recovery_map))
dlm_do_local_recovery_cleanup(dlm, idx);
@@ -1933,6 +2426,7 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
mlog(0, "node %u being removed from domain map!\n", idx);
clear_bit(idx, dlm->domain_map);
+ clear_bit(idx, dlm->exit_domain_map);
/* wake up migration waiters if a node goes down.
* perhaps later we can genericize this for other waiters. */
wake_up(&dlm->migration_wq);
@@ -1951,6 +2445,13 @@ void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data)
if (!dlm_grab(dlm))
return;
+ /*
+ * This will notify any dlm users that a node in our domain
+ * went away without notifying us first.
+ */
+ if (test_bit(idx, dlm->domain_map))
+ dlm_fire_domain_eviction_callbacks(dlm, idx);
+
spin_lock(&dlm->spinlock);
__dlm_hb_node_down(dlm, idx);
spin_unlock(&dlm->spinlock);
@@ -1998,7 +2499,7 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
* this function on each node racing to become the recovery
* master will not stop attempting this until either:
* a) this node gets the EX (and becomes the recovery master),
- * or b) dlm->reco.new_master gets set to some nodenum
+ * or b) dlm->reco.new_master gets set to some nodenum
* != O2NM_INVALID_NODE_NUM (another node will do the reco).
* so each time a recovery master is needed, the entire cluster
* will sync at this point. if the new master dies, that will
@@ -2011,11 +2512,12 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
-again:
+again:
memset(&lksb, 0, sizeof(lksb));
ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
- DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
+ DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
+ dlm_reco_ast, dlm, dlm_reco_bast);
mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
dlm->name, ret, lksb.status);
@@ -2023,8 +2525,8 @@ again:
if (ret == DLM_NORMAL) {
mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
dlm->name, dlm->node_num);
-
- /* got the EX lock. check to see if another node
+
+ /* got the EX lock. check to see if another node
* just became the reco master */
if (dlm_reco_master_ready(dlm)) {
mlog(0, "%s: got reco EX lock, but %u will "
@@ -2032,6 +2534,30 @@ again:
dlm->reco.new_master);
status = -EEXIST;
} else {
+ status = 0;
+
+ /* see if recovery was already finished elsewhere */
+ spin_lock(&dlm->spinlock);
+ if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
+ status = -EINVAL;
+ mlog(0, "%s: got reco EX lock, but "
+ "node got recovered already\n", dlm->name);
+ if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
+ mlog(ML_ERROR, "%s: new master is %u "
+ "but no dead node!\n",
+ dlm->name, dlm->reco.new_master);
+ BUG();
+ }
+ }
+ spin_unlock(&dlm->spinlock);
+ }
+
+ /* if this node has actually become the recovery master,
+ * set the master and send the messages to begin recovery */
+ if (!status) {
+ mlog(0, "%s: dead=%u, this=%u, sending "
+ "begin_reco now\n", dlm->name,
+ dlm->reco.dead_node, dlm->node_num);
status = dlm_send_begin_reco_message(dlm,
dlm->reco.dead_node);
/* this always succeeds */
@@ -2039,7 +2565,7 @@ again:
/* set the new_master to this node */
spin_lock(&dlm->spinlock);
- dlm->reco.new_master = dlm->node_num;
+ dlm_set_reco_master(dlm, dlm->node_num);
spin_unlock(&dlm->spinlock);
}
@@ -2063,7 +2589,7 @@ again:
mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
dlm->name, dlm->node_num);
/* another node is master. wait on
- * reco.new_master != O2NM_INVALID_NODE_NUM
+ * reco.new_master != O2NM_INVALID_NODE_NUM
* for at most one second */
wait_event_timeout(dlm->dlm_reco_thread_wq,
dlm_reco_master_ready(dlm),
@@ -2077,6 +2603,10 @@ again:
mlog(0, "%s: reco master %u is ready to recover %u\n",
dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
status = -EEXIST;
+ } else if (ret == DLM_RECOVERING) {
+ mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n",
+ dlm->name, dlm->node_num);
+ goto again;
} else {
struct dlm_lock_resource *res;
@@ -2106,9 +2636,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
int nodenum;
int status;
- mlog_entry("%u\n", dead_node);
-
- mlog(0, "dead node is %u\n", dead_node);
+ mlog(0, "%s: dead node is %u\n", dlm->name, dead_node);
spin_lock(&dlm->spinlock);
dlm_node_iter_init(dlm->domain_map, &iter);
@@ -2143,17 +2671,32 @@ retry:
if (dlm_is_host_down(ret)) {
/* node is down. not involved in recovery
* so just keep going */
- mlog(0, "%s: node %u was down when sending "
+ mlog(ML_NOTICE, "%s: node %u was down when sending "
"begin reco msg (%d)\n", dlm->name, nodenum, ret);
ret = 0;
}
+
+ /*
+ * Prior to commit aad1b15310b9bcd59fa81ab8f2b1513b59553ea8,
+ * dlm_begin_reco_handler() returned EAGAIN and not -EAGAIN.
+ * We are handling both for compatibility reasons.
+ */
+ if (ret == -EAGAIN || ret == EAGAIN) {
+ mlog(0, "%s: trying to start recovery of node "
+ "%u, but node %u is waiting for last recovery "
+ "to complete, backoff for a bit\n", dlm->name,
+ dead_node, nodenum);
+ msleep(100);
+ goto retry;
+ }
if (ret < 0) {
struct dlm_lock_resource *res;
- /* this is now a serious problem, possibly ENOMEM
+
+ /* this is now a serious problem, possibly ENOMEM
* in the network stack. must retry */
mlog_errno(ret);
mlog(ML_ERROR, "begin reco of dlm %s to node %u "
- " returned %d\n", dlm->name, nodenum, ret);
+ "returned %d\n", dlm->name, nodenum, ret);
res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
DLM_RECOVERY_LOCK_NAME_LEN);
if (res) {
@@ -2162,7 +2705,7 @@ retry:
} else {
mlog(ML_ERROR, "recovery lock not found\n");
}
- /* sleep for a bit in hopes that we can avoid
+ /* sleep for a bit in hopes that we can avoid
* another ENOMEM */
msleep(100);
goto retry;
@@ -2172,7 +2715,8 @@ retry:
return ret;
}
-int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
@@ -2181,8 +2725,21 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
if (!dlm_grab(dlm))
return 0;
- mlog(0, "node %u wants to recover node %u\n",
- br->node_idx, br->dead_node);
+ spin_lock(&dlm->spinlock);
+ if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
+ mlog(0, "%s: node %u wants to recover node %u (%u:%u) "
+ "but this node is in finalize state, waiting on finalize2\n",
+ dlm->name, br->node_idx, br->dead_node,
+ dlm->reco.dead_node, dlm->reco.new_master);
+ spin_unlock(&dlm->spinlock);
+ dlm_put(dlm);
+ return -EAGAIN;
+ }
+ spin_unlock(&dlm->spinlock);
+
+ mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n",
+ dlm->name, br->node_idx, br->dead_node,
+ dlm->reco.dead_node, dlm->reco.new_master);
dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
@@ -2201,11 +2758,11 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
}
if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
mlog(ML_NOTICE, "%s: dead_node previously set to %u, "
- "node %u changing it to %u\n", dlm->name,
+ "node %u changing it to %u\n", dlm->name,
dlm->reco.dead_node, br->node_idx, br->dead_node);
}
- dlm->reco.new_master = br->node_idx;
- dlm->reco.dead_node = br->dead_node;
+ dlm_set_reco_master(dlm, br->node_idx);
+ dlm_set_reco_dead_node(dlm, br->dead_node);
if (!test_bit(br->dead_node, dlm->recovery_map)) {
mlog(0, "recovery master %u sees %u as dead, but this "
"node has not yet. marking %u as dead\n",
@@ -2215,16 +2772,25 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "%u not in domain/live_nodes map "
"so setting it in reco map manually\n",
br->dead_node);
- set_bit(br->dead_node, dlm->recovery_map);
+ /* force the recovery cleanup in __dlm_hb_node_down
+ * both of these will be cleared in a moment */
+ set_bit(br->dead_node, dlm->domain_map);
+ set_bit(br->dead_node, dlm->live_nodes_map);
__dlm_hb_node_down(dlm, br->dead_node);
}
spin_unlock(&dlm->spinlock);
dlm_kick_recovery_thread(dlm);
+
+ mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n",
+ dlm->name, br->node_idx, br->dead_node,
+ dlm->reco.dead_node, dlm->reco.new_master);
+
dlm_put(dlm);
return 0;
}
+#define DLM_FINALIZE_STAGE2 0x01
static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
{
int ret = 0;
@@ -2232,54 +2798,72 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
struct dlm_node_iter iter;
int nodenum;
int status;
+ int stage = 1;
- mlog(0, "finishing recovery for node %s:%u\n",
- dlm->name, dlm->reco.dead_node);
+ mlog(0, "finishing recovery for node %s:%u, "
+ "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
spin_lock(&dlm->spinlock);
dlm_node_iter_init(dlm->domain_map, &iter);
spin_unlock(&dlm->spinlock);
+stage2:
memset(&fr, 0, sizeof(fr));
fr.node_idx = dlm->node_num;
fr.dead_node = dlm->reco.dead_node;
+ if (stage == 2)
+ fr.flags |= DLM_FINALIZE_STAGE2;
while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
if (nodenum == dlm->node_num)
continue;
ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
&fr, sizeof(fr), nodenum, &status);
- if (ret >= 0) {
+ if (ret >= 0)
ret = status;
+ if (ret < 0) {
+ mlog(ML_ERROR, "Error %d when sending message %u (key "
+ "0x%x) to node %u\n", ret, DLM_FINALIZE_RECO_MSG,
+ dlm->key, nodenum);
if (dlm_is_host_down(ret)) {
- /* this has no effect on this recovery
- * session, so set the status to zero to
+ /* this has no effect on this recovery
+ * session, so set the status to zero to
* finish out the last recovery */
mlog(ML_ERROR, "node %u went down after this "
"node finished recovery.\n", nodenum);
ret = 0;
+ continue;
}
- }
- if (ret < 0) {
- mlog_errno(ret);
break;
}
}
+ if (stage == 1) {
+ /* reset the node_iter back to the top and send finalize2 */
+ iter.curnode = -1;
+ stage = 2;
+ goto stage2;
+ }
return ret;
}
-int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
+ int stage = 1;
/* ok to return 0, domain has gone away */
if (!dlm_grab(dlm))
return 0;
- mlog(0, "node %u finalizing recovery of node %u\n",
- fr->node_idx, fr->dead_node);
+ if (fr->flags & DLM_FINALIZE_STAGE2)
+ stage = 2;
+
+ mlog(0, "%s: node %u finalizing recovery stage%d of "
+ "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
+ fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
spin_lock(&dlm->spinlock);
@@ -2296,13 +2880,41 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
BUG();
}
- dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
-
- spin_unlock(&dlm->spinlock);
+ switch (stage) {
+ case 1:
+ dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
+ if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
+ mlog(ML_ERROR, "%s: received finalize1 from "
+ "new master %u for dead node %u, but "
+ "this node has already received it!\n",
+ dlm->name, fr->node_idx, fr->dead_node);
+ dlm_print_reco_node_status(dlm);
+ BUG();
+ }
+ dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
+ spin_unlock(&dlm->spinlock);
+ break;
+ case 2:
+ if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
+ mlog(ML_ERROR, "%s: received finalize2 from "
+ "new master %u for dead node %u, but "
+ "this node did not have finalize1!\n",
+ dlm->name, fr->node_idx, fr->dead_node);
+ dlm_print_reco_node_status(dlm);
+ BUG();
+ }
+ dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
+ __dlm_reset_recovery(dlm);
+ spin_unlock(&dlm->spinlock);
+ dlm_kick_recovery_thread(dlm);
+ break;
+ default:
+ BUG();
+ }
- dlm_reset_recovery(dlm);
+ mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
+ dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
- dlm_kick_recovery_thread(dlm);
dlm_put(dlm);
return 0;
}