diff options
Diffstat (limited to 'fs/dlm/dir.c')
| -rw-r--r-- | fs/dlm/dir.c | 391 |
1 files changed, 138 insertions, 253 deletions
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index 46754553fdc..d975851a7e1 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -23,50 +23,6 @@ #include "lock.h" #include "dir.h" - -static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de) -{ - spin_lock(&ls->ls_recover_list_lock); - list_add(&de->list, &ls->ls_recover_list); - spin_unlock(&ls->ls_recover_list_lock); -} - -static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len) -{ - int found = 0; - struct dlm_direntry *de; - - spin_lock(&ls->ls_recover_list_lock); - list_for_each_entry(de, &ls->ls_recover_list, list) { - if (de->length == len) { - list_del(&de->list); - de->master_nodeid = 0; - memset(de->name, 0, len); - found = 1; - break; - } - } - spin_unlock(&ls->ls_recover_list_lock); - - if (!found) - de = allocate_direntry(ls, len); - return de; -} - -void dlm_clear_free_entries(struct dlm_ls *ls) -{ - struct dlm_direntry *de; - - spin_lock(&ls->ls_recover_list_lock); - while (!list_empty(&ls->ls_recover_list)) { - de = list_entry(ls->ls_recover_list.next, struct dlm_direntry, - list); - list_del(&de->list); - free_direntry(de); - } - spin_unlock(&ls->ls_recover_list_lock); -} - /* * We use the upper 16 bits of the hash value to select the directory node. * Low bits are used for distribution of rsb's among hash buckets on each node. @@ -78,148 +34,58 @@ void dlm_clear_free_entries(struct dlm_ls *ls) int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) { - struct list_head *tmp; - struct dlm_member *memb = NULL; - uint32_t node, n = 0; - int nodeid; - - if (ls->ls_num_nodes == 1) { - nodeid = dlm_our_nodeid(); - goto out; - } + uint32_t node; - if (ls->ls_node_array) { + if (ls->ls_num_nodes == 1) + return dlm_our_nodeid(); + else { node = (hash >> 16) % ls->ls_total_weight; - nodeid = ls->ls_node_array[node]; - goto out; + return ls->ls_node_array[node]; } - - /* make_member_array() failed to kmalloc ls_node_array... */ - - node = (hash >> 16) % ls->ls_num_nodes; - - list_for_each(tmp, &ls->ls_nodes) { - if (n++ != node) - continue; - memb = list_entry(tmp, struct dlm_member, list); - break; - } - - DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n", - ls->ls_num_nodes, n, node);); - nodeid = memb->nodeid; - out: - return nodeid; } int dlm_dir_nodeid(struct dlm_rsb *r) { - return dlm_hash2nodeid(r->res_ls, r->res_hash); -} - -static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len) -{ - uint32_t val; - - val = jhash(name, len, 0); - val &= (ls->ls_dirtbl_size - 1); - - return val; -} - -static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de) -{ - uint32_t bucket; - - bucket = dir_hash(ls, de->name, de->length); - list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); + return r->res_dir_nodeid; } -static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, - int namelen, uint32_t bucket) +void dlm_recover_dir_nodeid(struct dlm_ls *ls) { - struct dlm_direntry *de; - - list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) { - if (de->length == namelen && !memcmp(name, de->name, namelen)) - goto out; - } - de = NULL; - out: - return de; -} - -void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen) -{ - struct dlm_direntry *de; - uint32_t bucket; - - bucket = dir_hash(ls, name, namelen); - - write_lock(&ls->ls_dirtbl[bucket].lock); - - de = search_bucket(ls, name, namelen, bucket); - - if (!de) { - log_error(ls, "remove fr %u none", nodeid); - goto out; - } + struct dlm_rsb *r; - if (de->master_nodeid != nodeid) { - log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid); - goto out; - } - - list_del(&de->list); - free_direntry(de); - out: - write_unlock(&ls->ls_dirtbl[bucket].lock); -} - -void dlm_dir_clear(struct dlm_ls *ls) -{ - struct list_head *head; - struct dlm_direntry *de; - int i; - - DLM_ASSERT(list_empty(&ls->ls_recover_list), ); - - for (i = 0; i < ls->ls_dirtbl_size; i++) { - write_lock(&ls->ls_dirtbl[i].lock); - head = &ls->ls_dirtbl[i].list; - while (!list_empty(head)) { - de = list_entry(head->next, struct dlm_direntry, list); - list_del(&de->list); - put_free_de(ls, de); - } - write_unlock(&ls->ls_dirtbl[i].lock); + down_read(&ls->ls_root_sem); + list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash); } + up_read(&ls->ls_root_sem); } int dlm_recover_directory(struct dlm_ls *ls) { struct dlm_member *memb; - struct dlm_direntry *de; char *b, *last_name = NULL; - int error = -ENOMEM, last_len, count = 0; + int error = -ENOMEM, last_len, nodeid, result; uint16_t namelen; + unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0; - log_debug(ls, "dlm_recover_directory"); + log_rinfo(ls, "dlm_recover_directory"); if (dlm_no_directory(ls)) goto out_status; - dlm_dir_clear(ls); - - last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL); + last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); if (!last_name) goto out; list_for_each_entry(memb, &ls->ls_nodes, list) { + if (memb->nodeid == dlm_our_nodeid()) + continue; + memset(last_name, 0, DLM_RESNAME_MAXLEN); last_len = 0; for (;;) { + int left; error = dlm_recovery_stopped(ls); if (error) goto out_free; @@ -229,18 +95,27 @@ int dlm_recover_directory(struct dlm_ls *ls) if (error) goto out_free; - schedule(); + cond_resched(); /* * pick namelen/name pairs out of received buffer */ - b = ls->ls_recover_buf + sizeof(struct dlm_rcom); + b = ls->ls_recover_buf->rc_buf; + left = ls->ls_recover_buf->rc_header.h_length; + left -= sizeof(struct dlm_rcom); for (;;) { - memcpy(&namelen, b, sizeof(uint16_t)); - namelen = be16_to_cpu(namelen); - b += sizeof(uint16_t); + __be16 v; + + error = -EINVAL; + if (left < sizeof(__be16)) + goto out_free; + + memcpy(&v, b, sizeof(__be16)); + namelen = be16_to_cpu(v); + b += sizeof(__be16); + left -= sizeof(__be16); /* namelen of 0xFFFFF marks end of names for this node; namelen of 0 marks end of the @@ -251,127 +126,135 @@ int dlm_recover_directory(struct dlm_ls *ls) if (!namelen) break; - error = -ENOMEM; - de = get_free_de(ls, namelen); - if (!de) + if (namelen > left) + goto out_free; + + if (namelen > DLM_RESNAME_MAXLEN) goto out_free; - de->master_nodeid = memb->nodeid; - de->length = namelen; + error = dlm_master_lookup(ls, memb->nodeid, + b, namelen, + DLM_LU_RECOVER_DIR, + &nodeid, &result); + if (error) { + log_error(ls, "recover_dir lookup %d", + error); + goto out_free; + } + + /* The name was found in rsbtbl, but the + * master nodeid is different from + * memb->nodeid which says it is the master. + * This should not happen. */ + + if (result == DLM_LU_MATCH && + nodeid != memb->nodeid) { + count_bad++; + log_error(ls, "recover_dir lookup %d " + "nodeid %d memb %d bad %u", + result, nodeid, memb->nodeid, + count_bad); + print_hex_dump_bytes("dlm_recover_dir ", + DUMP_PREFIX_NONE, + b, namelen); + } + + /* The name was found in rsbtbl, and the + * master nodeid matches memb->nodeid. */ + + if (result == DLM_LU_MATCH && + nodeid == memb->nodeid) { + count_match++; + } + + /* The name was not found in rsbtbl and was + * added with memb->nodeid as the master. */ + + if (result == DLM_LU_ADD) { + count_add++; + } + last_len = namelen; - memcpy(de->name, b, namelen); memcpy(last_name, b, namelen); b += namelen; - - add_entry_to_hash(ls, de); + left -= namelen; count++; } } - done: + done: ; } out_status: error = 0; dlm_set_recover_status(ls, DLM_RS_DIR); - log_debug(ls, "dlm_recover_directory %d entries", count); + + log_rinfo(ls, "dlm_recover_directory %u in %u new", + count, count_add); out_free: kfree(last_name); out: - dlm_clear_free_entries(ls); return error; } -static int get_entry(struct dlm_ls *ls, int nodeid, char *name, - int namelen, int *r_nodeid) +static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) { - struct dlm_direntry *de, *tmp; - uint32_t bucket; - - bucket = dir_hash(ls, name, namelen); - - write_lock(&ls->ls_dirtbl[bucket].lock); - de = search_bucket(ls, name, namelen, bucket); - if (de) { - *r_nodeid = de->master_nodeid; - write_unlock(&ls->ls_dirtbl[bucket].lock); - if (*r_nodeid == nodeid) - return -EEXIST; - return 0; - } + struct dlm_rsb *r; + uint32_t hash, bucket; + int rv; - write_unlock(&ls->ls_dirtbl[bucket].lock); + hash = jhash(name, len, 0); + bucket = hash & (ls->ls_rsbtbl_size - 1); - de = allocate_direntry(ls, namelen); - if (!de) - return -ENOMEM; + spin_lock(&ls->ls_rsbtbl[bucket].lock); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r); + if (rv) + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, + name, len, &r); + spin_unlock(&ls->ls_rsbtbl[bucket].lock); - de->master_nodeid = nodeid; - de->length = namelen; - memcpy(de->name, name, namelen); + if (!rv) + return r; - write_lock(&ls->ls_dirtbl[bucket].lock); - tmp = search_bucket(ls, name, namelen, bucket); - if (tmp) { - free_direntry(de); - de = tmp; - } else { - list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); + down_read(&ls->ls_root_sem); + list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + if (len == r->res_length && !memcmp(name, r->res_name, len)) { + up_read(&ls->ls_root_sem); + log_debug(ls, "find_rsb_root revert to root_list %s", + r->res_name); + return r; + } } - *r_nodeid = de->master_nodeid; - write_unlock(&ls->ls_dirtbl[bucket].lock); - return 0; -} - -int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen, - int *r_nodeid) -{ - return get_entry(ls, nodeid, name, namelen, r_nodeid); + up_read(&ls->ls_root_sem); + return NULL; } -/* Copy the names of master rsb's into the buffer provided. - Only select names whose dir node is the given nodeid. */ +/* Find the rsb where we left off (or start again), then send rsb names + for rsb's we're master of and whose directory node matches the requesting + node. inbuf is the rsb name last sent, inlen is the name's length */ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, char *outbuf, int outlen, int nodeid) { struct list_head *list; - struct dlm_rsb *start_r = NULL, *r = NULL; - int offset = 0, start_namelen, error, dir_nodeid; - char *start_name; - uint16_t be_namelen; - - /* - * Find the rsb where we left off (or start again) - */ - - start_namelen = inlen; - start_name = inbuf; - - if (start_namelen > 1) { - /* - * We could also use a find_rsb_root() function here that - * searched the ls_root_list. - */ - error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER, - &start_r); - DLM_ASSERT(!error && start_r, - printk("error %d\n", error);); - DLM_ASSERT(!list_empty(&start_r->res_root_list), - dlm_print_rsb(start_r);); - dlm_put_rsb(start_r); - } - - /* - * Send rsb names for rsb's we're master of and whose directory node - * matches the requesting node. - */ + struct dlm_rsb *r; + int offset = 0, dir_nodeid; + __be16 be_namelen; down_read(&ls->ls_root_sem); - if (start_r) - list = start_r->res_root_list.next; - else + + if (inlen > 1) { + r = find_rsb_root(ls, inbuf, inlen); + if (!r) { + inbuf[inlen - 1] = '\0'; + log_error(ls, "copy_master_names from %d start %d %s", + nodeid, inlen, inbuf); + goto out; + } + list = r->res_root_list.next; + } else { list = ls->ls_root_list.next; + } for (offset = 0; list != &ls->ls_root_list; list = list->next) { r = list_entry(list, struct dlm_rsb, res_root_list); @@ -392,17 +275,19 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) { /* Write end-of-block record */ - be_namelen = 0; - memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t)); - offset += sizeof(uint16_t); + be_namelen = cpu_to_be16(0); + memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); + offset += sizeof(__be16); + ls->ls_recover_dir_sent_msg++; goto out; } be_namelen = cpu_to_be16(r->res_length); - memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t)); - offset += sizeof(uint16_t); + memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); + offset += sizeof(__be16); memcpy(outbuf + offset, r->res_name, r->res_length); offset += r->res_length; + ls->ls_recover_dir_sent_res++; } /* @@ -412,11 +297,11 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, if ((list == &ls->ls_root_list) && (offset + sizeof(uint16_t) <= outlen)) { - be_namelen = 0xFFFF; - memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t)); - offset += sizeof(uint16_t); + be_namelen = cpu_to_be16(0xFFFF); + memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); + offset += sizeof(__be16); + ls->ls_recover_dir_sent_msg++; } - out: up_read(&ls->ls_root_sem); } |
