diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2012-01-10 14:55:55 -0800 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2012-01-10 14:55:55 -0800 |
commit | 49d41bae46f15da528ef9848fd7c9d38582aa8e9 (patch) | |
tree | 76907a9a5066642a32cb238ef8d8367fc612d064 /fs/dlm | |
parent | 7b3480f8b701170c046e1ed362946f5f0d005e13 (diff) | |
parent | 60f98d1839376d30e13f3e452dce2433fad3060e (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
dlm: add recovery callbacks
dlm: add node slots and generation
dlm: move recovery barrier calls
dlm: convert rsb list to rb_tree
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/config.c | 130 | ||||
-rw-r--r-- | fs/dlm/config.h | 17 | ||||
-rw-r--r-- | fs/dlm/debug_fs.c | 28 | ||||
-rw-r--r-- | fs/dlm/dir.c | 1 | ||||
-rw-r--r-- | fs/dlm/dlm_internal.h | 60 | ||||
-rw-r--r-- | fs/dlm/lock.c | 87 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 71 | ||||
-rw-r--r-- | fs/dlm/member.c | 486 | ||||
-rw-r--r-- | fs/dlm/member.h | 10 | ||||
-rw-r--r-- | fs/dlm/rcom.c | 99 | ||||
-rw-r--r-- | fs/dlm/rcom.h | 2 | ||||
-rw-r--r-- | fs/dlm/recover.c | 87 | ||||
-rw-r--r-- | fs/dlm/recoverd.c | 53 | ||||
-rw-r--r-- | fs/dlm/user.c | 5 |
14 files changed, 873 insertions, 263 deletions
diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 6cf72fcc0d0..e7e327d43fa 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -17,6 +17,7 @@ #include <linux/slab.h> #include <linux/in.h> #include <linux/in6.h> +#include <linux/dlmconstants.h> #include <net/ipv6.h> #include <net/sock.h> @@ -36,6 +37,7 @@ static struct config_group *space_list; static struct config_group *comm_list; static struct dlm_comm *local_comm; +static uint32_t dlm_comm_count; struct dlm_clusters; struct dlm_cluster; @@ -103,6 +105,8 @@ struct dlm_cluster { unsigned int cl_timewarn_cs; unsigned int cl_waitwarn_us; unsigned int cl_new_rsb_count; + unsigned int cl_recover_callbacks; + char cl_cluster_name[DLM_LOCKSPACE_LEN]; }; enum { @@ -118,6 +122,8 @@ enum { CLUSTER_ATTR_TIMEWARN_CS, CLUSTER_ATTR_WAITWARN_US, CLUSTER_ATTR_NEW_RSB_COUNT, + CLUSTER_ATTR_RECOVER_CALLBACKS, + CLUSTER_ATTR_CLUSTER_NAME, }; struct cluster_attribute { @@ -126,6 +132,27 @@ struct cluster_attribute { ssize_t (*store)(struct dlm_cluster *, const char *, size_t); }; +static ssize_t cluster_cluster_name_read(struct dlm_cluster *cl, char *buf) +{ + return sprintf(buf, "%s\n", cl->cl_cluster_name); +} + +static ssize_t cluster_cluster_name_write(struct dlm_cluster *cl, + const char *buf, size_t len) +{ + strncpy(dlm_config.ci_cluster_name, buf, DLM_LOCKSPACE_LEN); + strncpy(cl->cl_cluster_name, buf, DLM_LOCKSPACE_LEN); + return len; +} + +static struct cluster_attribute cluster_attr_cluster_name = { + .attr = { .ca_owner = THIS_MODULE, + .ca_name = "cluster_name", + .ca_mode = S_IRUGO | S_IWUSR }, + .show = cluster_cluster_name_read, + .store = cluster_cluster_name_write, +}; + static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, int *info_field, int check_zero, const char *buf, size_t len) @@ -171,6 +198,7 @@ CLUSTER_ATTR(protocol, 0); CLUSTER_ATTR(timewarn_cs, 1); CLUSTER_ATTR(waitwarn_us, 0); CLUSTER_ATTR(new_rsb_count, 0); +CLUSTER_ATTR(recover_callbacks, 0); static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, @@ -185,6 +213,8 @@ static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr, [CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr, [CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr, + [CLUSTER_ATTR_RECOVER_CALLBACKS] = &cluster_attr_recover_callbacks.attr, + [CLUSTER_ATTR_CLUSTER_NAME] = &cluster_attr_cluster_name.attr, NULL, }; @@ -293,6 +323,7 @@ struct dlm_comms { struct dlm_comm { struct config_item item; + int seq; int nodeid; int local; int addr_count; @@ -309,6 +340,7 @@ struct dlm_node { int nodeid; int weight; int new; + int comm_seq; /* copy of cm->seq when nd->nodeid is set */ }; static struct configfs_group_operations clusters_ops = { @@ -455,6 +487,9 @@ static struct config_group *make_cluster(struct config_group *g, cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs; cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us; cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count; + cl->cl_recover_callbacks = dlm_config.ci_recover_callbacks; + memcpy(cl->cl_cluster_name, dlm_config.ci_cluster_name, + DLM_LOCKSPACE_LEN); space_list = &sps->ss_group; comm_list = &cms->cs_group; @@ -558,6 +593,11 @@ static struct config_item *make_comm(struct config_group *g, const char *name) return ERR_PTR(-ENOMEM); config_item_init_type_name(&cm->item, name, &comm_type); + + cm->seq = dlm_comm_count++; + if (!cm->seq) + cm->seq = dlm_comm_count++; + cm->nodeid = -1; cm->local = 0; cm->addr_count = 0; @@ -801,7 +841,10 @@ static ssize_t node_nodeid_read(struct dlm_node *nd, char *buf) static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf, size_t len) { + uint32_t seq = 0; nd->nodeid = simple_strtol(buf, NULL, 0); + dlm_comm_seq(nd->nodeid, &seq); + nd->comm_seq = seq; return len; } @@ -908,13 +951,13 @@ static void put_comm(struct dlm_comm *cm) } /* caller must free mem */ -int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out, - int **new_out, int *new_count_out) +int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, + int *count_out) { struct dlm_space *sp; struct dlm_node *nd; - int i = 0, rv = 0, ids_count = 0, new_count = 0; - int *ids, *new; + struct dlm_config_node *nodes, *node; + int rv, count; sp = get_space(lsname); if (!sp) @@ -927,73 +970,42 @@ int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out, goto out; } - ids_count = sp->members_count; + count = sp->members_count; - ids = kcalloc(ids_count, sizeof(int), GFP_NOFS); - if (!ids) { + nodes = kcalloc(count, sizeof(struct dlm_config_node), GFP_NOFS); + if (!nodes) { rv = -ENOMEM; goto out; } + node = nodes; list_for_each_entry(nd, &sp->members, list) { - ids[i++] = nd->nodeid; - if (nd->new) - new_count++; - } - - if (ids_count != i) - printk(KERN_ERR "dlm: bad nodeid count %d %d\n", ids_count, i); - - if (!new_count) - goto out_ids; + node->nodeid = nd->nodeid; + node->weight = nd->weight; + node->new = nd->new; + node->comm_seq = nd->comm_seq; + node++; - new = kcalloc(new_count, sizeof(int), GFP_NOFS); - if (!new) { - kfree(ids); - rv = -ENOMEM; - goto out; + nd->new = 0; } - i = 0; - list_for_each_entry(nd, &sp->members, list) { - if (nd->new) { - new[i++] = nd->nodeid; - nd->new = 0; - } - } - *new_count_out = new_count; - *new_out = new; - - out_ids: - *ids_count_out = ids_count; - *ids_out = ids; + *count_out = count; + *nodes_out = nodes; + rv = 0; out: mutex_unlock(&sp->members_lock); put_space(sp); return rv; } -int dlm_node_weight(char *lsname, int nodeid) +int dlm_comm_seq(int nodeid, uint32_t *seq) { - struct dlm_space *sp; - struct dlm_node *nd; - int w = -EEXIST; - - sp = get_space(lsname); - if (!sp) - goto out; - - mutex_lock(&sp->members_lock); - list_for_each_entry(nd, &sp->members, list) { - if (nd->nodeid != nodeid) - continue; - w = nd->weight; - break; - } - mutex_unlock(&sp->members_lock); - put_space(sp); - out: - return w; + struct dlm_comm *cm = get_comm(nodeid, NULL); + if (!cm) + return -EEXIST; + *seq = cm->seq; + put_comm(cm); + return 0; } int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr) @@ -1047,6 +1059,8 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) #define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */ #define DEFAULT_WAITWARN_US 0 #define DEFAULT_NEW_RSB_COUNT 128 +#define DEFAULT_RECOVER_CALLBACKS 0 +#define DEFAULT_CLUSTER_NAME "" struct dlm_config_info dlm_config = { .ci_tcp_port = DEFAULT_TCP_PORT, @@ -1060,6 +1074,8 @@ struct dlm_config_info dlm_config = { .ci_protocol = DEFAULT_PROTOCOL, .ci_timewarn_cs = DEFAULT_TIMEWARN_CS, .ci_waitwarn_us = DEFAULT_WAITWARN_US, - .ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT + .ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT, + .ci_recover_callbacks = DEFAULT_RECOVER_CALLBACKS, + .ci_cluster_name = DEFAULT_CLUSTER_NAME }; diff --git a/fs/dlm/config.h b/fs/dlm/config.h index 3099d0dd26c..9f5e3663bb0 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -14,6 +14,13 @@ #ifndef __CONFIG_DOT_H__ #define __CONFIG_DOT_H__ +struct dlm_config_node { + int nodeid; + int weight; + int new; + uint32_t comm_seq; +}; + #define DLM_MAX_ADDR_COUNT 3 struct dlm_config_info { @@ -29,15 +36,17 @@ struct dlm_config_info { int ci_timewarn_cs; int ci_waitwarn_us; int ci_new_rsb_count; + int ci_recover_callbacks; + char ci_cluster_name[DLM_LOCKSPACE_LEN]; }; extern struct dlm_config_info dlm_config; int dlm_config_init(void); void dlm_config_exit(void); -int dlm_node_weight(char *lsname, int nodeid); -int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out, - int **new_out, int *new_count_out); +int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, + int *count_out); +int dlm_comm_seq(int nodeid, uint32_t *seq); int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr); int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid); int dlm_our_nodeid(void); diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index 59779237e2b..3dca2b39e83 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -393,6 +393,7 @@ static const struct seq_operations format3_seq_ops; static void *table_seq_start(struct seq_file *seq, loff_t *pos) { + struct rb_node *node; struct dlm_ls *ls = seq->private; struct rsbtbl_iter *ri; struct dlm_rsb *r; @@ -418,9 +419,10 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ri->format = 3; spin_lock(&ls->ls_rsbtbl[bucket].lock); - if (!list_empty(&ls->ls_rsbtbl[bucket].list)) { - list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, - res_hashchain) { + if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { + for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node; + node = rb_next(node)) { + r = rb_entry(node, struct dlm_rsb, res_hashnode); if (!entry--) { dlm_hold_rsb(r); ri->rsb = r; @@ -449,9 +451,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) } spin_lock(&ls->ls_rsbtbl[bucket].lock); - if (!list_empty(&ls->ls_rsbtbl[bucket].list)) { - r = list_first_entry(&ls->ls_rsbtbl[bucket].list, - struct dlm_rsb, res_hashchain); + if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { + node = rb_first(&ls->ls_rsbtbl[bucket].keep); + r = rb_entry(node, struct dlm_rsb, res_hashnode); dlm_hold_rsb(r); ri->rsb = r; ri->bucket = bucket; @@ -467,7 +469,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) { struct dlm_ls *ls = seq->private; struct rsbtbl_iter *ri = iter_ptr; - struct list_head *next; + struct rb_node *next; struct dlm_rsb *r, *rp; loff_t n = *pos; unsigned bucket; @@ -480,10 +482,10 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) spin_lock(&ls->ls_rsbtbl[bucket].lock); rp = ri->rsb; - next = rp->res_hashchain.next; + next = rb_next(&rp->res_hashnode); - if (next != &ls->ls_rsbtbl[bucket].list) { - r = list_entry(next, struct dlm_rsb, res_hashchain); + if (next) { + r = rb_entry(next, struct dlm_rsb, res_hashnode); dlm_hold_rsb(r); ri->rsb = r; spin_unlock(&ls->ls_rsbtbl[bucket].lock); @@ -511,9 +513,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) } spin_lock(&ls->ls_rsbtbl[bucket].lock); - if (!list_empty(&ls->ls_rsbtbl[bucket].list)) { - r = list_first_entry(&ls->ls_rsbtbl[bucket].list, - struct dlm_rsb, res_hashchain); + if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { + next = rb_first(&ls->ls_rsbtbl[bucket].keep); + r = rb_entry(next, struct dlm_rsb, res_hashnode); dlm_hold_rsb(r); ri->rsb = r; ri->bucket = bucket; diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index 7b84c1dbc82..83641574b01 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -290,7 +290,6 @@ int dlm_recover_directory(struct dlm_ls *ls) out_status: error = 0; - dlm_set_recover_status(ls, DLM_RS_DIR); log_debug(ls, "dlm_recover_directory %d entries", count); out_free: kfree(last_name); diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index fe2860c0244..3a564d197e9 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2010 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -103,8 +103,8 @@ struct dlm_dirtable { }; struct dlm_rsbtable { - struct list_head list; - struct list_head toss; + struct rb_root keep; + struct rb_root toss; spinlock_t lock; }; @@ -117,6 +117,10 @@ struct dlm_member { struct list_head list; int nodeid; int weight; + int slot; + int slot_prev; + int comm_seq; + uint32_t generation; }; /* @@ -125,10 +129,8 @@ struct dlm_member { struct dlm_recover { struct list_head list; - int *nodeids; /* nodeids of all members */ - int node_count; - int *new; /* nodeids of new members */ - int new_count; + struct dlm_config_node *nodes; + int nodes_count; uint64_t seq; }; @@ -285,7 +287,10 @@ struct dlm_rsb { unsigned long res_toss_time; uint32_t res_first_lkid; struct list_head res_lookup; /* lkbs waiting on first */ - struct list_head res_hashchain; /* rsbtbl */ + union { + struct list_head res_hashchain; + struct rb_node res_hashnode; /* rsbtbl */ + }; struct list_head res_grantqueue; struct list_head res_convertqueue; struct list_head res_waitqueue; @@ -334,7 +339,9 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag) /* dlm_header is first element of all structs sent between nodes */ #define DLM_HEADER_MAJOR 0x00030000 -#define DLM_HEADER_MINOR 0x00000000 +#define DLM_HEADER_MINOR 0x00000001 + +#define DLM_HEADER_SLOTS 0x00000001 #define DLM_MSG 1 #define DLM_RCOM 2 @@ -422,10 +429,34 @@ union dlm_packet { struct dlm_rcom rcom; }; +#define DLM_RSF_NEED_SLOTS 0x00000001 + +/* RCOM_STATUS data */ +struct rcom_status { + __le32 rs_flags; + __le32 rs_unused1; + __le64 rs_unused2; +}; + +/* RCOM_STATUS_REPLY data */ struct rcom_config { __le32 rf_lvblen; __le32 rf_lsflags; - __le64 rf_unused; + + /* DLM_HEADER_SLOTS adds: */ + __le32 rf_flags; + __le16 rf_our_slot; + __le16 rf_num_slots; + __le32 rf_generation; + __le32 rf_unused1; + __le64 rf_unused2; +}; + +struct rcom_slot { + __le32 ro_nodeid; + __le16 ro_slot; + __le16 ro_unused1; + __le64 ro_unused2; }; struct rcom_lock { @@ -452,6 +483,7 @@ struct dlm_ls { struct list_head ls_list; /* list of lockspaces */ dlm_lockspace_t *ls_local_handle; uint32_t ls_global_id; /* global unique lockspace ID */ + uint32_t ls_generation; uint32_t ls_exflags; int ls_lvblen; int ls_count; /* refcount of processes in @@ -490,6 +522,11 @@ struct dlm_ls { int ls_total_weight; int *ls_node_array; + int ls_slot; + int ls_num_slots; + int ls_slots_size; + struct dlm_slot *ls_slots; + struct dlm_rsb ls_stub_rsb; /* for returning errors */ struct dlm_lkb ls_stub_lkb; /* for returning errors */ struct dlm_message ls_stub_ms; /* for faking a reply */ @@ -537,6 +574,9 @@ struct dlm_ls { struct list_head ls_root_list; /* root resources */ struct rw_semaphore ls_root_sem; /* protect root_list */ + const struct dlm_lockspace_ops *ls_ops; + void *ls_ops_arg; + int ls_namelen; char ls_name[1]; }; diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 83b5e32514e..d47183043c5 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -56,6 +56,7 @@ L: receive_xxxx_reply() <- R: send_xxxx_reply() */ #include <linux/types.h> +#include <linux/rbtree.h> #include <linux/slab.h> #include "dlm_internal.h" #include <linux/dlm_device.h> @@ -380,6 +381,8 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len, r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); list_del(&r->res_hashchain); + /* Convert the empty list_head to a NULL rb_node for tree usage: */ + memset(&r->res_hashnode, 0, sizeof(struct rb_node)); ls->ls_new_rsb_count--; spin_unlock(&ls->ls_new_rsb_spin); @@ -388,7 +391,6 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len, memcpy(r->res_name, name, len); mutex_init(&r->res_mutex); - INIT_LIST_HEAD(&r->res_hashchain); INIT_LIST_HEAD(&r->res_lookup); INIT_LIST_HEAD(&r->res_grantqueue); INIT_LIST_HEAD(&r->res_convertqueue); @@ -400,14 +402,31 @@ static int get_rsb_struct(struct dlm_ls *ls, char *name, int len, return 0; } -static int search_rsb_list(struct list_head *head, char *name, int len, +static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) +{ + char maxname[DLM_RESNAME_MAXLEN]; + + memset(maxname, 0, DLM_RESNAME_MAXLEN); + memcpy(maxname, name, nlen); + return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN); +} + +static int search_rsb_tree(struct rb_root *tree, char *name, int len, unsigned int flags, struct dlm_rsb **r_ret) { + struct rb_node *node = tree->rb_node; struct dlm_rsb *r; int error = 0; - - list_for_each_entry(r, head, res_hashchain) { - if (len == r->res_length && !memcmp(name, r->res_name, len)) + int rc; + + while (node) { + r = rb_entry(node, struct dlm_rsb, res_hashnode); + rc = rsb_cmp(r, name, len); + if (rc < 0) + node = node->rb_left; + else if (rc > 0) + node = node->rb_right; + else goto found; } *r_ret = NULL; @@ -420,22 +439,54 @@ static int search_rsb_list(struct list_head *head, char *name, int len, return error; } +static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) +{ + struct rb_node **newn = &tree->rb_node; + struct rb_node *parent = NULL; + int rc; + + while (*newn) { + struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb, + res_hashnode); + + parent = *newn; + rc = rsb_cmp(cur, rsb->res_name, rsb->res_length); + if (rc < 0) + newn = &parent->rb_left; + else if (rc > 0) + newn = &parent->rb_right; + else { + log_print("rsb_insert match"); + dlm_dump_rsb(rsb); + dlm_dump_rsb(cur); + return -EEXIST; + } + } + + rb_link_node(&rsb->res_hashnode, parent, newn); + rb_insert_color(&rsb->res_hashnode, tree); + return 0; +} + static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b, unsigned int flags, struct dlm_rsb **r_ret) { struct dlm_rsb *r; int error; - error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r); + error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r); if (!error) { kref_get(&r->res_ref); goto out; } - error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); + error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); if (error) goto out; - list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list); + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + if (error) + return error; if (dlm_no_directory(ls)) goto out; @@ -527,8 +578,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, nodeid = 0; r->res_nodeid = nodeid; } - list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list); - error = 0; + error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep); out_unlock: spin_unlock(&ls->ls_rsbtbl[bucket].lock); out: @@ -556,7 +606,8 @@ static void toss_rsb(struct kref *kref) DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); kref_init(&r->res_ref); - list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss); + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); + rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); r->res_toss_time = jiffies; if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); @@ -1082,19 +1133,19 @@ static void dir_remove(struct dlm_rsb *r) r->res_name, r->res_length); } -/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is - found since they are in order of newest to oldest? */ +/* FIXME: make this more efficient */ static int shrink_bucket(struct dlm_ls *ls, int b) { + struct rb_node *n; struct dlm_rsb *r; int count = 0, found; for (;;) { found = 0; spin_lock(&ls->ls_rsbtbl[b].lock); - list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss, - res_hashchain) { + for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) { + r = rb_entry(n, struct dlm_rsb, res_hashnode); if (!time_after_eq(jiffies, r->res_toss_time + dlm_config.ci_toss_secs * HZ)) continue; @@ -1108,7 +1159,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b) } if (kref_put(&r->res_ref, kill_rsb)) { - list_del(&r->res_hashchain); + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); spin_unlock(&ls->ls_rsbtbl[b].lock); if (is_master(r)) @@ -4441,10 +4492,12 @@ int dlm_purge_locks(struct dlm_ls *ls) static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) { + struct rb_node *n; struct dlm_rsb *r, *r_ret = NULL; spin_lock(&ls->ls_rsbtbl[bucket].lock); - list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) { + for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { + r = rb_entry(n, struct dlm_rsb, res_hashnode); if (!rsb_flag(r, RSB_LOCKS_PURGED)) continue; hold_rsb(r); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index a1d8f1af144..a1ea25face8 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -386,12 +386,15 @@ static void threads_stop(void) dlm_lowcomms_stop(); } -static int new_lockspace(const char *name, int namelen, void **lockspace, - uint32_t flags, int lvblen) +static int new_lockspace(const char *name, const char *cluster, + uint32_t flags, int lvblen, + const struct dlm_lockspace_ops *ops, void *ops_arg, + int *ops_result, dlm_lockspace_t **lockspace) { struct dlm_ls *ls; int i, size, error; int do_unreg = 0; + int namelen = strlen(name); if (namelen > DLM_LOCKSPACE_LEN) return -EINVAL; @@ -403,8 +406,24 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, return -EINVAL; if (!dlm_user_daemon_available()) { - module_put(THIS_MODULE); - return -EUNATCH; + log_print("dlm user daemon not available"); + error = -EUNATCH; + goto out; + } + + if (ops && ops_result) { + if (!dlm_config.ci_recover_callbacks) + *ops_result = -EOPNOTSUPP; + else + *ops_result = 0; + } + + if (dlm_config.ci_recover_callbacks && cluster && + strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) { + log_print("dlm cluster name %s mismatch %s", + dlm_config.ci_cluster_name, cluster); + error = -EBADR; + goto out; } error = 0; @@ -442,6 +461,11 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, ls->ls_flags = 0; ls->ls_scan_time = jiffies; + if (ops && dlm_config.ci_recover_callbacks) { + ls->ls_ops = ops; + ls->ls_ops_arg = ops_arg; + } + if (flags & DLM_LSFL_TIMEWARN) set_bit(LSFL_TIMEWARN, &ls->ls_flags); @@ -457,8 +481,8 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, if (!ls->ls_rsbtbl) goto out_lsfree; for (i = 0; i < size; i++) { - INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list); - INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss); + ls->ls_rsbtbl[i].keep.rb_node = NULL; + ls->ls_rsbtbl[i].toss.rb_node = NULL; spin_lock_init(&ls->ls_rsbtbl[i].lock); } @@ -525,6 +549,11 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, if (!ls->ls_recover_buf) goto out_dirfree; + ls->ls_slot = 0; + ls->ls_num_slots = 0; + ls->ls_slots_size = 0; + ls->ls_slots = NULL; + INIT_LIST_HEAD(&ls->ls_recover_list); spin_lock_init(&ls->ls_recover_list_lock); ls->ls_recover_list_count = 0; @@ -614,8 +643,10 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, return error; } -int dlm_new_lockspace(const char *name, int namelen, void **lockspace, - uint32_t flags, int lvblen) +int dlm_new_lockspace(const char *name, const char *cluster, + uint32_t flags, int lvblen, + const struct dlm_lockspace_ops *ops, void *ops_arg, + int *ops_result, dlm_lockspace_t **lockspace) { int error = 0; @@ -625,7 +656,8 @@ int dlm_new_lockspace(const char *name, int namelen, void **lockspace, if (error) goto out; - error = new_lockspace(name, namelen, lockspace, flags, lvblen); + error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, + ops_result, lockspace); if (!error) ls_count++; if (error > 0) @@ -685,7 +717,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force) static int release_lockspace(struct dlm_ls *ls, int force) { struct dlm_rsb *rsb; - struct list_head *head; + struct rb_node *n; int i, busy, rv; busy = lockspace_busy(ls, force); @@ -746,20 +778,15 @@ static int release_lockspace(struct dlm_ls *ls, int force) */ for (i = 0; i < ls->ls_rsbtbl_size; i++) { - head = &ls->ls_rsbtbl[i].list; - while (!list_empty(head)) { - rsb = list_entry(head->next, struct dlm_rsb, - res_hashchain); - - list_del(&rsb->res_hashchain); + while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) { + rsb = rb_entry(n, struct dlm_rsb, res_hashnode); + rb_erase(n, &ls->ls_rsbtbl[i].keep); dlm_free_rsb(rsb); } - head = &ls->ls_rsbtbl[i].toss; - while (!list_empty(head)) { - rsb = list_entry(head->next, struct dlm_rsb, - res_hashchain); - list_del(&rsb->res_hashchain); + while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) { + rsb = rb_entry(n, struct dlm_rsb, res_hashnode); + rb_erase(n, &ls->ls_rsbtbl[i].toss); dlm_free_rsb(rsb); } } diff --git a/fs/dlm/member.c b/fs/dlm/member.c index b12532e553f..862640a36d5 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005-2009 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2011 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -19,6 +19,280 @@ #include "config.h" #include "lowcomms.h" +int dlm_slots_version(struct dlm_header *h) +{ + if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS) + return 0; + return 1; +} + +void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc, + struct dlm_member *memb) +{ + struct rcom_config *rf = (struct rcom_config *)rc->rc_buf; + + if (!dlm_slots_version(&rc->rc_header)) + return; + + memb->slot = le16_to_cpu(rf->rf_our_slot); + memb->generation = le32_to_cpu(rf->rf_generation); +} + +void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc) +{ + struct dlm_slot *slot; + struct rcom_slot *ro; + int i; + + ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config)); + + /* ls_slots array is sparse, but not rcom_slots */ + + for (i = 0; i < ls->ls_slots_size; i++) { + slot = &ls->ls_slots[i]; + if (!slot->nodeid) + continue; + ro->ro_nodeid = cpu_to_le32(slot->nodeid); + ro->ro_slot = cpu_to_le16(slot->slot); + ro++; + } +} + +#define SLOT_DEBUG_LINE 128 + +static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, + struct rcom_slot *ro0, struct dlm_slot *array, + int array_size) +{ + char line[SLOT_DEBUG_LINE]; + int len = SLOT_DEBUG_LINE - 1; + int pos = 0; + int ret, i; + + if (!dlm_config.ci_log_debug) + return; + + memset(line, 0, sizeof(line)); + + if (array) { + for (i = 0; i < array_size; i++) { + if (!array[i].nodeid) + continue; + + ret = snprintf(line + pos, len - pos, " %d:%d", + array[i].slot, array[i].nodeid); + if (ret >= len - pos) + break; + pos += ret; + } + } else if (ro0) { + for (i = 0; i < num_slots; i++) { + ret = snprintf(line + pos, len - pos, " %d:%d", + ro0[i].ro_slot, ro0[i].ro_nodeid); + if (ret >= len - pos) + break; + pos += ret; + } + } + + log_debug(ls, "generation %u slots %d%s", gen, num_slots, line); +} + +int dlm_slots_copy_in(struct dlm_ls *ls) +{ + struct dlm_member *memb; + struct dlm_rcom *rc = ls->ls_recover_buf; + struct rcom_config *rf = (struct rcom_config *)rc->rc_buf; + struct rcom_slot *ro0, *ro; + int our_nodeid = dlm_our_nodeid(); |