diff options
Diffstat (limited to 'fs/dlm')
| -rw-r--r-- | fs/dlm/Kconfig | 2 | ||||
| -rw-r--r-- | fs/dlm/ast.c | 12 | ||||
| -rw-r--r-- | fs/dlm/config.c | 112 | ||||
| -rw-r--r-- | fs/dlm/config.h | 2 | ||||
| -rw-r--r-- | fs/dlm/debug_fs.c | 34 | ||||
| -rw-r--r-- | fs/dlm/dir.c | 4 | ||||
| -rw-r--r-- | fs/dlm/dlm_internal.h | 52 | ||||
| -rw-r--r-- | fs/dlm/lock.c | 64 | ||||
| -rw-r--r-- | fs/dlm/lockspace.c | 58 | ||||
| -rw-r--r-- | fs/dlm/lowcomms.c | 405 | ||||
| -rw-r--r-- | fs/dlm/lowcomms.h | 2 | ||||
| -rw-r--r-- | fs/dlm/main.c | 2 | ||||
| -rw-r--r-- | fs/dlm/member.c | 44 | ||||
| -rw-r--r-- | fs/dlm/netlink.c | 18 | ||||
| -rw-r--r-- | fs/dlm/plock.c | 18 | ||||
| -rw-r--r-- | fs/dlm/rcom.c | 2 | ||||
| -rw-r--r-- | fs/dlm/recover.c | 99 | ||||
| -rw-r--r-- | fs/dlm/recoverd.c | 61 | ||||
| -rw-r--r-- | fs/dlm/recoverd.h | 1 | ||||
| -rw-r--r-- | fs/dlm/user.c | 32 |
20 files changed, 678 insertions, 346 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig index 1897eb1b4b6..e4242c3f848 100644 --- a/fs/dlm/Kconfig +++ b/fs/dlm/Kconfig @@ -1,6 +1,6 @@ menuconfig DLM tristate "Distributed Lock Manager (DLM)" - depends on EXPERIMENTAL && INET + depends on INET depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n) select IP_SCTP help diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index 63dc19c54d5..dcea1e37a1b 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -14,9 +14,10 @@ #include "dlm_internal.h" #include "lock.h" #include "user.h" +#include "ast.h" -static uint64_t dlm_cb_seq; -static spinlock_t dlm_cb_seq_spin; +static uint64_t dlm_cb_seq; +static DEFINE_SPINLOCK(dlm_cb_seq_spin); static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb) { @@ -267,10 +268,7 @@ void dlm_callback_work(struct work_struct *work) int dlm_callback_start(struct dlm_ls *ls) { ls->ls_callback_wq = alloc_workqueue("dlm_callback", - WQ_UNBOUND | - WQ_MEM_RECLAIM | - WQ_NON_REENTRANT, - 0); + WQ_UNBOUND | WQ_MEM_RECLAIM, 0); if (!ls->ls_callback_wq) { log_print("can't start dlm_callback workqueue"); return -ENOMEM; @@ -311,6 +309,6 @@ void dlm_callback_resume(struct dlm_ls *ls) mutex_unlock(&ls->ls_cb_mutex); if (count) - log_debug(ls, "dlm_callback_resume %d", count); + log_rinfo(ls, "dlm_callback_resume %d", count); } diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 9ccf7346834..d521bddf876 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -138,8 +138,9 @@ static ssize_t cluster_cluster_name_read(struct dlm_cluster *cl, char *buf) static ssize_t cluster_cluster_name_write(struct dlm_cluster *cl, const char *buf, size_t len) { - strncpy(dlm_config.ci_cluster_name, buf, DLM_LOCKSPACE_LEN); - strncpy(cl->cl_cluster_name, buf, DLM_LOCKSPACE_LEN); + strlcpy(dlm_config.ci_cluster_name, buf, + sizeof(dlm_config.ci_cluster_name)); + strlcpy(cl->cl_cluster_name, buf, sizeof(cl->cl_cluster_name)); return len; } @@ -156,11 +157,13 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, const char *buf, size_t len) { unsigned int x; + int rc; if (!capable(CAP_SYS_ADMIN)) - return -EACCES; - - x = simple_strtoul(buf, NULL, 0); + return -EPERM; + rc = kstrtouint(buf, 0, &x); + if (rc) + return rc; if (check_zero && !x) return -EINVAL; @@ -729,7 +732,10 @@ static ssize_t comm_nodeid_read(struct dlm_comm *cm, char *buf) static ssize_t comm_nodeid_write(struct dlm_comm *cm, const char *buf, size_t len) { - cm->nodeid = simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &cm->nodeid); + + if (rc) + return rc; return len; } @@ -741,7 +747,10 @@ static ssize_t comm_local_read(struct dlm_comm *cm, char *buf) static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf, size_t len) { - cm->local= simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &cm->local); + + if (rc) + return rc; if (cm->local && !local_comm) local_comm = cm; return len; @@ -750,6 +759,7 @@ static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf, static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len) { struct sockaddr_storage *addr; + int rv; if (len != sizeof(struct sockaddr_storage)) return -EINVAL; @@ -762,6 +772,13 @@ static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len) return -ENOMEM; memcpy(addr, buf, len); + + rv = dlm_lowcomms_addr(cm->nodeid, addr, len); + if (rv) { + kfree(addr); + return rv; + } + cm->addr[cm->addr_count++] = addr; return len; } @@ -837,7 +854,10 @@ static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf, size_t len) { uint32_t seq = 0; - nd->nodeid = simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &nd->nodeid); + + if (rc) + return rc; dlm_comm_seq(nd->nodeid, &seq); nd->comm_seq = seq; return len; @@ -851,7 +871,10 @@ static ssize_t node_weight_read(struct dlm_node *nd, char *buf) static ssize_t node_weight_write(struct dlm_node *nd, const char *buf, size_t len) { - nd->weight = simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &nd->weight); + + if (rc) + return rc; return len; } @@ -878,34 +901,7 @@ static void put_space(struct dlm_space *sp) config_item_put(&sp->group.cg_item); } -static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) -{ - switch (x->ss_family) { - case AF_INET: { - struct sockaddr_in *sinx = (struct sockaddr_in *)x; - struct sockaddr_in *siny = (struct sockaddr_in *)y; - if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) - return 0; - if (sinx->sin_port != siny->sin_port) - return 0; - break; - } - case AF_INET6: { - struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; - struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; - if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) - return 0; - if (sinx->sin6_port != siny->sin6_port) - return 0; - break; - } - default: - return 0; - } - return 1; -} - -static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr) +static struct dlm_comm *get_comm(int nodeid) { struct config_item *i; struct dlm_comm *cm = NULL; @@ -919,19 +915,11 @@ static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr) list_for_each_entry(i, &comm_list->cg_children, ci_entry) { cm = config_item_to_comm(i); - if (nodeid) { - if (cm->nodeid != nodeid) - continue; - found = 1; - config_item_get(i); - break; - } else { - if (!cm->addr_count || !addr_compare(cm->addr[0], addr)) - continue; - found = 1; - config_item_get(i); - break; - } + if (cm->nodeid != nodeid) + continue; + found = 1; + config_item_get(i); + break; } mutex_unlock(&clusters_root.subsys.su_mutex); @@ -995,7 +983,7 @@ int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, int dlm_comm_seq(int nodeid, uint32_t *seq) { - struct dlm_comm *cm = get_comm(nodeid, NULL); + struct dlm_comm *cm = get_comm(nodeid); if (!cm) return -EEXIST; *seq = cm->seq; @@ -1003,28 +991,6 @@ int dlm_comm_seq(int nodeid, uint32_t *seq) return 0; } -int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr) -{ - struct dlm_comm *cm = get_comm(nodeid, NULL); - if (!cm) - return -EEXIST; - if (!cm->addr_count) - return -ENOENT; - memcpy(addr, cm->addr[0], sizeof(*addr)); - put_comm(cm); - return 0; -} - -int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) -{ - struct dlm_comm *cm = get_comm(0, addr); - if (!cm) - return -EEXIST; - *nodeid = cm->nodeid; - put_comm(cm); - return 0; -} - int dlm_our_nodeid(void) { return local_comm ? local_comm->nodeid : 0; diff --git a/fs/dlm/config.h b/fs/dlm/config.h index dbd35a08f3a..f30697bc278 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -46,8 +46,6 @@ void dlm_config_exit(void); int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, int *count_out); int dlm_comm_seq(int nodeid, uint32_t *seq); -int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr); -int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid); int dlm_our_nodeid(void); int dlm_our_addr(struct sockaddr_storage *addr, int num); diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index b969deef9eb..8d77ba7b175 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -68,7 +68,7 @@ static int print_format1_lock(struct seq_file *s, struct dlm_lkb *lkb, if (lkb->lkb_wait_type) seq_printf(s, " wait_type: %d", lkb->lkb_wait_type); - return seq_printf(s, "\n"); + return seq_puts(s, "\n"); } static int print_format1(struct dlm_rsb *res, struct seq_file *s) @@ -92,31 +92,31 @@ static int print_format1(struct dlm_rsb *res, struct seq_file *s) } if (res->res_nodeid > 0) - rv = seq_printf(s, "\" \nLocal Copy, Master is node %d\n", + rv = seq_printf(s, "\"\nLocal Copy, Master is node %d\n", res->res_nodeid); else if (res->res_nodeid == 0) - rv = seq_printf(s, "\" \nMaster Copy\n"); + rv = seq_puts(s, "\"\nMaster Copy\n"); else if (res->res_nodeid == -1) - rv = seq_printf(s, "\" \nLooking up master (lkid %x)\n", + rv = seq_printf(s, "\"\nLooking up master (lkid %x)\n", res->res_first_lkid); else - rv = seq_printf(s, "\" \nInvalid master %d\n", + rv = seq_printf(s, "\"\nInvalid master %d\n", res->res_nodeid); if (rv) goto out; /* Print the LVB: */ if (res->res_lvbptr) { - seq_printf(s, "LVB: "); + seq_puts(s, "LVB: "); for (i = 0; i < lvblen; i++) { if (i == lvblen / 2) - seq_printf(s, "\n "); + seq_puts(s, "\n "); seq_printf(s, "%02x ", (unsigned char) res->res_lvbptr[i]); } if (rsb_flag(res, RSB_VALNOTVALID)) - seq_printf(s, " (INVALID)"); - rv = seq_printf(s, "\n"); + seq_puts(s, " (INVALID)"); + rv = seq_puts(s, "\n"); if (rv) goto out; } @@ -133,21 +133,21 @@ static int print_format1(struct dlm_rsb *res, struct seq_file *s) } /* Print the locks attached to this resource */ - seq_printf(s, "Granted Queue\n"); + seq_puts(s, "Granted Queue\n"); list_for_each_entry(lkb, &res->res_grantqueue, lkb_statequeue) { rv = print_format1_lock(s, lkb, res); if (rv) goto out; } - seq_printf(s, "Conversion Queue\n"); + seq_puts(s, "Conversion Queue\n"); list_for_each_entry(lkb, &res->res_convertqueue, lkb_statequeue) { rv = print_format1_lock(s, lkb, res); if (rv) goto out; } - seq_printf(s, "Waiting Queue\n"); + seq_puts(s, "Waiting Queue\n"); list_for_each_entry(lkb, &res->res_waitqueue, lkb_statequeue) { rv = print_format1_lock(s, lkb, res); if (rv) @@ -157,13 +157,13 @@ static int print_format1(struct dlm_rsb *res, struct seq_file *s) if (list_empty(&res->res_lookup)) goto out; - seq_printf(s, "Lookup Queue\n"); + seq_puts(s, "Lookup Queue\n"); list_for_each_entry(lkb, &res->res_lookup, lkb_rsb_lookup) { rv = seq_printf(s, "%08x %s", lkb->lkb_id, print_lockmode(lkb->lkb_rqmode)); if (lkb->lkb_wait_type) seq_printf(s, " wait_type: %d", lkb->lkb_wait_type); - rv = seq_printf(s, "\n"); + rv = seq_puts(s, "\n"); } out: unlock_rsb(res); @@ -300,7 +300,7 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s) else seq_printf(s, " %02x", (unsigned char)r->res_name[i]); } - rv = seq_printf(s, "\n"); + rv = seq_puts(s, "\n"); if (rv) goto out; @@ -311,7 +311,7 @@ static int print_format3(struct dlm_rsb *r, struct seq_file *s) for (i = 0; i < lvblen; i++) seq_printf(s, " %02x", (unsigned char)r->res_lvbptr[i]); - rv = seq_printf(s, "\n"); + rv = seq_puts(s, "\n"); if (rv) goto out; @@ -377,7 +377,7 @@ static int print_format4(struct dlm_rsb *r, struct seq_file *s) else seq_printf(s, " %02x", (unsigned char)r->res_name[i]); } - rv = seq_printf(s, "\n"); + rv = seq_puts(s, "\n"); out: unlock_rsb(r); return rv; diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index 278a75cda44..d975851a7e1 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -68,7 +68,7 @@ int dlm_recover_directory(struct dlm_ls *ls) uint16_t namelen; unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0; - log_debug(ls, "dlm_recover_directory"); + log_rinfo(ls, "dlm_recover_directory"); if (dlm_no_directory(ls)) goto out_status; @@ -189,7 +189,7 @@ int dlm_recover_directory(struct dlm_ls *ls) error = 0; dlm_set_recover_status(ls, DLM_RS_DIR); - log_debug(ls, "dlm_recover_directory %u in %u new", + log_rinfo(ls, "dlm_recover_directory %u in %u new", count, count_add); out_free: kfree(last_name); diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 9d3e485f88c..5eff6ea3e27 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -65,6 +65,8 @@ struct dlm_mhandle; printk(KERN_ERR "dlm: "fmt"\n" , ##args) #define log_error(ls, fmt, args...) \ printk(KERN_ERR "dlm: %s: " fmt "\n", (ls)->ls_name , ##args) +#define log_rinfo(ls, fmt, args...) \ + printk(KERN_INFO "dlm: %s: " fmt "\n", (ls)->ls_name , ##args); #define log_debug(ls, fmt, args...) \ do { \ @@ -96,10 +98,13 @@ do { \ } +#define DLM_RTF_SHRINK 0x00000001 + struct dlm_rsbtable { struct rb_root keep; struct rb_root toss; spinlock_t lock; + uint32_t flags; }; @@ -337,6 +342,7 @@ enum rsb_flags { RSB_NEW_MASTER2, RSB_RECOVER_CONVERT, RSB_RECOVER_GRANT, + RSB_RECOVER_LVB_INVAL, }; static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) @@ -604,6 +610,7 @@ struct dlm_ls { struct idr ls_recover_idr; spinlock_t ls_recover_idr_lock; wait_queue_head_t ls_wait_general; + wait_queue_head_t ls_recover_lock_wait; struct mutex ls_clear_proc_locks; struct list_head ls_root_list; /* root resources */ @@ -616,15 +623,40 @@ struct dlm_ls { char ls_name[1]; }; -#define LSFL_WORK 0 -#define LSFL_RUNNING 1 -#define LSFL_RECOVERY_STOP 2 -#define LSFL_RCOM_READY 3 -#define LSFL_RCOM_WAIT 4 -#define LSFL_UEVENT_WAIT 5 -#define LSFL_TIMEWARN 6 -#define LSFL_CB_DELAY 7 -#define LSFL_NODIR 8 +/* + * LSFL_RECOVER_STOP - dlm_ls_stop() sets this to tell dlm recovery routines + * that they should abort what they're doing so new recovery can be started. + * + * LSFL_RECOVER_DOWN - dlm_ls_stop() sets this to tell dlm_recoverd that it + * should do down_write() on the in_recovery rw_semaphore. (doing down_write + * within dlm_ls_stop causes complaints about the lock acquired/released + * in different contexts.) + * + * LSFL_RECOVER_LOCK - dlm_recoverd holds the in_recovery rw_semaphore. + * It sets this after it is done with down_write() on the in_recovery + * rw_semaphore and clears it after it has released the rw_semaphore. + * + * LSFL_RECOVER_WORK - dlm_ls_start() sets this to tell dlm_recoverd that it + * should begin recovery of the lockspace. + * + * LSFL_RUNNING - set when normal locking activity is enabled. + * dlm_ls_stop() clears this to tell dlm locking routines that they should + * quit what they are doing so recovery can run. dlm_recoverd sets + * this after recovery is finished. + */ + +#define LSFL_RECOVER_STOP 0 +#define LSFL_RECOVER_DOWN 1 +#define LSFL_RECOVER_LOCK 2 +#define LSFL_RECOVER_WORK 3 +#define LSFL_RUNNING 4 + +#define LSFL_RCOM_READY 5 +#define LSFL_RCOM_WAIT 6 +#define LSFL_UEVENT_WAIT 7 +#define LSFL_TIMEWARN 8 +#define LSFL_CB_DELAY 9 +#define LSFL_NODIR 10 /* much of this is just saving user space pointers associated with the lock that we pass back to the user lib with an ast */ @@ -667,7 +699,7 @@ static inline int dlm_locking_stopped(struct dlm_ls *ls) static inline int dlm_recovery_stopped(struct dlm_ls *ls) { - return test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); + return test_bit(LSFL_RECOVER_STOP, &ls->ls_flags); } static inline int dlm_no_directory(struct dlm_ls *ls) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index b5695075818..83f3d552030 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -687,6 +687,7 @@ static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, log_error(ls, "find_rsb new from_other %d dir %d our %d %s", from_nodeid, dir_nodeid, our_nodeid, r->res_name); dlm_free_rsb(r); + r = NULL; error = -ENOTBLK; goto out_unlock; } @@ -1132,6 +1133,7 @@ static void toss_rsb(struct kref *kref) rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); r->res_toss_time = jiffies; + ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK; if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; @@ -1182,7 +1184,7 @@ static void detach_lkb(struct dlm_lkb *lkb) static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - int rv, id; + int rv; lkb = dlm_allocate_lkb(ls); if (!lkb) @@ -1198,19 +1200,13 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) mutex_init(&lkb->lkb_cb_mutex); INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); - retry: - rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS); - if (!rv) - return -ENOMEM; - + idr_preload(GFP_NOFS); spin_lock(&ls->ls_lkbidr_spin); - rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id); - if (!rv) - lkb->lkb_id = id; + rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT); + if (rv >= 0) + lkb->lkb_id = rv; spin_unlock(&ls->ls_lkbidr_spin); - - if (rv == -EAGAIN) - goto retry; + idr_preload_end(); if (rv < 0) { log_error(ls, "create_lkb idr error %d", rv); @@ -1659,11 +1655,18 @@ static void shrink_bucket(struct dlm_ls *ls, int b) char *name; int our_nodeid = dlm_our_nodeid(); int remote_count = 0; + int need_shrink = 0; int i, len, rv; memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); spin_lock(&ls->ls_rsbtbl[b].lock); + + if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) { + spin_unlock(&ls->ls_rsbtbl[b].lock); + return; + } + for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { next = rb_next(n); r = rb_entry(n, struct dlm_rsb, res_hashnode); @@ -1679,6 +1682,8 @@ static void shrink_bucket(struct dlm_ls *ls, int b) continue; } + need_shrink = 1; + if (!time_after_eq(jiffies, r->res_toss_time + dlm_config.ci_toss_secs * HZ)) { continue; @@ -1710,6 +1715,11 @@ static void shrink_bucket(struct dlm_ls *ls, int b) rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); dlm_free_rsb(r); } + + if (need_shrink) + ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK; + else + ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK; spin_unlock(&ls->ls_rsbtbl[b].lock); /* @@ -2029,8 +2039,8 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; if (b == 1) { int len = receive_extralen(ms); - if (len > DLM_RESNAME_MAXLEN) - len = DLM_RESNAME_MAXLEN; + if (len > r->res_ls->ls_lvblen) + len = r->res_ls->ls_lvblen; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); lkb->lkb_lvbseq = ms->m_lvbseq; } @@ -3884,8 +3894,8 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, if (!lkb->lkb_lvbptr) return -ENOMEM; len = receive_extralen(ms); - if (len > DLM_RESNAME_MAXLEN) - len = DLM_RESNAME_MAXLEN; + if (len > ls->ls_lvblen) + len = ls->ls_lvblen; memcpy(lkb->lkb_lvbptr, ms->m_extra, len); } return 0; @@ -5393,6 +5403,13 @@ static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, if ((lkb->lkb_nodeid == nodeid_gone) || dlm_is_removed(ls, lkb->lkb_nodeid)) { + /* tell recover_lvb to invalidate the lvb + because a node holding EX/PW failed */ + if ((lkb->lkb_exflags & DLM_LKF_VALBLK) && + (lkb->lkb_grmode >= DLM_LOCK_PW)) { + rsb_set_flag(r, RSB_RECOVER_LVB_INVAL); + } + del_lkb(r, lkb); /* this put should free the lkb */ @@ -5446,7 +5463,7 @@ void dlm_recover_purge(struct dlm_ls *ls) up_write(&ls->ls_root_sem); if (lkb_count) - log_debug(ls, "dlm_recover_purge %u locks for %u nodes", + log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", lkb_count, nodes_count); } @@ -5520,7 +5537,7 @@ void dlm_recover_grant(struct dlm_ls *ls) } if (lkb_count) - log_debug(ls, "dlm_recover_grant %u locks on %u resources", + log_rinfo(ls, "dlm_recover_grant %u locks on %u resources", lkb_count, rsb_count); } @@ -5679,7 +5696,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) put_rsb(r); out: if (error && error != -EEXIST) - log_debug(ls, "dlm_recover_master_copy remote %d %x error %d", + log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", from_nodeid, remid, error); rl->rl_result = cpu_to_le32(error); return error; @@ -6025,15 +6042,18 @@ static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) return error; } -/* The force flag allows the unlock to go ahead even if the lkb isn't granted. - Regardless of what rsb queue the lock is on, it's removed and freed. */ +/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't + granted. Regardless of what rsb queue the lock is on, it's removed and + freed. The IVVALBLK flag causes the lvb on the resource to be invalidated + if our lock is PW/EX (it's ignored if our granted mode is smaller.) */ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) { struct dlm_args args; int error; - set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args); + set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK, + lkb->lkb_ua, &args); error = unlock_lock(ls, lkb, &args); if (error == -DLM_EUNLOCK) diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 952557d00cc..f3e72787e7f 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -35,8 +35,11 @@ static struct task_struct * scand_task; static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) { ssize_t ret = len; - int n = simple_strtol(buf, NULL, 0); + int n; + int rc = kstrtoint(buf, 0, &n); + if (rc) + return rc; ls = dlm_find_lockspace_local(ls->ls_local_handle); if (!ls) return -EINVAL; @@ -57,7 +60,10 @@ static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) { - ls->ls_uevent_result = simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &ls->ls_uevent_result); + + if (rc) + return rc; set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); wake_up(&ls->ls_uevent_wait); return len; @@ -70,7 +76,10 @@ static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) { - ls->ls_global_id = simple_strtoul(buf, NULL, 0); + int rc = kstrtouint(buf, 0, &ls->ls_global_id); + + if (rc) + return rc; return len; } @@ -81,7 +90,11 @@ static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) { - int val = simple_strtoul(buf, NULL, 0); + int val; + int rc = kstrtoint(buf, 0, &val); + + if (rc) + return rc; if (val == 1) set_bit(LSFL_NODIR, &ls->ls_flags); return len; @@ -190,7 +203,7 @@ static int do_uevent(struct dlm_ls *ls, int in) else kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); - log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving"); + log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving"); /* dlm_controld will see the uevent, do the necessary group management and then write to sysfs to wake us */ @@ -198,7 +211,7 @@ static int do_uevent(struct dlm_ls *ls, int in) error = wait_event_interruptible(ls->ls_uevent_wait, test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); - log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result); + log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result); if (error) goto out; @@ -582,8 +595,6 @@ static int new_lockspace(const char *name, const char *cluster, INIT_LIST_HEAD(&ls->ls_root_list); init_rwsem(&ls->ls_root_sem); - down_write(&ls->ls_in_recovery); - spin_lock(&lslist_lock); ls->ls_create_count = 1; list_add(&ls->ls_list, &lslist); @@ -597,13 +608,24 @@ static int new_lockspace(const char *name, const char *cluster, } } - /* needs to find ls in lslist */ + init_waitqueue_head(&ls->ls_recover_lock_wait); + + /* + * Once started, dlm_recoverd first looks for ls in lslist, then + * initializes ls_in_recovery as locked in "down" mode. We need + * to wait for the wakeup from dlm_recoverd because in_recovery + * has to start out in down mode. + */ + error = dlm_recoverd_start(ls); if (error) { log_error(ls, "can't start dlm_recoverd %d", error); goto out_callback; } + wait_event(ls->ls_recover_lock_wait, + test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); + ls->ls_kobj.kset = dlm_kset; error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, "%s", ls->ls_name); @@ -631,7 +653,7 @@ static int new_lockspace(const char *name, const char *cluster, dlm_create_debug_file(ls); - log_debug(ls, "join complete"); + log_rinfo(ls, "join complete"); *lockspace = ls; return 0; @@ -697,9 +719,7 @@ static int lkb_idr_is_local(int id, void *p, void *data) { struct dlm_lkb *lkb = p; - if (!lkb->lkb_nodeid) - return 1; - return 0; + return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; } static int lkb_idr_is_any(int id, void *p, void *data) @@ -787,7 +807,6 @@ static int release_lockspace(struct dlm_ls *ls, int force) */ idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); - idr_remove_all(&ls->ls_lkbidr); idr_destroy(&ls->ls_lkbidr); /* @@ -829,7 +848,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_clear_members(ls); dlm_clear_members_gone(ls); kfree(ls->ls_node_array); - log_debug(ls, "release_lockspace final free"); + log_rinfo(ls, "release_lockspace final free"); kobject_put(&ls->ls_kobj); /* The ls structure will be freed when the kobject is done with */ @@ -875,17 +894,24 @@ int dlm_release_lockspace(void *lockspace, int force) void dlm_stop_lockspaces(void) { struct dlm_ls *ls; + int count; restart: + count = 0; spin_lock(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { - if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) + if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { + count++; continue; + } spin_unlock(&lslist_lock); log_error(ls, "no userland control daemon, stopping lockspace"); dlm_ls_stop(ls); goto restart; } spin_unlock(&lslist_lock); + + if (count) + log_print("dlm user daemon left %d lockspaces", count); } diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 5c1b0e38c7a..d08e079ea5d 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -53,7 +53,6 @@ #include <linux/sctp.h> #include <linux/slab.h> #include <net/sctp/sctp.h> -#include <net/sctp/user.h> #include <net/ipv6.h> #include "dlm_internal.h" @@ -126,6 +125,7 @@ struct connection { struct connection *othercon; struct work_struct rwork; /* Receive workqueue */ struct work_struct swork; /* Send workqueue */ + bool try_new_addr; }; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -140,6 +140,17 @@ struct writequeue_entry { struct connection *con; }; +struct dlm_node_addr { + struct list_head list; + int nodeid; + int addr_count; + int curr_addr_index; + struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; +}; + +static LIST_HEAD(dlm_node_addrs); +static DEFINE_SPINLOCK(dlm_node_addrs_spin); + static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; static int dlm_local_count; static int dlm_allow_conn; @@ -167,12 +178,11 @@ static inline int nodeid_hash(int nodeid) static struct connection *__find_con(int nodeid) { int r; - struct hlist_node *h; struct connection *con; r = nodeid_hash(nodeid); - hlist_for_each_entry(con, h, &connection_hash[r], list) { + hlist_for_each_entry(con, &connection_hash[r], list) { if (con->nodeid == nodeid) return con; } @@ -222,13 +232,12 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc) static void foreach_conn(void (*conn_func)(struct connection *c)) { int i; - struct hlist_node *h, *n; + struct hlist_node *n; struct connection *con; for (i = 0; i < CONN_HASH_SIZE; i++) { - hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){ + hlist_for_each_entry_safe(con, n, &connection_hash[i], list) conn_func(con); - } } } @@ -247,13 +256,12 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation) static struct connection *assoc2con(int assoc_id) { int i; - struct hlist_node *h; struct connection *con; mutex_lock(&connections_lock); for (i = 0 ; i < CONN_HASH_SIZE; i++) { - hlist_for_each_entry(con, h, &connection_hash[i], list) { + hlist_for_each_entry(con, &connection_hash[i], list) { if (con->sctp_assoc == assoc_id) { mutex_unlock(&connections_lock); return con; @@ -264,33 +272,159 @@ static struct connection *assoc2con(int assoc_id) return NULL; } -static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) +static struct dlm_node_addr *find_node_addr(int nodeid) { - struct sockaddr_storage addr; - int error; + struct dlm_node_addr *na; + + list_for_each_entry(na, &dlm_node_addrs, list) { + if (na->nodeid == nodeid) + return na; + } + return NULL; +} + +static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) +{ + switch (x->ss_family) { + case AF_INET: { + struct sockaddr_in *sinx = (struct sockaddr_in *)x; + struct sockaddr_in *siny = (struct sockaddr_in *)y; + if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) + return 0; + if (sinx->sin_port != siny->sin_port) + return 0; + break; + } + case AF_INET6: { + struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; + struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; + if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) + return 0; + if (sinx->sin6_port != siny->sin6_port) + return 0; + break; + } + default: + return 0; + } + return 1; +} + +static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, + struct sockaddr *sa_out, bool try_new_addr) +{ + struct sockaddr_storage sas; + struct dlm_node_addr *na; if (!dlm_local_count) return -1; - error = dlm_nodeid_to_addr(nodeid, &addr); - if (error) - return error; + spin_lock(&dlm_node_addrs_spin); + na = find_node_addr(nodeid); + if (na && na->addr_count) { + if (try_new_addr) { + na->curr_addr_index++; + if (na->curr_addr_index == na->addr_count) + na->curr_addr_index = 0; + } + + memcpy(&sas, na->addr[na->curr_addr_index ], + sizeof(struct sockaddr_storage)); + } + spin_unlock(&dlm_node_addrs_spin); + + if (!na) + return -EEXIST; + + if (!na->addr_count) + return -ENOENT; + + if (sas_out) + memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); + + if (!sa_out) + return 0; if (dlm_local_addr[0]->ss_family == AF_INET) { - struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; - struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; + struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; + struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; ret4->sin_addr.s_addr = in4->sin_addr.s_addr; } else { - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; - struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; + struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; ret6->sin6_addr = in6->sin6_addr; } return 0; } +static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) +{ + struct dlm_node_addr *na; + int rv = -EEXIST; + int addr_i; + + spin_lock(&dlm_node_addrs_spin); + list_for_each_entry(na, &dlm_node_addrs, list) { + if (!na->addr_count) + continue; + + for (addr_i = 0; addr_i < na->addr_count; addr_i++) { + if (addr_compare(na->addr[addr_i], addr)) { + *nodeid = na->nodeid; + rv = 0; + goto unlock; + } + } + } +unlock: + spin_unlock(&dlm_node_addrs_spin); + return rv; +} + +int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) +{ + struct sockaddr_storage *new_addr; + struct dlm_node_addr *new_node, *na; + + new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS); + if (!new_node) + return -ENOMEM; + + new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); + if (!new_addr) { + kfree(new_node); + return -ENOMEM; + } + + memcpy(new_addr, addr, len); + + spin_lock(&dlm_node_addrs_spin); + na = find_node_addr(nodeid); + if (!na) { + new_node->nodeid = nodeid; + new_node->addr[0] = new_addr; + new_node->addr_count = 1; + list_add(&new_node->list, &dlm_node_addrs); + spin_unlock(&dlm_node_addrs_spin); + return 0; + } + + if (na->addr_count >= DLM_MAX_ADDR_COUNT) { + spin_unlock(&dlm_node_addrs_spin); + kfree(new_addr); + kfree(new_node); + return -ENOSPC; + } + + na->addr[na->addr_count++] = new_addr; + spin_unlock(&dlm_node_addrs_spin); + kfree(new_node); + return 0; +} + /* Data available on socket or listen socket received a connect */ -static void lowcomms_data_ready(struct sock *sk, int count_unused) +static void lowcomms_data_ready(struct sock *sk) { struct connection *con = sock2con(sk); if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) @@ -348,7 +482,7 @@ int dlm_lowcomms_connect_node(int nodeid) } /* Make a socket active */ -static int add_sock(struct socket *sock, struct connection *con) +static void add_sock(struct socket *sock, struct connection *con) { con->sock = sock; @@ -358,7 +492,6 @@ static int add_sock(struct socket *sock, struct connection *con) con->sock->sk->sk_state_change = lowcomms_state_change; con->sock->sk->sk_user_data = con; con->sock->sk->sk_allocation = GFP_NOFS; - return 0; } /* Add the port number to an IPv6 or 4 sockaddr and return the address @@ -440,8 +573,23 @@ static void sctp_send_shutdown(sctp_assoc_t associd) static void sctp_init_failed_foreach(struct connection *con) { + + /* + * Don't try to recover base con and handle race where the + * other node's assoc init creates a assoc and we get that + * notification, then we get a notification that our attempt + * failed due. This happens when we are still trying the primary + * address, but the other node has already tried secondary addrs + * and found one that worked. + */ + if (!con->nodeid || con->sctp_assoc) + return; + + log_print("Retrying SCTP association init for node %d\n", con->nodeid); + + con->try_new_addr = true; con->sctp_assoc = 0; - if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { + if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) { if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) queue_work(send_workqueue, &con->swork); } @@ -458,15 +606,62 @@ static void sctp_init_failed(void) mutex_unlock(&connections_lock); } +static void retry_failed_sctp_send(struct connection *recv_con, + struct sctp_send_failed *sn_send_failed, + char *buf) +{ + int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed); + struct dlm_mhandle *mh; + struct connection *con; + char *retry_buf; + int nodeid = sn_send_failed->ssf_info.sinfo_ppid; + + log_print("Retry sending %d bytes to node id %d", len, nodeid); + + if (!nodeid) { + log_print("Shouldn't resend data via listening connection."); + return; + } + + con = nodeid2con(nodeid, 0); + if (!con) { + log_print("Could not look up con for nodeid %d\n", + nodeid); + return; + } + + mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf); + if (!mh) { + log_print("Could not allocate buf for retry."); + return; + } + memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len); + dlm_lowcomms_commit_buffer(mh); + + /* + * If we got a assoc changed event before the send failed event then + * we only need to retry the send. + */ + if (con->sctp_assoc) { + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); + } else + sctp_init_failed_foreach(con); +} + /* Something happened to an association */ static void process_sctp_notification(struct connection *con, struct msghdr *msg, char *buf) { union sctp_notification *sn = (union sctp_notification *)buf; + struct linger linger; - if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { + switch (sn->sn_header.sn_type) { + case SCTP_SEND_FAILED: + retry_failed_sctp_send(con, &sn->sn_send_failed, buf); + break; + case SCTP_ASSOC_CHANGE: switch (sn->sn_assoc_change.sac_state) { - case SCTP_COMM_UP: case SCTP_RESTART: { @@ -510,7 +705,7 @@ static void process_sctp_notification(struct connection *con, return; } make_sockaddr(&prim.ssp_addr, 0, &addr_len); - if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { + if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) { unsigned char *b=(unsigned char *)&prim.ssp_addr; log_print("reject connect from unknown addr"); print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, @@ -524,11 +719,11 @@ static void process_sctp_notification(struct connection *con, return; /* Peel off a new sock */ - sctp_lock_sock(con->sock->sk); + lock_sock(con->sock->sk); ret = sctp_do_peeloff(con->sock->sk, sn->sn_assoc_change.sac_assoc_id, &new_con->sock); - sctp_release_sock(con->sock->sk); + release_sock(con->sock->sk); if (ret < 0) { log_print("Can't peel off a socket for " "connection %d to node %d: err=%d", @@ -538,12 +733,21 @@ static void process_sctp_notification(struct connection *con, } add_sock(new_con->sock, new_con); + linger.l_onoff = 1; + linger.l_linger = 0; + ret = kernel_setsockopt(new_con->sock, SOL_SOCKET, SO_LINGER, + (char *)&linger, sizeof(linger)); + if (ret < 0) + log_print("set socket option SO_LINGER failed"); + log_print("connecting to %d sctp association %d", nodeid, (int)sn->sn_assoc_change.sac_assoc_id); + new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id; + new_con->try_new_addr = false; /* Send any pending writes */ clear_bit(CF_CONNECT_PENDING, &new_con->flags); - clear_bit(CF_INIT_PENDING, &con->flags); + clear_bit(CF_INIT_PENDING, &new_con->flags); if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { queue_work(send_workqueue, &new_con->swork); } @@ -562,14 +766,10 @@ static void process_sctp_notification(struct connection *con, } break; - /* We don't know which INIT failed, so clear the PENDING flags - * on them all. if assoc_id is zero then it will then try - * again */ - case SCTP_CANT_STR_ASSOC: { + /* Will retry init when we get the send failed notification */ log_print("Can't start SCTP association - retrying"); - sctp_init_failed(); } break; @@ -578,6 +778,8 @@ static void process_sctp_notification(struct connection *con, (int)sn->sn_assoc_change.sac_assoc_id, sn->sn_assoc_change.sac_state); } + default: + ; /* fall through */ } } @@ -747,7 +949,7 @@ static int tcp_accept_from_sock(struct connection *con) /* Get the new node's NODEID */ make_sockaddr(&peeraddr, 0, &len); - if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { + if (addr_to_nodeid(&peeraddr, &nodeid)) { unsigned char *b=(unsigned char *)&peeraddr; log_print("connect from non cluster node"); print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, @@ -837,6 +1039,24 @@ static void free_entry(struct writequeue_entry *e) kfree(e); } +/* + * writequeue_entry_complete - try to delete and free write queue entry + * @e: write queue entry to try to delete + * @completed: bytes completed + * + * writequeue_lock must be held. + */ +static void writequeue_entry_complete(struct writequeue_entry *e, int completed) +{ + e->offset += completed; + e->len -= completed; + + if (e->len == 0 && e->users == 0) { + list_del(&e->list); + free_entry(e); + } +} + /* Initiate an SCTP association. This is a special case of send_to_sock() in that we don't yet have a peeled-off socket for this association, so we use the listening socket @@ -856,15 +1076,14 @@ static void sctp_init_assoc(struct connection *con) int addrlen; struct kvec iov[1]; + mutex_lock(&con->sock_mutex); if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) - return; - - if (con->retries++ > MAX_CONNECT_RETRIES) - return; + goto unlock; - if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) { + if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr, + con->try_new_addr)) { log_print("no address for nodeid %d", con->nodeid); - return; + goto unlock; } base_con = nodeid2con(0, 0); BUG_ON(base_con == NULL); @@ -882,17 +1101,25 @@ static void sctp_init_assoc(struct connection *con) if (list_empty(&con->writequeue)) { spin_unlock(&con->writequeue_lock); log_print("writequeue empty for nodeid %d", con->nodeid); - return; + goto unlock; } e = list_first_entry(&con->writequeue, struct writequeue_entry, list); len = e->len; offset = e->offset; - spin_unlock(&con->writequeue_lock); /* Send the first block off the write queue */ iov[0].iov_base = page_address(e->page)+offset; iov[0].iov_len = len; + spin_unlock(&con->writequeue_lock); + + if (rem_addr.ss_family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr; + log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr); + } else { + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr; + log_print("Trying to connect to %pI6", &sin6->sin6_addr); + } cmsg = CMSG_FIRSTHDR(&outmessage); cmsg->cmsg_level = IPPROTO_SCTP; @@ -900,8 +1127,9 @@ static void sctp_init_assoc(struct connection *con) cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); sinfo = CMSG_DATA(cmsg); memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); + sinfo->sinfo_ppid = cpu_to_le32(con->nodeid); outmessage.msg_controllen = cmsg->cmsg_len; + sinfo->sinfo_flags |= SCTP_ADDR_OVER; ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); if (ret < 0) { @@ -914,25 +1142,22 @@ static void sctp_init_assoc(struct connection *con) } else { spin_lock(&con->writequeue_lock); - e->offset += ret; - e->len -= ret; - - if (e->len == 0 && e->users == 0) { - list_del(&e->list); - free_entry(e); - } + writequeue_entry_complete(e, ret); spin_unlock(&con->writequeue_lock); } + +unlock: + mutex_unlock(&con->sock_mutex); } /* Connect a new socket to its peer */ static void tcp_connect_to_sock(struct connection *con) { - int result = -EHOSTUNREACH; struct sockaddr_storage saddr, src_addr; int addr_len; struct socket *sock = NULL; int one = 1; + int result; if (con->nodeid == 0) { log_print("attempt to connect sock 0 foiled"); @@ -944,10 +1169,8 @@ static void tcp_connect_to_sock(struct connection *con) goto out; /* Some odd races can cause double-connects, ignore them */ - if (con->sock) { - result = 0; + if (con->sock) goto out; - } /* Create a socket to communicate with */ result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, @@ -956,8 +1179,11 @@ static void tcp_connect_to_sock(struct connection *con) goto out_err; memset(&saddr, 0, sizeof(saddr)); - if (dlm_nodeid_to_addr(con->nodeid, &saddr)) + result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); + if (result < 0) { + log_print("no address for nodeid %d", con->nodeid); goto out_err; + } sock->sk->sk_user_data = con; con->rx_action = receive_from_sock; @@ -983,8 +1209,7 @@ static void tcp_connect_to_sock(struct connection *con) kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one, sizeof(one)); - result = - sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, + result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, O_NONBLOCK); if (result == -EINPROGRESS) result = 0; @@ -1002,11 +1227,17 @@ out_err: * Some errors are fatal and this list might need adjusting. For other * errors we try again until the max number of retries is reached. */ - if (result != -EHOSTUNREACH && result != -ENETUNREACH && - result != -ENETDOWN && result != -EINVAL - && result != -EPROTONOSUPPORT) { + if (result != -EHOSTUNREACH && + result != -ENETUNREACH && + result != -ENETDOWN && + result != -EINVAL && + result != -EPROTONOSUPPORT) { + log_print("connect %d try %d error %d", con->nodeid, + con->retries, result); + mutex_unlock(&con->sock_mutex); + msleep(1000); lowcomms_connect_sock(con); - result = 0; + return; } out: mutex_unlock(&con->sock_mutex); @@ -1044,10 +1275,8 @@ static struct socket *tcp_create_listen_sock(struct connection *con, if (result < 0) { log_print("Failed to set SO_REUSEADDR on socket: %d", result); } - sock->sk->sk_user_data = con; con->rx_action = tcp_accept_from_sock; con->connect_action = tcp_connect_to_sock; - con->sock = sock; /* Bind to our port */ make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); @@ -1129,6 +1358,7 @@ static int sctp_listen_for_all(void) int result = -EINVAL, num = 1, i, addr_len; struct connection *con = nodeid2con(0, GFP_NOFS); int bufsize = NEEDED_RMEM; + int one = 1; if (!con) return -ENOMEM; @@ -1163,6 +1393,11 @@ static int sctp_listen_for_all(void) goto create_delsock; } + result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one, + sizeof(one)); + if (result < 0) + log_print("Could not set SCTP NODELAY error %d\n", result); + /* Init con struct */ sock->sk->sk_user_data = con; con->sock = sock; @@ -1257,7 +1492,6 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) struct connection *con; struct writequeue_entry *e; int offset = 0; - int users = 0; con = nodeid2con(nodeid, allocation); if (!con) @@ -1271,7 +1505,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) } else { offset = e->end; e->end += len; - users = e->users++; + e->users++; } spin_unlock(&con->writequeue_lock); @@ -1286,7 +1520,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) spin_lock(&con->writequeue_lock); offset = e->end; e->end += len; - users = e->users++; + e->users++; list_add_tail(&e->list, &con->writequeue); spin_unlock(&con->writequeue_lock); goto got_one; @@ -1358,8 +1592,7 @@ static void send_to_sock(struct connection *con) } cond_resched(); goto out; - } - if (ret <= 0) + } else if (ret < 0) goto send_error; } @@ -1370,14 +1603,7 @@ static void send_to_sock(struct connection *con) } spin_lock(&con->writequeue_lock); - e->offset += ret; - e->len -= ret; - - if (e->len == 0 && e->users == 0) { - list_del(&e->list); - free_entry(e); - continue; - } + writequeue_entry_complete(e, ret); } spin_unlock(&con->writequeue_lock); out: @@ -1394,7 +1620,6 @@ out_connect: mutex_unlock(&con->sock_mutex); if (!test_bit(CF_INIT_PENDING, &con->flags)) lowcomms_connect_sock(con); - return; } static void clean_one_writequeue(struct connection *con) @@ -1414,6 +1639,7 @@ static void clean_one_writequeue(struct connection *con) int dlm_lowcomms_close(int nodeid) { struct connection *con; + struct dlm_node_addr *na; log_print("closing connection to node %d", nodeid); con = nodeid2con(nodeid, 0); @@ -1428,6 +1654,17 @@ int dlm_lowcomms_close(int nodeid) clean_one_writequeue(con); close_connection(con, true); } + + spin_lock(&dlm_node_addrs_spin); + na = find_node_addr(nodeid); + if (na) { + list_del(&na->list); + while (na->addr_count--) + kfree(na->addr[na->addr_count]); + kfree(na); + } + spin_unlock(&dlm_node_addrs_spin); + return 0; } @@ -1577,3 +1814,17 @@ fail_destroy: fail: return error; } + +void dlm_lowcomms_exit(void) +{ + struct dlm_node_addr *na, *safe; + + spin_lock(&dlm_node_addrs_spin); + list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { + list_del(&na->list); + while (na->addr_count--) + kfree(na->addr[na->addr_count]); + kfree(na); + } + spin_unlock(&dlm_node_addrs_spin); +} diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h index 1311e642628..67462e54fc2 100644 --- a/fs/dlm/lowcomms.h +++ b/fs/dlm/lowcomms.h @@ -16,10 +16,12 @@ int dlm_lowcomms_start(void); void dlm_lowcomms_stop(void); +void dlm_lowcomms_exit(void); int dlm_lowcomms_close(int nodeid); void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc); void dlm_lowcomms_commit_buffer(void *mh); int dlm_lowcomms_connect_node(int nodeid); +int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); #endif /* __LOWCOMMS_DOT_H__ */ diff --git a/fs/dlm/main.c b/fs/dlm/main.c index 5a59efa0bb4..079c0bd71ab 100644 --- a/fs/dlm/main.c +++ b/fs/dlm/main.c @@ -17,6 +17,7 @@ #include "user.h" #include "memory.h" #include "config.h" +#include "lowcomms.h" static int __init init_dlm(void) { @@ -78,6 +79,7 @@ static void __exit exit_dlm(void) dlm_config_exit(); dlm_memory_exit(); dlm_lockspace_exit(); + dlm_lowcomms_exit(); dlm_unregister_debugfs(); } diff --git a/fs/dlm/member.c b/fs/dlm/member.c index 862640a36d5..9c47f1c14a8 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -60,18 +60,15 @@ void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc) #define SLOT_DEBUG_LINE 128 -static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, - struct rcom_slot *ro0, struct dlm_slot *array, - int array_size) +static void log_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, + struct rcom_slot *ro0, struct dlm_slot *array, + int array_size) { char line[SLOT_DEBUG_LINE]; int len = SLOT_DEBUG_LINE - 1; int pos = 0; int ret, i; - if (!dlm_config.ci_log_debug) - return; - memset(line, 0, sizeof(line)); if (array) { @@ -95,7 +92,7 @@ static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, } } - log_debug(ls, "generation %u slots %d%s", gen, num_slots, line); + log_rinfo(ls, "generation %u slots %d%s", gen, num_slots, line); } int dlm_slots_copy_in(struct dlm_ls *ls) @@ -129,7 +126,7 @@ int dlm_slots_copy_in(struct dlm_ls *ls) ro->ro_slot = le16_to_cpu(ro->ro_slot); } - log_debug_slots(ls, gen, num_slots, ro0, NULL, 0); + log_slots(ls, gen, num_slots, ro0, NULL, 0); list_for_each_entry(memb, &ls->ls_nodes, list) { for (i = 0, ro = ro0; i < num_slots; i++, ro++) { @@ -274,7 +271,7 @@ int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size, gen++; - log_debug_slots(ls, gen, num, NULL, array, array_size); + log_slots(ls, gen, num, NULL, array, array_size); max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) - sizeof(struct rcom_config)) / sizeof(struct rcom_slot); @@ -447,7 +444,7 @@ static int ping_members(struct dlm_ls *ls) break; } if (error) - log_debug(ls, "ping_members aborted %d last nodeid %d", + log_rinfo(ls, "ping_members aborted %d last nodeid %d", error, ls->ls_recover_nodeid); return error; } @@ -539,7 +536,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) count as a negative change so the "neg" recovery steps will happen */ list_for_each_entry(memb, &ls->ls_nodes_gone, list) { - log_debug(ls, "prev removed member %d", memb->nodeid); + log_rinfo(ls, "prev removed member %d", memb->nodeid); neg++; } @@ -551,10 +548,10 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) continue; if (!node) { - log_debug(ls, "remove member %d", memb->nodeid); + log_rinfo(ls, "remove member %d", memb->nodeid); } else { /* removed and re-added */ - log_debug(ls, "remove member %d comm_seq %u %u", + log_rinfo(ls, "remove member %d comm_seq %u %u", memb->nodeid, memb->comm_seq, node->comm_seq); } @@ -571,7 +568,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) if (dlm_is_member(ls, node->nodeid)) continue; dlm_add_member(ls, node); - log_debug(ls, "add member %d", node->nodeid); + log_rinfo(ls, "add member %d", node->nodeid); } list_for_each_entry(memb, &ls->ls_nodes, list) { @@ -591,7 +588,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) complete(&ls->ls_members_done); } - log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes); + log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes); return error; } @@ -616,13 +613,13 @@ int dlm_ls_stop(struct dlm_ls *ls) down_write(&ls->ls_recv_active); /* - * Abort any recovery that's in progress (see RECOVERY_STOP, + * Abort any recovery that's in progress (see RECOVER_STOP, * dlm_recovery_stopped()) and tell any other threads running in the * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). */ spin_lock(&ls->ls_recover_lock); - set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); + set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); ls->ls_recover_seq++; spin_unlock(&ls->ls_recover_lock); @@ -642,12 +639,16 @@ int dlm_ls_stop(struct dlm_ls *ls) * when recovery is complete. */ - if (new) - down_write(&ls->ls_in_recovery); + if (new) { + set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags); + wake_up_process(ls->ls_recoverd_task); + wait_event(ls->ls_recover_lock_wait, + test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); + } /* * The recoverd suspend/resume makes sure that dlm_recoverd (if - * running) has noticed RECOVERY_STOP above and quit processing the + * running) has noticed RECOVER_STOP above and quit processing the * previous recovery. */ @@ -709,7 +710,8 @@ int dlm_ls_start(struct dlm_ls *ls) kfree(rv_old); } - dlm_recoverd_kick(ls); + set_bit(LSFL_RECOVER_WORK, &ls->ls_flags); + wake_up_process(ls->ls_recoverd_task); return 0; fail: diff --git a/fs/dlm/netlink.c b/fs/dlm/netlink.c index ef17e0169da..e7cfbaf8d0e 100644 --- a/fs/dlm/netlink.c +++ b/fs/dlm/netlink.c @@ -14,7 +14,7 @@ #include "dlm_internal.h" static uint32_t dlm_nl_seqnum; -static uint32_t listener_nlpid; +static uint32_t listener_nlportid; static struct genl_family family = { .id = GENL_ID_GENERATE, @@ -64,24 +64,26 @@ static int send_data(struct sk_buff *skb) return rv; } - return genlmsg_unicast(&init_net, skb, listener_nlpid); + return genlmsg_unicast(&init_net, skb, listener_nlportid); } static int user_cmd(struct sk_buff *skb, struct genl_info *info) { - listener_nlpid = info->snd_pid; - printk("user_cmd nlpid %u\n", listener_nlpid); + listener_nlportid = info->snd_portid; + printk("user_cmd nlpid %u\n", listener_nlportid); return 0; } -static struct genl_ops dlm_nl_ops = { - .cmd = DLM_CMD_HELLO, - .doit = user_cmd, +static struct genl_ops dlm_nl_ops[] = { + { + .cmd = DLM_CMD_HELLO, + .doit = user_cmd, + }, }; int __init dlm_netlink_init(void) { - return genl_register_family_with_ops(&family, &dlm_nl_ops, 1); + return genl_register_family_with_ops(&family, dlm_nl_ops); } void dlm_netlink_exit(void) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index 01fd5c11a7f..f704458ea5f 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -247,6 +247,7 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, struct dlm_ls *ls; struct plock_op *op; int rv; + unsigned char fl_flags = fl->fl_flags; ls = dlm_find_lockspace_local(lockspace); if (!ls) @@ -258,9 +259,18 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, goto out; } - if (posix_lock_file_wait(file, fl) < 0) - log_error(ls, "dlm_posix_unlock: vfs unlock error %llx", - (unsigned long long)number); + /* cause the vfs unlock to return ENOENT if lock is not found */ + fl->fl_flags |= FL_EXISTS; + + rv = posix_lock_file_wait(file, fl); + if (rv == -ENOENT) { + rv = 0; + goto out_free; + } + if (rv < 0) { + log_error(ls, "dlm_posix_unlock: vfs unlock error %d %llx", + rv, (unsigned long long)number); + } op->info.optype = DLM_PLOCK_OP_UNLOCK; op->info.pid = fl->fl_pid; @@ -296,9 +306,11 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, if (rv == -ENOENT) rv = 0; +out_free: kfree(op); out: dlm_put_lockspace(ls); + fl->fl_flags = fl_flags; return rv; } EXPORT_SYMBOL_GPL(dlm_posix_unlock); diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 87f1a56eab3..9d61947d473 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -581,7 +581,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) spin_lock(&ls->ls_recover_lock); status = ls->ls_recover_status; - stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); + stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags); seq = ls->ls_recover_seq; spin_unlock(&ls->ls_recover_lock); diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index 4a7a76e42fc..eaea789bf97 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -305,27 +305,26 @@ static int recover_idr_empty(struct dlm_ls *ls) static int recover_idr_add(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; - int rv, id; - - rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS); - if (!rv) - return -ENOMEM; + int rv; + idr_preload(GFP_NOFS); spin_lock(&ls->ls_recover_idr_lock); if (r->res_id) { - spin_unlock(&ls->ls_recover_idr_lock); - return -1; - } - rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id); - if (rv) { - spin_unlock(&ls->ls_recover_idr_lock); - return rv; + rv = -1; + goto out_unlock; } - r->res_id = id; + rv = idr_alloc(&ls->ls_recover_idr, r, 1, 0, GFP_NOWAIT); + if (rv < 0) + goto out_unlock; + + r->res_id = rv; ls->ls_recover_list_count++; dlm_hold_rsb(r); + rv = 0; +out_unlock: spin_unlock(&ls->ls_recover_idr_lock); - return 0; + idr_preload_end(); + return rv; } static void recover_idr_del(struct dlm_rsb *r) @@ -351,24 +350,21 @@ static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id) return r; } -static int recover_idr_clear_rsb(int id, void *p, void *data) +static void recover_idr_clear(struct dlm_ls *ls) { - struct dlm_ls *ls = data; - struct dlm_rsb *r = p; + struct dlm_rsb *r; + int id; - r->res_id = 0; - r->res_recover_locks_count = 0; - ls->ls_recover_list_count--; + spin_lock(&ls->ls_recover_idr_lock); - dlm_put_rsb(r); - return 0; -} + idr_for_each_entry(&ls->ls_recover_idr, r, id) { + idr_remove(&ls->ls_recover_idr, id); + r->res_id = 0; + r->res_recover_locks_count = 0; + ls->ls_recover_list_count--; -static void recover_idr_clear(struct dlm_ls *ls) -{ - spin_lock(&ls->ls_recover_idr_lock); - idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls); - idr_remove_all(&ls->ls_recover_idr); + dlm_put_rsb(r); + } if (ls->ls_recover_list_count != 0) { log_error(ls, "warning: recover_list_count %d", @@ -530,7 +526,7 @@ int dlm_recover_masters(struct dlm_ls *ls) int nodir = dlm_no_directory(ls); int error; - log_debug(ls, "dlm_recover_masters"); + log_rinfo(ls, "dlm_recover_masters"); down_read(&ls->ls_root_sem); list_for_each_entry(r, &ls->ls_root_list, res_root_list) { @@ -556,7 +552,7 @@ int dlm_recover_masters(struct dlm_ls *ls) } up_read(&ls->ls_root_sem); - log_debug(ls, "dlm_recover_masters %u of %u", count, total); + log_rinfo(ls, "dlm_recover_masters %u of %u", count, total); error = dlm_wait_function(ls, &recover_idr_empty); out: @@ -689,7 +685,7 @@ int dlm_recover_locks(struct dlm_ls *ls) } up_read(&ls->ls_root_sem); - log_debug(ls, "dlm_recover_locks %d out", count); + log_rinfo(ls, "dlm_recover_locks %d out", count); error = dlm_wait_function(ls, &recover_list_empty); out: @@ -717,8 +713,14 @@ void dlm_recovered_lock(struct dlm_rsb *r) * the VALNOTVALID flag if necessary, and determining the correct lvb contents * based on the lvb's of the locks held on the rsb. * - * RSB_VALNOTVALID is set if there are only NL/CR locks on the rsb. If it - * was already set prior to recovery, it's not cleared, regardless of locks. + * RSB_VALNOTVALID is set in two cases: + * + * 1. we are master, but not new, and we purged an EX/PW lock held by a + * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL) + * + * 2. we are a new master, and there are only NL/CR locks left. + * (We could probably improve this by only invaliding in this way when + * the previous master left uncleanly. VMS docs mention that.) * * The LVB contents are only considered for changing when this is a new master * of the rsb (NEW_MASTER2). Then, the rsb's lvb is taken from any lkb with @@ -734,6 +736,19 @@ static void recover_lvb(struct dlm_rsb *r) int big_lock_exists = 0; int lvblen = r->res_ls->ls_lvblen; + if (!rsb_flag(r, RSB_NEW_MASTER2) && + rsb_flag(r, RSB_RECOVER_LVB_INVAL)) { + /* case 1 above */ + rsb_set_flag(r, RSB_VALNOTVALID); + return; + } + + if (!rsb_flag(r, RSB_NEW_MASTER2)) + return; + + /* we are the new master, so figure out if VALNOTVALID should + be set, and set the rsb lvb from the best lkb available. */ + list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) continue; @@ -772,13 +787,10 @@ static void recover_lvb(struct dlm_rsb *r) if (!lock_lvb_exists) goto out; + /* lvb is invalidated if only NL/CR locks remain */ if (!big_lock_exists) rsb_set_flag(r, RSB_VALNOTVALID); - /* don't mess with the lvb unless we're the new master */ - if (!rsb_flag(r, RSB_NEW_MASTER2)) - goto out; - if (!r->res_lvbptr) { r->res_lvbptr = dlm_allocate_lvb(r->res_ls); if (!r->res_lvbptr) @@ -852,19 +864,26 @@ void dlm_recover_rsbs(struct dlm_ls *ls) if (is_master(r)) { if (rsb_flag(r, RSB_RECOVER_CONVERT)) recover_conversion(r); + + /* recover lvb before granting locks so the updated + lvb/VALNOTVALID is presented in the completion */ + recover_lvb(r); + if (rsb_flag(r, RSB_NEW_MASTER2)) recover_grant(r); - recover_lvb(r); count++; + } else { + rsb_clear_flag(r, RSB_VALNOTVALID); } rsb_clear_flag(r, RSB_RECOVER_CONVERT); + rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL); rsb_clear_flag(r, RSB_NEW_MASTER2); unlock_rsb(r); } up_read(&ls->ls_root_sem); if (count) - log_debug(ls, "dlm_recover_rsbs %d done", count); + log_rinfo(ls, "dlm_recover_rsbs %d done", count); } /* Create a single list of all root rsb's to be used during recovery */ @@ -931,6 +950,6 @@ void dlm_clear_toss(struct dlm_ls *ls) } if (count) - log_debug(ls, "dlm_clear_toss %u done", count); + log_rinfo(ls, "dlm_clear_toss %u done", count); } diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 88ce65ff021..6859b4bf971 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -41,6 +41,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq) set_bit(LSFL_RUNNING, &ls->ls_flags); /* unblocks processes waiting to enter the dlm */ up_write(&ls->ls_in_recovery); + clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); error = 0; } spin_unlock(&ls->ls_recover_lock); @@ -54,7 +55,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) unsigned long start; int error, neg = 0; - log_debug(ls, "dlm_recover %llu", (unsigned long long)rv->seq); + log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq); mutex_lock(&ls->ls_recoverd_active); @@ -75,7 +76,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_members(ls, rv, &neg); if (error) { - log_debug(ls, "dlm_recover_members error %d", error); + log_rinfo(ls, "dlm_recover_members error %d", error); goto fail; } @@ -89,7 +90,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_members_wait(ls); if (error) { - log_debug(ls, "dlm_recover_members_wait error %d", error); + log_rinfo(ls, "dlm_recover_members_wait error %d", error); goto fail; } @@ -102,7 +103,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory(ls); if (error) { - log_debug(ls, "dlm_recover_directory error %d", error); + log_rinfo(ls, "dlm_recover_directory error %d", error); goto fail; } @@ -110,11 +111,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory_wait(ls); if (error) { - log_debug(ls, "dlm_recover_directory_wait error %d", error); + log_rinfo(ls, "dlm_recover_directory_wait error %d", error); goto fail; } - log_debug(ls, "dlm_recover_directory %u out %u messages", + log_rinfo(ls, "dlm_recover_directory %u out %u messages", ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg); /* @@ -143,7 +144,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_masters(ls); if (error) { - log_debug(ls, "dlm_recover_masters error %d", error); + log_rinfo(ls, "dlm_recover_masters error %d", error); goto fail; } @@ -153,7 +154,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks(ls); if (error) { - log_debug(ls, "dlm_recover_locks error %d", error); + log_rinfo(ls, "dlm_recover_locks error %d", error); goto fail; } @@ -161,11 +162,11 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks_wait(ls); if (error) { - log_debug(ls, "dlm_recover_locks_wait error %d", error); + log_rinfo(ls, "dlm_recover_locks_wait error %d", error); goto fail; } - log_debug(ls, "dlm_recover_locks %u in", + log_rinfo(ls, "dlm_recover_locks %u in", ls->ls_recover_locks_in); /* @@ -185,7 +186,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_locks_wait(ls); if (error) { - log_debug(ls, "dlm_recover_locks_wait error %d", error); + log_rinfo(ls, "dlm_recover_locks_wait error %d", error); goto fail; } } @@ -204,7 +205,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_done_wait(ls); if (error) { - log_debug(ls, "dlm_recover_done_wait error %d", error); + log_rinfo(ls, "dlm_recover_done_wait error %d", error); goto fail; } @@ -216,25 +217,25 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = enable_locking(ls, rv->seq); if (error) { - log_debug(ls, "enable_locking error %d", error); + log_rinfo(ls, "enable_locking error %d", error); goto fail; } error = dlm_process_requestqueue(ls); if (error) { - log_debug(ls, "dlm_process_requestqueue error %d", error); + log_rinfo(ls, "dlm_process_requestqueue error %d", error); goto fail; } error = dlm_recover_waiters_post(ls); if (error) { - log_debug(ls, "dlm_recover_waiters_post error %d", error); + log_rinfo(ls, "dlm_recover_waiters_post error %d", error); goto fail; } dlm_recover_grant(ls); - log_debug(ls, "dlm_recover %llu generation %u done: %u ms", + log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms", (unsigned long long)rv->seq, ls->ls_generation, jiffies_to_msecs(jiffies - start)); mutex_unlock(&ls->ls_recoverd_active); @@ -244,7 +245,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) fail: dlm_release_root_list(ls); - log_debug(ls, "dlm_recover %llu error %d", + log_rinfo(ls, "dlm_recover %llu error %d", (unsigned long long)rv->seq, error); mutex_unlock(&ls->ls_recoverd_active); return error; @@ -262,7 +263,7 @@ static void do_ls_recovery(struct dlm_ls *ls) rv = ls->ls_recover_args; ls->ls_recover_args = NULL; if (rv && ls->ls_recover_seq == rv->seq) - clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); + clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags); spin_unlock(&ls->ls_recover_lock); if (rv) { @@ -282,26 +283,34 @@ static int dlm_recoverd(void *arg) return -1; } + down_write(&ls->ls_in_recovery); + set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); + wake_up(&ls->ls_recover_lock_wait); + while (!kthread_should_stop()) { set_current_state(TASK_INTERRUPTIBLE); - if (!test_bit(LSFL_WORK, &ls->ls_flags)) + if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) && + !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) schedule(); set_current_state(TASK_RUNNING); - if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags)) + if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) { + down_write(&ls->ls_in_recovery); + set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); + wake_up(&ls->ls_recover_lock_wait); + } + + if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags)) do_ls_recovery(ls); } + if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)) + up_write(&ls->ls_in_recovery); + dlm_put_lockspace(ls); return 0; } -void dlm_recoverd_kick(struct dlm_ls *ls) -{ - set_bit(LSFL_WORK, &ls->ls_flags); - wake_up_process(ls->ls_recoverd_task); -} - int dlm_recoverd_start(struct dlm_ls *ls) { struct task_struct *p; diff --git a/fs/dlm/recoverd.h b/fs/dlm/recoverd.h index 866657c5d69..8856079733f 100644 --- a/fs/dlm/recoverd.h +++ b/fs/dlm/recoverd.h @@ -14,7 +14,6 @@ #ifndef __RECOVERD_DOT_H__ #define __RECOVERD_DOT_H__ -void dlm_recoverd_kick(struct dlm_ls *ls); void dlm_recoverd_stop(struct dlm_ls *ls); int dlm_recoverd_start(struct dlm_ls *ls); void dlm_recoverd_suspend(struct dlm_ls *ls); diff --git a/fs/dlm/user.c b/fs/dlm/user.c index eb4ed9ba309..142e21655ee 100644 --- a/fs/dlm/user.c +++ b/fs/dlm/user.c @@ -493,7 +493,6 @@ static ssize_t device_write(struct file *file, const char __user *buf, { struct dlm_user_proc *proc = file->private_data; struct dlm_write_request *kbuf; - sigset_t tmpsig, allsigs; int error; #ifdef CONFIG_COMPAT @@ -503,6 +502,13 @@ static ssize_t device_write(struct file *file, const char __user *buf, #endif return -EINVAL; + /* + * can't compare against COMPAT/dlm_write_request32 because + * we don't yet know if is64bit is zero + */ + if (count > sizeof(struct dlm_write_request) + DLM_RESNAME_MAXLEN) + return -EINVAL; + kbuf = kzalloc(count + 1, GFP_NOFS); if (!kbuf) return -ENOMEM; @@ -550,9 +556,6 @@ static ssize_t device_write(struct file *file, const char __user *buf, goto out_free; } - sigfillset(&allsigs); - sigprocmask(SIG_BLOCK, &allsigs, &tmpsig); - error = -EINVAL; switch (kbuf->cmd) @@ -560,7 +563,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_LOCK: if (!proc) { log_print("no locking on control device"); - goto out_sig; + goto out_free; } error = device_user_lock(proc, &kbuf->i.lock); break; @@ -568,7 +571,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_UNLOCK: if (!proc) { log_print("no locking on control device"); - goto out_sig; + goto out_free; } error = device_user_unlock(proc, &kbuf->i.lock); break; @@ -576,7 +579,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_DEADLOCK: if (!proc) { log_print("no locking on control device"); - goto out_sig; + goto out_free; } error = device_user_deadlock(proc, &kbuf->i.lock); break; @@ -584,7 +587,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_CREATE_LOCKSPACE: if (proc) { log_print("create/remove only on control device"); - goto out_sig; + goto out_free; } error = device_create_lockspace(&kbuf->i.lspace); break; @@ -592,7 +595,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_REMOVE_LOCKSPACE: if (proc) { log_print("create/remove only on control device"); - goto out_sig; + goto out_free; } error = device_remove_lockspace(&kbuf->i.lspace); break; @@ -600,7 +603,7 @@ static ssize_t device_write(struct file *file, const char __user *buf, case DLM_USER_PURGE: if (!proc) { log_print("no locking on control device"); - goto out_sig; + goto out_free; } error = device_user_purge(proc, &kbuf->i.purge); break; @@ -610,8 +613,6 @@ static ssize_t device_write(struct file *file, const char __user *buf, kbuf->cmd); } - out_sig: - sigprocmask(SIG_SETMASK, &tmpsig, NULL); out_free: kfree(kbuf); return error; @@ -652,15 +653,11 @@ static int device_close(struct inode *inode, struct file *file) { struct dlm_user_proc *proc = file->private_data; struct dlm_ls *ls; - sigset_t tmpsig, allsigs; ls = dlm_find_lockspace_local(proc->lockspace); if (!ls) return -ENOENT; - sigfillset(&allsigs); - sigprocmask(SIG_BLOCK, &allsigs, &tmpsig); - set_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags); dlm_clear_proc_locks(ls, proc); @@ -678,9 +675,6 @@ static int device_close(struct inode *inode, struct file *file) /* FIXME: AUTOFREE: if this ls is no longer used do device_remove_lockspace() */ - sigprocmask(SIG_SETMASK, &tmpsig, NULL); - recalc_sigpending(); - return 0; } |
