aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@g5.osdl.org>2006-06-26 16:06:08 -0700
committerLinus Torvalds <torvalds@g5.osdl.org>2006-06-26 16:06:08 -0700
commiteb99adde31b7d85c67a5e1c2fa5e098e1056dd79 (patch)
tree05d61251361c04f3afa64c5b1fe162558724d345
parentf6e6e883730aff2718610d3eba7608fcf73328ed (diff)
parent3fb5a9891dbb553dda96783dbc0dc4e77cbb2529 (diff)
Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2
* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2: (56 commits) [PATCH] fs/ocfs2/dlm/: cleanups ocfs2: fix compiler warnings in dlm_convert_lock_handler() ocfs2: dlm_print_one_mle() needs to be defined ocfs2: remove whitespace in dlmunlock.c ocfs2: move dlm work to a private work queue ocfs2: fix incorrect error returns ocfs2: tune down some noisy messages during dlm recovery ocfs2: display message before waiting for recovery to complete ocfs2: mlog in dlm_convert_lock_handler() should be ML_ERROR ocfs2: retry operations when a lock is marked in recovery ocfs2: use cond_resched() in dlm_thread() ocfs2: use GFP_NOFS in some dlm operations ocfs2: wait for recovery when starting lock mastery ocfs2: continue recovery when a dead node is encountered ocfs2: remove unneccesary spin_unlock() in dlm_remaster_locks() ocfs2: dlm_remaster_locks() should never exit without completing ocfs2: special case recovery lock in dlmlock_remote() ocfs2: pending mastery asserts and migrations should block each other ocfs2: temporarily disable automatic lock migration ocfs2: do not unconditionally purge the lockres in dlmlock_remote() ...
-rw-r--r--fs/ocfs2/dlm/dlmast.c12
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h63
-rw-r--r--fs/ocfs2/dlm/dlmconvert.c24
-rw-r--r--fs/ocfs2/dlm/dlmdebug.c6
-rw-r--r--fs/ocfs2/dlm/dlmdebug.h30
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c101
-rw-r--r--fs/ocfs2/dlm/dlmfs.c6
-rw-r--r--fs/ocfs2/dlm/dlmlock.c68
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c448
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c580
-rw-r--r--fs/ocfs2/dlm/dlmthread.c68
-rw-r--r--fs/ocfs2/dlm/dlmunlock.c10
-rw-r--r--fs/ocfs2/dlm/userdlm.c2
13 files changed, 1045 insertions, 373 deletions
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c
index 87ee29cad50..42775e2bbe2 100644
--- a/fs/ocfs2/dlm/dlmast.c
+++ b/fs/ocfs2/dlm/dlmast.c
@@ -197,12 +197,14 @@ static void dlm_update_lvb(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
lock->ml.node == dlm->node_num ? "master" :
"remote");
memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
- } else if (lksb->flags & DLM_LKSB_PUT_LVB) {
- mlog(0, "setting lvb from lockres for %s node\n",
- lock->ml.node == dlm->node_num ? "master" :
- "remote");
- memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
}
+ /* Do nothing for lvb put requests - they should be done in
+ * place when the lock is downconverted - otherwise we risk
+ * racing gets and puts which could result in old lvb data
+ * being propagated. We leave the put flag set and clear it
+ * here. In the future we might want to clear it at the time
+ * the put is actually done.
+ */
spin_unlock(&res->spinlock);
}
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 88cc43df18f..9bdc9cf6599 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -37,7 +37,17 @@
#define DLM_THREAD_SHUFFLE_INTERVAL 5 // flush everything every 5 passes
#define DLM_THREAD_MS 200 // flush at least every 200 ms
-#define DLM_HASH_BUCKETS (PAGE_SIZE / sizeof(struct hlist_head))
+#define DLM_HASH_SIZE_DEFAULT (1 << 14)
+#if DLM_HASH_SIZE_DEFAULT < PAGE_SIZE
+# define DLM_HASH_PAGES 1
+#else
+# define DLM_HASH_PAGES (DLM_HASH_SIZE_DEFAULT / PAGE_SIZE)
+#endif
+#define DLM_BUCKETS_PER_PAGE (PAGE_SIZE / sizeof(struct hlist_head))
+#define DLM_HASH_BUCKETS (DLM_HASH_PAGES * DLM_BUCKETS_PER_PAGE)
+
+/* Intended to make it easier for us to switch out hash functions */
+#define dlm_lockid_hash(_n, _l) full_name_hash(_n, _l)
enum dlm_ast_type {
DLM_AST = 0,
@@ -61,7 +71,8 @@ static inline int dlm_is_recovery_lock(const char *lock_name, int name_len)
return 0;
}
-#define DLM_RECO_STATE_ACTIVE 0x0001
+#define DLM_RECO_STATE_ACTIVE 0x0001
+#define DLM_RECO_STATE_FINALIZE 0x0002
struct dlm_recovery_ctxt
{
@@ -85,7 +96,7 @@ enum dlm_ctxt_state {
struct dlm_ctxt
{
struct list_head list;
- struct hlist_head *lockres_hash;
+ struct hlist_head **lockres_hash;
struct list_head dirty_list;
struct list_head purge_list;
struct list_head pending_asts;
@@ -120,6 +131,7 @@ struct dlm_ctxt
struct o2hb_callback_func dlm_hb_down;
struct task_struct *dlm_thread_task;
struct task_struct *dlm_reco_thread_task;
+ struct workqueue_struct *dlm_worker;
wait_queue_head_t dlm_thread_wq;
wait_queue_head_t dlm_reco_thread_wq;
wait_queue_head_t ast_wq;
@@ -132,6 +144,11 @@ struct dlm_ctxt
struct list_head dlm_eviction_callbacks;
};
+static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned i)
+{
+ return dlm->lockres_hash[(i / DLM_BUCKETS_PER_PAGE) % DLM_HASH_PAGES] + (i % DLM_BUCKETS_PER_PAGE);
+}
+
/* these keventd work queue items are for less-frequently
* called functions that cannot be directly called from the
* net message handlers for some reason, usually because
@@ -216,20 +233,29 @@ struct dlm_lock_resource
/* WARNING: Please see the comment in dlm_init_lockres before
* adding fields here. */
struct hlist_node hash_node;
+ struct qstr lockname;
struct kref refs;
- /* please keep these next 3 in this order
- * some funcs want to iterate over all lists */
+ /*
+ * Please keep granted, converting, and blocked in this order,
+ * as some funcs want to iterate over all lists.
+ *
+ * All four lists are protected by the hash's reference.
+ */
struct list_head granted;
struct list_head converting;
struct list_head blocked;
+ struct list_head purge;
+ /*
+ * These two lists require you to hold an additional reference
+ * while they are on the list.
+ */
struct list_head dirty;
struct list_head recovering; // dlm_recovery_ctxt.resources list
/* unused lock resources have their last_used stamped and are
* put on a list for the dlm thread to run. */
- struct list_head purge;
unsigned long last_used;
unsigned migration_pending:1;
@@ -238,7 +264,6 @@ struct dlm_lock_resource
wait_queue_head_t wq;
u8 owner; //node which owns the lock resource, or unknown
u16 state;
- struct qstr lockname;
char lvb[DLM_LVB_LEN];
};
@@ -300,6 +325,15 @@ enum dlm_lockres_list {
DLM_BLOCKED_LIST
};
+static inline int dlm_lvb_is_empty(char *lvb)
+{
+ int i;
+ for (i=0; i<DLM_LVB_LEN; i++)
+ if (lvb[i])
+ return 0;
+ return 1;
+}
+
static inline struct list_head *
dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx)
{
@@ -609,7 +643,8 @@ struct dlm_finalize_reco
{
u8 node_idx;
u8 dead_node;
- __be16 pad1;
+ u8 flags;
+ u8 pad1;
__be32 pad2;
};
@@ -676,6 +711,7 @@ void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
+int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout);
void dlm_put(struct dlm_ctxt *dlm);
struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm);
@@ -687,14 +723,20 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
void dlm_purge_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *lockres);
-void dlm_lockres_get(struct dlm_lock_resource *res);
+static inline void dlm_lockres_get(struct dlm_lock_resource *res)
+{
+ /* This is called on every lookup, so it might be worth
+ * inlining. */
+ kref_get(&res->refs);
+}
void dlm_lockres_put(struct dlm_lock_resource *res);
void __dlm_unhash_lockres(struct dlm_lock_resource *res);
void __dlm_insert_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
- unsigned int len);
+ unsigned int len,
+ unsigned int hash);
struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
const char *name,
unsigned int len);
@@ -819,6 +861,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm,
u8 dead_node);
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
+int __dlm_lockres_unused(struct dlm_lock_resource *res);
static inline const char * dlm_lock_mode_name(int mode)
{
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index 70888b31e75..c764dc8e40a 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -214,6 +214,9 @@ grant:
if (lock->ml.node == dlm->node_num)
mlog(0, "doing in-place convert for nonlocal lock\n");
lock->ml.type = type;
+ if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
+ memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
+
status = DLM_NORMAL;
*call_ast = 1;
goto unlock_exit;
@@ -461,6 +464,12 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
}
spin_lock(&res->spinlock);
+ status = __dlm_lockres_state_to_status(res);
+ if (status != DLM_NORMAL) {
+ spin_unlock(&res->spinlock);
+ dlm_error(status);
+ goto leave;
+ }
list_for_each(iter, &res->granted) {
lock = list_entry(iter, struct dlm_lock, list);
if (lock->ml.cookie == cnv->cookie &&
@@ -470,6 +479,21 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
}
lock = NULL;
}
+ if (!lock) {
+ __dlm_print_one_lock_resource(res);
+ list_for_each(iter, &res->granted) {
+ lock = list_entry(iter, struct dlm_lock, list);
+ if (lock->ml.node == cnv->node_idx) {
+ mlog(ML_ERROR, "There is something here "
+ "for node %u, lock->ml.cookie=%llu, "
+ "cnv->cookie=%llu\n", cnv->node_idx,
+ (unsigned long long)lock->ml.cookie,
+ (unsigned long long)cnv->cookie);
+ break;
+ }
+ }
+ lock = NULL;
+ }
spin_unlock(&res->spinlock);
if (!lock) {
status = DLM_IVLOCKID;
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index c7eae5d3324..3f6c8d88f7a 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -37,10 +37,8 @@
#include "dlmapi.h"
#include "dlmcommon.h"
-#include "dlmdebug.h"
#include "dlmdomain.h"
-#include "dlmdebug.h"
#define MLOG_MASK_PREFIX ML_DLM
#include "cluster/masklog.h"
@@ -120,6 +118,7 @@ void dlm_print_one_lock(struct dlm_lock *lockid)
}
EXPORT_SYMBOL_GPL(dlm_print_one_lock);
+#if 0
void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
{
struct dlm_lock_resource *res;
@@ -136,12 +135,13 @@ void dlm_dump_lock_resources(struct dlm_ctxt *dlm)
spin_lock(&dlm->spinlock);
for (i=0; i<DLM_HASH_BUCKETS; i++) {
- bucket = &(dlm->lockres_hash[i]);
+ bucket = dlm_lockres_hash(dlm, i);
hlist_for_each_entry(res, iter, bucket, hash_node)
dlm_print_one_lock_resource(res);
}
spin_unlock(&dlm->spinlock);
}
+#endif /* 0 */
static const char *dlm_errnames[] = {
[DLM_NORMAL] = "DLM_NORMAL",
diff --git a/fs/ocfs2/dlm/dlmdebug.h b/fs/ocfs2/dlm/dlmdebug.h
deleted file mode 100644
index 6858510c3cc..00000000000
--- a/fs/ocfs2/dlm/dlmdebug.h
+++ /dev/null
@@ -1,30 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; -*-
- * vim: noexpandtab sw=8 ts=8 sts=0:
- *
- * dlmdebug.h
- *
- * Copyright (C) 2004 Oracle. All rights reserved.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public
- * License along with this program; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 021110-1307, USA.
- *
- */
-
-#ifndef DLMDEBUG_H
-#define DLMDEBUG_H
-
-void dlm_dump_lock_resources(struct dlm_ctxt *dlm);
-
-#endif
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 8f3a9e3106f..ba27c5c5e95 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -41,7 +41,6 @@
#include "dlmapi.h"
#include "dlmcommon.h"
-#include "dlmdebug.h"
#include "dlmdomain.h"
#include "dlmver.h"
@@ -49,6 +48,33 @@
#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
#include "cluster/masklog.h"
+static void dlm_free_pagevec(void **vec, int pages)
+{
+ while (pages--)
+ free_page((unsigned long)vec[pages]);
+ kfree(vec);
+}
+
+static void **dlm_alloc_pagevec(int pages)
+{
+ void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
+ int i;
+
+ if (!vec)
+ return NULL;
+
+ for (i = 0; i < pages; i++)
+ if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
+ goto out_free;
+
+ mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
+ pages, DLM_HASH_PAGES, (unsigned long)DLM_BUCKETS_PER_PAGE);
+ return vec;
+out_free:
+ dlm_free_pagevec(vec, i);
+ return NULL;
+}
+
/*
*
* spinlock lock ordering: if multiple locks are needed, obey this ordering:
@@ -90,8 +116,7 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
assert_spin_locked(&dlm->spinlock);
q = &res->lockname;
- q->hash = full_name_hash(q->name, q->len);
- bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]);
+ bucket = dlm_lockres_hash(dlm, q->hash);
/* get a reference for our hashtable */
dlm_lockres_get(res);
@@ -100,34 +125,32 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm,
}
struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
- const char *name,
- unsigned int len)
+ const char *name,
+ unsigned int len,
+ unsigned int hash)
{
- unsigned int hash;
- struct hlist_node *iter;
- struct dlm_lock_resource *tmpres=NULL;
struct hlist_head *bucket;
+ struct hlist_node *list;
mlog_entry("%.*s\n", len, name);
assert_spin_locked(&dlm->spinlock);
- hash = full_name_hash(name, len);
-
- bucket = &(dlm->lockres_hash[hash % DLM_HASH_BUCKETS]);
-
- /* check for pre-existing lock */
- hlist_for_each(iter, bucket) {
- tmpres = hlist_entry(iter, struct dlm_lock_resource, hash_node);
- if (tmpres->lockname.len == len &&
- memcmp(tmpres->lockname.name, name, len) == 0) {
- dlm_lockres_get(tmpres);
- break;
- }
+ bucket = dlm_lockres_hash(dlm, hash);
- tmpres = NULL;
+ hlist_for_each(list, bucket) {
+ struct dlm_lock_resource *res = hlist_entry(list,
+ struct dlm_lock_resource, hash_node);
+ if (res->lockname.name[0] != name[0])
+ continue;
+ if (unlikely(res->lockname.len != len))
+ continue;
+ if (memcmp(res->lockname.name + 1, name + 1, len - 1))
+ continue;
+ dlm_lockres_get(res);
+ return res;
}
- return tmpres;
+ return NULL;
}
struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
@@ -135,9 +158,10 @@ struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
unsigned int len)
{
struct dlm_lock_resource *res;
+ unsigned int hash = dlm_lockid_hash(name, len);
spin_lock(&dlm->spinlock);
- res = __dlm_lookup_lockres(dlm, name, len);
+ res = __dlm_lookup_lockres(dlm, name, len, hash);
spin_unlock(&dlm->spinlock);
return res;
}
@@ -194,7 +218,7 @@ static int dlm_wait_on_domain_helper(const char *domain)
static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
{
if (dlm->lockres_hash)
- free_page((unsigned long) dlm->lockres_hash);
+ dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
if (dlm->name)
kfree(dlm->name);
@@ -278,11 +302,21 @@ int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
return ret;
}
+static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
+{
+ if (dlm->dlm_worker) {
+ flush_workqueue(dlm->dlm_worker);
+ destroy_workqueue(dlm->dlm_worker);
+ dlm->dlm_worker = NULL;
+ }
+}
+
static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
{
dlm_unregister_domain_handlers(dlm);
dlm_complete_thread(dlm);
dlm_complete_recovery_thread(dlm);
+ dlm_destroy_dlm_worker(dlm);
/* We've left the domain. Now we can take ourselves out of the
* list and allow the kref stuff to help us free the
@@ -304,8 +338,8 @@ static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
restart:
spin_lock(&dlm->spinlock);
for (i = 0; i < DLM_HASH_BUCKETS; i++) {
- while (!hlist_empty(&dlm->lockres_hash[i])) {
- res = hlist_entry(dlm->lockres_hash[i].first,
+ while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
+ res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
struct dlm_lock_resource, hash_node);
/* need reference when manually grabbing lockres */
dlm_lockres_get(res);
@@ -1126,6 +1160,13 @@ static int dlm_join_domain(struct dlm_ctxt *dlm)
goto bail;
}
+ dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
+ if (!dlm->dlm_worker) {
+ status = -ENOMEM;
+ mlog_errno(status);
+ goto bail;
+ }
+
do {
unsigned int backoff;
status = dlm_try_to_join_domain(dlm);
@@ -1166,6 +1207,7 @@ bail:
dlm_unregister_domain_handlers(dlm);
dlm_complete_thread(dlm);
dlm_complete_recovery_thread(dlm);
+ dlm_destroy_dlm_worker(dlm);
}
return status;
@@ -1191,7 +1233,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
goto leave;
}
- dlm->lockres_hash = (struct hlist_head *) __get_free_page(GFP_KERNEL);
+ dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
if (!dlm->lockres_hash) {
mlog_errno(-ENOMEM);
kfree(dlm->name);
@@ -1200,8 +1242,8 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
goto leave;
}
- for (i=0; i<DLM_HASH_BUCKETS; i++)
- INIT_HLIST_HEAD(&dlm->lockres_hash[i]);
+ for (i = 0; i < DLM_HASH_BUCKETS; i++)
+ INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
strcpy(dlm->name, domain);
dlm->key = key;
@@ -1231,6 +1273,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
dlm->dlm_thread_task = NULL;
dlm->dlm_reco_thread_task = NULL;
+ dlm->dlm_worker = NULL;
init_waitqueue_head(&dlm->dlm_thread_wq);
init_waitqueue_head(&dlm->dlm_reco_thread_wq);
init_waitqueue_head(&dlm->reco.event);
diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlm/dlmfs.c
index 7273d9fa6ba..033ad170123 100644
--- a/fs/ocfs2/dlm/dlmfs.c
+++ b/fs/ocfs2/dlm/dlmfs.c
@@ -116,7 +116,7 @@ static int dlmfs_file_open(struct inode *inode,
* doesn't make sense for LVB writes. */
file->f_flags &= ~O_APPEND;
- fp = kmalloc(sizeof(*fp), GFP_KERNEL);
+ fp = kmalloc(sizeof(*fp), GFP_NOFS);
if (!fp) {
status = -ENOMEM;
goto bail;
@@ -196,7 +196,7 @@ static ssize_t dlmfs_file_read(struct file *filp,
else
readlen = count - *ppos;
- lvb_buf = kmalloc(readlen, GFP_KERNEL);
+ lvb_buf = kmalloc(readlen, GFP_NOFS);
if (!lvb_buf)
return -ENOMEM;
@@ -240,7 +240,7 @@ static ssize_t dlmfs_file_write(struct file *filp,
else
writelen = count - *ppos;
- lvb_buf = kmalloc(writelen, GFP_KERNEL);
+ lvb_buf = kmalloc(writelen, GFP_NOFS);
if (!lvb_buf)
return -ENOMEM;
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 55cda25ae11..d6f89577e25 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -201,6 +201,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
struct dlm_lock *lock, int flags)
{
enum dlm_status status = DLM_DENIED;
+ int lockres_changed = 1;
mlog_entry("type=%d\n", lock->ml.type);
mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len,
@@ -226,8 +227,25 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
lock->lock_pending = 0;
if (status != DLM_NORMAL) {
- if (status != DLM_NOTQUEUED)
+ if (status == DLM_RECOVERING &&
+ dlm_is_recovery_lock(res->lockname.name,
+ res->lockname.len)) {
+ /* recovery lock was mastered by dead node.
+ * we need to have calc_usage shoot down this
+ * lockres and completely remaster it. */
+ mlog(0, "%s: recovery lock was owned by "
+ "dead node %u, remaster it now.\n",
+ dlm->name, res->owner);
+ } else if (status != DLM_NOTQUEUED) {
+ /*
+ * DO NOT call calc_usage, as this would unhash
+ * the remote lockres before we ever get to use
+ * it. treat as if we never made any change to
+ * the lockres.
+ */
+ lockres_changed = 0;
dlm_error(status);
+ }
dlm_revert_pending_lock(res, lock);
dlm_lock_put(lock);
} else if (dlm_is_recovery_lock(res->lockname.name,
@@ -243,7 +261,8 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
}
spin_unlock(&res->spinlock);
- dlm_lockres_calc_usage(dlm, res);
+ if (lockres_changed)
+ dlm_lockres_calc_usage(dlm, res);
wake_up(&res->wq);
return status;
@@ -280,6 +299,14 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
if (tmpret >= 0) {
// successfully sent and received
ret = status; // this is already a dlm_status
+ if (ret == DLM_REJECTED) {
+ mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres "
+ "no longer owned by %u. that node is coming back "
+ "up currently.\n", dlm->name, create.namelen,
+ create.name, res->owner);
+ dlm_print_one_lock_resource(res);
+ BUG();
+ }
} else {
mlog_errno(tmpret);
if (dlm_is_host_down(tmpret)) {
@@ -381,13 +408,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
struct dlm_lock *lock;
int kernel_allocated = 0;
- lock = kcalloc(1, sizeof(*lock), GFP_KERNEL);
+ lock = kcalloc(1, sizeof(*lock), GFP_NOFS);
if (!lock)
return NULL;
if (!lksb) {
/* zero memory only if kernel-allocated */
- lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL);
+ lksb = kcalloc(1, sizeof(*lksb), GFP_NOFS);
if (!lksb) {
kfree(lock);
return NULL;
@@ -428,11 +455,16 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
if (!dlm_grab(dlm))
return DLM_REJECTED;
- mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
- "Domain %s not fully joined!\n", dlm->name);
-
name = create->name;
namelen = create->namelen;
+ status = DLM_REJECTED;
+ if (!dlm_domain_fully_joined(dlm)) {
+ mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
+ "sending a create_lock message for lock %.*s!\n",
+ dlm->name, create->node_idx, namelen, name);
+ dlm_error(status);
+ goto leave;
+ }
status = DLM_IVBUFLEN;
if (namelen > DLM_LOCKID_NAME_MAX) {
@@ -668,18 +700,22 @@ retry_lock:
msleep(100);
/* no waiting for dlm_reco_thread */
if (recovery) {
- if (status == DLM_RECOVERING) {
- mlog(0, "%s: got RECOVERING "
- "for $REOCVERY lock, master "
- "was %u\n", dlm->name,
- res->owner);
- dlm_wait_for_node_death(dlm, res->owner,
- DLM_NODE_DEATH_WAIT_MAX);
- }
+ if (status != DLM_RECOVERING)
+ goto retry_lock;
+
+ mlog(0, "%s: got RECOVERING "
+ "for $RECOVERY lock, master "
+ "was %u\n", dlm->name,
+ res->owner);
+ /* wait to see the node go down, then
+ * drop down and allow the lockres to
+ * get cleaned up. need to remaster. */
+ dlm_wait_for_node_death(dlm, res->owner,
+ DLM_NODE_DEATH_WAIT_MAX);
} else {
dlm_wait_for_recovery(dlm);
+ goto retry_lock;
}
- goto retry_lock;
}
if (status != DLM_NORMAL) {
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 940be4c13b1..1b8346dd057 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -47,7 +47,6 @@
#include "dlmapi.h"
#include "dlmcommon.h"
-#include "dlmdebug.h"
#include "dlmdomain.h"
#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
@@ -74,6 +73,7 @@ struct dlm_master_list_entry
wait_queue_head_t wq;
atomic_t woken;
struct kref mle_refs;
+ int inuse;
unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
@@ -127,18 +127,30 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
return 1;
}
-#if 0
-/* Code here is included but defined out as it aids debugging */
+#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m)
+static void _dlm_print_nodemap(unsigned long *map, const char *mapname)
+{
+ int i;
+ printk("%s=[ ", mapname);
+ for (i=0; i<O2NM_MAX_NODES; i++)
+ if (test_bit(i, map))
+ printk("%d ", i);
+ printk("]");
+}
-void dlm_print_one_mle(struct dlm_master_list_entry *mle)
+static void dlm_print_one_mle(struct dlm_master_list_entry *mle)
{
- int i = 0, refs;
+ int refs;
char *type;
char attached;
u8 master;
unsigned int namelen;
const char *name;
struct kref *k;
+ unsigned long *maybe = mle->maybe_map,
+ *vote = mle->vote_map,
+ *resp = mle->response_map,
+ *node = mle->node_map;
k = &mle->mle_refs;
if (mle->type == DLM_MLE_BLOCK)
@@ -159,18 +171,29 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle)
name = mle->u.res->lockname.name;
}
- mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n",
- i, type, refs, master, mle->new_master, attached,
- namelen, namelen, name);
+ mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
+ namelen, name, type, refs, master, mle->new_master, attached,
+ mle->inuse);
+ dlm_print_nodemap(maybe);
+ printk(", ");
+ dlm_print_nodemap(vote);
+ printk(", ");
+ dlm_print_nodemap(resp);
+ printk(", ");
+ dlm_print_nodemap(node);
+ printk(", ");
+ printk("\n");
}
+#if 0
+/* Code here is included but defined out as it aids debugging */
+
static void dlm_dump_mles(struct dlm_ctxt *dlm)
{
struct dlm_master_list_entry *mle;
struct list_head *iter;
mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
- mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
spin_lock(&dlm->master_lock);
list_for_each(iter, &dlm->master_list) {
mle = list_entry(iter, struct dlm_master_list_entry, list);
@@ -314,6 +337,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
spin_unlock(&dlm->spinlock);
}
+static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
+{
+ struct dlm_ctxt *dlm;
+ dlm = mle->dlm;
+
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&dlm->master_lock);
+ mle->inuse++;
+ kref_get(&mle->mle_refs);
+}
+
+static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
+{
+ struct dlm_ctxt *dlm;
+ dlm = mle->dlm;
+
+ spin_lock(&dlm->spinlock);
+ spin_lock(&dlm->master_lock);
+ mle->inuse--;
+ __dlm_put_mle(mle);
+ spin_unlock(&dlm->master_lock);
+ spin_unlock(&dlm->spinlock);
+
+}
+
/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
@@ -322,9 +370,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle)
assert_spin_locked(&dlm->spinlock);
assert_spin_locked(&dlm->master_lock);
- BUG_ON(!atomic_read(&mle->mle_refs.refcount));
-
- kref_put(&mle->mle_refs, dlm_mle_release);
+ if (!atomic_read(&mle->mle_refs.refcount)) {
+ /* this may or may not crash, but who cares.
+ * it's a BUG. */
+ mlog(ML_ERROR, "bad mle: %p\n", mle);
+ dlm_print_one_mle(mle);
+ BUG();
+ } else
+ kref_put(&mle->mle_refs, dlm_mle_release);
}
@@ -367,6 +420,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle,
memset(mle->response_map, 0, sizeof(mle->response_map));
mle->master = O2NM_MAX_NODES;
mle->new_master = O2NM_MAX_NODES;
+ mle->inuse = 0;
if (mle->type == DLM_MLE_MASTER) {
BUG_ON(!res);
@@ -564,6 +618,28 @@ static void dlm_lockres_release(struct kref *kref)
mlog(0, "destroying lockres %.*s\n", res->lockname.len,
res->lockname.name);
+ if (!hlist_unhashed(&res->hash_node) ||
+ !list_empty(&res->granted) ||
+ !list_empty(&res->converting) ||
+ !list_empty(&res->blocked) ||
+ !list_empty(&res->dirty) ||
+ !list_empty(&res->recovering) ||
+ !list_empty(&res->purge)) {
+ mlog(ML_ERROR,
+ "Going to BUG for resource %.*s."
+ " We're on a list! [%c%c%c%c%c%c%c]\n",
+ res->lockname.len, res->lockname.name,
+ !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
+ !list_empty(&res->granted) ? 'G' : ' ',
+ !list_empty(&res->converting) ? 'C' : ' ',
+ !list_empty(&res->blocked) ? 'B' : ' ',
+ !list_empty(&res->dirty) ? 'D' : ' ',
+ !list_empty(&res->recovering) ? 'R' : ' ',
+ !list_empty(&res->purge) ? 'P' : ' ');
+
+ dlm_print_one_lock_resource(res);
+ }
+
/* By the time we're ready to blow this guy away, we shouldn't
* be on any lists. */
BUG_ON(!hlist_unhashed(&res->hash_node));
@@ -579,11 +655,6 @@ static void dlm_lockres_release(struct kref *kref)
kfree(res);
}
-void dlm_lockres_get(struct dlm_lock_resource *res)
-{
- kref_get(&res->refs);
-}
-
void dlm_lockres_put(struct dlm_lock_resource *res)
{
kref_put(&res->refs, dlm_lockres_release);
@@ -603,7 +674,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm,
memcpy(qname, name, namelen);
res->lockname.len = namelen;
- res->lockname.hash = full_name_hash(name, namelen);
+ res->lockname.hash = dlm_lockid_hash(name, namelen);
init_waitqueue_head(&res->wq);
spin_lock_init(&res->spinlock);
@@ -637,11 +708,11 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
{
struct dlm_lock_resource *res;
- res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
+ res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS);
if (!res)
return NULL;
- res->lockname.name = kmalloc(namelen, GFP_KERNEL);
+ res->lockname.name = kmalloc(namelen, GFP_NOFS);
if (!res->lockname.name) {
kfree(res);
return NULL;
@@ -677,19 +748,20 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
int blocked = 0;
int ret, nodenum;
struct dlm_node_iter iter;
- unsigned int namelen;
+ unsigned int namelen, hash;
int tries = 0;
int bit, wait_on_recovery = 0;
BUG_ON(!lockid);
namelen = strlen(lockid);
+ hash = dlm_lockid_hash(lockid, namelen);
mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
lookup:
spin_lock(&dlm->spinlock);
- tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
+ tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
if (tmpres) {
spin_unlock(&dlm->spinlock);
mlog(0, "found in hash!\n");
@@ -704,7 +776,7 @@ lookup:
mlog(0, "allocating a new resource\n");
/* nothing found and we need to allocate one. */
alloc_mle = (struct dlm_master_list_entry *)
- kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
+ kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
if (!alloc_mle)
goto leave;
res = dlm_new_lockres(dlm, lockid, namelen);
@@ -790,10 +862,11 @@ lookup:
* if so, the creator of the BLOCK may try to put the last
* ref at this time in the assert master handler, so we
* need an extra one to keep from a bad ptr deref. */
- dlm_get_mle(mle);
+ dlm_get_mle_inuse(mle);
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);
+redo_request:
while (wait_on_recovery) {
/* any cluster changes that occurred after dropping the
* dlm spinlock would be detectable be a change on the mle,
@@ -812,7 +885,7 @@ lookup:
}
dlm_kick_recovery_thread(dlm);
- msleep(100);
+ msleep(1000);
dlm_wait_for_recovery(dlm);
spin_lock(&dlm->spinlock);
@@ -825,13 +898,15 @@ lookup:
} else
wait_on_recovery = 0;
spin_unlock(&dlm->spinlock);
+
+ if (wait_on_recovery)
+ dlm_wait_for_node_re