aboutsummaryrefslogtreecommitdiff
path: root/fs/ocfs2/dlm/dlmmaster.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/dlm/dlmmaster.c')
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c526
1 files changed, 256 insertions, 270 deletions
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index f564b0e5f80..82abf0cc9a1 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -82,9 +82,9 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
return 1;
}
-static struct kmem_cache *dlm_lockres_cache = NULL;
-static struct kmem_cache *dlm_lockname_cache = NULL;
-static struct kmem_cache *dlm_mle_cache = NULL;
+static struct kmem_cache *dlm_lockres_cache;
+static struct kmem_cache *dlm_lockname_cache;
+static struct kmem_cache *dlm_mle_cache;
static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
@@ -342,16 +342,13 @@ static int dlm_find_mle(struct dlm_ctxt *dlm,
{
struct dlm_master_list_entry *tmpmle;
struct hlist_head *bucket;
- struct hlist_node *list;
unsigned int hash;
assert_spin_locked(&dlm->master_lock);
hash = dlm_lockid_hash(name, namelen);
bucket = dlm_master_hash(dlm, hash);
- hlist_for_each(list, bucket) {
- tmpmle = hlist_entry(list, struct dlm_master_list_entry,
- master_hash_node);
+ hlist_for_each_entry(tmpmle, bucket, master_hash_node) {
if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
continue;
dlm_get_mle(tmpmle);
@@ -426,8 +423,6 @@ static void dlm_mle_release(struct kref *kref)
struct dlm_master_list_entry *mle;
struct dlm_ctxt *dlm;
- mlog_entry_void();
-
mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
dlm = mle->dlm;
@@ -477,11 +472,15 @@ bail:
void dlm_destroy_master_caches(void)
{
- if (dlm_lockname_cache)
+ if (dlm_lockname_cache) {
kmem_cache_destroy(dlm_lockname_cache);
+ dlm_lockname_cache = NULL;
+ }
- if (dlm_lockres_cache)
+ if (dlm_lockres_cache) {
kmem_cache_destroy(dlm_lockres_cache);
+ dlm_lockres_cache = NULL;
+ }
}
static void dlm_lockres_release(struct kref *kref)
@@ -582,6 +581,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
atomic_set(&res->asts_reserved, 0);
res->migration_pending = 0;
res->inflight_locks = 0;
+ res->inflight_assert_workers = 0;
res->dlm = dlm;
@@ -633,42 +633,94 @@ error:
return NULL;
}
-void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- int new_lockres,
- const char *file,
- int line)
+void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res, int bit)
{
- if (!new_lockres)
- assert_spin_locked(&res->spinlock);
+ assert_spin_locked(&res->spinlock);
+
+ mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
+ res->lockname.name, bit, __builtin_return_address(0));
+
+ set_bit(bit, res->refmap);
+}
+
+void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res, int bit)
+{
+ assert_spin_locked(&res->spinlock);
+
+ mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
+ res->lockname.name, bit, __builtin_return_address(0));
+
+ clear_bit(bit, res->refmap);
+}
+
+
+void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res)
+{
+ assert_spin_locked(&res->spinlock);
- if (!test_bit(dlm->node_num, res->refmap)) {
- BUG_ON(res->inflight_locks != 0);
- dlm_lockres_set_refmap_bit(dlm->node_num, res);
- }
res->inflight_locks++;
- mlog(0, "%s:%.*s: inflight++: now %u\n",
- dlm->name, res->lockname.len, res->lockname.name,
- res->inflight_locks);
+
+ mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
+ res->lockname.len, res->lockname.name, res->inflight_locks,
+ __builtin_return_address(0));
}
-void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- const char *file,
- int line)
+void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res)
{
assert_spin_locked(&res->spinlock);
BUG_ON(res->inflight_locks == 0);
+
res->inflight_locks--;
- mlog(0, "%s:%.*s: inflight--: now %u\n",
- dlm->name, res->lockname.len, res->lockname.name,
- res->inflight_locks);
- if (res->inflight_locks == 0)
- dlm_lockres_clear_refmap_bit(dlm->node_num, res);
+
+ mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
+ res->lockname.len, res->lockname.name, res->inflight_locks,
+ __builtin_return_address(0));
+
wake_up(&res->wq);
}
+void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res)
+{
+ assert_spin_locked(&res->spinlock);
+ res->inflight_assert_workers++;
+ mlog(0, "%s:%.*s: inflight assert worker++: now %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ res->inflight_assert_workers);
+}
+
+static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res)
+{
+ spin_lock(&res->spinlock);
+ __dlm_lockres_grab_inflight_worker(dlm, res);
+ spin_unlock(&res->spinlock);
+}
+
+static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res)
+{
+ assert_spin_locked(&res->spinlock);
+ BUG_ON(res->inflight_assert_workers == 0);
+ res->inflight_assert_workers--;
+ mlog(0, "%s:%.*s: inflight assert worker--: now %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ res->inflight_assert_workers);
+}
+
+static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res)
+{
+ spin_lock(&res->spinlock);
+ __dlm_lockres_drop_inflight_worker(dlm, res);
+ spin_unlock(&res->spinlock);
+}
+
/*
* lookup a lock resource by name.
* may already exist in the hashtable.
@@ -699,7 +751,6 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
unsigned int hash;
int tries = 0;
int bit, wait_on_recovery = 0;
- int drop_inflight_if_nonlocal = 0;
BUG_ON(!lockid);
@@ -711,36 +762,33 @@ lookup:
spin_lock(&dlm->spinlock);
tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
if (tmpres) {
- int dropping_ref = 0;
-
spin_unlock(&dlm->spinlock);
-
spin_lock(&tmpres->spinlock);
- /* We wait for the other thread that is mastering the resource */
+ /* Wait on the thread that is mastering the resource */
if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
__dlm_wait_on_lockres(tmpres);
BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
+ spin_unlock(&tmpres->spinlock);
+ dlm_lockres_put(tmpres);
+ tmpres = NULL;
+ goto lookup;
}
- if (tmpres->owner == dlm->node_num) {
- BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
- dlm_lockres_grab_inflight_ref(dlm, tmpres);
- } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
- dropping_ref = 1;
- spin_unlock(&tmpres->spinlock);
-
- /* wait until done messaging the master, drop our ref to allow
- * the lockres to be purged, start over. */
- if (dropping_ref) {
- spin_lock(&tmpres->spinlock);
- __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
+ /* Wait on the resource purge to complete before continuing */
+ if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
+ BUG_ON(tmpres->owner == dlm->node_num);
+ __dlm_wait_on_lockres_flags(tmpres,
+ DLM_LOCK_RES_DROPPING_REF);
spin_unlock(&tmpres->spinlock);
dlm_lockres_put(tmpres);
tmpres = NULL;
goto lookup;
}
- mlog(0, "found in hash!\n");
+ /* Grab inflight ref to pin the resource */
+ dlm_lockres_grab_inflight_ref(dlm, tmpres);
+
+ spin_unlock(&tmpres->spinlock);
if (res)
dlm_lockres_put(res);
res = tmpres;
@@ -810,7 +858,7 @@ lookup:
dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
mle = NULL;
- /* this is lame, but we cant wait on either
+ /* this is lame, but we can't wait on either
* the mle or lockres waitqueue here */
if (mig)
msleep(100);
@@ -831,8 +879,8 @@ lookup:
* but they might own this lockres. wait on them. */
bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
if (bit < O2NM_MAX_NODES) {
- mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
- "recover before lock mastery can begin\n",
+ mlog(0, "%s: res %.*s, At least one node (%d) "
+ "to recover before lock mastery can begin\n",
dlm->name, namelen, (char *)lockid, bit);
wait_on_recovery = 1;
}
@@ -845,12 +893,11 @@ lookup:
/* finally add the lockres to its hash bucket */
__dlm_insert_lockres(dlm, res);
- /* since this lockres is new it doesnt not require the spinlock */
- dlm_lockres_grab_inflight_ref_new(dlm, res);
- /* if this node does not become the master make sure to drop
- * this inflight reference below */
- drop_inflight_if_nonlocal = 1;
+ /* Grab inflight ref to pin the resource */
+ spin_lock(&res->spinlock);
+ dlm_lockres_grab_inflight_ref(dlm, res);
+ spin_unlock(&res->spinlock);
/* get an extra ref on the mle in case this is a BLOCK
* if so, the creator of the BLOCK may try to put the last
@@ -866,8 +913,8 @@ redo_request:
* dlm spinlock would be detectable be a change on the mle,
* so we only need to clear out the recovery map once. */
if (dlm_is_recovery_lock(lockid, namelen)) {
- mlog(ML_NOTICE, "%s: recovery map is not empty, but "
- "must master $RECOVERY lock now\n", dlm->name);
+ mlog(0, "%s: Recovery map is not empty, but must "
+ "master $RECOVERY lock now\n", dlm->name);
if (!dlm_pre_master_reco_lockres(dlm, res))
wait_on_recovery = 0;
else {
@@ -885,8 +932,8 @@ redo_request:
spin_lock(&dlm->spinlock);
bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
if (bit < O2NM_MAX_NODES) {
- mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
- "recover before lock mastery can begin\n",
+ mlog(0, "%s: res %.*s, At least one node (%d) "
+ "to recover before lock mastery can begin\n",
dlm->name, namelen, (char *)lockid, bit);
wait_on_recovery = 1;
} else
@@ -915,8 +962,8 @@ redo_request:
* yet, keep going until it does. this is how the
* master will know that asserts are needed back to
* the lower nodes. */
- mlog(0, "%s:%.*s: requests only up to %u but master "
- "is %u, keep going\n", dlm->name, namelen,
+ mlog(0, "%s: res %.*s, Requests only up to %u but "
+ "master is %u, keep going\n", dlm->name, namelen,
lockid, nodenum, mle->master);
}
}
@@ -926,13 +973,12 @@ wait:
ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
if (ret < 0) {
wait_on_recovery = 1;
- mlog(0, "%s:%.*s: node map changed, redo the "
- "master request now, blocked=%d\n",
- dlm->name, res->lockname.len,
+ mlog(0, "%s: res %.*s, Node map changed, redo the master "
+ "request now, blocked=%d\n", dlm->name, res->lockname.len,
res->lockname.name, blocked);
if (++tries > 20) {
- mlog(ML_ERROR, "%s:%.*s: spinning on "
- "dlm_wait_for_lock_mastery, blocked=%d\n",
+ mlog(ML_ERROR, "%s: res %.*s, Spinning on "
+ "dlm_wait_for_lock_mastery, blocked = %d\n",
dlm->name, res->lockname.len,
res->lockname.name, blocked);
dlm_print_one_lock_resource(res);
@@ -942,7 +988,8 @@ wait:
goto redo_request;
}
- mlog(0, "lockres mastered by %u\n", res->owner);
+ mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
+ res->lockname.name, res->owner);
/* make sure we never continue without this */
BUG_ON(res->owner == O2NM_MAX_NODES);
@@ -954,8 +1001,6 @@ wait:
wake_waiters:
spin_lock(&res->spinlock);
- if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
- dlm_lockres_drop_inflight_ref(dlm, res);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
spin_unlock(&res->spinlock);
wake_up(&res->wq);
@@ -1428,9 +1473,7 @@ way_up_top:
}
if (res->owner == dlm->node_num) {
- mlog(0, "%s:%.*s: setting bit %u in refmap\n",
- dlm->name, namelen, name, request->node_idx);
- dlm_lockres_set_refmap_bit(request->node_idx, res);
+ dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
spin_unlock(&res->spinlock);
response = DLM_MASTER_RESP_YES;
if (mle)
@@ -1495,10 +1538,8 @@ way_up_top:
* go back and clean the mles on any
* other nodes */
dispatch_assert = 1;
- dlm_lockres_set_refmap_bit(request->node_idx, res);
- mlog(0, "%s:%.*s: setting bit %u in refmap\n",
- dlm->name, namelen, name,
- request->node_idx);
+ dlm_lockres_set_refmap_bit(dlm, res,
+ request->node_idx);
} else
response = DLM_MASTER_RESP_NO;
} else {
@@ -1600,7 +1641,8 @@ send_response:
mlog(ML_ERROR, "failed to dispatch assert master work\n");
response = DLM_MASTER_RESP_ERROR;
dlm_lockres_put(res);
- }
+ } else
+ dlm_lockres_grab_inflight_worker(dlm, res);
} else {
if (res)
dlm_lockres_put(res);
@@ -1704,7 +1746,7 @@ again:
"lockres, set the bit in the refmap\n",
namelen, lockname, to);
spin_lock(&res->spinlock);
- dlm_lockres_set_refmap_bit(to, res);
+ dlm_lockres_set_refmap_bit(dlm, res, to);
spin_unlock(&res->spinlock);
}
}
@@ -1886,8 +1928,10 @@ ok:
* up nodes that this node contacted */
while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
nn+1)) < O2NM_MAX_NODES) {
- if (nn != dlm->node_num && nn != assert->node_idx)
+ if (nn != dlm->node_num && nn != assert->node_idx) {
master_request = 1;
+ break;
+ }
}
}
mle->master = assert->node_idx;
@@ -2018,7 +2062,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
int ignore_higher, u8 request_from, u32 flags)
{
struct dlm_work_item *item;
- item = kzalloc(sizeof(*item), GFP_NOFS);
+ item = kzalloc(sizeof(*item), GFP_ATOMIC);
if (!item)
return -ENOMEM;
@@ -2113,6 +2157,8 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
dlm_lockres_release_ast(dlm, res);
put:
+ dlm_lockres_drop_inflight_worker(dlm, res);
+
dlm_lockres_put(res);
mlog(0, "finished with dlm_assert_master_worker\n");
@@ -2189,8 +2235,6 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
namelen = res->lockname.len;
BUG_ON(namelen > O2NM_MAX_NAME_LEN);
- mlog(0, "%s:%.*s: sending deref to %d\n",
- dlm->name, namelen, lockname, res->owner);
memset(&deref, 0, sizeof(deref));
deref.node_idx = dlm->node_num;
deref.namelen = namelen;
@@ -2199,14 +2243,12 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
&deref, sizeof(deref), res->owner, &r);
if (ret < 0)
- mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
- "node %u\n", ret, DLM_DEREF_LOCKRES_MSG, dlm->key,
- res->owner);
+ mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
+ dlm->name, namelen, lockname, ret, res->owner);
else if (r < 0) {
/* BAD. other node says I did not have a ref. */
- mlog(ML_ERROR,"while dropping ref on %s:%.*s "
- "(master=%u) got %d.\n", dlm->name, namelen,
- lockname, res->owner, r);
+ mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
+ dlm->name, namelen, lockname, res->owner, r);
dlm_print_one_lock_resource(res);
BUG();
}
@@ -2262,7 +2304,7 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
else {
BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
if (test_bit(node, res->refmap)) {
- dlm_lockres_clear_refmap_bit(node, res);
+ dlm_lockres_clear_refmap_bit(dlm, res, node);
cleared = 1;
}
}
@@ -2322,7 +2364,7 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
if (test_bit(node, res->refmap)) {
__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
- dlm_lockres_clear_refmap_bit(node, res);
+ dlm_lockres_clear_refmap_bit(dlm, res, node);
cleared = 1;
}
spin_unlock(&res->spinlock);
@@ -2341,55 +2383,59 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
dlm_lockres_put(res);
}
-/* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
- * if not. If 0, numlocks is set to the number of locks in the lockres.
+/*
+ * A migrateable resource is one that is :
+ * 1. locally mastered, and,
+ * 2. zero local locks, and,
+ * 3. one or more non-local locks, or, one or more references
+ * Returns 1 if yes, 0 if not.
*/
static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- int *numlocks)
+ struct dlm_lock_resource *res)
{
- int ret;
- int i;
- int count = 0;
+ enum dlm_lockres_list idx;
+ int nonlocal = 0, node_ref;
struct list_head *queue;
struct dlm_lock *lock;
+ u64 cookie;
assert_spin_locked(&res->spinlock);
- ret = -EINVAL;
- if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
- mlog(0, "cannot migrate lockres with unknown owner!\n");
- goto leave;
- }
+ /* delay migration when the lockres is in MIGRATING state */
+ if (res->state & DLM_LOCK_RES_MIGRATING)
+ return 0;
- if (res->owner != dlm->node_num) {
- mlog(0, "cannot migrate lockres this node doesn't own!\n");
- goto leave;
- }
+ if (res->owner != dlm->node_num)
+ return 0;
- ret = 0;
- queue = &res->granted;
- for (i = 0; i < 3; i++) {
+ for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
+ queue = dlm_list_idx_to_ptr(res, idx);
list_for_each_entry(lock, queue, list) {
- ++count;
- if (lock->ml.node == dlm->node_num) {
- mlog(0, "found a lock owned by this node still "
- "on the %s queue! will not migrate this "
- "lockres\n", (i == 0 ? "granted" :
- (i == 1 ? "converting" :
- "blocked")));
- ret = -ENOTEMPTY;
- goto leave;
+ if (lock->ml.node != dlm->node_num) {
+ nonlocal++;
+ continue;
}
+ cookie = be64_to_cpu(lock->ml.cookie);
+ mlog(0, "%s: Not migrateable res %.*s, lock %u:%llu on "
+ "%s list\n", dlm->name, res->lockname.len,
+ res->lockname.name,
+ dlm_get_lock_cookie_node(cookie),
+ dlm_get_lock_cookie_seq(cookie),
+ dlm_list_in_text(idx));
+ return 0;
}
- queue++;
}
- *numlocks = count;
- mlog(0, "migrateable lockres having %d locks\n", *numlocks);
+ if (!nonlocal) {
+ node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
+ if (node_ref >= O2NM_MAX_NODES)
+ return 0;
+ }
-leave:
- return ret;
+ mlog(0, "%s: res %.*s, Migrateable\n", dlm->name, res->lockname.len,
+ res->lockname.name);
+
+ return 1;
}
/*
@@ -2398,8 +2444,7 @@ leave:
static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 target)
+ struct dlm_lock_resource *res, u8 target)
{
struct dlm_master_list_entry *mle = NULL;
struct dlm_master_list_entry *oldmle = NULL;
@@ -2408,39 +2453,20 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
const char *name;
unsigned int namelen;
int mle_added = 0;
- int numlocks;
int wake = 0;
if (!dlm_grab(dlm))
return -EINVAL;
+ BUG_ON(target == O2NM_MAX_NODES);
+
name = res->lockname.name;
namelen = res->lockname.len;
- mlog(0, "migrating %.*s to %u\n", namelen, name, target);
-
- /*
- * ensure this lockres is a proper candidate for migration
- */
- spin_lock(&res->spinlock);
- ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
- if (ret < 0) {
- spin_unlock(&res->spinlock);
- goto leave;
- }
- spin_unlock(&res->spinlock);
-
- /* no work to do */
- if (numlocks == 0) {
- mlog(0, "no locks were found on this lockres! done!\n");
- goto leave;
- }
-
- /*
- * preallocate up front
- * if this fails, abort
- */
+ mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
+ target);
+ /* preallocate up front. if this fails, abort */
ret = -ENOMEM;
mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
if (!mres) {
@@ -2456,35 +2482,10 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
ret = 0;
/*
- * find a node to migrate the lockres to
- */
-
- mlog(0, "picking a migration node\n");
- spin_lock(&dlm->spinlock);
- /* pick a new node */
- if (!test_bit(target, dlm->domain_map) ||
- target >= O2NM_MAX_NODES) {
- target = dlm_pick_migration_target(dlm, res);
- }
- mlog(0, "node %u chosen for migration\n", target);
-
- if (target >= O2NM_MAX_NODES ||
- !test_bit(target, dlm->domain_map)) {
- /* target chosen is not alive */
- ret = -EINVAL;
- }
-
- if (ret) {
- spin_unlock(&dlm->spinlock);
- goto fail;
- }
-
- mlog(0, "continuing with target = %u\n", target);
-
- /*
* clear any existing master requests and
* add the migration mle to the list
*/
+ spin_lock(&dlm->spinlock);
spin_lock(&dlm->master_lock);
ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
namelen, target, dlm->node_num);
@@ -2525,6 +2526,7 @@ fail:
dlm_put_mle(mle);
} else if (mle) {
kmem_cache_free(dlm_mle_cache, mle);
+ mle = NULL;
}
goto leave;
}
@@ -2568,6 +2570,9 @@ fail:
res->state &= ~DLM_LOCK_RES_MIGRATING;
wake = 1;
spin_unlock(&res->spinlock);
+ if (dlm_is_host_down(ret))
+ dlm_wait_for_node_death(dlm, target,
+ DLM_NODE_DEATH_WAIT_MAX);
goto leave;
}
@@ -2643,69 +2648,52 @@ leave:
if (wake)
wake_up(&res->wq);
- /* TODO: cleanup */
if (mres)
free_page((unsigned long)mres);
dlm_put(dlm);
- mlog(0, "returning %d\n", ret);
+ mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
+ name, target, ret);
return ret;
}
#define DLM_MIGRATION_RETRY_MS 100
-/* Should be called only after beginning the domain leave process.
+/*
+ * Should be called only after beginning the domain leave process.
* There should not be any remaining locks on nonlocal lock resources,
* and there should be no local locks left on locally mastered resources.
*
* Called with the dlm spinlock held, may drop it to do migration, but
* will re-acquire before exit.
*
- * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
+ * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
+ */
int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
int ret;
int lock_dropped = 0;
- int numlocks;
+ u8 target = O2NM_MAX_NODES;
+
+ assert_spin_locked(&dlm->spinlock);
spin_lock(&res->spinlock);
- if (res->owner != dlm->node_num) {
- if (!__dlm_lockres_unused(res)) {
- mlog(ML_ERROR, "%s:%.*s: this node is not master, "
- "trying to free this but locks remain\n",
- dlm->name, res->lockname.len, res->lockname.name);
- }
- spin_unlock(&res->spinlock);
- goto leave;
- }
+ if (dlm_is_lockres_migrateable(dlm, res))
+ target = dlm_pick_migration_target(dlm, res);
+ spin_unlock(&res->spinlock);
- /* No need to migrate a lockres having no locks */
- ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
- if (ret >= 0 && numlocks == 0) {
- spin_unlock(&res->spinlock);
+ if (target == O2NM_MAX_NODES)
goto leave;
- }
- spin_unlock(&res->spinlock);
/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
spin_unlock(&dlm->spinlock);
lock_dropped = 1;
- while (1) {
- ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
- if (ret >= 0)
- break;
- if (ret == -ENOTEMPTY) {
- mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
- res->lockname.len, res->lockname.name);
- BUG();
- }
-
- mlog(0, "lockres %.*s: migrate failed, "
- "retrying\n", res->lockname.len,
- res->lockname.name);
- msleep(DLM_MIGRATION_RETRY_MS);
- }
+ ret = dlm_migrate_lockres(dlm, res, target);
+ if (ret)
+ mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ target, ret);
spin_lock(&dlm->spinlock);
leave:
return lock_dropped;
@@ -2862,7 +2850,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
BUG_ON(!list_empty(&lock->bast_list));
BUG_ON(lock->ast_pending);
BUG_ON(lock->bast_pending);
- dlm_lockres_clear_refmap_bit(lock->ml.node, res);
+ dlm_lockres_clear_refmap_bit(dlm, res,
+ lock->ml.node);
list_del_init(&lock->list);
dlm_lock_put(lock);
/* In a normal unlock, we would have added a
@@ -2883,61 +2872,61 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
mlog(0, "%s:%.*s: node %u had a ref to this "
"migrating lockres, clearing\n", dlm->name,
res->lockname.len, res->lockname.name, bit);
- dlm_lockres_clear_refmap_bit(bit, res);
+ dlm_lockres_clear_refmap_bit(dlm, res, bit);
}
bit++;
}
}
-/* for now this is not too intelligent. we will
- * need stats to make this do the right thing.
- * this just finds the first lock on one of the
- * queues and uses that node as the target. */
+/*
+ * Pick a node to migrate the lock resource to. This function selects a
+ * potential target based first on the locks and then on refmap. It skips
+ * nodes that are in the process of exiting the domain.
+ */
static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
- int i;
+ enum dlm_lockres_list idx;
struct list_head *queue = &res->granted;
struct dlm_lock *lock;
- int nodenum;
+ int noderef;
+ u8 nodenum = O2NM_MAX_NODES;
assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&res->spinlock);
- spin_lock(&res->spinlock);
- for (i=0; i<3; i++) {
+ /* Go through all the locks */
+ for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) {
+ queue = dlm_list_idx_to_ptr(res, idx);
list_for_each_entry(lock, queue, list) {
- /* up to the caller to make sure this node
- * is alive */
- if (lock->ml.node != dlm->node_num) {
- spin_unlock(&res->spinlock);
- return lock->ml.node;
- }
+ if (lock->ml.node == dlm->node_num)
+ continue;
+ if (test_bit(lock->ml.node, dlm->exit_domain_map))
+ continue;
+ nodenum = lock->ml.node;
+ goto bail;
}
- queue++;
}
- spin_unlock(&res->spinlock);
- mlog(0, "have not found a suitable target yet! checking domain map\n");
- /* ok now we're getting desperate. pick anyone alive. */
- nodenum = -1;
+ /* Go thru the refmap */
+ noderef = -1;
while (1) {
- nodenum = find_next_bit(dlm->domain_map,
- O2NM_MAX_NODES, nodenum+1);
- mlog(0, "found %d in domain map\n", nodenum);
- if (nodenum >= O2NM_MAX_NODES)
+ noderef = find_next_bit(res->refmap, O2NM_MAX_NODES,
+ noderef + 1);
+ if (noderef >= O2NM_MAX_NODES)
break;
- if (nodenum != dlm->node_num) {
- mlog(0, "picking %d\n", nodenum);
- return nodenum;
- }
+ if (noderef == dlm->node_num)
+ continue;
+ if (test_bit(noderef, dlm->exit_domain_map))
+ continue;
+ nodenum = noderef;
+ goto bail;
}
- mlog(0, "giving up. no master to migrate to\n");
- return DLM_LOCK_RES_OWNER_UNKNOWN;
+bail:
+ return nodenum;
}
-
-
/* this is called by the new master once all lockres
* data has been received */
static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
@@ -2976,9 +2965,9 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
&migrate, sizeof(migrate), nodenum,
&status);
if (ret < 0) {
- mlog(ML_ERROR, "Error %d when sending message %u (key "
- "0x%x) to node %u\n", ret, DLM_MIGRATE_REQUEST_MSG,
- dlm->key, nodenum);
+ mlog(ML_ERROR, "%s: res %.*s, Error %d send "
+ "MIGRATE_REQUEST to node %u\n", dlm->name,
+ migrate.namelen, migrate.name, ret, nodenum);
if (!dlm_is_host_down(ret)) {
mlog(ML_ERROR, "unhandled error=%d!\n", ret);
BUG();
@@ -2997,7 +2986,7 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
dlm->name, res->lockname.len, res->lockname.name,
nodenum);
spin_lock(&res->spinlock);
- dlm_lockres_set_refmap_bit(nodenum, res);
+ dlm_lockres_set_refmap_bit(dlm, res, nodenum);
spin_unlock(&res->spinlock);
}
}
@@ -3106,8 +3095,6 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
*oldmle = NULL;
- mlog_entry_void();
-
assert_spin_locked(&dlm->spinlock);
assert_spin_locked(&dlm->master_lock);
@@ -3142,11 +3129,15 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
/* remove it so that only one mle will be found */
__dlm_unlink_mle(dlm, tmp);
__dlm_mle_detach_hb_events(dlm, tmp);
- ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
- mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
- "telling master to get ref for cleared out mle "
- "during migration\n", dlm->name, namelen, name,
- master, new_master);
+ if (tmp->type == DLM_MLE_MASTER) {
+ ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
+ mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
+ "telling master to get ref "
+ "for cleared out mle during "
+ "migration\n", dlm->name,
+ namelen, name, master,
+ new_master);
+ }
}
spin_unlock(&tmp->spinlock);
}
@@ -3244,10 +3235,10 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
struct dlm_master_list_entry *mle;
struct dlm_lock_resource *res;
struct hlist_head *bucket;
- struct hlist_node *list;
+ struct hlist_node *tmp;
unsigned int i;
- mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
+ mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
top:
assert_spin_locked(&dlm->spinlock);
@@ -3255,10 +3246,7 @@ top:
spin_lock(&dlm->master_lock);
for (i = 0; i < DLM_HASH_BUCKETS; i++) {
bucket = dlm_master_hash(dlm, i);
- hlist_for_each(list, bucket) {
- mle = hlist_entry(list, struct dlm_master_list_entry,
- master_hash_node);
-
+ hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
BUG_ON(mle->type != DLM_MLE_BLOCK &&
mle->type != DLM_MLE_MASTER &&
mle->type != DLM_MLE_MIGRATION);
@@ -3333,7 +3321,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
* mastery reference here since old_master will briefly have
* a reference after the migration completes */
spin_lock(&res->spinlock);
- dlm_lockres_set_refmap_bit(old_master, res);
+ dlm_lockres_set_refmap_bit(dlm, res, old_master);
spin_unlock(&res->spinlock);
mlog(0, "now time to do a migrate request to other nodes\n");
@@ -3439,7 +3427,7 @@ void dlm_force_free_mles(struct dlm_ctxt *dlm)
int i;
struct hlist_head *bucket;
struct dlm_master_list_entry *mle;
- struct hlist_node *tmp, *list;
+ struct hlist_node *tmp;
/*
* We notified all other nodes that we are exiting the domain and
@@ -3455,9 +3443,7 @@ void dlm_force_free_mles(struct dlm_ctxt *dlm)
for (i = 0; i < DLM_HASH_BUCKETS; i++) {
bucket = dlm_master_hash(dlm, i);
- hlist_for_each_safe(list, tmp, bucket) {
- mle = hlist_entry(list, struct dlm_master_list_entry,
- master_hash_node);
+ hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
if (mle->type != DLM_MLE_BLOCK) {
mlog(ML_ERROR, "bad mle: %p\n", mle);
dlm_print_one_mle(mle);