aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSunil Mushran <sunil.mushran@oracle.com>2007-01-29 15:19:16 -0800
committerMark Fasheh <mark.fasheh@oracle.com>2007-02-07 12:08:13 -0800
commitf3f854648de64c4b6f13f6f13113bc9525c621e5 (patch)
treeaeacd37bd30e2f416cb5eef88d2e2bc00b457f88
parentab81afd30bc154bb1e8749e5aeeffe9b93c90834 (diff)
ocfs2_dlm: Ensure correct ordering of set/clear refmap bit on lockres
Eventhough the set refmap bit message is sent before the clear refmap message, currently there is no guarentee that the set message will be handled before the clear. This patch prevents the clear refmap to be processed while the node is sending assert master messages to other nodes. (The set refmap message is sent as a response to the assert master request). Signed-off-by: Sunil Mushran <sunil.mushran@oracle.com> Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h6
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c99
2 files changed, 94 insertions, 11 deletions
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 3f554711efe..2f4f5d4edb0 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -180,6 +180,11 @@ struct dlm_assert_master_priv
unsigned ignore_higher:1;
};
+struct dlm_deref_lockres_priv
+{
+ struct dlm_lock_resource *deref_res;
+ u8 deref_node;
+};
struct dlm_work_item
{
@@ -191,6 +196,7 @@ struct dlm_work_item
struct dlm_request_all_locks_priv ral;
struct dlm_mig_lockres_priv ml;
struct dlm_assert_master_priv am;
+ struct dlm_deref_lockres_priv dl;
} u;
};
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 84f36db8ada..77e4e6169a0 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -102,6 +102,7 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
void *nodemap, u32 flags);
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle,
@@ -1717,6 +1718,11 @@ int dlm_do_assert_master(struct dlm_ctxt *dlm,
unsigned int namelen = res->lockname.len;
BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+ spin_lock(&res->spinlock);
+ res->state |= DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+
again:
reassert = 0;
@@ -1789,6 +1795,11 @@ again:
if (reassert)
goto again;
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+
return ret;
}
@@ -2296,6 +2307,9 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
int ret = -EINVAL;
u8 node;
unsigned int hash;
+ struct dlm_work_item *item;
+ int cleared = 0;
+ int dispatch = 0;
if (!dlm_grab(dlm))
return 0;
@@ -2326,27 +2340,90 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
spin_unlock(&dlm->spinlock);
spin_lock(&res->spinlock);
- BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
- if (test_bit(node, res->refmap)) {
- ret = 0;
- dlm_lockres_clear_refmap_bit(node, res);
- } else {
- mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
- "but it is already dropped!\n", dlm->name, namelen,
- name, node);
- __dlm_print_one_lock_resource(res);
+ if (res->state & DLM_LOCK_RES_SETREF_INPROG)
+ dispatch = 1;
+ else {
+ BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+ if (test_bit(node, res->refmap)) {
+ dlm_lockres_clear_refmap_bit(node, res);
+ cleared = 1;
+ }
}
spin_unlock(&res->spinlock);
- if (!ret)
- dlm_lockres_calc_usage(dlm, res);
+ if (!dispatch) {
+ if (cleared)
+ dlm_lockres_calc_usage(dlm, res);
+ else {
+ mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+ "but it is already dropped!\n", dlm->name,
+ res->lockname.len, res->lockname.name, node);
+ __dlm_print_one_lock_resource(res);
+ }
+ ret = 0;
+ goto done;
+ }
+
+ item = kzalloc(sizeof(*item), GFP_NOFS);
+ if (!item) {
+ ret = -ENOMEM;
+ mlog_errno(ret);
+ goto done;
+ }
+
+ dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
+ item->u.dl.deref_res = res;
+ item->u.dl.deref_node = node;
+
+ spin_lock(&dlm->work_lock);
+ list_add_tail(&item->list, &dlm->work_list);
+ spin_unlock(&dlm->work_lock);
+
+ queue_work(dlm->dlm_worker, &dlm->dispatched_work);
+ return 0;
+
done:
if (res)
dlm_lockres_put(res);
dlm_put(dlm);
+
return ret;
}
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
+{
+ struct dlm_ctxt *dlm;
+ struct dlm_lock_resource *res;
+ u8 node;
+ u8 cleared = 0;
+
+ dlm = item->dlm;
+ res = item->u.dl.deref_res;
+ node = item->u.dl.deref_node;
+
+ spin_lock(&res->spinlock);
+ BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+ if (test_bit(node, res->refmap)) {
+ __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
+ dlm_lockres_clear_refmap_bit(node, res);
+ cleared = 1;
+ }
+ spin_unlock(&res->spinlock);
+
+ if (cleared) {
+ mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
+ dlm->name, res->lockname.len, res->lockname.name, node);
+ dlm_lockres_calc_usage(dlm, res);
+ } else {
+ mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+ "but it is already dropped!\n", dlm->name,
+ res->lockname.len, res->lockname.name, node);
+ __dlm_print_one_lock_resource(res);
+ }
+
+ dlm_lockres_put(res);
+}
+
/*
* DLM_MIGRATE_LOCKRES