diff options
Diffstat (limited to 'fs/dlm/lowcomms-sctp.c')
-rw-r--r-- | fs/dlm/lowcomms-sctp.c | 145 |
1 files changed, 64 insertions, 81 deletions
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c index 5aeadadd8af..dc83a9d979b 100644 --- a/fs/dlm/lowcomms-sctp.c +++ b/fs/dlm/lowcomms-sctp.c @@ -72,6 +72,8 @@ struct nodeinfo { struct list_head writequeue; /* outgoing writequeue_entries */ spinlock_t writequeue_lock; int nodeid; + struct work_struct swork; /* Send workqueue */ + struct work_struct lwork; /* Locking workqueue */ }; static DEFINE_IDR(nodeinfo_idr); @@ -96,6 +98,7 @@ struct connection { atomic_t waiting_requests; struct cbuf cb; int eagain_flag; + struct work_struct work; /* Send workqueue */ }; /* An entry waiting to be sent */ @@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n) static LIST_HEAD(write_nodes); static DEFINE_SPINLOCK(write_nodes_lock); + /* Maximum number of incoming messages to process before * doing a schedule() */ #define MAX_RX_MSG_COUNT 25 -/* Manage daemons */ -static struct task_struct *recv_task; -static struct task_struct *send_task; -static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait); +/* Work queues */ +static struct workqueue_struct *recv_workqueue; +static struct workqueue_struct *send_workqueue; +static struct workqueue_struct *lock_workqueue; /* The SCTP connection */ static struct connection sctp_con; +static void process_send_sockets(struct work_struct *work); +static void process_recv_sockets(struct work_struct *work); +static void process_lock_request(struct work_struct *work); static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) { @@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) spin_lock_init(&ni->lock); INIT_LIST_HEAD(&ni->writequeue); spin_lock_init(&ni->writequeue_lock); + INIT_WORK(&ni->lwork, process_lock_request); + INIT_WORK(&ni->swork, process_send_sockets); ni->nodeid = nodeid; if (nodeid > max_nodeid) @@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc) /* Data or notification available on socket */ static void lowcomms_data_ready(struct sock *sk, int count_unused) { - atomic_inc(&sctp_con.waiting_requests); if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) - return; - - wake_up_interruptible(&lowcomms_recv_wait); + queue_work(recv_workqueue, &sctp_con.work); } @@ -361,10 +367,10 @@ static void init_failed(void) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } } } - wake_up_process(send_task); } /* Something happened to an association */ @@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } - wake_up_process(send_task); } break; @@ -580,8 +586,8 @@ static int receive_from_sock(void) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); + queue_work(send_workqueue, &ni->swork); } - wake_up_process(send_task); } } @@ -590,6 +596,7 @@ static int receive_from_sock(void) return 0; cbuf_add(&sctp_con.cb, ret); + // PJC: TODO: Add to node's workqueue....can we ?? ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), page_address(sctp_con.rx_page), sctp_con.cb.base, sctp_con.cb.len, @@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg) spin_lock_bh(&write_nodes_lock); list_add_tail(&ni->write_list, &write_nodes); spin_unlock_bh(&write_nodes_lock); - wake_up_process(send_task); + + queue_work(send_workqueue, &ni->swork); } return; @@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid) return 0; } -static int write_list_empty(void) +// PJC: The work queue function for receiving. +static void process_recv_sockets(struct work_struct *work) { - int status; + if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { + int ret; + int count = 0; - spin_lock_bh(&write_nodes_lock); - status = list_empty(&write_nodes); - spin_unlock_bh(&write_nodes_lock); + do { + ret = receive_from_sock(); - return status; + /* Don't starve out everyone else */ + if (++count >= MAX_RX_MSG_COUNT) { + cond_resched(); + count = 0; + } + } while (!kthread_should_stop() && ret >=0); + } + cond_resched(); } -static int dlm_recvd(void *data) +// PJC: the work queue function for sending +static void process_send_sockets(struct work_struct *work) { - DECLARE_WAITQUEUE(wait, current); - - while (!kthread_should_stop()) { - int count = 0; - - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue(&lowcomms_recv_wait, &wait); - if (!test_bit(CF_READ_PENDING, &sctp_con.flags)) - schedule(); - remove_wait_queue(&lowcomms_recv_wait, &wait); - set_current_state(TASK_RUNNING); - - if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { - int ret; - - do { - ret = receive_from_sock(); - - /* Don't starve out everyone else */ - if (++count >= MAX_RX_MSG_COUNT) { - cond_resched(); - count = 0; - } - } while (!kthread_should_stop() && ret >=0); - } - cond_resched(); + if (sctp_con.eagain_flag) { + sctp_con.eagain_flag = 0; + refill_write_queue(); } - - return 0; + process_output_queue(); } -static int dlm_sendd(void *data) +// PJC: Process lock requests from a particular node. +// TODO: can we optimise this out on UP ?? +static void process_lock_request(struct work_struct *work) { - DECLARE_WAITQUEUE(wait, current); - - add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (write_list_empty()) - schedule(); - set_current_state(TASK_RUNNING); - - if (sctp_con.eagain_flag) { - sctp_con.eagain_flag = 0; - refill_write_queue(); - } - process_output_queue(); - } - - remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); - - return 0; } static void daemons_stop(void) { - kthread_stop(recv_task); - kthread_stop(send_task); + destroy_workqueue(recv_workqueue); + destroy_workqueue(send_workqueue); + destroy_workqueue(lock_workqueue); } static int daemons_start(void) { - struct task_struct *p; int error; + recv_workqueue = create_workqueue("dlm_recv"); + error = IS_ERR(recv_workqueue); + if (error) { + log_print("can't start dlm_recv %d", error); + return error; + } - p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); - error = IS_ERR(p); + send_workqueue = create_singlethread_workqueue("dlm_send"); + error = IS_ERR(send_workqueue); if (error) { - log_print("can't start dlm_recvd %d", error); + log_print("can't start dlm_send %d", error); + destroy_workqueue(recv_workqueue); return error; } - recv_task = p; - p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); - error = IS_ERR(p); + lock_workqueue = create_workqueue("dlm_rlock"); + error = IS_ERR(lock_workqueue); if (error) { - log_print("can't start dlm_sendd %d", error); - kthread_stop(recv_task); + log_print("can't start dlm_rlock %d", error); + destroy_workqueue(send_workqueue); + destroy_workqueue(recv_workqueue); return error; } - send_task = p; return 0; } @@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void) { int error; + INIT_WORK(&sctp_con.work, process_recv_sockets); + error = init_sock(); if (error) goto fail_sock; @@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void) for (i = 0; i < dlm_local_count; i++) kfree(dlm_local_addr[i]); } - |