Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--  fs/dlm/lock.c | 2263
1 file changed, 1739 insertions, 524 deletions
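For orientation before the diff body: the central change is replacing each hash bucket's linked lists with two red-black trees, "keep" (rsb's in use locally) and "toss" (unused rsb's retained as cached directory records), both ordered by comparing resource names zero-padded to DLM_RESNAME_MAXLEN. A condensed, kernel-style sketch of that scheme (not part of the patch; struct dlm_rsb is pared down to just the fields the tree needs):

	#include <linux/rbtree.h>
	#include <linux/string.h>

	#define DLM_RESNAME_MAXLEN 64

	struct dlm_rsb {
		struct rb_node res_hashnode;
		int res_length;
		char res_name[DLM_RESNAME_MAXLEN];
	};

	/* zero-padding makes one memcmp a total order over (name, len),
	   so no separate length tie-break is needed */
	static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
	{
		char maxname[DLM_RESNAME_MAXLEN];

		memset(maxname, 0, DLM_RESNAME_MAXLEN);
		memcpy(maxname, name, nlen);
		return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
	}

	static struct dlm_rsb *rsb_search(struct rb_root *tree,
					  const char *name, int len)
	{
		struct rb_node *node = tree->rb_node;

		while (node) {
			struct dlm_rsb *r = rb_entry(node, struct dlm_rsb,
						     res_hashnode);
			int rc = rsb_cmp(r, name, len);

			if (rc < 0)
				node = node->rb_left;
			else if (rc > 0)
				node = node->rb_right;
			else
				return r;	/* exact match */
		}
		return NULL;
	}

This mirrors rsb_cmp() and dlm_search_rsb_tree() in the patch; insertion (rsb_insert()) walks the same order and links with rb_link_node()/rb_insert_color().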
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 64e5f3efdd8..83f3d552030 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -56,6 +56,7 @@ L: receive_xxxx_reply() <- R: send_xxxx_reply() */ #include <linux/types.h> +#include <linux/rbtree.h> #include <linux/slab.h> #include "dlm_internal.h" #include <linux/dlm_device.h> @@ -89,6 +90,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, static int receive_extralen(struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); static void del_timeout(struct dlm_lkb *lkb); +static void toss_rsb(struct kref *kref); /* * Lock compatibilty matrix - thanks Steve @@ -159,18 +161,21 @@ static const int __quecvt_compat_matrix[8][8] = { void dlm_print_lkb(struct dlm_lkb *lkb) { - printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n" - " status %d rqmode %d grmode %d wait_type %d ast_type %d\n", + printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " + "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n", lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, - lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type); + lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid, + (unsigned long long)lkb->lkb_recover_seq); } static void dlm_print_rsb(struct dlm_rsb *r) { - printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", - r->res_nodeid, r->res_flags, r->res_first_lkid, - r->res_recover_locks_count, r->res_name); + printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x " + "rlc %d name %s\n", + r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid, + r->res_flags, r->res_first_lkid, r->res_recover_locks_count, + r->res_name); } void dlm_dump_rsb(struct dlm_rsb *r) @@ -250,8 +255,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb) static inline int is_master_copy(struct dlm_lkb *lkb) { - if (lkb->lkb_flags & DLM_IFL_MSTCPY) - DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb);); return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; } @@ -305,10 +308,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) rv = -EDEADLK; } - lkb->lkb_lksb->sb_status = rv; - lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags; - - dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode); + dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); } static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -319,13 +319,10 @@ static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) { - lkb->lkb_time_bast = ktime_get(); - if (is_master_copy(lkb)) { - lkb->lkb_bastmode = rqmode; /* printed by debugfs */ send_bast(r, lkb, rqmode); } else { - dlm_add_ast(lkb, AST_BAST, rqmode); + dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0); } } @@ -333,13 +330,94 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) * Basic operations on rsb's and lkb's */ -static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) +/* This is only called to add a reference when the code already holds + a valid reference to the rsb, so there's no need for locking. */ + +static inline void hold_rsb(struct dlm_rsb *r) +{ + kref_get(&r->res_ref); +} + +void dlm_hold_rsb(struct dlm_rsb *r) +{ + hold_rsb(r); +} + +/* When all references to the rsb are gone it's transferred to + the tossed list for later disposal. 
*/ + +static void put_rsb(struct dlm_rsb *r) +{ + struct dlm_ls *ls = r->res_ls; + uint32_t bucket = r->res_bucket; + + spin_lock(&ls->ls_rsbtbl[bucket].lock); + kref_put(&r->res_ref, toss_rsb); + spin_unlock(&ls->ls_rsbtbl[bucket].lock); +} + +void dlm_put_rsb(struct dlm_rsb *r) +{ + put_rsb(r); +} + +static int pre_rsb_struct(struct dlm_ls *ls) +{ + struct dlm_rsb *r1, *r2; + int count = 0; + + spin_lock(&ls->ls_new_rsb_spin); + if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { + spin_unlock(&ls->ls_new_rsb_spin); + return 0; + } + spin_unlock(&ls->ls_new_rsb_spin); + + r1 = dlm_allocate_rsb(ls); + r2 = dlm_allocate_rsb(ls); + + spin_lock(&ls->ls_new_rsb_spin); + if (r1) { + list_add(&r1->res_hashchain, &ls->ls_new_rsb); + ls->ls_new_rsb_count++; + } + if (r2) { + list_add(&r2->res_hashchain, &ls->ls_new_rsb); + ls->ls_new_rsb_count++; + } + count = ls->ls_new_rsb_count; + spin_unlock(&ls->ls_new_rsb_spin); + + if (!count) + return -ENOMEM; + return 0; +} + +/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can + unlock any spinlocks, go back and call pre_rsb_struct again. + Otherwise, take an rsb off the list and return it. */ + +static int get_rsb_struct(struct dlm_ls *ls, char *name, int len, + struct dlm_rsb **r_ret) { struct dlm_rsb *r; + int count; + + spin_lock(&ls->ls_new_rsb_spin); + if (list_empty(&ls->ls_new_rsb)) { + count = ls->ls_new_rsb_count; + spin_unlock(&ls->ls_new_rsb_spin); + log_debug(ls, "find_rsb retry %d %d %s", + count, dlm_config.ci_new_rsb_count, name); + return -EAGAIN; + } - r = dlm_allocate_rsb(ls, len); - if (!r) - return NULL; + r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); + list_del(&r->res_hashchain); + /* Convert the empty list_head to a NULL rb_node for tree usage: */ + memset(&r->res_hashnode, 0, sizeof(struct rb_node)); + ls->ls_new_rsb_count--; + spin_unlock(&ls->ls_new_rsb_spin); r->res_ls = ls; r->res_length = len; @@ -353,161 +431,696 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) INIT_LIST_HEAD(&r->res_root_list); INIT_LIST_HEAD(&r->res_recover_list); - return r; + *r_ret = r; + return 0; } -static int search_rsb_list(struct list_head *head, char *name, int len, - unsigned int flags, struct dlm_rsb **r_ret) +static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) { - struct dlm_rsb *r; - int error = 0; + char maxname[DLM_RESNAME_MAXLEN]; + + memset(maxname, 0, DLM_RESNAME_MAXLEN); + memcpy(maxname, name, nlen); + return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN); +} - list_for_each_entry(r, head, res_hashchain) { - if (len == r->res_length && !memcmp(name, r->res_name, len)) +int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, + struct dlm_rsb **r_ret) +{ + struct rb_node *node = tree->rb_node; + struct dlm_rsb *r; + int rc; + + while (node) { + r = rb_entry(node, struct dlm_rsb, res_hashnode); + rc = rsb_cmp(r, name, len); + if (rc < 0) + node = node->rb_left; + else if (rc > 0) + node = node->rb_right; + else goto found; } *r_ret = NULL; return -EBADR; found: - if (r->res_nodeid && (flags & R_MASTER)) - error = -ENOTBLK; *r_ret = r; - return error; + return 0; } -static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b, - unsigned int flags, struct dlm_rsb **r_ret) +static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) { - struct dlm_rsb *r; + struct rb_node **newn = &tree->rb_node; + struct rb_node *parent = NULL; + int rc; + + while (*newn) { + struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb, + 
res_hashnode); + + parent = *newn; + rc = rsb_cmp(cur, rsb->res_name, rsb->res_length); + if (rc < 0) + newn = &parent->rb_left; + else if (rc > 0) + newn = &parent->rb_right; + else { + log_print("rsb_insert match"); + dlm_dump_rsb(rsb); + dlm_dump_rsb(cur); + return -EEXIST; + } + } + + rb_link_node(&rsb->res_hashnode, parent, newn); + rb_insert_color(&rsb->res_hashnode, tree); + return 0; +} + +/* + * Find rsb in rsbtbl and potentially create/add one + * + * Delaying the release of rsb's has a similar benefit to applications keeping + * NL locks on an rsb, but without the guarantee that the cached master value + * will still be valid when the rsb is reused. Apps aren't always smart enough + * to keep NL locks on an rsb that they may lock again shortly; this can lead + * to excessive master lookups and removals if we don't delay the release. + * + * Searching for an rsb means looking through both the normal list and toss + * list. When found on the toss list the rsb is moved to the normal list with + * ref count of 1; when found on normal list the ref count is incremented. + * + * rsb's on the keep list are being used locally and refcounted. + * rsb's on the toss list are not being used locally, and are not refcounted. + * + * The toss list rsb's were either + * - previously used locally but not any more (were on keep list, then + * moved to toss list when last refcount dropped) + * - created and put on toss list as a directory record for a lookup + * (we are the dir node for the res, but are not using the res right now, + * but some other node is) + * + * The purpose of find_rsb() is to return a refcounted rsb for local use. + * So, if the given rsb is on the toss list, it is moved to the keep list + * before being returned. + * + * toss_rsb() happens when all local usage of the rsb is done, i.e. no + * more refcounts exist, so the rsb is moved from the keep list to the + * toss list. + * + * rsb's on both keep and toss lists are used for doing a name to master + * lookups. rsb's that are in use locally (and being refcounted) are on + * the keep list, rsb's that are not in use locally (not refcounted) and + * only exist for name/master lookups are on the toss list. + * + * rsb's on the toss list who's dir_nodeid is not local can have stale + * name/master mappings. So, remote requests on such rsb's can potentially + * return with an error, which means the mapping is stale and needs to + * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and + * first_lkid is to keep only a single outstanding request on an rsb + * while that rsb has a potentially stale master.) + */ + +static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, + uint32_t hash, uint32_t b, + int dir_nodeid, int from_nodeid, + unsigned int flags, struct dlm_rsb **r_ret) +{ + struct dlm_rsb *r = NULL; + int our_nodeid = dlm_our_nodeid(); + int from_local = 0; + int from_other = 0; + int from_dir = 0; + int create = 0; int error; - error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r); - if (!error) { - kref_get(&r->res_ref); - goto out; + if (flags & R_RECEIVE_REQUEST) { + if (from_nodeid == dir_nodeid) + from_dir = 1; + else + from_other = 1; + } else if (flags & R_REQUEST) { + from_local = 1; + } + + /* + * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so + * from_nodeid has sent us a lock in dlm_recover_locks, believing + * we're the new master. Our local recovery may not have set + * res_master_nodeid to our_nodeid yet, so allow either. 
Don't + * create the rsb; dlm_recover_process_copy() will handle EBADR + * by resending. + * + * If someone sends us a request, we are the dir node, and we do + * not find the rsb anywhere, then recreate it. This happens if + * someone sends us a request after we have removed/freed an rsb + * from our toss list. (They sent a request instead of lookup + * because they are using an rsb from their toss list.) + */ + + if (from_local || from_dir || + (from_other && (dir_nodeid == our_nodeid))) { + create = 1; + } + + retry: + if (create) { + error = pre_rsb_struct(ls); + if (error < 0) + goto out; } - error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); + + spin_lock(&ls->ls_rsbtbl[b].lock); + + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); if (error) - goto out; + goto do_toss; + + /* + * rsb is active, so we can't check master_nodeid without lock_rsb. + */ - list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list); + kref_get(&r->res_ref); + error = 0; + goto out_unlock; - if (dlm_no_directory(ls)) - goto out; - if (r->res_nodeid == -1) { + do_toss: + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (error) + goto do_new; + + /* + * rsb found inactive (master_nodeid may be out of date unless + * we are the dir_nodeid or were the master) No other thread + * is using this rsb because it's on the toss list, so we can + * look at or update res_master_nodeid without lock_rsb. + */ + + if ((r->res_master_nodeid != our_nodeid) && from_other) { + /* our rsb was not master, and another node (not the dir node) + has sent us a request */ + log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", + from_nodeid, r->res_master_nodeid, dir_nodeid, + r->res_name); + error = -ENOTBLK; + goto out_unlock; + } + + if ((r->res_master_nodeid != our_nodeid) && from_dir) { + /* don't think this should ever happen */ + log_error(ls, "find_rsb toss from_dir %d master %d", + from_nodeid, r->res_master_nodeid); + dlm_print_rsb(r); + /* fix it and go on */ + r->res_master_nodeid = our_nodeid; + r->res_nodeid = 0; rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); r->res_first_lkid = 0; - } else if (r->res_nodeid > 0) { + } + + if (from_local && (r->res_master_nodeid != our_nodeid)) { + /* Because we have held no locks on this rsb, + res_master_nodeid could have become stale. 
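The MASTER_UNCERTAIN flag set just below is consumed later by set_master(); its consumer looks roughly like this (an illustrative condensation, partly reconstructed from context elided in the set_master hunk further down):

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 1;	/* first reply confirms or corrects master */
	}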
*/ rsb_set_flag(r, RSB_MASTER_UNCERTAIN); r->res_first_lkid = 0; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + goto out_unlock; + + + do_new: + /* + * rsb not found + */ + + if (error == -EBADR && !create) + goto out_unlock; + + error = get_rsb_struct(ls, name, len, &r); + if (error == -EAGAIN) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + goto retry; + } + if (error) + goto out_unlock; + + r->res_hash = hash; + r->res_bucket = b; + r->res_dir_nodeid = dir_nodeid; + kref_init(&r->res_ref); + + if (from_dir) { + /* want to see how often this happens */ + log_debug(ls, "find_rsb new from_dir %d recreate %s", + from_nodeid, r->res_name); + r->res_master_nodeid = our_nodeid; + r->res_nodeid = 0; + goto out_add; + } + + if (from_other && (dir_nodeid != our_nodeid)) { + /* should never happen */ + log_error(ls, "find_rsb new from_other %d dir %d our %d %s", + from_nodeid, dir_nodeid, our_nodeid, r->res_name); + dlm_free_rsb(r); + r = NULL; + error = -ENOTBLK; + goto out_unlock; + } + + if (from_other) { + log_debug(ls, "find_rsb new from_other %d dir %d %s", + from_nodeid, dir_nodeid, r->res_name); + } + + if (dir_nodeid == our_nodeid) { + /* When we are the dir nodeid, we can set the master + node immediately */ + r->res_master_nodeid = our_nodeid; + r->res_nodeid = 0; } else { - DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r);); - DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),); + /* set_master will send_lookup to dir_nodeid */ + r->res_master_nodeid = 0; + r->res_nodeid = -1; } + + out_add: + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + out_unlock: + spin_unlock(&ls->ls_rsbtbl[b].lock); out: *r_ret = r; return error; } -static int search_rsb(struct dlm_ls *ls, char *name, int len, int b, - unsigned int flags, struct dlm_rsb **r_ret) +/* During recovery, other nodes can send us new MSTCPY locks (from + dlm_recover_locks) before we've made ourself master (in + dlm_recover_masters). */ + +static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len, + uint32_t hash, uint32_t b, + int dir_nodeid, int from_nodeid, + unsigned int flags, struct dlm_rsb **r_ret) { + struct dlm_rsb *r = NULL; + int our_nodeid = dlm_our_nodeid(); + int recover = (flags & R_RECEIVE_RECOVER); int error; + + retry: + error = pre_rsb_struct(ls); + if (error < 0) + goto out; + spin_lock(&ls->ls_rsbtbl[b].lock); - error = _search_rsb(ls, name, len, b, flags, r_ret); + + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (error) + goto do_toss; + + /* + * rsb is active, so we can't check master_nodeid without lock_rsb. + */ + + kref_get(&r->res_ref); + goto out_unlock; + + + do_toss: + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (error) + goto do_new; + + /* + * rsb found inactive. No other thread is using this rsb because + * it's on the toss list, so we can look at or update + * res_master_nodeid without lock_rsb. 
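As an illustration of the two access rules used throughout this file (a sketch, not part of the patch):

	/* active rsb on the keep list: take a reference, then lock_rsb() */
	hold_rsb(r);
	spin_unlock(&ls->ls_rsbtbl[b].lock);
	lock_rsb(r);
	/* ... read or write r->res_master_nodeid ... */
	unlock_rsb(r);
	put_rsb(r);

	/* inactive rsb on the toss list: the bucket lock alone fences it,
	   no reference is taken */
	spin_lock(&ls->ls_rsbtbl[b].lock);
	/* ... read or write r->res_master_nodeid ... */
	spin_unlock(&ls->ls_rsbtbl[b].lock);

dlm_master_lookup() below follows exactly this split for its keep and toss cases.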
+ */ + + if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { + /* our rsb is not master, and another node has sent us a + request; this should never happen */ + log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", + from_nodeid, r->res_master_nodeid, dir_nodeid); + dlm_print_rsb(r); + error = -ENOTBLK; + goto out_unlock; + } + + if (!recover && (r->res_master_nodeid != our_nodeid) && + (dir_nodeid == our_nodeid)) { + /* our rsb is not master, and we are dir; may as well fix it; + this should never happen */ + log_error(ls, "find_rsb toss our %d master %d dir %d", + our_nodeid, r->res_master_nodeid, dir_nodeid); + dlm_print_rsb(r); + r->res_master_nodeid = our_nodeid; + r->res_nodeid = 0; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + goto out_unlock; + + + do_new: + /* + * rsb not found + */ + + error = get_rsb_struct(ls, name, len, &r); + if (error == -EAGAIN) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + goto retry; + } + if (error) + goto out_unlock; + + r->res_hash = hash; + r->res_bucket = b; + r->res_dir_nodeid = dir_nodeid; + r->res_master_nodeid = dir_nodeid; + r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid; + kref_init(&r->res_ref); + + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + out_unlock: spin_unlock(&ls->ls_rsbtbl[b].lock); + out: + *r_ret = r; return error; } +static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid, + unsigned int flags, struct dlm_rsb **r_ret) +{ + uint32_t hash, b; + int dir_nodeid; + + if (len > DLM_RESNAME_MAXLEN) + return -EINVAL; + + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); + + dir_nodeid = dlm_hash2nodeid(ls, hash); + + if (dlm_no_directory(ls)) + return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid, + from_nodeid, flags, r_ret); + else + return find_rsb_dir(ls, name, len, hash, b, dir_nodeid, + from_nodeid, flags, r_ret); +} + +/* we have received a request and found that res_master_nodeid != our_nodeid, + so we need to return an error or make ourself the master */ + +static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, + int from_nodeid) +{ + if (dlm_no_directory(ls)) { + log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d", + from_nodeid, r->res_master_nodeid, + r->res_dir_nodeid); + dlm_print_rsb(r); + return -ENOTBLK; + } + + if (from_nodeid != r->res_dir_nodeid) { + /* our rsb is not master, and another node (not the dir node) + has sent us a request. this is much more common when our + master_nodeid is zero, so limit debug to non-zero. */ + + if (r->res_master_nodeid) { + log_debug(ls, "validate master from_other %d master %d " + "dir %d first %x %s", from_nodeid, + r->res_master_nodeid, r->res_dir_nodeid, + r->res_first_lkid, r->res_name); + } + return -ENOTBLK; + } else { + /* our rsb is not master, but the dir nodeid has sent us a + request; this could happen with master 0 / res_nodeid -1 */ + + if (r->res_master_nodeid) { + log_error(ls, "validate master from_dir %d master %d " + "first %x %s", + from_nodeid, r->res_master_nodeid, + r->res_first_lkid, r->res_name); + } + + r->res_master_nodeid = dlm_our_nodeid(); + r->res_nodeid = 0; + return 0; + } +} + /* - * Find rsb in rsbtbl and potentially create/add one + * We're the dir node for this res and another node wants to know the + * master nodeid. 
During normal operation (non recovery) this is only + * called from receive_lookup(); master lookups when the local node is + * the dir node are done by find_rsb(). * - * Delaying the release of rsb's has a similar benefit to applications keeping - * NL locks on an rsb, but without the guarantee that the cached master value - * will still be valid when the rsb is reused. Apps aren't always smart enough - * to keep NL locks on an rsb that they may lock again shortly; this can lead - * to excessive master lookups and removals if we don't delay the release. + * normal operation, we are the dir node for a resource + * . _request_lock + * . set_master + * . send_lookup + * . receive_lookup + * . dlm_master_lookup flags 0 * - * Searching for an rsb means looking through both the normal list and toss - * list. When found on the toss list the rsb is moved to the normal list with - * ref count of 1; when found on normal list the ref count is incremented. + * recover directory, we are rebuilding dir for all resources + * . dlm_recover_directory + * . dlm_rcom_names + * remote node sends back the rsb names it is master of and we are dir of + * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1) + * we either create new rsb setting remote node as master, or find existing + * rsb and set master to be the remote node. + * + * recover masters, we are finding the new master for resources + * . dlm_recover_masters + * . recover_master + * . dlm_send_rcom_lookup + * . receive_rcom_lookup + * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -static int find_rsb(struct dlm_ls *ls, char *name, int namelen, - unsigned int flags, struct dlm_rsb **r_ret) +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, + unsigned int flags, int *r_nodeid, int *result) { - struct dlm_rsb *r = NULL, *tmp; - uint32_t hash, bucket; - int error = -EINVAL; + struct dlm_rsb *r = NULL; + uint32_t hash, b; + int from_master = (flags & DLM_LU_RECOVER_DIR); + int fix_master = (flags & DLM_LU_RECOVER_MASTER); + int our_nodeid = dlm_our_nodeid(); + int dir_nodeid, error, toss_list = 0; - if (namelen > DLM_RESNAME_MAXLEN) - goto out; + if (len > DLM_RESNAME_MAXLEN) + return -EINVAL; - if (dlm_no_directory(ls)) - flags |= R_CREATE; + if (from_nodeid == our_nodeid) { + log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x", + our_nodeid, flags); + return -EINVAL; + } - error = 0; - hash = jhash(name, namelen, 0); - bucket = hash & (ls->ls_rsbtbl_size - 1); + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); - error = search_rsb(ls, name, namelen, bucket, flags, &r); - if (!error) - goto out; + dir_nodeid = dlm_hash2nodeid(ls, hash); + if (dir_nodeid != our_nodeid) { + log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", + from_nodeid, dir_nodeid, our_nodeid, hash, + ls->ls_num_nodes); + *r_nodeid = -1; + return -EINVAL; + } - if (error == -EBADR && !(flags & R_CREATE)) - goto out; + retry: + error = pre_rsb_struct(ls); + if (error < 0) + return error; - /* the rsb was found but wasn't a master copy */ - if (error == -ENOTBLK) - goto out; + spin_lock(&ls->ls_rsbtbl[b].lock); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (!error) { + /* because the rsb is active, we need to lock_rsb before + checking/changing re_master_nodeid */ - error = -ENOMEM; - r = create_rsb(ls, name, namelen); - if (!r) - goto out; + hold_rsb(r); + spin_unlock(&ls->ls_rsbtbl[b].lock); + lock_rsb(r); + goto found; + } - r->res_hash = hash; - 
r->res_bucket = bucket; - r->res_nodeid = -1; - kref_init(&r->res_ref); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (error) + goto not_found; - /* With no directory, the master can be set immediately */ - if (dlm_no_directory(ls)) { - int nodeid = dlm_dir_nodeid(r); - if (nodeid == dlm_our_nodeid()) - nodeid = 0; - r->res_nodeid = nodeid; + /* because the rsb is inactive (on toss list), it's not refcounted + and lock_rsb is not used, but is protected by the rsbtbl lock */ + + toss_list = 1; + found: + if (r->res_dir_nodeid != our_nodeid) { + /* should not happen, but may as well fix it and carry on */ + log_error(ls, "dlm_master_lookup res_dir %d our %d %s", + r->res_dir_nodeid, our_nodeid, r->res_name); + r->res_dir_nodeid = our_nodeid; + } + + if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { + /* Recovery uses this function to set a new master when + the previous master failed. Setting NEW_MASTER will + force dlm_recover_masters to call recover_master on this + rsb even though the res_nodeid is no longer removed. */ + + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + rsb_set_flag(r, RSB_NEW_MASTER); + + if (toss_list) { + /* I don't think we should ever find it on toss list. */ + log_error(ls, "dlm_master_lookup fix_master on toss"); + dlm_dump_rsb(r); + } } - spin_lock(&ls->ls_rsbtbl[bucket].lock); - error = _search_rsb(ls, name, namelen, bucket, 0, &tmp); - if (!error) { - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + if (from_master && (r->res_master_nodeid != from_nodeid)) { + /* this will happen if from_nodeid became master during + a previous recovery cycle, and we aborted the previous + cycle before recovering this master value */ + + log_limit(ls, "dlm_master_lookup from_master %d " + "master_nodeid %d res_nodeid %d first %x %s", + from_nodeid, r->res_master_nodeid, r->res_nodeid, + r->res_first_lkid, r->res_name); + + if (r->res_master_nodeid == our_nodeid) { + log_error(ls, "from_master %d our_master", from_nodeid); + dlm_dump_rsb(r); + dlm_send_rcom_lookup_dump(r, from_nodeid); + goto out_found; + } + + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + rsb_set_flag(r, RSB_NEW_MASTER); + } + + if (!r->res_master_nodeid) { + /* this will happen if recovery happens while we're looking + up the master for this rsb */ + + log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s", + from_nodeid, r->res_first_lkid, r->res_name); + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + } + + if (!from_master && !fix_master && + (r->res_master_nodeid == from_nodeid)) { + /* this can happen when the master sends remove, the dir node + finds the rsb on the keep list and ignores the remove, + and the former master sends a lookup */ + + log_limit(ls, "dlm_master_lookup from master %d flags %x " + "first %x %s", from_nodeid, flags, + r->res_first_lkid, r->res_name); + } + + out_found: + *r_nodeid = r->res_master_nodeid; + if (result) + *result = DLM_LU_MATCH; + + if (toss_list) { + r->res_toss_time = jiffies; + /* the rsb was inactive (on toss list) */ + spin_unlock(&ls->ls_rsbtbl[b].lock); + } else { + /* the rsb was active */ + unlock_rsb(r); + put_rsb(r); + } + return 0; + + not_found: + error = get_rsb_struct(ls, name, len, &r); + if (error == -EAGAIN) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + goto retry; + } + if (error) + goto out_unlock; + + r->res_hash = hash; + r->res_bucket = b; + r->res_dir_nodeid = our_nodeid; + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + 
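/* leave the new rsb on the toss list as a directory record; res_toss_time
   set below lets shrink_bucket() expire it later if it stays unused */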
kref_init(&r->res_ref); + r->res_toss_time = jiffies; + + error = rsb_insert(r, &ls->ls_rsbtbl[b].toss); + if (error) { + /* should never happen */ dlm_free_rsb(r); - r = tmp; - goto out; + spin_unlock(&ls->ls_rsbtbl[b].lock); + goto retry; } - list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list); - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + + if (result) + *result = DLM_LU_ADD; + *r_nodeid = from_nodeid; error = 0; - out: - *r_ret = r; + out_unlock: + spin_unlock(&ls->ls_rsbtbl[b].lock); return error; } -/* This is only called to add a reference when the code already holds - a valid reference to the rsb, so there's no need for locking. */ - -static inline void hold_rsb(struct dlm_rsb *r) +static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) { - kref_get(&r->res_ref); + struct rb_node *n; + struct dlm_rsb *r; + int i; + + for (i = 0; i < ls->ls_rsbtbl_size; i++) { + spin_lock(&ls->ls_rsbtbl[i].lock); + for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { + r = rb_entry(n, struct dlm_rsb, res_hashnode); + if (r->res_hash == hash) + dlm_dump_rsb(r); + } + spin_unlock(&ls->ls_rsbtbl[i].lock); + } } -void dlm_hold_rsb(struct dlm_rsb *r) +void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) { - hold_rsb(r); + struct dlm_rsb *r = NULL; + uint32_t hash, b; + int error; + + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); + + spin_lock(&ls->ls_rsbtbl[b].lock); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (!error) + goto out_dump; + + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (error) + goto out; + out_dump: + dlm_dump_rsb(r); + out: + spin_unlock(&ls->ls_rsbtbl[b].lock); } static void toss_rsb(struct kref *kref) @@ -517,32 +1130,16 @@ static void toss_rsb(struct kref *kref) DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); kref_init(&r->res_ref); - list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss); + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); + rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); r->res_toss_time = jiffies; + ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK; if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; } } -/* When all references to the rsb are gone it's transfered to - the tossed list for later disposal. 
*/ - -static void put_rsb(struct dlm_rsb *r) -{ - struct dlm_ls *ls = r->res_ls; - uint32_t bucket = r->res_bucket; - - spin_lock(&ls->ls_rsbtbl[bucket].lock); - kref_put(&r->res_ref, toss_rsb); - spin_unlock(&ls->ls_rsbtbl[bucket].lock); -} - -void dlm_put_rsb(struct dlm_rsb *r) -{ - put_rsb(r); -} - /* See comment for unhold_lkb */ static void unhold_rsb(struct dlm_rsb *r) @@ -586,9 +1183,8 @@ static void detach_lkb(struct dlm_lkb *lkb) static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) { - struct dlm_lkb *lkb, *tmp; - uint32_t lkid = 0; - uint16_t bucket; + struct dlm_lkb *lkb; + int rv; lkb = dlm_allocate_lkb(ls); if (!lkb) @@ -600,59 +1196,36 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); INIT_LIST_HEAD(&lkb->lkb_time_list); + INIT_LIST_HEAD(&lkb->lkb_cb_list); + mutex_init(&lkb->lkb_cb_mutex); + INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); - get_random_bytes(&bucket, sizeof(bucket)); - bucket &= (ls->ls_lkbtbl_size - 1); + idr_preload(GFP_NOFS); + spin_lock(&ls->ls_lkbidr_spin); + rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT); + if (rv >= 0) + lkb->lkb_id = rv; + spin_unlock(&ls->ls_lkbidr_spin); + idr_preload_end(); - write_lock(&ls->ls_lkbtbl[bucket].lock); - - /* counter can roll over so we must verify lkid is not in use */ - - while (lkid == 0) { - lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++; - - list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list, - lkb_idtbl_list) { - if (tmp->lkb_id != lkid) - continue; - lkid = 0; - break; - } + if (rv < 0) { + log_error(ls, "create_lkb idr error %d", rv); + return rv; } - lkb->lkb_id = lkid; - list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list); - write_unlock(&ls->ls_lkbtbl[bucket].lock); - *lkb_ret = lkb; return 0; } -static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) -{ - struct dlm_lkb *lkb; - uint16_t bucket = (lkid >> 16); - - list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { - if (lkb->lkb_id == lkid) - return lkb; - } - return NULL; -} - static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - uint16_t bucket = (lkid >> 16); - - if (bucket >= ls->ls_lkbtbl_size) - return -EBADSLT; - read_lock(&ls->ls_lkbtbl[bucket].lock); - lkb = __find_lkb(ls, lkid); + spin_lock(&ls->ls_lkbidr_spin); + lkb = idr_find(&ls->ls_lkbidr, lkid); if (lkb) kref_get(&lkb->lkb_ref); - read_unlock(&ls->ls_lkbtbl[bucket].lock); + spin_unlock(&ls->ls_lkbidr_spin); *lkb_ret = lkb; return lkb ? 
0 : -ENOENT; @@ -673,12 +1246,12 @@ static void kill_lkb(struct kref *kref) static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) { - uint16_t bucket = (lkb->lkb_id >> 16); + uint32_t lkid = lkb->lkb_id; - write_lock(&ls->ls_lkbtbl[bucket].lock); + spin_lock(&ls->ls_lkbidr_spin); if (kref_put(&lkb->lkb_ref, kill_lkb)) { - list_del(&lkb->lkb_idtbl_list); - write_unlock(&ls->ls_lkbtbl[bucket].lock); + idr_remove(&ls->ls_lkbidr, lkid); + spin_unlock(&ls->ls_lkbidr_spin); detach_lkb(lkb); @@ -688,7 +1261,7 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) dlm_free_lkb(lkb); return 1; } else { - write_unlock(&ls->ls_lkbtbl[bucket].lock); + spin_unlock(&ls->ls_lkbidr_spin); return 0; } } @@ -804,10 +1377,80 @@ static int msg_reply_type(int mstype) return -1; } +static int nodeid_warned(int nodeid, int num_nodes, int *warned) +{ + int i; + + for (i = 0; i < num_nodes; i++) { + if (!warned[i]) { + warned[i] = nodeid; + return 0; + } + if (warned[i] == nodeid) + return 1; + } + return 0; +} + +void dlm_scan_waiters(struct dlm_ls *ls) +{ + struct dlm_lkb *lkb; + ktime_t zero = ktime_set(0, 0); + s64 us; + s64 debug_maxus = 0; + u32 debug_scanned = 0; + u32 debug_expired = 0; + int num_nodes = 0; + int *warned = NULL; + + if (!dlm_config.ci_waitwarn_us) + return; + + mutex_lock(&ls->ls_waiters_mutex); + + list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { + if (ktime_equal(lkb->lkb_wait_time, zero)) + continue; + + debug_scanned++; + + us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time)); + + if (us < dlm_config.ci_waitwarn_us) + continue; + + lkb->lkb_wait_time = zero; + + debug_expired++; + if (us > debug_maxus) + debug_maxus = us; + + if (!num_nodes) { + num_nodes = ls->ls_num_nodes; + warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL); + } + if (!warned) + continue; + if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned)) + continue; + + log_error(ls, "waitwarn %x %lld %d us check connection to " + "node %d", lkb->lkb_id, (long long)us, + dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid); + } + mutex_unlock(&ls->ls_waiters_mutex); + kfree(warned); + + if (debug_expired) + log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us", + debug_scanned, debug_expired, + dlm_config.ci_waitwarn_us, (long long)debug_maxus); +} + /* add/remove lkb from global waiters list of lkb's waiting for a reply from a remote node */ -static int add_to_waiters(struct dlm_lkb *lkb, int mstype) +static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error = 0; @@ -847,6 +1490,8 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype) lkb->lkb_wait_count++; lkb->lkb_wait_type = mstype; + lkb->lkb_wait_time = ktime_get(); + lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ hold_lkb(lkb); list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); out: @@ -920,8 +1565,9 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, goto out_del; } - log_error(ls, "remwait error %x reply %d flags %x no wait_type", - lkb->lkb_id, mstype, lkb->lkb_flags); + log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait", + lkb->lkb_id, ms ? 
ms->m_header.h_nodeid : 0, lkb->lkb_remid, + mstype, lkb->lkb_flags); return -1; out_del: @@ -966,69 +1612,192 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; - if (ms != &ls->ls_stub_ms) + if (ms->m_flags != DLM_IFL_STUB_MS) mutex_lock(&ls->ls_waiters_mutex); error = _remove_from_waiters(lkb, ms->m_type, ms); - if (ms != &ls->ls_stub_ms) + if (ms->m_flags != DLM_IFL_STUB_MS) mutex_unlock(&ls->ls_waiters_mutex); return error; } -static void dir_remove(struct dlm_rsb *r) +/* If there's an rsb for the same resource being removed, ensure + that the remove message is sent before the new lookup message. + It should be rare to need a delay here, but if not, then it may + be worthwhile to add a proper wait mechanism rather than a delay. */ + +static void wait_pending_remove(struct dlm_rsb *r) { - int to_nodeid; + struct dlm_ls *ls = r->res_ls; + restart: + spin_lock(&ls->ls_remove_spin); + if (ls->ls_remove_len && + !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) { + log_debug(ls, "delay lookup for remove dir %d %s", + r->res_dir_nodeid, r->res_name); + spin_unlock(&ls->ls_remove_spin); + msleep(1); + goto restart; + } + spin_unlock(&ls->ls_remove_spin); +} - if (dlm_no_directory(r->res_ls)) +/* + * ls_remove_spin protects ls_remove_name and ls_remove_len which are + * read by other threads in wait_pending_remove. ls_remove_names + * and ls_remove_lens are only used by the scan thread, so they do + * not need protection. + */ + +static void shrink_bucket(struct dlm_ls *ls, int b) +{ + struct rb_node *n, *next; + struct dlm_rsb *r; + char *name; + int our_nodeid = dlm_our_nodeid(); + int remote_count = 0; + int need_shrink = 0; + int i, len, rv; + + memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); + + spin_lock(&ls->ls_rsbtbl[b].lock); + + if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) { + spin_unlock(&ls->ls_rsbtbl[b].lock); return; + } - to_nodeid = dlm_dir_nodeid(r); - if (to_nodeid != dlm_our_nodeid()) - send_remove(r); + for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { + next = rb_next(n); + r = rb_entry(n, struct dlm_rsb, res_hashnode); + + /* If we're the directory record for this rsb, and + we're not the master of it, then we need to wait + for the master node to send us a dir remove for + before removing the dir record. */ + + if (!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)) { + continue; + } + + need_shrink = 1; + + if (!time_after_eq(jiffies, r->res_toss_time + + dlm_config.ci_toss_secs * HZ)) { + continue; + } + + if (!dlm_no_directory(ls) && + (r->res_master_nodeid == our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) { + + /* We're the master of this rsb but we're not + the directory record, so we need to tell the + dir node to remove the dir record. 
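The send itself happens further down, outside the bucket lock. Before sending, the name is published in ls_remove_name so that a concurrent lookup for the same resource stalls in wait_pending_remove() until the remove is on the wire. The two sides pair up roughly like this (condensed from this file; the loop form is an illustrative rewrite of the goto-based original):

	/* master side, in shrink_bucket() */
	spin_lock(&ls->ls_remove_spin);
	ls->ls_remove_len = len;
	memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
	spin_unlock(&ls->ls_remove_spin);
	send_remove(r);

	/* lookup side, in wait_pending_remove() */
	spin_lock(&ls->ls_remove_spin);
	while (ls->ls_remove_len &&
	       !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
		spin_unlock(&ls->ls_remove_spin);
		msleep(1);
		spin_lock(&ls->ls_remove_spin);
	}
	spin_unlock(&ls->ls_remove_spin);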
*/ + + ls->ls_remove_lens[remote_count] = r->res_length; + memcpy(ls->ls_remove_names[remote_count], r->res_name, + DLM_RESNAME_MAXLEN); + remote_count++; + + if (remote_count >= DLM_REMOVE_NAMES_MAX) + break; + continue; + } + + if (!kref_put(&r->res_ref, kill_rsb)) { + log_error(ls, "tossed rsb in use %s", r->res_name); + continue; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + dlm_free_rsb(r); + } + + if (need_shrink) + ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK; else - dlm_dir_remove_entry(r->res_ls, to_nodeid, - r->res_name, r->res_length); -} + ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK; + spin_unlock(&ls->ls_rsbtbl[b].lock); -/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is - found since they are in order of newest to oldest? */ + /* + * While searching for rsb's to free, we found some that require + * remote removal. We leave them in place and find them again here + * so there is a very small gap between removing them from the toss + * list and sending the removal. Keeping this gap small is + * important to keep us (the master node) from being out of sync + * with the remote dir node for very long. + * + * From the time the rsb is removed from toss until just after + * send_remove, the rsb name is saved in ls_remove_name. A new + * lookup checks this to ensure that a new lookup message for the + * same resource name is not sent just before the remove message. + */ -static int shrink_bucket(struct dlm_ls *ls, int b) -{ - struct dlm_rsb *r; - int count = 0, found; + for (i = 0; i < remote_count; i++) { + name = ls->ls_remove_names[i]; + len = ls->ls_remove_lens[i]; - for (;;) { - found = 0; spin_lock(&ls->ls_rsbtbl[b].lock); - list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss, - res_hashchain) { - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) - continue; - found = 1; - break; + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (rv) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_debug(ls, "remove_name not toss %s", name); + continue; } - if (!found) { + if (r->res_master_nodeid != our_nodeid) { spin_unlock(&ls->ls_rsbtbl[b].lock); - break; + log_debug(ls, "remove_name master %d dir %d our %d %s", + r->res_master_nodeid, r->res_dir_nodeid, + our_nodeid, name); + continue; } - if (kref_put(&r->res_ref, kill_rsb)) { - list_del(&r->res_hashchain); + if (r->res_dir_nodeid == our_nodeid) { + /* should never happen */ spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "remove_name dir %d master %d our %d %s", + r->res_dir_nodeid, r->res_master_nodeid, + our_nodeid, name); + continue; + } - if (is_master(r)) - dir_remove(r); - dlm_free_rsb(r); - count++; - } else { + if (!time_after_eq(jiffies, r->res_toss_time + + dlm_config.ci_toss_secs * HZ)) { spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "tossed rsb in use %s", r->res_name); + log_debug(ls, "remove_name toss_time %lu now %lu %s", + r->res_toss_time, jiffies, name); + continue; } - } - return count; + if (!kref_put(&r->res_ref, kill_rsb)) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "remove_name in use %s", name); + continue; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + + /* block lookup of same name until we've sent remove */ + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = len; + memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + spin_unlock(&ls->ls_rsbtbl[b].lock); + + send_remove(r); + + /* allow lookup of name again */ + spin_lock(&ls->ls_remove_spin); + 
ls->ls_remove_len = 0; + memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + + dlm_free_rsb(r); + } } void dlm_scan_rsbs(struct dlm_ls *ls) @@ -1162,6 +1931,16 @@ void dlm_adjust_timeouts(struct dlm_ls *ls) list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us); mutex_unlock(&ls->ls_timeout_mutex); + + if (!dlm_config.ci_waitwarn_us) + return; + + mutex_lock(&ls->ls_waiters_mutex); + list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { + if (ktime_to_us(lkb->lkb_wait_time)) + lkb->lkb_wait_time = ktime_get(); + } + mutex_unlock(&ls->ls_waiters_mutex); } /* lkb is master or local copy */ @@ -1260,8 +2039,8 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; if (b == 1) { int len = receive_extralen(ms); - if (len > DLM_RESNAME_MAXLEN) - len = DLM_RESNAME_MAXLEN; + if (len > r->res_ls->ls_lvblen) + len = r->res_ls->ls_lvblen; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); lkb->lkb_lvbseq = ms->m_lvbseq; } @@ -1344,13 +2123,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) } lkb->lkb_rqmode = DLM_LOCK_IV; + lkb->lkb_highbast = 0; } static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) { set_lvb_lock(r, lkb); _grant_lock(r, lkb); - lkb->lkb_highbast = 0; } static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, @@ -1381,14 +2160,8 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become compatible with other granted locks */ -static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms) +static void munge_demoted(struct dlm_lkb *lkb) { - if (ms->m_type != DLM_MSG_CONVERT_REPLY) { - log_print("munge_demoted %x invalid reply type %d", - lkb->lkb_id, ms->m_type); - return; - } - if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { log_print("munge_demoted %x invalid modes gr %d rq %d", lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); @@ -1516,10 +2289,14 @@ static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) * immediate request, it is 0 if called later, after the lock has been * queued. * + * recover is 1 if dlm_recover_grant() is trying to grant conversions + * after recovery. + * * References are from chapter 6 of "VAXcluster Principles" by Roy Davis */ -static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) +static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, + int recover) { int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); @@ -1551,7 +2328,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) */ if (queue_conflict(&r->res_grantqueue, lkb)) - goto out; + return 0; /* * 6-3: By default, a conversion request is immediately granted if the @@ -1560,7 +2337,24 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) */ if (queue_conflict(&r->res_convertqueue, lkb)) - goto out; + return 0; + + /* + * The RECOVER_GRANT flag means dlm_recover_grant() is granting + * locks for a recovered rsb, on which lkb's have been rebuilt. + * The lkb's may have been rebuilt on the queues in a different + * order than they were in on the previous master. So, granting + * queued conversions in order after recovery doesn't make sense + * since the order hasn't been preserved anyway. The new order + * could also have created a new "in place" conversion deadlock. 
+ * (e.g. old, failed master held granted EX, with PR->EX, NL->EX. + * After recovery, there would be no granted locks, and possibly + * NL->EX, PR->EX, an in-place conversion deadlock.) So, after + * recovery, grant conversions without considering order. + */ + + if (conv && recover) + return 1; /* * 6-5: But the default algorithm for deciding whether to grant or @@ -1589,6 +2383,18 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) return 1; /* + * Even if the convert is compat with all granted locks, + * QUECVT forces it behind other locks on the convert queue. + */ + + if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) { + if (list_empty(&r->res_convertqueue)) + return 1; + else + return 0; + } + + /* * The NOORDER flag is set to avoid the standard vms rules on grant * order. */ @@ -1631,12 +2437,12 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) if (!now && !conv && list_empty(&r->res_convertqueue) && first_in_list(lkb, &r->res_waitqueue)) return 1; - out: + return 0; } static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, - int *err) + int recover, int *err) { int rv; int8_t alt = 0, rqmode = lkb->lkb_rqmode; @@ -1645,7 +2451,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, if (err) *err = 0; - rv = _can_be_granted(r, lkb, now); + rv = _can_be_granted(r, lkb, now, recover); if (rv) goto out; @@ -1686,7 +2492,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, if (alt) { lkb->lkb_rqmode = alt; - rv = _can_be_granted(r, lkb, now); + rv = _can_be_granted(r, lkb, now, 0); if (rv) lkb->lkb_sbflags |= DLM_SBF_ALTMODE; else @@ -1706,9 +2512,11 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, /* Returns the highest requested mode of all blocked conversions; sets cw if there's a blocked conversion to DLM_LOCK_CW. 
*/ -static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) +static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, + unsigned int *count) { struct dlm_lkb *lkb, *s; + int recover = rsb_flag(r, RSB_RECOVER_GRANT); int hi, demoted, quit, grant_restart, demote_restart; int deadlk; @@ -1722,9 +2530,11 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) demoted = is_demoted(lkb); deadlk = 0; - if (can_be_granted(r, lkb, 0, &deadlk)) { + if (can_be_granted(r, lkb, 0, recover, &deadlk)) { grant_lock_pending(r, lkb); grant_restart = 1; + if (count) + (*count)++; continue; } @@ -1758,14 +2568,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) return max_t(int, high, hi); } -static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) +static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, + unsigned int *count) { struct dlm_lkb *lkb, *s; list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { - if (can_be_granted(r, lkb, 0, NULL)) + if (can_be_granted(r, lkb, 0, 0, NULL)) { grant_lock_pending(r, lkb); - else { + if (count) + (*count)++; + } else { high = max_t(int, lkb->lkb_rqmode, high); if (lkb->lkb_rqmode == DLM_LOCK_CW) *cw = 1; @@ -1794,16 +2607,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) return 0; } -static void grant_pending_locks(struct dlm_rsb *r) +static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count) { struct dlm_lkb *lkb, *s; int high = DLM_LOCK_IV; int cw = 0; - DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); + if (!is_master(r)) { + log_print("grant_pending_locks r nodeid %d", r->res_nodeid); + dlm_dump_rsb(r); + return; + } - high = grant_pending_convert(r, high, &cw); - high = grant_pending_wait(r, high, &cw); + high = grant_pending_convert(r, high, &cw, count); + high = grant_pending_wait(r, high, &cw, count); if (high == DLM_LOCK_IV) return; @@ -1888,8 +2705,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) { - struct dlm_ls *ls = r->res_ls; - int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); + int our_nodeid = dlm_our_nodeid(); if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); @@ -1903,53 +2719,37 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) return 1; } - if (r->res_nodeid == 0) { + if (r->res_master_nodeid == our_nodeid) { lkb->lkb_nodeid = 0; return 0; } - if (r->res_nodeid > 0) { - lkb->lkb_nodeid = r->res_nodeid; + if (r->res_master_nodeid) { + lkb->lkb_nodeid = r->res_master_nodeid; return 0; } - DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r);); - - dir_nodeid = dlm_dir_nodeid(r); - - if (dir_nodeid != our_nodeid) { - r->res_first_lkid = lkb->lkb_id; - send_lookup(r, lkb); - return 1; - } - - for (i = 0; i < 2; i++) { - /* It's possible for dlm_scand to remove an old rsb for - this same resource from the toss list, us to create - a new one, look up the master locally, and find it - already exists just before dlm_scand does the - dir_remove() on the previous rsb. 
*/ - - error = dlm_dir_lookup(ls, our_nodeid, r->res_name, - r->res_length, &ret_nodeid); - if (!error) - break; - log_debug(ls, "dir_lookup error %d %s", error, r->res_name); - schedule(); - } - if (error && error != -EEXIST) - return error; - - if (ret_nodeid == our_nodeid) { - r->res_first_lkid = 0; + if (dlm_dir_nodeid(r) == our_nodeid) { + /* This is a somewhat unusual case; find_rsb will usually + have set res_master_nodeid when dir nodeid is local, but + there are cases where we become the dir node after we've + past find_rsb and go through _request_lock again. + confirm_master() or process_lookup_list() needs to be + called after this. */ + log_debug(r->res_ls, "set_master %x self master %d dir %d %s", + lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid, + r->res_name); + r->res_master_nodeid = our_nodeid; r->res_nodeid = 0; lkb->lkb_nodeid = 0; - } else { - r->res_first_lkid = lkb->lkb_id; - r->res_nodeid = ret_nodeid; - lkb->lkb_nodeid = ret_nodeid; + return 0; } - return 0; + + wait_pending_remove(r); + + r->res_first_lkid = lkb->lkb_id; + send_lookup(r, lkb); + return 1; } static void process_lookup_list(struct dlm_rsb *r) @@ -2274,7 +3074,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) { int error = 0; - if (can_be_granted(r, lkb, 1, NULL)) { + if (can_be_granted(r, lkb, 1, 0, NULL)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); goto out; @@ -2314,7 +3114,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) /* changing an existing lock may allow others to be granted */ - if (can_be_granted(r, lkb, 1, &deadlk)) { + if (can_be_granted(r, lkb, 1, 0, &deadlk)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); goto out; @@ -2326,9 +3126,6 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) if (deadlk) { /* it's left on the granted queue */ - log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s", - lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status, - lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name); revert_lock(r, lkb); queue_cast(r, lkb, -EDEADLK); error = -EDEADLK; @@ -2342,8 +3139,8 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) before we try again to grant this one. 
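A concrete case (illustrative, not from the commit): locks A and B both hold PR and both request EX, so neither conversion can pass the other's granted PR. With DLM_LKF_CONVDEADLK, this lkb is demoted to NL and marked DEMOTED to break the deadlock, after which the grant_pending_convert() call below can grant A's conversion before this one is re-evaluated.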
*/ if (is_demoted(lkb)) { - grant_pending_convert(r, DLM_LOCK_IV, NULL); - if (_can_be_granted(r, lkb, 1)) { + grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); + if (_can_be_granted(r, lkb, 1, 0)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); goto out; @@ -2370,7 +3167,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, { switch (error) { case 0: - grant_pending_locks(r); + grant_pending_locks(r, NULL); /* grant_pending_locks also sends basts */ break; case -EAGAIN: @@ -2393,11 +3190,11 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, int error) { - grant_pending_locks(r); + grant_pending_locks(r, NULL); } /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ - + static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) { int error; @@ -2414,7 +3211,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, int error) { if (error) - grant_pending_locks(r); + grant_pending_locks(r, NULL); } /* @@ -2521,11 +3318,11 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, error = validate_lock_args(ls, lkb, args); if (error) - goto out; + return error; - error = find_rsb(ls, name, len, R_CREATE, &r); + error = find_rsb(ls, name, len, 0, R_REQUEST, &r); if (error) - goto out; + return error; lock_rsb(r); @@ -2536,8 +3333,6 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, unlock_rsb(r); put_rsb(r); - - out: return error; } @@ -2819,9 +3614,9 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, not from lkb fields */ if (lkb->lkb_bastfn) - ms->m_asts |= AST_BAST; + ms->m_asts |= DLM_CB_BAST; if (lkb->lkb_astfn) - ms->m_asts |= AST_COMP; + ms->m_asts |= DLM_CB_CAST; /* compare with switch in create_message; send_remove() doesn't use send_args() */ @@ -2849,12 +3644,12 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) struct dlm_mhandle *mh; int to_nodeid, error; - error = add_to_waiters(lkb, mstype); + to_nodeid = r->res_nodeid; + + error = add_to_waiters(lkb, mstype, to_nodeid); if (error) return error; - to_nodeid = r->res_nodeid; - error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); if (error) goto fail; @@ -2885,9 +3680,9 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) /* down conversions go without a reply from the master */ if (!error && down_conversion(lkb)) { remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); + r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS; r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; r->res_ls->ls_stub_ms.m_result = 0; - r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags; __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); } @@ -2956,12 +3751,12 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) struct dlm_mhandle *mh; int to_nodeid, error; - error = add_to_waiters(lkb, DLM_MSG_LOOKUP); + to_nodeid = dlm_dir_nodeid(r); + + error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); if (error) return error; - to_nodeid = dlm_dir_nodeid(r); - error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); if (error) goto fail; @@ -3075,6 +3870,9 @@ static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) { + if (ms->m_flags == DLM_IFL_STUB_MS) + return; + lkb->lkb_sbflags = ms->m_sbflags; lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | (ms->m_flags & 0x0000FFFF); @@ -3096,8 +3894,8 @@ static int 
receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, if (!lkb->lkb_lvbptr) return -ENOMEM; len = receive_extralen(ms); - if (len > DLM_RESNAME_MAXLEN) - len = DLM_RESNAME_MAXLEN; + if (len > ls->ls_lvblen) + len = ls->ls_lvblen; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); } return 0; @@ -3122,8 +3920,8 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_grmode = DLM_LOCK_IV; lkb->lkb_rqmode = ms->m_rqmode; - lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL; - lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL; + lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; + lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { /* lkb was just created so there won't be an lvb yet */ @@ -3212,11 +4010,72 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) return error; } -static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) +static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len) +{ + char name[DLM_RESNAME_MAXLEN + 1]; + struct dlm_message *ms; + struct dlm_mhandle *mh; + struct dlm_rsb *r; + uint32_t hash, b; + int rv, dir_nodeid; + + memset(name, 0, sizeof(name)); + memcpy(name, ms_name, len); + + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); + + dir_nodeid = dlm_hash2nodeid(ls, hash); + + log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name); + + spin_lock(&ls->ls_rsbtbl[b].lock); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (!rv) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "repeat_remove on keep %s", name); + return; + } + + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (!rv) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "repeat_remove on toss %s", name); + return; + } + + /* use ls->remove_name2 to avoid conflict with shrink? */ + + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = len; + memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + spin_unlock(&ls->ls_rsbtbl[b].lock); + + rv = _create_message(ls, sizeof(struct dlm_message) + len, + dir_nodeid, DLM_MSG_REMOVE, &ms, &mh); + if (rv) + return; + + memcpy(ms->m_extra, name, len); + ms->m_hash = hash; + + send_message(mh, ms); + + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = 0; + memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); +} + +static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; - int error, namelen; + int from_nodeid; + int error, namelen = 0; + + from_nodeid = ms->m_header.h_nodeid; error = create_lkb(ls, &lkb); if (error) @@ -3230,9 +4089,16 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) goto fail; } + /* The dir node is the authority on whether we are the master + for this rsb or not, so if the master sends us a request, we should + recreate the rsb if we've destroyed it. This race happens when we + send a remove message to the dir node at the same time that the dir + node sends us a request for the rsb. 
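A concrete timeline of the crossing messages (an added illustration; D = dir node, M = this node, the master of record):

	M: last local reference dropped, rsb moves to M's toss list
	M: shrink_bucket() expires it and sends REMOVE to D
	D: concurrently wants the lock; its directory still names M,
	   so it sends a REQUEST straight to M
	M: the REQUEST arrives after the rsb was freed, so M recreates
	   the rsb here instead of failing the request
	D: the REMOVE arrives while D's copy is busy on its keep list,
	   and receive_remove() ignores it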
*/ + namelen = receive_extralen(ms); - error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r); + error = find_rsb(ls, ms->m_extra, namelen, from_nodeid, + R_RECEIVE_REQUEST, &r); if (error) { __put_lkb(ls, lkb); goto fail; } @@ -3240,6 +4106,16 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) lock_rsb(r); + if (r->res_master_nodeid != dlm_our_nodeid()) { + error = validate_master_nodeid(ls, r, from_nodeid); + if (error) { + unlock_rsb(r); + put_rsb(r); + __put_lkb(ls, lkb); + goto fail; + } + } + attach_lkb(r, lkb); error = do_request(r, lkb); send_request_reply(r, lkb, error); @@ -3252,14 +4128,40 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) error = 0; if (error) dlm_put_lkb(lkb); - return; + return 0; fail: + /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup + and do this receive_request again from process_lookup_list once + we get the lookup reply. This would avoid many repeated + ENOTBLK request failures when the lookup reply designating us + as master is delayed. */ + + /* We could repeatedly return -EBADR here if our send_remove() is + delayed in being sent/arriving/being processed on the dir node. + Another node would repeatedly look up the master, and the dir + node would continue returning our nodeid until our send_remove + took effect. + + We send another remove message in case our previous send_remove + was lost/ignored/missed somehow. */ + + if (error != -ENOTBLK) { + log_limit(ls, "receive_request %x from %d %d", + ms->m_lkid, from_nodeid, error); + } + + if (namelen && error == -EBADR) { + send_repeat_remove(ls, ms->m_extra, namelen); + msleep(1000); + } + setup_stub_lkb(ls, ms); send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); + return error; } -static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -3269,6 +4171,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) if (error) goto fail; + if (lkb->lkb_remid != ms->m_lkid) { + log_error(ls, "receive_convert %x remid %x recover_seq %llu " + "remote %d %x", lkb->lkb_id, lkb->lkb_remid, + (unsigned long long)lkb->lkb_recover_seq, + ms->m_header.h_nodeid, ms->m_lkid); + error = -ENOENT; + goto fail; + } + r = lkb->lkb_resource; hold_rsb(r); @@ -3296,14 +4207,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); - return; + return 0; fail: setup_stub_lkb(ls, ms); send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); + return error; } -static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -3313,6 +4225,14 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) if (error) goto fail; + if (lkb->lkb_remid != ms->m_lkid) { + log_error(ls, "receive_unlock %x remid %x remote %d %x", + lkb->lkb_id, lkb->lkb_remid, + ms->m_header.h_nodeid, ms->m_lkid); + error = -ENOENT; + goto fail; + } + r = lkb->lkb_resource; hold_rsb(r); @@ -3337,14 +4257,15 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); - return; + return 0; fail: setup_stub_lkb(ls, ms); send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); + return error; } -static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) +static int 
receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -3372,25 +4293,23 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); - return; + return 0; fail: setup_stub_lkb(ls, ms); send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); + return error; } -static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error; error = find_lkb(ls, ms->m_remid, &lkb); - if (error) { - log_debug(ls, "receive_grant from %d no lkb %x", - ms->m_header.h_nodeid, ms->m_remid); - return; - } + if (error) + return error; r = lkb->lkb_resource; @@ -3410,20 +4329,18 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); + return 0; } -static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error; error = find_lkb(ls, ms->m_remid, &lkb); - if (error) { - log_debug(ls, "receive_bast from %d no lkb %x", - ms->m_header.h_nodeid, ms->m_remid); - return; - } + if (error) + return error; r = lkb->lkb_resource; @@ -3435,57 +4352,120 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) goto out; queue_bast(r, lkb, ms->m_bastmode); + lkb->lkb_highbast = ms->m_bastmode; out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); + return 0; } static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) { - int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid; + int len, error, ret_nodeid, from_nodeid, our_nodeid; from_nodeid = ms->m_header.h_nodeid; our_nodeid = dlm_our_nodeid(); len = receive_extralen(ms); - dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); - if (dir_nodeid != our_nodeid) { - log_error(ls, "lookup dir_nodeid %d from %d", - dir_nodeid, from_nodeid); - error = -EINVAL; - ret_nodeid = -1; - goto out; - } - - error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid); + error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0, + &ret_nodeid, NULL); /* Optimization: we're master so treat lookup as a request */ if (!error && ret_nodeid == our_nodeid) { receive_request(ls, ms); return; } - out: send_lookup_reply(ls, ms, ret_nodeid, error); } static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) { - int len, dir_nodeid, from_nodeid; + char name[DLM_RESNAME_MAXLEN+1]; + struct dlm_rsb *r; + uint32_t hash, b; + int rv, len, dir_nodeid, from_nodeid; from_nodeid = ms->m_header.h_nodeid; len = receive_extralen(ms); + if (len > DLM_RESNAME_MAXLEN) { + log_error(ls, "receive_remove from %d bad len %d", + from_nodeid, len); + return; + } + dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); if (dir_nodeid != dlm_our_nodeid()) { - log_error(ls, "remove dir entry dir_nodeid %d from %d", - dir_nodeid, from_nodeid); + log_error(ls, "receive_remove from %d bad nodeid %d", + from_nodeid, dir_nodeid); return; } - dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); + /* Look for name on rsbtbl.toss, if it's there, kill it. + If it's on rsbtbl.keep, it's being used, and we should ignore this + message. This is an expected race between the dir node sending a + request to the master node at the same time as the master node sends + a remove to the dir node. 
The resolution to that race is for the + dir node to ignore the remove message, and the master node to + recreate the master rsb when it gets a request from the dir node for + an rsb it doesn't have. */ + + memset(name, 0, sizeof(name)); + memcpy(name, ms->m_extra, len); + + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); + + spin_lock(&ls->ls_rsbtbl[b].lock); + + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (rv) { + /* verify the rsb is on keep list per comment above */ + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (rv) { + /* should not happen */ + log_error(ls, "receive_remove from %d not found %s", + from_nodeid, name); + spin_unlock(&ls->ls_rsbtbl[b].lock); + return; + } + if (r->res_master_nodeid != from_nodeid) { + /* should not happen */ + log_error(ls, "receive_remove keep from %d master %d", + from_nodeid, r->res_master_nodeid); + dlm_print_rsb(r); + spin_unlock(&ls->ls_rsbtbl[b].lock); + return; + } + + log_debug(ls, "receive_remove from %d master %d first %x %s", + from_nodeid, r->res_master_nodeid, r->res_first_lkid, + name); + spin_unlock(&ls->ls_rsbtbl[b].lock); + return; + } + + if (r->res_master_nodeid != from_nodeid) { + log_error(ls, "receive_remove toss from %d master %d", + from_nodeid, r->res_master_nodeid); + dlm_print_rsb(r); + spin_unlock(&ls->ls_rsbtbl[b].lock); + return; + } + + if (kref_put(&r->res_ref, kill_rsb)) { + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + spin_unlock(&ls->ls_rsbtbl[b].lock); + dlm_free_rsb(r); + } else { + log_error(ls, "receive_remove from %d rsb ref error", + from_nodeid); + dlm_print_rsb(r); + spin_unlock(&ls->ls_rsbtbl[b].lock); + } } static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) @@ -3493,18 +4473,16 @@ static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) do_purge(ls, ms->m_nodeid, ms->m_pid); } -static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error, mstype, result; + int from_nodeid = ms->m_header.h_nodeid; error = find_lkb(ls, ms->m_remid, &lkb); - if (error) { - log_debug(ls, "receive_request_reply from %d no lkb %x", - ms->m_header.h_nodeid, ms->m_remid); - return; - } + if (error) + return error; r = lkb->lkb_resource; hold_rsb(r); @@ -3516,14 +4494,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) mstype = lkb->lkb_wait_type; error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); - if (error) + if (error) { + log_error(ls, "receive_request_reply %x remote %d %x result %d", + lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result); + dlm_dump_rsb(r); goto out; + } /* Optimization: the dir node was also the master, so it took our lookup as a request and sent request reply instead of lookup reply */ if (mstype == DLM_MSG_LOOKUP) { - r->res_nodeid = ms->m_header.h_nodeid; - lkb->lkb_nodeid = r->res_nodeid; + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + lkb->lkb_nodeid = from_nodeid; } /* this is the value returned from do_request() on the master */ @@ -3557,18 +4540,30 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) case -EBADR: case -ENOTBLK: /* find_rsb failed to find rsb or rsb wasn't master */ - log_debug(ls, "receive_request_reply %x %x master diff %d %d", - lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result); - r->res_nodeid = -1; - lkb->lkb_nodeid = -1; + log_limit(ls, 
"receive_request_reply %x from %d %d " + "master %d dir %d first %x %s", lkb->lkb_id, + from_nodeid, result, r->res_master_nodeid, + r->res_dir_nodeid, r->res_first_lkid, r->res_name); + + if (r->res_dir_nodeid != dlm_our_nodeid() && + r->res_master_nodeid != dlm_our_nodeid()) { + /* cause _request_lock->set_master->send_lookup */ + r->res_master_nodeid = 0; + r->res_nodeid = -1; + lkb->lkb_nodeid = -1; + } if (is_overlap(lkb)) { /* we'll ignore error in cancel/unlock reply */ queue_cast_overlap(r, lkb); confirm_master(r, result); unhold_lkb(lkb); /* undoes create_lkb() */ - } else + } else { _request_lock(r, lkb); + + if (r->res_master_nodeid == dlm_our_nodeid()) + confirm_master(r, 0); + } break; default: @@ -3595,6 +4590,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); + return 0; } static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, @@ -3617,7 +4613,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, /* convert was queued on remote master */ receive_flags_reply(lkb, ms); if (is_demoted(lkb)) - munge_demoted(lkb, ms); + munge_demoted(lkb); del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); add_timeout(lkb); @@ -3627,14 +4623,17 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, /* convert was granted on remote master */ receive_flags_reply(lkb, ms); if (is_demoted(lkb)) - munge_demoted(lkb, ms); + munge_demoted(lkb); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); break; default: - log_error(r->res_ls, "receive_convert_reply %x error %d", - lkb->lkb_id, ms->m_result); + log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", + lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, + ms->m_result); + dlm_print_rsb(r); + dlm_print_lkb(lkb); } } @@ -3661,20 +4660,18 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) put_rsb(r); } -static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; int error; error = find_lkb(ls, ms->m_remid, &lkb); - if (error) { - log_debug(ls, "receive_convert_reply from %d no lkb %x", - ms->m_header.h_nodeid, ms->m_remid); - return; - } + if (error) + return error; _receive_convert_reply(lkb, ms); dlm_put_lkb(lkb); + return 0; } static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) @@ -3713,20 +4710,18 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) put_rsb(r); } -static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; int error; error = find_lkb(ls, ms->m_remid, &lkb); - if (error) { - log_debug(ls, "receive_unlock_reply from %d no lkb %x", - ms->m_header.h_nodeid, ms->m_remid); - return; - } + if (error) + return error; _receive_unlock_reply(lkb, ms); dlm_put_lkb(lkb); + return 0; } static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) @@ -3765,20 +4760,18 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) put_rsb(r); } -static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; int error; error = find_lkb(ls, ms->m_remid, &lkb); - if (error) { - log_debug(ls, "receive_cancel_reply from %d no lkb %x", - 
ms->m_header.h_nodeid, ms->m_remid); - return; - } + if (error) + return error; _receive_cancel_reply(lkb, ms); dlm_put_lkb(lkb); + return 0; } static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) @@ -3786,14 +4779,15 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; int error, ret_nodeid; + int do_lookup_list = 0; error = find_lkb(ls, ms->m_lkid, &lkb); if (error) { - log_error(ls, "receive_lookup_reply no lkb"); + log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid); return; } - /* ms->m_result is the value returned by dlm_dir_lookup on dir node + /* ms->m_result is the value returned by dlm_master_lookup on dir node FIXME: will a non-zero error ever be returned? */ r = lkb->lkb_resource; @@ -3805,12 +4799,37 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) goto out; ret_nodeid = ms->m_nodeid; + + /* We sometimes receive a request from the dir node for this + rsb before we've received the dir node's lookup_reply for it. + The request from the dir node implies we're the master, so we set + ourself as master in receive_request_reply, and verify here that + we are indeed the master. */ + + if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) { + /* This should never happen */ + log_error(ls, "receive_lookup_reply %x from %d ret %d " + "master %d dir %d our %d first %x %s", + lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid, + r->res_master_nodeid, r->res_dir_nodeid, + dlm_our_nodeid(), r->res_first_lkid, r->res_name); + } + if (ret_nodeid == dlm_our_nodeid()) { + r->res_master_nodeid = ret_nodeid; r->res_nodeid = 0; - ret_nodeid = 0; + do_lookup_list = 1; r->res_first_lkid = 0; + } else if (ret_nodeid == -1) { + /* the remote node doesn't believe it's the dir node */ + log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", + lkb->lkb_id, ms->m_header.h_nodeid); + r->res_master_nodeid = 0; + r->res_nodeid = -1; + lkb->lkb_nodeid = -1; } else { - /* set_master() will copy res_nodeid to lkb_nodeid */ + /* set_master() will set lkb_nodeid from r */ + r->res_master_nodeid = ret_nodeid; r->res_nodeid = ret_nodeid; } @@ -3825,7 +4844,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) _request_lock(r, lkb); out_list: - if (!ret_nodeid) + if (do_lookup_list) process_lookup_list(r); out: unlock_rsb(r); @@ -3833,10 +4852,13 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) dlm_put_lkb(lkb); } -static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) +static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, + uint32_t saved_seq) { + int error = 0, noent = 0; + if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { - log_debug(ls, "ignore non-member message %d from %d %x %x %d", + log_limit(ls, "receive %d from non-member %d %x %x %d", ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, ms->m_remid, ms->m_result); return; @@ -3847,47 +4869,50 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) /* messages sent to a master node */ case DLM_MSG_REQUEST: - receive_request(ls, ms); + error = receive_request(ls, ms); break; case DLM_MSG_CONVERT: - receive_convert(ls, ms); + error = receive_convert(ls, ms); break; case DLM_MSG_UNLOCK: - receive_unlock(ls, ms); + error = receive_unlock(ls, ms); break; case DLM_MSG_CANCEL: - receive_cancel(ls, ms); + noent = 1; + error = receive_cancel(ls, ms); break; /* messages sent from a master node (replies to above) */ case 
DLM_MSG_REQUEST_REPLY: - receive_request_reply(ls, ms); + error = receive_request_reply(ls, ms); break; case DLM_MSG_CONVERT_REPLY: - receive_convert_reply(ls, ms); + error = receive_convert_reply(ls, ms); break; case DLM_MSG_UNLOCK_REPLY: - receive_unlock_reply(ls, ms); + error = receive_unlock_reply(ls, ms); break; case DLM_MSG_CANCEL_REPLY: - receive_cancel_reply(ls, ms); + error = receive_cancel_reply(ls, ms); break; /* messages sent from a master node (only two types of async msg) */ case DLM_MSG_GRANT: - receive_grant(ls, ms); + noent = 1; + error = receive_grant(ls, ms); break; case DLM_MSG_BAST: - receive_bast(ls, ms); + noent = 1; + error = receive_bast(ls, ms); break; /* messages sent to a dir node */ @@ -3916,7 +4941,36 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms) log_error(ls, "unknown message type %d", ms->m_type); } - dlm_astd_wake(); + /* + * When checking for ENOENT, we're checking the result of + * find_lkb(m_remid): + * + * The lock id referenced in the message wasn't found. This may + * happen in normal usage for the async messages and cancel, so + * only use log_debug for them. + * + * Some errors are expected and normal. + */ + + if (error == -ENOENT && noent) { + log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", + ms->m_type, ms->m_remid, ms->m_header.h_nodeid, + ms->m_lkid, saved_seq); + } else if (error == -ENOENT) { + log_error(ls, "receive %d no %x remote %d %x saved_seq %u", + ms->m_type, ms->m_remid, ms->m_header.h_nodeid, + ms->m_lkid, saved_seq); + + if (ms->m_type == DLM_MSG_CONVERT) + dlm_dump_rsb_hash(ls, ms->m_hash); + } + + if (error == -EINVAL) { + log_error(ls, "receive %d inval from %d lkid %x remid %x " + "saved_seq %u", + ms->m_type, ms->m_header.h_nodeid, + ms->m_lkid, ms->m_remid, saved_seq); + } } /* If the lockspace is in recovery mode (locking stopped), then normal @@ -3931,19 +4985,29 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) { if (dlm_locking_stopped(ls)) { + /* If we were a member of this lockspace, left, and rejoined, + other nodes may still be sending us messages from the + lockspace generation before we left. */ + if (!ls->ls_generation) { + log_limit(ls, "receive %d from %d ignore old gen", + ms->m_type, nodeid); + return; + } + dlm_add_requestqueue(ls, nodeid, ms); } else { dlm_wait_requestqueue(ls); - _receive_message(ls, ms); + _receive_message(ls, ms, 0); } } /* This is called by dlm_recoverd to process messages that were saved on the requestqueue. 
*/ -void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms) +void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, + uint32_t saved_seq) { - _receive_message(ls, ms); + _receive_message(ls, ms, saved_seq); } /* This is called by the midcomms layer when something is received for @@ -3979,9 +5043,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid) ls = dlm_find_lockspace_global(hd->h_lockspace); if (!ls) { - if (dlm_config.ci_log_debug) - log_print("invalid lockspace %x from %d cmd %d type %d", - hd->h_lockspace, nodeid, hd->h_cmd, type); + if (dlm_config.ci_log_debug) { + printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " + "%u from %d cmd %d type %d\n", + hd->h_lockspace, nodeid, hd->h_cmd, type); + } if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) dlm_send_ls_not_ready(nodeid, &p->rcom); @@ -4001,15 +5067,17 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid) dlm_put_lockspace(ls); } -static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) +static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, + struct dlm_message *ms_stub) { if (middle_conversion(lkb)) { hold_lkb(lkb); - ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; - ls->ls_stub_ms.m_result = -EINPROGRESS; - ls->ls_stub_ms.m_flags = lkb->lkb_flags; - ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; - _receive_convert_reply(lkb, &ls->ls_stub_ms); + memset(ms_stub, 0, sizeof(struct dlm_message)); + ms_stub->m_flags = DLM_IFL_STUB_MS; + ms_stub->m_type = DLM_MSG_CONVERT_REPLY; + ms_stub->m_result = -EINPROGRESS; + ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; + _receive_convert_reply(lkb, ms_stub); /* Same special case as in receive_rcom_lock_args() */ lkb->lkb_grmode = DLM_LOCK_IV; @@ -4027,15 +5095,13 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) /* A waiting lkb needs recovery if the master node has failed, or the master node is changing (only when no directory is used) */ -static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) +static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, + int dir_nodeid) { - if (dlm_is_removed(ls, lkb->lkb_nodeid)) + if (dlm_no_directory(ls)) return 1; - if (!dlm_no_directory(ls)) - return 0; - - if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid) + if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) return 1; return 0; @@ -4050,13 +5116,36 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) void dlm_recover_waiters_pre(struct dlm_ls *ls) { struct dlm_lkb *lkb, *safe; + struct dlm_message *ms_stub; int wait_type, stub_unlock_result, stub_cancel_result; + int dir_nodeid; + + ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL); + if (!ms_stub) { + log_error(ls, "dlm_recover_waiters_pre no mem"); + return; + } mutex_lock(&ls->ls_waiters_mutex); list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { - log_debug(ls, "pre recover waiter lkid %x type %d flags %x", - lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags); + + dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); + + /* exclude debug messages about unlocks because there can be so + many and they aren't very interesting */ + + if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { + log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " + "lkb_nodeid %d wait_nodeid %d dir_nodeid %d", + lkb->lkb_id, + lkb->lkb_remid, + lkb->lkb_wait_type, + lkb->lkb_resource->res_nodeid, + lkb->lkb_nodeid, + lkb->lkb_wait_nodeid, + dir_nodeid); + } /* all outstanding 
lookups, regardless of destination will be resent after recovery is done */ @@ -4066,7 +5155,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) continue; } - if (!waiter_needs_recovery(ls, lkb)) + if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) continue; wait_type = lkb->lkb_wait_type; @@ -4102,26 +5191,28 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) break; case DLM_MSG_CONVERT: - recover_convert_waiter(ls, lkb); + recover_convert_waiter(ls, lkb, ms_stub); break; case DLM_MSG_UNLOCK: hold_lkb(lkb); - ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; - ls->ls_stub_ms.m_result = stub_unlock_result; - ls->ls_stub_ms.m_flags = lkb->lkb_flags; - ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; - _receive_unlock_reply(lkb, &ls->ls_stub_ms); + memset(ms_stub, 0, sizeof(struct dlm_message)); + ms_stub->m_flags = DLM_IFL_STUB_MS; + ms_stub->m_type = DLM_MSG_UNLOCK_REPLY; + ms_stub->m_result = stub_unlock_result; + ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; + _receive_unlock_reply(lkb, ms_stub); dlm_put_lkb(lkb); break; case DLM_MSG_CANCEL: hold_lkb(lkb); - ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; - ls->ls_stub_ms.m_result = stub_cancel_result; - ls->ls_stub_ms.m_flags = lkb->lkb_flags; - ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid; - _receive_cancel_reply(lkb, &ls->ls_stub_ms); + memset(ms_stub, 0, sizeof(struct dlm_message)); + ms_stub->m_flags = DLM_IFL_STUB_MS; + ms_stub->m_type = DLM_MSG_CANCEL_REPLY; + ms_stub->m_result = stub_cancel_result; + ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; + _receive_cancel_reply(lkb, ms_stub); dlm_put_lkb(lkb); break; @@ -4132,6 +5223,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) schedule(); } mutex_unlock(&ls->ls_waiters_mutex); + kfree(ms_stub); } static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) @@ -4196,8 +5288,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) ou = is_overlap_unlock(lkb); err = 0; - log_debug(ls, "recover_waiters_post %x type %d flags %x %s", - lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); + log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " + "lkb_nodeid %d wait_nodeid %d dir_nodeid %d " + "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype, + r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, + dlm_dir_nodeid(r), oc, ou); /* At this point we assume that we won't get a reply to any previous op or overlap op on this lock. 
First, do a big @@ -4249,9 +5344,12 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) } } - if (err) - log_error(ls, "recover_waiters_post %x %d %x %d %d", - lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou); + if (err) { + log_error(ls, "waiter %x msg %d r_nodeid %d " + "dir_nodeid %d overlap %d %d", + lkb->lkb_id, mstype, r->res_nodeid, + dlm_dir_nodeid(r), oc, ou); + } unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -4260,110 +5358,187 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) return error; } -static void purge_queue(struct dlm_rsb *r, struct list_head *queue, - int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb)) +static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r, + struct list_head *list) { - struct dlm_ls *ls = r->res_ls; struct dlm_lkb *lkb, *safe; - list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) { - if (test(ls, lkb)) { - rsb_set_flag(r, RSB_LOCKS_PURGED); - del_lkb(r, lkb); - /* this put should free the lkb */ - if (!dlm_put_lkb(lkb)) - log_error(ls, "purged lkb not released"); - } + list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { + if (!is_master_copy(lkb)) + continue; + + /* don't purge lkbs we've added in recover_master_copy for + the current recovery seq */ + + if (lkb->lkb_recover_seq == ls->ls_recover_seq) + continue; + + del_lkb(r, lkb); + + /* this put should free the lkb */ + if (!dlm_put_lkb(lkb)) + log_error(ls, "purged mstcpy lkb not released"); } } -static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb) +void dlm_purge_mstcpy_locks(struct dlm_rsb *r) { - return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid)); -} + struct dlm_ls *ls = r->res_ls; -static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb) -{ - return is_master_copy(lkb); + purge_mstcpy_list(ls, r, &r->res_grantqueue); + purge_mstcpy_list(ls, r, &r->res_convertqueue); + purge_mstcpy_list(ls, r, &r->res_waitqueue); } -static void purge_dead_locks(struct dlm_rsb *r) +static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, + struct list_head *list, + int nodeid_gone, unsigned int *count) { - purge_queue(r, &r->res_grantqueue, &purge_dead_test); - purge_queue(r, &r->res_convertqueue, &purge_dead_test); - purge_queue(r, &r->res_waitqueue, &purge_dead_test); -} + struct dlm_lkb *lkb, *safe; -void dlm_purge_mstcpy_locks(struct dlm_rsb *r) -{ - purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test); - purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test); - purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test); + list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { + if (!is_master_copy(lkb)) + continue; + + if ((lkb->lkb_nodeid == nodeid_gone) || + dlm_is_removed(ls, lkb->lkb_nodeid)) { + + /* tell recover_lvb to invalidate the lvb + because a node holding EX/PW failed */ + if ((lkb->lkb_exflags & DLM_LKF_VALBLK) && + (lkb->lkb_grmode >= DLM_LOCK_PW)) { + rsb_set_flag(r, RSB_RECOVER_LVB_INVAL); + } + + del_lkb(r, lkb); + + /* this put should free the lkb */ + if (!dlm_put_lkb(lkb)) + log_error(ls, "purged dead lkb not released"); + + rsb_set_flag(r, RSB_RECOVER_GRANT); + + (*count)++; + } + } } /* Get rid of locks held by nodes that are gone. 
*/ -int dlm_purge_locks(struct dlm_ls *ls) +void dlm_recover_purge(struct dlm_ls *ls) { struct dlm_rsb *r; + struct dlm_member *memb; + int nodes_count = 0; + int nodeid_gone = 0; + unsigned int lkb_count = 0; + + /* cache one removed nodeid to optimize the common + case of a single node removed */ - log_debug(ls, "dlm_purge_locks"); + list_for_each_entry(memb, &ls->ls_nodes_gone, list) { + nodes_count++; + nodeid_gone = memb->nodeid; + } + + if (!nodes_count) + return; down_write(&ls->ls_root_sem); list_for_each_entry(r, &ls->ls_root_list, res_root_list) { hold_rsb(r); lock_rsb(r); - if (is_master(r)) - purge_dead_locks(r); + if (is_master(r)) { + purge_dead_list(ls, r, &r->res_grantqueue, + nodeid_gone, &lkb_count); + purge_dead_list(ls, r, &r->res_convertqueue, + nodeid_gone, &lkb_count); + purge_dead_list(ls, r, &r->res_waitqueue, + nodeid_gone, &lkb_count); + } unlock_rsb(r); unhold_rsb(r); - - schedule(); + cond_resched(); } up_write(&ls->ls_root_sem); - return 0; + if (lkb_count) + log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", + lkb_count, nodes_count); } -static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) +static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) { - struct dlm_rsb *r, *r_ret = NULL; + struct rb_node *n; + struct dlm_rsb *r; spin_lock(&ls->ls_rsbtbl[bucket].lock); - list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) { - if (!rsb_flag(r, RSB_LOCKS_PURGED)) + for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { + r = rb_entry(n, struct dlm_rsb, res_hashnode); + + if (!rsb_flag(r, RSB_RECOVER_GRANT)) + continue; + if (!is_master(r)) { + rsb_clear_flag(r, RSB_RECOVER_GRANT); continue; + } hold_rsb(r); - rsb_clear_flag(r, RSB_LOCKS_PURGED); - r_ret = r; - break; + spin_unlock(&ls->ls_rsbtbl[bucket].lock); + return r; } spin_unlock(&ls->ls_rsbtbl[bucket].lock); - return r_ret; + return NULL; } -void dlm_grant_after_purge(struct dlm_ls *ls) +/* + * Attempt to grant locks on resources that we are the master of. + * Locks may have become grantable during recovery because locks + * from departed nodes have been purged (or not rebuilt), allowing + * previously blocked locks to now be granted. The subset of rsb's + * we are interested in are those with lkb's on either the convert or + * waiting queues. + * + * Simplest would be to go through each master rsb and check for non-empty + * convert or waiting queues, and attempt to grant on those rsbs. + * Checking the queues requires lock_rsb, though, for which we'd need + * to release the rsbtbl lock. This would make iterating through all + * rsb's very inefficient. So, we rely on earlier recovery routines + * to set RECOVER_GRANT on any rsb's that we should attempt to grant + * locks for. 
+ */ + +void dlm_recover_grant(struct dlm_ls *ls) { struct dlm_rsb *r; int bucket = 0; + unsigned int count = 0; + unsigned int rsb_count = 0; + unsigned int lkb_count = 0; while (1) { - r = find_purged_rsb(ls, bucket); + r = find_grant_rsb(ls, bucket); if (!r) { if (bucket == ls->ls_rsbtbl_size - 1) break; bucket++; continue; } + rsb_count++; + count = 0; lock_rsb(r); - if (is_master(r)) { - grant_pending_locks(r); - confirm_master(r, 0); - } + /* the RECOVER_GRANT flag is checked in the grant path */ + grant_pending_locks(r, &count); + rsb_clear_flag(r, RSB_RECOVER_GRANT); + lkb_count += count; + confirm_master(r, 0); unlock_rsb(r); put_rsb(r); - schedule(); + cond_resched(); } + + if (lkb_count) + log_rinfo(ls, "dlm_recover_grant %u locks on %u resources", + lkb_count, rsb_count); } static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, @@ -4412,8 +5587,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, lkb->lkb_grmode = rl->rl_grmode; /* don't set lkb_status because add_lkb wants to itself */ - lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL; - lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL; + lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; + lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL; if (lkb->lkb_exflags & DLM_LKF_VALBLK) { int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - @@ -4452,6 +5627,8 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct dlm_rsb *r; struct dlm_lkb *lkb; + uint32_t remid = 0; + int from_nodeid = rc->rc_header.h_nodeid; int error; if (rl->rl_parent_lkid) { @@ -4459,14 +5636,31 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) goto out; } + remid = le32_to_cpu(rl->rl_lkid); + + /* In general we expect the rsb returned to be R_MASTER, but we don't + have to require it. Recovery of masters on one node can overlap + recovery of locks on another node, so one node can send us MSTCPY + locks before we've made ourselves master of this rsb. We can still + add new MSTCPY locks that we receive here without any harm; when + we make ourselves master, dlm_recover_masters() won't touch the + MSTCPY locks we've received early. 
*/ + error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), - R_MASTER, &r); + from_nodeid, R_RECEIVE_RECOVER, &r); if (error) goto out; lock_rsb(r); - lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid)); + if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { + log_error(ls, "dlm_recover_master_copy remote %d %x not dir", + from_nodeid, remid); + error = -EBADR; + goto out_unlock; + } + + lkb = search_remid(r, from_nodeid, remid); if (lkb) { error = -EEXIST; goto out_remid; @@ -4485,19 +5679,25 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) attach_lkb(r, lkb); add_lkb(r, lkb, rl->rl_status); error = 0; + ls->ls_recover_locks_in++; + + if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) + rsb_set_flag(r, RSB_RECOVER_GRANT); out_remid: /* this is the new value returned to the lock holder for saving in its process-copy lkb */ rl->rl_remid = cpu_to_le32(lkb->lkb_id); + lkb->lkb_recover_seq = ls->ls_recover_seq; + out_unlock: unlock_rsb(r); put_rsb(r); out: - if (error) - log_debug(ls, "recover_master_copy %d %x", error, - le32_to_cpu(rl->rl_lkid)); + if (error && error != -EEXIST) + log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", + from_nodeid, remid, error); rl->rl_result = cpu_to_le32(error); return error; } @@ -4508,41 +5708,52 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct dlm_rsb *r; struct dlm_lkb *lkb; - int error; + uint32_t lkid, remid; + int error, result; + + lkid = le32_to_cpu(rl->rl_lkid); + remid = le32_to_cpu(rl->rl_remid); + result = le32_to_cpu(rl->rl_result); - error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb); + error = find_lkb(ls, lkid, &lkb); if (error) { - log_error(ls, "recover_process_copy no lkid %x", - le32_to_cpu(rl->rl_lkid)); + log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", + lkid, rc->rc_header.h_nodeid, remid, result); return error; } - DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - - error = le32_to_cpu(rl->rl_result); - r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); - switch (error) { + if (!is_process_copy(lkb)) { + log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", + lkid, rc->rc_header.h_nodeid, remid, result); + dlm_dump_rsb(r); + unlock_rsb(r); + put_rsb(r); + dlm_put_lkb(lkb); + return -EINVAL; + } + + switch (result) { case -EBADR: /* There's a chance the new master received our lock before dlm_recover_master_reply(), this wouldn't happen if we did a barrier between recover_masters and recover_locks. 
*/ - log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id, - (unsigned long)r, r->res_name); + + log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", + lkid, rc->rc_header.h_nodeid, remid, result); + dlm_send_rcom_lock(r, lkb); goto out; case -EEXIST: - log_debug(ls, "master copy exists %x", lkb->lkb_id); - /* fall through */ case 0: - lkb->lkb_remid = le32_to_cpu(rl->rl_remid); + lkb->lkb_remid = remid; break; default: - log_error(ls, "dlm_recover_process_copy unknown error %d %x", - error, lkb->lkb_id); + log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", + lkid, rc->rc_header.h_nodeid, remid, result); } /* an ack for dlm_recover_locks() which waits for replies from @@ -4589,7 +5800,6 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, fake_astfn, ua, fake_bastfn, &args); lkb->lkb_flags |= DLM_IFL_USER; - ua->old_mode = DLM_LOCK_IV; if (error) { __put_lkb(ls, lkb); @@ -4658,7 +5868,6 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, ua->bastparam = ua_tmp->bastparam; ua->bastaddr = ua_tmp->bastaddr; ua->user_lksb = ua_tmp->user_lksb; - ua->old_mode = lkb->lkb_grmode; error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, fake_astfn, ua, fake_bastfn, &args); @@ -4714,7 +5923,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, goto out_put; spin_lock(&ua->proc->locks_spin); - /* dlm_user_add_ast() may have already taken lkb off the proc list */ + /* dlm_user_add_cb() may have already taken lkb off the proc list */ if (!list_empty(&lkb->lkb_ownqueue)) list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); spin_unlock(&ua->proc->locks_spin); @@ -4833,15 +6042,18 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) return error; } -/* The force flag allows the unlock to go ahead even if the lkb isn't granted. - Regardless of what rsb queue the lock is on, it's removed and freed. */ +/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't + granted. Regardless of what rsb queue the lock is on, it's removed and + freed. The IVVALBLK flag causes the lvb on the resource to be invalidated + if our lock is PW/EX (it's ignored if our granted mode is smaller.) */ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) { struct dlm_args args; int error; - set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args); + set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK, + lkb->lkb_ua, &args); error = unlock_lock(ls, lkb, &args); if (error == -DLM_EUNLOCK) @@ -4851,7 +6063,7 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) /* We have to release clear_proc_locks mutex before calling unlock_proc_lock() (which does lock_rsb) due to deadlock with receiving a message that does - lock_rsb followed by dlm_user_add_ast() */ + lock_rsb followed by dlm_user_add_cb() */ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, struct dlm_user_proc *proc) @@ -4874,7 +6086,7 @@ static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, return lkb; } -/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which +/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, which we clear here. 
*/ @@ -4916,9 +6128,10 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) dlm_put_lkb(lkb); } - list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { - lkb->lkb_ast_type = 0; - list_del(&lkb->lkb_astqueue); + list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { + memset(&lkb->lkb_callbacks, 0, + sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); + list_del_init(&lkb->lkb_cb_list); dlm_put_lkb(lkb); } @@ -4957,8 +6170,10 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) spin_unlock(&proc->locks_spin); spin_lock(&proc->asts_spin); - list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { - list_del(&lkb->lkb_astqueue); + list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { + memset(&lkb->lkb_callbacks, 0, + sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); + list_del_init(&lkb->lkb_cb_list); dlm_put_lkb(lkb); } spin_unlock(&proc->asts_spin);
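
send_repeat_remove() and receive_remove() both select a hash bucket with hash & (ls->ls_rsbtbl_size - 1), which matches hash % size only when the table size is a power of two. A minimal standalone sketch of that bucket math, with FNV-1a standing in for the kernel's jhash(); the hash function and names here are illustrative stand-ins, not the dlm code:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* FNV-1a, standing in for the kernel's jhash(name, len, 0) */
static uint32_t fnv1a(const void *data, size_t len)
{
    const unsigned char *p = data;
    uint32_t h = 2166136261u;

    while (len--) {
        h ^= *p++;
        h *= 16777619u;
    }
    return h;
}

int main(void)
{
    const char *name = "resource-A";
    uint32_t table_size = 1024;    /* like ls_rsbtbl_size: a power of two */
    uint32_t hash = fnv1a(name, strlen(name));
    /* mask instead of modulo; only valid for power-of-two sizes */
    uint32_t bucket = hash & (table_size - 1);

    printf("hash %08x -> bucket %u of %u\n",
           (unsigned)hash, (unsigned)bucket, (unsigned)table_size);
    return 0;
}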
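When receive_remove() finds the name on the toss list, it drops what should be the final reference with kref_put(&r->res_ref, kill_rsb) and frees the rsb only if that put actually released it; a surviving reference is logged as an error. A compilable userspace sketch of the same put-and-release pattern, assuming a plain integer counter in place of struct kref (the toy_* names are illustrative, not the kernel API):

#include <stdio.h>
#include <stdlib.h>

struct toy_rsb {
    int refcount;    /* stands in for res_ref (struct kref) */
    char name[64];
};

/* like kref_put(): run release and return 1 when the count hits zero */
static int toy_put(struct toy_rsb *r, void (*release)(struct toy_rsb *))
{
    if (--r->refcount == 0) {
        release(r);
        return 1;
    }
    return 0;
}

static void toy_kill(struct toy_rsb *r)
{
    /* kill_rsb() asserts the lock queues are empty; here we just log */
    printf("last ref dropped on %s, detaching from toss tree\n", r->name);
}

int main(void)
{
    struct toy_rsb *r = calloc(1, sizeof(*r));

    if (!r)
        return 1;
    r->refcount = 1;    /* the toss list holds the only reference */
    snprintf(r->name, sizeof(r->name), "resource-A");

    if (toy_put(r, toy_kill)) {
        /* mirrors: rb_erase(&r->res_hashnode, ...); dlm_free_rsb(r); */
        free(r);
    } else {
        /* mirrors the "rsb ref error" branch in receive_remove() */
        fprintf(stderr, "unexpected extra reference\n");
    }
    return 0;
}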
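The down-conversion shortcut in send_convert() and the waiter recovery in dlm_recover_waiters_pre() fabricate local stub replies tagged DLM_IFL_STUB_MS, and receive_flags_reply() returns early for them so lkb flags are never copied out of a fake message. A small sketch of that guard, with toy_* types and a made-up flag value standing in for the dlm structures:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define TOY_STUB_MS 0x00010000u    /* stands in for DLM_IFL_STUB_MS */

struct toy_message {
    uint32_t m_flags;
    int m_result;
    uint32_t m_sbflags;
};

struct toy_lkb {
    uint32_t lkb_sbflags;
};

/* mirrors receive_flags_reply(): never copy flags from a stub message */
static void toy_receive_flags_reply(struct toy_lkb *lkb,
                                    const struct toy_message *ms)
{
    if (ms->m_flags == TOY_STUB_MS)
        return;
    lkb->lkb_sbflags = ms->m_sbflags;
}

int main(void)
{
    struct toy_lkb lkb = { .lkb_sbflags = 0x1 };
    struct toy_message stub;

    /* like the recovery path: zero the stub, then tag it */
    memset(&stub, 0, sizeof(stub));
    stub.m_flags = TOY_STUB_MS;
    stub.m_result = -115;    /* like the -EINPROGRESS convert stub */

    toy_receive_flags_reply(&lkb, &stub);
    printf("sbflags preserved: 0x%x\n", (unsigned)lkb.lkb_sbflags);
    return 0;
}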
