aboutsummaryrefslogtreecommitdiff
path: root/fs/dlm/dir.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/dir.c')
-rw-r--r--fs/dlm/dir.c391
1 files changed, 138 insertions, 253 deletions
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 46754553fdc..d975851a7e1 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -23,50 +23,6 @@
#include "lock.h"
#include "dir.h"
-
-static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
-{
- spin_lock(&ls->ls_recover_list_lock);
- list_add(&de->list, &ls->ls_recover_list);
- spin_unlock(&ls->ls_recover_list_lock);
-}
-
-static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
-{
- int found = 0;
- struct dlm_direntry *de;
-
- spin_lock(&ls->ls_recover_list_lock);
- list_for_each_entry(de, &ls->ls_recover_list, list) {
- if (de->length == len) {
- list_del(&de->list);
- de->master_nodeid = 0;
- memset(de->name, 0, len);
- found = 1;
- break;
- }
- }
- spin_unlock(&ls->ls_recover_list_lock);
-
- if (!found)
- de = allocate_direntry(ls, len);
- return de;
-}
-
-void dlm_clear_free_entries(struct dlm_ls *ls)
-{
- struct dlm_direntry *de;
-
- spin_lock(&ls->ls_recover_list_lock);
- while (!list_empty(&ls->ls_recover_list)) {
- de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
- list);
- list_del(&de->list);
- free_direntry(de);
- }
- spin_unlock(&ls->ls_recover_list_lock);
-}
-
/*
* We use the upper 16 bits of the hash value to select the directory node.
* Low bits are used for distribution of rsb's among hash buckets on each node.
@@ -78,148 +34,58 @@ void dlm_clear_free_entries(struct dlm_ls *ls)
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
{
- struct list_head *tmp;
- struct dlm_member *memb = NULL;
- uint32_t node, n = 0;
- int nodeid;
-
- if (ls->ls_num_nodes == 1) {
- nodeid = dlm_our_nodeid();
- goto out;
- }
+ uint32_t node;
- if (ls->ls_node_array) {
+ if (ls->ls_num_nodes == 1)
+ return dlm_our_nodeid();
+ else {
node = (hash >> 16) % ls->ls_total_weight;
- nodeid = ls->ls_node_array[node];
- goto out;
+ return ls->ls_node_array[node];
}
-
- /* make_member_array() failed to kmalloc ls_node_array... */
-
- node = (hash >> 16) % ls->ls_num_nodes;
-
- list_for_each(tmp, &ls->ls_nodes) {
- if (n++ != node)
- continue;
- memb = list_entry(tmp, struct dlm_member, list);
- break;
- }
-
- DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
- ls->ls_num_nodes, n, node););
- nodeid = memb->nodeid;
- out:
- return nodeid;
}
int dlm_dir_nodeid(struct dlm_rsb *r)
{
- return dlm_hash2nodeid(r->res_ls, r->res_hash);
-}
-
-static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
-{
- uint32_t val;
-
- val = jhash(name, len, 0);
- val &= (ls->ls_dirtbl_size - 1);
-
- return val;
-}
-
-static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
-{
- uint32_t bucket;
-
- bucket = dir_hash(ls, de->name, de->length);
- list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
+ return r->res_dir_nodeid;
}
-static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
- int namelen, uint32_t bucket)
+void dlm_recover_dir_nodeid(struct dlm_ls *ls)
{
- struct dlm_direntry *de;
-
- list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
- if (de->length == namelen && !memcmp(name, de->name, namelen))
- goto out;
- }
- de = NULL;
- out:
- return de;
-}
-
-void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
-{
- struct dlm_direntry *de;
- uint32_t bucket;
-
- bucket = dir_hash(ls, name, namelen);
-
- write_lock(&ls->ls_dirtbl[bucket].lock);
-
- de = search_bucket(ls, name, namelen, bucket);
-
- if (!de) {
- log_error(ls, "remove fr %u none", nodeid);
- goto out;
- }
+ struct dlm_rsb *r;
- if (de->master_nodeid != nodeid) {
- log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
- goto out;
- }
-
- list_del(&de->list);
- free_direntry(de);
- out:
- write_unlock(&ls->ls_dirtbl[bucket].lock);
-}
-
-void dlm_dir_clear(struct dlm_ls *ls)
-{
- struct list_head *head;
- struct dlm_direntry *de;
- int i;
-
- DLM_ASSERT(list_empty(&ls->ls_recover_list), );
-
- for (i = 0; i < ls->ls_dirtbl_size; i++) {
- write_lock(&ls->ls_dirtbl[i].lock);
- head = &ls->ls_dirtbl[i].list;
- while (!list_empty(head)) {
- de = list_entry(head->next, struct dlm_direntry, list);
- list_del(&de->list);
- put_free_de(ls, de);
- }
- write_unlock(&ls->ls_dirtbl[i].lock);
+ down_read(&ls->ls_root_sem);
+ list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+ r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
}
+ up_read(&ls->ls_root_sem);
}
int dlm_recover_directory(struct dlm_ls *ls)
{
struct dlm_member *memb;
- struct dlm_direntry *de;
char *b, *last_name = NULL;
- int error = -ENOMEM, last_len, count = 0;
+ int error = -ENOMEM, last_len, nodeid, result;
uint16_t namelen;
+ unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
- log_debug(ls, "dlm_recover_directory");
+ log_rinfo(ls, "dlm_recover_directory");
if (dlm_no_directory(ls))
goto out_status;
- dlm_dir_clear(ls);
-
- last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL);
+ last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
if (!last_name)
goto out;
list_for_each_entry(memb, &ls->ls_nodes, list) {
+ if (memb->nodeid == dlm_our_nodeid())
+ continue;
+
memset(last_name, 0, DLM_RESNAME_MAXLEN);
last_len = 0;
for (;;) {
+ int left;
error = dlm_recovery_stopped(ls);
if (error)
goto out_free;
@@ -229,18 +95,27 @@ int dlm_recover_directory(struct dlm_ls *ls)
if (error)
goto out_free;
- schedule();
+ cond_resched();
/*
* pick namelen/name pairs out of received buffer
*/
- b = ls->ls_recover_buf + sizeof(struct dlm_rcom);
+ b = ls->ls_recover_buf->rc_buf;
+ left = ls->ls_recover_buf->rc_header.h_length;
+ left -= sizeof(struct dlm_rcom);
for (;;) {
- memcpy(&namelen, b, sizeof(uint16_t));
- namelen = be16_to_cpu(namelen);
- b += sizeof(uint16_t);
+ __be16 v;
+
+ error = -EINVAL;
+ if (left < sizeof(__be16))
+ goto out_free;
+
+ memcpy(&v, b, sizeof(__be16));
+ namelen = be16_to_cpu(v);
+ b += sizeof(__be16);
+ left -= sizeof(__be16);
/* namelen of 0xFFFFF marks end of names for
this node; namelen of 0 marks end of the
@@ -251,127 +126,135 @@ int dlm_recover_directory(struct dlm_ls *ls)
if (!namelen)
break;
- error = -ENOMEM;
- de = get_free_de(ls, namelen);
- if (!de)
+ if (namelen > left)
+ goto out_free;
+
+ if (namelen > DLM_RESNAME_MAXLEN)
goto out_free;
- de->master_nodeid = memb->nodeid;
- de->length = namelen;
+ error = dlm_master_lookup(ls, memb->nodeid,
+ b, namelen,
+ DLM_LU_RECOVER_DIR,
+ &nodeid, &result);
+ if (error) {
+ log_error(ls, "recover_dir lookup %d",
+ error);
+ goto out_free;
+ }
+
+ /* The name was found in rsbtbl, but the
+ * master nodeid is different from
+ * memb->nodeid which says it is the master.
+ * This should not happen. */
+
+ if (result == DLM_LU_MATCH &&
+ nodeid != memb->nodeid) {
+ count_bad++;
+ log_error(ls, "recover_dir lookup %d "
+ "nodeid %d memb %d bad %u",
+ result, nodeid, memb->nodeid,
+ count_bad);
+ print_hex_dump_bytes("dlm_recover_dir ",
+ DUMP_PREFIX_NONE,
+ b, namelen);
+ }
+
+ /* The name was found in rsbtbl, and the
+ * master nodeid matches memb->nodeid. */
+
+ if (result == DLM_LU_MATCH &&
+ nodeid == memb->nodeid) {
+ count_match++;
+ }
+
+ /* The name was not found in rsbtbl and was
+ * added with memb->nodeid as the master. */
+
+ if (result == DLM_LU_ADD) {
+ count_add++;
+ }
+
last_len = namelen;
- memcpy(de->name, b, namelen);
memcpy(last_name, b, namelen);
b += namelen;
-
- add_entry_to_hash(ls, de);
+ left -= namelen;
count++;
}
}
- done:
+ done:
;
}
out_status:
error = 0;
dlm_set_recover_status(ls, DLM_RS_DIR);
- log_debug(ls, "dlm_recover_directory %d entries", count);
+
+ log_rinfo(ls, "dlm_recover_directory %u in %u new",
+ count, count_add);
out_free:
kfree(last_name);
out:
- dlm_clear_free_entries(ls);
return error;
}
-static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
- int namelen, int *r_nodeid)
+static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
{
- struct dlm_direntry *de, *tmp;
- uint32_t bucket;
-
- bucket = dir_hash(ls, name, namelen);
-
- write_lock(&ls->ls_dirtbl[bucket].lock);
- de = search_bucket(ls, name, namelen, bucket);
- if (de) {
- *r_nodeid = de->master_nodeid;
- write_unlock(&ls->ls_dirtbl[bucket].lock);
- if (*r_nodeid == nodeid)
- return -EEXIST;
- return 0;
- }
+ struct dlm_rsb *r;
+ uint32_t hash, bucket;
+ int rv;
- write_unlock(&ls->ls_dirtbl[bucket].lock);
+ hash = jhash(name, len, 0);
+ bucket = hash & (ls->ls_rsbtbl_size - 1);
- de = allocate_direntry(ls, namelen);
- if (!de)
- return -ENOMEM;
+ spin_lock(&ls->ls_rsbtbl[bucket].lock);
+ rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
+ if (rv)
+ rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
+ name, len, &r);
+ spin_unlock(&ls->ls_rsbtbl[bucket].lock);
- de->master_nodeid = nodeid;
- de->length = namelen;
- memcpy(de->name, name, namelen);
+ if (!rv)
+ return r;
- write_lock(&ls->ls_dirtbl[bucket].lock);
- tmp = search_bucket(ls, name, namelen, bucket);
- if (tmp) {
- free_direntry(de);
- de = tmp;
- } else {
- list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
+ down_read(&ls->ls_root_sem);
+ list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
+ if (len == r->res_length && !memcmp(name, r->res_name, len)) {
+ up_read(&ls->ls_root_sem);
+ log_debug(ls, "find_rsb_root revert to root_list %s",
+ r->res_name);
+ return r;
+ }
}
- *r_nodeid = de->master_nodeid;
- write_unlock(&ls->ls_dirtbl[bucket].lock);
- return 0;
-}
-
-int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
- int *r_nodeid)
-{
- return get_entry(ls, nodeid, name, namelen, r_nodeid);
+ up_read(&ls->ls_root_sem);
+ return NULL;
}
-/* Copy the names of master rsb's into the buffer provided.
- Only select names whose dir node is the given nodeid. */
+/* Find the rsb where we left off (or start again), then send rsb names
+ for rsb's we're master of and whose directory node matches the requesting
+ node. inbuf is the rsb name last sent, inlen is the name's length */
void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid)
{
struct list_head *list;
- struct dlm_rsb *start_r = NULL, *r = NULL;
- int offset = 0, start_namelen, error, dir_nodeid;
- char *start_name;
- uint16_t be_namelen;
-
- /*
- * Find the rsb where we left off (or start again)
- */
-
- start_namelen = inlen;
- start_name = inbuf;
-
- if (start_namelen > 1) {
- /*
- * We could also use a find_rsb_root() function here that
- * searched the ls_root_list.
- */
- error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER,
- &start_r);
- DLM_ASSERT(!error && start_r,
- printk("error %d\n", error););
- DLM_ASSERT(!list_empty(&start_r->res_root_list),
- dlm_print_rsb(start_r););
- dlm_put_rsb(start_r);
- }
-
- /*
- * Send rsb names for rsb's we're master of and whose directory node
- * matches the requesting node.
- */
+ struct dlm_rsb *r;
+ int offset = 0, dir_nodeid;
+ __be16 be_namelen;
down_read(&ls->ls_root_sem);
- if (start_r)
- list = start_r->res_root_list.next;
- else
+
+ if (inlen > 1) {
+ r = find_rsb_root(ls, inbuf, inlen);
+ if (!r) {
+ inbuf[inlen - 1] = '\0';
+ log_error(ls, "copy_master_names from %d start %d %s",
+ nodeid, inlen, inbuf);
+ goto out;
+ }
+ list = r->res_root_list.next;
+ } else {
list = ls->ls_root_list.next;
+ }
for (offset = 0; list != &ls->ls_root_list; list = list->next) {
r = list_entry(list, struct dlm_rsb, res_root_list);
@@ -392,17 +275,19 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
/* Write end-of-block record */
- be_namelen = 0;
- memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
- offset += sizeof(uint16_t);
+ be_namelen = cpu_to_be16(0);
+ memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
+ offset += sizeof(__be16);
+ ls->ls_recover_dir_sent_msg++;
goto out;
}
be_namelen = cpu_to_be16(r->res_length);
- memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
- offset += sizeof(uint16_t);
+ memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
+ offset += sizeof(__be16);
memcpy(outbuf + offset, r->res_name, r->res_length);
offset += r->res_length;
+ ls->ls_recover_dir_sent_res++;
}
/*
@@ -412,11 +297,11 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
if ((list == &ls->ls_root_list) &&
(offset + sizeof(uint16_t) <= outlen)) {
- be_namelen = 0xFFFF;
- memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
- offset += sizeof(uint16_t);
+ be_namelen = cpu_to_be16(0xFFFF);
+ memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
+ offset += sizeof(__be16);
+ ls->ls_recover_dir_sent_msg++;
}
-
out:
up_read(&ls->ls_root_sem);
}