diff options
author | Linus Torvalds <torvalds@g5.osdl.org> | 2006-06-26 16:06:08 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@g5.osdl.org> | 2006-06-26 16:06:08 -0700 |
commit | eb99adde31b7d85c67a5e1c2fa5e098e1056dd79 (patch) | |
tree | 05d61251361c04f3afa64c5b1fe162558724d345 | |
parent | f6e6e883730aff2718610d3eba7608fcf73328ed (diff) | |
parent | 3fb5a9891dbb553dda96783dbc0dc4e77cbb2529 (diff) |
Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2
* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2: (56 commits)
[PATCH] fs/ocfs2/dlm/: cleanups
ocfs2: fix compiler warnings in dlm_convert_lock_handler()
ocfs2: dlm_print_one_mle() needs to be defined
ocfs2: remove whitespace in dlmunlock.c
ocfs2: move dlm work to a private work queue
ocfs2: fix incorrect error returns
ocfs2: tune down some noisy messages during dlm recovery
ocfs2: display message before waiting for recovery to complete
ocfs2: mlog in dlm_convert_lock_handler() should be ML_ERROR
ocfs2: retry operations when a lock is marked in recovery
ocfs2: use cond_resched() in dlm_thread()
ocfs2: use GFP_NOFS in some dlm operations
ocfs2: wait for recovery when starting lock mastery
ocfs2: continue recovery when a dead node is encountered
ocfs2: remove unneccesary spin_unlock() in dlm_remaster_locks()
ocfs2: dlm_remaster_locks() should never exit without completing
ocfs2: special case recovery lock in dlmlock_remote()
ocfs2: pending mastery asserts and migrations should block each other
ocfs2: temporarily disable automatic lock migration
ocfs2: do not unconditionally purge the lockres in dlmlock_remote()
...
-rw-r--r-- | fs/ocfs2/dlm/dlmast.c | 12 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmcommon.h | 63 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmconvert.c | 24 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmdebug.c | 6 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmdebug.h | 30 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmdomain.c | 101 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmfs.c | 6 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmlock.c | 68 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmmaster.c | 448 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmrecovery.c | 580 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmthread.c | 68 | ||||
-rw-r--r-- | fs/ocfs2/dlm/dlmunlock.c | 10 | ||||
-rw-r--r-- | fs/ocfs2/dlm/userdlm.c | 2 |
13 files changed, 1045 insertions, 373 deletions
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c index 87ee29cad50..42775e2bbe2 100644 --- a/fs/ocfs2/dlm/dlmast.c +++ b/fs/ocfs2/dlm/dlmast.c @@ -197,12 +197,14 @@ static void dlm_update_lvb(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, lock->ml.node == dlm->node_num ? "master" : "remote"); memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN); - } else if (lksb->flags & DLM_LKSB_PUT_LVB) { - mlog(0, "setting lvb from lockres for %s node\n", - lock->ml.node == dlm->node_num ? "master" : - "remote"); - memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN); } + /* Do nothing for lvb put requests - they should be done in + * place when the lock is downconverted - otherwise we risk + * racing gets and puts which could result in old lvb data + * being propagated. We leave the put flag set and clear it + * here. In the future we might want to clear it at the time + * the put is actually done. + */ spin_unlock(&res->spinlock); } diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 88cc43df18f..9bdc9cf6599 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -37,7 +37,17 @@ #define DLM_THREAD_SHUFFLE_INTERVAL 5 // flush everything every 5 passes #define DLM_THREAD_MS 200 // flush at least every 200 ms -#define DLM_HASH_BUCKETS (PAGE_SIZE / sizeof(struct hlist_head)) +#define DLM_HASH_SIZE_DEFAULT (1 << 14) +#if DLM_HASH_SIZE_DEFAULT < PAGE_SIZE +# define DLM_HASH_PAGES 1 +#else +# define DLM_HASH_PAGES (DLM_HASH_SIZE_DEFAULT / PAGE_SIZE) +#endif +#define DLM_BUCKETS_PER_PAGE (PAGE_SIZE / sizeof(struct hlist_head)) +#define DLM_HASH_BUCKETS (DLM_HASH_PAGES * DLM_BUCKETS_PER_PAGE) + +/* Intended to make it easier for us to switch out hash functions */ +#define dlm_lockid_hash(_n, _l) full_name_hash(_n, _l) enum dlm_ast_type { DLM_AST = 0, @@ -61,7 +71,8 @@ static inline int dlm_is_recovery_lock(const char *lock_name, int name_len) return 0; } -#define DLM_RECO_STATE_ACTIVE 0x0001 +#define DLM_RECO_STATE_ACTIVE 0x0001 +#define DLM_RECO_STATE_FINALIZE 0x0002 struct dlm_recovery_ctxt { @@ -85,7 +96,7 @@ enum dlm_ctxt_state { struct dlm_ctxt { struct list_head list; - struct hlist_head *lockres_hash; + struct hlist_head **lockres_hash; struct list_head dirty_list; struct list_head purge_list; struct list_head pending_asts; @@ -120,6 +131,7 @@ struct dlm_ctxt struct o2hb_callback_func dlm_hb_down; struct task_struct *dlm_thread_task; struct task_struct *dlm_reco_thread_task; + struct workqueue_struct *dlm_worker; wait_queue_head_t dlm_thread_wq; wait_queue_head_t dlm_reco_thread_wq; wait_queue_head_t ast_wq; @@ -132,6 +144,11 @@ struct dlm_ctxt struct list_head dlm_eviction_callbacks; }; +static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned i) +{ + return dlm->lockres_hash[(i / DLM_BUCKETS_PER_PAGE) % DLM_HASH_PAGES] + (i % DLM_BUCKETS_PER_PAGE); +} + /* these keventd work queue items are for less-frequently * called functions that cannot be directly called from the * net message handlers for some reason, usually because @@ -216,20 +233,29 @@ struct dlm_lock_resource /* WARNING: Please see the comment in dlm_init_lockres before * adding fields here. */ struct hlist_node hash_node; + struct qstr lockname; struct kref refs; - /* please keep these next 3 in this order - * some funcs want to iterate over all lists */ + /* + * Please keep granted, converting, and blocked in this order, + * as some funcs want to iterate over all lists. + * + * All four lists are protected by the hash's reference. + */ struct list_head granted; struct list_head converting; struct list_head blocked; + struct list_head purge; + /* + * These two lists require you to hold an additional reference + * while they are on the list. + */ struct list_head dirty; struct list_head recovering; // dlm_recovery_ctxt.resources list /* unused lock resources have their last_used stamped and are * put on a list for the dlm thread to run. */ - struct list_head purge; unsigned long last_used; unsigned migration_pending:1; @@ -238,7 +264,6 @@ struct dlm_lock_resource wait_queue_head_t wq; u8 owner; //node which owns the lock resource, or unknown u16 state; - struct qstr lockname; char lvb[DLM_LVB_LEN]; }; @@ -300,6 +325,15 @@ enum dlm_lockres_list { DLM_BLOCKED_LIST }; +static inline int dlm_lvb_is_empty(char *lvb) +{ + int i; + for (i=0; i<DLM_LVB_LEN; i++) + if (lvb[i]) + return 0; + return 1; +} + static inline struct list_head * dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx) { @@ -609,7 +643,8 @@ struct dlm_finalize_reco { u8 node_idx; u8 dead_node; - __be16 pad1; + u8 flags; + u8 pad1; __be32 pad2; }; @@ -676,6 +711,7 @@ void dlm_wait_for_recovery(struct dlm_ctxt *dlm); void dlm_kick_recovery_thread(struct dlm_ctxt *dlm); int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node); int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); +int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout); void dlm_put(struct dlm_ctxt *dlm); struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm); @@ -687,14 +723,20 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres); -void dlm_lockres_get(struct dlm_lock_resource *res); +static inline void dlm_lockres_get(struct dlm_lock_resource *res) +{ + /* This is called on every lookup, so it might be worth + * inlining. */ + kref_get(&res->refs); +} void dlm_lockres_put(struct dlm_lock_resource *res); void __dlm_unhash_lockres(struct dlm_lock_resource *res); void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, const char *name, - unsigned int len); + unsigned int len, + unsigned int hash); struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int len); @@ -819,6 +861,7 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node); int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); +int __dlm_lockres_unused(struct dlm_lock_resource *res); static inline const char * dlm_lock_mode_name(int mode) { diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index 70888b31e75..c764dc8e40a 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -214,6 +214,9 @@ grant: if (lock->ml.node == dlm->node_num) mlog(0, "doing in-place convert for nonlocal lock\n"); lock->ml.type = type; + if (lock->lksb->flags & DLM_LKSB_PUT_LVB) + memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN); + status = DLM_NORMAL; *call_ast = 1; goto unlock_exit; @@ -461,6 +464,12 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) } spin_lock(&res->spinlock); + status = __dlm_lockres_state_to_status(res); + if (status != DLM_NORMAL) { + spin_unlock(&res->spinlock); + dlm_error(status); + goto leave; + } list_for_each(iter, &res->granted) { lock = list_entry(iter, struct dlm_lock, list); if (lock->ml.cookie == cnv->cookie && @@ -470,6 +479,21 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data) } lock = NULL; } + if (!lock) { + __dlm_print_one_lock_resource(res); + list_for_each(iter, &res->granted) { + lock = list_entry(iter, struct dlm_lock, list); + if (lock->ml.node == cnv->node_idx) { + mlog(ML_ERROR, "There is something here " + "for node %u, lock->ml.cookie=%llu, " + "cnv->cookie=%llu\n", cnv->node_idx, + (unsigned long long)lock->ml.cookie, + (unsigned long long)cnv->cookie); + break; + } + } + lock = NULL; + } spin_unlock(&res->spinlock); if (!lock) { status = DLM_IVLOCKID; diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index c7eae5d3324..3f6c8d88f7a 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -37,10 +37,8 @@ #include "dlmapi.h" #include "dlmcommon.h" -#include "dlmdebug.h" #include "dlmdomain.h" -#include "dlmdebug.h" #define MLOG_MASK_PREFIX ML_DLM #include "cluster/masklog.h" @@ -120,6 +118,7 @@ void dlm_print_one_lock(struct dlm_lock *lockid) } EXPORT_SYMBOL_GPL(dlm_print_one_lock); +#if 0 void dlm_dump_lock_resources(struct dlm_ctxt *dlm) { struct dlm_lock_resource *res; @@ -136,12 +135,13 @@ void dlm_dump_lock_resources(struct dlm_ctxt *dlm) spin_lock(&dlm->spinlock); for (i=0; i<DLM_HASH_BUCKETS; i++) { - bucket = &(dlm->lockres_hash[i]); + bucket = dlm_lockres_hash(dlm, i); hlist_for_each_entry(res, iter, bucket, hash_node) dlm_print_one_lock_resource(res); } spin_unlock(&dlm->spinlock); } +#endif /* 0 */ static const char *dlm_errnames[] = { [DLM_NORMAL] = "DLM_NORMAL", diff --git a/fs/ocfs2/dlm/dlmdebug.h b/fs/ocfs2/dlm/dlmdebug.h deleted file mode 100644 index 6858510c3cc..00000000000 --- a/fs/ocfs2/dlm/dlmdebug.h +++ /dev/null @@ -1,30 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; -*- - * vim: noexpandtab sw=8 ts=8 sts=0: - * - * dlmdebug.h - * - * Copyright (C) 2004 Oracle. All rights reserved. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 021110-1307, USA. - * - */ - -#ifndef DLMDEBUG_H -#define DLMDEBUG_H - -void dlm_dump_lock_resources(struct dlm_ctxt *dlm); - -#endif diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 8f3a9e3106f..ba27c5c5e95 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -41,7 +41,6 @@ #include "dlmapi.h" #include "dlmcommon.h" -#include "dlmdebug.h" #include "dlmdomain.h" #include "dlmver.h" @@ -49,6 +48,33 @@ #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) #include "cluster/masklog.h" +static void dlm_free_pagevec(void **vec, int pages) +{ + while (pages--) + free_page((unsigned long)vec[pages]); + kfree(vec); +} + +static void **dlm_alloc_pagevec(int pages) +{ + void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL); + int i; + + if (!vec) + return NULL; + + for (i = 0; i < pages; i++) + if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL))) + goto out_free; + + mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n", + pages, DLM_HASH_PAGES, (unsigned long)DLM_BUCKETS_PER_PAGE); + return vec; +out_free: + dlm_free_pagevec(vec, i); + return NULL; +} + /* * * spinlock lock ordering: if multiple locks are needed, obey this ordering: @@ -90,8 +116,7 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, assert_spin_locked(&dlm->spinlock); q = &res->lockname; - q->hash = full_name_hash(q->name, q->len); - bucket = &(dlm->lockres_hash[q->hash % DLM_HASH_BUCKETS]); + bucket = dlm_lockres_hash(dlm, q->hash); /* get a reference for our hashtable */ dlm_lockres_get(res); @@ -100,34 +125,32 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, } struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, - const char *name, - unsigned int len) + const char *name, + unsigned int len, + unsigned int hash) { - unsigned int hash; - struct hlist_node *iter; - struct dlm_lock_resource *tmpres=NULL; struct hlist_head *bucket; + struct hlist_node *list; mlog_entry("%.*s\n", len, name); assert_spin_locked(&dlm->spinlock); - hash = full_name_hash(name, len); - - bucket = &(dlm->lockres_hash[hash % DLM_HASH_BUCKETS]); - - /* check for pre-existing lock */ - hlist_for_each(iter, bucket) { - tmpres = hlist_entry(iter, struct dlm_lock_resource, hash_node); - if (tmpres->lockname.len == len && - memcmp(tmpres->lockname.name, name, len) == 0) { - dlm_lockres_get(tmpres); - break; - } + bucket = dlm_lockres_hash(dlm, hash); - tmpres = NULL; + hlist_for_each(list, bucket) { + struct dlm_lock_resource *res = hlist_entry(list, + struct dlm_lock_resource, hash_node); + if (res->lockname.name[0] != name[0]) + continue; + if (unlikely(res->lockname.len != len)) + continue; + if (memcmp(res->lockname.name + 1, name + 1, len - 1)) + continue; + dlm_lockres_get(res); + return res; } - return tmpres; + return NULL; } struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, @@ -135,9 +158,10 @@ struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, unsigned int len) { struct dlm_lock_resource *res; + unsigned int hash = dlm_lockid_hash(name, len); spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, len); + res = __dlm_lookup_lockres(dlm, name, len, hash); spin_unlock(&dlm->spinlock); return res; } @@ -194,7 +218,7 @@ static int dlm_wait_on_domain_helper(const char *domain) static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) { if (dlm->lockres_hash) - free_page((unsigned long) dlm->lockres_hash); + dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES); if (dlm->name) kfree(dlm->name); @@ -278,11 +302,21 @@ int dlm_domain_fully_joined(struct dlm_ctxt *dlm) return ret; } +static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm) +{ + if (dlm->dlm_worker) { + flush_workqueue(dlm->dlm_worker); + destroy_workqueue(dlm->dlm_worker); + dlm->dlm_worker = NULL; + } +} + static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm) { dlm_unregister_domain_handlers(dlm); dlm_complete_thread(dlm); dlm_complete_recovery_thread(dlm); + dlm_destroy_dlm_worker(dlm); /* We've left the domain. Now we can take ourselves out of the * list and allow the kref stuff to help us free the @@ -304,8 +338,8 @@ static void dlm_migrate_all_locks(struct dlm_ctxt *dlm) restart: spin_lock(&dlm->spinlock); for (i = 0; i < DLM_HASH_BUCKETS; i++) { - while (!hlist_empty(&dlm->lockres_hash[i])) { - res = hlist_entry(dlm->lockres_hash[i].first, + while (!hlist_empty(dlm_lockres_hash(dlm, i))) { + res = hlist_entry(dlm_lockres_hash(dlm, i)->first, struct dlm_lock_resource, hash_node); /* need reference when manually grabbing lockres */ dlm_lockres_get(res); @@ -1126,6 +1160,13 @@ static int dlm_join_domain(struct dlm_ctxt *dlm) goto bail; } + dlm->dlm_worker = create_singlethread_workqueue("dlm_wq"); + if (!dlm->dlm_worker) { + status = -ENOMEM; + mlog_errno(status); + goto bail; + } + do { unsigned int backoff; status = dlm_try_to_join_domain(dlm); @@ -1166,6 +1207,7 @@ bail: dlm_unregister_domain_handlers(dlm); dlm_complete_thread(dlm); dlm_complete_recovery_thread(dlm); + dlm_destroy_dlm_worker(dlm); } return status; @@ -1191,7 +1233,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, goto leave; } - dlm->lockres_hash = (struct hlist_head *) __get_free_page(GFP_KERNEL); + dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES); if (!dlm->lockres_hash) { mlog_errno(-ENOMEM); kfree(dlm->name); @@ -1200,8 +1242,8 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, goto leave; } - for (i=0; i<DLM_HASH_BUCKETS; i++) - INIT_HLIST_HEAD(&dlm->lockres_hash[i]); + for (i = 0; i < DLM_HASH_BUCKETS; i++) + INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i)); strcpy(dlm->name, domain); dlm->key = key; @@ -1231,6 +1273,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, dlm->dlm_thread_task = NULL; dlm->dlm_reco_thread_task = NULL; + dlm->dlm_worker = NULL; init_waitqueue_head(&dlm->dlm_thread_wq); init_waitqueue_head(&dlm->dlm_reco_thread_wq); init_waitqueue_head(&dlm->reco.event); diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlm/dlmfs.c index 7273d9fa6ba..033ad170123 100644 --- a/fs/ocfs2/dlm/dlmfs.c +++ b/fs/ocfs2/dlm/dlmfs.c @@ -116,7 +116,7 @@ static int dlmfs_file_open(struct inode *inode, * doesn't make sense for LVB writes. */ file->f_flags &= ~O_APPEND; - fp = kmalloc(sizeof(*fp), GFP_KERNEL); + fp = kmalloc(sizeof(*fp), GFP_NOFS); if (!fp) { status = -ENOMEM; goto bail; @@ -196,7 +196,7 @@ static ssize_t dlmfs_file_read(struct file *filp, else readlen = count - *ppos; - lvb_buf = kmalloc(readlen, GFP_KERNEL); + lvb_buf = kmalloc(readlen, GFP_NOFS); if (!lvb_buf) return -ENOMEM; @@ -240,7 +240,7 @@ static ssize_t dlmfs_file_write(struct file *filp, else writelen = count - *ppos; - lvb_buf = kmalloc(writelen, GFP_KERNEL); + lvb_buf = kmalloc(writelen, GFP_NOFS); if (!lvb_buf) return -ENOMEM; diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 55cda25ae11..d6f89577e25 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -201,6 +201,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, struct dlm_lock *lock, int flags) { enum dlm_status status = DLM_DENIED; + int lockres_changed = 1; mlog_entry("type=%d\n", lock->ml.type); mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len, @@ -226,8 +227,25 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, res->state &= ~DLM_LOCK_RES_IN_PROGRESS; lock->lock_pending = 0; if (status != DLM_NORMAL) { - if (status != DLM_NOTQUEUED) + if (status == DLM_RECOVERING && + dlm_is_recovery_lock(res->lockname.name, + res->lockname.len)) { + /* recovery lock was mastered by dead node. + * we need to have calc_usage shoot down this + * lockres and completely remaster it. */ + mlog(0, "%s: recovery lock was owned by " + "dead node %u, remaster it now.\n", + dlm->name, res->owner); + } else if (status != DLM_NOTQUEUED) { + /* + * DO NOT call calc_usage, as this would unhash + * the remote lockres before we ever get to use + * it. treat as if we never made any change to + * the lockres. + */ + lockres_changed = 0; dlm_error(status); + } dlm_revert_pending_lock(res, lock); dlm_lock_put(lock); } else if (dlm_is_recovery_lock(res->lockname.name, @@ -243,7 +261,8 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, } spin_unlock(&res->spinlock); - dlm_lockres_calc_usage(dlm, res); + if (lockres_changed) + dlm_lockres_calc_usage(dlm, res); wake_up(&res->wq); return status; @@ -280,6 +299,14 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, if (tmpret >= 0) { // successfully sent and received ret = status; // this is already a dlm_status + if (ret == DLM_REJECTED) { + mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres " + "no longer owned by %u. that node is coming back " + "up currently.\n", dlm->name, create.namelen, + create.name, res->owner); + dlm_print_one_lock_resource(res); + BUG(); + } } else { mlog_errno(tmpret); if (dlm_is_host_down(tmpret)) { @@ -381,13 +408,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, struct dlm_lock *lock; int kernel_allocated = 0; - lock = kcalloc(1, sizeof(*lock), GFP_KERNEL); + lock = kcalloc(1, sizeof(*lock), GFP_NOFS); if (!lock) return NULL; if (!lksb) { /* zero memory only if kernel-allocated */ - lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL); + lksb = kcalloc(1, sizeof(*lksb), GFP_NOFS); if (!lksb) { kfree(lock); return NULL; @@ -428,11 +455,16 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data) if (!dlm_grab(dlm)) return DLM_REJECTED; - mlog_bug_on_msg(!dlm_domain_fully_joined(dlm), - "Domain %s not fully joined!\n", dlm->name); - name = create->name; namelen = create->namelen; + status = DLM_REJECTED; + if (!dlm_domain_fully_joined(dlm)) { + mlog(ML_ERROR, "Domain %s not fully joined, but node %u is " + "sending a create_lock message for lock %.*s!\n", + dlm->name, create->node_idx, namelen, name); + dlm_error(status); + goto leave; + } status = DLM_IVBUFLEN; if (namelen > DLM_LOCKID_NAME_MAX) { @@ -668,18 +700,22 @@ retry_lock: msleep(100); /* no waiting for dlm_reco_thread */ if (recovery) { - if (status == DLM_RECOVERING) { - mlog(0, "%s: got RECOVERING " - "for $REOCVERY lock, master " - "was %u\n", dlm->name, - res->owner); - dlm_wait_for_node_death(dlm, res->owner, - DLM_NODE_DEATH_WAIT_MAX); - } + if (status != DLM_RECOVERING) + goto retry_lock; + + mlog(0, "%s: got RECOVERING " + "for $RECOVERY lock, master " + "was %u\n", dlm->name, + res->owner); + /* wait to see the node go down, then + * drop down and allow the lockres to + * get cleaned up. need to remaster. */ + dlm_wait_for_node_death(dlm, res->owner, + DLM_NODE_DEATH_WAIT_MAX); } else { dlm_wait_for_recovery(dlm); + goto retry_lock; } - goto retry_lock; } if (status != DLM_NORMAL) { diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 940be4c13b1..1b8346dd057 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -47,7 +47,6 @@ #include "dlmapi.h" #include "dlmcommon.h" -#include "dlmdebug.h" #include "dlmdomain.h" #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER) @@ -74,6 +73,7 @@ struct dlm_master_list_entry wait_queue_head_t wq; atomic_t woken; struct kref mle_refs; + int inuse; unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; @@ -127,18 +127,30 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm, return 1; } -#if 0 -/* Code here is included but defined out as it aids debugging */ +#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m) +static void _dlm_print_nodemap(unsigned long *map, const char *mapname) +{ + int i; + printk("%s=[ ", mapname); + for (i=0; i<O2NM_MAX_NODES; i++) + if (test_bit(i, map)) + printk("%d ", i); + printk("]"); +} -void dlm_print_one_mle(struct dlm_master_list_entry *mle) +static void dlm_print_one_mle(struct dlm_master_list_entry *mle) { - int i = 0, refs; + int refs; char *type; char attached; u8 master; unsigned int namelen; const char *name; struct kref *k; + unsigned long *maybe = mle->maybe_map, + *vote = mle->vote_map, + *resp = mle->response_map, + *node = mle->node_map; k = &mle->mle_refs; if (mle->type == DLM_MLE_BLOCK) @@ -159,18 +171,29 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle) name = mle->u.res->lockname.name; } - mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n", - i, type, refs, master, mle->new_master, attached, - namelen, namelen, name); + mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ", + namelen, name, type, refs, master, mle->new_master, attached, + mle->inuse); + dlm_print_nodemap(maybe); + printk(", "); + dlm_print_nodemap(vote); + printk(", "); + dlm_print_nodemap(resp); + printk(", "); + dlm_print_nodemap(node); + printk(", "); + printk("\n"); } +#if 0 +/* Code here is included but defined out as it aids debugging */ + static void dlm_dump_mles(struct dlm_ctxt *dlm) { struct dlm_master_list_entry *mle; struct list_head *iter; mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name); - mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n"); spin_lock(&dlm->master_lock); list_for_each(iter, &dlm->master_list) { mle = list_entry(iter, struct dlm_master_list_entry, list); @@ -314,6 +337,31 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm, spin_unlock(&dlm->spinlock); } +static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle) +{ + struct dlm_ctxt *dlm; + dlm = mle->dlm; + + assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&dlm->master_lock); + mle->inuse++; + kref_get(&mle->mle_refs); +} + +static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle) +{ + struct dlm_ctxt *dlm; + dlm = mle->dlm; + + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + mle->inuse--; + __dlm_put_mle(mle); + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); + +} + /* remove from list and free */ static void __dlm_put_mle(struct dlm_master_list_entry *mle) { @@ -322,9 +370,14 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle) assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); - BUG_ON(!atomic_read(&mle->mle_refs.refcount)); - - kref_put(&mle->mle_refs, dlm_mle_release); + if (!atomic_read(&mle->mle_refs.refcount)) { + /* this may or may not crash, but who cares. + * it's a BUG. */ + mlog(ML_ERROR, "bad mle: %p\n", mle); + dlm_print_one_mle(mle); + BUG(); + } else + kref_put(&mle->mle_refs, dlm_mle_release); } @@ -367,6 +420,7 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, memset(mle->response_map, 0, sizeof(mle->response_map)); mle->master = O2NM_MAX_NODES; mle->new_master = O2NM_MAX_NODES; + mle->inuse = 0; if (mle->type == DLM_MLE_MASTER) { BUG_ON(!res); @@ -564,6 +618,28 @@ static void dlm_lockres_release(struct kref *kref) mlog(0, "destroying lockres %.*s\n", res->lockname.len, res->lockname.name); + if (!hlist_unhashed(&res->hash_node) || + !list_empty(&res->granted) || + !list_empty(&res->converting) || + !list_empty(&res->blocked) || + !list_empty(&res->dirty) || + !list_empty(&res->recovering) || + !list_empty(&res->purge)) { + mlog(ML_ERROR, + "Going to BUG for resource %.*s." + " We're on a list! [%c%c%c%c%c%c%c]\n", + res->lockname.len, res->lockname.name, + !hlist_unhashed(&res->hash_node) ? 'H' : ' ', + !list_empty(&res->granted) ? 'G' : ' ', + !list_empty(&res->converting) ? 'C' : ' ', + !list_empty(&res->blocked) ? 'B' : ' ', + !list_empty(&res->dirty) ? 'D' : ' ', + !list_empty(&res->recovering) ? 'R' : ' ', + !list_empty(&res->purge) ? 'P' : ' '); + + dlm_print_one_lock_resource(res); + } + /* By the time we're ready to blow this guy away, we shouldn't * be on any lists. */ BUG_ON(!hlist_unhashed(&res->hash_node)); @@ -579,11 +655,6 @@ static void dlm_lockres_release(struct kref *kref) kfree(res); } -void dlm_lockres_get(struct dlm_lock_resource *res) -{ - kref_get(&res->refs); -} - void dlm_lockres_put(struct dlm_lock_resource *res) { kref_put(&res->refs, dlm_lockres_release); @@ -603,7 +674,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, memcpy(qname, name, namelen); res->lockname.len = namelen; - res->lockname.hash = full_name_hash(name, namelen); + res->lockname.hash = dlm_lockid_hash(name, namelen); init_waitqueue_head(&res->wq); spin_lock_init(&res->spinlock); @@ -637,11 +708,11 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, { struct dlm_lock_resource *res; - res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); + res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS); if (!res) return NULL; - res->lockname.name = kmalloc(namelen, GFP_KERNEL); + res->lockname.name = kmalloc(namelen, GFP_NOFS); if (!res->lockname.name) { kfree(res); return NULL; @@ -677,19 +748,20 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, int blocked = 0; int ret, nodenum; struct dlm_node_iter iter; - unsigned int namelen; + unsigned int namelen, hash; int tries = 0; int bit, wait_on_recovery = 0; BUG_ON(!lockid); namelen = strlen(lockid); + hash = dlm_lockid_hash(lockid, namelen); mlog(0, "get lockres %s (len %d)\n", lockid, namelen); lookup: spin_lock(&dlm->spinlock); - tmpres = __dlm_lookup_lockres(dlm, lockid, namelen); + tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); if (tmpres) { spin_unlock(&dlm->spinlock); mlog(0, "found in hash!\n"); @@ -704,7 +776,7 @@ lookup: mlog(0, "allocating a new resource\n"); /* nothing found and we need to allocate one. */ alloc_mle = (struct dlm_master_list_entry *) - kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); + kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); if (!alloc_mle) goto leave; res = dlm_new_lockres(dlm, lockid, namelen); @@ -790,10 +862,11 @@ lookup: * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we * need an extra one to keep from a bad ptr deref. */ - dlm_get_mle(mle); + dlm_get_mle_inuse(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); +redo_request: while (wait_on_recovery) { /* any cluster changes that occurred after dropping the * dlm spinlock would be detectable be a change on the mle, @@ -812,7 +885,7 @@ lookup: } dlm_kick_recovery_thread(dlm); - msleep(100); + msleep(1000); dlm_wait_for_recovery(dlm); spin_lock(&dlm->spinlock); @@ -825,13 +898,15 @@ lookup: } else wait_on_recovery = 0; spin_unlock(&dlm->spinlock); + + if (wait_on_recovery) + dlm_wait_for_node_re |