diff options
Diffstat (limited to 'fs/ocfs2/dlm')
| -rw-r--r-- | fs/ocfs2/dlm/Makefile | 4 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmast.c | 86 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmcommon.h | 189 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmconvert.c | 24 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmdebug.c | 249 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmdebug.h | 5 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmdomain.c | 623 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmlock.c | 79 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmmaster.c | 567 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmrecovery.c | 272 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmthread.c | 260 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmunlock.c | 26 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmver.c | 42 | ||||
| -rw-r--r-- | fs/ocfs2/dlm/dlmver.h | 31 |
14 files changed, 1433 insertions, 1024 deletions
diff --git a/fs/ocfs2/dlm/Makefile b/fs/ocfs2/dlm/Makefile index dcebf0d920f..bd1aab1f49a 100644 --- a/fs/ocfs2/dlm/Makefile +++ b/fs/ocfs2/dlm/Makefile @@ -1,7 +1,7 @@ -EXTRA_CFLAGS += -Ifs/ocfs2 +ccflags-y := -Ifs/ocfs2 obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_dlm.o ocfs2_dlm-objs := dlmdomain.o dlmdebug.o dlmthread.o dlmrecovery.o \ - dlmmaster.o dlmast.o dlmconvert.o dlmlock.o dlmunlock.o dlmver.o + dlmmaster.o dlmast.o dlmconvert.o dlmlock.o dlmunlock.o diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c index f4499915683..b46278f9ae4 100644 --- a/fs/ocfs2/dlm/dlmast.c +++ b/fs/ocfs2/dlm/dlmast.c @@ -90,19 +90,29 @@ static int dlm_should_cancel_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock) void __dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock) { - mlog_entry_void(); + struct dlm_lock_resource *res; BUG_ON(!dlm); BUG_ON(!lock); + res = lock->lockres; + assert_spin_locked(&dlm->ast_lock); + if (!list_empty(&lock->ast_list)) { - mlog(ML_ERROR, "ast list not empty!! pending=%d, newlevel=%d\n", + mlog(ML_ERROR, "%s: res %.*s, lock %u:%llu, " + "AST list not empty, pending %d, newlevel %d\n", + dlm->name, res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), lock->ast_pending, lock->ml.type); BUG(); } if (lock->ast_pending) - mlog(0, "lock has an ast getting flushed right now\n"); + mlog(0, "%s: res %.*s, lock %u:%llu, AST getting flushed\n", + dlm->name, res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie))); /* putting lock on list, add a ref */ dlm_lock_get(lock); @@ -110,9 +120,10 @@ void __dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock) /* check to see if this ast obsoletes the bast */ if (dlm_should_cancel_bast(dlm, lock)) { - struct dlm_lock_resource *res = lock->lockres; - mlog(0, "%s: cancelling bast for %.*s\n", - dlm->name, res->lockname.len, res->lockname.name); + mlog(0, "%s: res %.*s, lock %u:%llu, Cancelling BAST\n", + dlm->name, res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie))); lock->bast_pending = 0; list_del_init(&lock->bast_list); lock->ml.highest_blocked = LKM_IVMODE; @@ -134,8 +145,6 @@ void __dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock) void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock) { - mlog_entry_void(); - BUG_ON(!dlm); BUG_ON(!lock); @@ -147,15 +156,21 @@ void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock) void __dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock) { - mlog_entry_void(); + struct dlm_lock_resource *res; BUG_ON(!dlm); BUG_ON(!lock); + assert_spin_locked(&dlm->ast_lock); + res = lock->lockres; + BUG_ON(!list_empty(&lock->bast_list)); if (lock->bast_pending) - mlog(0, "lock has a bast getting flushed right now\n"); + mlog(0, "%s: res %.*s, lock %u:%llu, BAST getting flushed\n", + dlm->name, res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie))); /* putting lock on list, add a ref */ dlm_lock_get(lock); @@ -167,8 +182,6 @@ void __dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock) void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock) { - mlog_entry_void(); - BUG_ON(!dlm); BUG_ON(!lock); @@ -213,7 +226,10 @@ void dlm_do_local_ast(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, dlm_astlockfunc_t *fn; struct dlm_lockstatus *lksb; - mlog_entry_void(); + mlog(0, "%s: res %.*s, lock %u:%llu, Local AST\n", dlm->name, + res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie))); lksb = lock->lksb; fn = lock->ast; @@ -231,7 +247,10 @@ int dlm_do_remote_ast(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_lockstatus *lksb; int lksbflags; - mlog_entry_void(); + mlog(0, "%s: res %.*s, lock %u:%llu, Remote AST\n", dlm->name, + res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie))); lksb = lock->lksb; BUG_ON(lock->ml.node == dlm->node_num); @@ -250,9 +269,14 @@ void dlm_do_local_bast(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, { dlm_bastlockfunc_t *fn = lock->bast; - mlog_entry_void(); BUG_ON(lock->ml.node != dlm->node_num); + mlog(0, "%s: res %.*s, lock %u:%llu, Local BAST, blocked %d\n", + dlm->name, res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), + blocked_type); + (*fn)(lock->astdata, blocked_type); } @@ -268,8 +292,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data, struct dlm_lock *lock = NULL; struct dlm_proxy_ast *past = (struct dlm_proxy_ast *) msg->buf; char *name; - struct list_head *iter, *head=NULL; - u64 cookie; + struct list_head *head = NULL; + __be64 cookie; u32 flags; u8 node; @@ -332,7 +356,8 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data, /* cannot get a proxy ast message if this node owns it */ BUG_ON(res->owner == dlm->node_num); - mlog(0, "lockres %.*s\n", res->lockname.len, res->lockname.name); + mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len, + res->lockname.name); spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { @@ -348,8 +373,7 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data, /* try convert queue for both ast/bast */ head = &res->converting; lock = NULL; - list_for_each(iter, head) { - lock = list_entry (iter, struct dlm_lock, list); + list_for_each_entry(lock, head, list) { if (lock->ml.cookie == cookie) goto do_ast; } @@ -360,8 +384,7 @@ int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data, else head = &res->granted; - list_for_each(iter, head) { - lock = list_entry (iter, struct dlm_lock, list); + list_for_each_entry(lock, head, list) { if (lock->ml.cookie == cookie) goto do_ast; } @@ -382,8 +405,12 @@ do_ast: if (past->type == DLM_AST) { /* do not alter lock refcount. switching lists. */ list_move_tail(&lock->list, &res->granted); - mlog(0, "ast: Adding to granted list... type=%d, " - "convert_type=%d\n", lock->ml.type, lock->ml.convert_type); + mlog(0, "%s: res %.*s, lock %u:%llu, Granted type %d => %d\n", + dlm->name, res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(cookie)), + lock->ml.type, lock->ml.convert_type); + if (lock->ml.convert_type != LKM_IVMODE) { lock->ml.type = lock->ml.convert_type; lock->ml.convert_type = LKM_IVMODE; @@ -426,9 +453,9 @@ int dlm_send_proxy_ast_msg(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, size_t veclen = 1; int status; - mlog_entry("res %.*s, to=%u, type=%d, blocked_type=%d\n", - res->lockname.len, res->lockname.name, lock->ml.node, - msg_type, blocked_type); + mlog(0, "%s: res %.*s, to %u, type %d, blocked_type %d\n", dlm->name, + res->lockname.len, res->lockname.name, lock->ml.node, msg_type, + blocked_type); memset(&past, 0, sizeof(struct dlm_proxy_ast)); past.node_idx = dlm->node_num; @@ -441,7 +468,6 @@ int dlm_send_proxy_ast_msg(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, vec[0].iov_len = sizeof(struct dlm_proxy_ast); vec[0].iov_base = &past; if (flags & DLM_LKSB_GET_LVB) { - mlog(0, "returning requested LVB data\n"); be32_add_cpu(&past.flags, LKM_GET_LVB); vec[1].iov_len = DLM_LVB_LEN; vec[1].iov_base = lock->lksb->lvb; @@ -451,8 +477,8 @@ int dlm_send_proxy_ast_msg(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, ret = o2net_send_message_vec(DLM_PROXY_AST_MSG, dlm->key, vec, veclen, lock->ml.node, &status); if (ret < 0) - mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " - "node %u\n", ret, DLM_PROXY_AST_MSG, dlm->key, + mlog(ML_ERROR, "%s: res %.*s, error %d send AST to node %u\n", + dlm->name, res->lockname.len, res->lockname.name, ret, lock->ml.node); else { if (status == DLM_RECOVERING) { diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h index 4b6ae2c13b4..fae17c640df 100644 --- a/fs/ocfs2/dlm/dlmcommon.h +++ b/fs/ocfs2/dlm/dlmcommon.h @@ -50,10 +50,10 @@ #define dlm_lockid_hash(_n, _l) full_name_hash(_n, _l) enum dlm_mle_type { - DLM_MLE_BLOCK, - DLM_MLE_MASTER, - DLM_MLE_MIGRATION, - DLM_MLE_NUM_TYPES + DLM_MLE_BLOCK = 0, + DLM_MLE_MASTER = 1, + DLM_MLE_MIGRATION = 2, + DLM_MLE_NUM_TYPES = 3, }; struct dlm_master_list_entry { @@ -82,8 +82,8 @@ struct dlm_master_list_entry { enum dlm_ast_type { DLM_AST = 0, - DLM_BAST, - DLM_ASTUNLOCK + DLM_BAST = 1, + DLM_ASTUNLOCK = 2, }; @@ -108,7 +108,6 @@ static inline int dlm_is_recovery_lock(const char *lock_name, int name_len) struct dlm_recovery_ctxt { struct list_head resources; - struct list_head received; struct list_head node_data; u8 new_master; u8 dead_node; @@ -119,9 +118,9 @@ struct dlm_recovery_ctxt enum dlm_ctxt_state { DLM_CTXT_NEW = 0, - DLM_CTXT_JOINED, - DLM_CTXT_IN_SHUTDOWN, - DLM_CTXT_LEAVING, + DLM_CTXT_JOINED = 1, + DLM_CTXT_IN_SHUTDOWN = 2, + DLM_CTXT_LEAVING = 3, }; struct dlm_ctxt @@ -144,6 +143,7 @@ struct dlm_ctxt wait_queue_head_t dlm_join_events; unsigned long live_nodes_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long domain_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; + unsigned long exit_domain_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long recovery_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; struct dlm_recovery_ctxt reco; spinlock_t master_lock; @@ -331,6 +331,7 @@ struct dlm_lock_resource u16 state; char lvb[DLM_LVB_LEN]; unsigned int inflight_locks; + unsigned int inflight_assert_workers; unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; }; @@ -388,8 +389,8 @@ struct dlm_lock enum dlm_lockres_list { DLM_GRANTED_LIST = 0, - DLM_CONVERTING_LIST, - DLM_BLOCKED_LIST + DLM_CONVERTING_LIST = 1, + DLM_BLOCKED_LIST = 2, }; static inline int dlm_lvb_is_empty(char *lvb) @@ -401,6 +402,18 @@ static inline int dlm_lvb_is_empty(char *lvb) return 1; } +static inline char *dlm_list_in_text(enum dlm_lockres_list idx) +{ + if (idx == DLM_GRANTED_LIST) + return "granted"; + else if (idx == DLM_CONVERTING_LIST) + return "converting"; + else if (idx == DLM_BLOCKED_LIST) + return "blocked"; + else + return "unknown"; +} + static inline struct list_head * dlm_list_idx_to_ptr(struct dlm_lock_resource *res, enum dlm_lockres_list idx) { @@ -427,25 +440,28 @@ struct dlm_node_iter enum { - DLM_MASTER_REQUEST_MSG = 500, - DLM_UNUSED_MSG1, /* 501 */ - DLM_ASSERT_MASTER_MSG, /* 502 */ - DLM_CREATE_LOCK_MSG, /* 503 */ - DLM_CONVERT_LOCK_MSG, /* 504 */ - DLM_PROXY_AST_MSG, /* 505 */ - DLM_UNLOCK_LOCK_MSG, /* 506 */ - DLM_DEREF_LOCKRES_MSG, /* 507 */ - DLM_MIGRATE_REQUEST_MSG, /* 508 */ - DLM_MIG_LOCKRES_MSG, /* 509 */ - DLM_QUERY_JOIN_MSG, /* 510 */ - DLM_ASSERT_JOINED_MSG, /* 511 */ - DLM_CANCEL_JOIN_MSG, /* 512 */ - DLM_EXIT_DOMAIN_MSG, /* 513 */ - DLM_MASTER_REQUERY_MSG, /* 514 */ - DLM_LOCK_REQUEST_MSG, /* 515 */ - DLM_RECO_DATA_DONE_MSG, /* 516 */ - DLM_BEGIN_RECO_MSG, /* 517 */ - DLM_FINALIZE_RECO_MSG /* 518 */ + DLM_MASTER_REQUEST_MSG = 500, + DLM_UNUSED_MSG1 = 501, + DLM_ASSERT_MASTER_MSG = 502, + DLM_CREATE_LOCK_MSG = 503, + DLM_CONVERT_LOCK_MSG = 504, + DLM_PROXY_AST_MSG = 505, + DLM_UNLOCK_LOCK_MSG = 506, + DLM_DEREF_LOCKRES_MSG = 507, + DLM_MIGRATE_REQUEST_MSG = 508, + DLM_MIG_LOCKRES_MSG = 509, + DLM_QUERY_JOIN_MSG = 510, + DLM_ASSERT_JOINED_MSG = 511, + DLM_CANCEL_JOIN_MSG = 512, + DLM_EXIT_DOMAIN_MSG = 513, + DLM_MASTER_REQUERY_MSG = 514, + DLM_LOCK_REQUEST_MSG = 515, + DLM_RECO_DATA_DONE_MSG = 516, + DLM_BEGIN_RECO_MSG = 517, + DLM_FINALIZE_RECO_MSG = 518, + DLM_QUERY_REGION = 519, + DLM_QUERY_NODEINFO = 520, + DLM_BEGIN_EXIT_DOMAIN_MSG = 521, }; struct dlm_reco_node_data @@ -458,19 +474,19 @@ struct dlm_reco_node_data enum { DLM_RECO_NODE_DATA_DEAD = -1, DLM_RECO_NODE_DATA_INIT = 0, - DLM_RECO_NODE_DATA_REQUESTING, - DLM_RECO_NODE_DATA_REQUESTED, - DLM_RECO_NODE_DATA_RECEIVING, - DLM_RECO_NODE_DATA_DONE, - DLM_RECO_NODE_DATA_FINALIZE_SENT, + DLM_RECO_NODE_DATA_REQUESTING = 1, + DLM_RECO_NODE_DATA_REQUESTED = 2, + DLM_RECO_NODE_DATA_RECEIVING = 3, + DLM_RECO_NODE_DATA_DONE = 4, + DLM_RECO_NODE_DATA_FINALIZE_SENT = 5, }; enum { DLM_MASTER_RESP_NO = 0, - DLM_MASTER_RESP_YES, - DLM_MASTER_RESP_MAYBE, - DLM_MASTER_RESP_ERROR + DLM_MASTER_RESP_YES = 1, + DLM_MASTER_RESP_MAYBE = 2, + DLM_MASTER_RESP_ERROR = 3, }; @@ -647,9 +663,9 @@ struct dlm_proxy_ast #define DLM_MOD_KEY (0x666c6172) enum dlm_query_join_response_code { JOIN_DISALLOW = 0, - JOIN_OK, - JOIN_OK_NO_MAP, - JOIN_PROTOCOL_MISMATCH, + JOIN_OK = 1, + JOIN_OK_NO_MAP = 2, + JOIN_PROTOCOL_MISMATCH = 3, }; struct dlm_query_join_packet { @@ -663,7 +679,7 @@ struct dlm_query_join_packet { }; union dlm_query_join_response { - u32 intval; + __be32 intval; struct dlm_query_join_packet packet; }; @@ -727,6 +743,31 @@ struct dlm_cancel_join u8 domain[O2NM_MAX_NAME_LEN]; }; +struct dlm_query_region { + u8 qr_node; + u8 qr_numregions; + u8 qr_namelen; + u8 pad1; + u8 qr_domain[O2NM_MAX_NAME_LEN]; + u8 qr_regions[O2HB_MAX_REGION_NAME_LEN * O2NM_MAX_REGIONS]; +}; + +struct dlm_node_info { + u8 ni_nodenum; + u8 pad1; + __be16 ni_ipv4_port; + __be32 ni_ipv4_address; +}; + +struct dlm_query_nodeinfo { + u8 qn_nodenum; + u8 qn_numnodes; + u8 qn_namelen; + u8 pad1; + u8 qn_domain[O2NM_MAX_NAME_LEN]; + struct dlm_node_info qn_nodes[O2NM_MAX_NODES]; +}; + struct dlm_exit_domain { u8 node_idx; @@ -818,8 +859,8 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm); void dlm_wait_for_recovery(struct dlm_ctxt *dlm); void dlm_kick_recovery_thread(struct dlm_ctxt *dlm); int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node); -int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); -int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout); +void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout); +void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout); void dlm_put(struct dlm_ctxt *dlm); struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm); @@ -836,9 +877,8 @@ static inline void dlm_lockres_get(struct dlm_lock_resource *res) kref_get(&res->refs); } void dlm_lockres_put(struct dlm_lock_resource *res); -void __dlm_unhash_lockres(struct dlm_lock_resource *res); -void __dlm_insert_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res); +void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); +void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, const char *name, unsigned int len, @@ -861,46 +901,18 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int namelen); -#define dlm_lockres_set_refmap_bit(bit,res) \ - __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__) -#define dlm_lockres_clear_refmap_bit(bit,res) \ - __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__) +void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, int bit); +void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, int bit); -static inline void __dlm_lockres_set_refmap_bit(int bit, - struct dlm_lock_resource *res, - const char *file, - int line) -{ - //printk("%s:%d:%.*s: setting bit %d\n", file, line, - // res->lockname.len, res->lockname.name, bit); - set_bit(bit, res->refmap); -} - -static inline void __dlm_lockres_clear_refmap_bit(int bit, - struct dlm_lock_resource *res, - const char *file, - int line) -{ - //printk("%s:%d:%.*s: clearing bit %d\n", file, line, - // res->lockname.len, res->lockname.name, bit); - clear_bit(bit, res->refmap); -} +void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res); +void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res); -void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - const char *file, - int line); -void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - int new_lockres, - const char *file, - int line); -#define dlm_lockres_drop_inflight_ref(d,r) \ - __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__) -#define dlm_lockres_grab_inflight_ref(d,r) \ - __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__) -#define dlm_lockres_grab_inflight_ref_new(d,r) \ - __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__) +void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res); void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock); void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock); @@ -1030,6 +1042,7 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res); void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node); +void dlm_force_free_mles(struct dlm_ctxt *dlm); int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock); int __dlm_lockres_has_locks(struct dlm_lock_resource *res); int __dlm_lockres_unused(struct dlm_lock_resource *res); @@ -1069,11 +1082,9 @@ static inline int dlm_lock_compatible(int existing, int request) static inline int dlm_lock_on_list(struct list_head *head, struct dlm_lock *lock) { - struct list_head *iter; struct dlm_lock *tmplock; - list_for_each(iter, head) { - tmplock = list_entry(iter, struct dlm_lock, list); + list_for_each_entry(tmplock, head, list) { if (tmplock == lock) return 1; } diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c index 9f30491e5e8..e36d63ff178 100644 --- a/fs/ocfs2/dlm/dlmconvert.c +++ b/fs/ocfs2/dlm/dlmconvert.c @@ -123,13 +123,12 @@ static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm, int *kick_thread) { enum dlm_status status = DLM_NORMAL; - struct list_head *iter; struct dlm_lock *tmplock=NULL; assert_spin_locked(&res->spinlock); - mlog_entry("type=%d, convert_type=%d, new convert_type=%d\n", - lock->ml.type, lock->ml.convert_type, type); + mlog(0, "type=%d, convert_type=%d, new convert_type=%d\n", + lock->ml.type, lock->ml.convert_type, type); spin_lock(&lock->spinlock); @@ -185,16 +184,14 @@ static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm, /* upconvert from here on */ status = DLM_NORMAL; - list_for_each(iter, &res->granted) { - tmplock = list_entry(iter, struct dlm_lock, list); + list_for_each_entry(tmplock, &res->granted, list) { if (tmplock == lock) continue; if (!dlm_lock_compatible(tmplock->ml.type, type)) goto switch_queues; } - list_for_each(iter, &res->converting) { - tmplock = list_entry(iter, struct dlm_lock, list); + list_for_each_entry(tmplock, &res->converting, list) { if (!dlm_lock_compatible(tmplock->ml.type, type)) goto switch_queues; /* existing conversion requests take precedence */ @@ -353,7 +350,7 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm, struct kvec vec[2]; size_t veclen = 1; - mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); + mlog(0, "%.*s\n", res->lockname.len, res->lockname.name); memset(&convert, 0, sizeof(struct dlm_convert_lock)); convert.node_idx = dlm->node_num; @@ -424,8 +421,8 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data, struct dlm_ctxt *dlm = data; struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf; struct dlm_lock_resource *res = NULL; - struct list_head *iter; struct dlm_lock *lock = NULL; + struct dlm_lock *tmp_lock; struct dlm_lockstatus *lksb; enum dlm_status status = DLM_NORMAL; u32 flags; @@ -471,14 +468,13 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data, dlm_error(status); goto leave; } - list_for_each(iter, &res->granted) { - lock = list_entry(iter, struct dlm_lock, list); - if (lock->ml.cookie == cnv->cookie && - lock->ml.node == cnv->node_idx) { + list_for_each_entry(tmp_lock, &res->granted, list) { + if (tmp_lock->ml.cookie == cnv->cookie && + tmp_lock->ml.node == cnv->node_idx) { + lock = tmp_lock; dlm_lock_get(lock); break; } - lock = NULL; } spin_unlock(&res->spinlock); if (!lock) { diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c index 0cd24cf5439..18f13c2e4a1 100644 --- a/fs/ocfs2/dlm/dlmdebug.c +++ b/fs/ocfs2/dlm/dlmdebug.c @@ -30,6 +30,7 @@ #include <linux/sysctl.h> #include <linux/spinlock.h> #include <linux/debugfs.h> +#include <linux/export.h> #include "cluster/heartbeat.h" #include "cluster/nodemanager.h" @@ -95,7 +96,6 @@ static void __dlm_print_lock(struct dlm_lock *lock) void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) { - struct list_head *iter2; struct dlm_lock *lock; char buf[DLM_LOCKID_NAME_MAX]; @@ -117,18 +117,15 @@ void __dlm_print_one_lock_resource(struct dlm_lock_resource *res) res->inflight_locks, atomic_read(&res->asts_reserved)); dlm_print_lockres_refmap(res); printk(" granted queue:\n"); - list_for_each(iter2, &res->granted) { - lock = list_entry(iter2, struct dlm_lock, list); + list_for_each_entry(lock, &res->granted, list) { __dlm_print_lock(lock); } printk(" converting queue:\n"); - list_for_each(iter2, &res->converting) { - lock = list_entry(iter2, struct dlm_lock, list); + list_for_each_entry(lock, &res->converting, list) { __dlm_print_lock(lock); } printk(" blocked queue:\n"); - list_for_each(iter2, &res->blocked) { - lock = list_entry(iter2, struct dlm_lock, list); + list_for_each_entry(lock, &res->blocked, list) { __dlm_print_lock(lock); } } @@ -341,7 +338,7 @@ void dlm_print_one_mle(struct dlm_master_list_entry *mle) #ifdef CONFIG_DEBUG_FS -static struct dentry *dlm_debugfs_root = NULL; +static struct dentry *dlm_debugfs_root; #define DLM_DEBUGFS_DIR "o2dlm" #define DLM_DEBUGFS_DLM_STATE "dlm_state" @@ -370,92 +367,46 @@ static void dlm_debug_get(struct dlm_debug_ctxt *dc) kref_get(&dc->debug_refcnt); } -static struct debug_buffer *debug_buffer_allocate(void) +static int debug_release(struct inode *inode, struct file *file) { - struct debug_buffer *db = NULL; - - db = kzalloc(sizeof(struct debug_buffer), GFP_KERNEL); - if (!db) - goto bail; - - db->len = PAGE_SIZE; - db->buf = kmalloc(db->len, GFP_KERNEL); - if (!db->buf) - goto bail; - - return db; -bail: - kfree(db); - return NULL; -} - -static ssize_t debug_buffer_read(struct file *file, char __user *buf, - size_t nbytes, loff_t *ppos) -{ - struct debug_buffer *db = file->private_data; - - return simple_read_from_buffer(buf, nbytes, ppos, db->buf, db->len); -} - -static loff_t debug_buffer_llseek(struct file *file, loff_t off, int whence) -{ - struct debug_buffer *db = file->private_data; - loff_t new = -1; - - switch (whence) { - case 0: - new = off; - break; - case 1: - new = file->f_pos + off; - break; - } - - if (new < 0 || new > db->len) - return -EINVAL; - - return (file->f_pos = new); + free_page((unsigned long)file->private_data); + return 0; } -static int debug_buffer_release(struct inode *inode, struct file *file) +static ssize_t debug_read(struct file *file, char __user *buf, + size_t nbytes, loff_t *ppos) { - struct debug_buffer *db = (struct debug_buffer *)file->private_data; - - if (db) - kfree(db->buf); - kfree(db); - - return 0; + return simple_read_from_buffer(buf, nbytes, ppos, file->private_data, + i_size_read(file->f_mapping->host)); } /* end - util funcs */ /* begin - purge list funcs */ -static int debug_purgelist_print(struct dlm_ctxt *dlm, struct debug_buffer *db) +static int debug_purgelist_print(struct dlm_ctxt *dlm, char *buf, int len) { struct dlm_lock_resource *res; int out = 0; unsigned long total = 0; - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Dumping Purgelist for Domain: %s\n", dlm->name); spin_lock(&dlm->spinlock); list_for_each_entry(res, &dlm->purge_list, purge) { ++total; - if (db->len - out < 100) + if (len - out < 100) continue; spin_lock(&res->spinlock); out += stringify_lockname(res->lockname.name, res->lockname.len, - db->buf + out, db->len - out); - out += snprintf(db->buf + out, db->len - out, "\t%ld\n", + buf + out, len - out); + out += snprintf(buf + out, len - out, "\t%ld\n", (jiffies - res->last_used)/HZ); spin_unlock(&res->spinlock); } spin_unlock(&dlm->spinlock); - out += snprintf(db->buf + out, db->len - out, - "Total on list: %ld\n", total); + out += snprintf(buf + out, len - out, "Total on list: %ld\n", total); return out; } @@ -463,15 +414,15 @@ static int debug_purgelist_print(struct dlm_ctxt *dlm, struct debug_buffer *db) static int debug_purgelist_open(struct inode *inode, struct file *file) { struct dlm_ctxt *dlm = inode->i_private; - struct debug_buffer *db; + char *buf = NULL; - db = debug_buffer_allocate(); - if (!db) + buf = (char *) get_zeroed_page(GFP_NOFS); + if (!buf) goto bail; - db->len = debug_purgelist_print(dlm, db); + i_size_write(inode, debug_purgelist_print(dlm, buf, PAGE_SIZE - 1)); - file->private_data = db; + file->private_data = buf; return 0; bail: @@ -480,42 +431,39 @@ bail: static const struct file_operations debug_purgelist_fops = { .open = debug_purgelist_open, - .release = debug_buffer_release, - .read = debug_buffer_read, - .llseek = debug_buffer_llseek, + .release = debug_release, + .read = debug_read, + .llseek = generic_file_llseek, }; /* end - purge list funcs */ /* begin - debug mle funcs */ -static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db) +static int debug_mle_print(struct dlm_ctxt *dlm, char *buf, int len) { struct dlm_master_list_entry *mle; struct hlist_head *bucket; - struct hlist_node *list; int i, out = 0; - unsigned long total = 0, longest = 0, bktcnt; + unsigned long total = 0, longest = 0, bucket_count = 0; - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Dumping MLEs for Domain: %s\n", dlm->name); spin_lock(&dlm->master_lock); for (i = 0; i < DLM_HASH_BUCKETS; i++) { bucket = dlm_master_hash(dlm, i); - hlist_for_each(list, bucket) { - mle = hlist_entry(list, struct dlm_master_list_entry, - master_hash_node); + hlist_for_each_entry(mle, bucket, master_hash_node) { ++total; - ++bktcnt; - if (db->len - out < 200) + ++bucket_count; + if (len - out < 200) continue; - out += dump_mle(mle, db->buf + out, db->len - out); + out += dump_mle(mle, buf + out, len - out); } - longest = max(longest, bktcnt); - bktcnt = 0; + longest = max(longest, bucket_count); + bucket_count = 0; } spin_unlock(&dlm->master_lock); - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Total: %ld, Longest: %ld\n", total, longest); return out; } @@ -523,15 +471,15 @@ static int debug_mle_print(struct dlm_ctxt *dlm, struct debug_buffer *db) static int debug_mle_open(struct inode *inode, struct file *file) { struct dlm_ctxt *dlm = inode->i_private; - struct debug_buffer *db; + char *buf = NULL; - db = debug_buffer_allocate(); - if (!db) + buf = (char *) get_zeroed_page(GFP_NOFS); + if (!buf) goto bail; - db->len = debug_mle_print(dlm, db); + i_size_write(inode, debug_mle_print(dlm, buf, PAGE_SIZE - 1)); - file->private_data = db; + file->private_data = buf; return 0; bail: @@ -540,9 +488,9 @@ bail: static const struct file_operations debug_mle_fops = { .open = debug_mle_open, - .release = debug_buffer_release, - .read = debug_buffer_read, - .llseek = debug_buffer_llseek, + .release = debug_release, + .read = debug_read, + .llseek = generic_file_llseek, }; /* end - debug mle funcs */ @@ -636,8 +584,14 @@ static void *lockres_seq_start(struct seq_file *m, loff_t *pos) spin_lock(&dlm->track_lock); if (oldres) track_list = &oldres->tracking; - else + else { track_list = &dlm->tracking_list; + if (list_empty(track_list)) { + dl = NULL; + spin_unlock(&dlm->track_lock); + goto bail; + } + } list_for_each_entry(res, track_list, tracking) { if (&res->tracking == &dlm->tracking_list) @@ -660,6 +614,7 @@ static void *lockres_seq_start(struct seq_file *m, loff_t *pos) } else dl = NULL; +bail: /* passed to seq_show */ return dl; } @@ -715,7 +670,7 @@ static int debug_lockres_open(struct inode *inode, struct file *file) goto bail; } - seq = (struct seq_file *) file->private_data; + seq = file->private_data; seq->private = dl; dlm_grab(dlm); @@ -731,7 +686,7 @@ bail: static int debug_lockres_release(struct inode *inode, struct file *file) { - struct seq_file *seq = (struct seq_file *)file->private_data; + struct seq_file *seq = file->private_data; struct debug_lockres *dl = (struct debug_lockres *)seq->private; if (dl->dl_res) @@ -750,7 +705,7 @@ static const struct file_operations debug_lockres_fops = { /* end - debug lockres funcs */ /* begin - debug state funcs */ -static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) +static int debug_state_print(struct dlm_ctxt *dlm, char *buf, int len) { int out = 0; struct dlm_reco_node_data *node; @@ -774,33 +729,41 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) } /* Domain: xxxxxxxxxx Key: 0xdfbac769 */ - out += snprintf(db->buf + out, db->len - out, - "Domain: %s Key: 0x%08x\n", dlm->name, dlm->key); + out += snprintf(buf + out, len - out, + "Domain: %s Key: 0x%08x Protocol: %d.%d\n", + dlm->name, dlm->key, dlm->dlm_locking_proto.pv_major, + dlm->dlm_locking_proto.pv_minor); /* Thread Pid: xxx Node: xxx State: xxxxx */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Thread Pid: %d Node: %d State: %s\n", - dlm->dlm_thread_task->pid, dlm->node_num, state); + task_pid_nr(dlm->dlm_thread_task), dlm->node_num, state); /* Number of Joins: xxx Joining Node: xxx */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Number of Joins: %d Joining Node: %d\n", dlm->num_joins, dlm->joining_node); /* Domain Map: xx xx xx */ - out += snprintf(db->buf + out, db->len - out, "Domain Map: "); + out += snprintf(buf + out, len - out, "Domain Map: "); out += stringify_nodemap(dlm->domain_map, O2NM_MAX_NODES, - db->buf + out, db->len - out); - out += snprintf(db->buf + out, db->len - out, "\n"); + buf + out, len - out); + out += snprintf(buf + out, len - out, "\n"); + + /* Exit Domain Map: xx xx xx */ + out += snprintf(buf + out, len - out, "Exit Domain Map: "); + out += stringify_nodemap(dlm->exit_domain_map, O2NM_MAX_NODES, + buf + out, len - out); + out += snprintf(buf + out, len - out, "\n"); /* Live Map: xx xx xx */ - out += snprintf(db->buf + out, db->len - out, "Live Map: "); + out += snprintf(buf + out, len - out, "Live Map: "); out += stringify_nodemap(dlm->live_nodes_map, O2NM_MAX_NODES, - db->buf + out, db->len - out); - out += snprintf(db->buf + out, db->len - out, "\n"); + buf + out, len - out); + out += snprintf(buf + out, len - out, "\n"); /* Lock Resources: xxx (xxx) */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Lock Resources: %d (%d)\n", atomic_read(&dlm->res_cur_count), atomic_read(&dlm->res_tot_count)); @@ -812,29 +775,29 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) cur_mles += atomic_read(&dlm->mle_cur_count[i]); /* MLEs: xxx (xxx) */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "MLEs: %d (%d)\n", cur_mles, tot_mles); /* Blocking: xxx (xxx) */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, " Blocking: %d (%d)\n", atomic_read(&dlm->mle_cur_count[DLM_MLE_BLOCK]), atomic_read(&dlm->mle_tot_count[DLM_MLE_BLOCK])); /* Mastery: xxx (xxx) */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, " Mastery: %d (%d)\n", atomic_read(&dlm->mle_cur_count[DLM_MLE_MASTER]), atomic_read(&dlm->mle_tot_count[DLM_MLE_MASTER])); /* Migration: xxx (xxx) */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, " Migration: %d (%d)\n", atomic_read(&dlm->mle_cur_count[DLM_MLE_MIGRATION]), atomic_read(&dlm->mle_tot_count[DLM_MLE_MIGRATION])); /* Lists: Dirty=Empty Purge=InUse PendingASTs=Empty ... */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Lists: Dirty=%s Purge=%s PendingASTs=%s " "PendingBASTs=%s\n", (list_empty(&dlm->dirty_list) ? "Empty" : "InUse"), @@ -843,12 +806,12 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) (list_empty(&dlm->pending_basts) ? "Empty" : "InUse")); /* Purge Count: xxx Refs: xxx */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Purge Count: %d Refs: %d\n", dlm->purge_count, atomic_read(&dlm->dlm_refs.refcount)); /* Dead Node: xxx */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Dead Node: %d\n", dlm->reco.dead_node); /* What about DLM_RECO_STATE_FINALIZE? */ @@ -858,19 +821,19 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) state = "INACTIVE"; /* Recovery Pid: xxxx Master: xxx State: xxxx */ - out += snprintf(db->buf + out, db->len - out, + out += snprintf(buf + out, len - out, "Recovery Pid: %d Master: %d State: %s\n", - dlm->dlm_reco_thread_task->pid, + task_pid_nr(dlm->dlm_reco_thread_task), dlm->reco.new_master, state); /* Recovery Map: xx xx */ - out += snprintf(db->buf + out, db->len - out, "Recovery Map: "); + out += snprintf(buf + out, len - out, "Recovery Map: "); out += stringify_nodemap(dlm->recovery_map, O2NM_MAX_NODES, - db->buf + out, db->len - out); - out += snprintf(db->buf + out, db->len - out, "\n"); + buf + out, len - out); + out += snprintf(buf + out, len - out, "\n"); /* Recovery Node State: */ - out += snprintf(db->buf + out, db->len - out, "Recovery Node State:\n"); + out += snprintf(buf + out, len - out, "Recovery Node State:\n"); list_for_each_entry(node, &dlm->reco.node_data, list) { switch (node->state) { case DLM_RECO_NODE_DATA_INIT: @@ -898,7 +861,7 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) state = "BAD"; break; } - out += snprintf(db->buf + out, db->len - out, "\t%u - %s\n", + out += snprintf(buf + out, len - out, "\t%u - %s\n", node->node_num, state); } @@ -910,15 +873,15 @@ static int debug_state_print(struct dlm_ctxt *dlm, struct debug_buffer *db) static int debug_state_open(struct inode *inode, struct file *file) { struct dlm_ctxt *dlm = inode->i_private; - struct debug_buffer *db = NULL; + char *buf = NULL; - db = debug_buffer_allocate(); - if (!db) + buf = (char *) get_zeroed_page(GFP_NOFS); + if (!buf) goto bail; - db->len = debug_state_print(dlm, db); + i_size_write(inode, debug_state_print(dlm, buf, PAGE_SIZE - 1)); - file->private_data = db; + file->private_data = buf; return 0; bail: @@ -927,9 +890,9 @@ bail: static const struct file_operations debug_state_fops = { .open = debug_state_open, - .release = debug_buffer_release, - .read = debug_buffer_read, - .llseek = debug_buffer_llseek, + .release = debug_release, + .read = debug_read, + .llseek = generic_file_llseek, }; /* end - debug state funcs */ @@ -993,14 +956,10 @@ void dlm_debug_shutdown(struct dlm_ctxt *dlm) struct dlm_debug_ctxt *dc = dlm->dlm_debug_ctxt; if (dc) { - if (dc->debug_purgelist_dentry) - debugfs_remove(dc->debug_purgelist_dentry); - if (dc->debug_mle_dentry) - debugfs_remove(dc->debug_mle_dentry); - if (dc->debug_lockres_dentry) - debugfs_remove(dc->debug_lockres_dentry); - if (dc->debug_state_dentry) - debugfs_remove(dc->debug_state_dentry); + debugfs_remove(dc->debug_purgelist_dentry); + debugfs_remove(dc->debug_mle_dentry); + debugfs_remove(dc->debug_lockres_dentry); + debugfs_remove(dc->debug_state_dentry); dlm_debug_put(dc); } } @@ -1031,8 +990,7 @@ bail: void dlm_destroy_debugfs_subroot(struct dlm_ctxt *dlm) { - if (dlm->dlm_debugfs_subroot) - debugfs_remove(dlm->dlm_debugfs_subroot); + debugfs_remove(dlm->dlm_debugfs_subroot); } /* debugfs root */ @@ -1048,7 +1006,6 @@ int dlm_create_debugfs_root(void) void dlm_destroy_debugfs_root(void) { - if (dlm_debugfs_root) - debugfs_remove(dlm_debugfs_root); + debugfs_remove(dlm_debugfs_root); } #endif /* CONFIG_DEBUG_FS */ diff --git a/fs/ocfs2/dlm/dlmdebug.h b/fs/ocfs2/dlm/dlmdebug.h index 8c686d22f9c..1f27c4812d1 100644 --- a/fs/ocfs2/dlm/dlmdebug.h +++ b/fs/ocfs2/dlm/dlmdebug.h @@ -37,11 +37,6 @@ struct dlm_debug_ctxt { struct dentry *debug_purgelist_dentry; }; -struct debug_buffer { - int len; - char *buf; -}; - struct debug_lockres { int dl_len; char *dl_buf; diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c index 153abb5abef..39efc5057a3 100644 --- a/fs/ocfs2/dlm/dlmdomain.c +++ b/fs/ocfs2/dlm/dlmdomain.c @@ -43,8 +43,6 @@ #include "dlmdomain.h" #include "dlmdebug.h" -#include "dlmver.h" - #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN) #include "cluster/masklog.h" @@ -128,10 +126,16 @@ static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events); * will have a negotiated version with the same major number and a minor * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should * be used to determine what a running domain is actually using. + * + * New in version 1.1: + * - Message DLM_QUERY_REGION added to support global heartbeat + * - Message DLM_QUERY_NODEINFO added to allow online node removes + * New in version 1.2: + * - Message DLM_BEGIN_EXIT_DOMAIN_MSG added to mark start of exit domain */ static const struct dlm_protocol_version dlm_protocol = { .pv_major = 1, - .pv_minor = 0, + .pv_minor = 2, }; #define DLM_DOMAIN_BACKOFF_MS 200 @@ -142,6 +146,8 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data); static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data); +static int dlm_query_region_handler(struct o2net_msg *msg, u32 len, + void *data, void **ret_data); static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data); static int dlm_protocol_compare(struct dlm_protocol_version *existing, @@ -149,16 +155,18 @@ static int dlm_protocol_compare(struct dlm_protocol_version *existing, static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm); -void __dlm_unhash_lockres(struct dlm_lock_resource *lockres) +void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { - if (!hlist_unhashed(&lockres->hash_node)) { - hlist_del_init(&lockres->hash_node); - dlm_lockres_put(lockres); - } + if (hlist_unhashed(&res->hash_node)) + return; + + mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len, + res->lockname.name); + hlist_del_init(&res->hash_node); + dlm_lockres_put(res); } -void __dlm_insert_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res) +void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { struct hlist_head *bucket; struct qstr *q; @@ -172,6 +180,9 @@ void __dlm_insert_lockres(struct dlm_ctxt *dlm, dlm_lockres_get(res); hlist_add_head(&res->hash_node, bucket); + + mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len, + res->lockname.name); } struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, @@ -180,17 +191,15 @@ struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm, unsigned int hash) { struct hlist_head *bucket; - struct hlist_node *list; + struct dlm_lock_resource *res; - mlog_entry("%.*s\n", len, name); + mlog(0, "%.*s\n", len, name); assert_spin_locked(&dlm->spinlock); bucket = dlm_lockres_hash(dlm, hash); - hlist_for_each(list, bucket) { - struct dlm_lock_resource *res = hlist_entry(list, - struct dlm_lock_resource, hash_node); + hlist_for_each_entry(res, bucket, hash_node) { if (res->lockname.name[0] != name[0]) continue; if (unlikely(res->lockname.len != len)) @@ -216,7 +225,7 @@ struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm, { struct dlm_lock_resource *res = NULL; - mlog_entry("%.*s\n", len, name); + mlog(0, "%.*s\n", len, name); assert_spin_locked(&dlm->spinlock); @@ -249,22 +258,19 @@ struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm, static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len) { - struct dlm_ctxt *tmp = NULL; - struct list_head *iter; + struct dlm_ctxt *tmp; assert_spin_locked(&dlm_domain_lock); /* tmp->name here is always NULL terminated, * but domain may not be! */ - list_for_each(iter, &dlm_domains) { - tmp = list_entry (iter, struct dlm_ctxt, list); + list_for_each_entry(tmp, &dlm_domains, list) { if (strlen(tmp->name) == len && memcmp(tmp->name, domain, len)==0) - break; - tmp = NULL; + return tmp; } - return tmp; + return NULL; } /* For null terminated domain strings ONLY */ @@ -306,9 +312,7 @@ static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm) if (dlm->master_hash) dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES); - if (dlm->name) - kfree(dlm->name); - + kfree(dlm->name); kfree(dlm); } @@ -355,25 +359,22 @@ static void __dlm_get(struct dlm_ctxt *dlm) * you shouldn't trust your pointer. */ struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm) { - struct list_head *iter; - struct dlm_ctxt *target = NULL; + struct dlm_ctxt *target; + struct dlm_ctxt *ret = NULL; spin_lock(&dlm_domain_lock); - list_for_each(iter, &dlm_domains) { - target = list_entry (iter, struct dlm_ctxt, list); - + list_for_each_entry(target, &dlm_domains, list) { if (target == dlm) { __dlm_get(target); + ret = target; break; } - - target = NULL; } spin_unlock(&dlm_domain_lock); - return target; + return ret; } int dlm_domain_fully_joined(struct dlm_ctxt *dlm) @@ -443,19 +444,21 @@ redo_bucket: dropped = dlm_empty_lockres(dlm, res); spin_lock(&res->spinlock); - __dlm_lockres_calc_usage(dlm, res); - iter = res->hash_node.next; + if (dropped) + __dlm_lockres_calc_usage(dlm, res); + else + iter = res->hash_node.next; spin_unlock(&res->spinlock); dlm_lockres_put(res); - if (dropped) + if (dropped) { + cond_resched_lock(&dlm->spinlock); goto redo_bucket; + } } cond_resched_lock(&dlm->spinlock); num += n; - mlog(0, "%s: touched %d lockreses in bucket %d " - "(tot=%d)\n", dlm->name, n, i, num); } spin_unlock(&dlm->spinlock); wake_up(&dlm->dlm_thread_wq); @@ -482,6 +485,28 @@ static int dlm_no_joining_node(struct dlm_ctxt *dlm) return ret; } +static int dlm_begin_exit_domain_handler(struct o2net_msg *msg, u32 len, + void *data, void **ret_data) +{ + struct dlm_ctxt *dlm = data; + unsigned int node; + struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf; + + if (!dlm_grab(dlm)) + return 0; + + node = exit_msg->node_idx; + mlog(0, "%s: Node %u sent a begin exit domain message\n", dlm->name, node); + + spin_lock(&dlm->spinlock); + set_bit(node, dlm->exit_domain_map); + spin_unlock(&dlm->spinlock); + + dlm_put(dlm); + + return 0; +} + static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm) { /* Yikes, a double spinlock! I need domain_lock for the dlm @@ -507,17 +532,17 @@ again: static void __dlm_print_nodes(struct dlm_ctxt *dlm) { - int node = -1; + int node = -1, num = 0; assert_spin_locked(&dlm->spinlock); - printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name); - + printk("( "); while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, node + 1)) < O2NM_MAX_NODES) { printk("%d ", node); + ++num; } - printk("\n"); + printk(") %u nodes\n", num); } static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, @@ -527,17 +552,17 @@ static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, unsigned int node; struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf; - mlog_entry("%p %u %p", msg, len, data); + mlog(0, "%p %u %p", msg, len, data); if (!dlm_grab(dlm)) return 0; node = exit_msg->node_idx; - printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name); - spin_lock(&dlm->spinlock); clear_bit(node, dlm->domain_map); + clear_bit(node, dlm->exit_domain_map); + printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name); __dlm_print_nodes(dlm); /* notify anything attached to the heartbeat events */ @@ -550,29 +575,56 @@ static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data, return 0; } -static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, +static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, u32 msg_type, unsigned int node) { int status; struct dlm_exit_domain leave_msg; - mlog(0, "Asking node %u if we can leave the domain %s me = %u\n", - node, dlm->name, dlm->node_num); + mlog(0, "%s: Sending domain exit message %u to node %u\n", dlm->name, + msg_type, node); memset(&leave_msg, 0, sizeof(leave_msg)); leave_msg.node_idx = dlm->node_num; - status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key, - &leave_msg, sizeof(leave_msg), node, - NULL); + status = o2net_send_message(msg_type, dlm->key, &leave_msg, + sizeof(leave_msg), node, NULL); if (status < 0) - mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " - "node %u\n", status, DLM_EXIT_DOMAIN_MSG, dlm->key, node); - mlog(0, "status return %d from o2net_send_message\n", status); + mlog(ML_ERROR, "Error %d sending domain exit message %u " + "to node %u on domain %s\n", status, msg_type, node, + dlm->name); return status; } +static void dlm_begin_exit_domain(struct dlm_ctxt *dlm) +{ + int node = -1; + + /* Support for begin exit domain was added in 1.2 */ + if (dlm->dlm_locking_proto.pv_major == 1 && + dlm->dlm_locking_proto.pv_minor < 2) + return; + + /* + * Unlike DLM_EXIT_DOMAIN_MSG, DLM_BEGIN_EXIT_DOMAIN_MSG is purely + * informational. Meaning if a node does not receive the message, + * so be it. + */ + spin_lock(&dlm->spinlock); + while (1) { + node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, node + 1); + if (node >= O2NM_MAX_NODES) + break; + if (node == dlm->node_num) + continue; + + spin_unlock(&dlm->spinlock); + dlm_send_one_domain_exit(dlm, DLM_BEGIN_EXIT_DOMAIN_MSG, node); + spin_lock(&dlm->spinlock); + } + spin_unlock(&dlm->spinlock); +} static void dlm_leave_domain(struct dlm_ctxt *dlm) { @@ -598,7 +650,8 @@ static void dlm_leave_domain(struct dlm_ctxt *dlm) clear_node = 1; - status = dlm_send_one_domain_exit(dlm, node); + status = dlm_send_one_domain_exit(dlm, DLM_EXIT_DOMAIN_MSG, + node); if (status < 0 && status != -ENOPROTOOPT && status != -ENOTCONN) { @@ -673,6 +726,7 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm) if (leave) { mlog(0, "shutting down domain %s\n", dlm->name); + dlm_begin_exit_domain(dlm); /* We changed dlm state, notify the thread */ dlm_kick_thread(dlm, NULL); @@ -693,6 +747,8 @@ void dlm_unregister_domain(struct dlm_ctxt *dlm) dlm_mark_domain_leaving(dlm); dlm_leave_domain(dlm); + printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name); + dlm_force_free_mles(dlm); dlm_complete_dlm_shutdown(dlm); } dlm_put(dlm); @@ -750,7 +806,7 @@ static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet, union dlm_query_join_response response; response.packet = *packet; - *wire = cpu_to_be32(response.intval); + *wire = be32_to_cpu(response.intval); } static void dlm_query_join_wire_to_packet(u32 wire, @@ -903,10 +959,19 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, * domain. Set him in the map and clean up our * leftover join state. */ BUG_ON(dlm->joining_node != assert->node_idx); + + if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) { + mlog(0, "dlm recovery is ongoing, disallow join\n"); + spin_unlock(&dlm->spinlock); + spin_unlock(&dlm_domain_lock); + return -EAGAIN; + } + set_bit(assert->node_idx, dlm->domain_map); + clear_bit(assert->node_idx, dlm->exit_domain_map); __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); - printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n", + printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ", assert->node_idx, dlm->name); __dlm_print_nodes(dlm); @@ -920,6 +985,371 @@ static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data, return 0; } +static int dlm_match_regions(struct dlm_ctxt *dlm, + struct dlm_query_region *qr, + char *local, int locallen) +{ + char *remote = qr->qr_regions; + char *l, *r; + int localnr, i, j, foundit; + int status = 0; + + if (!o2hb_global_heartbeat_active()) { + if (qr->qr_numregions) { + mlog(ML_ERROR, "Domain %s: Joining node %d has global " + "heartbeat enabled but local node %d does not\n", + qr->qr_domain, qr->qr_node, dlm->node_num); + status = -EINVAL; + } + goto bail; + } + + if (o2hb_global_heartbeat_active() && !qr->qr_numregions) { + mlog(ML_ERROR, "Domain %s: Local node %d has global " + "heartbeat enabled but joining node %d does not\n", + qr->qr_domain, dlm->node_num, qr->qr_node); + status = -EINVAL; + goto bail; + } + + r = remote; + for (i = 0; i < qr->qr_numregions; ++i) { + mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r); + r += O2HB_MAX_REGION_NAME_LEN; + } + + localnr = min(O2NM_MAX_REGIONS, locallen/O2HB_MAX_REGION_NAME_LEN); + localnr = o2hb_get_all_regions(local, (u8)localnr); + + /* compare local regions with remote */ + l = local; + for (i = 0; i < localnr; ++i) { + foundit = 0; + r = remote; + for (j = 0; j <= qr->qr_numregions; ++j) { + if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) { + foundit = 1; + break; + } + r += O2HB_MAX_REGION_NAME_LEN; + } + if (!foundit) { + status = -EINVAL; + mlog(ML_ERROR, "Domain %s: Region '%.*s' registered " + "in local node %d but not in joining node %d\n", + qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l, + dlm->node_num, qr->qr_node); + goto bail; + } + l += O2HB_MAX_REGION_NAME_LEN; + } + + /* compare remote with local regions */ + r = remote; + for (i = 0; i < qr->qr_numregions; ++i) { + foundit = 0; + l = local; + for (j = 0; j < localnr; ++j) { + if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) { + foundit = 1; + break; + } + l += O2HB_MAX_REGION_NAME_LEN; + } + if (!foundit) { + status = -EINVAL; + mlog(ML_ERROR, "Domain %s: Region '%.*s' registered " + "in joining node %d but not in local node %d\n", + qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r, + qr->qr_node, dlm->node_num); + goto bail; + } + r += O2HB_MAX_REGION_NAME_LEN; + } + +bail: + return status; +} + +static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map) +{ + struct dlm_query_region *qr = NULL; + int status, ret = 0, i; + char *p; + + if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES) + goto bail; + + qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL); + if (!qr) { + ret = -ENOMEM; + mlog_errno(ret); + goto bail; + } + + qr->qr_node = dlm->node_num; + qr->qr_namelen = strlen(dlm->name); + memcpy(qr->qr_domain, dlm->name, qr->qr_namelen); + /* if local hb, the numregions will be zero */ + if (o2hb_global_heartbeat_active()) + qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions, + O2NM_MAX_REGIONS); + + p = qr->qr_regions; + for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN) + mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p); + + i = -1; + while ((i = find_next_bit(node_map, O2NM_MAX_NODES, + i + 1)) < O2NM_MAX_NODES) { + if (i == dlm->node_num) + continue; + + mlog(0, "Sending regions to node %d\n", i); + + ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr, + sizeof(struct dlm_query_region), + i, &status); + if (ret >= 0) + ret = status; + if (ret) { + mlog(ML_ERROR, "Region mismatch %d, node %d\n", + ret, i); + break; + } + } + +bail: + kfree(qr); + return ret; +} + +static int dlm_query_region_handler(struct o2net_msg *msg, u32 len, + void *data, void **ret_data) +{ + struct dlm_query_region *qr; + struct dlm_ctxt *dlm = NULL; + char *local = NULL; + int status = 0; + + qr = (struct dlm_query_region *) msg->buf; + + mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node, + qr->qr_domain); + + /* buffer used in dlm_mast_regions() */ + local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL); + if (!local) + return -ENOMEM; + + status = -EINVAL; + + spin_lock(&dlm_domain_lock); + dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen); + if (!dlm) { + mlog(ML_ERROR, "Node %d queried hb regions on domain %s " + "before join domain\n", qr->qr_node, qr->qr_domain); + goto out_domain_lock; + } + + spin_lock(&dlm->spinlock); + if (dlm->joining_node != qr->qr_node) { + mlog(ML_ERROR, "Node %d queried hb regions on domain %s " + "but joining node is %d\n", qr->qr_node, qr->qr_domain, + dlm->joining_node); + goto out_dlm_lock; + } + + /* Support for global heartbeat was added in 1.1 */ + if (dlm->dlm_locking_proto.pv_major == 1 && + dlm->dlm_locking_proto.pv_minor == 0) { + mlog(ML_ERROR, "Node %d queried hb regions on domain %s " + "but active dlm protocol is %d.%d\n", qr->qr_node, + qr->qr_domain, dlm->dlm_locking_proto.pv_major, + dlm->dlm_locking_proto.pv_minor); + goto out_dlm_lock; + } + + status = dlm_match_regions(dlm, qr, local, sizeof(qr->qr_regions)); + +out_dlm_lock: + spin_unlock(&dlm->spinlock); + +out_domain_lock: + spin_unlock(&dlm_domain_lock); + + kfree(local); + + return status; +} + +static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn) +{ + struct o2nm_node *local; + struct dlm_node_info *remote; + int i, j; + int status = 0; + + for (j = 0; j < qn->qn_numnodes; ++j) + mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum, + &(qn->qn_nodes[j].ni_ipv4_address), + ntohs(qn->qn_nodes[j].ni_ipv4_port)); + + for (i = 0; i < O2NM_MAX_NODES && !status; ++i) { + local = o2nm_get_node_by_num(i); + remote = NULL; + for (j = 0; j < qn->qn_numnodes; ++j) { + if (qn->qn_nodes[j].ni_nodenum == i) { + remote = &(qn->qn_nodes[j]); + break; + } + } + + if (!local && !remote) + continue; + + if ((local && !remote) || (!local && remote)) + status = -EINVAL; + + if (!status && + ((remote->ni_nodenum != local->nd_num) || + (remote->ni_ipv4_port != local->nd_ipv4_port) || + (remote->ni_ipv4_address != local->nd_ipv4_address))) + status = -EINVAL; + + if (status) { + if (remote && !local) + mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) " + "registered in joining node %d but not in " + "local node %d\n", qn->qn_domain, + remote->ni_nodenum, + &(remote->ni_ipv4_address), + ntohs(remote->ni_ipv4_port), + qn->qn_nodenum, dlm->node_num); + if (local && !remote) + mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) " + "registered in local node %d but not in " + "joining node %d\n", qn->qn_domain, + local->nd_num, &(local->nd_ipv4_address), + ntohs(local->nd_ipv4_port), + dlm->node_num, qn->qn_nodenum); + BUG_ON((!local && !remote)); + } + + if (local) + o2nm_node_put(local); + } + + return status; +} + +static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map) +{ + struct dlm_query_nodeinfo *qn = NULL; + struct o2nm_node *node; + int ret = 0, status, count, i; + + if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES) + goto bail; + + qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL); + if (!qn) { + ret = -ENOMEM; + mlog_errno(ret); + goto bail; + } + + for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) { + node = o2nm_get_node_by_num(i); + if (!node) + continue; + qn->qn_nodes[count].ni_nodenum = node->nd_num; + qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port; + qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address; + mlog(0, "Node %3d, %pI4:%u\n", node->nd_num, + &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port)); + ++count; + o2nm_node_put(node); + } + + qn->qn_nodenum = dlm->node_num; + qn->qn_numnodes = count; + qn->qn_namelen = strlen(dlm->name); + memcpy(qn->qn_domain, dlm->name, qn->qn_namelen); + + i = -1; + while ((i = find_next_bit(node_map, O2NM_MAX_NODES, + i + 1)) < O2NM_MAX_NODES) { + if (i == dlm->node_num) + continue; + + mlog(0, "Sending nodeinfo to node %d\n", i); + + ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY, + qn, sizeof(struct dlm_query_nodeinfo), + i, &status); + if (ret >= 0) + ret = status; + if (ret) { + mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i); + break; + } + } + +bail: + kfree(qn); + return ret; +} + +static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len, + void *data, void **ret_data) +{ + struct dlm_query_nodeinfo *qn; + struct dlm_ctxt *dlm = NULL; + int locked = 0, status = -EINVAL; + + qn = (struct dlm_query_nodeinfo *) msg->buf; + + mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum, + qn->qn_domain); + + spin_lock(&dlm_domain_lock); + dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen); + if (!dlm) { + mlog(ML_ERROR, "Node %d queried nodes on domain %s before " + "join domain\n", qn->qn_nodenum, qn->qn_domain); + goto bail; + } + + spin_lock(&dlm->spinlock); + locked = 1; + if (dlm->joining_node != qn->qn_nodenum) { + mlog(ML_ERROR, "Node %d queried nodes on domain %s but " + "joining node is %d\n", qn->qn_nodenum, qn->qn_domain, + dlm->joining_node); + goto bail; + } + + /* Support for node query was added in 1.1 */ + if (dlm->dlm_locking_proto.pv_major == 1 && + dlm->dlm_locking_proto.pv_minor == 0) { + mlog(ML_ERROR, "Node %d queried nodes on domain %s " + "but active dlm protocol is %d.%d\n", qn->qn_nodenum, + qn->qn_domain, dlm->dlm_locking_proto.pv_major, + dlm->dlm_locking_proto.pv_minor); + goto bail; + } + + status = dlm_match_nodes(dlm, qn); + +bail: + if (locked) + spin_unlock(&dlm->spinlock); + spin_unlock(&dlm_domain_lock); + + return status; +} + static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data) { @@ -1095,6 +1525,7 @@ static int dlm_send_one_join_assert(struct dlm_ctxt *dlm, unsigned int node) { int status; + int ret; struct dlm_assert_joined assert_msg; mlog(0, "Sending join assert to node %u\n", node); @@ -1106,11 +1537,13 @@ static int dlm_send_one_join_assert(struct dlm_ctxt *dlm, status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, &assert_msg, sizeof(assert_msg), node, - NULL); + &ret); if (status < 0) mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, node); + else + status = ret; return status; } @@ -1184,7 +1617,7 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm) struct domain_join_ctxt *ctxt; enum dlm_query_join_response_code response = JOIN_DISALLOW; - mlog_entry("%p", dlm); + mlog(0, "%p", dlm); ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL); if (!ctxt) { @@ -1240,6 +1673,21 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm) set_bit(dlm->node_num, dlm->domain_map); spin_unlock(&dlm->spinlock); + /* Support for global heartbeat and node info was added in 1.1 */ + if (dlm->dlm_locking_proto.pv_major > 1 || + dlm->dlm_locking_proto.pv_minor > 0) { + status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map); + if (status) { + mlog_errno(status); + goto bail; + } + status = dlm_send_regions(dlm, ctxt->yes_resp_map); + if (status) { + mlog_errno(status); + goto bail; + } + } + dlm_send_join_asserts(dlm, ctxt->yes_resp_map); /* Joined state *must* be set before the joining node @@ -1254,8 +1702,10 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm) bail: spin_lock(&dlm->spinlock); __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); - if (!status) + if (!status) { + printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name); __dlm_print_nodes(dlm); + } spin_unlock(&dlm->spinlock); if (ctxt) { @@ -1276,8 +1726,8 @@ bail: static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm) { - o2hb_unregister_callback(NULL, &dlm->dlm_hb_up); - o2hb_unregister_callback(NULL, &dlm->dlm_hb_down); + o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_up); + o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_down); o2net_unregister_handler_list(&dlm->dlm_domain_handlers); } @@ -1289,13 +1739,13 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB, dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI); - status = o2hb_register_callback(NULL, &dlm->dlm_hb_down); + status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_down); if (status) goto bail; o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB, dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI); - status = o2hb_register_callback(NULL, &dlm->dlm_hb_up); + status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_up); if (status) goto bail; @@ -1405,6 +1855,13 @@ static int dlm_register_domain_handlers(struct dlm_ctxt *dlm) if (status) goto bail; + status = o2net_register_handler(DLM_BEGIN_EXIT_DOMAIN_MSG, dlm->key, + sizeof(struct dlm_exit_domain), + dlm_begin_exit_domain_handler, + dlm, NULL, &dlm->dlm_domain_handlers); + if (status) + goto bail; + bail: if (status) dlm_unregister_domain_handlers(dlm); @@ -1428,19 +1885,19 @@ static int dlm_join_domain(struct dlm_ctxt *dlm) goto bail; } - status = dlm_debug_init(dlm); + status = dlm_launch_thread(dlm); if (status < 0) { mlog_errno(status); goto bail; } - status = dlm_launch_thread(dlm); + status = dlm_launch_recovery_thread(dlm); if (status < 0) { mlog_errno(status); goto bail; } - status = dlm_launch_recovery_thread(dlm); + status = dlm_debug_init(dlm); if (status < 0) { mlog_errno(status); goto bail; @@ -1577,7 +2034,6 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain, INIT_LIST_HEAD(&dlm->list); INIT_LIST_HEAD(&dlm->dirty_list); INIT_LIST_HEAD(&dlm->reco.resources); - INIT_LIST_HEAD(&dlm->reco.received); INIT_LIST_HEAD(&dlm->reco.node_data); INIT_LIST_HEAD(&dlm->purge_list); INIT_LIST_HEAD(&dlm->dlm_domain_handlers); @@ -1677,13 +2133,6 @@ struct dlm_ctxt * dlm_register_domain(const char *domain, goto leave; } - if (!o2hb_check_local_node_heartbeating()) { - mlog(ML_ERROR, "the local node has not been configured, or is " - "not heartbeating\n"); - ret = -EPROTO; - goto leave; - } - mlog(0, "register called for domain \"%s\"\n", domain); retry: @@ -1806,7 +2255,21 @@ static int dlm_register_net_handlers(void) sizeof(struct dlm_cancel_join), dlm_cancel_join_handler, NULL, NULL, &dlm_join_handlers); + if (status) + goto bail; + + status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY, + sizeof(struct dlm_query_region), + dlm_query_region_handler, + NULL, NULL, &dlm_join_handlers); + + if (status) + goto bail; + status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY, + sizeof(struct dlm_query_nodeinfo), + dlm_query_nodeinfo_handler, + NULL, NULL, &dlm_join_handlers); bail: if (status < 0) dlm_unregister_net_handlers(); @@ -1830,13 +2293,10 @@ static DECLARE_RWSEM(dlm_callback_sem); void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm, int node_num) { - struct list_head *iter; struct dlm_eviction_cb *cb; down_read(&dlm_callback_sem); - list_for_each(iter, &dlm->dlm_eviction_callbacks) { - cb = list_entry(iter, struct dlm_eviction_cb, ec_item); - + list_for_each_entry(cb, &dlm->dlm_eviction_callbacks, ec_item) { cb->ec_func(node_num, cb->ec_data); } up_read(&dlm_callback_sem); @@ -1873,8 +2333,6 @@ static int __init dlm_init(void) { int status; - dlm_print_version(); - status = dlm_init_mle_cache(); if (status) { mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n"); @@ -1924,6 +2382,7 @@ static void __exit dlm_exit (void) MODULE_AUTHOR("Oracle"); MODULE_LICENSE("GPL"); +MODULE_DESCRIPTION("OCFS2 Distributed Lock Management"); module_init(dlm_init); module_exit(dlm_exit); diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 69cf369961c..66c2a491f68 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -52,7 +52,7 @@ #define MLOG_MASK_PREFIX ML_DLM #include "cluster/masklog.h" -static struct kmem_cache *dlm_lock_cache = NULL; +static struct kmem_cache *dlm_lock_cache; static DEFINE_SPINLOCK(dlm_cookie_lock); static u64 dlm_next_cookie = 1; @@ -91,21 +91,19 @@ void dlm_destroy_lock_cache(void) static int dlm_can_grant_new_lock(struct dlm_lock_resource *res, struct dlm_lock *lock) { - struct list_head *iter; struct dlm_lock *tmplock; - list_for_each(iter, &res->granted) { - tmplock = list_entry(iter, struct dlm_lock, list); - + list_for_each_entry(tmplock, &res->granted, list) { if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type)) return 0; } - list_for_each(iter, &res->converting) { - tmplock = list_entry(iter, struct dlm_lock, list); - + list_for_each_entry(tmplock, &res->converting, list) { if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type)) return 0; + if (!dlm_lock_compatible(tmplock->ml.convert_type, + lock->ml.type)) + return 0; } return 1; @@ -125,7 +123,7 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm, int call_ast = 0, kick_thread = 0; enum dlm_status status = DLM_NORMAL; - mlog_entry("type=%d\n", lock->ml.type); + mlog(0, "type=%d\n", lock->ml.type); spin_lock(&res->spinlock); /* if called from dlm_create_lock_handler, need to @@ -175,15 +173,12 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm, lock->ml.node); } } else { + status = DLM_NORMAL; dlm_lock_get(lock); list_add_tail(&lock->list, &res->blocked); kick_thread = 1; } } - /* reduce the inflight count, this may result in the lockres - * being purged below during calc_usage */ - if (lock->ml.node == dlm->node_num) - dlm_lockres_drop_inflight_ref(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -224,14 +219,20 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, enum dlm_status status = DLM_DENIED; int lockres_changed = 1; - mlog_entry("type=%d\n", lock->ml.type); - mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len, + mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n", + lock->ml.type, res->lockname.len, res->lockname.name, flags); + /* + * Wait if resource is getting recovered, remastered, etc. + * If the resource was remastered and new owner is self, then exit. + */ spin_lock(&res->spinlock); - - /* will exit this call with spinlock held */ __dlm_wait_on_lockres(res); + if (res->owner == dlm->node_num) { + spin_unlock(&res->spinlock); + return DLM_RECOVERING; + } res->state |= DLM_LOCK_RES_IN_PROGRESS; /* add lock to local (secondary) queue */ @@ -305,8 +306,6 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, int tmpret, status = 0; enum dlm_status ret; - mlog_entry_void(); - memset(&create, 0, sizeof(create)); create.node_idx = dlm->node_num; create.requested_type = lock->ml.type; @@ -318,27 +317,23 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, sizeof(create), res->owner, &status); if (tmpret >= 0) { - // successfully sent and received - ret = status; // this is already a dlm_status + ret = status; if (ret == DLM_REJECTED) { - mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres " - "no longer owned by %u. that node is coming back " - "up currently.\n", dlm->name, create.namelen, + mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer " + "owned by node %u. That node is coming back up " + "currently.\n", dlm->name, create.namelen, create.name, res->owner); dlm_print_one_lock_resource(res); BUG(); } } else { - mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " - "node %u\n", tmpret, DLM_CREATE_LOCK_MSG, dlm->key, - res->owner); - if (dlm_is_host_down(tmpret)) { + mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to " + "node %u\n", dlm->name, create.namelen, create.name, + tmpret, res->owner); + if (dlm_is_host_down(tmpret)) ret = DLM_RECOVERING; - mlog(0, "node %u died so returning DLM_RECOVERING " - "from lock message!\n", res->owner); - } else { + else ret = dlm_err_to_dlm_status(tmpret); - } } return ret; @@ -439,7 +434,7 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, /* zero memory only if kernel-allocated */ lksb = kzalloc(sizeof(*lksb), GFP_NOFS); if (!lksb) { - kfree(lock); + kmem_cache_free(dlm_lock_cache, lock); return NULL; } kernel_allocated = 1; @@ -474,8 +469,6 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data, BUG_ON(!dlm); - mlog_entry_void(); - if (!dlm_grab(dlm)) return DLM_REJECTED; @@ -719,18 +712,10 @@ retry_lock: if (status == DLM_RECOVERING || status == DLM_MIGRATING || status == DLM_FORWARD) { - mlog(0, "retrying lock with migration/" - "recovery/in progress\n"); msleep(100); - /* no waiting for dlm_reco_thread */ if (recovery) { if (status != DLM_RECOVERING) goto retry_lock; - - mlog(0, "%s: got RECOVERING " - "for $RECOVERY lock, master " - "was %u\n", dlm->name, - res->owner); /* wait to see the node go down, then * drop down and allow the lockres to * get cleaned up. need to remaster. */ @@ -742,6 +727,14 @@ retry_lock: } } + /* Inflight taken in dlm_get_lock_resource() is dropped here */ + spin_lock(&res->spinlock); + dlm_lockres_drop_inflight_ref(dlm, res); + spin_unlock(&res->spinlock); + + dlm_lockres_calc_usage(dlm, res); + dlm_kick_thread(dlm, res); + if (status != DLM_NORMAL) { lock->lksb->flags &= ~DLM_LKSB_GET_LVB; if (status != DLM_NOTQUEUED) diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 94b97fc6a88..82abf0cc9a1 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -82,9 +82,9 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm, return 1; } -static struct kmem_cache *dlm_lockres_cache = NULL; -static struct kmem_cache *dlm_lockname_cache = NULL; -static struct kmem_cache *dlm_mle_cache = NULL; +static struct kmem_cache *dlm_lockres_cache; +static struct kmem_cache *dlm_lockname_cache; +static struct kmem_cache *dlm_mle_cache; static void dlm_mle_release(struct kref *kref); static void dlm_init_mle(struct dlm_master_list_entry *mle, @@ -342,16 +342,13 @@ static int dlm_find_mle(struct dlm_ctxt *dlm, { struct dlm_master_list_entry *tmpmle; struct hlist_head *bucket; - struct hlist_node *list; unsigned int hash; assert_spin_locked(&dlm->master_lock); hash = dlm_lockid_hash(name, namelen); bucket = dlm_master_hash(dlm, hash); - hlist_for_each(list, bucket) { - tmpmle = hlist_entry(list, struct dlm_master_list_entry, - master_hash_node); + hlist_for_each_entry(tmpmle, bucket, master_hash_node) { if (!dlm_mle_equal(dlm, tmpmle, name, namelen)) continue; dlm_get_mle(tmpmle); @@ -426,8 +423,6 @@ static void dlm_mle_release(struct kref *kref) struct dlm_master_list_entry *mle; struct dlm_ctxt *dlm; - mlog_entry_void(); - mle = container_of(kref, struct dlm_master_list_entry, mle_refs); dlm = mle->dlm; @@ -477,11 +472,15 @@ bail: void dlm_destroy_master_caches(void) { - if (dlm_lockname_cache) + if (dlm_lockname_cache) { kmem_cache_destroy(dlm_lockname_cache); + dlm_lockname_cache = NULL; + } - if (dlm_lockres_cache) + if (dlm_lockres_cache) { kmem_cache_destroy(dlm_lockres_cache); + dlm_lockres_cache = NULL; + } } static void dlm_lockres_release(struct kref *kref) @@ -511,8 +510,6 @@ static void dlm_lockres_release(struct kref *kref) atomic_dec(&dlm->res_cur_count); - dlm_put(dlm); - if (!hlist_unhashed(&res->hash_node) || !list_empty(&res->granted) || !list_empty(&res->converting) || @@ -584,9 +581,8 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, atomic_set(&res->asts_reserved, 0); res->migration_pending = 0; res->inflight_locks = 0; + res->inflight_assert_workers = 0; - /* put in dlm_lockres_release */ - dlm_grab(dlm); res->dlm = dlm; kref_init(&res->refs); @@ -637,42 +633,94 @@ error: return NULL; } -void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - int new_lockres, - const char *file, - int line) +void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, int bit) { - if (!new_lockres) - assert_spin_locked(&res->spinlock); + assert_spin_locked(&res->spinlock); + + mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len, + res->lockname.name, bit, __builtin_return_address(0)); + + set_bit(bit, res->refmap); +} + +void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, int bit) +{ + assert_spin_locked(&res->spinlock); + + mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len, + res->lockname.name, bit, __builtin_return_address(0)); + + clear_bit(bit, res->refmap); +} + + +void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + assert_spin_locked(&res->spinlock); - if (!test_bit(dlm->node_num, res->refmap)) { - BUG_ON(res->inflight_locks != 0); - dlm_lockres_set_refmap_bit(dlm->node_num, res); - } res->inflight_locks++; - mlog(0, "%s:%.*s: inflight++: now %u\n", - dlm->name, res->lockname.len, res->lockname.name, - res->inflight_locks); + + mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name, + res->lockname.len, res->lockname.name, res->inflight_locks, + __builtin_return_address(0)); } -void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - const char *file, - int line) +void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) { assert_spin_locked(&res->spinlock); BUG_ON(res->inflight_locks == 0); + res->inflight_locks--; - mlog(0, "%s:%.*s: inflight--: now %u\n", - dlm->name, res->lockname.len, res->lockname.name, - res->inflight_locks); - if (res->inflight_locks == 0) - dlm_lockres_clear_refmap_bit(dlm->node_num, res); + + mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name, + res->lockname.len, res->lockname.name, res->inflight_locks, + __builtin_return_address(0)); + wake_up(&res->wq); } +void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + assert_spin_locked(&res->spinlock); + res->inflight_assert_workers++; + mlog(0, "%s:%.*s: inflight assert worker++: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_assert_workers); +} + +static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + spin_lock(&res->spinlock); + __dlm_lockres_grab_inflight_worker(dlm, res); + spin_unlock(&res->spinlock); +} + +static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + assert_spin_locked(&res->spinlock); + BUG_ON(res->inflight_assert_workers == 0); + res->inflight_assert_workers--; + mlog(0, "%s:%.*s: inflight assert worker--: now %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->inflight_assert_workers); +} + +static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res) +{ + spin_lock(&res->spinlock); + __dlm_lockres_drop_inflight_worker(dlm, res); + spin_unlock(&res->spinlock); +} + /* * lookup a lock resource by name. * may already exist in the hashtable. @@ -703,7 +751,6 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, unsigned int hash; int tries = 0; int bit, wait_on_recovery = 0; - int drop_inflight_if_nonlocal = 0; BUG_ON(!lockid); @@ -715,36 +762,33 @@ lookup: spin_lock(&dlm->spinlock); tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash); if (tmpres) { - int dropping_ref = 0; - spin_unlock(&dlm->spinlock); - spin_lock(&tmpres->spinlock); - /* We wait for the other thread that is mastering the resource */ + /* Wait on the thread that is mastering the resource */ if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { __dlm_wait_on_lockres(tmpres); BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN); + spin_unlock(&tmpres->spinlock); + dlm_lockres_put(tmpres); + tmpres = NULL; + goto lookup; } - if (tmpres->owner == dlm->node_num) { - BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF); - dlm_lockres_grab_inflight_ref(dlm, tmpres); - } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) - dropping_ref = 1; - spin_unlock(&tmpres->spinlock); - - /* wait until done messaging the master, drop our ref to allow - * the lockres to be purged, start over. */ - if (dropping_ref) { - spin_lock(&tmpres->spinlock); - __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF); + /* Wait on the resource purge to complete before continuing */ + if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) { + BUG_ON(tmpres->owner == dlm->node_num); + __dlm_wait_on_lockres_flags(tmpres, + DLM_LOCK_RES_DROPPING_REF); spin_unlock(&tmpres->spinlock); dlm_lockres_put(tmpres); tmpres = NULL; goto lookup; } - mlog(0, "found in hash!\n"); + /* Grab inflight ref to pin the resource */ + dlm_lockres_grab_inflight_ref(dlm, tmpres); + + spin_unlock(&tmpres->spinlock); if (res) dlm_lockres_put(res); res = tmpres; @@ -814,7 +858,7 @@ lookup: dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); mle = NULL; - /* this is lame, but we cant wait on either + /* this is lame, but we can't wait on either * the mle or lockres waitqueue here */ if (mig) msleep(100); @@ -835,8 +879,8 @@ lookup: * but they might own this lockres. wait on them. */ bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); if (bit < O2NM_MAX_NODES) { - mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to " - "recover before lock mastery can begin\n", + mlog(0, "%s: res %.*s, At least one node (%d) " + "to recover before lock mastery can begin\n", dlm->name, namelen, (char *)lockid, bit); wait_on_recovery = 1; } @@ -849,12 +893,11 @@ lookup: /* finally add the lockres to its hash bucket */ __dlm_insert_lockres(dlm, res); - /* since this lockres is new it doesnt not require the spinlock */ - dlm_lockres_grab_inflight_ref_new(dlm, res); - /* if this node does not become the master make sure to drop - * this inflight reference below */ - drop_inflight_if_nonlocal = 1; + /* Grab inflight ref to pin the resource */ + spin_lock(&res->spinlock); + dlm_lockres_grab_inflight_ref(dlm, res); + spin_unlock(&res->spinlock); /* get an extra ref on the mle in case this is a BLOCK * if so, the creator of the BLOCK may try to put the last @@ -870,8 +913,8 @@ redo_request: * dlm spinlock would be detectable be a change on the mle, * so we only need to clear out the recovery map once. */ if (dlm_is_recovery_lock(lockid, namelen)) { - mlog(ML_NOTICE, "%s: recovery map is not empty, but " - "must master $RECOVERY lock now\n", dlm->name); + mlog(0, "%s: Recovery map is not empty, but must " + "master $RECOVERY lock now\n", dlm->name); if (!dlm_pre_master_reco_lockres(dlm, res)) wait_on_recovery = 0; else { @@ -889,8 +932,8 @@ redo_request: spin_lock(&dlm->spinlock); bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0); if (bit < O2NM_MAX_NODES) { - mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to " - "recover before lock mastery can begin\n", + mlog(0, "%s: res %.*s, At least one node (%d) " + "to recover before lock mastery can begin\n", dlm->name, namelen, (char *)lockid, bit); wait_on_recovery = 1; } else @@ -919,8 +962,8 @@ redo_request: * yet, keep going until it does. this is how the * master will know that asserts are needed back to * the lower nodes. */ - mlog(0, "%s:%.*s: requests only up to %u but master " - "is %u, keep going\n", dlm->name, namelen, + mlog(0, "%s: res %.*s, Requests only up to %u but " + "master is %u, keep going\n", dlm->name, namelen, lockid, nodenum, mle->master); } } @@ -930,13 +973,12 @@ wait: ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); if (ret < 0) { wait_on_recovery = 1; - mlog(0, "%s:%.*s: node map changed, redo the " - "master request now, blocked=%d\n", - dlm->name, res->lockname.len, + mlog(0, "%s: res %.*s, Node map changed, redo the master " + "request now, blocked=%d\n", dlm->name, res->lockname.len, res->lockname.name, blocked); if (++tries > 20) { - mlog(ML_ERROR, "%s:%.*s: spinning on " - "dlm_wait_for_lock_mastery, blocked=%d\n", + mlog(ML_ERROR, "%s: res %.*s, Spinning on " + "dlm_wait_for_lock_mastery, blocked = %d\n", dlm->name, res->lockname.len, res->lockname.name, blocked); dlm_print_one_lock_resource(res); @@ -946,7 +988,8 @@ wait: goto redo_request; } - mlog(0, "lockres mastered by %u\n", res->owner); + mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len, + res->lockname.name, res->owner); /* make sure we never continue without this */ BUG_ON(res->owner == O2NM_MAX_NODES); @@ -958,8 +1001,6 @@ wait: wake_waiters: spin_lock(&res->spinlock); - if (res->owner != dlm->node_num && drop_inflight_if_nonlocal) - dlm_lockres_drop_inflight_ref(dlm, res); res->state &= ~DLM_LOCK_RES_IN_PROGRESS; spin_unlock(&res->spinlock); wake_up(&res->wq); @@ -1432,9 +1473,7 @@ way_up_top: } if (res->owner == dlm->node_num) { - mlog(0, "%s:%.*s: setting bit %u in refmap\n", - dlm->name, namelen, name, request->node_idx); - dlm_lockres_set_refmap_bit(request->node_idx, res); + dlm_lockres_set_refmap_bit(dlm, res, request->node_idx); spin_unlock(&res->spinlock); response = DLM_MASTER_RESP_YES; if (mle) @@ -1499,10 +1538,8 @@ way_up_top: * go back and clean the mles on any * other nodes */ dispatch_assert = 1; - dlm_lockres_set_refmap_bit(request->node_idx, res); - mlog(0, "%s:%.*s: setting bit %u in refmap\n", - dlm->name, namelen, name, - request->node_idx); + dlm_lockres_set_refmap_bit(dlm, res, + request->node_idx); } else response = DLM_MASTER_RESP_NO; } else { @@ -1604,7 +1641,8 @@ send_response: mlog(ML_ERROR, "failed to dispatch assert master work\n"); response = DLM_MASTER_RESP_ERROR; dlm_lockres_put(res); - } + } else + dlm_lockres_grab_inflight_worker(dlm, res); } else { if (res) dlm_lockres_put(res); @@ -1708,7 +1746,7 @@ again: "lockres, set the bit in the refmap\n", namelen, lockname, to); spin_lock(&res->spinlock); - dlm_lockres_set_refmap_bit(to, res); + dlm_lockres_set_refmap_bit(dlm, res, to); spin_unlock(&res->spinlock); } } @@ -1890,8 +1928,10 @@ ok: * up nodes that this node contacted */ while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES, nn+1)) < O2NM_MAX_NODES) { - if (nn != dlm->node_num && nn != assert->node_idx) + if (nn != dlm->node_num && nn != assert->node_idx) { master_request = 1; + break; + } } } mle->master = assert->node_idx; @@ -2022,7 +2062,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, int ignore_higher, u8 request_from, u32 flags) { struct dlm_work_item *item; - item = kzalloc(sizeof(*item), GFP_NOFS); + item = kzalloc(sizeof(*item), GFP_ATOMIC); if (!item) return -ENOMEM; @@ -2117,6 +2157,8 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) dlm_lockres_release_ast(dlm, res); put: + dlm_lockres_drop_inflight_worker(dlm, res); + dlm_lockres_put(res); mlog(0, "finished with dlm_assert_master_worker\n"); @@ -2193,8 +2235,6 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) namelen = res->lockname.len; BUG_ON(namelen > O2NM_MAX_NAME_LEN); - mlog(0, "%s:%.*s: sending deref to %d\n", - dlm->name, namelen, lockname, res->owner); memset(&deref, 0, sizeof(deref)); deref.node_idx = dlm->node_num; deref.namelen = namelen; @@ -2203,14 +2243,12 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key, &deref, sizeof(deref), res->owner, &r); if (ret < 0) - mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to " - "node %u\n", ret, DLM_DEREF_LOCKRES_MSG, dlm->key, - res->owner); + mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n", + dlm->name, namelen, lockname, ret, res->owner); else if (r < 0) { /* BAD. other node says I did not have a ref. */ - mlog(ML_ERROR,"while dropping ref on %s:%.*s " - "(master=%u) got %d.\n", dlm->name, namelen, - lockname, res->owner, r); + mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n", + dlm->name, namelen, lockname, res->owner, r); dlm_print_one_lock_resource(res); BUG(); } @@ -2266,7 +2304,7 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, else { BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); if (test_bit(node, res->refmap)) { - dlm_lockres_clear_refmap_bit(node, res); + dlm_lockres_clear_refmap_bit(dlm, res, node); cleared = 1; } } @@ -2326,7 +2364,7 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data) BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); if (test_bit(node, res->refmap)) { __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); - dlm_lockres_clear_refmap_bit(node, res); + dlm_lockres_clear_refmap_bit(dlm, res, node); cleared = 1; } spin_unlock(&res->spinlock); @@ -2345,55 +2383,59 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data) dlm_lockres_put(res); } -/* Checks whether the lockres can be migrated. Returns 0 if yes, < 0 - * if not. If 0, numlocks is set to the number of locks in the lockres. +/* + * A migrateable resource is one that is : + * 1. locally mastered, and, + * 2. zero local locks, and, + * 3. one or more non-local locks, or, one or more references + * Returns 1 if yes, 0 if not. */ static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - int *numlocks) + struct dlm_lock_resource *res) { - int ret; - int i; - int count = 0; + enum dlm_lockres_list idx; + int nonlocal = 0, node_ref; struct list_head *queue; struct dlm_lock *lock; + u64 cookie; assert_spin_locked(&res->spinlock); - ret = -EINVAL; - if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { - mlog(0, "cannot migrate lockres with unknown owner!\n"); - goto leave; - } + /* delay migration when the lockres is in MIGRATING state */ + if (res->state & DLM_LOCK_RES_MIGRATING) + return 0; - if (res->owner != dlm->node_num) { - mlog(0, "cannot migrate lockres this node doesn't own!\n"); - goto leave; - } + if (res->owner != dlm->node_num) + return 0; - ret = 0; - queue = &res->granted; - for (i = 0; i < 3; i++) { + for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) { + queue = dlm_list_idx_to_ptr(res, idx); list_for_each_entry(lock, queue, list) { - ++count; - if (lock->ml.node == dlm->node_num) { - mlog(0, "found a lock owned by this node still " - "on the %s queue! will not migrate this " - "lockres\n", (i == 0 ? "granted" : - (i == 1 ? "converting" : - "blocked"))); - ret = -ENOTEMPTY; - goto leave; + if (lock->ml.node != dlm->node_num) { + nonlocal++; + continue; } + cookie = be64_to_cpu(lock->ml.cookie); + mlog(0, "%s: Not migrateable res %.*s, lock %u:%llu on " + "%s list\n", dlm->name, res->lockname.len, + res->lockname.name, + dlm_get_lock_cookie_node(cookie), + dlm_get_lock_cookie_seq(cookie), + dlm_list_in_text(idx)); + return 0; } - queue++; } - *numlocks = count; - mlog(0, "migrateable lockres having %d locks\n", *numlocks); + if (!nonlocal) { + node_ref = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); + if (node_ref >= O2NM_MAX_NODES) + return 0; + } -leave: - return ret; + mlog(0, "%s: res %.*s, Migrateable\n", dlm->name, res->lockname.len, + res->lockname.name); + + return 1; } /* @@ -2402,8 +2444,7 @@ leave: static int dlm_migrate_lockres(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - u8 target) + struct dlm_lock_resource *res, u8 target) { struct dlm_master_list_entry *mle = NULL; struct dlm_master_list_entry *oldmle = NULL; @@ -2412,39 +2453,20 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm, const char *name; unsigned int namelen; int mle_added = 0; - int numlocks; int wake = 0; if (!dlm_grab(dlm)) return -EINVAL; + BUG_ON(target == O2NM_MAX_NODES); + name = res->lockname.name; namelen = res->lockname.len; - mlog(0, "migrating %.*s to %u\n", namelen, name, target); - - /* - * ensure this lockres is a proper candidate for migration - */ - spin_lock(&res->spinlock); - ret = dlm_is_lockres_migrateable(dlm, res, &numlocks); - if (ret < 0) { - spin_unlock(&res->spinlock); - goto leave; - } - spin_unlock(&res->spinlock); - - /* no work to do */ - if (numlocks == 0) { - mlog(0, "no locks were found on this lockres! done!\n"); - goto leave; - } - - /* - * preallocate up front - * if this fails, abort - */ + mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name, + target); + /* preallocate up front. if this fails, abort */ ret = -ENOMEM; mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS); if (!mres) { @@ -2460,35 +2482,10 @@ static int dlm_migrate_lockres(struct dlm_ctxt *dlm, ret = 0; /* - * find a node to migrate the lockres to - */ - - mlog(0, "picking a migration node\n"); - spin_lock(&dlm->spinlock); - /* pick a new node */ - if (!test_bit(target, dlm->domain_map) || - target >= O2NM_MAX_NODES) { - target = dlm_pick_migration_target(dlm, res); - } - mlog(0, "node %u chosen for migration\n", target); - - if (target >= O2NM_MAX_NODES || - !test_bit(target, dlm->domain_map)) { - /* target chosen is not alive */ - ret = -EINVAL; - } - - if (ret) { - spin_unlock(&dlm->spinlock); - goto fail; - } - - mlog(0, "continuing with target = %u\n", target); - - /* * clear any existing master requests and * add the migration mle to the list */ + spin_lock(&dlm->spinlock); spin_lock(&dlm->master_lock); ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name, namelen, target, dlm->node_num); @@ -2529,6 +2526,7 @@ fail: dlm_put_mle(mle); } else if (mle) { kmem_cache_free(dlm_mle_cache, mle); + mle = NULL; } goto leave; } @@ -2572,6 +2570,9 @@ fail: res->state &= ~DLM_LOCK_RES_MIGRATING; wake = 1; spin_unlock(&res->spinlock); + if (dlm_is_host_down(ret)) + dlm_wait_for_node_death(dlm, target, + DLM_NODE_DEATH_WAIT_MAX); goto leave; } @@ -2647,69 +2648,52 @@ leave: if (wake) wake_up(&res->wq); - /* TODO: cleanup */ if (mres) free_page((unsigned long)mres); dlm_put(dlm); - mlog(0, "returning %d\n", ret); + mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen, + name, target, ret); return ret; } #define DLM_MIGRATION_RETRY_MS 100 -/* Should be called only after beginning the domain leave process. +/* + * Should be called only after beginning the domain leave process. * There should not be any remaining locks on nonlocal lock resources, * and there should be no local locks left on locally mastered resources. * * Called with the dlm spinlock held, may drop it to do migration, but * will re-acquire before exit. * - * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */ + * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped + */ int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { int ret; int lock_dropped = 0; - int numlocks; + u8 target = O2NM_MAX_NODES; + + assert_spin_locked(&dlm->spinlock); spin_lock(&res->spinlock); - if (res->owner != dlm->node_num) { - if (!__dlm_lockres_unused(res)) { - mlog(ML_ERROR, "%s:%.*s: this node is not master, " - "trying to free this but locks remain\n", - dlm->name, res->lockname.len, res->lockname.name); - } - spin_unlock(&res->spinlock); - goto leave; - } + if (dlm_is_lockres_migrateable(dlm, res)) + target = dlm_pick_migration_target(dlm, res); + spin_unlock(&res->spinlock); - /* No need to migrate a lockres having no locks */ - ret = dlm_is_lockres_migrateable(dlm, res, &numlocks); - if (ret >= 0 && numlocks == 0) { - spin_unlock(&res->spinlock); + if (target == O2NM_MAX_NODES) goto leave; - } - spin_unlock(&res->spinlock); /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */ spin_unlock(&dlm->spinlock); lock_dropped = 1; - while (1) { - ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES); - if (ret >= 0) - break; - if (ret == -ENOTEMPTY) { - mlog(ML_ERROR, "lockres %.*s still has local locks!\n", - res->lockname.len, res->lockname.name); - BUG(); - } - - mlog(0, "lockres %.*s: migrate failed, " - "retrying\n", res->lockname.len, - res->lockname.name); - msleep(DLM_MIGRATION_RETRY_MS); - } + ret = dlm_migrate_lockres(dlm, res, target); + if (ret) + mlog(0, "%s: res %.*s, Migrate to node %u failed with %d\n", + dlm->name, res->lockname.len, res->lockname.name, + target, ret); spin_lock(&dlm->spinlock); leave: return lock_dropped; @@ -2866,7 +2850,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, BUG_ON(!list_empty(&lock->bast_list)); BUG_ON(lock->ast_pending); BUG_ON(lock->bast_pending); - dlm_lockres_clear_refmap_bit(lock->ml.node, res); + dlm_lockres_clear_refmap_bit(dlm, res, + lock->ml.node); list_del_init(&lock->list); dlm_lock_put(lock); /* In a normal unlock, we would have added a @@ -2887,61 +2872,61 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm, mlog(0, "%s:%.*s: node %u had a ref to this " "migrating lockres, clearing\n", dlm->name, res->lockname.len, res->lockname.name, bit); - dlm_lockres_clear_refmap_bit(bit, res); + dlm_lockres_clear_refmap_bit(dlm, res, bit); } bit++; } } -/* for now this is not too intelligent. we will - * need stats to make this do the right thing. - * this just finds the first lock on one of the - * queues and uses that node as the target. */ +/* + * Pick a node to migrate the lock resource to. This function selects a + * potential target based first on the locks and then on refmap. It skips + * nodes that are in the process of exiting the domain. + */ static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { - int i; + enum dlm_lockres_list idx; struct list_head *queue = &res->granted; struct dlm_lock *lock; - int nodenum; + int noderef; + u8 nodenum = O2NM_MAX_NODES; assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&res->spinlock); - spin_lock(&res->spinlock); - for (i=0; i<3; i++) { + /* Go through all the locks */ + for (idx = DLM_GRANTED_LIST; idx <= DLM_BLOCKED_LIST; idx++) { + queue = dlm_list_idx_to_ptr(res, idx); list_for_each_entry(lock, queue, list) { - /* up to the caller to make sure this node - * is alive */ - if (lock->ml.node != dlm->node_num) { - spin_unlock(&res->spinlock); - return lock->ml.node; - } + if (lock->ml.node == dlm->node_num) + continue; + if (test_bit(lock->ml.node, dlm->exit_domain_map)) + continue; + nodenum = lock->ml.node; + goto bail; } - queue++; } - spin_unlock(&res->spinlock); - mlog(0, "have not found a suitable target yet! checking domain map\n"); - /* ok now we're getting desperate. pick anyone alive. */ - nodenum = -1; + /* Go thru the refmap */ + noderef = -1; while (1) { - nodenum = find_next_bit(dlm->domain_map, - O2NM_MAX_NODES, nodenum+1); - mlog(0, "found %d in domain map\n", nodenum); - if (nodenum >= O2NM_MAX_NODES) + noderef = find_next_bit(res->refmap, O2NM_MAX_NODES, + noderef + 1); + if (noderef >= O2NM_MAX_NODES) break; - if (nodenum != dlm->node_num) { - mlog(0, "picking %d\n", nodenum); - return nodenum; - } + if (noderef == dlm->node_num) + continue; + if (test_bit(noderef, dlm->exit_domain_map)) + continue; + nodenum = noderef; + goto bail; } - mlog(0, "giving up. no master to migrate to\n"); - return DLM_LOCK_RES_OWNER_UNKNOWN; +bail: + return nodenum; } - - /* this is called by the new master once all lockres * data has been received */ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, @@ -2980,9 +2965,9 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, &migrate, sizeof(migrate), nodenum, &status); if (ret < 0) { - mlog(ML_ERROR, "Error %d when sending message %u (key " - "0x%x) to node %u\n", ret, DLM_MIGRATE_REQUEST_MSG, - dlm->key, nodenum); + mlog(ML_ERROR, "%s: res %.*s, Error %d send " + "MIGRATE_REQUEST to node %u\n", dlm->name, + migrate.namelen, migrate.name, ret, nodenum); if (!dlm_is_host_down(ret)) { mlog(ML_ERROR, "unhandled error=%d!\n", ret); BUG(); @@ -3001,7 +2986,7 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm, dlm->name, res->lockname.len, res->lockname.name, nodenum); spin_lock(&res->spinlock); - dlm_lockres_set_refmap_bit(nodenum, res); + dlm_lockres_set_refmap_bit(dlm, res, nodenum); spin_unlock(&res->spinlock); } } @@ -3050,8 +3035,6 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data, /* check for pre-existing lock */ spin_lock(&dlm->spinlock); res = __dlm_lookup_lockres(dlm, name, namelen, hash); - spin_lock(&dlm->master_lock); - if (res) { spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { @@ -3069,14 +3052,15 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data, spin_unlock(&res->spinlock); } + spin_lock(&dlm->master_lock); /* ignore status. only nonzero status would BUG. */ ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name, namelen, migrate->new_master, migrate->master); -unlock: spin_unlock(&dlm->master_lock); +unlock: spin_unlock(&dlm->spinlock); if (oldmle) { @@ -3111,8 +3095,6 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, *oldmle = NULL; - mlog_entry_void(); - assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); @@ -3147,11 +3129,15 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, /* remove it so that only one mle will be found */ __dlm_unlink_mle(dlm, tmp); __dlm_mle_detach_hb_events(dlm, tmp); - ret = DLM_MIGRATE_RESPONSE_MASTERY_REF; - mlog(0, "%s:%.*s: master=%u, newmaster=%u, " - "telling master to get ref for cleared out mle " - "during migration\n", dlm->name, namelen, name, - master, new_master); + if (tmp->type == DLM_MLE_MASTER) { + ret = DLM_MIGRATE_RESPONSE_MASTERY_REF; + mlog(0, "%s:%.*s: master=%u, newmaster=%u, " + "telling master to get ref " + "for cleared out mle during " + "migration\n", dlm->name, + namelen, name, master, + new_master); + } } spin_unlock(&tmp->spinlock); } @@ -3249,10 +3235,10 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) struct dlm_master_list_entry *mle; struct dlm_lock_resource *res; struct hlist_head *bucket; - struct hlist_node *list; + struct hlist_node *tmp; unsigned int i; - mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); + mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node); top: assert_spin_locked(&dlm->spinlock); @@ -3260,10 +3246,7 @@ top: spin_lock(&dlm->master_lock); for (i = 0; i < DLM_HASH_BUCKETS; i++) { bucket = dlm_master_hash(dlm, i); - hlist_for_each(list, bucket) { - mle = hlist_entry(list, struct dlm_master_list_entry, - master_hash_node); - + hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) { BUG_ON(mle->type != DLM_MLE_BLOCK && mle->type != DLM_MLE_MASTER && mle->type != DLM_MLE_MIGRATION); @@ -3338,7 +3321,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, * mastery reference here since old_master will briefly have * a reference after the migration completes */ spin_lock(&res->spinlock); - dlm_lockres_set_refmap_bit(old_master, res); + dlm_lockres_set_refmap_bit(dlm, res, old_master); spin_unlock(&res->spinlock); mlog(0, "now time to do a migrate request to other nodes\n"); @@ -3438,3 +3421,41 @@ void dlm_lockres_release_ast(struct dlm_ctxt *dlm, wake_up(&res->wq); wake_up(&dlm->migration_wq); } + +void dlm_force_free_mles(struct dlm_ctxt *dlm) +{ + int i; + struct hlist_head *bucket; + struct dlm_master_list_entry *mle; + struct hlist_node *tmp; + + /* + * We notified all other nodes that we are exiting the domain and + * marked the dlm state to DLM_CTXT_LEAVING. If any mles are still + * around we force free them and wake any processes that are waiting + * on the mles + */ + spin_lock(&dlm->spinlock); + spin_lock(&dlm->master_lock); + + BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING); + BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES)); + + for (i = 0; i < DLM_HASH_BUCKETS; i++) { + bucket = dlm_master_hash(dlm, i); + hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) { + if (mle->type != DLM_MLE_BLOCK) { + mlog(ML_ERROR, "bad mle: %p\n", mle); + dlm_print_one_mle(mle); + } + atomic_set(&mle->woken, 1); + wake_up(&mle->wq); + + __dlm_unlink_mle(dlm, mle); + __dlm_mle_detach_hb_events(dlm, mle); + __dlm_put_mle(mle); + } + } + spin_unlock(&dlm->master_lock); + spin_unlock(&dlm->spinlock); +} diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 9dfaac73b36..45067faf569 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -55,9 +55,6 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node); static int dlm_recovery_thread(void *data); -void dlm_complete_recovery_thread(struct dlm_ctxt *dlm); -int dlm_launch_recovery_thread(struct dlm_ctxt *dlm); -void dlm_kick_recovery_thread(struct dlm_ctxt *dlm); static int dlm_do_recovery(struct dlm_ctxt *dlm); static int dlm_pick_recovery_master(struct dlm_ctxt *dlm); @@ -362,40 +359,38 @@ static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) } -int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) +void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) { - if (timeout) { - mlog(ML_NOTICE, "%s: waiting %dms for notification of " - "death of node %u\n", dlm->name, timeout, node); + if (dlm_is_node_dead(dlm, node)) + return; + + printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in " + "domain %s\n", node, dlm->name); + + if (timeout) wait_event_timeout(dlm->dlm_reco_thread_wq, - dlm_is_node_dead(dlm, node), - msecs_to_jiffies(timeout)); - } else { - mlog(ML_NOTICE, "%s: waiting indefinitely for notification " - "of death of node %u\n", dlm->name, node); + dlm_is_node_dead(dlm, node), + msecs_to_jiffies(timeout)); + else wait_event(dlm->dlm_reco_thread_wq, dlm_is_node_dead(dlm, node)); - } - /* for now, return 0 */ - return 0; } -int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) +void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) { - if (timeout) { - mlog(0, "%s: waiting %dms for notification of " - "recovery of node %u\n", dlm->name, timeout, node); + if (dlm_is_node_recovered(dlm, node)) + return; + + printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in " + "domain %s\n", node, dlm->name); + + if (timeout) wait_event_timeout(dlm->dlm_reco_thread_wq, - dlm_is_node_recovered(dlm, node), - msecs_to_jiffies(timeout)); - } else { - mlog(0, "%s: waiting indefinitely for notification " - "of recovery of node %u\n", dlm->name, node); + dlm_is_node_recovered(dlm, node), + msecs_to_jiffies(timeout)); + else wait_event(dlm->dlm_reco_thread_wq, dlm_is_node_recovered(dlm, node)); - } - /* for now, return 0 */ - return 0; } /* callers of the top-level api calls (dlmlock/dlmunlock) should @@ -430,6 +425,8 @@ static void dlm_begin_recovery(struct dlm_ctxt *dlm) { spin_lock(&dlm->spinlock); BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); + printk(KERN_NOTICE "o2dlm: Begin recovery on domain %s for node %u\n", + dlm->name, dlm->reco.dead_node); dlm->reco.state |= DLM_RECO_STATE_ACTIVE; spin_unlock(&dlm->spinlock); } @@ -440,9 +437,18 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm) BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; spin_unlock(&dlm->spinlock); + printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name); wake_up(&dlm->reco.event); } +static void dlm_print_recovery_master(struct dlm_ctxt *dlm) +{ + printk(KERN_NOTICE "o2dlm: Node %u (%s) is the Recovery Master for the " + "dead node %u in domain %s\n", dlm->reco.new_master, + (dlm->node_num == dlm->reco.new_master ? "me" : "he"), + dlm->reco.dead_node, dlm->name); +} + static int dlm_do_recovery(struct dlm_ctxt *dlm) { int status = 0; @@ -505,9 +511,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) } mlog(0, "another node will master this recovery session.\n"); } - mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n", - dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), dlm->reco.new_master, - dlm->node_num, dlm->reco.dead_node); + + dlm_print_recovery_master(dlm); /* it is safe to start everything back up here * because all of the dead node's lock resources @@ -518,15 +523,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) return 0; master_here: - mlog(ML_NOTICE, "(%d) Node %u is the Recovery Master for the Dead Node " - "%u for Domain %s\n", task_pid_nr(dlm->dlm_reco_thread_task), - dlm->node_num, dlm->reco.dead_node, dlm->name); + dlm_print_recovery_master(dlm); status = dlm_remaster_locks(dlm, dlm->reco.dead_node); if (status < 0) { /* we should never hit this anymore */ - mlog(ML_ERROR, "error %d remastering locks for node %u, " - "retrying.\n", status, dlm->reco.dead_node); + mlog(ML_ERROR, "%s: Error %d remastering locks for node %u, " + "retrying.\n", dlm->name, status, dlm->reco.dead_node); /* yield a bit to allow any final network messages * to get handled on remaining nodes */ msleep(100); @@ -534,7 +537,10 @@ master_here: /* success! see if any other nodes need recovery */ mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n", dlm->name, dlm->reco.dead_node, dlm->node_num); - dlm_reset_recovery(dlm); + spin_lock(&dlm->spinlock); + __dlm_reset_recovery(dlm); + dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; + spin_unlock(&dlm->spinlock); } dlm_end_recovery(dlm); @@ -567,7 +573,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); ndata->state = DLM_RECO_NODE_DATA_REQUESTING; - mlog(0, "requesting lock info from node %u\n", + mlog(0, "%s: Requesting lock info from node %u\n", dlm->name, ndata->node_num); if (ndata->node_num == dlm->node_num) { @@ -640,7 +646,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) spin_unlock(&dlm_reco_state_lock); } - mlog(0, "done requesting all lock info\n"); + mlog(0, "%s: Done requesting all lock info\n", dlm->name); /* nodes should be sending reco data now * just need to wait */ @@ -692,6 +698,14 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) if (all_nodes_done) { int ret; + /* Set this flag on recovery master to avoid + * a new recovery for another dead node start + * before the recovery is not done. That may + * cause recovery hung.*/ + spin_lock(&dlm->spinlock); + dlm->reco.state |= DLM_RECO_STATE_FINALIZE; + spin_unlock(&dlm->spinlock); + /* all nodes are now in DLM_RECO_NODE_DATA_DONE state * just send a finalize message to everyone and * clean up */ @@ -727,7 +741,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) if (destroy) dlm_destroy_recovery_area(dlm, dead_node); - mlog_exit(status); return status; } @@ -784,7 +797,8 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, u8 dead_node) { struct dlm_lock_request lr; - enum dlm_status ret; + int ret; + int status; mlog(0, "\n"); @@ -797,16 +811,16 @@ static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, lr.dead_node = dead_node; // send message - ret = DLM_NOLOCKMGR; ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key, - &lr, sizeof(lr), request_from, NULL); + &lr, sizeof(lr), request_from, &status); /* negative status is handled by caller */ if (ret < 0) - mlog(ML_ERROR, "Error %d when sending message %u (key " - "0x%x) to node %u\n", ret, DLM_LOCK_REQUEST_MSG, - dlm->key, request_from); - + mlog(ML_ERROR, "%s: Error %d send LOCK_REQUEST to node %u " + "to recover dead node %u\n", dlm->name, ret, + request_from, dead_node); + else + ret = status; // return from here, then // sleep until all received or error return ret; @@ -957,9 +971,9 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, sizeof(done_msg), send_to, &tmpret); if (ret < 0) { - mlog(ML_ERROR, "Error %d when sending message %u (key " - "0x%x) to node %u\n", ret, DLM_RECO_DATA_DONE_MSG, - dlm->key, send_to); + mlog(ML_ERROR, "%s: Error %d send RECO_DATA_DONE to node %u " + "to recover dead node %u\n", dlm->name, ret, send_to, + dead_node); if (!dlm_is_host_down(ret)) { BUG(); } @@ -1128,9 +1142,11 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, if (ret < 0) { /* XXX: negative status is not handled. * this will end up killing this node. */ - mlog(ML_ERROR, "Error %d when sending message %u (key " - "0x%x) to node %u\n", ret, DLM_MIG_LOCKRES_MSG, - dlm->key, send_to); + mlog(ML_ERROR, "%s: res %.*s, Error %d send MIG_LOCKRES to " + "node %u (%s)\n", dlm->name, mres->lockname_len, + mres->lockname, ret, send_to, + (orig_flags & DLM_MRES_MIGRATION ? + "migration" : "recovery")); } else { /* might get an -ENOMEM back here */ ret = status; @@ -1402,6 +1418,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data, mres->lockname_len, mres->lockname); ret = -EFAULT; spin_unlock(&res->spinlock); + dlm_lockres_put(res); goto leave; } res->state |= DLM_LOCK_RES_MIGRATING; @@ -1492,13 +1509,11 @@ leave: dlm_put(dlm); if (ret < 0) { - if (buf) - kfree(buf); - if (item) - kfree(item); + kfree(buf); + kfree(item); + mlog_errno(ret); } - mlog_exit(ret); return ret; } @@ -1567,7 +1582,6 @@ leave: dlm_lockres_put(res); } kfree(data); - mlog_exit(ret); } @@ -1694,7 +1708,8 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data, mlog_errno(-ENOMEM); /* retry!? */ BUG(); - } + } else + __dlm_lockres_grab_inflight_worker(dlm, res); } else /* put.. incase we are not the master */ dlm_lockres_put(res); spin_unlock(&res->spinlock); @@ -1747,13 +1762,13 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, struct dlm_migratable_lockres *mres) { struct dlm_migratable_lock *ml; - struct list_head *queue; + struct list_head *queue, *iter; struct list_head *tmpq = NULL; struct dlm_lock *newlock = NULL; struct dlm_lockstatus *lksb = NULL; int ret = 0; int i, j, bad; - struct dlm_lock *lock = NULL; + struct dlm_lock *lock; u8 from = O2NM_MAX_NODES; unsigned int added = 0; __be64 c; @@ -1769,7 +1784,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, dlm->name, mres->lockname_len, mres->lockname, from); spin_lock(&res->spinlock); - dlm_lockres_set_refmap_bit(from, res); + dlm_lockres_set_refmap_bit(dlm, res, from); spin_unlock(&res->spinlock); added++; break; @@ -1788,14 +1803,16 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, /* MIGRATION ONLY! */ BUG_ON(!(mres->flags & DLM_MRES_MIGRATION)); + lock = NULL; spin_lock(&res->spinlock); for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) { tmpq = dlm_list_idx_to_ptr(res, j); - list_for_each_entry(lock, tmpq, list) { - if (lock->ml.cookie != ml->cookie) - lock = NULL; - else + list_for_each(iter, tmpq) { + lock = list_entry(iter, + struct dlm_lock, list); + if (lock->ml.cookie == ml->cookie) break; + lock = NULL; } if (lock) break; @@ -1883,6 +1900,13 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, if (ml->type == LKM_NLMODE) goto skip_lvb; + /* + * If the lock is in the blocked list it can't have a valid lvb, + * so skip it + */ + if (ml->list == DLM_BLOCKED_LIST) + goto skip_lvb; + if (!dlm_lvb_is_empty(mres->lvb)) { if (lksb->flags & DLM_LKSB_PUT_LVB) { /* other node was trying to update @@ -1963,11 +1987,19 @@ skip_lvb: } if (!bad) { dlm_lock_get(newlock); - list_add_tail(&newlock->list, queue); + if (mres->flags & DLM_MRES_RECOVERY && + ml->list == DLM_CONVERTING_LIST && + newlock->ml.type > + newlock->ml.convert_type) { + /* newlock is doing downconvert, add it to the + * head of converting list */ + list_add(&newlock->list, queue); + } else + list_add_tail(&newlock->list, queue); mlog(0, "%s:%.*s: added lock for node %u, " "setting refmap bit\n", dlm->name, res->lockname.len, res->lockname.name, ml->node); - dlm_lockres_set_refmap_bit(ml->node, res); + dlm_lockres_set_refmap_bit(dlm, res, ml->node); added++; } spin_unlock(&res->spinlock); @@ -1986,7 +2018,6 @@ leave: dlm_lock_put(newlock); } - mlog_exit(ret); return ret; } @@ -1997,6 +2028,8 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm, struct list_head *queue; struct dlm_lock *lock, *next; + assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&res->spinlock); res->state |= DLM_LOCK_RES_RECOVERING; if (!list_empty(&res->recovering)) { mlog(0, @@ -2077,16 +2110,16 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, u8 dead_node, u8 new_master) { int i; - struct hlist_node *hash_iter; struct hlist_head *bucket; struct dlm_lock_resource *res, *next; - mlog_entry_void(); - assert_spin_locked(&dlm->spinlock); list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { if (res->owner == dead_node) { + mlog(0, "%s: res %.*s, Changing owner from %u to %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->owner, new_master); list_del_init(&res->recovering); spin_lock(&res->spinlock); /* new_master has our reference from @@ -2107,41 +2140,31 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, * if necessary */ for (i = 0; i < DLM_HASH_BUCKETS; i++) { bucket = dlm_lockres_hash(dlm, i); - hlist_for_each_entry(res, hash_iter, bucket, hash_node) { - if (res->state & DLM_LOCK_RES_RECOVERING) { - if (res->owner == dead_node) { - mlog(0, "(this=%u) res %.*s owner=%u " - "was not on recovering list, but " - "clearing state anyway\n", - dlm->node_num, res->lockname.len, - res->lockname.name, new_master); - } else if (res->owner == dlm->node_num) { - mlog(0, "(this=%u) res %.*s owner=%u " - "was not on recovering list, " - "owner is THIS node, clearing\n", - dlm->node_num, res->lockname.len, - res->lockname.name, new_master); - } else - continue; + hlist_for_each_entry(res, bucket, hash_node) { + if (!(res->state & DLM_LOCK_RES_RECOVERING)) + continue; - if (!list_empty(&res->recovering)) { - mlog(0, "%s:%.*s: lockres was " - "marked RECOVERING, owner=%u\n", - dlm->name, res->lockname.len, - res->lockname.name, res->owner); - list_del_init(&res->recovering); - dlm_lockres_put(res); - } - spin_lock(&res->spinlock); - /* new_master has our reference from - * the lock state sent during recovery */ - dlm_change_lockres_owner(dlm, res, new_master); - res->state &= ~DLM_LOCK_RES_RECOVERING; - if (__dlm_lockres_has_locks(res)) - __dlm_dirty_lockres(dlm, res); - spin_unlock(&res->spinlock); - wake_up(&res->wq); + if (res->owner != dead_node && + res->owner != dlm->node_num) + continue; + + if (!list_empty(&res->recovering)) { + list_del_init(&res->recovering); + dlm_lockres_put(res); } + + /* new_master has our reference from + * the lock state sent during recovery */ + mlog(0, "%s: res %.*s, Changing owner from %u to %u\n", + dlm->name, res->lockname.len, res->lockname.name, + res->owner, new_master); + spin_lock(&res->spinlock); + dlm_change_lockres_owner(dlm, res, new_master); + res->state &= ~DLM_LOCK_RES_RECOVERING; + if (__dlm_lockres_has_locks(res)) + __dlm_dirty_lockres(dlm, res); + spin_unlock(&res->spinlock); + wake_up(&res->wq); } } } @@ -2255,12 +2278,12 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, res->lockname.len, res->lockname.name, freed, dead_node); __dlm_print_one_lock_resource(res); } - dlm_lockres_clear_refmap_bit(dead_node, res); + dlm_lockres_clear_refmap_bit(dlm, res, dead_node); } else if (test_bit(dead_node, res->refmap)) { mlog(0, "%s:%.*s: dead node %u had a ref, but had " "no locks and had not purged before dying\n", dlm->name, res->lockname.len, res->lockname.name, dead_node); - dlm_lockres_clear_refmap_bit(dead_node, res); + dlm_lockres_clear_refmap_bit(dlm, res, dead_node); } /* do not kick thread yet */ @@ -2276,7 +2299,6 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm, static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) { - struct hlist_node *iter; struct dlm_lock_resource *res; int i; struct hlist_head *bucket; @@ -2302,7 +2324,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) */ for (i = 0; i < DLM_HASH_BUCKETS; i++) { bucket = dlm_lockres_hash(dlm, i); - hlist_for_each_entry(res, iter, bucket, hash_node) { + hlist_for_each_entry(res, bucket, hash_node) { /* always prune any $RECOVERY entries for dead nodes, * otherwise hangs can occur during later recovery */ if (dlm_is_recovery_lock(res->lockname.name, @@ -2326,22 +2348,26 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) /* zero the lvb if necessary */ dlm_revalidate_lvb(dlm, res, dead_node); if (res->owner == dead_node) { - if (res->state & DLM_LOCK_RES_DROPPING_REF) - mlog(0, "%s:%.*s: owned by " - "dead node %u, this node was " - "dropping its ref when it died. " - "continue, dropping the flag.\n", + if (res->state & DLM_LOCK_RES_DROPPING_REF) { + mlog(ML_NOTICE, "%s: res %.*s, Skip " + "recovery as it is being freed\n", dlm->name, res->lockname.len, - res->lockname.name, dead_node); - - /* the wake_up for this will happen when the - * RECOVERING flag is dropped later */ - res->state &= ~DLM_LOCK_RES_DROPPING_REF; + res->lockname.name); + } else + dlm_move_lockres_to_recovery_list(dlm, + res); - dlm_move_lockres_to_recovery_list(dlm, res); } else if (res->owner == dlm->node_num) { dlm_free_dead_locks(dlm, res, dead_node); __dlm_lockres_calc_usage(dlm, res); + } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { + if (test_bit(dead_node, res->refmap)) { + mlog(0, "%s:%.*s: dead node %u had a ref, but had " + "no locks and had not purged before dying\n", + dlm->name, res->lockname.len, + res->lockname.name, dead_node); + dlm_lockres_clear_refmap_bit(dlm, res, dead_node); + } } spin_unlock(&res->spinlock); } @@ -2400,6 +2426,7 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) mlog(0, "node %u being removed from domain map!\n", idx); clear_bit(idx, dlm->domain_map); + clear_bit(idx, dlm->exit_domain_map); /* wake up migration waiters if a node goes down. * perhaps later we can genericize this for other waiters. */ wake_up(&dlm->migration_wq); @@ -2609,8 +2636,6 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) int nodenum; int status; - mlog_entry("%u\n", dead_node); - mlog(0, "%s: dead node is %u\n", dlm->name, dead_node); spin_lock(&dlm->spinlock); @@ -2707,6 +2732,7 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data, dlm->name, br->node_idx, br->dead_node, dlm->reco.dead_node, dlm->reco.new_master); spin_unlock(&dlm->spinlock); + dlm_put(dlm); return -EAGAIN; } spin_unlock(&dlm->spinlock); @@ -2878,8 +2904,8 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data, BUG(); } dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; + __dlm_reset_recovery(dlm); spin_unlock(&dlm->spinlock); - dlm_reset_recovery(dlm); dlm_kick_recovery_thread(dlm); break; default: diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c index d4f73ca68fe..69aac6f088a 100644 --- a/fs/ocfs2/dlm/dlmthread.c +++ b/fs/ocfs2/dlm/dlmthread.c @@ -92,19 +92,29 @@ int __dlm_lockres_has_locks(struct dlm_lock_resource *res) * truly ready to be freed. */ int __dlm_lockres_unused(struct dlm_lock_resource *res) { - if (!__dlm_lockres_has_locks(res) && - (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) { - /* try not to scan the bitmap unless the first two - * conditions are already true */ - int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); - if (bit >= O2NM_MAX_NODES) { - /* since the bit for dlm->node_num is not - * set, inflight_locks better be zero */ - BUG_ON(res->inflight_locks != 0); - return 1; - } - } - return 0; + int bit; + + assert_spin_locked(&res->spinlock); + + if (__dlm_lockres_has_locks(res)) + return 0; + + /* Locks are in the process of being created */ + if (res->inflight_locks) + return 0; + + if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY) + return 0; + + if (res->state & DLM_LOCK_RES_RECOVERING) + return 0; + + /* Another node has this resource with this node as the master */ + bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); + if (bit < O2NM_MAX_NODES) + return 0; + + return 1; } @@ -114,15 +124,13 @@ int __dlm_lockres_unused(struct dlm_lock_resource *res) void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { - mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); - assert_spin_locked(&dlm->spinlock); assert_spin_locked(&res->spinlock); if (__dlm_lockres_unused(res)){ if (list_empty(&res->purge)) { - mlog(0, "putting lockres %.*s:%p onto purge list\n", - res->lockname.len, res->lockname.name, res); + mlog(0, "%s: Adding res %.*s to purge list\n", + dlm->name, res->lockname.len, res->lockname.name); res->last_used = jiffies; dlm_lockres_get(res); @@ -130,8 +138,8 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, dlm->purge_count++; } } else if (!list_empty(&res->purge)) { - mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n", - res->lockname.len, res->lockname.name, res, res->owner); + mlog(0, "%s: Removing res %.*s from purge list\n", + dlm->name, res->lockname.len, res->lockname.name); list_del_init(&res->purge); dlm_lockres_put(res); @@ -142,7 +150,6 @@ void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { - mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); spin_lock(&dlm->spinlock); spin_lock(&res->spinlock); @@ -152,45 +159,24 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, spin_unlock(&dlm->spinlock); } -static int dlm_purge_lockres(struct dlm_ctxt *dlm, +static void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { int master; int ret = 0; - spin_lock(&res->spinlock); - if (!__dlm_lockres_unused(res)) { - mlog(0, "%s:%.*s: tried to purge but not unused\n", - dlm->name, res->lockname.len, res->lockname.name); - __dlm_print_one_lock_resource(res); - spin_unlock(&res->spinlock); - BUG(); - } - - if (res->state & DLM_LOCK_RES_MIGRATING) { - mlog(0, "%s:%.*s: Delay dropref as this lockres is " - "being remastered\n", dlm->name, res->lockname.len, - res->lockname.name); - /* Re-add the lockres to the end of the purge list */ - if (!list_empty(&res->purge)) { - list_del_init(&res->purge); - list_add_tail(&res->purge, &dlm->purge_list); - } - spin_unlock(&res->spinlock); - return 0; - } + assert_spin_locked(&dlm->spinlock); + assert_spin_locked(&res->spinlock); master = (res->owner == dlm->node_num); - if (!master) - res->state |= DLM_LOCK_RES_DROPPING_REF; - spin_unlock(&res->spinlock); - - mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len, - res->lockname.name, master); + mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name, + res->lockname.len, res->lockname.name, master); if (!master) { + res->state |= DLM_LOCK_RES_DROPPING_REF; /* drop spinlock... retake below */ + spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); spin_lock(&res->spinlock); @@ -201,38 +187,38 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm, /* clear our bit from the master's refmap, ignore errors */ ret = dlm_drop_lockres_ref(dlm, res); if (ret < 0) { - mlog_errno(ret); if (!dlm_is_host_down(ret)) BUG(); } - mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n", - dlm->name, res->lockname.len, res->lockname.name, ret); spin_lock(&dlm->spinlock); + spin_lock(&res->spinlock); } - spin_lock(&res->spinlock); if (!list_empty(&res->purge)) { - mlog(0, "removing lockres %.*s:%p from purgelist, " - "master = %d\n", res->lockname.len, res->lockname.name, - res, master); + mlog(0, "%s: Removing res %.*s from purgelist, master %d\n", + dlm->name, res->lockname.len, res->lockname.name, master); list_del_init(&res->purge); - spin_unlock(&res->spinlock); dlm_lockres_put(res); dlm->purge_count--; - } else - spin_unlock(&res->spinlock); + } - __dlm_unhash_lockres(res); + if (!__dlm_lockres_unused(res)) { + mlog(ML_ERROR, "%s: res %.*s in use after deref\n", + dlm->name, res->lockname.len, res->lockname.name); + __dlm_print_one_lock_resource(res); + BUG(); + } + + __dlm_unhash_lockres(dlm, res); /* lockres is not in the hash now. drop the flag and wake up * any processes waiting in dlm_get_lock_resource. */ if (!master) { - spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_DROPPING_REF; spin_unlock(&res->spinlock); wake_up(&res->wq); - } - return 0; + } else + spin_unlock(&res->spinlock); } static void dlm_run_purge_list(struct dlm_ctxt *dlm, @@ -251,17 +237,7 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm, lockres = list_entry(dlm->purge_list.next, struct dlm_lock_resource, purge); - /* Status of the lockres *might* change so double - * check. If the lockres is unused, holding the dlm - * spinlock will prevent people from getting and more - * refs on it -- there's no need to keep the lockres - * spinlock. */ spin_lock(&lockres->spinlock); - unused = __dlm_lockres_unused(lockres); - spin_unlock(&lockres->spinlock); - - if (!unused) - continue; purge_jiffies = lockres->last_used + msecs_to_jiffies(DLM_PURGE_INTERVAL_MS); @@ -273,15 +249,32 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm, * in tail order, we can stop at the first * unpurgable resource -- anyone added after * him will have a greater last_used value */ + spin_unlock(&lockres->spinlock); break; } + /* Status of the lockres *might* change so double + * check. If the lockres is unused, holding the dlm + * spinlock will prevent people from getting and more + * refs on it. */ + unused = __dlm_lockres_unused(lockres); + if (!unused || + (lockres->state & DLM_LOCK_RES_MIGRATING) || + (lockres->inflight_assert_workers != 0)) { + mlog(0, "%s: res %.*s is in use or being remastered, " + "used %d, state %d, assert master workers %u\n", + dlm->name, lockres->lockname.len, + lockres->lockname.name, + !unused, lockres->state, + lockres->inflight_assert_workers); + list_move_tail(&lockres->purge, &dlm->purge_list); + spin_unlock(&lockres->spinlock); + continue; + } + dlm_lockres_get(lockres); - /* This may drop and reacquire the dlm spinlock if it - * has to do migration. */ - if (dlm_purge_lockres(dlm, lockres)) - BUG(); + dlm_purge_lockres(dlm, lockres); dlm_lockres_put(lockres); @@ -296,19 +289,14 @@ static void dlm_shuffle_lists(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { struct dlm_lock *lock, *target; - struct list_head *iter; - struct list_head *head; int can_grant = 1; - //mlog(0, "res->lockname.len=%d\n", res->lockname.len); - //mlog(0, "res->lockname.name=%p\n", res->lockname.name); - //mlog(0, "shuffle res %.*s\n", res->lockname.len, - // res->lockname.name); - - /* because this function is called with the lockres + /* + * Because this function is called with the lockres * spinlock, and because we know that it is not migrating/ * recovering/in-progress, it is fine to reserve asts and - * basts right before queueing them all throughout */ + * basts right before queueing them all throughout + */ assert_spin_locked(&dlm->ast_lock); assert_spin_locked(&res->spinlock); BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING| @@ -318,18 +306,16 @@ static void dlm_shuffle_lists(struct dlm_ctxt *dlm, converting: if (list_empty(&res->converting)) goto blocked; - mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len, - res->lockname.name); + mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name, + res->lockname.len, res->lockname.name); target = list_entry(res->converting.next, struct dlm_lock, list); if (target->ml.convert_type == LKM_IVMODE) { - mlog(ML_ERROR, "%.*s: converting a lock with no " - "convert_type!\n", res->lockname.len, res->lockname.name); + mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n", + dlm->name, res->lockname.len, res->lockname.name); BUG(); } - head = &res->granted; - list_for_each(iter, head) { - lock = list_entry(iter, struct dlm_lock, list); + list_for_each_entry(lock, &res->granted, list) { if (lock==target) continue; if (!dlm_lock_compatible(lock->ml.type, @@ -346,9 +332,8 @@ converting: target->ml.convert_type; } } - head = &res->converting; - list_for_each(iter, head) { - lock = list_entry(iter, struct dlm_lock, list); + + list_for_each_entry(lock, &res->converting, list) { if (lock==target) continue; if (!dlm_lock_compatible(lock->ml.type, @@ -369,9 +354,12 @@ converting: spin_lock(&target->spinlock); BUG_ON(target->ml.highest_blocked != LKM_IVMODE); - mlog(0, "calling ast for converting lock: %.*s, have: %d, " - "granting: %d, node: %u\n", res->lockname.len, - res->lockname.name, target->ml.type, + mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type " + "%d => %d, node %u\n", dlm->name, res->lockname.len, + res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), + target->ml.type, target->ml.convert_type, target->ml.node); target->ml.type = target->ml.convert_type; @@ -394,9 +382,7 @@ blocked: goto leave; target = list_entry(res->blocked.next, struct dlm_lock, list); - head = &res->granted; - list_for_each(iter, head) { - lock = list_entry(iter, struct dlm_lock, list); + list_for_each_entry(lock, &res->granted, list) { if (lock==target) continue; if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { @@ -410,9 +396,7 @@ blocked: } } - head = &res->converting; - list_for_each(iter, head) { - lock = list_entry(iter, struct dlm_lock, list); + list_for_each_entry(lock, &res->converting, list) { if (lock==target) continue; if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { @@ -432,11 +416,14 @@ blocked: spin_lock(&target->spinlock); BUG_ON(target->ml.highest_blocked != LKM_IVMODE); - mlog(0, "calling ast for blocked lock: %.*s, granting: %d, " - "node: %u\n", res->lockname.len, res->lockname.name, + mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, " + "node %u\n", dlm->name, res->lockname.len, + res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), target->ml.type, target->ml.node); - // target->ml.type is already correct + /* target->ml.type is already correct */ list_move_tail(&target->list, &res->granted); BUG_ON(!target->lksb); @@ -457,7 +444,6 @@ leave: /* must have NO locks when calling this with res !=NULL * */ void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { - mlog_entry("dlm=%p, res=%p\n", dlm, res); if (res) { spin_lock(&dlm->spinlock); spin_lock(&res->spinlock); @@ -470,8 +456,6 @@ void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) { - mlog_entry("dlm=%p, res=%p\n", dlm, res); - assert_spin_locked(&dlm->spinlock); assert_spin_locked(&res->spinlock); @@ -488,13 +472,16 @@ void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) res->state |= DLM_LOCK_RES_DIRTY; } } + + mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len, + res->lockname.name); } /* Launch the NM thread for the mounted volume */ int dlm_launch_thread(struct dlm_ctxt *dlm) { - mlog(0, "starting dlm thread...\n"); + mlog(0, "Starting dlm_thread...\n"); dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread"); if (IS_ERR(dlm->dlm_thread_task)) { @@ -509,7 +496,7 @@ int dlm_launch_thread(struct dlm_ctxt *dlm) void dlm_complete_thread(struct dlm_ctxt *dlm) { if (dlm->dlm_thread_task) { - mlog(ML_KTHREAD, "waiting for dlm thread to exit\n"); + mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n"); kthread_stop(dlm->dlm_thread_task); dlm->dlm_thread_task = NULL; } @@ -540,7 +527,12 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm) /* get an extra ref on lock */ dlm_lock_get(lock); res = lock->lockres; - mlog(0, "delivering an ast for this lockres\n"); + mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, " + "node %u\n", dlm->name, res->lockname.len, + res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), + lock->ml.type, lock->ml.node); BUG_ON(!lock->ast_pending); @@ -561,9 +553,9 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm) /* possible that another ast was queued while * we were delivering the last one */ if (!list_empty(&lock->ast_list)) { - mlog(0, "aha another ast got queued while " - "we were finishing the last one. will " - "keep the ast_pending flag set.\n"); + mlog(0, "%s: res %.*s, AST queued while flushing last " + "one\n", dlm->name, res->lockname.len, + res->lockname.name); } else lock->ast_pending = 0; @@ -594,8 +586,12 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm) dlm_lock_put(lock); spin_unlock(&dlm->ast_lock); - mlog(0, "delivering a bast for this lockres " - "(blocked = %d\n", hi); + mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, " + "blocked %d, node %u\n", + dlm->name, res->lockname.len, res->lockname.name, + dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), + dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), + hi, lock->ml.node); if (lock->ml.node != dlm->node_num) { ret = dlm_send_proxy_bast(dlm, res, lock, hi); @@ -609,9 +605,9 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm) /* possible that another bast was queued while * we were delivering the last one */ if (!list_empty(&lock->bast_list)) { - mlog(0, "aha another bast got queued while " - "we were finishing the last one. will " - "keep the bast_pending flag set.\n"); + mlog(0, "%s: res %.*s, BAST queued while flushing last " + "one\n", dlm->name, res->lockname.len, + res->lockname.name); } else lock->bast_pending = 0; @@ -679,11 +675,12 @@ static int dlm_thread(void *data) spin_lock(&res->spinlock); if (res->owner != dlm->node_num) { __dlm_print_one_lock_resource(res); - mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n", - res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no", - res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no", - res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no", - res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no"); + mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d," + " dirty %d\n", dlm->name, + !!(res->state & DLM_LOCK_RES_IN_PROGRESS), + !!(res->state & DLM_LOCK_RES_MIGRATING), + !!(res->state & DLM_LOCK_RES_RECOVERING), + !!(res->state & DLM_LOCK_RES_DIRTY)); } BUG_ON(res->owner != dlm->node_num); @@ -697,8 +694,8 @@ static int dlm_thread(void *data) res->state &= ~DLM_LOCK_RES_DIRTY; spin_unlock(&res->spinlock); spin_unlock(&dlm->ast_lock); - mlog(0, "delaying list shuffling for in-" - "progress lockres %.*s, state=%d\n", + mlog(0, "%s: res %.*s, inprogress, delay list " + "shuffle, state %d\n", dlm->name, res->lockname.len, res->lockname.name, res->state); delay = 1; @@ -710,10 +707,6 @@ static int dlm_thread(void *data) * spinlock and do NOT have the dlm lock. * safe to reserve/queue asts and run the lists. */ - mlog(0, "calling dlm_shuffle_lists with dlm=%s, " - "res=%.*s\n", dlm->name, - res->lockname.len, res->lockname.name); - /* called while holding lockres lock */ dlm_shuffle_lists(dlm, res); res->state &= ~DLM_LOCK_RES_DIRTY; @@ -737,7 +730,8 @@ in_progress: /* unlikely, but we may need to give time to * other tasks */ if (!--n) { - mlog(0, "throttling dlm_thread\n"); + mlog(0, "%s: Throttling dlm thread\n", + dlm->name); break; } } diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c index 817287c6a6d..2e3c9dbab68 100644 --- a/fs/ocfs2/dlm/dlmunlock.c +++ b/fs/ocfs2/dlm/dlmunlock.c @@ -191,7 +191,9 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm, DLM_UNLOCK_CLEAR_CONVERT_TYPE); } else if (status == DLM_RECOVERING || status == DLM_MIGRATING || - status == DLM_FORWARD) { + status == DLM_FORWARD || + status == DLM_NOLOCKMGR + ) { /* must clear the actions because this unlock * is about to be retried. cannot free or do * any list manipulation. */ @@ -200,7 +202,8 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm, res->lockname.name, status==DLM_RECOVERING?"recovering": (status==DLM_MIGRATING?"migrating": - "forward")); + (status == DLM_FORWARD ? "forward" : + "nolockmanager"))); actions = 0; } if (flags & LKM_CANCEL) @@ -317,7 +320,7 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm, struct kvec vec[2]; size_t veclen = 1; - mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); + mlog(0, "%.*s\n", res->lockname.len, res->lockname.name); if (owner == dlm->node_num) { /* ended up trying to contact ourself. this means @@ -364,7 +367,10 @@ static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm, * updated state to the recovery master. this thread * just needs to finish out the operation and call * the unlockast. */ - ret = DLM_NORMAL; + if (dlm_is_node_dead(dlm, owner)) + ret = DLM_NORMAL; + else + ret = DLM_NOLOCKMGR; } else { /* something bad. this will BUG in ocfs2 */ ret = dlm_err_to_dlm_status(tmpret); @@ -388,7 +394,6 @@ int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data, struct dlm_ctxt *dlm = data; struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf; struct dlm_lock_resource *res = NULL; - struct list_head *iter; struct dlm_lock *lock = NULL; enum dlm_status status = DLM_NORMAL; int found = 0, i; @@ -458,8 +463,7 @@ int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data, } for (i=0; i<3; i++) { - list_for_each(iter, queue) { - lock = list_entry(iter, struct dlm_lock, list); + list_for_each_entry(lock, queue, list) { if (lock->ml.cookie == unlock->cookie && lock->ml.node == unlock->node_idx) { dlm_lock_get(lock); @@ -588,8 +592,6 @@ enum dlm_status dlmunlock(struct dlm_ctxt *dlm, struct dlm_lockstatus *lksb, struct dlm_lock *lock = NULL; int call_ast, is_master; - mlog_entry_void(); - if (!lksb) { dlm_error(DLM_BADARGS); return DLM_BADARGS; @@ -642,7 +644,9 @@ retry: if (status == DLM_RECOVERING || status == DLM_MIGRATING || - status == DLM_FORWARD) { + status == DLM_FORWARD || + status == DLM_NOLOCKMGR) { + /* We want to go away for a tiny bit to allow recovery * / migration to complete on this resource. I don't * know of any wait queue we could sleep on as this @@ -654,7 +658,7 @@ retry: msleep(50); mlog(0, "retrying unlock due to pending recovery/" - "migration/in-progress\n"); + "migration/in-progress/reconnect\n"); goto retry; } diff --git a/fs/ocfs2/dlm/dlmver.c b/fs/ocfs2/dlm/dlmver.c deleted file mode 100644 index dfc0da4d158..00000000000 --- a/fs/ocfs2/dlm/dlmver.c +++ /dev/null @@ -1,42 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; -*- - * vim: noexpandtab sw=8 ts=8 sts=0: - * - * dlmver.c - * - * version string - * - * Copyright (C) 2002, 2005 Oracle. All rights reserved. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 021110-1307, USA. - */ - -#include <linux/module.h> -#include <linux/kernel.h> - -#include "dlmver.h" - -#define DLM_BUILD_VERSION "1.5.0" - -#define VERSION_STR "OCFS2 DLM " DLM_BUILD_VERSION - -void dlm_print_version(void) -{ - printk(KERN_INFO "%s\n", VERSION_STR); -} - -MODULE_DESCRIPTION(VERSION_STR); - -MODULE_VERSION(DLM_BUILD_VERSION); diff --git a/fs/ocfs2/dlm/dlmver.h b/fs/ocfs2/dlm/dlmver.h deleted file mode 100644 index f674aee77a1..00000000000 --- a/fs/ocfs2/dlm/dlmver.h +++ /dev/null @@ -1,31 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; -*- - * vim: noexpandtab sw=8 ts=8 sts=0: - * - * dlmfsver.h - * - * Function prototypes - * - * Copyright (C) 2005 Oracle. All rights reserved. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 021110-1307, USA. - */ - -#ifndef DLM_VER_H -#define DLM_VER_H - -void dlm_print_version(void); - -#endif /* DLM_VER_H */ |
