diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2011-07-22 13:16:07 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2011-07-22 13:16:07 -0700 |
commit | 6aaf4404ab2efff2f2f3ee4a5bb5379c1c8092b3 (patch) | |
tree | e1108192285784cbdb6974d9c8791c1aa7cef77d | |
parent | ba1f9db908a9ac4038f6b694de3e55959886258d (diff) | |
parent | 10d1459fafbb02a18e7bc8f2c384a9e973560b00 (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm:
dlm: don't limit active work items
dlm: use workqueue for callbacks
dlm: remove deadlock debug print
dlm: improve rsb searches
dlm: keep lkbs in idr
dlm: fix kmalloc args
dlm: don't do pointless NULL check, use kzalloc and fix order of arguments
dlm: dump address of unknown node
dlm: use vmalloc for hash tables
dlm: show addresses in configfs
-rw-r--r-- | fs/dlm/ast.c | 265 | ||||
-rw-r--r-- | fs/dlm/ast.h | 15 | ||||
-rw-r--r-- | fs/dlm/config.c | 75 | ||||
-rw-r--r-- | fs/dlm/config.h | 2 | ||||
-rw-r--r-- | fs/dlm/dlm_internal.h | 29 | ||||
-rw-r--r-- | fs/dlm/lock.c | 225 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 177 | ||||
-rw-r--r-- | fs/dlm/lowcomms.c | 9 | ||||
-rw-r--r-- | fs/dlm/memory.c | 22 | ||||
-rw-r--r-- | fs/dlm/memory.h | 2 | ||||
-rw-r--r-- | fs/dlm/recoverd.c | 12 | ||||
-rw-r--r-- | fs/dlm/user.c | 12 |
12 files changed, 447 insertions, 398 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index abc49f29245..90e5997262e 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -14,17 +14,9 @@ #include "dlm_internal.h" #include "lock.h" #include "user.h" -#include "ast.h" - -#define WAKE_ASTS 0 - -static uint64_t ast_seq_count; -static struct list_head ast_queue; -static spinlock_t ast_queue_lock; -static struct task_struct * astd_task; -static unsigned long astd_wakeflags; -static struct mutex astd_running; +static uint64_t dlm_cb_seq; +static spinlock_t dlm_cb_seq_spin; static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb) { @@ -57,21 +49,13 @@ static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb) } } -void dlm_del_ast(struct dlm_lkb *lkb) -{ - spin_lock(&ast_queue_lock); - if (!list_empty(&lkb->lkb_astqueue)) - list_del_init(&lkb->lkb_astqueue); - spin_unlock(&ast_queue_lock); -} - int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, uint32_t sbflags, uint64_t seq) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; uint64_t prev_seq; int prev_mode; - int i; + int i, rv; for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { if (lkb->lkb_callbacks[i].seq) @@ -100,7 +84,8 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, mode, (unsigned long long)prev_seq, prev_mode); - return 0; + rv = 0; + goto out; } } @@ -109,6 +94,7 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, lkb->lkb_callbacks[i].mode = mode; lkb->lkb_callbacks[i].sb_status = status; lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF); + rv = 0; break; } @@ -117,21 +103,24 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, lkb->lkb_id, (unsigned long long)seq, flags, mode, status, sbflags); dlm_dump_lkb_callbacks(lkb); - return -1; + rv = -1; + goto out; } - - return 0; + out: + return rv; } int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_callback *cb, int *resid) { - int i; + int i, rv; *resid = 0; - if (!lkb->lkb_callbacks[0].seq) - return -ENOENT; + if (!lkb->lkb_callbacks[0].seq) { + rv = -ENOENT; + goto out; + } /* oldest undelivered cb is callbacks[0] */ @@ -163,7 +152,8 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, cb->mode, (unsigned long long)lkb->lkb_last_cast.seq, lkb->lkb_last_cast.mode); - return 0; + rv = 0; + goto out; } } @@ -176,171 +166,150 @@ int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback)); lkb->lkb_last_bast_time = ktime_get(); } - - return 0; + rv = 0; + out: + return rv; } -void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, - uint32_t sbflags) +void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, + uint32_t sbflags) { - uint64_t seq; + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + uint64_t new_seq, prev_seq; int rv; - spin_lock(&ast_queue_lock); - - seq = ++ast_seq_count; + spin_lock(&dlm_cb_seq_spin); + new_seq = ++dlm_cb_seq; + spin_unlock(&dlm_cb_seq_spin); if (lkb->lkb_flags & DLM_IFL_USER) { - spin_unlock(&ast_queue_lock); - dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq); + dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq); return; } - rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq); - if (rv < 0) { - spin_unlock(&ast_queue_lock); - return; - } + mutex_lock(&lkb->lkb_cb_mutex); + prev_seq = lkb->lkb_callbacks[0].seq; - if (list_empty(&lkb->lkb_astqueue)) { + rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq); + if (rv < 0) + goto out; + + if (!prev_seq) { kref_get(&lkb->lkb_ref); - list_add_tail(&lkb->lkb_astqueue, &ast_queue); - } - spin_unlock(&ast_queue_lock); - set_bit(WAKE_ASTS, &astd_wakeflags); - wake_up_process(astd_task); + if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) { + mutex_lock(&ls->ls_cb_mutex); + list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay); + mutex_unlock(&ls->ls_cb_mutex); + } else { + queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); + } + } + out: + mutex_unlock(&lkb->lkb_cb_mutex); } -static void process_asts(void) +void dlm_callback_work(struct work_struct *work) { - struct dlm_ls *ls = NULL; - struct dlm_rsb *r = NULL; - struct dlm_lkb *lkb; + struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work); + struct dlm_ls *ls = lkb->lkb_resource->res_ls; void (*castfn) (void *astparam); void (*bastfn) (void *astparam, int mode); struct dlm_callback callbacks[DLM_CALLBACKS_SIZE]; int i, rv, resid; -repeat: - spin_lock(&ast_queue_lock); - list_for_each_entry(lkb, &ast_queue, lkb_astqueue) { - r = lkb->lkb_resource; - ls = r->res_ls; + memset(&callbacks, 0, sizeof(callbacks)); - if (dlm_locking_stopped(ls)) - continue; - - /* we remove from astqueue list and remove everything in - lkb_callbacks before releasing the spinlock so empty - lkb_astqueue is always consistent with empty lkb_callbacks */ - - list_del_init(&lkb->lkb_astqueue); - - castfn = lkb->lkb_astfn; - bastfn = lkb->lkb_bastfn; + mutex_lock(&lkb->lkb_cb_mutex); + if (!lkb->lkb_callbacks[0].seq) { + /* no callback work exists, shouldn't happen */ + log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id); + dlm_print_lkb(lkb); + dlm_dump_lkb_callbacks(lkb); + } - memset(&callbacks, 0, sizeof(callbacks)); + for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { + rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid); + if (rv < 0) + break; + } - for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { - rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid); - if (rv < 0) - break; - } - spin_unlock(&ast_queue_lock); + if (resid) { + /* cbs remain, loop should have removed all, shouldn't happen */ + log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id, + resid); + dlm_print_lkb(lkb); + dlm_dump_lkb_callbacks(lkb); + } + mutex_unlock(&lkb->lkb_cb_mutex); - if (resid) { - /* shouldn't happen, for loop should have removed all */ - log_error(ls, "callback resid %d lkb %x", - resid, lkb->lkb_id); - } + castfn = lkb->lkb_astfn; + bastfn = lkb->lkb_bastfn; - for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { - if (!callbacks[i].seq) - break; - if (callbacks[i].flags & DLM_CB_SKIP) { - continue; - } else if (callbacks[i].flags & DLM_CB_BAST) { - bastfn(lkb->lkb_astparam, callbacks[i].mode); - } else if (callbacks[i].flags & DLM_CB_CAST) { - lkb->lkb_lksb->sb_status = callbacks[i].sb_status; - lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags; - castfn(lkb->lkb_astparam); - } + for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { + if (!callbacks[i].seq) + break; + if (callbacks[i].flags & DLM_CB_SKIP) { + continue; + } else if (callbacks[i].flags & DLM_CB_BAST) { + bastfn(lkb->lkb_astparam, callbacks[i].mode); + } else if (callbacks[i].flags & DLM_CB_CAST) { + lkb->lkb_lksb->sb_status = callbacks[i].sb_status; + lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags; + castfn(lkb->lkb_astparam); } - - /* removes ref for ast_queue, may cause lkb to be freed */ - dlm_put_lkb(lkb); - - cond_resched(); - goto repeat; } - spin_unlock(&ast_queue_lock); -} - -static inline int no_asts(void) -{ - int ret; - spin_lock(&ast_queue_lock); - ret = list_empty(&ast_queue); - spin_unlock(&ast_queue_lock); - return ret; + /* undo kref_get from dlm_add_callback, may cause lkb to be freed */ + dlm_put_lkb(lkb); } -static int dlm_astd(void *data) +int dlm_callback_start(struct dlm_ls *ls) { - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (!test_bit(WAKE_ASTS, &astd_wakeflags)) - schedule(); - set_current_state(TASK_RUNNING); - - mutex_lock(&astd_running); - if (test_and_clear_bit(WAKE_ASTS, &astd_wakeflags)) - process_asts(); - mutex_unlock(&astd_running); + ls->ls_callback_wq = alloc_workqueue("dlm_callback", + WQ_UNBOUND | + WQ_MEM_RECLAIM | + WQ_NON_REENTRANT, + 0); + if (!ls->ls_callback_wq) { + log_print("can't start dlm_callback workqueue"); + return -ENOMEM; } return 0; } -void dlm_astd_wake(void) +void dlm_callback_stop(struct dlm_ls *ls) { - if (!no_asts()) { - set_bit(WAKE_ASTS, &astd_wakeflags); - wake_up_process(astd_task); - } + if (ls->ls_callback_wq) + destroy_workqueue(ls->ls_callback_wq); } -int dlm_astd_start(void) +void dlm_callback_suspend(struct dlm_ls *ls) { - struct task_struct *p; - int error = 0; - - INIT_LIST_HEAD(&ast_queue); - spin_lock_init(&ast_queue_lock); - mutex_init(&astd_running); - - p = kthread_run(dlm_astd, NULL, "dlm_astd"); - if (IS_ERR(p)) - error = PTR_ERR(p); - else - astd_task = p; - return error; -} + set_bit(LSFL_CB_DELAY, &ls->ls_flags); -void dlm_astd_stop(void) -{ - kthread_stop(astd_task); + if (ls->ls_callback_wq) + flush_workqueue(ls->ls_callback_wq); } -void dlm_astd_suspend(void) +void dlm_callback_resume(struct dlm_ls *ls) { - mutex_lock(&astd_running); -} + struct dlm_lkb *lkb, *safe; + int count = 0; -void dlm_astd_resume(void) -{ - mutex_unlock(&astd_running); + clear_bit(LSFL_CB_DELAY, &ls->ls_flags); + + if (!ls->ls_callback_wq) + return; + + mutex_lock(&ls->ls_cb_mutex); + list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) { + list_del_init(&lkb->lkb_cb_list); + queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); + count++; + } + mutex_unlock(&ls->ls_cb_mutex); + + log_debug(ls, "dlm_callback_resume %d", count); } diff --git a/fs/dlm/ast.h b/fs/dlm/ast.h index 8aa89c9b561..757b551c682 100644 --- a/fs/dlm/ast.h +++ b/fs/dlm/ast.h @@ -18,14 +18,15 @@ int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, uint32_t sbflags, uint64_t seq); int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_callback *cb, int *resid); -void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, - uint32_t sbflags); +void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, + uint32_t sbflags); -void dlm_astd_wake(void); -int dlm_astd_start(void); -void dlm_astd_stop(void); -void dlm_astd_suspend(void); -void dlm_astd_resume(void); +void dlm_callback_work(struct work_struct *work); +int dlm_callback_start(struct dlm_ls *ls); +void dlm_callback_stop(struct dlm_ls *ls); +void dlm_callback_suspend(struct dlm_ls *ls); +void dlm_callback_resume(struct dlm_ls *ls); #endif + diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 9b026ea8baa..6cf72fcc0d0 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -28,7 +28,8 @@ * /config/dlm/<cluster>/spaces/<space>/nodes/<node>/weight * /config/dlm/<cluster>/comms/<comm>/nodeid * /config/dlm/<cluster>/comms/<comm>/local - * /config/dlm/<cluster>/comms/<comm>/addr + * /config/dlm/<cluster>/comms/<comm>/addr (write only) + * /config/dlm/<cluster>/comms/<comm>/addr_list (read only) * The <cluster> level is useless, but I haven't figured out how to avoid it. */ @@ -80,6 +81,7 @@ static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf, size_t len); static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len); +static ssize_t comm_addr_list_read(struct dlm_comm *cm, char *buf); static ssize_t node_nodeid_read(struct dlm_node *nd, char *buf); static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf, size_t len); @@ -92,7 +94,6 @@ struct dlm_cluster { unsigned int cl_tcp_port; unsigned int cl_buffer_size; unsigned int cl_rsbtbl_size; - unsigned int cl_lkbtbl_size; unsigned int cl_dirtbl_size; unsigned int cl_recover_timer; unsigned int cl_toss_secs; @@ -101,13 +102,13 @@ struct dlm_cluster { unsigned int cl_protocol; unsigned int cl_timewarn_cs; unsigned int cl_waitwarn_us; + unsigned int cl_new_rsb_count; }; enum { CLUSTER_ATTR_TCP_PORT = 0, CLUSTER_ATTR_BUFFER_SIZE, CLUSTER_ATTR_RSBTBL_SIZE, - CLUSTER_ATTR_LKBTBL_SIZE, CLUSTER_ATTR_DIRTBL_SIZE, CLUSTER_ATTR_RECOVER_TIMER, CLUSTER_ATTR_TOSS_SECS, @@ -116,6 +117,7 @@ enum { CLUSTER_ATTR_PROTOCOL, CLUSTER_ATTR_TIMEWARN_CS, CLUSTER_ATTR_WAITWARN_US, + CLUSTER_ATTR_NEW_RSB_COUNT, }; struct cluster_attribute { @@ -160,7 +162,6 @@ __CONFIGFS_ATTR(name, 0644, name##_read, name##_write) CLUSTER_ATTR(tcp_port, 1); CLUSTER_ATTR(buffer_size, 1); CLUSTER_ATTR(rsbtbl_size, 1); -CLUSTER_ATTR(lkbtbl_size, 1); CLUSTER_ATTR(dirtbl_size, 1); CLUSTER_ATTR(recover_timer, 1); CLUSTER_ATTR(toss_secs, 1); @@ -169,12 +170,12 @@ CLUSTER_ATTR(log_debug, 0); CLUSTER_ATTR(protocol, 0); CLUSTER_ATTR(timewarn_cs, 1); CLUSTER_ATTR(waitwarn_us, 0); +CLUSTER_ATTR(new_rsb_count, 0); static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr, [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr, - [CLUSTER_ATTR_LKBTBL_SIZE] = &cluster_attr_lkbtbl_size.attr, [CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr, [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr, [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, @@ -183,6 +184,7 @@ static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr, [CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr, [CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr, + [CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr, NULL, }; @@ -190,6 +192,7 @@ enum { COMM_ATTR_NODEID = 0, COMM_ATTR_LOCAL, COMM_ATTR_ADDR, + COMM_ATTR_ADDR_LIST, }; struct comm_attribute { @@ -217,14 +220,22 @@ static struct comm_attribute comm_attr_local = { static struct comm_attribute comm_attr_addr = { .attr = { .ca_owner = THIS_MODULE, .ca_name = "addr", - .ca_mode = S_IRUGO | S_IWUSR }, + .ca_mode = S_IWUSR }, .store = comm_addr_write, }; +static struct comm_attribute comm_attr_addr_list = { + .attr = { .ca_owner = THIS_MODULE, + .ca_name = "addr_list", + .ca_mode = S_IRUGO }, + .show = comm_addr_list_read, +}; + static struct configfs_attribute *comm_attrs[] = { [COMM_ATTR_NODEID] = &comm_attr_nodeid.attr, [COMM_ATTR_LOCAL] = &comm_attr_local.attr, [COMM_ATTR_ADDR] = &comm_attr_addr.attr, + [COMM_ATTR_ADDR_LIST] = &comm_attr_addr_list.attr, NULL, }; @@ -435,7 +446,6 @@ static struct config_group *make_cluster(struct config_group *g, cl->cl_tcp_port = dlm_config.ci_tcp_port; cl->cl_buffer_size = dlm_config.ci_buffer_size; cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size; - cl->cl_lkbtbl_size = dlm_config.ci_lkbtbl_size; cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size; cl->cl_recover_timer = dlm_config.ci_recover_timer; cl->cl_toss_secs = dlm_config.ci_toss_secs; @@ -444,6 +454,7 @@ static struct config_group *make_cluster(struct config_group *g, cl->cl_protocol = dlm_config.ci_protocol; cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs; cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us; + cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count; space_list = &sps->ss_group; comm_list = &cms->cs_group; @@ -720,6 +731,50 @@ static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len) return len; } +static ssize_t comm_addr_list_read(struct dlm_comm *cm, char *buf) +{ + ssize_t s; + ssize_t allowance; + int i; + struct sockaddr_storage *addr; + struct sockaddr_in *addr_in; + struct sockaddr_in6 *addr_in6; + + /* Taken from ip6_addr_string() defined in lib/vsprintf.c */ + char buf0[sizeof("AF_INET6 xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:255.255.255.255\n")]; + + + /* Derived from SIMPLE_ATTR_SIZE of fs/configfs/file.c */ + allowance = 4096; + buf[0] = '\0'; + + for (i = 0; i < cm->addr_count; i++) { + addr = cm->addr[i]; + + switch(addr->ss_family) { + case AF_INET: + addr_in = (struct sockaddr_in *)addr; + s = sprintf(buf0, "AF_INET %pI4\n", &addr_in->sin_addr.s_addr); + break; + case AF_INET6: + addr_in6 = (struct sockaddr_in6 *)addr; + s = sprintf(buf0, "AF_INET6 %pI6\n", &addr_in6->sin6_addr); + break; + default: + s = sprintf(buf0, "%s\n", "<UNKNOWN>"); + break; + } + allowance -= s; + if (allowance >= 0) + strcat(buf, buf0); + else { + allowance += s; + break; + } + } + return 4096 - allowance; +} + static ssize_t show_node(struct config_item *i, struct configfs_attribute *a, char *buf) { @@ -983,7 +1038,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) #define DEFAULT_TCP_PORT 21064 #define DEFAULT_BUFFER_SIZE 4096 #define DEFAULT_RSBTBL_SIZE 1024 -#define DEFAULT_LKBTBL_SIZE 1024 #define DEFAULT_DIRTBL_SIZE 1024 #define DEFAULT_RECOVER_TIMER 5 #define DEFAULT_TOSS_SECS 10 @@ -992,12 +1046,12 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) #define DEFAULT_PROTOCOL 0 #define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */ #define DEFAULT_WAITWARN_US 0 +#define DEFAULT_NEW_RSB_COUNT 128 struct dlm_config_info dlm_config = { .ci_tcp_port = DEFAULT_TCP_PORT, .ci_buffer_size = DEFAULT_BUFFER_SIZE, .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE, - .ci_lkbtbl_size = DEFAULT_LKBTBL_SIZE, .ci_dirtbl_size = DEFAULT_DIRTBL_SIZE, .ci_recover_timer = DEFAULT_RECOVER_TIMER, .ci_toss_secs = DEFAULT_TOSS_SECS, @@ -1005,6 +1059,7 @@ struct dlm_config_info dlm_config = { .ci_log_debug = DEFAULT_LOG_DEBUG, .ci_protocol = DEFAULT_PROTOCOL, .ci_timewarn_cs = DEFAULT_TIMEWARN_CS, - .ci_waitwarn_us = DEFAULT_WAITWARN_US + .ci_waitwarn_us = DEFAULT_WAITWARN_US, + .ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT }; diff --git a/fs/dlm/config.h b/fs/dlm/config.h index dd0ce24d5a8..3099d0dd26c 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -20,7 +20,6 @@ struct dlm_config_info { int ci_tcp_port; int ci_buffer_size; int ci_rsbtbl_size; - int ci_lkbtbl_size; int ci_dirtbl_size; int ci_recover_timer; int ci_toss_secs; @@ -29,6 +28,7 @@ struct dlm_config_info { int ci_protocol; int ci_timewarn_cs; int ci_waitwarn_us; + int ci_new_rsb_count; }; extern struct dlm_config_info dlm_config; diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 0262451eb9c..fe2860c0244 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -37,6 +37,7 @@ #include <linux/jhash.h> #include <linux/miscdevice.h> #include <linux/mutex.h> +#include <linux/idr.h> #include <asm/uaccess.h> #include <linux/dlm.h> @@ -52,7 +53,6 @@ struct dlm_ls; struct dlm_lkb; struct dlm_rsb; struct dlm_member; -struct dlm_lkbtable; struct dlm_rsbtable; struct dlm_dirtable; struct dlm_direntry; @@ -108,11 +108,6 @@ struct dlm_rsbtable { spinlock_t lock; }; -struct dlm_lkbtable { - struct list_head list; - rwlock_t lock; - uint16_t counter; -}; /* * Lockspace member (per node in a ls) @@ -248,17 +243,18 @@ struct dlm_lkb { int8_t lkb_wait_count; int lkb_wait_nodeid; /* for debugging */ - struct list_head lkb_idtbl_list; /* lockspace lkbtbl */ struct list_head lkb_statequeue; /* rsb g/c/w list */ struct list_head lkb_rsb_lookup; /* waiting for rsb lookup */ struct list_head lkb_wait_reply; /* waiting for remote reply */ - struct list_head lkb_astqueue; /* need ast to be sent */ struct list_head lkb_ownqueue; /* list of locks for a process */ struct list_head lkb_time_list; ktime_t lkb_timestamp; ktime_t lkb_wait_time; unsigned long lkb_timeout_cs; + struct mutex lkb_cb_mutex; + struct work_struct lkb_cb_work; + struct list_head lkb_cb_list; /* for ls_cb_delay or proc->asts */ struct dlm_callback lkb_callbacks[DLM_CALLBACKS_SIZE]; struct dlm_callback lkb_last_cast; struct dlm_callback lkb_last_bast; @@ -299,7 +295,7 @@ struct dlm_rsb { int res_recover_locks_count; char *res_lvbptr; - char res_name[1]; + char res_name[DLM_RESNAME_MAXLEN+1]; }; /* find_rsb() flags */ @@ -465,12 +461,12 @@ struct dlm_ls { unsigned long ls_scan_time; struct kobject ls_kobj; + struct idr ls_lkbidr; + spinlock_t ls_lkbidr_spin; + struct dlm_rsbtable *ls_rsbtbl; uint32_t ls_rsbtbl_size; - struct dlm_lkbtable *ls_lkbtbl; - uint32_t ls_lkbtbl_size; - struct dlm_dirtable *ls_dirtbl; uint32_t ls_dirtbl_size; @@ -483,6 +479,10 @@ struct dlm_ls { struct mutex ls_timeout_mutex; struct list_head ls_timeout; + spinlock_t ls_new_rsb_spin; + int ls_new_rsb_count; + struct list_head ls_new_rsb; /* new rsb structs */ + struct list_head ls_nodes; /* current nodes in ls */ struct list_head ls_nodes_gone; /* dead node list, recovery */ int ls_num_nodes; /* number of nodes in ls */ @@ -506,8 +506,12 @@ struct dlm_ls { struct miscdevice ls_device; + struct workqueue_struct *ls_callback_wq; + /* recovery related */ + struct mutex ls_cb_mutex; + struct list_head ls_cb_delay; /* save for queue_work later */ struct timer_list ls_timer; struct task_struct *ls_recoverd_task; struct mutex ls_recoverd_active; @@ -544,6 +548,7 @@ struct dlm_ls { #define LSFL_RCOM_WAIT 4 #define LSFL_UEVENT_WAIT 5 #define LSFL_TIMEWARN 6 +#define LSFL_CB_DELAY 7 /* much of this is just saving user space pointers associated with the lock that we pass back to the user lib with an ast */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index f71d0b5abd9..83b5e32514e 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -305,7 +305,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) rv = -EDEADLK; } - dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); + dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); } static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -319,7 +319,7 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) if (is_master_copy(lkb)) { send_bast(r, lkb, rqmode); } else { - dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0); + dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0); } } @@ -327,19 +327,68 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) * Basic operations on rsb's and lkb's */ -static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) +static int pre_rsb_struct(struct dlm_ls *ls) +{ + struct dlm_rsb *r1, *r2; + int count = 0; + + spin_lock(&ls->ls_new_rsb_spin); + if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { + spin_unlock(&ls->ls_new_rsb_spin); + return 0; + } + spin_unlock(&ls->ls_new_rsb_spin); + + r1 = dlm_allocate_rsb(ls); + r2 = dlm_allocate_rsb(ls); + + spin_lock(&ls->ls_new_rsb_spin); + if (r1) { + list_add(&r1->res_hashchain, &ls->ls_new_rsb); + ls->ls_new_rsb_count++; + } + if (r2) { + list_add(&r2->res_hashchain, &ls->ls_new_rsb); + ls->ls_new_rsb_count++; + } + count = ls->ls_new_rsb_count; + spin_unlock(&ls->ls_new_rsb_spin); + + if (!count) + return -ENOMEM; + return 0; +} + +/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can + unlock any spinlocks, go back and call pre_rsb_struct again. + Otherwise, take an rsb off the list and return it. */ + +static int get_rsb_struct(struct dlm_ls *ls, char *name, int len, + struct dlm_rsb **r_ret) { struct dlm_rsb *r; + int count; - r = dlm_allocate_rsb(ls, len); - if (!r) - return NULL; + spin_lock(&ls->ls_new_rsb_spin); + if (list_empty(&ls->ls_new_rsb)) { + count = ls->ls_new_rsb_count; + spin_unlock(&ls->ls_new_rsb_spin); + log_debug(ls, "find_rsb retry %d %d %s", + count, dlm_config.ci_new_rsb_count, name); + return -EAGAIN; + } + + r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); + list_del(&r->res_hashchain); + ls->ls_new_rsb_count--; + spin_unlock(&ls->ls_new_rsb_spin); r->res_ls = ls; r->res_length = len; memcpy(r->res_name, name, len); mutex_init(&r->res_mutex); + INIT_LIST_HEAD(&r->res_hashchain); INIT_LIST_HEAD(&r->res_lookup); INIT_LIST_HEAD(&r->res_grantqueue); INIT_LIST_HEAD(&r->res_convertqueue); @@ -347,7 +396,8 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len) INIT_LIST_HEAD(&r->res_root_list); INIT_LIST_HEAD(&r->res_recover_list); - return r; + *r_ret = r; + return 0; } static int search_rsb_list(struct list_head *head, char *name, int len, @@ -405,16 +455,6 @@ static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b, return error; } -static int search_rsb(struct dlm_ls *ls, char *name, int len, int b, - unsigned int flags, struct dlm_rsb **r_ret) -{ - int error; - spin_lock(&ls->ls_rsbtbl[b].lock); - error = _search_rsb(ls, name, len, b, flags, r_ret); - spin_unlock(&ls->ls_rsbtbl[b].lock); - return error; -} - /* * Find rsb in rsbtbl and potentially create/add one * @@ -432,35 +472,48 @@ static int search_rsb(struct dlm_ls *ls, char *name, int len, int b, static int find_rsb(struct dlm_ls *ls, char *name, int namelen, unsigned int flags, struct dlm_rsb **r_ret) { - struct dlm_rsb *r = NULL, *tmp; + struct dlm_rsb *r = NULL; uint32_t hash, bucket; - int error = -EINVAL; + int error; - if (namelen > DLM_RESNAME_MAXLEN) + if (namelen > DLM_RESNAME_MAXLEN) { + error = -EINVAL; goto out; + } if (dlm_no_directory(ls)) flags |= R_CREATE; - error = 0; hash = jhash(name, namelen, 0); bucket = hash & (ls->ls_rsbtbl_size - 1); - error = search_rsb(ls, name, namelen, bucket, flags, &r); + retry: + if (flags & R_CREATE) { + error = pre_rsb_struct(ls); + if (error < 0) + goto out; + } + + spin_lock(&ls->ls_rsbtbl[bucket].lock); + + error = _search_rsb(ls, name, namelen, bucket, flags, &r); if (!error) - goto out; + goto out_unlock; if (error == -EBADR && !(flags & R_CREATE)) - goto out; + goto out_unlock; /* the rsb was found but wasn't a master copy */ if (error == -ENOTBLK) - goto out; + goto out_unlock; - error = -ENOMEM; - r = create_rsb(ls, name, namelen); - if (!r) - goto out; + error = get_rsb_struct(ls, name, namelen, &r); + if (error == -EAGAIN) { + spin_unlock(&ls->ls_rsbtbl[bucket].lock); + goto retry; + } + if (error) + goto out_unlock; r->res_hash = hash; r->res_bucket = bucket; @@ -474,18 +527,10 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen, nodeid = 0; r->res_nodeid = nodeid; } - - spin_lock(&ls->ls_rsbtbl[bucket].lock); - error = _search_rsb(ls, name, namelen, bucket, 0, &tmp); - if (!error) { - spin_unlock(&ls->ls_rsbtbl[bucket].lock); - dlm_free_rsb(r); - r = tmp; - goto out; - } list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list); - spin_unlock(&ls->ls_rsbtbl[bucket].lock); error = 0; + out_unlock: + spin_unlock(&ls->ls_rsbtbl[bucket].lock); out: *r_ret = r; return error; @@ -580,9 +625,8 @@ static void detach_lkb(struct dlm_lkb *lkb) static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) { - struct dlm_lkb *lkb, *tmp; - uint32_t lkid = 0; - uint16_t bucket; + struct dlm_lkb *lkb; + int rv, id; lkb = dlm_allocate_lkb(ls); if (!lkb) @@ -594,60 +638,42 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) INIT_LIST_HEAD(&lkb->lkb_ownqueue); INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); INIT_LIST_HEAD(&lkb->lkb_time_list); - INIT_LIST_HEAD(&lkb->lkb_astqueue); + INIT_LIST_HEAD(&lkb->lkb_cb_list); + mutex_init(&lkb->lkb_cb_mutex); + INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); - get_random_bytes(&bucket, sizeof(bucket)); - bucket &= (ls->ls_lkbtbl_size - 1); - - write_lock(&ls->ls_lkbtbl[bucket].lock); + retry: + rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS); + if (!rv) + return -ENOMEM; - /* counter can roll over so we must verify lkid is not in use */ + spin_lock(&ls->ls_lkbidr_spin); + rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id); + if (!rv) + lkb->lkb_id = id; + spin_unlock(&ls->ls_lkbidr_spin); - while (lkid == 0) { - lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++; + if (rv == -EAGAIN) + goto retry; - list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list, - lkb_idtbl_list) { - if (tmp->lkb_id != lkid) - continue; - lkid = 0; - break; - } + if (rv < 0) { + log_error(ls, "create_lkb idr error %d", rv); + return rv; } - lkb->lkb_id = lkid; - list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list); - write_unlock(&ls->ls_lkbtbl[bucket].lock); - *lkb_ret = lkb; return 0; } -static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) -{ - struct dlm_lkb *lkb; - uint16_t bucket = (lkid >> 16); - - list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { - if (lkb->lkb_id == lkid) - return lkb; - } - return NULL; -} - static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - uint16_t bucket = (lkid >> 16); - - if (bucket >= ls->ls_lkbtbl_size) - return -EBADSLT; - read_lock(&ls->ls_lkbtbl[bucket].lock); - lkb = __find_lkb(ls, lkid); + spin_lock(&ls->ls_lkbidr_spin); + lkb = idr_find(&ls->ls_lkbidr, lkid); if (lkb) kref_get(&lkb->lkb_ref); - read_unlock(&ls->ls_lkbtbl[bucket].lock); + spin_unlock(&ls->ls_lkbidr_spin); *lkb_ret = lkb; |