diff options
Diffstat (limited to 'fs/dlm')
| -rw-r--r-- | fs/dlm/Kconfig | 2 | ||||
| -rw-r--r-- | fs/dlm/ast.c | 12 | ||||
| -rw-r--r-- | fs/dlm/config.c | 119 | ||||
| -rw-r--r-- | fs/dlm/config.h | 3 | ||||
| -rw-r--r-- | fs/dlm/debug_fs.c | 135 | ||||
| -rw-r--r-- | fs/dlm/dir.c | 289 | ||||
| -rw-r--r-- | fs/dlm/dir.h | 7 | ||||
| -rw-r--r-- | fs/dlm/dlm_internal.h | 112 | ||||
| -rw-r--r-- | fs/dlm/lock.c | 1354 | ||||
| -rw-r--r-- | fs/dlm/lock.h | 5 | ||||
| -rw-r--r-- | fs/dlm/lockspace.c | 103 | ||||
| -rw-r--r-- | fs/dlm/lowcomms.c | 405 | ||||
| -rw-r--r-- | fs/dlm/lowcomms.h | 2 | ||||
| -rw-r--r-- | fs/dlm/main.c | 2 | ||||
| -rw-r--r-- | fs/dlm/member.c | 44 | ||||
| -rw-r--r-- | fs/dlm/netlink.c | 18 | ||||
| -rw-r--r-- | fs/dlm/plock.c | 18 | ||||
| -rw-r--r-- | fs/dlm/rcom.c | 149 | ||||
| -rw-r--r-- | fs/dlm/rcom.h | 1 | ||||
| -rw-r--r-- | fs/dlm/recover.c | 330 | ||||
| -rw-r--r-- | fs/dlm/recover.h | 2 | ||||
| -rw-r--r-- | fs/dlm/recoverd.c | 73 | ||||
| -rw-r--r-- | fs/dlm/recoverd.h | 1 | ||||
| -rw-r--r-- | fs/dlm/user.c | 32 |
24 files changed, 2241 insertions, 977 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig index 1897eb1b4b6..e4242c3f848 100644 --- a/fs/dlm/Kconfig +++ b/fs/dlm/Kconfig @@ -1,6 +1,6 @@ menuconfig DLM tristate "Distributed Lock Manager (DLM)" - depends on EXPERIMENTAL && INET + depends on INET depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n) select IP_SCTP help diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 63dc19c54d5..dcea1e37a1b 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -14,9 +14,10 @@ #include "dlm_internal.h" #include "lock.h" #include "user.h" +#include "ast.h" -static uint64_t dlm_cb_seq; -static spinlock_t dlm_cb_seq_spin; +static uint64_t dlm_cb_seq; +static DEFINE_SPINLOCK(dlm_cb_seq_spin); static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb) { @@ -267,10 +268,7 @@ void dlm_callback_work(struct work_struct *work) int dlm_callback_start(struct dlm_ls *ls) { ls->ls_callback_wq = alloc_workqueue("dlm_callback", - WQ_UNBOUND | - WQ_MEM_RECLAIM | - WQ_NON_REENTRANT, - 0); + WQ_UNBOUND | WQ_MEM_RECLAIM, 0); if (!ls->ls_callback_wq) { log_print("can't start dlm_callback workqueue"); return -ENOMEM; @@ -311,6 +309,6 @@ void dlm_callback_resume(struct dlm_ls *ls) mutex_unlock(&ls->ls_cb_mutex); if (count) - log_debug(ls, "dlm_callback_resume %d", count); + log_rinfo(ls, "dlm_callback_resume %d", count); } diff --git a/fs/dlm/config.c b/fs/dlm/config.c index e7e327d43fa..d521bddf876 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -96,7 +96,6 @@ struct dlm_cluster { unsigned int cl_tcp_port; unsigned int cl_buffer_size; unsigned int cl_rsbtbl_size; - unsigned int cl_dirtbl_size; unsigned int cl_recover_timer; unsigned int cl_toss_secs; unsigned int cl_scan_secs; @@ -113,7 +112,6 @@ enum { CLUSTER_ATTR_TCP_PORT = 0, CLUSTER_ATTR_BUFFER_SIZE, CLUSTER_ATTR_RSBTBL_SIZE, - CLUSTER_ATTR_DIRTBL_SIZE, CLUSTER_ATTR_RECOVER_TIMER, CLUSTER_ATTR_TOSS_SECS, CLUSTER_ATTR_SCAN_SECS, @@ -140,8 +138,9 @@ static ssize_t cluster_cluster_name_read(struct dlm_cluster *cl, char *buf) static ssize_t 
cluster_cluster_name_write(struct dlm_cluster *cl, const char *buf, size_t len) { - strncpy(dlm_config.ci_cluster_name, buf, DLM_LOCKSPACE_LEN); - strncpy(cl->cl_cluster_name, buf, DLM_LOCKSPACE_LEN); + strlcpy(dlm_config.ci_cluster_name, buf, + sizeof(dlm_config.ci_cluster_name)); + strlcpy(cl->cl_cluster_name, buf, sizeof(cl->cl_cluster_name)); return len; } @@ -158,11 +157,13 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, const char *buf, size_t len) { unsigned int x; + int rc; if (!capable(CAP_SYS_ADMIN)) - return -EACCES; - - x = simple_strtoul(buf, NULL, 0); + return -EPERM; + rc = kstrtouint(buf, 0, &x); + if (rc) + return rc; if (check_zero && !x) return -EINVAL; @@ -189,7 +190,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write) CLUSTER_ATTR(tcp_port, 1); CLUSTER_ATTR(buffer_size, 1); CLUSTER_ATTR(rsbtbl_size, 1); -CLUSTER_ATTR(dirtbl_size, 1); CLUSTER_ATTR(recover_timer, 1); CLUSTER_ATTR(toss_secs, 1); CLUSTER_ATTR(scan_secs, 1); @@ -204,7 +204,6 @@ static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr, [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr, - [CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr, [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr, [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, @@ -478,7 +477,6 @@ static struct config_group *make_cluster(struct config_group *g, cl->cl_tcp_port = dlm_config.ci_tcp_port; cl->cl_buffer_size = dlm_config.ci_buffer_size; cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size; - cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size; cl->cl_recover_timer = dlm_config.ci_recover_timer; cl->cl_toss_secs = dlm_config.ci_toss_secs; cl->cl_scan_secs = dlm_config.ci_scan_secs; @@ -734,7 +732,10 @@ static ssize_t comm_nodeid_read(struct dlm_comm *cm, char *buf) static ssize_t 
comm_nodeid_write(struct dlm_comm *cm, const char *buf, size_t len) { - cm->nodeid = simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &cm->nodeid); + + if (rc) + return rc; return len; } @@ -746,7 +747,10 @@ static ssize_t comm_local_read(struct dlm_comm *cm, char *buf) static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf, size_t len) { - cm->local= simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &cm->local); + + if (rc) + return rc; if (cm->local && !local_comm) local_comm = cm; return len; @@ -755,6 +759,7 @@ static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf, static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len) { struct sockaddr_storage *addr; + int rv; if (len != sizeof(struct sockaddr_storage)) return -EINVAL; @@ -767,6 +772,13 @@ static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len) return -ENOMEM; memcpy(addr, buf, len); + + rv = dlm_lowcomms_addr(cm->nodeid, addr, len); + if (rv) { + kfree(addr); + return rv; + } + cm->addr[cm->addr_count++] = addr; return len; } @@ -842,7 +854,10 @@ static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf, size_t len) { uint32_t seq = 0; - nd->nodeid = simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &nd->nodeid); + + if (rc) + return rc; dlm_comm_seq(nd->nodeid, &seq); nd->comm_seq = seq; return len; @@ -856,7 +871,10 @@ static ssize_t node_weight_read(struct dlm_node *nd, char *buf) static ssize_t node_weight_write(struct dlm_node *nd, const char *buf, size_t len) { - nd->weight = simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &nd->weight); + + if (rc) + return rc; return len; } @@ -883,34 +901,7 @@ static void put_space(struct dlm_space *sp) config_item_put(&sp->group.cg_item); } -static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) -{ - switch (x->ss_family) { - case AF_INET: { - struct sockaddr_in *sinx = (struct sockaddr_in *)x; - struct sockaddr_in 
*siny = (struct sockaddr_in *)y; - if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) - return 0; - if (sinx->sin_port != siny->sin_port) - return 0; - break; - } - case AF_INET6: { - struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; - struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; - if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) - return 0; - if (sinx->sin6_port != siny->sin6_port) - return 0; - break; - } - default: - return 0; - } - return 1; -} - -static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr) +static struct dlm_comm *get_comm(int nodeid) { struct config_item *i; struct dlm_comm *cm = NULL; @@ -924,19 +915,11 @@ static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr) list_for_each_entry(i, &comm_list->cg_children, ci_entry) { cm = config_item_to_comm(i); - if (nodeid) { - if (cm->nodeid != nodeid) - continue; - found = 1; - config_item_get(i); - break; - } else { - if (!cm->addr_count || !addr_compare(cm->addr[0], addr)) - continue; - found = 1; - config_item_get(i); - break; - } + if (cm->nodeid != nodeid) + continue; + found = 1; + config_item_get(i); + break; } mutex_unlock(&clusters_root.subsys.su_mutex); @@ -1000,7 +983,7 @@ int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, int dlm_comm_seq(int nodeid, uint32_t *seq) { - struct dlm_comm *cm = get_comm(nodeid, NULL); + struct dlm_comm *cm = get_comm(nodeid); if (!cm) return -EEXIST; *seq = cm->seq; @@ -1008,28 +991,6 @@ int dlm_comm_seq(int nodeid, uint32_t *seq) return 0; } -int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr) -{ - struct dlm_comm *cm = get_comm(nodeid, NULL); - if (!cm) - return -EEXIST; - if (!cm->addr_count) - return -ENOENT; - memcpy(addr, cm->addr[0], sizeof(*addr)); - put_comm(cm); - return 0; -} - -int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) -{ - struct dlm_comm *cm = get_comm(0, addr); - if (!cm) - return -EEXIST; - *nodeid = cm->nodeid; - put_comm(cm); - 
return 0; -} - int dlm_our_nodeid(void) { return local_comm ? local_comm->nodeid : 0; @@ -1050,7 +1011,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) #define DEFAULT_TCP_PORT 21064 #define DEFAULT_BUFFER_SIZE 4096 #define DEFAULT_RSBTBL_SIZE 1024 -#define DEFAULT_DIRTBL_SIZE 1024 #define DEFAULT_RECOVER_TIMER 5 #define DEFAULT_TOSS_SECS 10 #define DEFAULT_SCAN_SECS 5 @@ -1066,7 +1026,6 @@ struct dlm_config_info dlm_config = { .ci_tcp_port = DEFAULT_TCP_PORT, .ci_buffer_size = DEFAULT_BUFFER_SIZE, .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE, - .ci_dirtbl_size = DEFAULT_DIRTBL_SIZE, .ci_recover_timer = DEFAULT_RECOVER_TIMER, .ci_toss_secs = DEFAULT_TOSS_SECS, .ci_scan_secs = DEFAULT_SCAN_SECS, diff --git a/fs/dlm/config.h b/fs/dlm/config.h index 9f5e3663bb0..f30697bc278 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -27,7 +27,6 @@ struct dlm_config_info { int ci_tcp_port; int ci_buffer_size; int ci_rsbtbl_size; - int ci_dirtbl_size; int ci_recover_timer; int ci_toss_secs; int ci_scan_secs; @@ -47,8 +46,6 @@ void dlm_config_exit(void); int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, int *count_out); int dlm_comm_seq(int nodeid, uint32_t *seq); -int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr); -int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid); int dlm_our_nodeid(void); int dlm_our_addr(struct sockaddr_storage *addr, int num); diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index 1c9b08095f9..8d77ba7b175 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -68,7 +68,7 @@ static int print_format1_lock(struct seq_file *s, struct dlm_lkb *lkb, if (lkb->lkb_wait_type) seq_printf(s, " wait_type: %d", lkb->lkb_wait_type); - return seq_printf(s, "\n"); + return seq_puts(s, "\n"); } static int print_format1(struct dlm_rsb *res, struct seq_file *s) @@ -92,31 +92,31 @@ static int print_format1(struct dlm_rsb *res, struct seq_file *s) } if (res->res_nodeid > 0) - rv = seq_printf(s, "\" \nLocal Copy, 
Master is node %d\n", + rv = seq_printf(s, "\"\nLocal Copy, Master is node %d\n", res->res_nodeid); else if (res->res_nodeid == 0) - rv = seq_printf(s, "\" \nMaster Copy\n"); + rv = seq_puts(s, "\"\nMaster Copy\n"); else if (res->res_nodeid == -1) - rv = seq_printf(s, "\" \nLooking up master (lkid %x)\n", + rv = seq_printf(s, "\"\nLooking up master (lkid %x)\n", res->res_first_lkid); else - rv = seq_printf(s, "\" \nInvalid master %d\n", + rv = seq_printf(s, "\"\nInvalid master %d\n", res->res_nodeid); if (rv) goto out; /* Print the LVB: */ if (res->res_lvbptr) { - seq_printf(s, "LVB: "); + seq_puts(s, "LVB: "); for (i = 0; i < lvblen; i++) { if (i == lvblen / 2) - seq_printf(s, "\n "); + seq_puts(s, "\n "); seq_printf(s, "%02x ", (unsigned char) res->res_lvbptr[i]); } if (rsb_flag(res, RSB_VALNOTVALID)) - seq_printf(s, " (INVALID)"); - rv = seq_printf(s, "\n"); + seq_puts(s, " (INVALID)"); + rv = seq_puts(s, "\n"); if (rv) goto out; } @@ -133,21 +133,21 @@ static int print_format1(struct dlm_rsb *res, struct seq_file *s) } /* Print the locks attached to this resource */ - seq_printf(s, "Granted Queue\n"); + seq_puts(s, "Granted Queue\n"); list_for_each_entry(lkb, &res->res_grantqueue, lkb_statequeue) { rv = print_format1_lock(s, lkb, res); if (rv) goto out; } - seq_printf(s, "Conversion Queue\n"); + seq_puts(s, "Conversion Queue\n"); list_for_each_entry(lkb, &res->res_convertqueue, lkb_statequeue) { rv = print_format1_lock(s, lkb, res); if (rv) goto out; } - seq_printf(s, "Waiting Queue\n"); + seq_puts(s, "Waiting Queue\n"); list_for_each_entry(lkb, &res->res_waitqueue, lkb_statequeue) { rv = print_format1_lock(s, lkb, res); if (rv) @@ -157,13 +157,13 @@ static int print_format1(struct dlm_rsb *res, struct seq_file *s) if (list_empty(&res->res_lookup)) goto out; - seq_printf(s, "Lookup Queue\n"); + seq_puts(s, "Lookup Queue\n"); list_for_each_entry(lkb, &res->res_lookup, lkb_rsb_lookup) { rv = seq_printf(s, "%08x %s", lkb->lkb_id, print_lockmode(lkb->lkb_rqmode)); 
if (lkb->lkb_wait_type) seq_printf(s, " wait_type: %d", lkb->lkb_wait_type); - rv = seq_printf(s, "\n"); + rv = seq_puts(s, "\n"); } out: unlock_rsb(res); @@ -300,7 +300,7 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s) else seq_printf(s, " %02x", (unsigned char)r->res_name[i]); } - rv = seq_printf(s, "\n"); + rv = seq_puts(s, "\n"); if (rv) goto out; @@ -311,7 +311,7 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s) for (i = 0; i < lvblen; i++) seq_printf(s, " %02x", (unsigned char)r->res_lvbptr[i]); - rv = seq_printf(s, "\n"); + rv = seq_puts(s, "\n"); if (rv) goto out; @@ -344,6 +344,45 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s) return rv; } +static int print_format4(struct dlm_rsb *r, struct seq_file *s) +{ + int our_nodeid = dlm_our_nodeid(); + int print_name = 1; + int i, rv; + + lock_rsb(r); + + rv = seq_printf(s, "rsb %p %d %d %d %d %lu %lx %d ", + r, + r->res_nodeid, + r->res_master_nodeid, + r->res_dir_nodeid, + our_nodeid, + r->res_toss_time, + r->res_flags, + r->res_length); + if (rv) + goto out; + + for (i = 0; i < r->res_length; i++) { + if (!isascii(r->res_name[i]) || !isprint(r->res_name[i])) + print_name = 0; + } + + seq_printf(s, "%s", print_name ? 
"str " : "hex"); + + for (i = 0; i < r->res_length; i++) { + if (print_name) + seq_printf(s, "%c", r->res_name[i]); + else + seq_printf(s, " %02x", (unsigned char)r->res_name[i]); + } + rv = seq_puts(s, "\n"); + out: + unlock_rsb(r); + return rv; +} + struct rsbtbl_iter { struct dlm_rsb *rsb; unsigned bucket; @@ -382,6 +421,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr) } rv = print_format3(ri->rsb, seq); break; + case 4: + if (ri->header) { + seq_printf(seq, "version 4 rsb 2\n"); + ri->header = 0; + } + rv = print_format4(ri->rsb, seq); + break; } return rv; @@ -390,15 +436,18 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr) static const struct seq_operations format1_seq_ops; static const struct seq_operations format2_seq_ops; static const struct seq_operations format3_seq_ops; +static const struct seq_operations format4_seq_ops; static void *table_seq_start(struct seq_file *seq, loff_t *pos) { + struct rb_root *tree; struct rb_node *node; struct dlm_ls *ls = seq->private; struct rsbtbl_iter *ri; struct dlm_rsb *r; loff_t n = *pos; unsigned bucket, entry; + int toss = (seq->op == &format4_seq_ops); bucket = n >> 32; entry = n & ((1LL << 32) - 1); @@ -417,11 +466,14 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ri->format = 2; if (seq->op == &format3_seq_ops) ri->format = 3; + if (seq->op == &format4_seq_ops) + ri->format = 4; + + tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; spin_lock(&ls->ls_rsbtbl[bucket].lock); - if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { - for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node; - node = rb_next(node)) { + if (!RB_EMPTY_ROOT(tree)) { + for (node = rb_first(tree); node; node = rb_next(node)) { r = rb_entry(node, struct dlm_rsb, res_hashnode); if (!entry--) { dlm_hold_rsb(r); @@ -449,10 +501,11 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) kfree(ri); return NULL; } + tree = toss ? 
&ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; spin_lock(&ls->ls_rsbtbl[bucket].lock); - if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { - node = rb_first(&ls->ls_rsbtbl[bucket].keep); + if (!RB_EMPTY_ROOT(tree)) { + node = rb_first(tree); r = rb_entry(node, struct dlm_rsb, res_hashnode); dlm_hold_rsb(r); ri->rsb = r; @@ -469,10 +522,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) { struct dlm_ls *ls = seq->private; struct rsbtbl_iter *ri = iter_ptr; + struct rb_root *tree; struct rb_node *next; struct dlm_rsb *r, *rp; loff_t n = *pos; unsigned bucket; + int toss = (seq->op == &format4_seq_ops); bucket = n >> 32; @@ -511,10 +566,11 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) kfree(ri); return NULL; } + tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; spin_lock(&ls->ls_rsbtbl[bucket].lock); - if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) { - next = rb_first(&ls->ls_rsbtbl[bucket].keep); + if (!RB_EMPTY_ROOT(tree)) { + next = rb_first(tree); r = rb_entry(next, struct dlm_rsb, res_hashnode); dlm_hold_rsb(r); ri->rsb = r; @@ -558,9 +614,17 @@ static const struct seq_operations format3_seq_ops = { .show = table_seq_show, }; +static const struct seq_operations format4_seq_ops = { + .start = table_seq_start, + .next = table_seq_next, + .stop = table_seq_stop, + .show = table_seq_show, +}; + static const struct file_operations format1_fops; static const struct file_operations format2_fops; static const struct file_operations format3_fops; +static const struct file_operations format4_fops; static int table_open(struct inode *inode, struct file *file) { @@ -573,6 +637,8 @@ static int table_open(struct inode *inode, struct file *file) ret = seq_open(file, &format2_seq_ops); else if (file->f_op == &format3_fops) ret = seq_open(file, &format3_seq_ops); + else if (file->f_op == &format4_fops) + ret = seq_open(file, &format4_seq_ops); if (ret) return ret; @@ -606,6 
+672,14 @@ static const struct file_operations format3_fops = { .release = seq_release }; +static const struct file_operations format4_fops = { + .owner = THIS_MODULE, + .open = table_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release +}; + /* * dump lkb's on the ls_waiters list */ @@ -652,6 +726,8 @@ void dlm_delete_debug_file(struct dlm_ls *ls) debugfs_remove(ls->ls_debug_locks_dentry); if (ls->ls_debug_all_dentry) debugfs_remove(ls->ls_debug_all_dentry); + if (ls->ls_debug_toss_dentry) + debugfs_remove(ls->ls_debug_toss_dentry); } int dlm_create_debug_file(struct dlm_ls *ls) @@ -694,6 +770,19 @@ int dlm_create_debug_file(struct dlm_ls *ls) if (!ls->ls_debug_all_dentry) goto fail; + /* format 4 */ + + memset(name, 0, sizeof(name)); + snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_toss", ls->ls_name); + + ls->ls_debug_toss_dentry = debugfs_create_file(name, + S_IFREG | S_IRUGO, + dlm_root, + ls, + &format4_fops); + if (!ls->ls_debug_toss_dentry) + goto fail; + memset(name, 0, sizeof(name)); snprintf(name, DLM_LOCKSPACE_LEN+8, "%s_waiters", ls->ls_name); diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index dc5eb598b81..d975851a7e1 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -23,50 +23,6 @@ #include "lock.h" #include "dir.h" - -static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de) -{ - spin_lock(&ls->ls_recover_list_lock); - list_add(&de->list, &ls->ls_recover_list); - spin_unlock(&ls->ls_recover_list_lock); -} - -static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len) -{ - int found = 0; - struct dlm_direntry *de; - - spin_lock(&ls->ls_recover_list_lock); - list_for_each_entry(de, &ls->ls_recover_list, list) { - if (de->length == len) { - list_del(&de->list); - de->master_nodeid = 0; - memset(de->name, 0, len); - found = 1; - break; - } - } - spin_unlock(&ls->ls_recover_list_lock); - - if (!found) - de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS); - return de; -} - -void dlm_clear_free_entries(struct dlm_ls *ls) -{ 
- struct dlm_direntry *de; - - spin_lock(&ls->ls_recover_list_lock); - while (!list_empty(&ls->ls_recover_list)) { - de = list_entry(ls->ls_recover_list.next, struct dlm_direntry, - list); - list_del(&de->list); - kfree(de); - } - spin_unlock(&ls->ls_recover_list_lock); -} - /* * We use the upper 16 bits of the hash value to select the directory node. * Low bits are used for distribution of rsb's among hash buckets on each node. @@ -78,144 +34,53 @@ void dlm_clear_free_entries(struct dlm_ls *ls) int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) { - struct list_head *tmp; - struct dlm_member *memb = NULL; - uint32_t node, n = 0; - int nodeid; - - if (ls->ls_num_nodes == 1) { - nodeid = dlm_our_nodeid(); - goto out; - } + uint32_t node; - if (ls->ls_node_array) { + if (ls->ls_num_nodes == 1) + return dlm_our_nodeid(); + else { node = (hash >> 16) % ls->ls_total_weight; - nodeid = ls->ls_node_array[node]; - goto out; - } - - /* make_member_array() failed to kmalloc ls_node_array... */ - - node = (hash >> 16) % ls->ls_num_nodes; - - list_for_each(tmp, &ls->ls_nodes) { - if (n++ != node) - continue; - memb = list_entry(tmp, struct dlm_member, list); - break; + return ls->ls_node_array[node]; } - - DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n", - ls->ls_num_nodes, n, node);); - nodeid = memb->nodeid; - out: - return nodeid; } int dlm_dir_nodeid(struct dlm_rsb *r) { - return dlm_hash2nodeid(r->res_ls, r->res_hash); -} - -static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len) -{ - uint32_t val; - - val = jhash(name, len, 0); - val &= (ls->ls_dirtbl_size - 1); - - return val; -} - -static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de) -{ - uint32_t bucket; - - bucket = dir_hash(ls, de->name, de->length); - list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); + return r->res_dir_nodeid; } -static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, - int namelen, uint32_t bucket) +void 
dlm_recover_dir_nodeid(struct dlm_ls *ls) { - struct dlm_direntry *de; - - list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) { - if (de->length == namelen && !memcmp(name, de->name, namelen)) - goto out; - } - de = NULL; - out: - return de; -} - -void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen) -{ - struct dlm_direntry *de; - uint32_t bucket; - - bucket = dir_hash(ls, name, namelen); - - spin_lock(&ls->ls_dirtbl[bucket].lock); - - de = search_bucket(ls, name, namelen, bucket); - - if (!de) { - log_error(ls, "remove fr %u none", nodeid); - goto out; - } - - if (de->master_nodeid != nodeid) { - log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid); - goto out; - } - - list_del(&de->list); - kfree(de); - out: - spin_unlock(&ls->ls_dirtbl[bucket].lock); -} + struct dlm_rsb *r; -void dlm_dir_clear(struct dlm_ls *ls) -{ - struct list_head *head; - struct dlm_direntry *de; - int i; - - DLM_ASSERT(list_empty(&ls->ls_recover_list), ); - - for (i = 0; i < ls->ls_dirtbl_size; i++) { - spin_lock(&ls->ls_dirtbl[i].lock); - head = &ls->ls_dirtbl[i].list; - while (!list_empty(head)) { - de = list_entry(head->next, struct dlm_direntry, list); - list_del(&de->list); - put_free_de(ls, de); - } - spin_unlock(&ls->ls_dirtbl[i].lock); + down_read(&ls->ls_root_sem); + list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash); } + up_read(&ls->ls_root_sem); } int dlm_recover_directory(struct dlm_ls *ls) { struct dlm_member *memb; - struct dlm_direntry *de; char *b, *last_name = NULL; - int error = -ENOMEM, last_len, count = 0; + int error = -ENOMEM, last_len, nodeid, result; uint16_t namelen; + unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0; - log_debug(ls, "dlm_recover_directory"); + log_rinfo(ls, "dlm_recover_directory"); if (dlm_no_directory(ls)) goto out_status; - dlm_dir_clear(ls); - last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); if (!last_name) goto 
out; list_for_each_entry(memb, &ls->ls_nodes, list) { + if (memb->nodeid == dlm_our_nodeid()) + continue; + memset(last_name, 0, DLM_RESNAME_MAXLEN); last_len = 0; @@ -230,7 +95,7 @@ int dlm_recover_directory(struct dlm_ls *ls) if (error) goto out_free; - schedule(); + cond_resched(); /* * pick namelen/name pairs out of received buffer @@ -267,87 +132,71 @@ int dlm_recover_directory(struct dlm_ls *ls) if (namelen > DLM_RESNAME_MAXLEN) goto out_free; - error = -ENOMEM; - de = get_free_de(ls, namelen); - if (!de) + error = dlm_master_lookup(ls, memb->nodeid, + b, namelen, + DLM_LU_RECOVER_DIR, + &nodeid, &result); + if (error) { + log_error(ls, "recover_dir lookup %d", + error); goto out_free; + } + + /* The name was found in rsbtbl, but the + * master nodeid is different from + * memb->nodeid which says it is the master. + * This should not happen. */ + + if (result == DLM_LU_MATCH && + nodeid != memb->nodeid) { + count_bad++; + log_error(ls, "recover_dir lookup %d " + "nodeid %d memb %d bad %u", + result, nodeid, memb->nodeid, + count_bad); + print_hex_dump_bytes("dlm_recover_dir ", + DUMP_PREFIX_NONE, + b, namelen); + } + + /* The name was found in rsbtbl, and the + * master nodeid matches memb->nodeid. */ + + if (result == DLM_LU_MATCH && + nodeid == memb->nodeid) { + count_match++; + } + + /* The name was not found in rsbtbl and was + * added with memb->nodeid as the master. 
*/ + + if (result == DLM_LU_ADD) { + count_add++; + } - de->master_nodeid = memb->nodeid; - de->length = namelen; last_len = namelen; - memcpy(de->name, b, namelen); memcpy(last_name, b, namelen); b += namelen; left -= namelen; - - add_entry_to_hash(ls, de); count++; } } - done: + done: ; } out_status: error = 0; - log_debug(ls, "dlm_recover_directory %d entries", count); + dlm_set_recover_status(ls, DLM_RS_DIR); + + log_rinfo(ls, "dlm_recover_directory %u in %u new", + count, count_add); out_free: kfree(last_name); out: - dlm_clear_free_entries(ls); return error; } -static int get_entry(struct dlm_ls *ls, int nodeid, char *name, - int namelen, int *r_nodeid) -{ - struct dlm_direntry *de, *tmp; - uint32_t bucket; - - bucket = dir_hash(ls, name, namelen); - - spin_lock(&ls->ls_dirtbl[bucket].lock); - de = search_bucket(ls, name, namelen, bucket); - if (de) { - *r_nodeid = de->master_nodeid; - spin_unlock(&ls->ls_dirtbl[bucket].lock); - if (*r_nodeid == nodeid) - return -EEXIST; - return 0; - } - - spin_unlock(&ls->ls_dirtbl[bucket].lock); - - if (namelen > DLM_RESNAME_MAXLEN) - return -EINVAL; - - de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS); - if (!de) - return -ENOMEM; - - de->master_nodeid = nodeid; - de->length = namelen; - memcpy(de->name, name, namelen); - - spin_lock(&ls->ls_dirtbl[bucket].lock); - tmp = search_bucket(ls, name, namelen, bucket); - if (tmp) { - kfree(de); - de = tmp; - } else { - list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); - } - *r_nodeid = de->master_nodeid; - spin_unlock(&ls->ls_dirtbl[bucket].lock); - return 0; -} - -int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen, - int *r_nodeid) -{ - return get_entry(ls, nodeid, name, namelen, r_nodeid); -} - static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) { struct dlm_rsb *r; @@ -358,10 +207,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) bucket = hash & (ls->ls_rsbtbl_size - 1); 
spin_lock(&ls->ls_rsbtbl[bucket].lock); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r); if (rv) rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, - name, len, 0, &r); + name, len, &r); spin_unlock(&ls->ls_rsbtbl[bucket].lock); if (!rv) @@ -371,7 +220,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) list_for_each_entry(r, &ls->ls_root_list, res_root_list) { if (len == r->res_length && !memcmp(name, r->res_name, len)) { up_read(&ls->ls_root_sem); - log_error(ls, "find_rsb_root revert to root_list %s", + log_debug(ls, "find_rsb_root revert to root_list %s", r->res_name); return r; } @@ -429,6 +278,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, be_namelen = cpu_to_be16(0); memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); offset += sizeof(__be16); + ls->ls_recover_dir_sent_msg++; goto out; } @@ -437,6 +287,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, offset += sizeof(__be16); memcpy(outbuf + offset, r->res_name, r->res_length); offset += r->res_length; + ls->ls_recover_dir_sent_res++; } /* @@ -449,8 +300,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, be_namelen = cpu_to_be16(0xFFFF); memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); offset += sizeof(__be16); + ls->ls_recover_dir_sent_msg++; } - out: up_read(&ls->ls_root_sem); } diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h index 0b0eb1267b6..41750634445 100644 --- a/fs/dlm/dir.h +++ b/fs/dlm/dir.h @@ -14,15 +14,10 @@ #ifndef __DIR_DOT_H__ #define __DIR_DOT_H__ - int dlm_dir_nodeid(struct dlm_rsb *rsb); int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash); -void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int len); -void dlm_dir_clear(struct dlm_ls *ls); -void dlm_clear_free_entries(struct dlm_ls *ls); +void dlm_recover_dir_nodeid(struct dlm_ls *ls); int 
dlm_recover_directory(struct dlm_ls *ls); -int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen, - int *r_nodeid); void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, char *outbuf, int outlen, int nodeid); diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index bc342f7ac3a..5eff6ea3e27 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -55,8 +55,6 @@ struct dlm_lkb; struct dlm_rsb; struct dlm_member; struct dlm_rsbtable; -struct dlm_dirtable; -struct dlm_direntry; struct dlm_recover; struct dlm_header; struct dlm_message; @@ -67,6 +65,8 @@ struct dlm_mhandle; printk(KERN_ERR "dlm: "fmt"\n" , ##args) #define log_error(ls, fmt, args...) \ printk(KERN_ERR "dlm: %s: " fmt "\n", (ls)->ls_name , ##args) +#define log_rinfo(ls, fmt, args...) \ + printk(KERN_INFO "dlm: %s: " fmt "\n", (ls)->ls_name , ##args); #define log_debug(ls, fmt, args...) \ do { \ @@ -98,22 +98,13 @@ do { \ } -struct dlm_direntry { - struct list_head list; - uint32_t master_nodeid; - uint16_t length; - char name[1]; -}; - -struct dlm_dirtable { - struct list_head list; - spinlock_t lock; -}; +#define DLM_RTF_SHRINK 0x00000001 struct dlm_rsbtable { struct rb_root keep; struct rb_root toss; spinlock_t lock; + uint32_t flags; }; @@ -283,6 +274,15 @@ struct dlm_lkb { }; }; +/* + * res_master_nodeid is "normal": 0 is unset/invalid, non-zero is the real + * nodeid, even when nodeid is our_nodeid. + * + * res_nodeid is "odd": -1 is unset/invalid, zero means our_nodeid, + * greater than zero when another nodeid. 
+ * + * (TODO: remove res_nodeid and only use res_master_nodeid) + */ struct dlm_rsb { struct dlm_ls *res_ls; /* the lockspace */ @@ -291,6 +291,9 @@ struct dlm_rsb { unsigned long res_flags; int res_length; /* length of rsb name */ int res_nodeid; + int res_master_nodeid; + int res_dir_nodeid; + int res_id; /* for ls_recover_idr */ uint32_t res_lvbseq; uint32_t res_hash; uint32_t res_bucket; /* rsbtbl */ @@ -313,10 +316,21 @@ struct dlm_rsb { char res_name[DLM_RESNAME_MAXLEN+1]; }; +/* dlm_master_lookup() flags */ + +#define DLM_LU_RECOVER_DIR 1 +#define DLM_LU_RECOVER_MASTER 2 + +/* dlm_master_lookup() results */ + +#define DLM_LU_MATCH 1 +#define DLM_LU_ADD 2 + /* find_rsb() flags */ -#define R_MASTER 1 /* only return rsb if it's a master */ -#define R_CREATE 2 /* create/add rsb if not found */ +#define R_REQUEST 0x00000001 +#define R_RECEIVE_REQUEST 0x00000002 +#define R_RECEIVE_RECOVER 0x00000004 /* rsb_flags */ @@ -328,6 +342,7 @@ enum rsb_flags { RSB_NEW_MASTER2, RSB_RECOVER_CONVERT, RSB_RECOVER_GRANT, + RSB_RECOVER_LVB_INVAL, }; static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) @@ -489,6 +504,13 @@ struct rcom_lock { char rl_lvb[0]; }; +/* + * The max number of resources per rsbtbl bucket that shrink will attempt + * to remove in each iteration. 
+ */ + +#define DLM_REMOVE_NAMES_MAX 8 + struct dlm_ls { struct list_head ls_list; /* list of lockspaces */ dlm_lockspace_t *ls_local_handle; @@ -509,9 +531,6 @@ struct dlm_ls { struct dlm_rsbtable *ls_rsbtbl; uint32_t ls_rsbtbl_size; - struct dlm_dirtable *ls_dirtbl; - uint32_t ls_dirtbl_size; - struct mutex ls_waiters_mutex; struct list_head ls_waiters; /* lkbs needing a reply */ @@ -525,6 +544,12 @@ struct dlm_ls { int ls_new_rsb_count; struct list_head ls_new_rsb; /* new rsb structs */ + spinlock_t ls_remove_spin; + char ls_remove_name[DLM_RESNAME_MAXLEN+1]; + char *ls_remove_names[DLM_REMOVE_NAMES_MAX]; + int ls_remove_len; + int ls_remove_lens[DLM_REMOVE_NAMES_MAX]; + struct list_head ls_nodes; /* current nodes in ls */ struct list_head ls_nodes_gone; /* dead node list, recovery */ int ls_num_nodes; /* number of nodes in ls */ @@ -545,6 +570,7 @@ struct dlm_ls { struct dentry *ls_debug_waiters_dentry; /* debugfs */ struct dentry *ls_debug_locks_dentry; /* debugfs */ struct dentry *ls_debug_all_dentry; /* debugfs */ + struct dentry *ls_debug_toss_dentry; /* debugfs */ wait_queue_head_t ls_uevent_wait; /* user part of join/leave */ int ls_uevent_result; @@ -573,13 +599,18 @@ struct dlm_ls { struct mutex ls_requestqueue_mutex; struct dlm_rcom *ls_recover_buf; int ls_recover_nodeid; /* for debugging */ + unsigned int ls_recover_dir_sent_res; /* for log info */ + unsigned int ls_recover_dir_sent_msg; /* for log info */ unsigned int ls_recover_locks_in; /* for log info */ uint64_t ls_rcom_seq; spinlock_t ls_rcom_spin; struct list_head ls_recover_list; spinlock_t ls_recover_list_lock; int ls_recover_list_count; + struct idr ls_recover_idr; + spinlock_t ls_recover_idr_lock; wait_queue_head_t ls_wait_general; + wait_queue_head_t ls_recover_lock_wait; struct mutex ls_clear_proc_locks; struct list_head ls_root_list; /* root resources */ @@ -592,15 +623,40 @@ struct dlm_ls { char ls_name[1]; }; -#define LSFL_WORK 0 -#define LSFL_RUNNING 1 -#define LSFL_RECOVERY_STOP 2 
-#define LSFL_RCOM_READY 3 -#define LSFL_RCOM_WAIT 4 -#define LSFL_UEVENT_WAIT 5 -#define LSFL_TIMEWARN 6 -#define LSFL_CB_DELAY 7 -#define LSFL_NODIR 8 +/* + * LSFL_RECOVER_STOP - dlm_ls_stop() sets this to tell dlm recovery routines + * that they should abort what they're doing so new recovery can be started. + * + * LSFL_RECOVER_DOWN - dlm_ls_stop() sets this to tell dlm_recoverd that it + * should do down_write() on the in_recovery rw_semaphore. (doing down_write + * within dlm_ls_stop causes complaints about the lock acquired/released + * in different contexts.) + * + * LSFL_RECOVER_LOCK - dlm_recoverd holds the in_recovery rw_semaphore. + * It sets this after it is done with down_write() on the in_recovery + * rw_semaphore and clears it after it has released the rw_semaphore. + * + * LSFL_RECOVER_WORK - dlm_ls_start() sets this to tell dlm_recoverd that it + * should begin recovery of the lockspace. + * + * LSFL_RUNNING - set when normal locking activity is enabled. + * dlm_ls_stop() clears this to tell dlm locking routines that they should + * quit what they are doing so recovery can run. dlm_recoverd sets + * this after recovery is finished. 
+ */ + +#define LSFL_RECOVER_STOP 0 +#define LSFL_RECOVER_DOWN 1 +#define LSFL_RECOVER_LOCK 2 +#define LSFL_RECOVER_WORK 3 +#define LSFL_RUNNING 4 + +#define LSFL_RCOM_READY 5 +#define LSFL_RCOM_WAIT 6 +#define LSFL_UEVENT_WAIT 7 +#define LSFL_TIMEWARN 8 +#define LSFL_CB_DELAY 9 +#define LSFL_NODIR 10 /* much of this is just saving user space pointers associated with the lock that we pass back to the user lib with an ast */ @@ -643,7 +699,7 @@ static inline int dlm_locking_stopped(struct dlm_ls *ls) static inline int dlm_recovery_stopped(struct dlm_ls *ls) { - return test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); + return test_bit(LSFL_RECOVER_STOP, &ls->ls_flags); } static inline int dlm_no_directory(struct dlm_ls *ls) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index bdafb65a523..83f3d552030 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -90,6 +90,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, static int receive_extralen(struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); static void del_timeout(struct dlm_lkb *lkb); +static void toss_rsb(struct kref *kref); /* * Lock compatibilty matrix - thanks Steve @@ -170,9 +171,11 @@ void dlm_print_lkb(struct dlm_lkb *lkb) static void dlm_print_rsb(struct dlm_rsb *r) { - printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n", - r->res_nodeid, r->res_flags, r->res_first_lkid, - r->res_recover_locks_count, r->res_name); + printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x " + "rlc %d name %s\n", + r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid, + r->res_flags, r->res_first_lkid, r->res_recover_locks_count, + r->res_name); } void dlm_dump_rsb(struct dlm_rsb *r) @@ -327,6 +330,37 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) * Basic operations on rsb's and lkb's */ +/* This is only called to add a reference when the code already holds + a valid reference to the rsb, so there's no need for 
locking. */ + +static inline void hold_rsb(struct dlm_rsb *r) +{ + kref_get(&r->res_ref); +} + +void dlm_hold_rsb(struct dlm_rsb *r) +{ + hold_rsb(r); +} + +/* When all references to the rsb are gone it's transferred to + the tossed list for later disposal. */ + +static void put_rsb(struct dlm_rsb *r) +{ + struct dlm_ls *ls = r->res_ls; + uint32_t bucket = r->res_bucket; + + spin_lock(&ls->ls_rsbtbl[bucket].lock); + kref_put(&r->res_ref, toss_rsb); + spin_unlock(&ls->ls_rsbtbl[bucket].lock); +} + +void dlm_put_rsb(struct dlm_rsb *r) +{ + put_rsb(r); +} + static int pre_rsb_struct(struct dlm_ls *ls) { struct dlm_rsb *r1, *r2; @@ -411,11 +445,10 @@ static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) } int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, - unsigned int flags, struct dlm_rsb **r_ret) + struct dlm_rsb **r_ret) { struct rb_node *node = tree->rb_node; struct dlm_rsb *r; - int error = 0; int rc; while (node) { @@ -432,10 +465,8 @@ int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, return -EBADR; found: - if (r->res_nodeid && (flags & R_MASTER)) - error = -ENOTBLK; *r_ret = r; - return error; + return 0; } static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) @@ -467,124 +498,588 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) return 0; } -static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b, - unsigned int flags, struct dlm_rsb **r_ret) +/* + * Find rsb in rsbtbl and potentially create/add one + * + * Delaying the release of rsb's has a similar benefit to applications keeping + * NL locks on an rsb, but without the guarantee that the cached master value + * will still be valid when the rsb is reused. Apps aren't always smart enough + * to keep NL locks on an rsb that they may lock again shortly; this can lead + * to excessive master lookups and removals if we don't delay the release. 
+ * + * Searching for an rsb means looking through both the normal list and toss + * list. When found on the toss list the rsb is moved to the normal list with + * ref count of 1; when found on normal list the ref count is incremented. + * + * rsb's on the keep list are being used locally and refcounted. + * rsb's on the toss list are not being used locally, and are not refcounted. + * + * The toss list rsb's were either + * - previously used locally but not any more (were on keep list, then + * moved to toss list when last refcount dropped) + * - created and put on toss list as a directory record for a lookup + * (we are the dir node for the res, but are not using the res right now, + * but some other node is) + * + * The purpose of find_rsb() is to return a refcounted rsb for local use. + * So, if the given rsb is on the toss list, it is moved to the keep list + * before being returned. + * + * toss_rsb() happens when all local usage of the rsb is done, i.e. no + * more refcounts exist, so the rsb is moved from the keep list to the + * toss list. + * + * rsb's on both keep and toss lists are used for doing a name to master + * lookups. rsb's that are in use locally (and being refcounted) are on + * the keep list, rsb's that are not in use locally (not refcounted) and + * only exist for name/master lookups are on the toss list. + * + * rsb's on the toss list who's dir_nodeid is not local can have stale + * name/master mappings. So, remote requests on such rsb's can potentially + * return with an error, which means the mapping is stale and needs to + * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and + * first_lkid is to keep only a single outstanding request on an rsb + * while that rsb has a potentially stale master.) 
+ */ + +static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, + uint32_t hash, uint32_t b, + int dir_nodeid, int from_nodeid, + unsigned int flags, struct dlm_rsb **r_ret) { - struct dlm_rsb *r; + struct dlm_rsb *r = NULL; + int our_nodeid = dlm_our_nodeid(); + int from_local = 0; + int from_other = 0; + int from_dir = 0; + int create = 0; int error; - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r); - if (!error) { - kref_get(&r->res_ref); - goto out; + if (flags & R_RECEIVE_REQUEST) { + if (from_nodeid == dir_nodeid) + from_dir = 1; + else + from_other = 1; + } else if (flags & R_REQUEST) { + from_local = 1; + } + + /* + * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so + * from_nodeid has sent us a lock in dlm_recover_locks, believing + * we're the new master. Our local recovery may not have set + * res_master_nodeid to our_nodeid yet, so allow either. Don't + * create the rsb; dlm_recover_process_copy() will handle EBADR + * by resending. + * + * If someone sends us a request, we are the dir node, and we do + * not find the rsb anywhere, then recreate it. This happens if + * someone sends us a request after we have removed/freed an rsb + * from our toss list. (They sent a request instead of lookup + * because they are using an rsb from their toss list.) + */ + + if (from_local || from_dir || + (from_other && (dir_nodeid == our_nodeid))) { + create = 1; } - if (error == -ENOTBLK) - goto out; - error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r); + retry: + if (create) { + error = pre_rsb_struct(ls); + if (error < 0) + goto out; + } + + spin_lock(&ls->ls_rsbtbl[b].lock); + + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); if (error) - goto out; + goto do_toss; + + /* + * rsb is active, so we can't check master_nodeid without lock_rsb. 
+ */ - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); - error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + kref_get(&r->res_ref); + error = 0; + goto out_unlock; + + + do_toss: + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); if (error) - return error; + goto do_new; - if (dlm_no_directory(ls)) - goto out; + /* + * rsb found inactive (master_nodeid may be out of date unless + * we are the dir_nodeid or were the master) No other thread + * is using this rsb because it's on the toss list, so we can + * look at or update res_master_nodeid without lock_rsb. + */ - if (r->res_nodeid == -1) { + if ((r->res_master_nodeid != our_nodeid) && from_other) { + /* our rsb was not master, and another node (not the dir node) + has sent us a request */ + log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", + from_nodeid, r->res_master_nodeid, dir_nodeid, + r->res_name); + error = -ENOTBLK; + goto out_unlock; + } + + if ((r->res_master_nodeid != our_nodeid) && from_dir) { + /* don't think this should ever happen */ + log_error(ls, "find_rsb toss from_dir %d master %d", + from_nodeid, r->res_master_nodeid); + dlm_print_rsb(r); + /* fix it and go on */ + r->res_master_nodeid = our_nodeid; + r->res_nodeid = 0; rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); r->res_first_lkid = 0; - } else if (r->res_nodeid > 0) { + } + + if (from_local && (r->res_master_nodeid != our_nodeid)) { + /* Because we have held no locks on this rsb, + res_master_nodeid could have become stale. 
*/ rsb_set_flag(r, RSB_MASTER_UNCERTAIN); r->res_first_lkid = 0; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + goto out_unlock; + + + do_new: + /* + * rsb not found + */ + + if (error == -EBADR && !create) + goto out_unlock; + + error = get_rsb_struct(ls, name, len, &r); + if (error == -EAGAIN) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + goto retry; + } + if (error) + goto out_unlock; + + r->res_hash = hash; + r->res_bucket = b; + r->res_dir_nodeid = dir_nodeid; + kref_init(&r->res_ref); + + if (from_dir) { + /* want to see how often this happens */ + log_debug(ls, "find_rsb new from_dir %d recreate %s", + from_nodeid, r->res_name); + r->res_master_nodeid = our_nodeid; + r->res_nodeid = 0; + goto out_add; + } + + if (from_other && (dir_nodeid != our_nodeid)) { + /* should never happen */ + log_error(ls, "find_rsb new from_other %d dir %d our %d %s", + from_nodeid, dir_nodeid, our_nodeid, r->res_name); + dlm_free_rsb(r); + r = NULL; + error = -ENOTBLK; + goto out_unlock; + } + + if (from_other) { + log_debug(ls, "find_rsb new from_other %d dir %d %s", + from_nodeid, dir_nodeid, r->res_name); + } + + if (dir_nodeid == our_nodeid) { + /* When we are the dir nodeid, we can set the master + node immediately */ + r->res_master_nodeid = our_nodeid; + r->res_nodeid = 0; } else { - DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r);); - DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),); + /* set_master will send_lookup to dir_nodeid */ + r->res_master_nodeid = 0; + r->res_nodeid = -1; + } + + out_add: + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + out_unlock: + spin_unlock(&ls->ls_rsbtbl[b].lock); + out: + *r_ret = r; + return error; +} + +/* During recovery, other nodes can send us new MSTCPY locks (from + dlm_recover_locks) before we've made ourself master (in + dlm_recover_masters). 
*/ + +static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len, + uint32_t hash, uint32_t b, + int dir_nodeid, int from_nodeid, + unsigned int flags, struct dlm_rsb **r_ret) +{ + struct dlm_rsb *r = NULL; + int our_nodeid = dlm_our_nodeid(); + int recover = (flags & R_RECEIVE_RECOVER); + int error; + + retry: + error = pre_rsb_struct(ls); + if (error < 0) + goto out; + + spin_lock(&ls->ls_rsbtbl[b].lock); + + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (error) + goto do_toss; + + /* + * rsb is active, so we can't check master_nodeid without lock_rsb. + */ + + kref_get(&r->res_ref); + goto out_unlock; + + + do_toss: + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (error) + goto do_new; + + /* + * rsb found inactive. No other thread is using this rsb because + * it's on the toss list, so we can look at or update + * res_master_nodeid without lock_rsb. + */ + + if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { + /* our rsb is not master, and another node has sent us a + request; this should never happen */ + log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", + from_nodeid, r->res_master_nodeid, dir_nodeid); + dlm_print_rsb(r); + error = -ENOTBLK; + goto out_unlock; + } + + if (!recover && (r->res_master_nodeid != our_nodeid) && + (dir_nodeid == our_nodeid)) { + /* our rsb is not master, and we are dir; may as well fix it; + this should never happen */ + log_error(ls, "find_rsb toss our %d master %d dir %d", + our_nodeid, r->res_master_nodeid, dir_nodeid); + dlm_print_rsb(r); + r->res_master_nodeid = our_nodeid; + r->res_nodeid = 0; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + goto out_unlock; + + + do_new: + /* + * rsb not found + */ + + error = get_rsb_struct(ls, name, len, &r); + if (error == -EAGAIN) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + goto retry; } + if (error) + goto out_unlock; + + r->res_hash = 
hash; + r->res_bucket = b; + r->res_dir_nodeid = dir_nodeid; + r->res_master_nodeid = dir_nodeid; + r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid; + kref_init(&r->res_ref); + + error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); + out_unlock: + spin_unlock(&ls->ls_rsbtbl[b].lock); out: *r_ret = r; return error; } +static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid, + unsigned int flags, struct dlm_rsb **r_ret) +{ + uint32_t hash, b; + int dir_nodeid; + + if (len > DLM_RESNAME_MAXLEN) + return -EINVAL; + + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); + + dir_nodeid = dlm_hash2nodeid(ls, hash); + + if (dlm_no_directory(ls)) + return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid, + from_nodeid, flags, r_ret); + else + return find_rsb_dir(ls, name, len, hash, b, dir_nodeid, + from_nodeid, flags, r_ret); +} + +/* we have received a request and found that res_master_nodeid != our_nodeid, + so we need to return an error or make ourself the master */ + +static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, + int from_nodeid) +{ + if (dlm_no_directory(ls)) { + log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d", + from_nodeid, r->res_master_nodeid, + r->res_dir_nodeid); + dlm_print_rsb(r); + return -ENOTBLK; + } + + if (from_nodeid != r->res_dir_nodeid) { + /* our rsb is not master, and another node (not the dir node) + has sent us a request. this is much more common when our + master_nodeid is zero, so limit debug to non-zero. 
*/ + + if (r->res_master_nodeid) { + log_debug(ls, "validate master from_other %d master %d " + "dir %d first %x %s", from_nodeid, + r->res_master_nodeid, r->res_dir_nodeid, + r->res_first_lkid, r->res_name); + } + return -ENOTBLK; + } else { + /* our rsb is not master, but the dir nodeid has sent us a + request; this could happen with master 0 / res_nodeid -1 */ + + if (r->res_master_nodeid) { + log_error(ls, "validate master from_dir %d master %d " + "first %x %s", + from_nodeid, r->res_master_nodeid, + r->res_first_lkid, r->res_name); + } + + r->res_master_nodeid = dlm_our_nodeid(); + r->res_nodeid = 0; + return 0; + } +} + /* - * Find rsb in rsbtbl and potentially create/add one + * We're the dir node for this res and another node wants to know the + * master nodeid. During normal operation (non recovery) this is only + * called from receive_lookup(); master lookups when the local node is + * the dir node are done by find_rsb(). * - * Delaying the release of rsb's has a similar benefit to applications keeping - * NL locks on an rsb, but without the guarantee that the cached master value - * will still be valid when the rsb is reused. Apps aren't always smart enough - * to keep NL locks on an rsb that they may lock again shortly; this can lead - * to excessive master lookups and removals if we don't delay the release. + * normal operation, we are the dir node for a resource + * . _request_lock + * . set_master + * . send_lookup + * . receive_lookup + * . dlm_master_lookup flags 0 * - * Searching for an rsb means looking through both the normal list and toss - * list. When found on the toss list the rsb is moved to the normal list with - * ref count of 1; when found on normal list the ref count is incremented. + * recover directory, we are rebuilding dir for all resources + * . dlm_recover_directory + * . dlm_rcom_names + * remote node sends back the rsb names it is master of and we are dir of + * . 
dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1) + * we either create new rsb setting remote node as master, or find existing + * rsb and set master to be the remote node. + * + * recover masters, we are finding the new master for resources + * . dlm_recover_masters + * . recover_master + * . dlm_send_rcom_lookup + * . receive_rcom_lookup + * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -static int find_rsb(struct dlm_ls *ls, char *name, int namelen, - unsigned int flags, struct dlm_rsb **r_ret) +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, + unsigned int flags, int *r_nodeid, int *result) { struct dlm_rsb *r = NULL; - uint32_t hash, bucket; - int error; + uint32_t hash, b; + int from_master = (flags & DLM_LU_RECOVER_DIR); + int fix_master = (flags & DLM_LU_RECOVER_MASTER); + int our_nodeid = dlm_our_nodeid(); + int dir_nodeid, error, toss_list = 0; - if (namelen > DLM_RESNAME_MAXLEN) { - error = -EINVAL; - goto out; + if (len > DLM_RESNAME_MAXLEN) + return -EINVAL; + + if (from_nodeid == our_nodeid) { + log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x", + our_nodeid, flags); + return -EINVAL; } - if (dlm_no_directory(ls)) - flags |= R_CREATE; + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); - hash = jhash(name, namelen, 0); - bucket = hash & (ls->ls_rsbtbl_size - 1); + dir_nodeid = dlm_hash2nodeid(ls, hash); + if (dir_nodeid != our_nodeid) { + log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", + from_nodeid, dir_nodeid, our_nodeid, hash, + ls->ls_num_nodes); + *r_nodeid = -1; + return -EINVAL; + } retry: - if (flags & R_CREATE) { - error = pre_rsb_struct(ls); - if (error < 0) - goto out; + error = pre_rsb_struct(ls); + if (error < 0) + return error; + + spin_lock(&ls->ls_rsbtbl[b].lock); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (!error) { + /* because the rsb is active, we need to lock_rsb before + checking/changing 
re_master_nodeid */ + + hold_rsb(r); + spin_unlock(&ls->ls_rsbtbl[b].lock); + lock_rsb(r); + goto found; } - spin_lock(&ls->ls_rsbtbl[bucket].lock); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (error) + goto not_found; - error = _search_rsb(ls, name, namelen, bucket, flags, &r); - if (!error) - goto out_unlock; + /* because the rsb is inactive (on toss list), it's not refcounted + and lock_rsb is not used, but is protected by the rsbtbl lock */ - if (error == -EBADR && !(flags & R_CREATE)) - goto out_unlock; + toss_list = 1; + found: + if (r->res_dir_nodeid != our_nodeid) { + /* should not happen, but may as well fix it and carry on */ + log_error(ls, "dlm_master_lookup res_dir %d our %d %s", + r->res_dir_nodeid, our_nodeid, r->res_name); + r->res_dir_nodeid = our_nodeid; + } + + if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { + /* Recovery uses this function to set a new master when + the previous master failed. Setting NEW_MASTER will + force dlm_recover_masters to call recover_master on this + rsb even though the res_nodeid is no longer removed. */ + + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + rsb_set_flag(r, RSB_NEW_MASTER); + + if (toss_list) { + /* I don't think we should ever find it on toss list. 
*/ + log_error(ls, "dlm_master_lookup fix_master on toss"); + dlm_dump_rsb(r); + } + } - /* the rsb was found but wasn't a master copy */ - if (error == -ENOTBLK) - goto out_unlock; + if (from_master && (r->res_master_nodeid != from_nodeid)) { + /* this will happen if from_nodeid became master during + a previous recovery cycle, and we aborted the previous + cycle before recovering this master value */ + + log_limit(ls, "dlm_master_lookup from_master %d " + "master_nodeid %d res_nodeid %d first %x %s", + from_nodeid, r->res_master_nodeid, r->res_nodeid, + r->res_first_lkid, r->res_name); + + if (r->res_master_nodeid == our_nodeid) { + log_error(ls, "from_master %d our_master", from_nodeid); + dlm_dump_rsb(r); + dlm_send_rcom_lookup_dump(r, from_nodeid); + goto out_found; + } + + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + rsb_set_flag(r, RSB_NEW_MASTER); + } + + if (!r->res_master_nodeid) { + /* this will happen if recovery happens while we're looking + up the master for this rsb */ + + log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s", + from_nodeid, r->res_first_lkid, r->res_name); + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + } + + if (!from_master && !fix_master && + (r->res_master_nodeid == from_nodeid)) { + /* this can happen when the master sends remove, the dir node + finds the rsb on the keep list and ignores the remove, + and the former master sends a lookup */ + + log_limit(ls, "dlm_master_lookup from master %d flags %x " + "first %x %s", from_nodeid, flags, + r->res_first_lkid, r->res_name); + } + + out_found: + *r_nodeid = r->res_master_nodeid; + if (result) + *result = DLM_LU_MATCH; + + if (toss_list) { + r->res_toss_time = jiffies; + /* the rsb was inactive (on toss list) */ + spin_unlock(&ls->ls_rsbtbl[b].lock); + } else { + /* the rsb was active */ + unlock_rsb(r); + put_rsb(r); + } + return 0; - error = get_rsb_struct(ls, name, namelen, &r); + not_found: + error = get_rsb_struct(ls, 
name, len, &r); if (error == -EAGAIN) { - spin_unlock(&ls->ls_rsbtbl[bucket].lock); + spin_unlock(&ls->ls_rsbtbl[b].lock); goto retry; } if (error) goto out_unlock; r->res_hash = hash; - r->res_bucket = bucket; - r->res_nodeid = -1; + r->res_bucket = b; + r->res_dir_nodeid = our_nodeid; + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; kref_init(&r->res_ref); + r->res_toss_time = jiffies; - /* With no directory, the master can be set immediately */ - if (dlm_no_directory(ls)) { - int nodeid = dlm_dir_nodeid(r); - if (nodeid == dlm_our_nodeid()) - nodeid = 0; - r->res_nodeid = nodeid; + error = rsb_insert(r, &ls->ls_rsbtbl[b].toss); + if (error) { + /* should never happen */ + dlm_free_rsb(r); + spin_unlock(&ls->ls_rsbtbl[b].lock); + goto retry; } - error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep); + + if (result) + *result = DLM_LU_ADD; + *r_nodeid = from_nodeid; + error = 0; out_unlock: - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - out: - *r_ret = r; + spin_unlock(&ls->ls_rsbtbl[b].lock); return error; } @@ -605,17 +1100,27 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) } } -/* This is only called to add a reference when the code already holds - a valid reference to the rsb, so there's no need for locking. 
*/ - -static inline void hold_rsb(struct dlm_rsb *r) +void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) { - kref_get(&r->res_ref); -} + struct dlm_rsb *r = NULL; + uint32_t hash, b; + int error; -void dlm_hold_rsb(struct dlm_rsb *r) -{ - hold_rsb(r); + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); + + spin_lock(&ls->ls_rsbtbl[b].lock); + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (!error) + goto out_dump; + + error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (error) + goto out; + out_dump: + dlm_dump_rsb(r); + out: + spin_unlock(&ls->ls_rsbtbl[b].lock); } static void toss_rsb(struct kref *kref) @@ -628,30 +1133,13 @@ static void toss_rsb(struct kref *kref) rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); r->res_toss_time = jiffies; + ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK; if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; } } -/* When all references to the rsb are gone it's transferred to - the tossed list for later disposal. 
*/ - -static void put_rsb(struct dlm_rsb *r) -{ - struct dlm_ls *ls = r->res_ls; - uint32_t bucket = r->res_bucket; - - spin_lock(&ls->ls_rsbtbl[bucket].lock); - kref_put(&r->res_ref, toss_rsb); - spin_unlock(&ls->ls_rsbtbl[bucket].lock); -} - -void dlm_put_rsb(struct dlm_rsb *r) -{ - put_rsb(r); -} - /* See comment for unhold_lkb */ static void unhold_rsb(struct dlm_rsb *r) @@ -696,7 +1184,7 @@ static void detach_lkb(struct dlm_lkb *lkb) static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - int rv, id; + int rv; lkb = dlm_allocate_lkb(ls); if (!lkb) @@ -712,19 +1200,13 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) mutex_init(&lkb->lkb_cb_mutex); INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); - retry: - rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS); - if (!rv) - return -ENOMEM; - + idr_preload(GFP_NOFS); spin_lock(&ls->ls_lkbidr_spin); - rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id); - if (!rv) - lkb->lkb_id = id; + rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT); + if (rv >= 0) + lkb->lkb_id = rv; spin_unlock(&ls->ls_lkbidr_spin); - - if (rv == -EAGAIN) - goto retry; + idr_preload_end(); if (rv < 0) { log_error(ls, "create_lkb idr error %d", rv); @@ -1138,61 +1620,184 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) return error; } -static void dir_remove(struct dlm_rsb *r) +/* If there's an rsb for the same resource being removed, ensure + that the remove message is sent before the new lookup message. + It should be rare to need a delay here, but if not, then it may + be worthwhile to add a proper wait mechanism rather than a delay. 
*/ + +static void wait_pending_remove(struct dlm_rsb *r) { - int to_nodeid; + struct dlm_ls *ls = r->res_ls; + restart: + spin_lock(&ls->ls_remove_spin); + if (ls->ls_remove_len && + !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) { + log_debug(ls, "delay lookup for remove dir %d %s", + r->res_dir_nodeid, r->res_name); + spin_unlock(&ls->ls_remove_spin); + msleep(1); + goto restart; + } + spin_unlock(&ls->ls_remove_spin); +} + +/* + * ls_remove_spin protects ls_remove_name and ls_remove_len which are + * read by other threads in wait_pending_remove. ls_remove_names + * and ls_remove_lens are only used by the scan thread, so they do + * not need protection. + */ + +static void shrink_bucket(struct dlm_ls *ls, int b) +{ + struct rb_node *n, *next; + struct dlm_rsb *r; + char *name; + int our_nodeid = dlm_our_nodeid(); + int remote_count = 0; + int need_shrink = 0; + int i, len, rv; + + memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); - if (dlm_no_directory(r->res_ls)) + spin_lock(&ls->ls_rsbtbl[b].lock); + + if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) { + spin_unlock(&ls->ls_rsbtbl[b].lock); return; + } - to_nodeid = dlm_dir_nodeid(r); - if (to_nodeid != dlm_our_nodeid()) - send_remove(r); + for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { + next = rb_next(n); + r = rb_entry(n, struct dlm_rsb, res_hashnode); + + /* If we're the directory record for this rsb, and + we're not the master of it, then we need to wait + for the master node to send us a dir remove for + before removing the dir record. 
*/ + + if (!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)) { + continue; + } + + need_shrink = 1; + + if (!time_after_eq(jiffies, r->res_toss_time + + dlm_config.ci_toss_secs * HZ)) { + continue; + } + + if (!dlm_no_directory(ls) && + (r->res_master_nodeid == our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) { + + /* We're the master of this rsb but we're not + the directory record, so we need to tell the + dir node to remove the dir record. */ + + ls->ls_remove_lens[remote_count] = r->res_length; + memcpy(ls->ls_remove_names[remote_count], r->res_name, + DLM_RESNAME_MAXLEN); + remote_count++; + + if (remote_count >= DLM_REMOVE_NAMES_MAX) + break; + continue; + } + + if (!kref_put(&r->res_ref, kill_rsb)) { + log_error(ls, "tossed rsb in use %s", r->res_name); + continue; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + dlm_free_rsb(r); + } + + if (need_shrink) + ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK; else - dlm_dir_remove_entry(r->res_ls, to_nodeid, - r->res_name, r->res_length); -} + ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK; + spin_unlock(&ls->ls_rsbtbl[b].lock); -/* FIXME: make this more efficient */ + /* + * While searching for rsb's to free, we found some that require + * remote removal. We leave them in place and find them again here + * so there is a very small gap between removing them from the toss + * list and sending the removal. Keeping this gap small is + * important to keep us (the master node) from being out of sync + * with the remote dir node for very long. + * + * From the time the rsb is removed from toss until just after + * send_remove, the rsb name is saved in ls_remove_name. A new + * lookup checks this to ensure that a new lookup message for the + * same resource name is not sent just before the remove message. 
+ */ -static int shrink_bucket(struct dlm_ls *ls, int b) -{ - struct rb_node *n; - struct dlm_rsb *r; - int count = 0, found; + for (i = 0; i < remote_count; i++) { + name = ls->ls_remove_names[i]; + len = ls->ls_remove_lens[i]; - for (;;) { - found = 0; spin_lock(&ls->ls_rsbtbl[b].lock); - for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) - continue; - found = 1; - break; + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (rv) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_debug(ls, "remove_name not toss %s", name); + continue; } - if (!found) { + if (r->res_master_nodeid != our_nodeid) { spin_unlock(&ls->ls_rsbtbl[b].lock); - break; + log_debug(ls, "remove_name master %d dir %d our %d %s", + r->res_master_nodeid, r->res_dir_nodeid, + our_nodeid, name); + continue; } - if (kref_put(&r->res_ref, kill_rsb)) { - rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + if (r->res_dir_nodeid == our_nodeid) { + /* should never happen */ spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "remove_name dir %d master %d our %d %s", + r->res_dir_nodeid, r->res_master_nodeid, + our_nodeid, name); + continue; + } - if (is_master(r)) - dir_remove(r); - dlm_free_rsb(r); - count++; - } else { + if (!time_after_eq(jiffies, r->res_toss_time + + dlm_config.ci_toss_secs * HZ)) { spin_unlock(&ls->ls_rsbtbl[b].lock); - log_error(ls, "tossed rsb in use %s", r->res_name); + log_debug(ls, "remove_name toss_time %lu now %lu %s", + r->res_toss_time, jiffies, name); + continue; } - } - return count; + if (!kref_put(&r->res_ref, kill_rsb)) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "remove_name in use %s", name); + continue; + } + + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + + /* block lookup of same name until we've sent remove */ + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = len; + 
memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + spin_unlock(&ls->ls_rsbtbl[b].lock); + + send_remove(r); + + /* allow lookup of name again */ + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = 0; + memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + + dlm_free_rsb(r); + } } void dlm_scan_rsbs(struct dlm_ls *ls) @@ -1434,8 +2039,8 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; if (b == 1) { int len = receive_extralen(ms); - if (len > DLM_RESNAME_MAXLEN) - len = DLM_RESNAME_MAXLEN; + if (len > r->res_ls->ls_lvblen) + len = r->res_ls->ls_lvblen; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); lkb->lkb_lvbseq = ms->m_lvbseq; } @@ -1684,10 +2289,14 @@ static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) * immediate request, it is 0 if called later, after the lock has been * queued. * + * recover is 1 if dlm_recover_grant() is trying to grant conversions + * after recovery. + * * References are from chapter 6 of "VAXcluster Principles" by Roy Davis */ -static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) +static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, + int recover) { int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); @@ -1719,7 +2328,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) */ if (queue_conflict(&r->res_grantqueue, lkb)) - goto out; + return 0; /* * 6-3: By default, a conversion request is immediately granted if the @@ -1728,7 +2337,24 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) */ if (queue_conflict(&r->res_convertqueue, lkb)) - goto out; + return 0; + + /* + * The RECOVER_GRANT flag means dlm_recover_grant() is granting + * locks for a recovered rsb, on which lkb's have been rebuilt. 
+ * The lkb's may have been rebuilt on the queues in a different + * order than they were in on the previous master. So, granting + * queued conversions in order after recovery doesn't make sense + * since the order hasn't been preserved anyway. The new order + * could also have created a new "in place" conversion deadlock. + * (e.g. old, failed master held granted EX, with PR->EX, NL->EX. + * After recovery, there would be no granted locks, and possibly + * NL->EX, PR->EX, an in-place conversion deadlock.) So, after + * recovery, grant conversions without considering order. + */ + + if (conv && recover) + return 1; /* * 6-5: But the default algorithm for deciding whether to grant or @@ -1765,7 +2391,7 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) if (list_empty(&r->res_convertqueue)) return 1; else - goto out; + return 0; } /* @@ -1811,12 +2437,12 @@ static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now) if (!now && !conv && list_empty(&r->res_convertqueue) && first_in_list(lkb, &r->res_waitqueue)) return 1; - out: + return 0; } static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, - int *err) + int recover, int *err) { int rv; int8_t alt = 0, rqmode = lkb->lkb_rqmode; @@ -1825,7 +2451,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, if (err) *err = 0; - rv = _can_be_granted(r, lkb, now); + rv = _can_be_granted(r, lkb, now, recover); if (rv) goto out; @@ -1866,7 +2492,7 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, if (alt) { lkb->lkb_rqmode = alt; - rv = _can_be_granted(r, lkb, now); + rv = _can_be_granted(r, lkb, now, 0); if (rv) lkb->lkb_sbflags |= DLM_SBF_ALTMODE; else @@ -1890,6 +2516,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, unsigned int *count) { struct dlm_lkb *lkb, *s; + int recover = rsb_flag(r, RSB_RECOVER_GRANT); int hi, demoted, quit, grant_restart, demote_restart; int deadlk; @@ 
-1903,7 +2530,7 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, demoted = is_demoted(lkb); deadlk = 0; - if (can_be_granted(r, lkb, 0, &deadlk)) { + if (can_be_granted(r, lkb, 0, recover, &deadlk)) { grant_lock_pending(r, lkb); grant_restart = 1; if (count) @@ -1947,7 +2574,7 @@ static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, struct dlm_lkb *lkb, *s; list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { - if (can_be_granted(r, lkb, 0, NULL)) { + if (can_be_granted(r, lkb, 0, 0, NULL)) { grant_lock_pending(r, lkb); if (count) (*count)++; @@ -2078,8 +2705,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) { - struct dlm_ls *ls = r->res_ls; - int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); + int our_nodeid = dlm_our_nodeid(); if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); @@ -2093,53 +2719,37 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) return 1; } - if (r->res_nodeid == 0) { + if (r->res_master_nodeid == our_nodeid) { lkb->lkb_nodeid = 0; return 0; } - if (r->res_nodeid > 0) { - lkb->lkb_nodeid = r->res_nodeid; + if (r->res_master_nodeid) { + lkb->lkb_nodeid = r->res_master_nodeid; return 0; } - DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r);); - - dir_nodeid = dlm_dir_nodeid(r); - - if (dir_nodeid != our_nodeid) { - r->res_first_lkid = lkb->lkb_id; - send_lookup(r, lkb); - return 1; - } - - for (i = 0; i < 2; i++) { - /* It's possible for dlm_scand to remove an old rsb for - this same resource from the toss list, us to create - a new one, look up the master locally, and find it - already exists just before dlm_scand does the - dir_remove() on the previous rsb. 
*/ - - error = dlm_dir_lookup(ls, our_nodeid, r->res_name, - r->res_length, &ret_nodeid); - if (!error) - break; - log_debug(ls, "dir_lookup error %d %s", error, r->res_name); - schedule(); - } - if (error && error != -EEXIST) - return error; - - if (ret_nodeid == our_nodeid) { - r->res_first_lkid = 0; + if (dlm_dir_nodeid(r) == our_nodeid) { + /* This is a somewhat unusual case; find_rsb will usually + have set res_master_nodeid when dir nodeid is local, but + there are cases where we become the dir node after we've + passed find_rsb and go through _request_lock again. + confirm_master() or process_lookup_list() needs to be + called after this. */ + log_debug(r->res_ls, "set_master %x self master %d dir %d %s", + lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid, + r->res_name); + r->res_master_nodeid = our_nodeid; r->res_nodeid = 0; lkb->lkb_nodeid = 0; - } else { - r->res_first_lkid = lkb->lkb_id; - r->res_nodeid = ret_nodeid; - lkb->lkb_nodeid = ret_nodeid; + return 0; } - return 0; + + wait_pending_remove(r); + + r->res_first_lkid = lkb->lkb_id; + send_lookup(r, lkb); + return 1; } static void process_lookup_list(struct dlm_rsb *r) @@ -2464,7 +3074,7 @@ static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) { int error = 0; - if (can_be_granted(r, lkb, 1, NULL)) { + if (can_be_granted(r, lkb, 1, 0, NULL)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); goto out; @@ -2504,7 +3114,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) /* changing an existing lock may allow others to be granted */ - if (can_be_granted(r, lkb, 1, &deadlk)) { + if (can_be_granted(r, lkb, 1, 0, &deadlk)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); goto out; @@ -2530,7 +3140,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) if (is_demoted(lkb)) { grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); - if (_can_be_granted(r, lkb, 1)) { + if (_can_be_granted(r, lkb, 1, 0)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); goto out; @@ -2584,7 +3194,7 @@ 
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, } /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ - + static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) { int error; @@ -2708,11 +3318,11 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, error = validate_lock_args(ls, lkb, args); if (error) - goto out; + return error; - error = find_rsb(ls, name, len, R_CREATE, &r); + error = find_rsb(ls, name, len, 0, R_REQUEST, &r); if (error) - goto out; + return error; lock_rsb(r); @@ -2723,8 +3333,6 @@ static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, unlock_rsb(r); put_rsb(r); - - out: return error; } @@ -3286,8 +3894,8 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, if (!lkb->lkb_lvbptr) return -ENOMEM; len = receive_extralen(ms); - if (len > DLM_RESNAME_MAXLEN) - len = DLM_RESNAME_MAXLEN; + if (len > ls->ls_lvblen) + len = ls->ls_lvblen; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); } return 0; @@ -3402,11 +4010,72 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) return error; } +static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len) +{ + char name[DLM_RESNAME_MAXLEN + 1]; + struct dlm_message *ms; + struct dlm_mhandle *mh; + struct dlm_rsb *r; + uint32_t hash, b; + int rv, dir_nodeid; + + memset(name, 0, sizeof(name)); + memcpy(name, ms_name, len); + + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); + + dir_nodeid = dlm_hash2nodeid(ls, hash); + + log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name); + + spin_lock(&ls->ls_rsbtbl[b].lock); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (!rv) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "repeat_remove on keep %s", name); + return; + } + + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (!rv) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + log_error(ls, "repeat_remove on toss %s", name); + 
return; + } + + /* use ls->remove_name2 to avoid conflict with shrink? */ + + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = len; + memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); + spin_unlock(&ls->ls_rsbtbl[b].lock); + + rv = _create_message(ls, sizeof(struct dlm_message) + len, + dir_nodeid, DLM_MSG_REMOVE, &ms, &mh); + if (rv) + return; + + memcpy(ms->m_extra, name, len); + ms->m_hash = hash; + + send_message(mh, ms); + + spin_lock(&ls->ls_remove_spin); + ls->ls_remove_len = 0; + memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); + spin_unlock(&ls->ls_remove_spin); +} + static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; - int error, namelen; + int from_nodeid; + int error, namelen = 0; + + from_nodeid = ms->m_header.h_nodeid; error = create_lkb(ls, &lkb); if (error) @@ -3420,9 +4089,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) goto fail; } + /* The dir node is the authority on whether we are the master + for this rsb or not, so if the master sends us a request, we should + recreate the rsb if we've destroyed it. This race happens when we + send a remove message to the dir node at the same time that the dir + node sends us a request for the rsb. 
*/ + namelen = receive_extralen(ms); - error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r); + error = find_rsb(ls, ms->m_extra, namelen, from_nodeid, + R_RECEIVE_REQUEST, &r); if (error) { __put_lkb(ls, lkb); goto fail; @@ -3430,6 +4106,16 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) lock_rsb(r); + if (r->res_master_nodeid != dlm_our_nodeid()) { + error = validate_master_nodeid(ls, r, from_nodeid); + if (error) { + unlock_rsb(r); + put_rsb(r); + __put_lkb(ls, lkb); + goto fail; + } + } + attach_lkb(r, lkb); error = do_request(r, lkb); send_request_reply(r, lkb, error); @@ -3445,6 +4131,31 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) return 0; fail: + /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup + and do this receive_request again from process_lookup_list once + we get the lookup reply. This would avoid many repeated + ENOTBLK request failures when the lookup reply designating us + as master is delayed. */ + + /* We could repeatedly return -EBADR here if our send_remove() is + delayed in being sent/arriving/being processed on the dir node. + Another node would repeatedly look up the master, and the dir + node would continue returning our nodeid until our send_remove + took effect. + + We send another remove message in case our previous send_remove + was lost/ignored/missed somehow. 
*/ + + if (error != -ENOTBLK) { + log_limit(ls, "receive_request %x from %d %d", + ms->m_lkid, from_nodeid, error); + } + + if (namelen && error == -EBADR) { + send_repeat_remove(ls, ms->m_extra, namelen); + msleep(1000); + } + setup_stub_lkb(ls, ms); send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); return error; @@ -3651,49 +4362,110 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) { - int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid; + int len, error, ret_nodeid, from_nodeid, our_nodeid; from_nodeid = ms->m_header.h_nodeid; our_nodeid = dlm_our_nodeid(); len = receive_extralen(ms); - dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); - if (dir_nodeid != our_nodeid) { - log_error(ls, "lookup dir_nodeid %d from %d", - dir_nodeid, from_nodeid); - error = -EINVAL; - ret_nodeid = -1; - goto out; - } - - error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid); + error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0, + &ret_nodeid, NULL); /* Optimization: we're master so treat lookup as a request */ if (!error && ret_nodeid == our_nodeid) { receive_request(ls, ms); return; } - out: send_lookup_reply(ls, ms, ret_nodeid, error); } static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) { - int len, dir_nodeid, from_nodeid; + char name[DLM_RESNAME_MAXLEN+1]; + struct dlm_rsb *r; + uint32_t hash, b; + int rv, len, dir_nodeid, from_nodeid; from_nodeid = ms->m_header.h_nodeid; len = receive_extralen(ms); + if (len > DLM_RESNAME_MAXLEN) { + log_error(ls, "receive_remove from %d bad len %d", + from_nodeid, len); + return; + } + dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); if (dir_nodeid != dlm_our_nodeid()) { - log_error(ls, "remove dir entry dir_nodeid %d from %d", - dir_nodeid, from_nodeid); + log_error(ls, "receive_remove from %d bad nodeid %d", + from_nodeid, dir_nodeid); + return; + } + + /* Look for name on rsbtbl.toss, 
if it's there, kill it. + If it's on rsbtbl.keep, it's being used, and we should ignore this + message. This is an expected race between the dir node sending a + request to the master node at the same time as the master node sends + a remove to the dir node. The resolution to that race is for the + dir node to ignore the remove message, and the master node to + recreate the master rsb when it gets a request from the dir node for + an rsb it doesn't have. */ + + memset(name, 0, sizeof(name)); + memcpy(name, ms->m_extra, len); + + hash = jhash(name, len, 0); + b = hash & (ls->ls_rsbtbl_size - 1); + + spin_lock(&ls->ls_rsbtbl[b].lock); + + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); + if (rv) { + /* verify the rsb is on keep list per comment above */ + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); + if (rv) { + /* should not happen */ + log_error(ls, "receive_remove from %d not found %s", + from_nodeid, name); + spin_unlock(&ls->ls_rsbtbl[b].lock); + return; + } + if (r->res_master_nodeid != from_nodeid) { + /* should not happen */ + log_error(ls, "receive_remove keep from %d master %d", + from_nodeid, r->res_master_nodeid); + dlm_print_rsb(r); + spin_unlock(&ls->ls_rsbtbl[b].lock); + return; + } + + log_debug(ls, "receive_remove from %d master %d first %x %s", + from_nodeid, r->res_master_nodeid, r->res_first_lkid, + name); + spin_unlock(&ls->ls_rsbtbl[b].lock); + return; + } + + if (r->res_master_nodeid != from_nodeid) { + log_error(ls, "receive_remove toss from %d master %d", + from_nodeid, r->res_master_nodeid); + dlm_print_rsb(r); + spin_unlock(&ls->ls_rsbtbl[b].lock); return; } - dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); + if (kref_put(&r->res_ref, kill_rsb)) { + rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); + spin_unlock(&ls->ls_rsbtbl[b].lock); + dlm_free_rsb(r); + } else { + log_error(ls, "receive_remove from %d rsb ref error", + from_nodeid); + dlm_print_rsb(r); + 
spin_unlock(&ls->ls_rsbtbl[b].lock); + } } static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) @@ -3706,6 +4478,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_lkb *lkb; struct dlm_rsb *r; int error, mstype, result; + int from_nodeid = ms->m_header.h_nodeid; error = find_lkb(ls, ms->m_remid, &lkb); if (error) @@ -3723,8 +4496,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); if (error) { log_error(ls, "receive_request_reply %x remote %d %x result %d", - lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, - ms->m_result); + lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result); dlm_dump_rsb(r); goto out; } @@ -3732,8 +4504,9 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) /* Optimization: the dir node was also the master, so it took our lookup as a request and sent request reply instead of lookup reply */ if (mstype == DLM_MSG_LOOKUP) { - r->res_nodeid = ms->m_header.h_nodeid; - lkb->lkb_nodeid = r->res_nodeid; + r->res_master_nodeid = from_nodeid; + r->res_nodeid = from_nodeid; + lkb->lkb_nodeid = from_nodeid; } /* this is the value returned from do_request() on the master */ @@ -3767,18 +4540,30 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) case -EBADR: case -ENOTBLK: /* find_rsb failed to find rsb or rsb wasn't master */ - log_debug(ls, "receive_request_reply %x %x master diff %d %d", - lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result); - r->res_nodeid = -1; - lkb->lkb_nodeid = -1; + log_limit(ls, "receive_request_reply %x from %d %d " + "master %d dir %d first %x %s", lkb->lkb_id, + from_nodeid, result, r->res_master_nodeid, + r->res_dir_nodeid, r->res_first_lkid, r->res_name); + + if (r->res_dir_nodeid != dlm_our_nodeid() && + r->res_master_nodeid != dlm_our_nodeid()) { + /* cause _request_lock->set_master->send_lookup */ + r->res_master_nodeid = 0; + 
r->res_nodeid = -1; + lkb->lkb_nodeid = -1; + } if (is_overlap(lkb)) { /* we'll ignore error in cancel/unlock reply */ queue_cast_overlap(r, lkb); confirm_master(r, result); unhold_lkb(lkb); /* undoes create_lkb() */ - } else + } else { _request_lock(r, lkb); + + if (r->res_master_nodeid == dlm_our_nodeid()) + confirm_master(r, 0); + } break; default: @@ -3994,6 +4779,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) struct dlm_lkb *lkb; struct dlm_rsb *r; int error, ret_nodeid; + int do_lookup_list = 0; error = find_lkb(ls, ms->m_lkid, &lkb); if (error) { @@ -4001,7 +4787,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) return; } - /* ms->m_result is the value returned by dlm_dir_lookup on dir node + /* ms->m_result is the value returned by dlm_master_lookup on dir node FIXME: will a non-zero error ever be returned? */ r = lkb->lkb_resource; @@ -4013,12 +4799,37 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) goto out; ret_nodeid = ms->m_nodeid; + + /* We sometimes receive a request from the dir node for this + rsb before we've received the dir node's lookup_reply for it. + The request from the dir node implies we're the master, so we set + ourself as master in receive_request_reply, and verify here that + we are indeed the master. 
*/ + + if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) { + /* This should never happen */ + log_error(ls, "receive_lookup_reply %x from %d ret %d " + "master %d dir %d our %d first %x %s", + lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid, + r->res_master_nodeid, r->res_dir_nodeid, + dlm_our_nodeid(), r->res_first_lkid, r->res_name); + } + if (ret_nodeid == dlm_our_nodeid()) { + r->res_master_nodeid = ret_nodeid; r->res_nodeid = 0; - ret_nodeid = 0; + do_lookup_list = 1; r->res_first_lkid = 0; + } else if (ret_nodeid == -1) { + /* the remote node doesn't believe it's the dir node */ + log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", + lkb->lkb_id, ms->m_header.h_nodeid); + r->res_master_nodeid = 0; + r->res_nodeid = -1; + lkb->lkb_nodeid = -1; } else { - /* set_master() will copy res_nodeid to lkb_nodeid */ + /* set_master() will set lkb_nodeid from r */ + r->res_master_nodeid = ret_nodeid; r->res_nodeid = ret_nodeid; } @@ -4033,7 +4844,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) _request_lock(r, lkb); out_list: - if (!ret_nodeid) + if (do_lookup_list) process_lookup_list(r); out: unlock_rsb(r); @@ -4047,7 +4858,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, int error = 0, noent = 0; if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { - log_debug(ls, "ignore non-member message %d from %d %x %x %d", + log_limit(ls, "receive %d from non-member %d %x %x %d", ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, ms->m_remid, ms->m_result); return; @@ -4174,6 +4985,15 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) { if (dlm_locking_stopped(ls)) { + /* If we were a member of this lockspace, left, and rejoined, + other nodes may still be sending us messages from the + lockspace generation before we left. 
*/ + if (!ls->ls_generation) { + log_limit(ls, "receive %d from %d ignore old gen", + ms->m_type, nodeid); + return; + } + dlm_add_requestqueue(ls, nodeid, ms); } else { dlm_wait_requestqueue(ls); @@ -4583,6 +5403,13 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, if ((lkb->lkb_nodeid == nodeid_gone) || dlm_is_removed(ls, lkb->lkb_nodeid)) { + /* tell recover_lvb to invalidate the lvb + because a node holding EX/PW failed */ + if ((lkb->lkb_exflags & DLM_LKF_VALBLK) && + (lkb->lkb_grmode >= DLM_LOCK_PW)) { + rsb_set_flag(r, RSB_RECOVER_LVB_INVAL); + } + del_lkb(r, lkb); /* this put should free the lkb */ @@ -4636,7 +5463,7 @@ void dlm_recover_purge(struct dlm_ls *ls) up_write(&ls->ls_root_sem); if (lkb_count) - log_debug(ls, "dlm_recover_purge %u locks for %u nodes", + log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", lkb_count, nodes_count); } @@ -4651,9 +5478,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) if (!rsb_flag(r, RSB_RECOVER_GRANT)) continue; - rsb_clear_flag(r, RSB_RECOVER_GRANT); - if (!is_master(r)) + if (!is_master(r)) { + rsb_clear_flag(r, RSB_RECOVER_GRANT); continue; + } hold_rsb(r); spin_unlock(&ls->ls_rsbtbl[bucket].lock); return r; @@ -4698,7 +5526,9 @@ void dlm_recover_grant(struct dlm_ls *ls) rsb_count++; count = 0; lock_rsb(r); + /* the RECOVER_GRANT flag is checked in the grant path */ grant_pending_locks(r, &count); + rsb_clear_flag(r, RSB_RECOVER_GRANT); lkb_count += count; confirm_master(r, 0); unlock_rsb(r); @@ -4707,7 +5537,7 @@ void dlm_recover_grant(struct dlm_ls *ls) } if (lkb_count) - log_debug(ls, "dlm_recover_grant %u locks on %u resources", + log_rinfo(ls, "dlm_recover_grant %u locks on %u resources", lkb_count, rsb_count); } @@ -4798,6 +5628,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) struct dlm_rsb *r; struct dlm_lkb *lkb; uint32_t remid = 0; + int from_nodeid = rc->rc_header.h_nodeid; int error; if (rl->rl_parent_lkid) { @@ -4815,21 +5646,21 
@@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) we make ourselves master, dlm_recover_masters() won't touch the MSTCPY locks we've received early. */ - error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r); + error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), + from_nodeid, R_RECEIVE_RECOVER, &r); if (error) goto out; + lock_rsb(r); + if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { log_error(ls, "dlm_recover_master_copy remote %d %x not dir", - rc->rc_header.h_nodeid, remid); + from_nodeid, remid); error = -EBADR; - put_rsb(r); - goto out; + goto out_unlock; } - lock_rsb(r); - - lkb = search_remid(r, rc->rc_header.h_nodeid, remid); + lkb = search_remid(r, from_nodeid, remid); if (lkb) { error = -EEXIST; goto out_remid; @@ -4865,8 +5696,8 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) put_rsb(r); out: if (error && error != -EEXIST) - log_debug(ls, "dlm_recover_master_copy remote %d %x error %d", - rc->rc_header.h_nodeid, remid, error); + log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", + from_nodeid, remid, error); rl->rl_result = cpu_to_le32(error); return error; } @@ -5211,15 +6042,18 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) return error; } -/* The force flag allows the unlock to go ahead even if the lkb isn't granted. - Regardless of what rsb queue the lock is on, it's removed and freed. */ +/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't + granted. Regardless of what rsb queue the lock is on, it's removed and + freed. The IVVALBLK flag causes the lvb on the resource to be invalidated + if our lock is PW/EX (it's ignored if our granted mode is smaller.) 
*/ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) { struct dlm_args args; int error; - set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args); + set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK, + lkb->lkb_ua, &args); error = unlock_lock(ls, lkb, &args); if (error == -DLM_EUNLOCK) diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index c8b226c6280..5e0c72e36a9 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -14,6 +14,7 @@ #define __LOCK_DOT_H__ void dlm_dump_rsb(struct dlm_rsb *r); +void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len); void dlm_print_lkb(struct dlm_lkb *lkb); void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, uint32_t saved_seq); @@ -28,9 +29,11 @@ void dlm_unlock_recovery(struct dlm_ls *ls); void dlm_scan_waiters(struct dlm_ls *ls); void dlm_scan_timeout(struct dlm_ls *ls); void dlm_adjust_timeouts(struct dlm_ls *ls); +int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len, + unsigned int flags, int *r_nodeid, int *result); int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, - unsigned int flags, struct dlm_rsb **r_ret); + struct dlm_rsb **r_ret); void dlm_recover_purge(struct dlm_ls *ls); void dlm_purge_mstcpy_locks(struct dlm_rsb *r); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index ca506abbdd3..f3e72787e7f 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -35,8 +35,11 @@ static struct task_struct * scand_task; static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) { ssize_t ret = len; - int n = simple_strtol(buf, NULL, 0); + int n; + int rc = kstrtoint(buf, 0, &n); + if (rc) + return rc; ls = dlm_find_lockspace_local(ls->ls_local_handle); if (!ls) return -EINVAL; @@ -57,7 +60,10 @@ static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) { - ls->ls_uevent_result = simple_strtol(buf, NULL, 0); + int rc = 
kstrtoint(buf, 0, &ls->ls_uevent_result); + + if (rc) + return rc; set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); wake_up(&ls->ls_uevent_wait); return len; @@ -70,7 +76,10 @@ static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) { - ls->ls_global_id = simple_strtoul(buf, NULL, 0); + int rc = kstrtouint(buf, 0, &ls->ls_global_id); + + if (rc) + return rc; return len; } @@ -81,7 +90,11 @@ static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) { - int val = simple_strtoul(buf, NULL, 0); + int val; + int rc = kstrtoint(buf, 0, &val); + + if (rc) + return rc; if (val == 1) set_bit(LSFL_NODIR, &ls->ls_flags); return len; @@ -190,7 +203,7 @@ static int do_uevent(struct dlm_ls *ls, int in) else kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); - log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving"); + log_rinfo(ls, "%s the lockspace group...", in ? 
"joining" : "leaving"); /* dlm_controld will see the uevent, do the necessary group management and then write to sysfs to wake us */ @@ -198,7 +211,7 @@ static int do_uevent(struct dlm_ls *ls, int in) error = wait_event_interruptible(ls->ls_uevent_wait, test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); - log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result); + log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result); if (error) goto out; @@ -506,20 +519,18 @@ static int new_lockspace(const char *name, const char *cluster, spin_lock_init(&ls->ls_rsbtbl[i].lock); } - idr_init(&ls->ls_lkbidr); - spin_lock_init(&ls->ls_lkbidr_spin); - - size = dlm_config.ci_dirtbl_size; - ls->ls_dirtbl_size = size; + spin_lock_init(&ls->ls_remove_spin); - ls->ls_dirtbl = vmalloc(sizeof(struct dlm_dirtable) * size); - if (!ls->ls_dirtbl) - goto out_lkbfree; - for (i = 0; i < size; i++) { - INIT_LIST_HEAD(&ls->ls_dirtbl[i].list); - spin_lock_init(&ls->ls_dirtbl[i].lock); + for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { + ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, + GFP_KERNEL); + if (!ls->ls_remove_names[i]) + goto out_rsbtbl; } + idr_init(&ls->ls_lkbidr); + spin_lock_init(&ls->ls_lkbidr_spin); + INIT_LIST_HEAD(&ls->ls_waiters); mutex_init(&ls->ls_waiters_mutex); INIT_LIST_HEAD(&ls->ls_orphans); @@ -567,7 +578,7 @@ static int new_lockspace(const char *name, const char *cluster, ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); if (!ls->ls_recover_buf) - goto out_dirfree; + goto out_lkbidr; ls->ls_slot = 0; ls->ls_num_slots = 0; @@ -576,14 +587,14 @@ static int new_lockspace(const char *name, const char *cluster, INIT_LIST_HEAD(&ls->ls_recover_list); spin_lock_init(&ls->ls_recover_list_lock); + idr_init(&ls->ls_recover_idr); + spin_lock_init(&ls->ls_recover_idr_lock); ls->ls_recover_list_count = 0; ls->ls_local_handle = ls; init_waitqueue_head(&ls->ls_wait_general); INIT_LIST_HEAD(&ls->ls_root_list); init_rwsem(&ls->ls_root_sem); - 
down_write(&ls->ls_in_recovery); - spin_lock(&lslist_lock); ls->ls_create_count = 1; list_add(&ls->ls_list, &lslist); @@ -597,13 +608,24 @@ static int new_lockspace(const char *name, const char *cluster, } } - /* needs to find ls in lslist */ + init_waitqueue_head(&ls->ls_recover_lock_wait); + + /* + * Once started, dlm_recoverd first looks for ls in lslist, then + * initializes ls_in_recovery as locked in "down" mode. We need + * to wait for the wakeup from dlm_recoverd because in_recovery + * has to start out in down mode. + */ + error = dlm_recoverd_start(ls); if (error) { log_error(ls, "can't start dlm_recoverd %d", error); goto out_callback; } + wait_event(ls->ls_recover_lock_wait, + test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); + ls->ls_kobj.kset = dlm_kset; error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, "%s", ls->ls_name); @@ -631,7 +653,7 @@ static int new_lockspace(const char *name, const char *cluster, dlm_create_debug_file(ls); - log_debug(ls, "join complete"); + log_rinfo(ls, "join complete"); *lockspace = ls; return 0; @@ -647,11 +669,15 @@ static int new_lockspace(const char *name, const char *cluster, spin_lock(&lslist_lock); list_del(&ls->ls_list); spin_unlock(&lslist_lock); + idr_destroy(&ls->ls_recover_idr); kfree(ls->ls_recover_buf); - out_dirfree: - vfree(ls->ls_dirtbl); - out_lkbfree: + out_lkbidr: idr_destroy(&ls->ls_lkbidr); + for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { + if (ls->ls_remove_names[i]) + kfree(ls->ls_remove_names[i]); + } + out_rsbtbl: vfree(ls->ls_rsbtbl); out_lsfree: if (do_unreg) @@ -693,9 +719,7 @@ static int lkb_idr_is_local(int id, void *p, void *data) { struct dlm_lkb *lkb = p; - if (!lkb->lkb_nodeid) - return 1; - return 0; + return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; } static int lkb_idr_is_any(int id, void *p, void *data) @@ -779,18 +803,10 @@ static int release_lockspace(struct dlm_ls *ls, int force) kfree(ls->ls_recover_buf); /* - * Free direntry structs. 
- */ - - dlm_dir_clear(ls); - vfree(ls->ls_dirtbl); - - /* * Free all lkb's in idr */ idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); - idr_remove_all(&ls->ls_lkbidr); idr_destroy(&ls->ls_lkbidr); /* @@ -813,6 +829,9 @@ static int release_lockspace(struct dlm_ls *ls, int force) vfree(ls->ls_rsbtbl); + for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) + kfree(ls->ls_remove_names[i]); + while (!list_empty(&ls->ls_new_rsb)) { rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); @@ -826,11 +845,10 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_purge_requestqueue(ls); kfree(ls->ls_recover_args); - dlm_clear_free_entries(ls); dlm_clear_members(ls); dlm_clear_members_gone(ls); kfree(ls->ls_node_array); - log_debug(ls, "release_lockspace final free"); + log_rinfo(ls, "release_lockspace final free"); kobject_put(&ls->ls_kobj); /* The ls structure will be freed when the kobject is done with */ @@ -876,17 +894,24 @@ int dlm_release_lockspace(void *lockspace, int force) void dlm_stop_lockspaces(void) { struct dlm_ls *ls; + int count; restart: + count = 0; spin_lock(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { - if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) + if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { + count++; continue; + } spin_unlock(&lslist_lock); log_error(ls, "no userland control daemon, stopping lockspace"); dlm_ls_stop(ls); goto restart; } spin_unlock(&lslist_lock); + + if (count) + log_print("dlm user daemon left %d lockspaces", count); } diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 5c1b0e38c7a..d08e079ea5d 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -53,7 +53,6 @@ #include <linux/sctp.h> #include <linux/slab.h> #include <net/sctp/sctp.h> -#include <net/sctp/user.h> #include <net/ipv6.h> #include "dlm_internal.h" @@ -126,6 +125,7 @@ struct connection { struct connection *othercon; struct work_struct rwork; /* Receive workqueue */ struct work_struct swork; /* Send workqueue */ + bool try_new_addr; 
}; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -140,6 +140,17 @@ struct writequeue_entry { struct connection *con; }; +struct dlm_node_addr { + struct list_head list; + int nodeid; + int addr_count; + int curr_addr_index; + struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; +}; + +static LIST_HEAD(dlm_node_addrs); +static DEFINE_SPINLOCK(dlm_node_addrs_spin); + static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; static int dlm_local_count; static int dlm_allow_conn; @@ -167,12 +178,11 @@ static inline int nodeid_hash(int nodeid) static struct connection *__find_con(int nodeid) { int r; - struct hlist_node *h; struct connection *con; r = nodeid_hash(nodeid); - hlist_for_each_entry(con, h, &connection_hash[r], list) { + hlist_for_each_entry(con, &connection_hash[r], list) { if (con->nodeid == nodeid) return con; } @@ -222,13 +232,12 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc) static void foreach_conn(void (*conn_func)(struct connection *c)) { int i; - struct hlist_node *h, *n; + struct hlist_node *n; struct connection *con; for (i = 0; i < CONN_HASH_SIZE; i++) { - hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){ + hlist_for_each_entry_safe(con, n, &connection_hash[i], list) conn_func(con); - } } } @@ -247,13 +256,12 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation) static struct connection *assoc2con(int assoc_id) { int i; - struct hlist_node *h; struct connection *con; mutex_lock(&connections_lock); for (i = 0 ; i < CONN_HASH_SIZE; i++) { - hlist_for_each_entry(con, h, &connection_hash[i], list) { + hlist_for_each_entry(con, &connection_hash[i], list) { if (con->sctp_assoc == assoc_id) { mutex_unlock(&connections_lock); return con; @@ -264,33 +272,159 @@ static struct connection *assoc2con(int assoc_id) return NULL; } -static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) +static struct dlm_node_addr *find_node_addr(int nodeid) { - struct sockaddr_storage addr; - 
int error; + struct dlm_node_addr *na; + + list_for_each_entry(na, &dlm_node_addrs, list) { + if (na->nodeid == nodeid) + return na; + } + return NULL; +} + +static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) +{ + switch (x->ss_family) { + case AF_INET: { + struct sockaddr_in *sinx = (struct sockaddr_in *)x; + struct sockaddr_in *siny = (struct sockaddr_in *)y; + if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) + return 0; + if (sinx->sin_port != siny->sin_port) + return 0; + break; + } + case AF_INET6: { + struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; + struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; + if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) + return 0; + if (sinx->sin6_port != siny->sin6_port) + return 0; + break; + } + default: + return 0; + } + return 1; +} + +static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, + struct sockaddr *sa_out, bool try_new_addr) +{ + struct sockaddr_storage sas; + struct dlm_node_addr *na; if (!dlm_local_count) return -1; - error = dlm_nodeid_to_addr(nodeid, &addr); - if (error) - return error; + spin_lock(&dlm_node_addrs_spin); + na = find_node_addr(nodeid); + if (na && na->addr_count) { + if (try_new_addr) { + na->curr_addr_index++; + if (na->curr_addr_index == na->addr_count) + na->curr_addr_index = 0; + } + + memcpy(&sas, na->addr[na->curr_addr_index ], + sizeof(struct sockaddr_storage)); + } + spin_unlock(&dlm_node_addrs_spin); + + if (!na) + return -EEXIST; + + if (!na->addr_count) + return -ENOENT; + + if (sas_out) + memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); + + if (!sa_out) + return 0; if (dlm_local_addr[0]->ss_family == AF_INET) { - struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; - struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; + struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; + struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; ret4->sin_addr.s_addr = in4->sin_addr.s_addr; } else { - struct 
sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; - struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; + struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; ret6->sin6_addr = in6->sin6_addr; } return 0; } +static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) +{ + struct dlm_node_addr *na; + int rv = -EEXIST; + int addr_i; + + spin_lock(&dlm_node_addrs_spin); + list_for_each_entry(na, &dlm_node_addrs, list) { + if (!na->addr_count) + continue; + + for (addr_i = 0; addr_i < na->addr_count; addr_i++) { + if (addr_compare(na->addr[addr_i], addr)) { + *nodeid = na->nodeid; + rv = 0; + goto unlock; + } + } + } +unlock: + spin_unlock(&dlm_node_addrs_spin); + return rv; +} + +int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) +{ + struct sockaddr_storage *new_addr; + struct dlm_node_addr *new_node, *na; + + new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS); + if (!new_node) + return -ENOMEM; + + new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); + if (!new_addr) { + kfree(new_node); + return -ENOMEM; + } + + memcpy(new_addr, addr, len); + + spin_lock(&dlm_node_addrs_spin); + na = find_node_addr(nodeid); + if (!na) { + new_node->nodeid = nodeid; + new_node->addr[0] = new_addr; + new_node->addr_count = 1; + list_add(&new_node->list, &dlm_node_addrs); + spin_unlock(&dlm_node_addrs_spin); + return 0; + } + + if (na->addr_count >= DLM_MAX_ADDR_COUNT) { + spin_unlock(&dlm_node_addrs_spin); + kfree(new_addr); + kfree(new_node); + return -ENOSPC; + } + + na->addr[na->addr_count++] = new_addr; + spin_unlock(&dlm_node_addrs_spin); + kfree(new_node); + return 0; +} + /* Data available on socket or listen socket received a connect */ -static void lowcomms_data_ready(struct sock *sk, int count_unused) +static void lowcomms_data_ready(struct sock *sk) { struct connection *con = sock2con(sk); if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) 
@@ -348,7 +482,7 @@ int dlm_lowcomms_connect_node(int nodeid) } /* Make a socket active */ -static int add_sock(struct socket *sock, struct connection *con) +static void add_sock(struct socket *sock, struct connection *con) { con->sock = sock; @@ -358,7 +492,6 @@ static int add_sock(struct socket *sock, struct connection *con) con->sock->sk->sk_state_change = lowcomms_state_change; con->sock->sk->sk_user_data = con; con->sock->sk->sk_allocation = GFP_NOFS; - return 0; } /* Add the port number to an IPv6 or 4 sockaddr and return the address @@ -440,8 +573,23 @@ static void sctp_send_shutdown(sctp_assoc_t associd) static void sctp_init_failed_foreach(struct connection *con) { + + /* + * Don't try to recover base con and handle race where the + * other node's assoc init creates a assoc and we get that + * notification, then we get a notification that our attempt + * failed due. This happens when we are still trying the primary + * address, but the other node has already tried secondary addrs + * and found one that worked. 
+ */ + if (!con->nodeid || con->sctp_assoc) + return; + + log_print("Retrying SCTP association init for node %d\n", con->nodeid); + + con->try_new_addr = true; con->sctp_assoc = 0; - if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { + if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) { if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) queue_work(send_workqueue, &con->swork); } @@ -458,15 +606,62 @@ static void sctp_init_failed(void) mutex_unlock(&connections_lock); } +static void retry_failed_sctp_send(struct connection *recv_con, + struct sctp_send_failed *sn_send_failed, + char *buf) +{ + int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed); + struct dlm_mhandle *mh; + struct connection *con; + char *retry_buf; + int nodeid = sn_send_failed->ssf_info.sinfo_ppid; + + log_print("Retry sending %d bytes to node id %d", len, nodeid); + + if (!nodeid) { + log_print("Shouldn't resend data via listening connection."); + return; + } + + con = nodeid2con(nodeid, 0); + if (!con) { + log_print("Could not look up con for nodeid %d\n", + nodeid); + return; + } + + mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf); + if (!mh) { + log_print("Could not allocate buf for retry."); + return; + } + memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len); + dlm_lowcomms_commit_buffer(mh); + + /* + * If we got a assoc changed event before the send failed event then + * we only need to retry the send. 
+ */ + if (con->sctp_assoc) { + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); + } else + sctp_init_failed_foreach(con); +} + /* Something happened to an association */ static void process_sctp_notification(struct connection *con, struct msghdr *msg, char *buf) { union sctp_notification *sn = (union sctp_notification *)buf; + struct linger linger; - if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { + switch (sn->sn_header.sn_type) { + case SCTP_SEND_FAILED: + retry_failed_sctp_send(con, &sn->sn_send_failed, buf); + break; + case SCTP_ASSOC_CHANGE: switch (sn->sn_assoc_change.sac_state) { - case SCTP_COMM_UP: case SCTP_RESTART: { @@ -510,7 +705,7 @@ static void process_sctp_notification(struct connection *con, return; } make_sockaddr(&prim.ssp_addr, 0, &addr_len); - if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { + if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) { unsigned char *b=(unsigned char *)&prim.ssp_addr; log_print("reject connect from unknown addr"); print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, @@ -524,11 +719,11 @@ static void process_sctp_notification(struct connection *con, return; /* Peel off a new sock */ - sctp_lock_sock(con->sock->sk); + lock_sock(con->sock->sk); ret = sctp_do_peeloff(con->sock->sk, sn->sn_assoc_change.sac_assoc_id, &new_con->sock); - sctp_release_sock(con->sock->sk); + release_sock(con->sock->sk); if (ret < 0) { log_print("Can't peel off a socket for " "connection %d to node %d: err=%d", @@ -538,12 +733,21 @@ static void process_sctp_notification(struct connection *con, } add_sock(new_con->sock, new_con); + linger.l_onoff = 1; + linger.l_linger = 0; + ret = kernel_setsockopt(new_con->sock, SOL_SOCKET, SO_LINGER, + (char *)&linger, sizeof(linger)); + if (ret < 0) + log_print("set socket option SO_LINGER failed"); + log_print("connecting to %d sctp association %d", nodeid, (int)sn->sn_assoc_change.sac_assoc_id); + new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id; + 
new_con->try_new_addr = false; /* Send any pending writes */ clear_bit(CF_CONNECT_PENDING, &new_con->flags); - clear_bit(CF_INIT_PENDING, &con->flags); + clear_bit(CF_INIT_PENDING, &new_con->flags); if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { queue_work(send_workqueue, &new_con->swork); } @@ -562,14 +766,10 @@ static void process_sctp_notification(struct connection *con, } break; - /* We don't know which INIT failed, so clear the PENDING flags - * on them all. if assoc_id is zero then it will then try - * again */ - case SCTP_CANT_STR_ASSOC: { + /* Will retry init when we get the send failed notification */ log_print("Can't start SCTP association - retrying"); - sctp_init_failed(); } break; @@ -578,6 +778,8 @@ static void process_sctp_notification(struct connection *con, (int)sn->sn_assoc_change.sac_assoc_id, sn->sn_assoc_change.sac_state); } + default: + ; /* fall through */ } } @@ -747,7 +949,7 @@ static int tcp_accept_from_sock(struct connection *con) /* Get the new node's NODEID */ make_sockaddr(&peeraddr, 0, &len); - if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { + if (addr_to_nodeid(&peeraddr, &nodeid)) { unsigned char *b=(unsigned char *)&peeraddr; log_print("connect from non cluster node"); print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, @@ -837,6 +1039,24 @@ static void free_entry(struct writequeue_entry *e) kfree(e); } +/* + * writequeue_entry_complete - try to delete and free write queue entry + * @e: write queue entry to try to delete + * @completed: bytes completed + * + * writequeue_lock must be held. + */ +static void writequeue_entry_complete(struct writequeue_entry *e, int completed) +{ + e->offset += completed; + e->len -= completed; + + if (e->len == 0 && e->users == 0) { + list_del(&e->list); + free_entry(e); + } +} + /* Initiate an SCTP association. 
This is a special case of send_to_sock() in that we don't yet have a peeled-off socket for this association, so we use the listening socket @@ -856,15 +1076,14 @@ static void sctp_init_assoc(struct connection *con) int addrlen; struct kvec iov[1]; + mutex_lock(&con->sock_mutex); if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) - return; - - if (con->retries++ > MAX_CONNECT_RETRIES) - return; + goto unlock; - if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) { + if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr, + con->try_new_addr)) { log_print("no address for nodeid %d", con->nodeid); - return; + goto unlock; } base_con = nodeid2con(0, 0); BUG_ON(base_con == NULL); @@ -882,17 +1101,25 @@ static void sctp_init_assoc(struct connection *con) if (list_empty(&con->writequeue)) { spin_unlock(&con->writequeue_lock); log_print("writequeue empty for nodeid %d", con->nodeid); - return; + goto unlock; } e = list_first_entry(&con->writequeue, struct writequeue_entry, list); len = e->len; offset = e->offset; - spin_unlock(&con->writequeue_lock); /* Send the first block off the write queue */ iov[0].iov_base = page_address(e->page)+offset; iov[0].iov_len = len; + spin_unlock(&con->writequeue_lock); + + if (rem_addr.ss_family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr; + log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr); + } else { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr; + log_print("Trying to connect to %pI6", &sin6->sin6_addr); + } cmsg = CMSG_FIRSTHDR(&outmessage); cmsg->cmsg_level = IPPROTO_SCTP; @@ -900,8 +1127,9 @@ static void sctp_init_assoc(struct connection *con) cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); sinfo = CMSG_DATA(cmsg); memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); + sinfo->sinfo_ppid = cpu_to_le32(con->nodeid); outmessage.msg_controllen = cmsg->cmsg_len; + sinfo->sinfo_flags |= 
SCTP_ADDR_OVER; ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); if (ret < 0) { @@ -914,25 +1142,22 @@ static void sctp_init_assoc(struct connection *con) } else { spin_lock(&con->writequeue_lock); - e->offset += ret; - e->len -= ret; - - if (e->len == 0 && e->users == 0) { - list_del(&e->list); - free_entry(e); - } + writequeue_entry_complete(e, ret); spin_unlock(&con->writequeue_lock); } + +unlock: + mutex_unlock(&con->sock_mutex); } /* Connect a new socket to its peer */ static void tcp_connect_to_sock(struct connection *con) { - int result = -EHOSTUNREACH; struct sockaddr_storage saddr, src_addr; int addr_len; struct socket *sock = NULL; int one = 1; + int result; if (con->nodeid == 0) { log_print("attempt to connect sock 0 foiled"); @@ -944,10 +1169,8 @@ static void tcp_connect_to_sock(struct connection *con) goto out; /* Some odd races can cause double-connects, ignore them */ - if (con->sock) { - result = 0; + if (con->sock) goto out; - } /* Create a socket to communicate with */ result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, @@ -956,8 +1179,11 @@ static void tcp_connect_to_sock(struct connection *con) goto out_err; memset(&saddr, 0, sizeof(saddr)); - if (dlm_nodeid_to_addr(con->nodeid, &saddr)) + result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); + if (result < 0) { + log_print("no address for nodeid %d", con->nodeid); goto out_err; + } sock->sk->sk_user_data = con; con->rx_action = receive_from_sock; @@ -983,8 +1209,7 @@ static void tcp_connect_to_sock(struct connection *con) kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); - result = - sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, + result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, O_NONBLOCK); if (result == -EINPROGRESS) result = 0; @@ -1002,11 +1227,17 @@ out_err: * Some errors are fatal and this list might need adjusting. For other * errors we try again until the max number of retries is reached. 
*/ - if (result != -EHOSTUNREACH && result != -ENETUNREACH && - result != -ENETDOWN && result != -EINVAL - && result != -EPROTONOSUPPORT) { + if (result != -EHOSTUNREACH && + result != -ENETUNREACH && + result != -ENETDOWN && + result != -EINVAL && + result != -EPROTONOSUPPORT) { + log_print("connect %d try %d error %d", con->nodeid, + con->retries, result); + mutex_unlock(&con->sock_mutex); + msleep(1000); lowcomms_connect_sock(con); - result = 0; + return; } out: mutex_unlock(&con->sock_mutex); @@ -1044,10 +1275,8 @@ static struct socket *tcp_create_listen_sock(struct connection *con, if (result < 0) { log_print("Failed to set SO_REUSEADDR on socket: %d", result); } - sock->sk->sk_user_data = con; con->rx_action = tcp_accept_from_sock; con->connect_action = tcp_connect_to_sock; - con->sock = sock; /* Bind to our port */ make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); @@ -1129,6 +1358,7 @@ static int sctp_listen_for_all(void) int result = -EINVAL, num = 1, i, addr_len; struct connection *con = nodeid2con(0, GFP_NOFS); int bufsize = NEEDED_RMEM; + int one = 1; if (!con) return -ENOMEM; @@ -1163,6 +1393,11 @@ static int sctp_listen_for_all(void) goto create_delsock; } + result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one, + sizeof(one)); + if (result < 0) + log_print("Could not set SCTP NODELAY error %d\n", result); + /* Init con struct */ sock->sk->sk_user_data = con; con->sock = sock; @@ -1257,7 +1492,6 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) struct connection *con; struct writequeue_entry *e; int offset = 0; - int users = 0; con = nodeid2con(nodeid, allocation); if (!con) @@ -1271,7 +1505,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) } else { offset = e->end; e->end += len; - users = e->users++; + e->users++; } spin_unlock(&con->writequeue_lock); @@ -1286,7 +1520,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) 
spin_lock(&con->writequeue_lock); offset = e->end; e->end += len; - users = e->users++; + e->users++; list_add_tail(&e->list, &con->writequeue); spin_unlock(&con->writequeue_lock); goto got_one; @@ -1358,8 +1592,7 @@ static void send_to_sock(struct connection *con) } cond_resched(); goto out; - } - if (ret <= 0) + } else if (ret < 0) goto send_error; } @@ -1370,14 +1603,7 @@ static void send_to_sock(struct connection *con) } spin_lock(&con->writequeue_lock); - e->offset += ret; - e->len -= ret; - - if (e->len == 0 && e->users == 0) { - list_del(&e->list); - free_entry(e); - continue; - } + writequeue_entry_complete(e, ret); } spin_unlock(&con->writequeue_lock); out: @@ -1394,7 +1620,6 @@ out_connect: mutex_unlock(&con->sock_mutex); if (!test_bit(CF_INIT_PENDING, &con->flags)) lowcomms_connect_sock(con); - return; } static void clean_one_writequeue(struct connection *con) @@ -1414,6 +1639,7 @@ static void clean_one_writequeue(struct connection *con) int dlm_lowcomms_close(int nodeid) { struct connection *con; + struct dlm_node_addr *na; log_print("closing connection to node %d", nodeid); con = nodeid2con(nodeid, 0); @@ -1428,6 +1654,17 @@ int dlm_lowcomms_close(int nodeid) clean_one_writequeue(con); close_connection(con, true); } + + spin_lock(&dlm_node_addrs_spin); + na = find_node_addr(nodeid); + if (na) { + list_del(&na->list); + while (na->addr_count--) + kfree(na->addr[na->addr_count]); + kfree(na); + } + spin_unlock(&dlm_node_addrs_spin); + return 0; } @@ -1577,3 +1814,17 @@ fail_destroy: fail: return error; } + +void dlm_lowcomms_exit(void) +{ + struct dlm_node_addr *na, *safe; + + spin_lock(&dlm_node_addrs_spin); + list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { + list_del(&na->list); + while (na->addr_count--) + kfree(na->addr[na->addr_count]); + kfree(na); + } + spin_unlock(&dlm_node_addrs_spin); +} diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index 1311e642628..67462e54fc2 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -16,10 
+16,12 @@ int dlm_lowcomms_start(void); void dlm_lowcomms_stop(void); +void dlm_lowcomms_exit(void); int dlm_lowcomms_close(int nodeid); void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc); void dlm_lowcomms_commit_buffer(void *mh); int dlm_lowcomms_connect_node(int nodeid); +int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); #endif /* __LOWCOMMS_DOT_H__ */ diff --git a/fs/dlm/main.c b/fs/dlm/main.c index 5a59efa0bb4..079c0bd71ab 100644 --- a/fs/dlm/main.c +++ b/fs/dlm/main.c @@ -17,6 +17,7 @@ #include "user.h" #include "memory.h" #include "config.h" +#include "lowcomms.h" static int __init init_dlm(void) { @@ -78,6 +79,7 @@ static void __exit exit_dlm(void) dlm_config_exit(); dlm_memory_exit(); dlm_lockspace_exit(); + dlm_lowcomms_exit(); dlm_unregister_debugfs(); } diff --git a/fs/dlm/member.c b/fs/dlm/member.c index 862640a36d5..9c47f1c14a8 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -60,18 +60,15 @@ void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc) #define SLOT_DEBUG_LINE 128 -static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, - struct rcom_slot *ro0, struct dlm_slot *array, - int array_size) +static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, + struct rcom_slot *ro0, struct dlm_slot *array, + int array_size) { char line[SLOT_DEBUG_LINE]; int len = SLOT_DEBUG_LINE - 1; int pos = 0; int ret, i; - if (!dlm_config.ci_log_debug) - return; - memset(line, 0, sizeof(line)); if (array) { @@ -95,7 +92,7 @@ static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, } } - log_debug(ls, "generation %u slots %d%s", gen, num_slots, line); + log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line); } int dlm_slots_copy_in(struct dlm_ls *ls) @@ -129,7 +126,7 @@ int dlm_slots_copy_in(struct dlm_ls *ls) ro->ro_slot = le16_to_cpu(ro->ro_slot); } - log_debug_slots(ls, gen, num_slots, ro0, NULL, 0); + log_slots(ls, gen, num_slots, ro0, 
NULL, 0); list_for_each_entry(memb, &ls->ls_nodes, list) { for (i = 0, ro = ro0; i < num_slots; i++, ro++) { @@ -274,7 +271,7 @@ int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size, gen++; - log_debug_slots(ls, gen, num, NULL, array, array_size); + log_slots(ls, gen, num, NULL, array, array_size); max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) - sizeof(struct rcom_config)) / sizeof(struct rcom_slot); @@ -447,7 +444,7 @@ static int ping_members(struct dlm_ls *ls) break; } if (error) - log_debug(ls, "ping_members aborted %d last nodeid %d", + log_rinfo(ls, "ping_members aborted %d last nodeid %d", error, ls->ls_recover_nodeid); return error; } @@ -539,7 +536,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) count as a negative change so the "neg" recovery steps will happen */ list_for_each_entry(memb, &ls->ls_nodes_gone, list) { - log_debug(ls, "prev removed member %d", memb->nodeid); + log_rinfo(ls, "prev removed member %d", memb->nodeid); neg++; } @@ -551,10 +548,10 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) continue; if (!node) { - log_debug(ls, "remove member %d", memb->nodeid); + log_rinfo(ls, "remove member %d", memb->nodeid); } else { /* removed and re-added */ - log_debug(ls, "remove member %d comm_seq %u %u", + log_rinfo(ls, "remove member %d comm_seq %u %u", memb->nodeid, memb->comm_seq, node->comm_seq); } @@ -571,7 +568,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) if (dlm_is_member(ls, node->nodeid)) continue; dlm_add_member(ls, node); - log_debug(ls, "add member %d", node->nodeid); + log_rinfo(ls, "add member %d", node->nodeid); } list_for_each_entry(memb, &ls->ls_nodes, list) { @@ -591,7 +588,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) complete(&ls->ls_members_done); } - log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes); + log_rinfo(ls, 
"dlm_recover_members %d nodes", ls->ls_num_nodes); return error; } @@ -616,13 +613,13 @@ int dlm_ls_stop(struct dlm_ls *ls) down_write(&ls->ls_recv_active); /* - * Abort any recovery that's in progress (see RECOVERY_STOP, + * Abort any recovery that's in progress (see RECOVER_STOP, * dlm_recovery_stopped()) and tell any other threads running in the * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). */ spin_lock(&ls->ls_recover_lock); - set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); + set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); ls->ls_recover_seq++; spin_unlock(&ls->ls_recover_lock); @@ -642,12 +639,16 @@ int dlm_ls_stop(struct dlm_ls *ls) * when recovery is complete. */ - if (new) - down_write(&ls->ls_in_recovery); + if (new) { + set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags); + wake_up_process(ls->ls_recoverd_task); + wait_event(ls->ls_recover_lock_wait, + test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); + } /* * The recoverd suspend/resume makes sure that dlm_recoverd (if - * running) has noticed RECOVERY_STOP above and quit processing the + * running) has noticed RECOVER_STOP above and quit processing the * previous recovery. 
*/ @@ -709,7 +710,8 @@ int dlm_ls_start(struct dlm_ls *ls) kfree(rv_old); } - dlm_recoverd_kick(ls); + set_bit(LSFL_RECOVER_WORK, &ls->ls_flags); + wake_up_process(ls->ls_recoverd_task); return 0; fail: diff --git a/fs/dlm/netlink.c b/fs/dlm/netlink.c index ef17e0169da..e7cfbaf8d0e 100644 --- a/fs/dlm/netlink.c +++ b/fs/dlm/netlink.c @@ -14,7 +14,7 @@ #include "dlm_internal.h" static uint32_t dlm_nl_seqnum; -static uint32_t listener_nlpid; +static uint32_t listener_nlportid; static struct genl_family family = { .id = GENL_ID_GENERATE, @@ -64,24 +64,26 @@ static int send_data(struct sk_buff *skb) return rv; } - return genlmsg_unicast(&init_net, skb, listener_nlpid); + return genlmsg_unicast(&init_net, skb, listener_nlportid); } static int user_cmd(struct sk_buff *skb, struct genl_info *info) { - listener_nlpid = info->snd_pid; - printk("user_cmd nlpid %u\n", listener_nlpid); + listener_nlportid = info->snd_portid; + printk("user_cmd nlpid %u\n", listener_nlportid); return 0; } -static struct genl_ops dlm_nl_ops = { - .cmd = DLM_CMD_HELLO, - .doit = user_cmd, +static struct genl_ops dlm_nl_ops[] = { + { + .cmd = DLM_CMD_HELLO, + .doit = user_cmd, + }, }; int __init dlm_netlink_init(void) { - return genl_register_family_with_ops(&family, &dlm_nl_ops, 1); + return genl_register_family_with_ops(&family, dlm_nl_ops); } void dlm_netlink_exit(void) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index 01fd5c11a7f..f704458ea5f 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -247,6 +247,7 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, struct dlm_ls *ls; struct plock_op *op; int rv; + unsigned char fl_flags = fl->fl_flags; ls = dlm_find_lockspace_local(lockspace); if (!ls) @@ -258,9 +259,18 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, goto out; } - if (posix_lock_file_wait(file, fl) < 0) - log_error(ls, "dlm_posix_unlock: vfs unlock error %llx", - (unsigned long long)number); + /* cause the vfs unlock 
to return ENOENT if lock is not found */ + fl->fl_flags |= FL_EXISTS; + + rv = posix_lock_file_wait(file, fl); + if (rv == -ENOENT) { + rv = 0; + goto out_free; + } + if (rv < 0) { + log_error(ls, "dlm_posix_unlock: vfs unlock error %d %llx", + rv, (unsigned long long)number); + } op->info.optype = DLM_PLOCK_OP_UNLOCK; op->info.pid = fl->fl_pid; @@ -296,9 +306,11 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, if (rv == -ENOENT) rv = 0; +out_free: kfree(op); out: dlm_put_lockspace(ls); + fl->fl_flags = fl_flags; return rv; } EXPORT_SYMBOL_GPL(dlm_posix_unlock); diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 64d3e2b958c..9d61947d473 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -23,8 +23,6 @@ #include "memory.h" #include "lock.h" #include "util.h" -#include "member.h" - static int rcom_response(struct dlm_ls *ls) { @@ -275,19 +273,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) struct dlm_rcom *rc; struct dlm_mhandle *mh; int error = 0; - int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom); ls->ls_recover_nodeid = nodeid; - if (nodeid == dlm_our_nodeid()) { - ls->ls_recover_buf->rc_header.h_length = - dlm_config.ci_buffer_size; - dlm_copy_master_names(ls, last_name, last_len, - ls->ls_recover_buf->rc_buf, - max_size, nodeid); - goto out; - } - error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh); if (error) goto out; @@ -337,7 +325,26 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) if (error) goto out; memcpy(rc->rc_buf, r->res_name, r->res_length); - rc->rc_id = (unsigned long) r; + rc->rc_id = (unsigned long) r->res_id; + + send_rcom(ls, mh, rc); + out: + return error; +} + +int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid) +{ + struct dlm_rcom *rc; + struct dlm_mhandle *mh; + struct dlm_ls *ls = r->res_ls; + int error; + + error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length, + &rc, &mh); + if (error) + goto out; + 
memcpy(rc->rc_buf, r->res_name, r->res_length); + rc->rc_id = 0xFFFFFFFF; send_rcom(ls, mh, rc); out: @@ -355,7 +362,14 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) if (error) return; - error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid); + if (rc_in->rc_id == 0xFFFFFFFF) { + log_error(ls, "receive_rcom_lookup dump from %d", nodeid); + dlm_dump_rsb_name(ls, rc_in->rc_buf, len); + return; + } + + error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len, + DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL); if (error) ret_nodeid = error; rc->rc_result = ret_nodeid; @@ -486,17 +500,76 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) return 0; } +/* + * Ignore messages for stage Y before we set + * recover_status bit for stage X: + * + * recover_status = 0 + * + * dlm_recover_members() + * - send nothing + * - recv nothing + * - ignore NAMES, NAMES_REPLY + * - ignore LOOKUP, LOOKUP_REPLY + * - ignore LOCK, LOCK_REPLY + * + * recover_status |= NODES + * + * dlm_recover_members_wait() + * + * dlm_recover_directory() + * - send NAMES + * - recv NAMES_REPLY + * - ignore LOOKUP, LOOKUP_REPLY + * - ignore LOCK, LOCK_REPLY + * + * recover_status |= DIR + * + * dlm_recover_directory_wait() + * + * dlm_recover_masters() + * - send LOOKUP + * - recv LOOKUP_REPLY + * + * dlm_recover_locks() + * - send LOCKS + * - recv LOCKS_REPLY + * + * recover_status |= LOCKS + * + * dlm_recover_locks_wait() + * + * recover_status |= DONE + */ + /* Called by dlm_recv; corresponds to dlm_receive_message() but special recovery-only comms are sent through here. 
*/ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) { int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); - int stop, reply = 0, lock = 0; + int stop, reply = 0, names = 0, lookup = 0, lock = 0; uint32_t status; uint64_t seq; switch (rc->rc_type) { + case DLM_RCOM_STATUS_REPLY: + reply = 1; + break; + case DLM_RCOM_NAMES: + names = 1; + break; + case DLM_RCOM_NAMES_REPLY: + names = 1; + reply = 1; + break; + case DLM_RCOM_LOOKUP: + lookup = 1; + break; + case DLM_RCOM_LOOKUP_REPLY: + lookup = 1; + reply = 1; + break; case DLM_RCOM_LOCK: lock = 1; break; @@ -504,31 +577,25 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) lock = 1; reply = 1; break; - case DLM_RCOM_STATUS_REPLY: - case DLM_RCOM_NAMES_REPLY: - case DLM_RCOM_LOOKUP_REPLY: - reply = 1; }; spin_lock(&ls->ls_recover_lock); status = ls->ls_recover_status; - stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); + stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags); seq = ls->ls_recover_seq; spin_unlock(&ls->ls_recover_lock); - if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) || - (reply && (rc->rc_seq_reply != seq)) || - (lock && !(status & DLM_RS_DIR))) { - log_limit(ls, "dlm_receive_rcom ignore msg %d " - "from %d %llu %llu recover seq %llu sts %x gen %u", - rc->rc_type, - nodeid, - (unsigned long long)rc->rc_seq, - (unsigned long long)rc->rc_seq_reply, - (unsigned long long)seq, - status, ls->ls_generation); - goto out; - } + if (stop && (rc->rc_type != DLM_RCOM_STATUS)) + goto ignore; + + if (reply && (rc->rc_seq_reply != seq)) + goto ignore; + + if (!(status & DLM_RS_NODES) && (names || lookup || lock)) + goto ignore; + + if (!(status & DLM_RS_DIR) && (lookup || lock)) + goto ignore; switch (rc->rc_type) { case DLM_RCOM_STATUS: @@ -570,10 +637,20 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) default: log_error(ls, "receive_rcom bad type %d", rc->rc_type); } -out: + return; + +ignore: + log_limit(ls, 
"dlm_receive_rcom ignore msg %d " + "from %d %llu %llu recover seq %llu sts %x gen %u", + rc->rc_type, + nodeid, + (unsigned long long)rc->rc_seq, + (unsigned long long)rc->rc_seq_reply, + (unsigned long long)seq, + status, ls->ls_generation); return; Eshort: - log_error(ls, "recovery message %x from %d is too short", - rc->rc_type, nodeid); + log_error(ls, "recovery message %d from %d is too short", + rc->rc_type, nodeid); } diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h index 206723ab744..f8e243463c1 100644 --- a/fs/dlm/rcom.h +++ b/fs/dlm/rcom.h @@ -17,6 +17,7 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags); int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); +int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid); int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid); int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in); diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index 7554e4dac6b..eaea789bf97 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -36,30 +36,23 @@ * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes). When another * function thinks it could have completed the waited-on task, they should wake * up ls_wait_general to get an immediate response rather than waiting for the - * timer to detect the result. A timer wakes us up periodically while waiting - * to see if we should abort due to a node failure. This should only be called - * by the dlm_recoverd thread. + * timeout. This uses a timeout so it can check periodically if the wait + * should abort due to node failure (which doesn't cause a wake_up). + * This should only be called by the dlm_recoverd thread. 
*/ -static void dlm_wait_timer_fn(unsigned long data) -{ - struct dlm_ls *ls = (struct dlm_ls *) data; - mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ)); - wake_up(&ls->ls_wait_general); -} - int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls)) { int error = 0; + int rv; - init_timer(&ls->ls_timer); - ls->ls_timer.function = dlm_wait_timer_fn; - ls->ls_timer.data = (long) ls; - ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ); - add_timer(&ls->ls_timer); - - wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls)); - del_timer_sync(&ls->ls_timer); + while (1) { + rv = wait_event_timeout(ls->ls_wait_general, + testfn(ls) || dlm_recovery_stopped(ls), + dlm_config.ci_recover_timer * HZ); + if (rv) + break; + } if (dlm_recovery_stopped(ls)) { log_debug(ls, "dlm_wait_function aborted"); @@ -277,32 +270,100 @@ static void recover_list_del(struct dlm_rsb *r) dlm_put_rsb(r); } -static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id) +static void recover_list_clear(struct dlm_ls *ls) { - struct dlm_rsb *r = NULL; + struct dlm_rsb *r, *s; spin_lock(&ls->ls_recover_list_lock); + list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) { + list_del_init(&r->res_recover_list); + r->res_recover_locks_count = 0; + dlm_put_rsb(r); + ls->ls_recover_list_count--; + } - list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) { - if (id == (unsigned long) r) - goto out; + if (ls->ls_recover_list_count != 0) { + log_error(ls, "warning: recover_list_count %d", + ls->ls_recover_list_count); + ls->ls_recover_list_count = 0; } - r = NULL; - out: spin_unlock(&ls->ls_recover_list_lock); +} + +static int recover_idr_empty(struct dlm_ls *ls) +{ + int empty = 1; + + spin_lock(&ls->ls_recover_idr_lock); + if (ls->ls_recover_list_count) + empty = 0; + spin_unlock(&ls->ls_recover_idr_lock); + + return empty; +} + +static int recover_idr_add(struct dlm_rsb *r) +{ + struct dlm_ls *ls 
= r->res_ls; + int rv; + + idr_preload(GFP_NOFS); + spin_lock(&ls->ls_recover_idr_lock); + if (r->res_id) { + rv = -1; + goto out_unlock; + } + rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT); + if (rv < 0) + goto out_unlock; + + r->res_id = rv; + ls->ls_recover_list_count++; + dlm_hold_rsb(r); + rv = 0; +out_unlock: + spin_unlock(&ls->ls_recover_idr_lock); + idr_preload_end(); + return rv; +} + +static void recover_idr_del(struct dlm_rsb *r) +{ + struct dlm_ls *ls = r->res_ls; + + spin_lock(&ls->ls_recover_idr_lock); + idr_remove(&ls->ls_recover_idr, r->res_id); + r->res_id = 0; + ls->ls_recover_list_count--; + spin_unlock(&ls->ls_recover_idr_lock); + + dlm_put_rsb(r); +} + +static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id) +{ + struct dlm_rsb *r; + + spin_lock(&ls->ls_recover_idr_lock); + r = idr_find(&ls->ls_recover_idr, (int)id); + spin_unlock(&ls->ls_recover_idr_lock); return r; } -static void recover_list_clear(struct dlm_ls *ls) +static void recover_idr_clear(struct dlm_ls *ls) { - struct dlm_rsb *r, *s; + struct dlm_rsb *r; + int id; - spin_lock(&ls->ls_recover_list_lock); - list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) { - list_del_init(&r->res_recover_list); + spin_lock(&ls->ls_recover_idr_lock); + + idr_for_each_entry(&ls->ls_recover_idr, r, id) { + idr_remove(&ls->ls_recover_idr, id); + r->res_id = 0; r->res_recover_locks_count = 0; - dlm_put_rsb(r); ls->ls_recover_list_count--; + + dlm_put_rsb(r); } if (ls->ls_recover_list_count != 0) { @@ -310,7 +371,7 @@ static void recover_list_clear(struct dlm_ls *ls) ls->ls_recover_list_count); ls->ls_recover_list_count = 0; } - spin_unlock(&ls->ls_recover_list_lock); + spin_unlock(&ls->ls_recover_idr_lock); } @@ -361,9 +422,8 @@ static void set_master_lkbs(struct dlm_rsb *r) * rsb's to consider. 
*/ -static void set_new_master(struct dlm_rsb *r, int nodeid) +static void set_new_master(struct dlm_rsb *r) { - r->res_nodeid = nodeid; set_master_lkbs(r); rsb_set_flag(r, RSB_NEW_MASTER); rsb_set_flag(r, RSB_NEW_MASTER2); @@ -372,31 +432,48 @@ static void set_new_master(struct dlm_rsb *r, int nodeid) /* * We do async lookups on rsb's that need new masters. The rsb's * waiting for a lookup reply are kept on the recover_list. + * + * Another node recovering the master may have sent us a rcom lookup, + * and our dlm_master_lookup() set it as the new master, along with + * NEW_MASTER so that we'll recover it here (this implies dir_nodeid + * equals our_nodeid below). */ -static int recover_master(struct dlm_rsb *r) +static int recover_master(struct dlm_rsb *r, unsigned int *count) { struct dlm_ls *ls = r->res_ls; - int error, ret_nodeid; - int our_nodeid = dlm_our_nodeid(); - int dir_nodeid = dlm_dir_nodeid(r); + int our_nodeid, dir_nodeid; + int is_removed = 0; + int error; + + if (is_master(r)) + return 0; + + is_removed = dlm_is_removed(ls, r->res_nodeid); + + if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER)) + return 0; + + our_nodeid = dlm_our_nodeid(); + dir_nodeid = dlm_dir_nodeid(r); if (dir_nodeid == our_nodeid) { - error = dlm_dir_lookup(ls, our_nodeid, r->res_name, - r->res_length, &ret_nodeid); - if (error) - log_error(ls, "recover dir lookup error %d", error); + if (is_removed) { + r->res_master_nodeid = our_nodeid; + r->res_nodeid = 0; + } - if (ret_nodeid == our_nodeid) - ret_nodeid = 0; - lock_rsb(r); - set_new_master(r, ret_nodeid); - unlock_rsb(r); + /* set master of lkbs to ourself when is_removed, or to + another new master which we set along with NEW_MASTER + in dlm_master_lookup */ + set_new_master(r); + error = 0; } else { - recover_list_add(r); + recover_idr_add(r); error = dlm_send_rcom_lookup(r, dir_nodeid); } + (*count)++; return error; } @@ -415,7 +492,7 @@ static int recover_master(struct dlm_rsb *r) * resent. 
*/ -static int recover_master_static(struct dlm_rsb *r) +static int recover_master_static(struct dlm_rsb *r, unsigned int *count) { int dir_nodeid = dlm_dir_nodeid(r); int new_master = dir_nodeid; @@ -423,11 +500,12 @@ static int recover_master_static(struct dlm_rsb *r) if (dir_nodeid == dlm_our_nodeid()) new_master = 0; - lock_rsb(r); dlm_purge_mstcpy_locks(r); - set_new_master(r, new_master); - unlock_rsb(r); - return 1; + r->res_master_nodeid = dir_nodeid; + r->res_nodeid = new_master; + set_new_master(r); + (*count)++; + return 0; } /* @@ -443,9 +521,12 @@ static int recover_master_static(struct dlm_rsb *r) int dlm_recover_masters(struct dlm_ls *ls) { struct dlm_rsb *r; - int error = 0, count = 0; + unsigned int total = 0; + unsigned int count = 0; + int nodir = dlm_no_directory(ls); + int error; - log_debug(ls, "dlm_recover_masters"); + log_rinfo(ls, "dlm_recover_masters"); down_read(&ls->ls_root_sem); list_for_each_entry(r, &ls->ls_root_list, res_root_list) { @@ -455,50 +536,58 @@ int dlm_recover_masters(struct dlm_ls *ls) goto out; } - if (dlm_no_directory(ls)) - count += recover_master_static(r); - else if (!is_master(r) && - (dlm_is_removed(ls, r->res_nodeid) || - rsb_flag(r, RSB_NEW_MASTER))) { - recover_master(r); - count++; - } + lock_rsb(r); + if (nodir) + error = recover_master_static(r, &count); + else + error = recover_master(r, &count); + unlock_rsb(r); + cond_resched(); + total++; - schedule(); + if (error) { + up_read(&ls->ls_root_sem); + goto out; + } } up_read(&ls->ls_root_sem); - log_debug(ls, "dlm_recover_masters %d resources", count); + log_rinfo(ls, "dlm_recover_masters %u of %u", count, total); - error = dlm_wait_function(ls, &recover_list_empty); + error = dlm_wait_function(ls, &recover_idr_empty); out: if (error) - recover_list_clear(ls); + recover_idr_clear(ls); return error; } int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) { struct dlm_rsb *r; - int nodeid; + int ret_nodeid, new_master; - r = recover_list_find(ls, 
rc->rc_id); + r = recover_idr_find(ls, rc->rc_id); if (!r) { log_error(ls, "dlm_recover_master_reply no id %llx", (unsigned long long)rc->rc_id); goto out; } - nodeid = rc->rc_result; - if (nodeid == dlm_our_nodeid()) - nodeid = 0; + ret_nodeid = rc->rc_result; + + if (ret_nodeid == dlm_our_nodeid()) + new_master = 0; + else + new_master = ret_nodeid; lock_rsb(r); - set_new_master(r, nodeid); + r->res_master_nodeid = ret_nodeid; + r->res_nodeid = new_master; + set_new_master(r); unlock_rsb(r); - recover_list_del(r); + recover_idr_del(r); - if (recover_list_empty(ls)) + if (recover_idr_empty(ls)) wake_up(&ls->ls_wait_general); out: return 0; @@ -596,7 +685,7 @@ int dlm_recover_locks(struct dlm_ls *ls) } up_read(&ls->ls_root_sem); - log_debug(ls, "dlm_recover_locks %d out", count); + log_rinfo(ls, "dlm_recover_locks %d out", count); error = dlm_wait_function(ls, &recover_list_empty); out: @@ -624,8 +713,14 @@ void dlm_recovered_lock(struct dlm_rsb *r) * the VALNOTVALID flag if necessary, and determining the correct lvb contents * based on the lvb's of the locks held on the rsb. * - * RSB_VALNOTVALID is set if there are only NL/CR locks on the rsb. If it - * was already set prior to recovery, it's not cleared, regardless of locks. + * RSB_VALNOTVALID is set in two cases: + * + * 1. we are master, but not new, and we purged an EX/PW lock held by a + * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL) + * + * 2. we are a new master, and there are only NL/CR locks left. + * (We could probably improve this by only invaliding in this way when + * the previous master left uncleanly. VMS docs mention that.) * * The LVB contents are only considered for changing when this is a new master * of the rsb (NEW_MASTER2). 
Then, the rsb's lvb is taken from any lkb with @@ -641,6 +736,19 @@ static void recover_lvb(struct dlm_rsb *r) int big_lock_exists = 0; int lvblen = r->res_ls->ls_lvblen; + if (!rsb_flag(r, RSB_NEW_MASTER2) && + rsb_flag(r, RSB_RECOVER_LVB_INVAL)) { + /* case 1 above */ + rsb_set_flag(r, RSB_VALNOTVALID); + return; + } + + if (!rsb_flag(r, RSB_NEW_MASTER2)) + return; + + /* we are the new master, so figure out if VALNOTVALID should + be set, and set the rsb lvb from the best lkb available. */ + list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) continue; @@ -679,13 +787,10 @@ static void recover_lvb(struct dlm_rsb *r) if (!lock_lvb_exists) goto out; + /* lvb is invalidated if only NL/CR locks remain */ if (!big_lock_exists) rsb_set_flag(r, RSB_VALNOTVALID); - /* don't mess with the lvb unless we're the new master */ - if (!rsb_flag(r, RSB_NEW_MASTER2)) - goto out; - if (!r->res_lvbptr) { r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) @@ -711,6 +816,7 @@ static void recover_lvb(struct dlm_rsb *r) static void recover_conversion(struct dlm_rsb *r) { + struct dlm_ls *ls = r->res_ls; struct dlm_lkb *lkb; int grmode = -1; @@ -725,10 +831,15 @@ static void recover_conversion(struct dlm_rsb *r) list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { if (lkb->lkb_grmode != DLM_LOCK_IV) continue; - if (grmode == -1) + if (grmode == -1) { + log_debug(ls, "recover_conversion %x set gr to rq %d", + lkb->lkb_id, lkb->lkb_rqmode); lkb->lkb_grmode = lkb->lkb_rqmode; - else + } else { + log_debug(ls, "recover_conversion %x set gr %d", + lkb->lkb_id, grmode); lkb->lkb_grmode = grmode; + } } } @@ -753,19 +864,26 @@ void dlm_recover_rsbs(struct dlm_ls *ls) if (is_master(r)) { if (rsb_flag(r, RSB_RECOVER_CONVERT)) recover_conversion(r); + + /* recover lvb before granting locks so the updated + lvb/VALNOTVALID is presented in the completion */ + recover_lvb(r); + if (rsb_flag(r, RSB_NEW_MASTER2)) 
recover_grant(r); - recover_lvb(r); count++; + } else { + rsb_clear_flag(r, RSB_VALNOTVALID); } rsb_clear_flag(r, RSB_RECOVER_CONVERT); + rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL); rsb_clear_flag(r, RSB_NEW_MASTER2); unlock_rsb(r); } up_read(&ls->ls_root_sem); if (count) - log_debug(ls, "dlm_recover_rsbs %d done", count); + log_rinfo(ls, "dlm_recover_rsbs %d done", count); } /* Create a single list of all root rsb's to be used during recovery */ @@ -791,20 +909,8 @@ int dlm_create_root_list(struct dlm_ls *ls) dlm_hold_rsb(r); } - /* If we're using a directory, add tossed rsbs to the root - list; they'll have entries created in the new directory, - but no other recovery steps should do anything with them. */ - - if (dlm_no_directory(ls)) { - spin_unlock(&ls->ls_rsbtbl[i].lock); - continue; - } - - for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) { - r = rb_entry(n, struct dlm_rsb, res_hashnode); - list_add(&r->res_root_list, &ls->ls_root_list); - dlm_hold_rsb(r); - } + if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss)) + log_error(ls, "dlm_create_root_list toss not empty"); spin_unlock(&ls->ls_rsbtbl[i].lock); } out: @@ -824,28 +930,26 @@ void dlm_release_root_list(struct dlm_ls *ls) up_write(&ls->ls_root_sem); } -/* If not using a directory, clear the entire toss list, there's no benefit to - caching the master value since it's fixed. If we are using a dir, keep the - rsb's we're the master of. Recovery will add them to the root list and from - there they'll be entered in the rebuilt directory. 
*/ - -void dlm_clear_toss_list(struct dlm_ls *ls) +void dlm_clear_toss(struct dlm_ls *ls) { struct rb_node *n, *next; - struct dlm_rsb *rsb; + struct dlm_rsb *r; + unsigned int count = 0; int i; for (i = 0; i < ls->ls_rsbtbl_size; i++) { spin_lock(&ls->ls_rsbtbl[i].lock); for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) { - next = rb_next(n);; - rsb = rb_entry(n, struct dlm_rsb, res_hashnode); - if (dlm_no_directory(ls) || !is_master(rsb)) { - rb_erase(n, &ls->ls_rsbtbl[i].toss); - dlm_free_rsb(rsb); - } + next = rb_next(n); + r = rb_entry(n, struct dlm_rsb, res_hashnode); + rb_erase(n, &ls->ls_rsbtbl[i].toss); + dlm_free_rsb(r); + count++; } spin_unlock(&ls->ls_rsbtbl[i].lock); } + + if (count) + log_rinfo(ls, "dlm_clear_toss %u done", count); } diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h index ebd0363f1e0..d8c8738c70e 100644 --- a/fs/dlm/recover.h +++ b/fs/dlm/recover.h @@ -27,7 +27,7 @@ int dlm_recover_locks(struct dlm_ls *ls); void dlm_recovered_lock(struct dlm_rsb *r); int dlm_create_root_list(struct dlm_ls *ls); void dlm_release_root_list(struct dlm_ls *ls); -void dlm_clear_toss_list(struct dlm_ls *ls); +void dlm_clear_toss(struct dlm_ls *ls); void dlm_recover_rsbs(struct dlm_ls *ls); #endif /* __RECOVER_DOT_H__ */ diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index f1a9073c083..6859b4bf971 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -41,6 +41,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq) set_bit(LSFL_RUNNING, &ls->ls_flags); /* unblocks processes waiting to enter the dlm */ up_write(&ls->ls_in_recovery); + clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); error = 0; } spin_unlock(&ls->ls_recover_lock); @@ -54,18 +55,13 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) unsigned long start; int error, neg = 0; - log_debug(ls, "dlm_recover %llu", (unsigned long long)rv->seq); + log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq); mutex_lock(&ls->ls_recoverd_active); 
dlm_callback_suspend(ls); - /* - * Free non-master tossed rsb's. Master rsb's are kept on toss - * list and put on root list to be included in resdir recovery. - */ - - dlm_clear_toss_list(ls); + dlm_clear_toss(ls); /* * This list of root rsb's will be the basis of most of the recovery @@ -80,17 +76,21 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_members(ls, rv, &neg); if (error) { - log_debug(ls, "dlm_recover_members error %d", error); + log_rinfo(ls, "dlm_recover_members error %d", error); goto fail; } + dlm_recover_dir_nodeid(ls); + + ls->ls_recover_dir_sent_res = 0; + ls->ls_recover_dir_sent_msg = 0; ls->ls_recover_locks_in = 0; dlm_set_recover_status(ls, DLM_RS_NODES); error = dlm_recover_members_wait(ls); if (error) { - log_debug(ls, "dlm_recover_members_wait error %d", error); + log_rinfo(ls, "dlm_recover_members_wait error %d", error); goto fail; } @@ -103,7 +103,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory(ls); if (error) { - log_debug(ls, "dlm_recover_directory error %d", error); + log_rinfo(ls, "dlm_recover_directory error %d", error); goto fail; } @@ -111,10 +111,13 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory_wait(ls); if (error) { - log_debug(ls, "dlm_recover_directory_wait error %d", error); + log_rinfo(ls, "dlm_recover_directory_wait error %d", error); goto fail; } + log_rinfo(ls, "dlm_recover_directory %u out %u messages", + ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg); + /* * We may have outstanding operations that are waiting for a reply from * a failed node. Mark these to be resent after recovery. 
Unlock and @@ -141,7 +144,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_masters(ls); if (error) { - log_debug(ls, "dlm_recover_masters error %d", error); + log_rinfo(ls, "dlm_recover_masters error %d", error); goto fail; } @@ -151,7 +154,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks(ls); if (error) { - log_debug(ls, "dlm_recover_locks error %d", error); + log_rinfo(ls, "dlm_recover_locks error %d", error); goto fail; } @@ -159,11 +162,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks_wait(ls); if (error) { - log_debug(ls, "dlm_recover_locks_wait error %d", error); + log_rinfo(ls, "dlm_recover_locks_wait error %d", error); goto fail; } - log_debug(ls, "dlm_recover_locks %u in", + log_rinfo(ls, "dlm_recover_locks %u in", ls->ls_recover_locks_in); /* @@ -183,7 +186,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks_wait(ls); if (error) { - log_debug(ls, "dlm_recover_locks_wait error %d", error); + log_rinfo(ls, "dlm_recover_locks_wait error %d", error); goto fail; } } @@ -202,7 +205,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_done_wait(ls); if (error) { - log_debug(ls, "dlm_recover_done_wait error %d", error); + log_rinfo(ls, "dlm_recover_done_wait error %d", error); goto fail; } @@ -214,25 +217,25 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = enable_locking(ls, rv->seq); if (error) { - log_debug(ls, "enable_locking error %d", error); + log_rinfo(ls, "enable_locking error %d", error); goto fail; } error = dlm_process_requestqueue(ls); if (error) { - log_debug(ls, "dlm_process_requestqueue error %d", error); + log_rinfo(ls, "dlm_process_requestqueue error %d", error); goto fail; } error = dlm_recover_waiters_post(ls); if (error) { - log_debug(ls, "dlm_recover_waiters_post error %d", error); + log_rinfo(ls, 
"dlm_recover_waiters_post error %d", error); goto fail; } dlm_recover_grant(ls); - log_debug(ls, "dlm_recover %llu generation %u done: %u ms", + log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms", (unsigned long long)rv->seq, ls->ls_generation, jiffies_to_msecs(jiffies - start)); mutex_unlock(&ls->ls_recoverd_active); @@ -242,7 +245,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) fail: dlm_release_root_list(ls); - log_debug(ls, "dlm_recover %llu error %d", + log_rinfo(ls, "dlm_recover %llu error %d", (unsigned long long)rv->seq, error); mutex_unlock(&ls->ls_recoverd_active); return error; @@ -260,7 +263,7 @@ static void do_ls_recovery(struct dlm_ls *ls) rv = ls->ls_recover_args; ls->ls_recover_args = NULL; if (rv && ls->ls_recover_seq == rv->seq) - clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); + clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags); spin_unlock(&ls->ls_recover_lock); if (rv) { @@ -280,26 +283,34 @@ static int dlm_recoverd(void *arg) return -1; } + down_write(&ls->ls_in_recovery); + set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); + wake_up(&ls->ls_recover_lock_wait); + while (!kthread_should_stop()) { set_current_state(TASK_INTERRUPTIBLE); - if (!test_bit(LSFL_WORK, &ls->ls_flags)) + if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) && + !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) schedule(); set_current_state(TASK_RUNNING); - if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags)) + if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) { + down_write(&ls->ls_in_recovery); + set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); + wake_up(&ls->ls_recover_lock_wait); + } + + if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags)) do_ls_recovery(ls); } + if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)) + up_write(&ls->ls_in_recovery); + dlm_put_lockspace(ls); return 0; } -void dlm_recoverd_kick(struct dlm_ls *ls) -{ - set_bit(LSFL_WORK, &ls->ls_flags); - wake_up_process(ls->ls_recoverd_task); -} - int dlm_recoverd_start(struct dlm_ls *ls) { struct 
task_struct *p; diff --git a/fs/dlm/recoverd.h b/fs/dlm/recoverd.h index 866657c5d69..8856079733f 100644 --- a/fs/dlm/recoverd.h +++ b/fs/dlm/recoverd.h @@ -14,7 +14,6 @@ #ifndef __RECOVERD_DOT_H__ #define __RECOVERD_DOT_H__ -void dlm_recoverd_kick(struct dlm_ls *ls); void dlm_recoverd_stop(struct dlm_ls *ls); int dlm_recoverd_start(struct dlm_ls *ls); void dlm_recoverd_suspend(struct dlm_ls *ls); diff --git a/fs/dlm/user.c b/fs/dlm/user.c index eb4ed9ba309..142e21655ee 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -493,7 +493,6 @@ static ssize_t device_write(struct file *file, const char __user *buf, { struct dlm_user_proc *proc = file->private_data; struct dlm_write_request *kbuf; - sigset_t tmpsig, allsigs; int error; #ifdef CONFIG_COMPAT @@ -503,6 +502,13 @@ static ssize_t device_write(struct file *file, const char __user *buf, #endif return -EINVAL; + /* + * can't compare against COMPAT/dlm_write_request32 because + * we don't yet know if is64bit is zero + */ + if (count > sizeof(struct dlm_write_request) + DLM_RESNAME_MAXLEN) + return -EINVAL; + kbuf = kzalloc(count + 1, GFP_NOFS); if (!kbuf) return -ENOMEM; @@ -550,9 +556,6 @@ static ssize_t device_write(struct file *file, const char __user *buf, goto out_free; } - sigfillset(&allsigs); - sigprocmask(SIG_BLOCK, &allsigs, &tmpsig); - error = -EINVAL; switch (kbuf->cmd) @@ -560,7 +563,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_LOCK: if (!proc) { log_print("no locking on control device"); - goto out_sig; + goto out_free; } error = device_user_lock(proc, &kbuf->i.lock); break; @@ -568,7 +571,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_UNLOCK: if (!proc) { log_print("no locking on control device"); - goto out_sig; + goto out_free; } error = device_user_unlock(proc, &kbuf->i.lock); break; @@ -576,7 +579,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_DEADLOCK: if (!proc) { 
log_print("no locking on control device"); - goto out_sig; + goto out_free; } error = device_user_deadlock(proc, &kbuf->i.lock); break; @@ -584,7 +587,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_CREATE_LOCKSPACE: if (proc) { log_print("create/remove only on control device"); - goto out_sig; + goto out_free; } error = device_create_lockspace(&kbuf->i.lspace); break; @@ -592,7 +595,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_REMOVE_LOCKSPACE: if (proc) { log_print("create/remove only on control device"); - goto out_sig; + goto out_free; } error = device_remove_lockspace(&kbuf->i.lspace); break; @@ -600,7 +603,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_PURGE: if (!proc) { log_print("no locking on control device"); - goto out_sig; + goto out_free; } error = device_user_purge(proc, &kbuf->i.purge); break; @@ -610,8 +613,6 @@ static ssize_t device_write(struct file *file, const char __user *buf, kbuf->cmd); } - out_sig: - sigprocmask(SIG_SETMASK, &tmpsig, NULL); out_free: kfree(kbuf); return error; @@ -652,15 +653,11 @@ static int device_close(struct inode *inode, struct file *file) { struct dlm_user_proc *proc = file->private_data; struct dlm_ls *ls; - sigset_t tmpsig, allsigs; ls = dlm_find_lockspace_local(proc->lockspace); if (!ls) return -ENOENT; - sigfillset(&allsigs); - sigprocmask(SIG_BLOCK, &allsigs, &tmpsig); - set_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags); dlm_clear_proc_locks(ls, proc); @@ -678,9 +675,6 @@ static int device_close(struct inode *inode, struct file *file) /* FIXME: AUTOFREE: if this ls is no longer used do device_remove_lockspace() */ - sigprocmask(SIG_SETMASK, &tmpsig, NULL); - recalc_sigpending(); - return 0; } |
