diff options
author | Patrick Caulfield <pcaulfie@redhat.com> | 2007-04-17 15:39:57 +0100 |
---|---|---|
committer | Steven Whitehouse <swhiteho@redhat.com> | 2007-05-01 09:11:23 +0100 |
commit | 6ed7257b46709e87d79ac2b6b819b7e0c9184998 (patch) | |
tree | 502f68849175f8fb52bb141501df2df9efc8e06c /fs/dlm | |
parent | fc7c44f03d95f20b5446d06f5bb9605cddd53203 (diff) |
[DLM] Consolidate transport protocols
This patch consolidates the TCP & SCTP protocols for the DLM into a single file
and makes it switchable at run-time (well, at least before the DLM actually
starts up!)
For RHEL5 this patch requires Neil Horman's patch that expands the in-kernel
socket API but that has already been twice ACKed so it should be OK.
The patch adds a new lowcomms.c file that replaces the existing lowcomms-sctp.c
& lowcomms-tcp.c files.
Signed-off-By: Patrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/Kconfig | 31 | ||||
-rw-r--r-- | fs/dlm/Makefile | 6 | ||||
-rw-r--r-- | fs/dlm/config.c | 10 | ||||
-rw-r--r-- | fs/dlm/config.h | 3 | ||||
-rw-r--r-- | fs/dlm/lowcomms-sctp.c | 1210 | ||||
-rw-r--r-- | fs/dlm/lowcomms.c (renamed from fs/dlm/lowcomms-tcp.c) | 742 |
6 files changed, 621 insertions, 1381 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig index 6fa7b0d5c04..69a94690e49 100644 --- a/fs/dlm/Kconfig +++ b/fs/dlm/Kconfig @@ -3,36 +3,19 @@ menu "Distributed Lock Manager" config DLM tristate "Distributed Lock Manager (DLM)" - depends on SYSFS && (IPV6 || IPV6=n) + depends on IPV6 || IPV6=n select CONFIGFS_FS - select IP_SCTP if DLM_SCTP + select IP_SCTP help - A general purpose distributed lock manager for kernel or userspace - applications. - -choice - prompt "Select DLM communications protocol" - depends on DLM - default DLM_TCP - help - The DLM Can use TCP or SCTP for it's network communications. - SCTP supports multi-homed operations whereas TCP doesn't. - However, SCTP seems to have stability problems at the moment. - -config DLM_TCP - bool "TCP/IP" - -config DLM_SCTP - bool "SCTP" - -endchoice + A general purpose distributed lock manager for kernel or userspace + applications. config DLM_DEBUG bool "DLM debugging" depends on DLM help - Under the debugfs mount point, the name of each lockspace will - appear as a file in the "dlm" directory. The output is the - list of resource and locks the local node knows about. + Under the debugfs mount point, the name of each lockspace will + appear as a file in the "dlm" directory. The output is the + list of resource and locks the local node knows about. endmenu diff --git a/fs/dlm/Makefile b/fs/dlm/Makefile index 65388944eba..604cf7dc5f3 100644 --- a/fs/dlm/Makefile +++ b/fs/dlm/Makefile @@ -8,14 +8,12 @@ dlm-y := ast.o \ member.o \ memory.o \ midcomms.o \ + lowcomms.o \ rcom.o \ recover.o \ recoverd.o \ requestqueue.o \ user.o \ - util.o + util.o dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o -dlm-$(CONFIG_DLM_TCP) += lowcomms-tcp.o - -dlm-$(CONFIG_DLM_SCTP) += lowcomms-sctp.o
\ No newline at end of file diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 8665c88e5af..822abdcd143 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -89,6 +89,7 @@ struct cluster { unsigned int cl_toss_secs; unsigned int cl_scan_secs; unsigned int cl_log_debug; + unsigned int cl_protocol; }; enum { @@ -101,6 +102,7 @@ enum { CLUSTER_ATTR_TOSS_SECS, CLUSTER_ATTR_SCAN_SECS, CLUSTER_ATTR_LOG_DEBUG, + CLUSTER_ATTR_PROTOCOL, }; struct cluster_attribute { @@ -159,6 +161,7 @@ CLUSTER_ATTR(recover_timer, 1); CLUSTER_ATTR(toss_secs, 1); CLUSTER_ATTR(scan_secs, 1); CLUSTER_ATTR(log_debug, 0); +CLUSTER_ATTR(protocol, 0); static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, @@ -170,6 +173,7 @@ static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr, + [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr, NULL, }; @@ -904,6 +908,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) #define DEFAULT_TOSS_SECS 10 #define DEFAULT_SCAN_SECS 5 #define DEFAULT_LOG_DEBUG 0 +#define DEFAULT_PROTOCOL 0 struct dlm_config_info dlm_config = { .ci_tcp_port = DEFAULT_TCP_PORT, @@ -914,6 +919,7 @@ struct dlm_config_info dlm_config = { .ci_recover_timer = DEFAULT_RECOVER_TIMER, .ci_toss_secs = DEFAULT_TOSS_SECS, .ci_scan_secs = DEFAULT_SCAN_SECS, - .ci_log_debug = DEFAULT_LOG_DEBUG + .ci_log_debug = DEFAULT_LOG_DEBUG, + .ci_protocol = DEFAULT_PROTOCOL }; diff --git a/fs/dlm/config.h b/fs/dlm/config.h index 1e978611a96..967cc3d72e5 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -26,6 +26,7 @@ struct dlm_config_info { int ci_toss_secs; int ci_scan_secs; int ci_log_debug; + int ci_protocol; }; extern struct dlm_config_info dlm_config; diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c deleted file mode 100644 index dc83a9d979b..00000000000 --- a/fs/dlm/lowcomms-sctp.c +++ /dev/null @@ -1,1210 +0,0 @@ -/****************************************************************************** -******************************************************************************* -** -** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved. -** -** This copyrighted material is made available to anyone wishing to use, -** modify, copy, or redistribute it subject to the terms and conditions -** of the GNU General Public License v.2. -** -******************************************************************************* -******************************************************************************/ - -/* - * lowcomms.c - * - * This is the "low-level" comms layer. - * - * It is responsible for sending/receiving messages - * from other nodes in the cluster. - * - * Cluster nodes are referred to by their nodeids. nodeids are - * simply 32 bit numbers to the locking module - if they need to - * be expanded for the cluster infrastructure then that is it's - * responsibility. It is this layer's - * responsibility to resolve these into IP address or - * whatever it needs for inter-node communication. - * - * The comms level is two kernel threads that deal mainly with - * the receiving of messages from other nodes and passing them - * up to the mid-level comms layer (which understands the - * message format) for execution by the locking core, and - * a send thread which does all the setting up of connections - * to remote nodes and the sending of data. Threads are not allowed - * to send their own data because it may cause them to wait in times - * of high load. Also, this way, the sending thread can collect together - * messages bound for one node and send them in one block. - * - * I don't see any problem with the recv thread executing the locking - * code on behalf of remote processes as the locking code is - * short, efficient and never (well, hardly ever) waits. - * - */ - -#include <asm/ioctls.h> -#include <net/sock.h> -#include <net/tcp.h> -#include <net/sctp/user.h> -#include <linux/pagemap.h> -#include <linux/socket.h> -#include <linux/idr.h> - -#include "dlm_internal.h" -#include "lowcomms.h" -#include "config.h" -#include "midcomms.h" - -static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; -static int dlm_local_count; -static int dlm_local_nodeid; - -/* One of these per connected node */ - -#define NI_INIT_PENDING 1 -#define NI_WRITE_PENDING 2 - -struct nodeinfo { - spinlock_t lock; - sctp_assoc_t assoc_id; - unsigned long flags; - struct list_head write_list; /* nodes with pending writes */ - struct list_head writequeue; /* outgoing writequeue_entries */ - spinlock_t writequeue_lock; - int nodeid; - struct work_struct swork; /* Send workqueue */ - struct work_struct lwork; /* Locking workqueue */ -}; - -static DEFINE_IDR(nodeinfo_idr); -static DECLARE_RWSEM(nodeinfo_lock); -static int max_nodeid; - -struct cbuf { - unsigned int base; - unsigned int len; - unsigned int mask; -}; - -/* Just the one of these, now. But this struct keeps - the connection-specific variables together */ - -#define CF_READ_PENDING 1 - -struct connection { - struct socket *sock; - unsigned long flags; - struct page *rx_page; - atomic_t waiting_requests; - struct cbuf cb; - int eagain_flag; - struct work_struct work; /* Send workqueue */ -}; - -/* An entry waiting to be sent */ - -struct writequeue_entry { - struct list_head list; - struct page *page; - int offset; - int len; - int end; - int users; - struct nodeinfo *ni; -}; - -static void cbuf_add(struct cbuf *cb, int n) -{ - cb->len += n; -} - -static int cbuf_data(struct cbuf *cb) -{ - return ((cb->base + cb->len) & cb->mask); -} - -static void cbuf_init(struct cbuf *cb, int size) -{ - cb->base = cb->len = 0; - cb->mask = size-1; -} - -static void cbuf_eat(struct cbuf *cb, int n) -{ - cb->len -= n; - cb->base += n; - cb->base &= cb->mask; -} - -/* List of nodes which have writes pending */ -static LIST_HEAD(write_nodes); -static DEFINE_SPINLOCK(write_nodes_lock); - - -/* Maximum number of incoming messages to process before - * doing a schedule() - */ -#define MAX_RX_MSG_COUNT 25 - -/* Work queues */ -static struct workqueue_struct *recv_workqueue; -static struct workqueue_struct *send_workqueue; -static struct workqueue_struct *lock_workqueue; - -/* The SCTP connection */ -static struct connection sctp_con; - -static void process_send_sockets(struct work_struct *work); -static void process_recv_sockets(struct work_struct *work); -static void process_lock_request(struct work_struct *work); - -static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) -{ - struct sockaddr_storage addr; - int error; - - if (!dlm_local_count) - return -1; - - error = dlm_nodeid_to_addr(nodeid, &addr); - if (error) - return error; - - if (dlm_local_addr[0]->ss_family == AF_INET) { - struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; - struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; - ret4->sin_addr.s_addr = in4->sin_addr.s_addr; - } else { - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; - struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; - memcpy(&ret6->sin6_addr, &in6->sin6_addr, - sizeof(in6->sin6_addr)); - } - - return 0; -} - -/* If alloc is 0 here we will not attempt to allocate a new - nodeinfo struct */ -static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) -{ - struct nodeinfo *ni; - int r; - int n; - - down_read(&nodeinfo_lock); - ni = idr_find(&nodeinfo_idr, nodeid); - up_read(&nodeinfo_lock); - - if (ni || !alloc) - return ni; - - down_write(&nodeinfo_lock); - - ni = idr_find(&nodeinfo_idr, nodeid); - if (ni) - goto out_up; - - r = idr_pre_get(&nodeinfo_idr, alloc); - if (!r) - goto out_up; - - ni = kmalloc(sizeof(struct nodeinfo), alloc); - if (!ni) - goto out_up; - - r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n); - if (r) { - kfree(ni); - ni = NULL; - goto out_up; - } - if (n != nodeid) { - idr_remove(&nodeinfo_idr, n); - kfree(ni); - ni = NULL; - goto out_up; - } - memset(ni, 0, sizeof(struct nodeinfo)); - spin_lock_init(&ni->lock); - INIT_LIST_HEAD(&ni->writequeue); - spin_lock_init(&ni->writequeue_lock); - INIT_WORK(&ni->lwork, process_lock_request); - INIT_WORK(&ni->swork, process_send_sockets); - ni->nodeid = nodeid; - - if (nodeid > max_nodeid) - max_nodeid = nodeid; -out_up: - up_write(&nodeinfo_lock); - - return ni; -} - -/* Don't call this too often... */ -static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc) -{ - int i; - struct nodeinfo *ni; - - for (i=1; i<=max_nodeid; i++) { - ni = nodeid2nodeinfo(i, 0); - if (ni && ni->assoc_id == assoc) - return ni; - } - return NULL; -} - -/* Data or notification available on socket */ -static void lowcomms_data_ready(struct sock *sk, int count_unused) -{ - if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) - queue_work(recv_workqueue, &sctp_con.work); -} - - -/* Add the port number to an IP6 or 4 sockaddr and return the address length. - Also padd out the struct with zeros to make comparisons meaningful */ - -static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, - int *addr_len) -{ - struct sockaddr_in *local4_addr; - struct sockaddr_in6 *local6_addr; - - if (!dlm_local_count) - return; - - if (!port) { - if (dlm_local_addr[0]->ss_family == AF_INET) { - local4_addr = (struct sockaddr_in *)dlm_local_addr[0]; - port = be16_to_cpu(local4_addr->sin_port); - } else { - local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0]; - port = be16_to_cpu(local6_addr->sin6_port); - } - } - - saddr->ss_family = dlm_local_addr[0]->ss_family; - if (dlm_local_addr[0]->ss_family == AF_INET) { - struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; - in4_addr->sin_port = cpu_to_be16(port); - memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); - memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) - - sizeof(struct sockaddr_in)); - *addr_len = sizeof(struct sockaddr_in); - } else { - struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; - in6_addr->sin6_port = cpu_to_be16(port); - memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) - - sizeof(struct sockaddr_in6)); - *addr_len = sizeof(struct sockaddr_in6); - } -} - -/* Close the connection and tidy up */ -static void close_connection(void) -{ - if (sctp_con.sock) { - sock_release(sctp_con.sock); - sctp_con.sock = NULL; - } - - if (sctp_con.rx_page) { - __free_page(sctp_con.rx_page); - sctp_con.rx_page = NULL; - } -} - -/* We only send shutdown messages to nodes that are not part of the cluster */ -static void send_shutdown(sctp_assoc_t associd) -{ - static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - struct msghdr outmessage; - struct cmsghdr *cmsg; - struct sctp_sndrcvinfo *sinfo; - int ret; - - outmessage.msg_name = NULL; - outmessage.msg_namelen = 0; - outmessage.msg_control = outcmsg; - outmessage.msg_controllen = sizeof(outcmsg); - outmessage.msg_flags = MSG_EOR; - - cmsg = CMSG_FIRSTHDR(&outmessage); - cmsg->cmsg_level = IPPROTO_SCTP; - cmsg->cmsg_type = SCTP_SNDRCV; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); - outmessage.msg_controllen = cmsg->cmsg_len; - sinfo = CMSG_DATA(cmsg); - memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - - sinfo->sinfo_flags |= MSG_EOF; - sinfo->sinfo_assoc_id = associd; - - ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0); - - if (ret != 0) - log_print("send EOF to node failed: %d", ret); -} - - -/* INIT failed but we don't know which node... - restart INIT on all pending nodes */ -static void init_failed(void) -{ - int i; - struct nodeinfo *ni; - - for (i=1; i<=max_nodeid; i++) { - ni = nodeid2nodeinfo(i, 0); - if (!ni) - continue; - - if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) { - ni->assoc_id = 0; - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - queue_work(send_workqueue, &ni->swork); - } - } - } -} - -/* Something happened to an association */ -static void process_sctp_notification(struct msghdr *msg, char *buf) -{ - union sctp_notification *sn = (union sctp_notification *)buf; - - if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { - switch (sn->sn_assoc_change.sac_state) { - - case SCTP_COMM_UP: - case SCTP_RESTART: - { - /* Check that the new node is in the lockspace */ - struct sctp_prim prim; - mm_segment_t fs; - int nodeid; - int prim_len, ret; - int addr_len; - struct nodeinfo *ni; - - /* This seems to happen when we received a connection - * too early... or something... anyway, it happens but - * we always seem to get a real message too, see - * receive_from_sock */ - - if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { - log_print("COMM_UP for invalid assoc ID %d", - (int)sn->sn_assoc_change.sac_assoc_id); - init_failed(); - return; - } - memset(&prim, 0, sizeof(struct sctp_prim)); - prim_len = sizeof(struct sctp_prim); - prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; - - fs = get_fs(); - set_fs(get_ds()); - ret = sctp_con.sock->ops->getsockopt(sctp_con.sock, - IPPROTO_SCTP, - SCTP_PRIMARY_ADDR, - (char*)&prim, - &prim_len); - set_fs(fs); - if (ret < 0) { - struct nodeinfo *ni; - - log_print("getsockopt/sctp_primary_addr on " - "new assoc %d failed : %d", - (int)sn->sn_assoc_change.sac_assoc_id, - ret); - - /* Retry INIT later */ - ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id); - if (ni) - clear_bit(NI_INIT_PENDING, &ni->flags); - return; - } - make_sockaddr(&prim.ssp_addr, 0, &addr_len); - if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { - log_print("reject connect from unknown addr"); - send_shutdown(prim.ssp_assoc_id); - return; - } - - ni = nodeid2nodeinfo(nodeid, GFP_KERNEL); - if (!ni) - return; - - /* Save the assoc ID */ - ni->assoc_id = sn->sn_assoc_change.sac_assoc_id; - - log_print("got new/restarted association %d nodeid %d", - (int)sn->sn_assoc_change.sac_assoc_id, nodeid); - - /* Send any pending writes */ - clear_bit(NI_INIT_PENDING, &ni->flags); - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - queue_work(send_workqueue, &ni->swork); - } - } - break; - - case SCTP_COMM_LOST: - case SCTP_SHUTDOWN_COMP: - { - struct nodeinfo *ni; - - ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id); - if (ni) { - spin_lock(&ni->lock); - ni->assoc_id = 0; - spin_unlock(&ni->lock); - } - } - break; - - /* We don't know which INIT failed, so clear the PENDING flags - * on them all. if assoc_id is zero then it will then try - * again */ - - case SCTP_CANT_STR_ASSOC: - { - log_print("Can't start SCTP association - retrying"); - init_failed(); - } - break; - - default: - log_print("unexpected SCTP assoc change id=%d state=%d", - (int)sn->sn_assoc_change.sac_assoc_id, - sn->sn_assoc_change.sac_state); - } - } -} - -/* Data received from remote end */ -static int receive_from_sock(void) -{ - int ret = 0; - struct msghdr msg; - struct kvec iov[2]; - unsigned len; - int r; - struct sctp_sndrcvinfo *sinfo; - struct cmsghdr *cmsg; - struct nodeinfo *ni; - - /* These two are marginally too big for stack allocation, but this - * function is (currently) only called by dlm_recvd so static should be - * OK. - */ - static struct sockaddr_storage msgname; - static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - - if (sctp_con.sock == NULL) - goto out; - - if (sctp_con.rx_page == NULL) { - /* - * This doesn't need to be atomic, but I think it should - * improve performance if it is. - */ - sctp_con.rx_page = alloc_page(GFP_ATOMIC); - if (sctp_con.rx_page == NULL) - goto out_resched; - cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE); - } - - memset(&incmsg, 0, sizeof(incmsg)); - memset(&msgname, 0, sizeof(msgname)); - - msg.msg_name = &msgname; - msg.msg_namelen = sizeof(msgname); - msg.msg_flags = 0; - msg.msg_control = incmsg; - msg.msg_controllen = sizeof(incmsg); - msg.msg_iovlen = 1; - - /* I don't see why this circular buffer stuff is necessary for SCTP - * which is a packet-based protocol, but the whole thing breaks under - * load without it! The overhead is minimal (and is in the TCP lowcomms - * anyway, of course) so I'll leave it in until I can figure out what's - * really happening. - */ - - /* - * iov[0] is the bit of the circular buffer between the current end - * point (cb.base + cb.len) and the end of the buffer. - */ - iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb); - iov[0].iov_base = page_address(sctp_con.rx_page) + - cbuf_data(&sctp_con.cb); - iov[1].iov_len = 0; - - /* - * iov[1] is the bit of the circular buffer between the start of the - * buffer and the start of the currently used section (cb.base) - */ - if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) { - iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb); - iov[1].iov_len = sctp_con.cb.base; - iov[1].iov_base = page_address(sctp_con.rx_page); - msg.msg_iovlen = 2; - } - len = iov[0].iov_len + iov[1].iov_len; - - r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len, - MSG_NOSIGNAL | MSG_DONTWAIT); - if (ret <= 0) - goto out_close; - - msg.msg_control = incmsg; - msg.msg_controllen = sizeof(incmsg); - cmsg = CMSG_FIRSTHDR(&msg); - sinfo = CMSG_DATA(cmsg); - - if (msg.msg_flags & MSG_NOTIFICATION) { - process_sctp_notification(&msg, page_address(sctp_con.rx_page)); - return 0; - } - - /* Is this a new association ? */ - ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL); - if (ni) { - ni->assoc_id = sinfo->sinfo_assoc_id; - if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) { - - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - queue_work(send_workqueue, &ni->swork); - } - } - } - - /* INIT sends a message with length of 1 - ignore it */ - if (r == 1) - return 0; - - cbuf_add(&sctp_con.cb, ret); - // PJC: TODO: Add to node's workqueue....can we ?? - ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), - page_address(sctp_con.rx_page), - sctp_con.cb.base, sctp_con.cb.len, - PAGE_CACHE_SIZE); - if (ret < 0) - goto out_close; - cbuf_eat(&sctp_con.cb, ret); - -out: - ret = 0; - goto out_ret; - -out_resched: - lowcomms_data_ready(sctp_con.sock->sk, 0); - ret = 0; - cond_resched(); - goto out_ret; - -out_close: - if (ret != -EAGAIN) - log_print("error reading from sctp socket: %d", ret); -out_ret: - return ret; -} - -/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */ -static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num) -{ - mm_segment_t fs; - int result = 0; - - fs = get_fs(); - set_fs(get_ds()); - if (num == 1) - result = sctp_con.sock->ops->bind(sctp_con.sock, - (struct sockaddr *) addr, - addr_len); - else - result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP, - SCTP_SOCKOPT_BINDX_ADD, - (char *)addr, addr_len); - set_fs(fs); - - if (result < 0) - log_print("Can't bind to port %d addr number %d", - dlm_config.ci_tcp_port, num); - - return result; -} - -static void init_local(void) -{ - struct sockaddr_storage sas, *addr; - int i; - - dlm_local_nodeid = dlm_our_nodeid(); - - for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { - if (dlm_our_addr(&sas, i)) - break; - - addr = kmalloc(sizeof(*addr), GFP_KERNEL); - if (!addr) - break; - memcpy(addr, &sas, sizeof(*addr)); - dlm_local_addr[dlm_local_count++] = addr; - } -} - -/* Initialise SCTP socket and bind to all interfaces */ -static int init_sock(void) -{ - mm_segment_t fs; - struct socket *sock = NULL; - struct sockaddr_storage localaddr; - struct sctp_event_subscribe subscribe; - int result = -EINVAL, num = 1, i, addr_len; - - if (!dlm_local_count) { - init_local(); - if (!dlm_local_count) { - log_print("no local IP address has been set"); - goto out; - } - } - - result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, - IPPROTO_SCTP, &sock); - if (result < 0) { - log_print("Can't create comms socket, check SCTP is loaded"); - goto out; - } - - /* Listen for events */ - memset(&subscribe, 0, sizeof(subscribe)); - subscribe.sctp_data_io_event = 1; - subscribe.sctp_association_event = 1; - subscribe.sctp_send_failure_event = 1; - subscribe.sctp_shutdown_event = 1; - subscribe.sctp_partial_delivery_event = 1; - - fs = get_fs(); - set_fs(get_ds()); - result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS, - (char *)&subscribe, sizeof(subscribe)); - set_fs(fs); - - if (result < 0) { - log_print("Failed to set SCTP_EVENTS on socket: result=%d", - result); - goto create_delsock; - } - - /* Init con struct */ - sock->sk->sk_user_data = &sctp_con; - sctp_con.sock = sock; - sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready; - - /* Bind to all interfaces. */ - for (i = 0; i < dlm_local_count; i++) { - memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); - make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); - - result = add_bind_addr(&localaddr, addr_len, num); - if (result) - goto create_delsock; - ++num; - } - - result = sock->ops->listen(sock, 5); - if (result < 0) { - log_print("Can't set socket listening"); - goto create_delsock; - } - - return 0; - -create_delsock: - sock_release(sock); - sctp_con.sock = NULL; -out: - return result; -} - - -static struct writequeue_entry *new_writequeue_entry(gfp_t allocation) -{ - struct writequeue_entry *entry; - - entry = kmalloc(sizeof(struct writequeue_entry), allocation); - if (!entry) - return NULL; - - entry->page = alloc_page(allocation); - if (!entry->page) { - kfree(entry); - return NULL; - } - - entry->offset = 0; - entry->len = 0; - entry->end = 0; - entry->users = 0; - - return entry; -} - -void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) -{ - struct writequeue_entry *e; - int offset = 0; - int users = 0; - struct nodeinfo *ni; - - ni = nodeid2nodeinfo(nodeid, allocation); - if (!ni) - return NULL; - - spin_lock(&ni->writequeue_lock); - e = list_entry(ni->writequeue.prev, struct writequeue_entry, list); - if ((&e->list == &ni->writequeue) || - (PAGE_CACHE_SIZE - e->end < len)) { - e = NULL; - } else { - offset = e->end; - e->end += len; - users = e->users++; - } - spin_unlock(&ni->writequeue_lock); - - if (e) { - got_one: - if (users == 0) - kmap(e->page); - *ppc = page_address(e->page) + offset; - return e; - } - - e = new_writequeue_entry(allocation); - if (e) { - spin_lock(&ni->writequeue_lock); - offset = e->end; - e->end += len; - e->ni = ni; - users = e->users++; - list_add_tail(&e->list, &ni->writequeue); - spin_unlock(&ni->writequeue_lock); - goto got_one; - } - return NULL; -} - -void dlm_lowcomms_commit_buffer(void *arg) -{ - struct writequeue_entry *e = (struct writequeue_entry *) arg; - int users; - struct nodeinfo *ni = e->ni; - - spin_lock(&ni->writequeue_lock); - users = --e->users; - if (users) - goto out; - e->len = e->end - e->offset; - kunmap(e->page); - spin_unlock(&ni->writequeue_lock); - - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - - queue_work(send_workqueue, &ni->swork); - } - return; - -out: - spin_unlock(&ni->writequeue_lock); - return; -} - -static void free_entry(struct writequeue_entry *e) -{ - __free_page(e->page); - kfree(e); -} - -/* Initiate an SCTP association. In theory we could just use sendmsg() on - the first IP address and it should work, but this allows us to set up the - association before sending any valuable data that we can't afford to lose. - It also keeps the send path clean as it can now always use the association ID */ -static void initiate_association(int nodeid) -{ - struct sockaddr_storage rem_addr; - static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - struct msghdr outmessage; - struct cmsghdr *cmsg; - struct sctp_sndrcvinfo *sinfo; - int ret; - int addrlen; - char buf[1]; - struct kvec iov[1]; - struct nodeinfo *ni; - - log_print("Initiating association with node %d", nodeid); - - ni = nodeid2nodeinfo(nodeid, GFP_KERNEL); - if (!ni) - return; - - if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) { - log_print("no address for nodeid %d", nodeid); - return; - } - - make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); - - outmessage.msg_name = &rem_addr; - outmessage.msg_namelen = addrlen; - outmessage.msg_control = outcmsg; - outmessage.msg_controllen = sizeof(outcmsg); - outmessage.msg_flags = MSG_EOR; - - iov[0].iov_base = buf; - iov[0].iov_len = 1; - - /* Real INIT messages seem to cause trouble. Just send a 1 byte message - we can afford to lose */ - cmsg = CMSG_FIRSTHDR(&outmessage); - cmsg->cmsg_level = IPPROTO_SCTP; - cmsg->cmsg_type = SCTP_SNDRCV; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); - sinfo = CMSG_DATA(cmsg); - memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); - - outmessage.msg_controllen = cmsg->cmsg_len; - ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1); - if (ret < 0) { - log_print("send INIT to node failed: %d", ret); - /* Try again later */ - clear_bit(NI_INIT_PENDING, &ni->flags); - } -} - -/* Send a message */ -static void send_to_sock(struct nodeinfo *ni) -{ - int ret = 0; - struct writequeue_entry *e; - int len, offset; - struct msghdr outmsg; - static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - struct cmsghdr *cmsg; - struct sctp_sndrcvinfo *sinfo; - struct kvec iov; - - /* See if we need to init an association before we start - sending precious messages */ - spin_lock(&ni->lock); - if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { - spin_unlock(&ni->lock); - initiate_association(ni->nodeid); - return; - } - spin_unlock(&ni->lock); - - outmsg.msg_name = NULL; /* We use assoc_id */ - outmsg.msg_namelen = 0; - outmsg.msg_control = outcmsg; - outmsg.msg_controllen = sizeof(outcmsg); - outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR; - - cmsg = CMSG_FIRSTHDR(&outmsg); - cmsg->cmsg_level = IPPROTO_SCTP; - cmsg->cmsg_type = SCTP_SNDRCV; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); - sinfo = CMSG_DATA(cmsg); - memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); - sinfo->sinfo_assoc_id = ni->assoc_id; - outmsg.msg_controllen = cmsg->cmsg_len; - - spin_lock(&ni->writequeue_lock); - for (;;) { - if (list_empty(&ni->writequeue)) - break; - e = list_entry(ni->writequeue.next, struct writequeue_entry, - list); - len = e->len; - offset = e->offset; - BUG_ON(len == 0 && e->users == 0); - spin_unlock(&ni->writequeue_lock); - kmap(e->page); - - ret = 0; - |