diff options
author | Linus Torvalds <torvalds@woody.linux-foundation.org> | 2007-02-07 08:09:00 -0800 |
---|---|---|
committer | Linus Torvalds <torvalds@woody.linux-foundation.org> | 2007-02-07 08:09:00 -0800 |
commit | d3f8fd765e94b9137e1f27bbb0ac25289f9e565c (patch) | |
tree | a9ee7f05b3ef9c03292b101e1e2e0ed0e1c3e85a /fs | |
parent | 0670afdf0e69e5e73c8358da9c39bf3a8807b03e (diff) | |
parent | a2cf822274b3d58a16a65c8338e299e18b3dc3a4 (diff) |
Merge git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw
* git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw: (57 commits)
[GFS2] make gfs2_writepages() static
[GFS2] Unlock page on prepare_write try lock failure
[GFS2] nfsd readdirplus assertion failure
[DLM] fix softlockup in dlm_recv
[DLM] zero new user lvbs
[DLM/GFS2] indent help text
[GFS2] Fix unlink deadlocks
[GFS2] Put back semaphore to avoid umount problem
[GFS2] more CURRENT_TIME_SEC
[GFS2/DLM] fix GFS2 circular dependency
[GFS2/DLM] use sysfs
[GFS2] make lock_dlm drop_count tunable in sysfs
[GFS2] increase default lock limit
[GFS2] Fix list corruption in lops.c
[GFS2] Fix recursive locking attempt with NFS
[DLM] can miss clearing resend flag
[DLM] saved dlm message can be dropped
[DLM] Make sock_sem into a mutex
[GFS2] Fix typo in glock.c
[GFS2] use CURRENT_TIME_SEC instead of get_seconds in gfs2
...
Diffstat (limited to 'fs')
39 files changed, 866 insertions, 1119 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig index b5654a284fe..6fa7b0d5c04 100644 --- a/fs/dlm/Kconfig +++ b/fs/dlm/Kconfig @@ -3,21 +3,21 @@ menu "Distributed Lock Manager" config DLM tristate "Distributed Lock Manager (DLM)" - depends on IPV6 || IPV6=n + depends on SYSFS && (IPV6 || IPV6=n) select CONFIGFS_FS select IP_SCTP if DLM_SCTP help - A general purpose distributed lock manager for kernel or userspace - applications. + A general purpose distributed lock manager for kernel or userspace + applications. choice prompt "Select DLM communications protocol" depends on DLM default DLM_TCP help - The DLM Can use TCP or SCTP for it's network communications. - SCTP supports multi-homed operations whereas TCP doesn't. - However, SCTP seems to have stability problems at the moment. + The DLM Can use TCP or SCTP for it's network communications. + SCTP supports multi-homed operations whereas TCP doesn't. + However, SCTP seems to have stability problems at the moment. config DLM_TCP bool "TCP/IP" @@ -31,8 +31,8 @@ config DLM_DEBUG bool "DLM debugging" depends on DLM help - Under the debugfs mount point, the name of each lockspace will - appear as a file in the "dlm" directory. The output is the - list of resource and locks the local node knows about. + Under the debugfs mount point, the name of each lockspace will + appear as a file in the "dlm" directory. The output is the + list of resource and locks the local node knows about. endmenu diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 88553054bbf..8665c88e5af 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -54,6 +54,11 @@ static struct config_item *make_node(struct config_group *, const char *); static void drop_node(struct config_group *, struct config_item *); static void release_node(struct config_item *); +static ssize_t show_cluster(struct config_item *i, struct configfs_attribute *a, + char *buf); +static ssize_t store_cluster(struct config_item *i, + struct configfs_attribute *a, + const char *buf, size_t len); static ssize_t show_comm(struct config_item *i, struct configfs_attribute *a, char *buf); static ssize_t store_comm(struct config_item *i, struct configfs_attribute *a, @@ -73,6 +78,101 @@ static ssize_t node_nodeid_write(struct node *nd, const char *buf, size_t len); static ssize_t node_weight_read(struct node *nd, char *buf); static ssize_t node_weight_write(struct node *nd, const char *buf, size_t len); +struct cluster { + struct config_group group; + unsigned int cl_tcp_port; + unsigned int cl_buffer_size; + unsigned int cl_rsbtbl_size; + unsigned int cl_lkbtbl_size; + unsigned int cl_dirtbl_size; + unsigned int cl_recover_timer; + unsigned int cl_toss_secs; + unsigned int cl_scan_secs; + unsigned int cl_log_debug; +}; + +enum { + CLUSTER_ATTR_TCP_PORT = 0, + CLUSTER_ATTR_BUFFER_SIZE, + CLUSTER_ATTR_RSBTBL_SIZE, + CLUSTER_ATTR_LKBTBL_SIZE, + CLUSTER_ATTR_DIRTBL_SIZE, + CLUSTER_ATTR_RECOVER_TIMER, + CLUSTER_ATTR_TOSS_SECS, + CLUSTER_ATTR_SCAN_SECS, + CLUSTER_ATTR_LOG_DEBUG, +}; + +struct cluster_attribute { + struct configfs_attribute attr; + ssize_t (*show)(struct cluster *, char *); + ssize_t (*store)(struct cluster *, const char *, size_t); +}; + +static ssize_t cluster_set(struct cluster *cl, unsigned int *cl_field, + unsigned int *info_field, int check_zero, + const char *buf, size_t len) +{ + unsigned int x; + + if (!capable(CAP_SYS_ADMIN)) + return -EACCES; + + x = simple_strtoul(buf, NULL, 0); + + if (check_zero && !x) + return -EINVAL; + + *cl_field = x; + *info_field = x; + + return len; +} + +#define __CONFIGFS_ATTR(_name,_mode,_read,_write) { \ + .attr = { .ca_name = __stringify(_name), \ + .ca_mode = _mode, \ + .ca_owner = THIS_MODULE }, \ + .show = _read, \ + .store = _write, \ +} + +#define CLUSTER_ATTR(name, check_zero) \ +static ssize_t name##_write(struct cluster *cl, const char *buf, size_t len) \ +{ \ + return cluster_set(cl, &cl->cl_##name, &dlm_config.ci_##name, \ + check_zero, buf, len); \ +} \ +static ssize_t name##_read(struct cluster *cl, char *buf) \ +{ \ + return snprintf(buf, PAGE_SIZE, "%u\n", cl->cl_##name); \ +} \ +static struct cluster_attribute cluster_attr_##name = \ +__CONFIGFS_ATTR(name, 0644, name##_read, name##_write) + +CLUSTER_ATTR(tcp_port, 1); +CLUSTER_ATTR(buffer_size, 1); +CLUSTER_ATTR(rsbtbl_size, 1); +CLUSTER_ATTR(lkbtbl_size, 1); +CLUSTER_ATTR(dirtbl_size, 1); +CLUSTER_ATTR(recover_timer, 1); +CLUSTER_ATTR(toss_secs, 1); +CLUSTER_ATTR(scan_secs, 1); +CLUSTER_ATTR(log_debug, 0); + +static struct configfs_attribute *cluster_attrs[] = { + [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, + [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr, + [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr, + [CLUSTER_ATTR_LKBTBL_SIZE] = &cluster_attr_lkbtbl_size.attr, + [CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr, + [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr, + [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, + [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, + [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr, + NULL, +}; + enum { COMM_ATTR_NODEID = 0, COMM_ATTR_LOCAL, @@ -152,10 +252,6 @@ struct clusters { struct configfs_subsystem subsys; }; -struct cluster { - struct config_group group; -}; - struct spaces { struct config_group ss_group; }; @@ -197,6 +293,8 @@ static struct configfs_group_operations clusters_ops = { static struct configfs_item_operations cluster_ops = { .release = release_cluster, + .show_attribute = show_cluster, + .store_attribute = store_cluster, }; static struct configfs_group_operations spaces_ops = { @@ -237,6 +335,7 @@ static struct config_item_type clusters_type = { static struct config_item_type cluster_type = { .ct_item_ops = &cluster_ops, + .ct_attrs = cluster_attrs, .ct_owner = THIS_MODULE, }; @@ -317,6 +416,16 @@ static struct config_group *make_cluster(struct config_group *g, cl->group.default_groups[1] = &cms->cs_group; cl->group.default_groups[2] = NULL; + cl->cl_tcp_port = dlm_config.ci_tcp_port; + cl->cl_buffer_size = dlm_config.ci_buffer_size; + cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size; + cl->cl_lkbtbl_size = dlm_config.ci_lkbtbl_size; + cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size; + cl->cl_recover_timer = dlm_config.ci_recover_timer; + cl->cl_toss_secs = dlm_config.ci_toss_secs; + cl->cl_scan_secs = dlm_config.ci_scan_secs; + cl->cl_log_debug = dlm_config.ci_log_debug; + space_list = &sps->ss_group; comm_list = &cms->cs_group; return &cl->group; @@ -509,6 +618,25 @@ void dlm_config_exit(void) * Functions for user space to read/write attributes */ +static ssize_t show_cluster(struct config_item *i, struct configfs_attribute *a, + char *buf) +{ + struct cluster *cl = to_cluster(i); + struct cluster_attribute *cla = + container_of(a, struct cluster_attribute, attr); + return cla->show ? cla->show(cl, buf) : 0; +} + +static ssize_t store_cluster(struct config_item *i, + struct configfs_attribute *a, + const char *buf, size_t len) +{ + struct cluster *cl = to_cluster(i); + struct cluster_attribute *cla = + container_of(a, struct cluster_attribute, attr); + return cla->store ? cla->store(cl, buf, len) : -EINVAL; +} + static ssize_t show_comm(struct config_item *i, struct configfs_attribute *a, char *buf) { @@ -775,15 +903,17 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) #define DEFAULT_RECOVER_TIMER 5 #define DEFAULT_TOSS_SECS 10 #define DEFAULT_SCAN_SECS 5 +#define DEFAULT_LOG_DEBUG 0 struct dlm_config_info dlm_config = { - .tcp_port = DEFAULT_TCP_PORT, - .buffer_size = DEFAULT_BUFFER_SIZE, - .rsbtbl_size = DEFAULT_RSBTBL_SIZE, - .lkbtbl_size = DEFAULT_LKBTBL_SIZE, - .dirtbl_size = DEFAULT_DIRTBL_SIZE, - .recover_timer = DEFAULT_RECOVER_TIMER, - .toss_secs = DEFAULT_TOSS_SECS, - .scan_secs = DEFAULT_SCAN_SECS + .ci_tcp_port = DEFAULT_TCP_PORT, + .ci_buffer_size = DEFAULT_BUFFER_SIZE, + .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE, + .ci_lkbtbl_size = DEFAULT_LKBTBL_SIZE, + .ci_dirtbl_size = DEFAULT_DIRTBL_SIZE, + .ci_recover_timer = DEFAULT_RECOVER_TIMER, + .ci_toss_secs = DEFAULT_TOSS_SECS, + .ci_scan_secs = DEFAULT_SCAN_SECS, + .ci_log_debug = DEFAULT_LOG_DEBUG }; diff --git a/fs/dlm/config.h b/fs/dlm/config.h index 9da7839958a..1e978611a96 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -17,14 +17,15 @@ #define DLM_MAX_ADDR_COUNT 3 struct dlm_config_info { - int tcp_port; - int buffer_size; - int rsbtbl_size; - int lkbtbl_size; - int dirtbl_size; - int recover_timer; - int toss_secs; - int scan_secs; + int ci_tcp_port; + int ci_buffer_size; + int ci_rsbtbl_size; + int ci_lkbtbl_size; + int ci_dirtbl_size; + int ci_recover_timer; + int ci_toss_secs; + int ci_scan_secs; + int ci_log_debug; }; extern struct dlm_config_info dlm_config; diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 1ee8195e6fc..61d93201e1b 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -41,6 +41,7 @@ #include <asm/uaccess.h> #include <linux/dlm.h> +#include "config.h" #define DLM_LOCKSPACE_LEN 64 @@ -69,12 +70,12 @@ struct dlm_mhandle; #define log_error(ls, fmt, args...) \ printk(KERN_ERR "dlm: %s: " fmt "\n", (ls)->ls_name , ##args) -#define DLM_LOG_DEBUG -#ifdef DLM_LOG_DEBUG -#define log_debug(ls, fmt, args...) log_error(ls, fmt, ##args) -#else -#define log_debug(ls, fmt, args...) -#endif +#define log_debug(ls, fmt, args...) \ +do { \ + if (dlm_config.ci_log_debug) \ + printk(KERN_DEBUG "dlm: %s: " fmt "\n", \ + (ls)->ls_name , ##args); \ +} while (0) #define DLM_ASSERT(x, do) \ { \ @@ -309,8 +310,8 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag) /* dlm_header is first element of all structs sent between nodes */ -#define DLM_HEADER_MAJOR 0x00020000 -#define DLM_HEADER_MINOR 0x00000001 +#define DLM_HEADER_MAJOR 0x00030000 +#define DLM_HEADER_MINOR 0x00000000 #define DLM_MSG 1 #define DLM_RCOM 2 @@ -386,6 +387,8 @@ struct dlm_rcom { uint32_t rc_type; /* DLM_RCOM_ */ int rc_result; /* multi-purpose */ uint64_t rc_id; /* match reply with request */ + uint64_t rc_seq; /* sender's ls_recover_seq */ + uint64_t rc_seq_reply; /* remote ls_recover_seq */ char rc_buf[0]; }; @@ -523,6 +526,7 @@ struct dlm_user_proc { spinlock_t asts_spin; struct list_head locks; spinlock_t locks_spin; + struct list_head unlocking; wait_queue_head_t wait; }; diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 30878defaeb..e725005fafd 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -754,6 +754,11 @@ static void add_to_waiters(struct dlm_lkb *lkb, int mstype) mutex_unlock(&ls->ls_waiters_mutex); } +/* We clear the RESEND flag because we might be taking an lkb off the waiters + list as part of process_requestqueue (e.g. a lookup that has an optimized + request reply on the requestqueue) between dlm_recover_waiters_pre() which + set RESEND and dlm_recover_waiters_post() */ + static int _remove_from_waiters(struct dlm_lkb *lkb) { int error = 0; @@ -764,6 +769,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb) goto out; } lkb->lkb_wait_type = 0; + lkb->lkb_flags &= ~DLM_IFL_RESEND; list_del(&lkb->lkb_wait_reply); unhold_lkb(lkb); out: @@ -810,7 +816,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b) list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss, res_hashchain) { if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.toss_secs * HZ)) + dlm_config.ci_toss_secs * HZ)) continue; found = 1; break; @@ -2144,12 +2150,24 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, if (lkb->lkb_astaddr) ms->m_asts |= AST_COMP; - if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP) - memcpy(ms->m_extra, r->res_name, r->res_length); + /* compare with switch in create_message; send_remove() doesn't + use send_args() */ - else if (lkb->lkb_lvbptr) + switch (ms->m_type) { + case DLM_MSG_REQUEST: + case DLM_MSG_LOOKUP: + memcpy(ms->m_extra, r->res_name, r->res_length); + break; + case DLM_MSG_CONVERT: + case DLM_MSG_UNLOCK: + case DLM_MSG_REQUEST_REPLY: + case DLM_MSG_CONVERT_REPLY: + case DLM_MSG_GRANT: + if (!lkb->lkb_lvbptr) + break; memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); - + break; + } } static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) @@ -2418,8 +2436,12 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb);); - if (receive_lvb(ls, lkb, ms)) - return -ENOMEM; + if (lkb->lkb_exflags & DLM_LKF_VALBLK) { + /* lkb was just created so there won't be an lvb yet */ + lkb->lkb_lvbptr = allocate_lvb(ls); + if (!lkb->lkb_lvbptr) + return -ENOMEM; + } return 0; } @@ -3002,7 +3024,7 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) { struct dlm_message *ms = (struct dlm_message *) hd; struct dlm_ls *ls; - int error; + int error = 0; if (!recovery) dlm_message_in(ms); @@ -3119,7 +3141,7 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) out: dlm_put_lockspace(ls); dlm_astd_wake(); - return 0; + return error; } @@ -3132,6 +3154,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) if (middle_conversion(lkb)) { hold_lkb(lkb); ls->ls_stub_ms.m_result = -EINPROGRESS; + ls->ls_stub_ms.m_flags = lkb->lkb_flags; _remove_from_waiters(lkb); _receive_convert_reply(lkb, &ls->ls_stub_ms); @@ -3205,6 +3228,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_UNLOCK: hold_lkb(lkb); ls->ls_stub_ms.m_result = -DLM_EUNLOCK; + ls->ls_stub_ms.m_flags = lkb->lkb_flags; _remove_from_waiters(lkb); _receive_unlock_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); @@ -3213,6 +3237,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls) case DLM_MSG_CANCEL: hold_lkb(lkb); ls->ls_stub_ms.m_result = -DLM_ECANCEL; + ls->ls_stub_ms.m_flags = lkb->lkb_flags; _remove_from_waiters(lkb); _receive_cancel_reply(lkb, &ls->ls_stub_ms); dlm_put_lkb(lkb); @@ -3571,6 +3596,14 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) lock_rsb(r); switch (error) { + case -EBADR: + /* There's a chance the new master received our lock before + dlm_recover_master_reply(), this wouldn't happen if we did + a barrier between recover_masters and recover_locks. */ + log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id, + (unsigned long)r, r->res_name); + dlm_send_rcom_lock(r, lkb); + goto out; case -EEXIST: log_debug(ls, "master copy exists %x", lkb->lkb_id); /* fall through */ @@ -3585,7 +3618,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) /* an ack for dlm_recover_locks() which waits for replies from all the locks it sends to new masters */ dlm_recovered_lock(r); - + out: unlock_rsb(r); put_rsb(r); dlm_put_lkb(lkb); @@ -3610,7 +3643,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, } if (flags & DLM_LKF_VALBLK) { - ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL); + ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL); if (!ua->lksb.sb_lvbptr) { kfree(ua); __put_lkb(ls, lkb); @@ -3679,7 +3712,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, ua = (struct dlm_user_args *)lkb->lkb_astparam; if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { - ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL); + ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL); if (!ua->lksb.sb_lvbptr) { error = -ENOMEM; goto out_put; @@ -3745,12 +3778,10 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, goto out_put; spin_lock(&ua->proc->locks_spin); - list_del_init(&lkb->lkb_ownqueue); + /* dlm_user_add_ast() may have already taken lkb off the proc list */ + if (!list_empty(&lkb->lkb_ownqueue)) + list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); spin_unlock(&ua->proc->locks_spin); - - /* this removes the reference for the proc->locks list added by - dlm_user_request */ - unhold_lkb(lkb); out_put: dlm_put_lkb(lkb); out: @@ -3790,9 +3821,8 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, /* this lkb was removed from the WAITING queue */ if (lkb->lkb_grmode == DLM_LOCK_IV) { spin_lock(&ua->proc->locks_spin); - list_del_init(&lkb->lkb_ownqueue); + list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); spin_unlock(&ua->proc->locks_spin); - unhold_lkb(lkb); } out_put: dlm_put_lkb(lkb); @@ -3853,11 +3883,6 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) mutex_lock(&ls->ls_clear_proc_locks); list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) { - if (lkb->lkb_ast_type) { - list_del(&lkb->lkb_astqueue); - unhold_lkb(lkb); - } - list_del_init(&lkb->lkb_ownqueue); if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) { @@ -3874,6 +3899,20 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) dlm_put_lkb(lkb); } + + /* in-progress unlocks */ + list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { + list_del_init(&lkb->lkb_ownqueue); + lkb->lkb_flags |= DLM_IFL_DEAD; + dlm_put_lkb(lkb); + } + + list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) { + list_del(&lkb->lkb_astqueue); + dlm_put_lkb(lkb); + } + mutex_unlock(&ls->ls_clear_proc_locks); unlock_recovery(ls); } + diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 59012b089e8..f40817b53c6 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -236,7 +236,7 @@ static int dlm_scand(void *data) while (!kthread_should_stop()) { list_for_each_entry(ls, &lslist, ls_list) dlm_scan_rsbs(ls); - schedule_timeout_interruptible(dlm_config.scan_secs * HZ); + schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ); } return 0; } @@ -422,7 +422,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace, ls->ls_count = 0; ls->ls_flags = 0; - size = dlm_config.rsbtbl_size; + size = dlm_config.ci_rsbtbl_size; ls->ls_rsbtbl_size = size; ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL); @@ -434,7 +434,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace, rwlock_init(&ls->ls_rsbtbl[i].lock); } - size = dlm_config.lkbtbl_size; + size = dlm_config.ci_lkbtbl_size; ls->ls_lkbtbl_size = size; ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL); @@ -446,7 +446,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace, ls->ls_lkbtbl[i].counter = 1; } - size = dlm_config.dirtbl_size; + size = dlm_config.ci_dirtbl_size; ls->ls_dirtbl_size = size; ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL); @@ -489,7 +489,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace, mutex_init(&ls->ls_requestqueue_mutex); mutex_init(&ls->ls_clear_proc_locks); - ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL); + ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL); if (!ls->ls_recover_buf) goto out_dirfree; diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c index fe158d7a928..dc83a9d979b 100644 --- a/fs/dlm/lowcomms-sctp.c +++ b/fs/dlm/lowcomms-sctp.c @@ -72,6 +72,8 @@ struct nodeinfo { struct list_head writequeue; /* outgoing writequeue_entries */ spinlock_t writequeue_lock; int nodeid; + struct work_struct swork; /* Send workqueue */ + struct work_struct lwork; /* Locking workqueue */ }; static DEFINE_IDR(nodeinfo_idr); @@ -96,6 +98,7 @@ struct connection { atomic_t waiting_requests; struct cbuf cb; int eagain_flag; + struct work_struct work; /* Send workqueue */ }; /* An entry waiting to be sent */ @@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n) static LIST_HEAD(write_nodes); static DEFINE_SPINLOCK(write_nodes_lock); + /* Maximum number of incoming messages to process before * doing a schedule() */ #define MAX_RX_MSG_COUNT 25 -/* Manage daemons */ -static struct task_struct *recv_task; -static struct task_struct *send_task; -static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait); +/* Work queues */ +static struct workqueue_struct *recv_workqueue; +static struct workqueue_struct *send_workqueue; +static struct workqueue_struct *lock_workqueue; /* The SCTP connection */ static struct connection sctp_con; +static void process_send_sockets(struct work_struct *work); +static void process_recv_sockets(struct work_struct *work); +static void process_lock_request(struct work_struct *work); static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) { @@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) spin_lock_init(&ni->lock); INIT_LIST_HEAD(&ni->writequeue); spin_lock_init(&ni->writequeue_lock); + INIT_WORK(&ni->lwork, process_lock_request); + INIT_WORK(&ni->swork, process_send_sockets); ni->nodeid = nodeid; if (nodeid > max_nodeid) @@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc) /* Data or notification available on socket */ static void lowcomms_data_ready(struct sock *sk, int count_unused) { - atomic_inc(&sctp_con.waiting_requests); if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) - return; - - wake_up_interruptible(&lowcomms_recv_wait); + queue_work(recv_workqueue, &sctp_con.work); } @@ -361,10 +367,10 @@ static void init_failed(void) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } } } - wake_up_process(send_task); } /* Something happened to an association */ @@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } - wake_up_process(send_task); } break; @@ -580,8 +586,8 @@ static int receive_from_sock(void) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } - wake_up_process(send_task); } } @@ -590,6 +596,7 @@ static int receive_from_sock(void) return 0; cbuf_add(&sctp_con.cb, ret); + // PJC: TODO: Add to node's workqueue....can we ?? ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), page_address(sctp_con.rx_page), sctp_con.cb.base, sctp_con.cb.len, @@ -635,7 +642,7 @@ static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num) if (result < 0) log_print("Can't bind to port %d addr number %d", - dlm_config.tcp_port, num); + dlm_config.ci_tcp_port, num); return result; } @@ -711,7 +718,7 @@ static int init_sock(void) /* Bind to all interfaces. */ for (i = 0; i < dlm_local_count; i++) { memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); - make_sockaddr(&localaddr, dlm_config.tcp_port, &addr_len); + make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); result = add_bind_addr(&localaddr, addr_len, num); if (result) @@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg) |