diff options
Diffstat (limited to 'fs/dlm/lockspace.c')
| -rw-r--r-- | fs/dlm/lockspace.c | 371 |
1 files changed, 228 insertions, 143 deletions
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index c010ecfc0d2..f3e72787e7f 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2008 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -15,7 +15,6 @@ #include "lockspace.h" #include "member.h" #include "recoverd.h" -#include "ast.h" #include "dir.h" #include "lowcomms.h" #include "config.h" @@ -24,6 +23,7 @@ #include "recover.h" #include "requestqueue.h" #include "user.h" +#include "ast.h" static int ls_count; static struct mutex ls_lock; @@ -35,8 +35,11 @@ static struct task_struct * scand_task; static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) { ssize_t ret = len; - int n = simple_strtol(buf, NULL, 0); + int n; + int rc = kstrtoint(buf, 0, &n); + if (rc) + return rc; ls = dlm_find_lockspace_local(ls->ls_local_handle); if (!ls) return -EINVAL; @@ -57,7 +60,10 @@ static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) { - ls->ls_uevent_result = simple_strtol(buf, NULL, 0); + int rc = kstrtoint(buf, 0, &ls->ls_uevent_result); + + if (rc) + return rc; set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); wake_up(&ls->ls_uevent_wait); return len; @@ -70,7 +76,27 @@ static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) { - ls->ls_global_id = simple_strtoul(buf, NULL, 0); + int rc = kstrtouint(buf, 0, &ls->ls_global_id); + + if (rc) + return rc; + return len; +} + +static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) +{ + return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls)); +} + +static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) +{ + int val; + int rc = kstrtoint(buf, 0, &val); + + if (rc) + return rc; + if (val == 1) + set_bit(LSFL_NODIR, &ls->ls_flags); return len; } @@ -107,6 +133,12 @@ static struct dlm_attr dlm_attr_id = { .store = dlm_id_store }; +static struct dlm_attr dlm_attr_nodir = { + .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR}, + .show = dlm_nodir_show, + .store = dlm_nodir_store +}; + static struct dlm_attr dlm_attr_recover_status = { .attr = {.name = "recover_status", .mode = S_IRUGO}, .show = dlm_recover_status_show @@ -121,6 +153,7 @@ static struct attribute *dlm_attrs[] = { &dlm_attr_control.attr, &dlm_attr_event.attr, &dlm_attr_id.attr, + &dlm_attr_nodir.attr, &dlm_attr_recover_status.attr, &dlm_attr_recover_nodeid.attr, NULL, @@ -148,7 +181,7 @@ static void lockspace_kobj_release(struct kobject *k) kfree(ls); } -static struct sysfs_ops dlm_attr_ops = { +static const struct sysfs_ops dlm_attr_ops = { .show = dlm_attr_show, .store = dlm_attr_store, }; @@ -170,7 +203,7 @@ static int do_uevent(struct dlm_ls *ls, int in) else kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); - log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving"); + log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving"); /* dlm_controld will see the uevent, do the necessary group management and then write to sysfs to wake us */ @@ -178,7 +211,7 @@ static int do_uevent(struct dlm_ls *ls, int in) error = wait_event_interruptible(ls->ls_uevent_wait, test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); - log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result); + log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result); if (error) goto out; @@ -191,6 +224,18 @@ static int do_uevent(struct dlm_ls *ls, int in) return error; } +static int dlm_uevent(struct kset *kset, struct kobject *kobj, + struct kobj_uevent_env *env) +{ + struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); + + add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name); + return 0; +} + +static struct kset_uevent_ops dlm_uevent_ops = { + .uevent = dlm_uevent, +}; int __init dlm_lockspace_init(void) { @@ -199,7 +244,7 @@ int __init dlm_lockspace_init(void) INIT_LIST_HEAD(&lslist); spin_lock_init(&lslist_lock); - dlm_kset = kset_create_and_add("dlm", NULL, kernel_kobj); + dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj); if (!dlm_kset) { printk(KERN_WARNING "%s: can not create kset\n", __func__); return -ENOMEM; @@ -231,7 +276,6 @@ static struct dlm_ls *find_ls_to_scan(void) static int dlm_scand(void *data) { struct dlm_ls *ls; - int timeout_jiffies = dlm_config.ci_scan_secs * HZ; while (!kthread_should_stop()) { ls = find_ls_to_scan(); @@ -240,13 +284,14 @@ static int dlm_scand(void *data) ls->ls_scan_time = jiffies; dlm_scan_rsbs(ls); dlm_scan_timeout(ls); + dlm_scan_waiters(ls); dlm_unlock_recovery(ls); } else { ls->ls_scan_time += HZ; } - } else { - schedule_timeout_interruptible(timeout_jiffies); + continue; } + schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ); } return 0; } @@ -347,17 +392,10 @@ static int threads_start(void) { int error; - /* Thread which process lock requests for all lockspace's */ - error = dlm_astd_start(); - if (error) { - log_print("cannot start dlm_astd thread %d", error); - goto fail; - } - error = dlm_scand_start(); if (error) { log_print("cannot start dlm_scand thread %d", error); - goto astd_fail; + goto fail; } /* Thread for sending/receiving messages for all lockspace's */ @@ -371,8 +409,6 @@ static int threads_start(void) scand_fail: dlm_scand_stop(); - astd_fail: - dlm_astd_stop(); fail: return error; } @@ -381,15 +417,17 @@ static void threads_stop(void) { dlm_scand_stop(); dlm_lowcomms_stop(); - dlm_astd_stop(); } -static int new_lockspace(const char *name, int namelen, void **lockspace, - uint32_t flags, int lvblen) +static int new_lockspace(const char *name, const char *cluster, + uint32_t flags, int lvblen, + const struct dlm_lockspace_ops *ops, void *ops_arg, + int *ops_result, dlm_lockspace_t **lockspace) { struct dlm_ls *ls; int i, size, error; int do_unreg = 0; + int namelen = strlen(name); if (namelen > DLM_LOCKSPACE_LEN) return -EINVAL; @@ -401,8 +439,24 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, return -EINVAL; if (!dlm_user_daemon_available()) { - module_put(THIS_MODULE); - return -EUNATCH; + log_print("dlm user daemon not available"); + error = -EUNATCH; + goto out; + } + + if (ops && ops_result) { + if (!dlm_config.ci_recover_callbacks) + *ops_result = -EOPNOTSUPP; + else + *ops_result = 0; + } + + if (dlm_config.ci_recover_callbacks && cluster && + strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) { + log_print("dlm cluster name %s mismatch %s", + dlm_config.ci_cluster_name, cluster); + error = -EBADR; + goto out; } error = 0; @@ -440,6 +494,11 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, ls->ls_flags = 0; ls->ls_scan_time = jiffies; + if (ops && dlm_config.ci_recover_callbacks) { + ls->ls_ops = ops; + ls->ls_ops_arg = ops_arg; + } + if (flags & DLM_LSFL_TIMEWARN) set_bit(LSFL_TIMEWARN, &ls->ls_flags); @@ -451,37 +510,26 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, size = dlm_config.ci_rsbtbl_size; ls->ls_rsbtbl_size = size; - ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_NOFS); + ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size); if (!ls->ls_rsbtbl) goto out_lsfree; for (i = 0; i < size; i++) { - INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list); - INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss); + ls->ls_rsbtbl[i].keep.rb_node = NULL; + ls->ls_rsbtbl[i].toss.rb_node = NULL; spin_lock_init(&ls->ls_rsbtbl[i].lock); } - size = dlm_config.ci_lkbtbl_size; - ls->ls_lkbtbl_size = size; + spin_lock_init(&ls->ls_remove_spin); - ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_NOFS); - if (!ls->ls_lkbtbl) - goto out_rsbfree; - for (i = 0; i < size; i++) { - INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list); - rwlock_init(&ls->ls_lkbtbl[i].lock); - ls->ls_lkbtbl[i].counter = 1; + for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { + ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, + GFP_KERNEL); + if (!ls->ls_remove_names[i]) + goto out_rsbtbl; } - size = dlm_config.ci_dirtbl_size; - ls->ls_dirtbl_size = size; - - ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_NOFS); - if (!ls->ls_dirtbl) - goto out_lkbfree; - for (i = 0; i < size; i++) { - INIT_LIST_HEAD(&ls->ls_dirtbl[i].list); - spin_lock_init(&ls->ls_dirtbl[i].lock); - } + idr_init(&ls->ls_lkbidr); + spin_lock_init(&ls->ls_lkbidr_spin); INIT_LIST_HEAD(&ls->ls_waiters); mutex_init(&ls->ls_waiters_mutex); @@ -490,6 +538,9 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, INIT_LIST_HEAD(&ls->ls_timeout); mutex_init(&ls->ls_timeout_mutex); + INIT_LIST_HEAD(&ls->ls_new_rsb); + spin_lock_init(&ls->ls_new_rsb_spin); + INIT_LIST_HEAD(&ls->ls_nodes); INIT_LIST_HEAD(&ls->ls_nodes_gone); ls->ls_num_nodes = 0; @@ -508,6 +559,9 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, init_completion(&ls->ls_members_done); ls->ls_members_result = -1; + mutex_init(&ls->ls_cb_mutex); + INIT_LIST_HEAD(&ls->ls_cb_delay); + ls->ls_recoverd_task = NULL; mutex_init(&ls->ls_recoverd_active); spin_lock_init(&ls->ls_recover_lock); @@ -524,35 +578,59 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); if (!ls->ls_recover_buf) - goto out_dirfree; + goto out_lkbidr; + + ls->ls_slot = 0; + ls->ls_num_slots = 0; + ls->ls_slots_size = 0; + ls->ls_slots = NULL; INIT_LIST_HEAD(&ls->ls_recover_list); spin_lock_init(&ls->ls_recover_list_lock); + idr_init(&ls->ls_recover_idr); + spin_lock_init(&ls->ls_recover_idr_lock); ls->ls_recover_list_count = 0; ls->ls_local_handle = ls; init_waitqueue_head(&ls->ls_wait_general); INIT_LIST_HEAD(&ls->ls_root_list); init_rwsem(&ls->ls_root_sem); - down_write(&ls->ls_in_recovery); - spin_lock(&lslist_lock); ls->ls_create_count = 1; list_add(&ls->ls_list, &lslist); spin_unlock(&lslist_lock); - /* needs to find ls in lslist */ + if (flags & DLM_LSFL_FS) { + error = dlm_callback_start(ls); + if (error) { + log_error(ls, "can't start dlm_callback %d", error); + goto out_delist; + } + } + + init_waitqueue_head(&ls->ls_recover_lock_wait); + + /* + * Once started, dlm_recoverd first looks for ls in lslist, then + * initializes ls_in_recovery as locked in "down" mode. We need + * to wait for the wakeup from dlm_recoverd because in_recovery + * has to start out in down mode. + */ + error = dlm_recoverd_start(ls); if (error) { log_error(ls, "can't start dlm_recoverd %d", error); - goto out_delist; + goto out_callback; } + wait_event(ls->ls_recover_lock_wait, + test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); + ls->ls_kobj.kset = dlm_kset; error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, "%s", ls->ls_name); if (error) - goto out_stop; + goto out_recoverd; kobject_uevent(&ls->ls_kobj, KOBJ_ADD); /* let kobject handle freeing of ls if there's an error */ @@ -566,7 +644,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, error = do_uevent(ls, 1); if (error) - goto out_stop; + goto out_recoverd; wait_for_completion(&ls->ls_members_done); error = ls->ls_members_result; @@ -575,7 +653,7 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, dlm_create_debug_file(ls); - log_debug(ls, "join complete"); + log_rinfo(ls, "join complete"); *lockspace = ls; return 0; @@ -583,19 +661,24 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, do_uevent(ls, 0); dlm_clear_members(ls); kfree(ls->ls_node_array); - out_stop: + out_recoverd: dlm_recoverd_stop(ls); + out_callback: + dlm_callback_stop(ls); out_delist: spin_lock(&lslist_lock); list_del(&ls->ls_list); spin_unlock(&lslist_lock); + idr_destroy(&ls->ls_recover_idr); kfree(ls->ls_recover_buf); - out_dirfree: - kfree(ls->ls_dirtbl); - out_lkbfree: - kfree(ls->ls_lkbtbl); - out_rsbfree: - kfree(ls->ls_rsbtbl); + out_lkbidr: + idr_destroy(&ls->ls_lkbidr); + for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { + if (ls->ls_remove_names[i]) + kfree(ls->ls_remove_names[i]); + } + out_rsbtbl: + vfree(ls->ls_rsbtbl); out_lsfree: if (do_unreg) kobject_put(&ls->ls_kobj); @@ -606,8 +689,10 @@ static int new_lockspace(const char *name, int namelen, void **lockspace, return error; } -int dlm_new_lockspace(const char *name, int namelen, void **lockspace, - uint32_t flags, int lvblen) +int dlm_new_lockspace(const char *name, const char *cluster, + uint32_t flags, int lvblen, + const struct dlm_lockspace_ops *ops, void *ops_arg, + int *ops_result, dlm_lockspace_t **lockspace) { int error = 0; @@ -617,7 +702,8 @@ int dlm_new_lockspace(const char *name, int namelen, void **lockspace, if (error) goto out; - error = new_lockspace(name, namelen, lockspace, flags, lvblen); + error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, + ops_result, lockspace); if (!error) ls_count++; if (error > 0) @@ -629,50 +715,62 @@ int dlm_new_lockspace(const char *name, int namelen, void **lockspace, return error; } -/* Return 1 if the lockspace still has active remote locks, - * 2 if the lockspace still has active local locks. - */ -static int lockspace_busy(struct dlm_ls *ls) -{ - int i, lkb_found = 0; - struct dlm_lkb *lkb; - - /* NOTE: We check the lockidtbl here rather than the resource table. - This is because there may be LKBs queued as ASTs that have been - unlinked from their RSBs and are pending deletion once the AST has - been delivered */ - - for (i = 0; i < ls->ls_lkbtbl_size; i++) { - read_lock(&ls->ls_lkbtbl[i].lock); - if (!list_empty(&ls->ls_lkbtbl[i].list)) { - lkb_found = 1; - list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list, - lkb_idtbl_list) { - if (!lkb->lkb_nodeid) { - read_unlock(&ls->ls_lkbtbl[i].lock); - return 2; - } - } - } - read_unlock(&ls->ls_lkbtbl[i].lock); +static int lkb_idr_is_local(int id, void *p, void *data) +{ + struct dlm_lkb *lkb = p; + + return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; +} + +static int lkb_idr_is_any(int id, void *p, void *data) +{ + return 1; +} + +static int lkb_idr_free(int id, void *p, void *data) +{ + struct dlm_lkb *lkb = p; + + if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY) + dlm_free_lvb(lkb->lkb_lvbptr); + + dlm_free_lkb(lkb); + return 0; +} + +/* NOTE: We check the lkbidr here rather than the resource table. + This is because there may be LKBs queued as ASTs that have been unlinked + from their RSBs and are pending deletion once the AST has been delivered */ + +static int lockspace_busy(struct dlm_ls *ls, int force) +{ + int rv; + + spin_lock(&ls->ls_lkbidr_spin); + if (force == 0) { + rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls); + } else if (force == 1) { + rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls); + } else { + rv = 0; } - return lkb_found; + spin_unlock(&ls->ls_lkbidr_spin); + return rv; } static int release_lockspace(struct dlm_ls *ls, int force) { - struct dlm_lkb *lkb; struct dlm_rsb *rsb; - struct list_head *head; + struct rb_node *n; int i, busy, rv; - busy = lockspace_busy(ls); + busy = lockspace_busy(ls, force); spin_lock(&lslist_lock); if (ls->ls_create_count == 1) { - if (busy > force) + if (busy) { rv = -EBUSY; - else { + } else { /* remove_lockspace takes ls off lslist */ ls->ls_create_count = 0; rv = 0; @@ -696,69 +794,50 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_recoverd_stop(ls); + dlm_callback_stop(ls); + remove_lockspace(ls); dlm_delete_debug_file(ls); - dlm_astd_suspend(); - kfree(ls->ls_recover_buf); /* - * Free direntry structs. + * Free all lkb's in idr */ - dlm_dir_clear(ls); - kfree(ls->ls_dirtbl); - - /* - * Free all lkb's on lkbtbl[] lists. - */ - - for (i = 0; i < ls->ls_lkbtbl_size; i++) { - head = &ls->ls_lkbtbl[i].list; - while (!list_empty(head)) { - lkb = list_entry(head->next, struct dlm_lkb, - lkb_idtbl_list); - - list_del(&lkb->lkb_idtbl_list); - - dlm_del_ast(lkb); - - if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY) - dlm_free_lvb(lkb->lkb_lvbptr); - - dlm_free_lkb(lkb); - } - } - dlm_astd_resume(); - - kfree(ls->ls_lkbtbl); + idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); + idr_destroy(&ls->ls_lkbidr); /* * Free all rsb's on rsbtbl[] lists */ for (i = 0; i < ls->ls_rsbtbl_size; i++) { - head = &ls->ls_rsbtbl[i].list; - while (!list_empty(head)) { - rsb = list_entry(head->next, struct dlm_rsb, - res_hashchain); - - list_del(&rsb->res_hashchain); + while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) { + rsb = rb_entry(n, struct dlm_rsb, res_hashnode); + rb_erase(n, &ls->ls_rsbtbl[i].keep); dlm_free_rsb(rsb); } - head = &ls->ls_rsbtbl[i].toss; - while (!list_empty(head)) { - rsb = list_entry(head->next, struct dlm_rsb, - res_hashchain); - list_del(&rsb->res_hashchain); + while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) { + rsb = rb_entry(n, struct dlm_rsb, res_hashnode); + rb_erase(n, &ls->ls_rsbtbl[i].toss); dlm_free_rsb(rsb); } } - kfree(ls->ls_rsbtbl); + vfree(ls->ls_rsbtbl); + + for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) + kfree(ls->ls_remove_names[i]); + + while (!list_empty(&ls->ls_new_rsb)) { + rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, + res_hashchain); + list_del(&rsb->res_hashchain); + dlm_free_rsb(rsb); + } /* * Free structures on any other lists @@ -766,11 +845,10 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_purge_requestqueue(ls); kfree(ls->ls_recover_args); - dlm_clear_free_entries(ls); dlm_clear_members(ls); dlm_clear_members_gone(ls); kfree(ls->ls_node_array); - log_debug(ls, "release_lockspace final free"); + log_rinfo(ls, "release_lockspace final free"); kobject_put(&ls->ls_kobj); /* The ls structure will be freed when the kobject is done with */ @@ -816,17 +894,24 @@ int dlm_release_lockspace(void *lockspace, int force) void dlm_stop_lockspaces(void) { struct dlm_ls *ls; + int count; restart: + count = 0; spin_lock(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { - if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) + if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { + count++; continue; + } spin_unlock(&lslist_lock); log_error(ls, "no userland control daemon, stopping lockspace"); dlm_ls_stop(ls); goto restart; } spin_unlock(&lslist_lock); + + if (count) + log_print("dlm user daemon left %d lockspaces", count); } |
