diff options
Diffstat (limited to 'net/tipc')
-rw-r--r-- | net/tipc/addr.h | 2 | ||||
-rw-r--r-- | net/tipc/bcast.c | 49 | ||||
-rw-r--r-- | net/tipc/bcast.h | 4 | ||||
-rw-r--r-- | net/tipc/bearer.c | 437 | ||||
-rw-r--r-- | net/tipc/bearer.h | 67 | ||||
-rw-r--r-- | net/tipc/config.c | 114 | ||||
-rw-r--r-- | net/tipc/config.h | 5 | ||||
-rw-r--r-- | net/tipc/core.c | 112 | ||||
-rw-r--r-- | net/tipc/core.h | 5 | ||||
-rw-r--r-- | net/tipc/discover.c | 51 | ||||
-rw-r--r-- | net/tipc/discover.h | 5 | ||||
-rw-r--r-- | net/tipc/eth_media.c | 326 | ||||
-rw-r--r-- | net/tipc/handler.c | 1 | ||||
-rw-r--r-- | net/tipc/ib_media.c | 319 | ||||
-rw-r--r-- | net/tipc/link.c | 827 | ||||
-rw-r--r-- | net/tipc/link.h | 78 | ||||
-rw-r--r-- | net/tipc/name_distr.c | 22 | ||||
-rw-r--r-- | net/tipc/name_distr.h | 2 | ||||
-rw-r--r-- | net/tipc/name_table.c | 43 | ||||
-rw-r--r-- | net/tipc/net.c | 22 | ||||
-rw-r--r-- | net/tipc/netlink.c | 8 | ||||
-rw-r--r-- | net/tipc/node.c | 132 | ||||
-rw-r--r-- | net/tipc/node.h | 9 | ||||
-rw-r--r-- | net/tipc/port.c | 310 | ||||
-rw-r--r-- | net/tipc/port.h | 130 | ||||
-rw-r--r-- | net/tipc/ref.c | 30 | ||||
-rw-r--r-- | net/tipc/ref.h | 1 | ||||
-rw-r--r-- | net/tipc/server.c | 21 | ||||
-rw-r--r-- | net/tipc/server.h | 2 | ||||
-rw-r--r-- | net/tipc/socket.c | 739 | ||||
-rw-r--r-- | net/tipc/socket.h | 72 | ||||
-rw-r--r-- | net/tipc/subscr.c | 50 |
32 files changed, 1715 insertions, 2280 deletions
diff --git a/net/tipc/addr.h b/net/tipc/addr.h index 60b00ab93d7..a74acf9ee80 100644 --- a/net/tipc/addr.h +++ b/net/tipc/addr.h @@ -37,6 +37,8 @@ #ifndef _TIPC_ADDR_H #define _TIPC_ADDR_H +#include "core.h" + #define TIPC_ZONE_MASK 0xff000000u #define TIPC_CLUSTER_MASK 0xfffff000u diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 0d4402587fd..95ab5ef9292 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -41,9 +41,9 @@ #include "bcast.h" #include "name_distr.h" -#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ - -#define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */ +#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ +#define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */ +#define BCBEARER MAX_BEARERS /** * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link @@ -356,9 +356,9 @@ static void bclink_peek_nack(struct tipc_msg *msg) } /* - * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster + * tipc_bclink_xmit - broadcast a packet to all nodes in cluster */ -int tipc_bclink_send_msg(struct sk_buff *buf) +int tipc_bclink_xmit(struct sk_buff *buf) { int res; @@ -370,7 +370,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf) goto exit; } - res = tipc_link_send_buf(bcl, buf); + res = __tipc_link_xmit(bcl, buf); if (likely(res >= 0)) { bclink_set_last_sent(); bcl->stats.queue_sz_counts++; @@ -399,19 +399,18 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) */ if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { - tipc_link_send_proto_msg( - node->active_links[node->addr & 1], - STATE_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(node->active_links[node->addr & 1], + STATE_MSG, 0, 0, 0, 0, 0); bcl->stats.sent_acks++; } } /** - * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards + * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards * * tipc_net_lock is read_locked, no other locks set */ -void tipc_bclink_recv_pkt(struct sk_buff *buf) +void tipc_bclink_rcv(struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); struct tipc_node *node; @@ -468,7 +467,7 @@ receive: spin_unlock_bh(&bc_lock); tipc_node_unlock(node); if (likely(msg_mcast(msg))) - tipc_port_recv_mcast(buf, NULL); + tipc_port_mcast_rcv(buf, NULL); else kfree_skb(buf); } else if (msg_user(msg) == MSG_BUNDLER) { @@ -478,12 +477,12 @@ receive: bcl->stats.recv_bundled += msg_msgcnt(msg); spin_unlock_bh(&bc_lock); tipc_node_unlock(node); - tipc_link_recv_bundle(buf); + tipc_link_bundle_rcv(buf); } else if (msg_user(msg) == MSG_FRAGMENTER) { int ret; - ret = tipc_link_recv_fragment(&node->bclink.reasm_head, - &node->bclink.reasm_tail, - &buf); + ret = tipc_link_frag_rcv(&node->bclink.reasm_head, + &node->bclink.reasm_tail, + &buf); if (ret == LINK_REASM_ERROR) goto unlock; spin_lock_bh(&bc_lock); @@ -503,7 +502,7 @@ receive: bclink_accept_pkt(node, seqno); spin_unlock_bh(&bc_lock); tipc_node_unlock(node); - tipc_named_recv(buf); + tipc_named_rcv(buf); } else { spin_lock_bh(&bc_lock); bclink_accept_pkt(node, seqno); @@ -621,12 +620,6 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, if (!p) break; /* No more bearers to try */ - if (tipc_bearer_blocked(p)) { - if (!s || tipc_bearer_blocked(s)) - continue; /* Can't use either bearer */ - b = s; - } - tipc_nmap_diff(&bcbearer->remains, &b->nodes, &bcbearer->remains_new); if (bcbearer->remains_new.count == bcbearer->remains.count) @@ -675,9 +668,8 @@ void tipc_bcbearer_sort(void) memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); for (b_index = 0; b_index < MAX_BEARERS; b_index++) { - struct tipc_bearer *b = &tipc_bearers[b_index]; - - if (!b->active || !b->nodes.count) + struct tipc_bearer *b = bearer_list[b_index]; + if (!b || !b->nodes.count) continue; if (!bp_temp[b->priority].primary) @@ -791,8 +783,8 @@ void tipc_bclink_init(void) bcl->owner = &bclink->node; bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); - spin_lock_init(&bcbearer->bearer.lock); bcl->b_ptr = &bcbearer->bearer; + bearer_list[BCBEARER] = &bcbearer->bearer; bcl->state = WORKING_WORKING; strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); } @@ -800,9 +792,10 @@ void tipc_bclink_init(void) void tipc_bclink_stop(void) { spin_lock_bh(&bc_lock); - tipc_link_stop(bcl); + tipc_link_purge_queues(bcl); spin_unlock_bh(&bc_lock); + bearer_list[BCBEARER] = NULL; memset(bclink, 0, sizeof(*bclink)); memset(bcbearer, 0, sizeof(*bcbearer)); } diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 6ee587b469f..a80ef54b818 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -90,8 +90,8 @@ void tipc_bclink_add_node(u32 addr); void tipc_bclink_remove_node(u32 addr); struct tipc_node *tipc_bclink_retransmit_to(void); void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); -int tipc_bclink_send_msg(struct sk_buff *buf); -void tipc_bclink_recv_pkt(struct sk_buff *buf); +int tipc_bclink_xmit(struct sk_buff *buf); +void tipc_bclink_rcv(struct sk_buff *buf); u32 tipc_bclink_get_last_sent(void); u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr); void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 3f9707a16d0..3fef7eb776d 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -1,8 +1,8 @@ /* * net/tipc/bearer.c: TIPC bearer code * - * Copyright (c) 1996-2006, Ericsson AB - * Copyright (c) 2004-2006, 2010-2011, Wind River Systems + * Copyright (c) 1996-2006, 2013, Ericsson AB + * Copyright (c) 2004-2006, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,12 +41,17 @@ #define MAX_ADDR_STR 60 -static struct tipc_media *media_list[MAX_MEDIA]; -static u32 media_count; +static struct tipc_media * const media_info_array[] = { + ð_media_info, +#ifdef CONFIG_TIPC_MEDIA_IB + &ib_media_info, +#endif + NULL +}; -struct tipc_bearer tipc_bearers[MAX_BEARERS]; +struct tipc_bearer *bearer_list[MAX_BEARERS + 1]; -static void bearer_disable(struct tipc_bearer *b_ptr); +static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down); /** * tipc_media_find - locates specified media object by name @@ -55,11 +60,11 @@ struct tipc_media *tipc_media_find(const char *name) { u32 i; - for (i = 0; i < media_count; i++) { - if (!strcmp(media_list[i]->name, name)) - return media_list[i]; + for (i = 0; media_info_array[i] != NULL; i++) { + if (!strcmp(media_info_array[i]->name, name)) + break; } - return NULL; + return media_info_array[i]; } /** @@ -69,44 +74,11 @@ static struct tipc_media *media_find_id(u8 type) { u32 i; - for (i = 0; i < media_count; i++) { - if (media_list[i]->type_id == type) - return media_list[i]; + for (i = 0; media_info_array[i] != NULL; i++) { + if (media_info_array[i]->type_id == type) + break; } - return NULL; -} - -/** - * tipc_register_media - register a media type - * - * Bearers for this media type must be activated separately at a later stage. - */ -int tipc_register_media(struct tipc_media *m_ptr) -{ - int res = -EINVAL; - - write_lock_bh(&tipc_net_lock); - - if ((strlen(m_ptr->name) + 1) > TIPC_MAX_MEDIA_NAME) - goto exit; - if (m_ptr->priority > TIPC_MAX_LINK_PRI) - goto exit; - if ((m_ptr->tolerance < TIPC_MIN_LINK_TOL) || - (m_ptr->tolerance > TIPC_MAX_LINK_TOL)) - goto exit; - if (media_count >= MAX_MEDIA) - goto exit; - if (tipc_media_find(m_ptr->name) || media_find_id(m_ptr->type_id)) - goto exit; - - media_list[media_count] = m_ptr; - media_count++; - res = 0; -exit: - write_unlock_bh(&tipc_net_lock); - if (res) - pr_warn("Media <%s> registration error\n", m_ptr->name); - return res; + return media_info_array[i]; } /** @@ -144,13 +116,11 @@ struct sk_buff *tipc_media_get_names(void) if (!buf) return NULL; - read_lock_bh(&tipc_net_lock); - for (i = 0; i < media_count; i++) { + for (i = 0; media_info_array[i] != NULL; i++) { tipc_cfg_append_tlv(buf, TIPC_TLV_MEDIA_NAME, - media_list[i]->name, - strlen(media_list[i]->name) + 1); + media_info_array[i]->name, + strlen(media_info_array[i]->name) + 1); } - read_unlock_bh(&tipc_net_lock); return buf; } @@ -207,27 +177,9 @@ struct tipc_bearer *tipc_bearer_find(const char *name) struct tipc_bearer *b_ptr; u32 i; - for (i = 0, b_ptr = tipc_bearers; i < MAX_BEARERS; i++, b_ptr++) { - if (b_ptr->active && (!strcmp(b_ptr->name, name))) - return b_ptr; - } - return NULL; -} - -/** - * tipc_bearer_find_interface - locates bearer object with matching interface name - */ -struct tipc_bearer *tipc_bearer_find_interface(const char *if_name) -{ - struct tipc_bearer *b_ptr; - char *b_if_name; - u32 i; - - for (i = 0, b_ptr = tipc_bearers; i < MAX_BEARERS; i++, b_ptr++) { - if (!b_ptr->active) - continue; - b_if_name = strchr(b_ptr->name, ':') + 1; - if (!strcmp(b_if_name, if_name)) + for (i = 0; i < MAX_BEARERS; i++) { + b_ptr = bearer_list[i]; + if (b_ptr && (!strcmp(b_ptr->name, name))) return b_ptr; } return NULL; @@ -239,7 +191,7 @@ struct tipc_bearer *tipc_bearer_find_interface(const char *if_name) struct sk_buff *tipc_bearer_get_names(void) { struct sk_buff *buf; - struct tipc_bearer *b_ptr; + struct tipc_bearer *b; int i, j; buf = tipc_cfg_reply_alloc(MAX_BEARERS * TLV_SPACE(TIPC_MAX_BEARER_NAME)); @@ -247,13 +199,15 @@ struct sk_buff *tipc_bearer_get_names(void) return NULL; read_lock_bh(&tipc_net_lock); - for (i = 0; i < media_count; i++) { + for (i = 0; media_info_array[i] != NULL; i++) { for (j = 0; j < MAX_BEARERS; j++) { - b_ptr = &tipc_bearers[j]; - if (b_ptr->active && (b_ptr->media == media_list[i])) { + b = bearer_list[j]; + if (!b) + continue; + if (b->media == media_info_array[i]) { tipc_cfg_append_tlv(buf, TIPC_TLV_BEARER_NAME, - b_ptr->name, - strlen(b_ptr->name) + 1); + b->name, + strlen(b->name) + 1); } } } @@ -275,31 +229,6 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest) tipc_disc_remove_dest(b_ptr->link_req); } -/* - * Interrupt enabling new requests after bearer blocking: - * See bearer_send(). - */ -void tipc_continue(struct tipc_bearer *b) -{ - spin_lock_bh(&b->lock); - b->blocked = 0; - spin_unlock_bh(&b->lock); -} - -/* - * tipc_bearer_blocked - determines if bearer is currently blocked - */ -int tipc_bearer_blocked(struct tipc_bearer *b) -{ - int res; - - spin_lock_bh(&b->lock); - res = b->blocked; - spin_unlock_bh(&b->lock); - - return res; -} - /** * tipc_enable_bearer - enable bearer with the given name */ @@ -358,16 +287,17 @@ restart: bearer_id = MAX_BEARERS; with_this_prio = 1; for (i = MAX_BEARERS; i-- != 0; ) { - if (!tipc_bearers[i].active) { + b_ptr = bearer_list[i]; + if (!b_ptr) { bearer_id = i; continue; } - if (!strcmp(name, tipc_bearers[i].name)) { + if (!strcmp(name, b_ptr->name)) { pr_warn("Bearer <%s> rejected, already enabled\n", name); goto exit; } - if ((tipc_bearers[i].priority == priority) && + if ((b_ptr->priority == priority) && (++with_this_prio > 2)) { if (priority-- == 0) { pr_warn("Bearer <%s> rejected, duplicate priority\n", @@ -385,8 +315,13 @@ restart: goto exit; } - b_ptr = &tipc_bearers[bearer_id]; + b_ptr = kzalloc(sizeof(*b_ptr), GFP_ATOMIC); + if (!b_ptr) { + res = -ENOMEM; + goto exit; + } strcpy(b_ptr->name, name); + b_ptr->media = m_ptr; res = m_ptr->enable_media(b_ptr); if (res) { pr_warn("Bearer <%s> rejected, enable failure (%d)\n", @@ -395,22 +330,22 @@ restart: } b_ptr->identity = bearer_id; - b_ptr->media = m_ptr; b_ptr->tolerance = m_ptr->tolerance; b_ptr->window = m_ptr->window; + b_ptr->domain = disc_domain; b_ptr->net_plane = bearer_id + 'A'; - b_ptr->active = 1; b_ptr->priority = priority; - INIT_LIST_HEAD(&b_ptr->links); - spin_lock_init(&b_ptr->lock); - res = tipc_disc_create(b_ptr, &b_ptr->bcast_addr, disc_domain); + res = tipc_disc_create(b_ptr, &b_ptr->bcast_addr); if (res) { - bearer_disable(b_ptr); + bearer_disable(b_ptr, false); pr_warn("Bearer <%s> rejected, discovery object creation failed\n", name); goto exit; } + + bearer_list[bearer_id] = b_ptr; + pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n", name, tipc_addr_string_fill(addr_string, disc_domain), priority); @@ -420,25 +355,15 @@ exit: } /** - * tipc_block_bearer - Block the bearer, and reset all its links + * tipc_reset_bearer - Reset all links established over this bearer */ -int tipc_block_bearer(struct tipc_bearer *b_ptr) +static int tipc_reset_bearer(struct tipc_bearer *b_ptr) { - struct tipc_link *l_ptr; - struct tipc_link *temp_l_ptr; - read_lock_bh(&tipc_net_lock); - pr_info("Blocking bearer <%s>\n", b_ptr->name); - spin_lock_bh(&b_ptr->lock); - b_ptr->blocked = 1; - list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { - struct tipc_node *n_ptr = l_ptr->owner; - - spin_lock_bh(&n_ptr->lock); - tipc_link_reset(l_ptr); - spin_unlock_bh(&n_ptr->lock); - } - spin_unlock_bh(&b_ptr->lock); + pr_info("Resetting bearer <%s>\n", b_ptr->name); + tipc_disc_delete(b_ptr->link_req); + tipc_link_reset_list(b_ptr->identity); + tipc_disc_create(b_ptr, &b_ptr->bcast_addr); read_unlock_bh(&tipc_net_lock); return 0; } @@ -448,27 +373,24 @@ int tipc_block_bearer(struct tipc_bearer *b_ptr) * * Note: This routine assumes caller holds tipc_net_lock. */ -static void bearer_disable(struct tipc_bearer *b_ptr) +static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down) { - struct tipc_link *l_ptr; - struct tipc_link *temp_l_ptr; - struct tipc_link_req *temp_req; + u32 i; pr_info("Disabling bearer <%s>\n", b_ptr->name); - spin_lock_bh(&b_ptr->lock); - b_ptr->blocked = 1; b_ptr->media->disable_media(b_ptr); - list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { - tipc_link_delete(l_ptr); - } - temp_req = b_ptr->link_req; - b_ptr->link_req = NULL; - spin_unlock_bh(&b_ptr->lock); - if (temp_req) - tipc_disc_delete(temp_req); + tipc_link_delete_list(b_ptr->identity, shutting_down); + if (b_ptr->link_req) + tipc_disc_delete(b_ptr->link_req); - memset(b_ptr, 0, sizeof(struct tipc_bearer)); + for (i = 0; i < MAX_BEARERS; i++) { + if (b_ptr == bearer_list[i]) { + bearer_list[i] = NULL; + break; + } + } + kfree(b_ptr); } int tipc_disable_bearer(const char *name) @@ -482,7 +404,7 @@ int tipc_disable_bearer(const char *name) pr_warn("Attempt to disable unknown bearer <%s>\n", name); res = -EINVAL; } else { - bearer_disable(b_ptr); + bearer_disable(b_ptr, false); res = 0; } write_unlock_bh(&tipc_net_lock); @@ -490,14 +412,231 @@ int tipc_disable_bearer(const char *name) } +/* tipc_l2_media_addr_set - initialize Ethernet media address structure + * + * Media-dependent "value" field stores MAC address in first 6 bytes + * and zeroes out the remaining bytes. + */ +void tipc_l2_media_addr_set(const struct tipc_bearer *b, + struct tipc_media_addr *a, char *mac) +{ + int len = b->media->hwaddr_len; + + if (unlikely(sizeof(a->value) < len)) { + WARN_ONCE(1, "Media length invalid\n"); + return; + } + + memcpy(a->value, mac, len); + memset(a->value + len, 0, sizeof(a->value) - len); + a->media_id = b->media->type_id; + a->broadcast = !memcmp(mac, b->bcast_addr.value, len); +} + +int tipc_enable_l2_media(struct tipc_bearer *b) +{ + struct net_device *dev; + char *driver_name = strchr((const char *)b->name, ':') + 1; + + /* Find device with specified name */ + dev = dev_get_by_name(&init_net, driver_name); + if (!dev) + return -ENODEV; + + /* Associate TIPC bearer with Ethernet bearer */ + b->media_ptr = dev; + memset(b->bcast_addr.value, 0, sizeof(b->bcast_addr.value)); + memcpy(b->bcast_addr.value, dev->broadcast, b->media->hwaddr_len); + b->bcast_addr.media_id = b->media->type_id; + b->bcast_addr.broadcast = 1; + b->mtu = dev->mtu; + tipc_l2_media_addr_set(b, &b->addr, (char *)dev->dev_addr); + rcu_assign_pointer(dev->tipc_ptr, b); + return 0; +} + +/* tipc_disable_l2_media - detach TIPC bearer from an Ethernet interface + * + * Mark Ethernet bearer as inactive so that incoming buffers are thrown away, + * then get worker thread to complete bearer cleanup. (Can't do cleanup + * here because cleanup code needs to sleep and caller holds spinlocks.) + */ +void tipc_disable_l2_media(struct tipc_bearer *b) +{ + struct net_device *dev = (struct net_device *)b->media_ptr; + RCU_INIT_POINTER(dev->tipc_ptr, NULL); + dev_put(dev); +} + +/** + * tipc_l2_send_msg - send a TIPC packet out over an Ethernet interface + * @buf: the packet to be sent + * @b_ptr: the bearer through which the packet is to be sent + * @dest: peer destination address + */ +int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b, + struct tipc_media_addr *dest) +{ + struct sk_buff *clone; + int delta; + struct net_device *dev = (struct net_device *)b->media_ptr; + + clone = skb_clone(buf, GFP_ATOMIC); + if (!clone) + return 0; + + delta = dev->hard_header_len - skb_headroom(buf); + if ((delta > 0) && + pskb_expand_head(clone, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) { + kfree_skb(clone); + return 0; + } + + skb_reset_network_header(clone); + clone->dev = dev; + clone->protocol = htons(ETH_P_TIPC); + dev_hard_header(clone, dev, ETH_P_TIPC, dest->value, + dev->dev_addr, clone->len); + dev_queue_xmit(clone); + return 0; +} + +/* tipc_bearer_send- sends buffer to destination over bearer + * + * IMPORTANT: + * The media send routine must not alter the buffer being passed in + * as it may be needed for later retransmission! + */ +void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf, + struct tipc_media_addr *dest) +{ + b->media->send_msg(buf, b, dest); +} + +/** + * tipc_l2_rcv_msg - handle incoming TIPC message from an interface + * @buf: the received packet + * @dev: the net device that the packet was received on + * @pt: the packet_type structure which was used to register this handler + * @orig_dev: the original receive net device in case the device is a bond + * + * Accept only packets explicitly sent to this node, or broadcast packets; + * ignores packets sent using interface multicast, and traffic sent to other + * nodes (which can happen if interface is running in promiscuous mode). + */ +static int tipc_l2_rcv_msg(struct sk_buff *buf, struct net_device *dev, + struct packet_type *pt, struct net_device *orig_dev) +{ + struct tipc_bearer *b_ptr; + + if (!net_eq(dev_net(dev), &init_net)) { + kfree_skb(buf); + return NET_RX_DROP; + } + + rcu_read_lock(); + b_ptr = rcu_dereference(dev->tipc_ptr); + if (likely(b_ptr)) { + if (likely(buf->pkt_type <= PACKET_BROADCAST)) { + buf->next = NULL; + tipc_rcv(buf, b_ptr); + rcu_read_unlock(); + return NET_RX_SUCCESS; + } + } + rcu_read_unlock(); + + kfree_skb(buf); + return NET_RX_DROP; +} + +/** + * tipc_l2_device_event - handle device events from network device + * @nb: the context of the notification + * @evt: the type of event + * @ptr: the net device that the event was on + * + * This function is called by the Ethernet driver in case of link + * change event. + */ +static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt, + void *ptr) +{ + struct tipc_bearer *b_ptr; + struct net_device *dev = netdev_notifier_info_to_dev(ptr); + + if (!net_eq(dev_net(dev), &init_net)) + return NOTIFY_DONE; + + rcu_read_lock(); + b_ptr = rcu_dereference(dev->tipc_ptr); + if (!b_ptr) { + rcu_read_unlock(); + return NOTIFY_DONE; + } + + b_ptr->mtu = dev->mtu; + + switch (evt) { + case NETDEV_CHANGE: + if (netif_carrier_ok(dev)) + break; + case NETDEV_DOWN: + case NETDEV_CHANGEMTU: + tipc_reset_bearer(b_ptr); + break; + case NETDEV_CHANGEADDR: + tipc_l2_media_addr_set(b_ptr, &b_ptr->addr, + (char *)dev->dev_addr); + tipc_reset_bearer(b_ptr); + break; + case NETDEV_UNREGISTER: + case NETDEV_CHANGENAME: + tipc_disable_bearer(b_ptr->name); + break; + } + rcu_read_unlock(); + + return NOTIFY_OK; +} + +static struct packet_type tipc_packet_type __read_mostly = { + .type = htons(ETH_P_TIPC), + .func = tipc_l2_rcv_msg, +}; + +static struct notifier_block notifier = { + .notifier_call = tipc_l2_device_event, + .priority = 0, +}; + +int tipc_bearer_setup(void) +{ + int err; + + err = register_netdevice_notifier(¬ifier); + if (err) + return err; + dev_add_pack(&tipc_packet_type); + return 0; +} + +void tipc_bearer_cleanup(void) +{ + unregister_netdevice_notifier(¬ifier); + dev_remove_pack(&tipc_packet_type); +} void tipc_bearer_stop(void) { + struct tipc_bearer *b_ptr; u32 i; for (i = 0; i < MAX_BEARERS; i++) { - if (tipc_bearers[i].active) - bearer_disable(&tipc_bearers[i]); + b_ptr = bearer_list[i]; + if (b_ptr) { + bearer_disable(b_ptr, true); + bearer_list[i] = NULL; + } } - media_count = 0; } diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index e5e04be6fff..ba48145e871 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -1,7 +1,7 @@ /* * net/tipc/bearer.h: Include file for TIPC bearer code * - * Copyright (c) 1996-2006, Ericsson AB + * Copyright (c) 1996-2006, 2013, Ericsson AB * Copyright (c) 2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -73,18 +73,18 @@ struct tipc_media_addr { struct tipc_bearer; /** - * struct tipc_media - TIPC media information available to internal users + * struct tipc_media - Media specific info exposed to generic bearer layer * @send_msg: routine which handles buffer transmission * @enable_media: routine which enables a media * @disable_media: routine which disables a media * @addr2str: routine which converts media address to string * @addr2msg: routine which converts media address to protocol message area * @msg2addr: routine which converts media address from protocol message area - * @bcast_addr: media address used in broadcasting * @priority: default link (and bearer) priority * @tolerance: default time (in ms) before declaring link failure * @window: default window (in packets) before declaring link congestion * @type_id: TIPC media identifier + * @hwaddr_len: TIPC media address len * @name: media name */ struct tipc_media { @@ -101,25 +101,24 @@ struct tipc_media { u32 tolerance; u32 window; u32 type_id; + u32 hwaddr_len; char name[TIPC_MAX_MEDIA_NAME]; }; /** - * struct tipc_bearer - TIPC bearer structure - * @usr_handle: pointer to additional media-specific information about bearer + * struct tipc_bearer - Generic TIPC bearer structure + * @media_ptr: pointer to additional media-specific information about bearer * @mtu: max packet size bearer can support - * @blocked: non-zero if bearer is blocked - * @lock: spinlock for controlling access to bearer * @addr: media-specific address associated with bearer * @name: bearer name (format = media:interface) * @media: ptr to media structure associated with bearer + * @bcast_addr: media address used in broadcasting * @priority: default link priority for bearer * @window: default window size for bearer * @tolerance: default link tolerance for bearer + * @domain: network domain to which links can be established * @identity: array index of this bearer within TIPC bearer array * @link_req: ptr to (optional) structure making periodic link setup requests - * @links: list of non-congested links associated with bearer - * @active: non-zero if bearer structure is represents a bearer * @net_plane: network plane ('A' through 'H') currently associated with bearer * @nodes: indicates which nodes in cluster can be reached through bearer * @@ -128,21 +127,18 @@ struct tipc_media { * care of initializing all other fields. */ struct tipc_bearer { - void *usr_handle; /* initalized by media */ + void *media_ptr; /* initalized by media */ u32 mtu; /* initalized by media */ - int blocked; /* initalized by media */ struct tipc_media_addr addr; /* initalized by media */ char name[TIPC_MAX_BEARER_NAME]; - spinlock_t lock; struct tipc_media *media; struct tipc_media_addr bcast_addr; u32 priority; u32 window; u32 tolerance; + u32 domain; u32 identity; struct tipc_link_req *link_req; - struct list_head links; - int active; char net_plane; struct tipc_node_map nodes; }; @@ -154,60 +150,45 @@ struct tipc_bearer_names { struct tipc_link; -extern struct tipc_bearer tipc_bearers[]; +extern struct tipc_bearer *bearer_list[]; /* * TIPC routines available to supported media types */ -int tipc_register_media(struct tipc_media *m_ptr); - -void tipc_recv_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr); - -int tipc_block_bearer(struct tipc_bearer *b_ptr); -void tipc_continue(struct tipc_bearer *tb_ptr); +void tipc_rcv(struct sk_buff *buf, struct tipc_bearer *tb_ptr); int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority); int tipc_disable_bearer(const char *name); /* * Routines made available to TIPC by supported media types */ -int tipc_eth_media_start(void); -void tipc_eth_media_stop(void); +extern struct tipc_media eth_media_info; #ifdef CONFIG_TIPC_MEDIA_IB -int tipc_ib_media_start(void); -void tipc_ib_media_stop(void); -#else -static inline int tipc_ib_media_start(void) { return 0; } -static inline void tipc_ib_media_stop(void) { return; } +extern struct tipc_media ib_media_info; #endif int tipc_media_set_priority(const char *name, u32 new_value); int tipc_media_set_window(const char *name, u32 new_value); void tipc_media_addr_printf(char *buf, int len, struct tipc_media_addr *a); struct sk_buff *tipc_media_get_names(void); +void tipc_l2_media_addr_set(const struct tipc_bearer *b, + struct tipc_media_addr *a, char *mac); +int tipc_enable_l2_media(struct tipc_bearer *b); +void tipc_disable_l2_media(struct tipc_bearer *b); +int tipc_l2_send_msg(struct sk_buff *buf, struct tipc_bearer *b, + struct tipc_media_addr *dest); struct sk_buff *tipc_bearer_get_names(void); void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest); void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest); struct tipc_bearer *tipc_bearer_find(const char *name); -struct tipc_bearer *tipc_bearer_find_interface(const char *if_name); struct tipc_media *tipc_media_find(const char *name); -int tipc_bearer_blocked(struct tipc_bearer *b_ptr); +int tipc_bearer_setup(void); +void tipc_bearer_cleanup(void); void tipc_bearer_stop(void); - -/** - * tipc_bearer_send- sends buffer to destination over bearer - * - * IMPORTANT: - * The media send routine must not alter the buffer being passed in - * as it may be needed for later retransmission! - */ -static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf, - struct tipc_media_addr *dest) -{ - b->media->send_msg(buf, b, dest); -} +void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf, + struct tipc_media_addr *dest); #endif /* _TIPC_BEARER_H */ diff --git a/net/tipc/config.c b/net/tipc/config.c index c301a9a592d..4b981c05382 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -43,13 +43,11 @@ #define REPLY_TRUNCATED "<truncated>\n" static DEFINE_MUTEX(config_mutex); -static struct tipc_server cfgsrv; static const void *req_tlv_area; /* request message TLV area */ static int req_tlv_space; /* request message TLV area size */ static int rep_headroom; /* reply message headroom to use */ - struct sk_buff *tipc_cfg_reply_alloc(int payload_size) { struct sk_buff *buf; @@ -181,19 +179,7 @@ static struct sk_buff *cfg_set_own_addr(void) if (tipc_own_addr) return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED " (cannot change node address once assigned)"); - tipc_core_start_net(addr); - return tipc_cfg_reply_none(); -} - -static struct sk_buff *cfg_set_remote_mng(void) -{ - u32 value; - - if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_UNSIGNED)) - return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); - - value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area)); - tipc_remote_management = (value != 0); + tipc_net_start(addr); return tipc_cfg_reply_none(); } @@ -247,21 +233,10 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area /* Check command authorization */ if (likely(in_own_node(orig_node))) { /* command is permitted */ - } else if (cmd >= 0x8000) { + } else { rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED " (cannot be done remotely)"); goto exit; - } else if (!tipc_remote_management) { - rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NO_REMOTE); - goto exit; - } else if (cmd >= 0x4000) { - u32 domain = 0; - - if ((tipc_nametbl_translate(TIPC_ZM_SRV, 0, &domain) == 0) || - (domain != orig_node)) { - rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_ZONE_MSTR); - goto exit; - } } /* Call appropriate processing routine */ @@ -310,18 +285,12 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area case TIPC_CMD_SET_NODE_ADDR: rep_tlv_buf = cfg_set_own_addr(); break; - case TIPC_CMD_SET_REMOTE_MNG: - rep_tlv_buf = cfg_set_remote_mng(); - break; case TIPC_CMD_SET_MAX_PORTS: rep_tlv_buf = cfg_set_max_ports(); break; case TIPC_CMD_SET_NETID: rep_tlv_buf = cfg_set_netid(); break; - case TIPC_CMD_GET_REMOTE_MNG: - rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_remote_management); - break; case TIPC_CMD_GET_MAX_PORTS: rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_ports); break; @@ -345,6 +314,8 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area case TIPC_CMD_SET_MAX_PUBL: case TIPC_CMD_GET_MAX_PUBL: case TIPC_CMD_SET_LOG_SIZE: + case TIPC_CMD_SET_REMOTE_MNG: + case TIPC_CMD_GET_REMOTE_MNG: case TIPC_CMD_DUMP_LOG: rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED " (obsolete command)"); @@ -369,80 +340,3 @@ exit: mutex_unlock(&config_mutex); return rep_tlv_buf; } - -static void cfg_conn_msg_event(int conid, struct sockaddr_tipc *addr, - void *usr_data, void *buf, size_t len) -{ - struct tipc_cfg_msg_hdr *req_hdr; - struct tipc_cfg_msg_hdr *rep_hdr; - struct sk_buff *rep_buf; - int ret; - - /* Validate configuration message header (ignore invalid message) */ - req_hdr = (struct tipc_cfg_msg_hdr *)buf; - if ((len < sizeof(*req_hdr)) || - (len != TCM_ALIGN(ntohl(req_hdr->tcm_len))) || - (ntohs(req_hdr->tcm_flags) != TCM_F_REQUEST)) { - pr_warn("Invalid configuration message discarded\n"); - return; - } - - /* Generate reply for request (if can't, return request) */ - rep_buf = tipc_cfg_do_cmd(addr->addr.id.node, ntohs(req_hdr->tcm_type), - buf + sizeof(*req_hdr), - len - sizeof(*req_hdr), - BUF_HEADROOM + MAX_H_SIZE + sizeof(*rep_hdr)); - if (rep_buf) { - skb_push(rep_buf, sizeof(*rep_hdr)); - rep_hdr = (struct tipc_cfg_msg_hdr *)rep_buf->data; - memcpy(rep_hdr, req_hdr, sizeof(*rep_hdr)); - rep_hdr->tcm_len = htonl(rep_buf->len); - rep_hdr->tcm_flags &= htons(~TCM_F_REQUEST); - - ret = tipc_conn_sendmsg(&cfgsrv, conid, addr, rep_buf->data, - rep_buf->len); - if (ret < 0) - pr_err("Sending cfg reply message failed, no memory\n"); - - kfree_skb(rep_buf); - } -} - -static struct sockaddr_tipc cfgsrv_addr __read_mostly = { - .family = AF_TIPC, - .addrtype = TIPC_ADDR_NAMESEQ, - .addr.nameseq.type = TIPC_CFG_SRV, - .addr.nameseq.lower = 0, - .addr.nameseq.upper = 0, - .scope = TIPC_ZONE_SCOPE -}; - -static struct tipc_server cfgsrv __read_mostly = { - .saddr = &cfgsrv_addr, - .imp = TIPC_CRITICAL_IMPORTANCE, - .type = SOCK_RDM, - .max_rcvbuf_size = 64 * 1024, - .name = "cfg_server", - .tipc_conn_recvmsg = cfg_conn_msg_event, - .tipc_conn_new = NULL, - .tipc_conn_shutdown = NULL -}; - -int tipc_cfg_init(void) -{ - return tipc_server_start(&cfgsrv); -} - -void tipc_cfg_reinit(void) -{ - tipc_server_stop(&cfgsrv); - - cfgsrv_addr.addr.nameseq.lower = tipc_own_addr; - cfgsrv_addr.addr.nameseq.upper = tipc_own_addr; - tipc_server_start(&cfgsrv); -} - -void tipc_cfg_stop(void) -{ - tipc_server_stop(&cfgsrv); -} diff --git a/net/tipc/config.h b/net/tipc/config.h index 1f252f3fa05..47b1bf18161 100644 --- a/net/tipc/config.h +++ b/net/tipc/config.h @@ -64,9 +64,4 @@ static inline struct sk_buff *tipc_cfg_reply_ultra_string(char *string) struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *req_tlv_area, int req_tlv_space, int headroom); - -int tipc_cfg_init(void); -void tipc_cfg_reinit(void); -void tipc_cfg_stop(void); - #endif diff --git a/net/tipc/core.c b/net/tipc/core.c index c6d3f75a9e1..50d57429ebc 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -1,7 +1,7 @@ /* * net/tipc/core.c: TIPC module code * - * Copyright (c) 2003-2006, Ericsson AB + * Copyright (c) 2003-2006, 2013, Ericsson AB * Copyright (c) 2005-2006, 2010-2013, Wind River Systems * All rights reserved. * @@ -50,7 +50,6 @@ int tipc_random __read_mostly; u32 tipc_own_addr __read_mostly; int tipc_max_ports __read_mostly; int tipc_net_id __read_mostly; -int tipc_remote_management __read_mostly; int sysctl_tipc_rmem[3] __read_mostly; /* min/default/max */ /** @@ -77,43 +76,14 @@ struct sk_buff *tipc_buf_acquire(u32 size) } /** - * tipc_core_stop_net - shut down TIPC networking sub-systems - */ -static void tipc_core_stop_net(void) -{ - tipc_net_stop(); - tipc_eth_media_stop(); - tipc_ib_media_stop(); -} - -/** - * start_net - start TIPC networking sub-systems - */ -int tipc_core_start_net(unsigned long addr) -{ - int res; - - tipc_net_start(addr); - res = tipc_eth_media_start(); - if (res < 0) - goto err; - res = tipc_ib_media_start(); - if (res < 0) - goto err; - return res; - -err: - tipc_core_stop_net(); - return res; -} - -/** * tipc_core_stop - switch TIPC from SINGLE NODE to NOT RUNNING mode */ static void tipc_core_stop(void) { + tipc_handler_stop(); + tipc_net_stop(); + tipc_bearer_cleanup(); tipc_netlink_stop(); - tipc_cfg_stop(); tipc_subscr_stop(); tipc_nametbl_stop(); tipc_ref_table_stop(); @@ -126,30 +96,59 @@ static void tipc_core_stop(void) */ static int tipc_core_start(void) { - int res; + int err; get_random_bytes(&tipc_random, sizeof(tipc_random)); - res = tipc_handler_start(); - if (!res) - res = tipc_ref_table_init(tipc_max_ports, tipc_random); - if (!res) - res = tipc_nametbl_init(); - if (!res) - res = tipc_netlink_start(); - if (!res) - res = tipc_socket_init(); - if (!res) - res = tipc_register_sysctl(); - if (!res) - res = tipc_subscr_start(); - if (!res) - res = tipc_cfg_init(); - if (res) { - tipc_handler_stop(); - tipc_core_stop(); - } - return res; + err = tipc_handler_start(); + if (err) + goto out_handler; + + err = tipc_ref_table_init(tipc_max_ports, tipc_random); + if (err) + goto out_reftbl; + + err = tipc_nametbl_init(); + if (err) + goto out_nametbl; + + err = tipc_netlink_start(); + if (err) + goto out_netlink; + + err = tipc_socket_init(); + if (err) + goto out_socket; + + err = tipc_register_sysctl(); + if (err) + goto out_sysctl; + + err = tipc_subscr_start(); + if (err) + goto out_subscr; + + err = tipc_bearer_setup(); + if (err) + goto out_bearer; + + return 0; +out_bearer: + tipc_subscr_stop(); +out_subscr: + tipc_unregister_sysctl(); +out_sysctl: + tipc_socket_stop(); +out_socket: + tipc_netlink_stop(); +out_netlink: + tipc_nametbl_stop(); +out_nametbl: + tipc_ref_table_stop(); +out_reftbl: + tipc_handler_stop(); +out_handler: + return err; } static int __init tipc_init(void) @@ -159,7 +158,6 @@ static int __init tipc_init(void) pr_info("Activated (version " TIPC_MOD_VER ")\n"); tipc_own_addr = 0; - tipc_remote_management = 1; tipc_max_ports = CONFIG_TIPC_PORTS; tipc_net_id = 4711; @@ -178,8 +176,6 @@ static int __init tipc_init(void) static void __exit tipc_exit(void) { - tipc_handler_stop(); - tipc_core_stop_net(); tipc_core_stop(); pr_info("Deactivated\n"); } diff --git a/net/tipc/core.h b/net/tipc/core.h index 94895d4e86a..8985bbcb942 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -47,7 +47,7 @@ #include <linux/mm.h> #include <linux/timer.h> #include <linux/string.h> -#include <asm/uaccess.h> +#include <linux/uaccess.h> #include <linux/interrupt.h> #include <linux/atomic.h> #include <asm/hardirq.h> @@ -79,7 +79,6 @@ int tipc_snprintf(char *buf, int len, const char *fmt, ...); extern u32 tipc_own_addr __read_mostly; extern int tipc_max_ports __read_mostly; extern int tipc_net_id __read_mostly; -extern int tipc_remote_management __read_mostly; extern int sysctl_tipc_rmem[3] __read_mostly; /* @@ -90,7 +89,6 @@ extern int tipc_random __read_mostly; /* * Routines available to privileged subsystems */ -int tipc_core_start_net(unsigned long); int tipc_handler_start(void); void tipc_handler_stop(void); int tipc_netlink_start(void); @@ -192,6 +190,7 @@ static inline void k_term_timer(struct timer_list *timer) struct tipc_skb_cb { void *handle; + bool deferred; }; #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) diff --git a/net/tipc/discover.c b/net/tipc/discover.c index ecc758c6eac..542fe3413dc 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -48,8 +48,8 @@ * struct tipc_link_req - information about an ongoing link setup request * @bearer: bearer issuing requests * @dest: destination address for request messages - * @domain: network domain to which links can be established * @num_nodes: number of nodes currently discovered (i.e. with an active link) + * @lock: spinlock for controlling access to requests * @buf: request message to be (repeatedly) sent * @timer: timer governing period between requests * @timer_intv: current interval between requests (in ms) @@ -57,8 +57,8 @@ struct tipc_link_req { struct tipc_bearer *bearer; struct tipc_media_addr dest; - u32 domain; int num_nodes; + spinlock_t lock; struct sk_buff *buf; struct timer_list timer; unsigned int timer_intv; @@ -67,14 +67,13 @@ struct tipc_link_req { /** * tipc_disc_init_msg - initialize a link setup message * @type: message type (request or response) - * @dest_domain: network domain of node(s) which should respond to message * @b_ptr: ptr to bearer issuing message */ -static struct sk_buff *tipc_disc_init_msg(u32 type, u32 dest_domain, - struct tipc_bearer *b_ptr) +static struct sk_buff *tipc_disc_init_msg(u32 type, struct tipc_bearer *b_ptr) { struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE); struct tipc_msg *msg; + u32 dest_domain = b_ptr->domain; if (buf) { msg = buf_msg(buf); @@ -108,11 +107,11 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr, } /** - * tipc_disc_recv_msg - handle incoming link setup message (request or response) + * tipc_disc_rcv - handle incoming link setup message (request or response) * @buf: buffer containing message * @b_ptr: bearer that message arrived on */ -void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr) +void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr) { struct tipc_node *n_ptr; struct tipc_link *link; @@ -147,7 +146,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr) } if (!tipc_in_scope(dest, tipc_own_addr)) return; - if (!tipc_in_scope(b_ptr->link_req->domain, orig)) + if (!tipc_in_scope(b_ptr->domain, orig)) return; /* Locate structure corresponding to requesting node */ @@ -239,8 +238,8 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr) /* Accept discovery message & send response, if necessary */ link_fully_up = link_working_working(link); - if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) { - rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr); + if ((type == DSC_REQ_MSG) && !link_fully_up) { + rbuf = tipc_disc_init_msg(DSC_RESP_MSG, b_ptr); if (rbuf) { tipc_bearer_send(b_ptr, rbuf, &media_addr); kfree_skb(rbuf); @@ -274,7 +273,9 @@ static void disc_update(struct tipc_link_req *req) */ void tipc_disc_add_dest(struct tipc_link_req *req) { + spin_lock_bh(&req->lock); req->num_nodes++; + spin_unlock_bh(&req->lock); } /** @@ -283,18 +284,10 @@ void tipc_disc_add_dest(struct tipc_link_req *req) */ void tipc_disc_remove_dest(struct tipc_link_req *req) { + spin_lock_bh(&req->lock); req->num_nodes--; disc_update(req); -} - -/** - * disc_send_msg - send link setup request message - * @req: ptr to link request structure - */ -static void disc_send_msg(struct tipc_link_req *req) -{ - if (!req->bearer->blocked) - tipc_bearer_send(req->bearer, req->buf, &req->dest); + spin_unlock_bh(&req->lock); } /** @@ -307,10 +300,10 @@ static void disc_timeout(struct tipc_link_req *req) { int max_delay; - spin_lock_bh(&req->bearer->lock); + spin_lock_bh(&req->lock); /* Stop searching if only desired node has been found */ - if (tipc_node(req->domain) && req->num_nodes) { + if (tipc_node(req->bearer->domain) && req->num_nodes) { req->timer_intv = TIPC_LINK_REQ_INACTIVE; goto exit; } @@ -322,7 +315,8 @@ static void disc_timeout(struct tipc_link_req *req) * hold at fast polling rate if don't have any associated nodes, * otherwise hold at slow polling rate */ - disc_send_msg(req); + tipc_bearer_send(req->bearer, req->buf, &req->dest); + req->timer_intv *= 2; if (req->num_nodes) @@ -334,7 +328,7 @@ static void disc_timeout(struct tipc_link_req *req) k_start_timer(&req->timer, req->timer_intv); exit: - spin_unlock_bh(&req->bearer->lock); + spin_unlock_bh(&req->lock); } /** @@ -345,8 +339,7 @@ exit: * * Returns 0 if successful, otherwise -errno. */ -int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest, - u32 dest_domain) +int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest) { struct tipc_link_req *req; @@ -354,7 +347,7 @@ int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest, if (!req) return -ENOMEM; - req->buf = tipc_disc_init_msg(DSC_REQ_MSG, dest_domain, b_ptr); + req->buf = tipc_disc_init_msg(DSC_REQ_MSG, b_ptr); if (!req->buf) { kfree(req); return -ENOMSG; @@ -362,13 +355,13 @@ int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest, memcpy(&req->dest, dest, sizeof(*dest)); req->bearer = b_ptr; - req->domain = dest_domain; req->num_nodes = 0; req->timer_intv = TIPC_LINK_REQ_INIT; + spin_lock_init(&req->lock); k_init_timer(&req->timer, (Handler)disc_timeout, (unsigned long)req); k_start_timer(&req->timer, req->timer_intv); b_ptr->link_req = req; - disc_send_msg(req); + tipc_bearer_send(req->bearer, req->buf, &req->dest); return 0; } diff --git a/net/tipc/discover.h b/net/tipc/discover.h index 75b67c403aa..07f34729459 100644 --- a/net/tipc/discover.h +++ b/net/tipc/discover.h @@ -39,11 +39,10 @@ struct tipc_link_req; -int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest, - u32 dest_domain); +int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest); void tipc_disc_delete(struct tipc_link_req *req); void tipc_disc_add_dest(struct tipc_link_req *req); void tipc_disc_remove_dest(struct tipc_link_req *req); -void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr); +void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr); #endif diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c index f80d59f5a16..67cf3f935db 100644 --- a/net/tipc/eth_media.c +++ b/net/tipc/eth_media.c @@ -1,7 +1,7 @@ /* * net/tipc/eth_media.c: Ethernet bearer support for TIPC * - * Copyright (c) 2001-2007, Ericsson AB + * Copyright (c) 2001-2007, 2013, Ericsson AB * Copyright (c) 2005-2008, 2011-2013, Wind River Systems * All rights reserved. * @@ -37,259 +37,11 @@ #include "core.h" #include "bearer.h" -#define MAX_ETH_MEDIA MAX_BEARERS - #define ETH_ADDR_OFFSET 4 /* message header offset of MAC address */ -/** - * struct eth_media - Ethernet bearer data structure - * @bearer: ptr to associated "generic" bearer structure - * @dev: ptr to associated Ethernet network device - * @tipc_packet_type: used in binding TIPC to Ethernet driver - * @setup: work item used when enabling bearer - * @cleanup: work item used when disabling bearer - */ -struct eth_media { - struct tipc_bearer *bearer; - struct net_device *dev; - struct packet_type tipc_packet_type; - struct work_struct setup; - struct work_struct cleanup; -}; - -static struct tipc_media eth_media_info; -static struct eth_media eth_media_array[MAX_ETH_MEDIA]; -static int eth_started; - -static int recv_notification(struct notifier_block *nb, unsigned long evt, - void *dv); -/* - * Network device notifier info - */ -static struct notifier_block notifier = { - .notifier_call = recv_notification, - .priority = 0 -}; - -/** - * eth_media_addr_set - initialize Ethernet media address structure - * - * Media-dependent "value" field stores MAC address in first 6 bytes - * and zeroes out the remaining bytes. - */ -static void eth_media_addr_set(const struct tipc_bearer *tb_ptr, - struct tipc_media_addr *a, char *mac) -{ - memcpy(a->value, mac, ETH_ALEN); - memset(a->value + ETH_ALEN, 0, sizeof(a->value) - ETH_ALEN); - a->media_id = TIPC_MEDIA_TYPE_ETH; - a->broadcast = !memcmp(mac, tb_ptr->bcast_addr.value, ETH_ALEN); -} - -/** - * send_msg - send a TIPC message out over an Ethernet interface - */ -static int send_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr, - struct tipc_media_addr *dest) -{ - struct sk_buff *clone; - struct net_device *dev; - int delta; - - clone = skb_clone(buf, GFP_ATOMIC); - if (!clone) - return 0; - - dev = ((struct eth_media *)(tb_ptr->usr_handle))->dev; - delta = dev->hard_header_len - skb_headroom(buf); - - if ((delta > 0) && - pskb_expand_head(clone, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) { - kfree_skb(clone); - return 0; - } - - skb_reset_network_header(clone); - clone->dev = dev; - clone->protocol = htons(ETH_P_TIPC); - dev_hard_header(clone, dev, ETH_P_TIPC, dest->value, - dev->dev_addr, clone->len); - dev_queue_xmit(clone); - return 0; -} - -/** - * recv_msg - handle incoming TIPC message from an Ethernet interface - * - * Accept only packets explicitly sent to this node, or broadcast packets; - * ignores packets sent using Ethernet multicast, and traffic sent to other - * nodes (which can happen if interface is running in promiscuous mode). - */ -static int recv_msg(struct sk_buff *buf, struct net_device *dev, - struct packet_type *pt, struct net_device *orig_dev) -{ - struct eth_media *eb_ptr = (struct eth_media *)pt->af_packet_priv; - - if (!net_eq(dev_net(dev), &init_net)) { - kfree_skb(buf); - return NET_RX_DROP; - } - - if (likely(eb_ptr->bearer)) { - if (likely(buf->pkt_type <= PACKET_BROADCAST)) { - buf->next = NULL; - tipc_recv_msg(buf, eb_ptr->bearer); - return NET_RX_SUCCESS; - } - } - kfree_skb(buf); - return NET_RX_DROP; -} - -/** - * setup_media - setup association between Ethernet bearer and interface - */ -static void setup_media(struct work_struct *work) -{ - struct eth_media *eb_ptr = - container_of(work, struct eth_media, setup); - - dev_add_pack(&eb_ptr->tipc_packet_type); -} - -/** - * enable_media - attach TIPC bearer to an Ethernet interface - */ -static int enable_media(struct tipc_bearer *tb_ptr) -{ - struct net_device *dev; - struct eth_media *eb_ptr = ð_media_array[0]; - struct eth_media *stop = ð_media_array[MAX_ETH_MEDIA]; - char *driver_name = strchr((const char *)tb_ptr->name, ':') + 1; - int pending_dev = 0; - - /* Find unused Ethernet bearer structure */ - while (eb_ptr->dev) { - if (!eb_ptr->bearer) - pending_dev++; - if (++eb_ptr == stop) - return pending_dev ? -EAGAIN : -EDQUOT; - } - - /* Find device with specified name */ - dev = dev_get_by_name(&init_net, driver_name); - if (!dev) - return -ENODEV; - - /* Create Ethernet bearer for device */ - eb_ptr->dev = dev; - eb_ptr->tipc_packet_type.type = htons(ETH_P_TIPC); - eb_ptr->tipc_packet_type.dev = dev; - eb_ptr->tipc_packet_type.func = recv_msg; - eb_ptr->tipc_packet_type.af_packet_priv = eb_ptr; - INIT_LIST_HEAD(&(eb_ptr->tipc_packet_type.list)); - INIT_WORK(&eb_ptr->setup, setup_media); - schedule_work(&eb_ptr->setup); - - /* Associate TIPC bearer with Ethernet bearer */ - eb_ptr->bearer = tb_ptr; - tb_ptr->usr_handle = (void *)eb_ptr; - memset(tb_ptr->bcast_addr.value, 0, sizeof(tb_ptr->bcast_addr.value)); - memcpy(tb_ptr->bcast_addr.value, dev->broadcast, ETH_ALEN); - tb_ptr->bcast_addr.media_id = TIPC_MEDIA_TYPE_ETH; - tb_ptr->bcast_addr.broadcast = 1; - tb_ptr->mtu = dev->mtu; - tb_ptr->blocked = 0; - eth_media_addr_set(tb_ptr, &tb_ptr->addr, (char *)dev->dev_addr); - return 0; -} - -/** - * cleanup_media - break association between Ethernet bearer and interface - * - * This routine must be invoked from a work queue because it can sleep. - */ -static void cleanup_media(struct work_struct *work) -{ - struct eth_media *eb_ptr = - container_of(work, struct eth_media, cleanup); - - dev_remove_pack(&eb_ptr->tipc_packet_type); - dev_put(eb_ptr->dev); - eb_ptr->dev = NULL; -} - -/** - * disable_media - detach TIPC bearer from an Ethernet interface - * - * Mark Ethernet bearer as inactive so that incoming buffers are thrown away, - * then get worker thread to complete bearer cleanup. (Can't do cleanup - * here because cleanup code needs to sleep and caller holds spinlocks.) - */ -static void disable_media(struct tipc_bearer *tb_ptr) -{ - struct eth_media *eb_ptr = (struct eth_media *)tb_ptr->usr_handle; - - eb_ptr->bearer = NULL; - INIT_WORK(&eb_ptr->cleanup, cleanup_media); - schedule_work(&eb_ptr->cleanup); -} - -/** - * recv_notification - handle device updates from OS - * - * Change the state of the Ethernet bearer (if any) associated with the - * specified device. - */ -static int recv_notification(struct notifier_block *nb, unsigned long evt, - void *ptr) -{ - struct net_device *dev = netdev_notifier_info_to_dev(ptr); - struct eth_media *eb_ptr = ð_media_array[0]; - struct eth_media *stop = ð_media_array[MAX_ETH_MEDIA]; - - if (!net_eq(dev_net(dev), &init_net)) - return NOTIFY_DONE; - - while ((eb_ptr->dev != dev)) { - if (++eb_ptr == stop) - return NOTIFY_DONE; /* couldn't find device */ - } - if (!eb_ptr->bearer) - return NOTIFY_DONE; /* bearer had been disabled */ - - eb_ptr->bearer->mtu = dev->mtu; - - switch (evt) { - case NETDEV_CHANGE: - if (netif_carrier_ok(dev)) - tipc_continue(eb_ptr->bearer); - else - tipc_block_bearer(eb_ptr->bearer); - break; - case NETDEV_UP: - tipc_continue(eb_ptr->bearer); - break; - case NETDEV_DOWN: - tipc_block_bearer(eb_ptr->bearer); - break; - case NETDEV_CHANGEMTU: - case NETDEV_CHANGEADDR: - tipc_block_bearer(eb_ptr->bearer); - tipc_continue(eb_ptr->bearer); - break; - case NETDEV_UNREGISTER: - case NETDEV_CHANGENAME: - tipc_disable_bearer(eb_ptr->bearer->name); - break; - } - return NOTIFY_OK; -} - -/** - * eth_addr2str - convert Ethernet address to string - */ -static int eth_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size) +/* convert Ethernet address to string */ +static int tipc_eth_addr2str(struct tipc_media_addr *a, char *str_buf, + int str_size) { if (str_size < 18) /* 18 = strlen("aa:bb:cc:dd:ee:ff\0") */ return 1; @@ -298,10 +50,8 @@ static int eth_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size) return 0; } -/** - * eth_str2addr - convert Ethernet address format to message header format - */ -static int eth_addr2msg(struct tipc_media_addr *a, char *msg_area) +/* convert Ethernet address format to message header format */ +static int tipc_eth_addr2msg(struct tipc_media_addr *a, char *msg_area) { memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE); msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_ETH; @@ -309,68 +59,30 @@ static int eth_addr2msg(struct tipc_media_addr *a, char *msg_area) return 0; } -/** - * eth_str2addr - convert message header address format to Ethernet format - */ -static int eth_msg2addr(const struct tipc_bearer *tb_ptr, - struct tipc_media_addr *a, char *msg_area) +/* convert message header address format to Ethernet format */ +static int tipc_eth_msg2addr(const struct tipc_bearer *tb_ptr, + struct tipc_media_addr *a, char *msg_area) { if (msg_area[TIPC_MEDIA_TYPE_OFFSET] != TIPC_MEDIA_TYPE_ETH) return 1; - eth_media_addr_set(tb_ptr, a, msg_area + ETH_ADDR_OFFSET); + tipc_l2_media_addr_set(tb_ptr, a, msg_area + ETH_ADDR_OFFSET); return 0; } -/* - * Ethernet media registration info - */ -static struct tipc_media eth_media_info = { - .send_msg = send_msg, - .enable_media = enable_media, - .disable_media = disable_media, - .addr2str = eth_addr2str, - .addr2msg = eth_addr2msg, - .msg2addr = eth_msg2addr, +/* Ethernet media registration info */ +struct tipc_media eth_media_info = { + .send_msg = tipc_l2_send_msg, + .enable_media = tipc_enable_l2_media, + .disable_media = tipc_disable_l2_media, + .addr2str = tipc_eth_addr2str, + .addr2msg = tipc_eth_addr2msg, + .msg2addr = tipc_eth_msg2addr, .priority = TIPC_DEF_LINK_PRI, .tolerance = TIPC_DEF_LINK_TOL, .window = TIPC_DEF_LINK_WIN, .type_id = TIPC_MEDIA_TYPE_ETH, + .hwaddr_len = ETH_ALEN, .name = "eth" }; -/** - * tipc_eth_media_start - activate Ethernet bearer support - * - * Register Ethernet media type with TIPC bearer code. Also register - * with OS for notifications about device state changes. - */ -int tipc_eth_media_start(void) -{ - int res; - - if (eth_started) - return -EINVAL; - - res = tipc_register_media(ð_media_info); - if (res) - return res; - - res = register_netdevice_notifier(¬ifier); - if (!res) - eth_started = 1; - return res; -} - -/** - * tipc_eth_media_stop - deactivate Ethernet bearer support - */ -void tipc_eth_media_stop(void) -{ - if (!eth_started) - return; - - flush_scheduled_work(); - unregister_netdevice_notifier(¬ifier); - eth_started = 0; -} diff --git a/net/tipc/handler.c b/net/tipc/handler.c index e4bc8a29674..1fabf160501 100644 --- a/net/tipc/handler.c +++ b/net/tipc/handler.c @@ -58,7 +58,6 @@ unsigned int tipc_k_signal(Handler routine, unsigned long argument) spin_lock_bh(&qitem_lock); if (!handler_enabled) { - pr_err("Signal request ignored by handler\n"); spin_unlock_bh(&qitem_lock); return -ENOPROTOOPT; } diff --git a/net/tipc/ib_media.c b/net/tipc/ib_media.c index c1398929746..844a77e2582 100644 --- a/net/tipc/ib_media.c +++ b/net/tipc/ib_media.c @@ -42,252 +42,9 @@ #include "core.h" #include "bearer.h" -#define MAX_IB_MEDIA MAX_BEARERS - -/** - * struct ib_media - Infiniband media data structure - * @bearer: ptr to associated "generic" bearer structure - * @dev: ptr to associated Infiniband network device - * @tipc_packet_type: used in binding TIPC to Infiniband driver - * @cleanup: work item used when disabling bearer - */ - -struct ib_media { - struct tipc_bearer *bearer; - struct net_device *dev; - struct packet_type tipc_packet_type; - struct work_struct setup; - struct work_struct cleanup; -}; - -static struct tipc_media ib_media_info; -static struct ib_media ib_media_array[MAX_IB_MEDIA]; -static int ib_started; - -/** - * ib_media_addr_set - initialize Infiniband media address structure - * - * Media-dependent "value" field stores MAC address in first 6 bytes - * and zeroes out the remaining bytes. - */ -static void ib_media_addr_set(const struct tipc_bearer *tb_ptr, - struct tipc_media_addr *a, char *mac) -{ - BUILD_BUG_ON(sizeof(a->value) < INFINIBAND_ALEN); - memcpy(a->value, mac, INFINIBAND_ALEN); - a->media_id = TIPC_MEDIA_TYPE_IB; - a->broadcast = !memcmp(mac, tb_ptr->bcast_addr.value, INFINIBAND_ALEN); -} - -/** - * send_msg - send a TIPC message out over an InfiniBand interface - */ -static int send_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr, - struct tipc_media_addr *dest) -{ - struct sk_buff *clone; - struct net_device *dev; - int delta; - - clone = skb_clone(buf, GFP_ATOMIC); - if (!clone) - return 0; - - dev = ((struct ib_media *)(tb_ptr->usr_handle))->dev; - delta = dev->hard_header_len - skb_headroom(buf); - - if ((delta > 0) && - pskb_expand_head(clone, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) { - kfree_skb(clone); - return 0; - } - - skb_reset_network_header(clone); - clone->dev = dev; - clone->protocol = htons(ETH_P_TIPC); - dev_hard_header(clone, dev, ETH_P_TIPC, dest->value, - dev->dev_addr, clone->len); - dev_queue_xmit(clone); - return 0; -} - -/** - * recv_msg - handle incoming TIPC message from an InfiniBand interface - * - * Accept only packets explicitly sent to this node, or broadcast packets; - * ignores packets sent using InfiniBand multicast, and traffic sent to other - * nodes (which can happen if interface is running in promiscuous mode). - */ -static int recv_msg(struct sk_buff *buf, struct net_device *dev, - struct packet_type *pt, struct net_device *orig_dev) -{ - struct ib_media *ib_ptr = (struct ib_media *)pt->af_packet_priv; - - if (!net_eq(dev_net(dev), &init_net)) { - kfree_skb(buf); - return NET_RX_DROP; - } - - if (likely(ib_ptr->bearer)) { - if (likely(buf->pkt_type <= PACKET_BROADCAST)) { - buf->next = NULL; - tipc_recv_msg(buf, ib_ptr->bearer); - return NET_RX_SUCCESS; - } - } - kfree_skb(buf); - return NET_RX_DROP; -} - -/** - * setup_bearer - setup association between InfiniBand bearer and interface - */ -static void setup_media(struct work_struct *work) -{ - struct ib_media *ib_ptr = - container_of(work, struct ib_media, setup); - - dev_add_pack(&ib_ptr->tipc_packet_type); -} - -/** - * enable_media - attach TIPC bearer to an InfiniBand interface - */ -static int enable_media(struct tipc_bearer *tb_ptr) -{ - struct net_device *dev; - struct ib_media *ib_ptr = &ib_media_array[0]; - struct ib_media *stop = &ib_media_array[MAX_IB_MEDIA]; - char *driver_name = strchr((const char *)tb_ptr->name, ':') + 1; - int pending_dev = 0; - - /* Find unused InfiniBand bearer structure */ - while (ib_ptr->dev) { - if (!ib_ptr->bearer) - pending_dev++; - if (++ib_ptr == stop) - return pending_dev ? -EAGAIN : -EDQUOT; - } - - /* Find device with specified name */ - dev = dev_get_by_name(&init_net, driver_name); - if (!dev) - return -ENODEV; - - /* Create InfiniBand bearer for device */ - ib_ptr->dev = dev; - ib_ptr->tipc_packet_type.type = htons(ETH_P_TIPC); - ib_ptr->tipc_packet_type.dev = dev; - ib_ptr->tipc_packet_type.func = recv_msg; - ib_ptr->tipc_packet_type.af_packet_priv = ib_ptr; - INIT_LIST_HEAD(&(ib_ptr->tipc_packet_type.list)); - INIT_WORK(&ib_ptr->setup, setup_media); - schedule_work(&ib_ptr->setup); - - /* Associate TIPC bearer with InfiniBand bearer */ - ib_ptr->bearer = tb_ptr; - tb_ptr->usr_handle = (void *)ib_ptr; - memset(tb_ptr->bcast_addr.value, 0, sizeof(tb_ptr->bcast_addr.value)); - memcpy(tb_ptr->bcast_addr.value, dev->broadcast, INFINIBAND_ALEN); - tb_ptr->bcast_addr.media_id = TIPC_MEDIA_TYPE_IB; - tb_ptr->bcast_addr.broadcast = 1; - tb_ptr->mtu = dev->mtu; - tb_ptr->blocked = 0; - ib_media_addr_set(tb_ptr, &tb_ptr->addr, (char *)dev->dev_addr); - return 0; -} - -/** - * cleanup_bearer - break association between InfiniBand bearer and interface - * - * This routine must be invoked from a work queue because it can sleep. - */ -static void cleanup_bearer(struct work_struct *work) -{ - struct ib_media *ib_ptr = - container_of(work, struct ib_media, cleanup); - - dev_remove_pack(&ib_ptr->tipc_packet_type); - dev_put(ib_ptr->dev); - ib_ptr->dev = NULL; -} - -/** - * disable_media - detach TIPC bearer from an InfiniBand interface - * - * Mark InfiniBand bearer as inactive so that incoming buffers are thrown away, - * then get worker thread to complete bearer cleanup. (Can't do cleanup - * here because cleanup code needs to sleep and caller holds spinlocks.) - */ -static void disable_media(struct tipc_bearer *tb_ptr) -{ - struct ib_media *ib_ptr = (struct ib_media *)tb_ptr->usr_handle; - - ib_ptr->bearer = NULL; - INIT_WORK(&ib_ptr->cleanup, cleanup_bearer); - schedule_work(&ib_ptr->cleanup); -} - -/** - * recv_notification - handle device updates from OS - * - * Change the state of the InfiniBand bearer (if any) associated with the - * specified device. - */ -static int recv_notification(struct notifier_block *nb, unsigned long evt, - void *ptr) -{ - struct net_device *dev = netdev_notifier_info_to_dev(ptr); - struct ib_media *ib_ptr = &ib_media_array[0]; - struct ib_media *stop = &ib_media_array[MAX_IB_MEDIA]; - - if (!net_eq(dev_net(dev), &init_net)) - return NOTIFY_DONE; - - while ((ib_ptr->dev != dev)) { - if (++ib_ptr == stop) - return NOTIFY_DONE; /* couldn't find device */ - } - if (!ib_ptr->bearer) - return NOTIFY_DONE; /* bearer had been disabled */ - - ib_ptr->bearer->mtu = dev->mtu; - - switch (evt) { - case NETDEV_CHANGE: - if (netif_carrier_ok(dev)) - tipc_continue(ib_ptr->bearer); - else - tipc_block_bearer(ib_ptr->bearer); - break; - case NETDEV_UP: - tipc_continue(ib_ptr->bearer); - break; - case NETDEV_DOWN: - tipc_block_bearer(ib_ptr->bearer); - break; - case NETDEV_CHANGEMTU: - case NETDEV_CHANGEADDR: - tipc_block_bearer(ib_ptr->bearer); - tipc_continue(ib_ptr->bearer); - break; - case NETDEV_UNREGISTER: - case NETDEV_CHANGENAME: - tipc_disable_bearer(ib_ptr->bearer->name); - break; - } - return NOTIFY_OK; -} - -static struct notifier_block notifier = { - .notifier_call = recv_notification, - .priority = 0, -}; - -/** - * ib_addr2str - convert InfiniBand address to string - */ -static int ib_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size) +/* convert InfiniBand address to string */ +static int tipc_ib_addr2str(struct tipc_media_addr *a, char *str_buf, + int str_size) { if (str_size < 60) /* 60 = 19 * strlen("xx:") + strlen("xx\0") */ return 1; @@ -297,10 +54,8 @@ static int ib_addr2str(struct tipc_media_addr *a, char *str_buf, int str_size) return 0; } -/** - * ib_addr2msg - convert InfiniBand address format to message header format - */ -static int ib_addr2msg(struct tipc_media_addr *a, char *msg_area) +/* convert InfiniBand address format to message header format */ +static int tipc_ib_addr2msg(struct tipc_media_addr *a, char *msg_area) { memset(msg_area, 0, TIPC_MEDIA_ADDR_SIZE); msg_area[TIPC_MEDIA_TYPE_OFFSET] = TIPC_MEDIA_TYPE_IB; @@ -308,65 +63,27 @@ static int ib_addr2msg(struct tipc_media_addr *a, char *msg_area) return 0; } -/** - * ib_msg2addr - convert message header address format to InfiniBand format - */ -static int ib_msg2addr(const struct tipc_bearer *tb_ptr, - struct tipc_media_addr *a, char *msg_area) +/* convert message header address format to InfiniBand format */ +static int tipc_ib_msg2addr(const struct tipc_bearer *tb_ptr, + struct tipc_media_addr *a, char *msg_area) { - ib_media_addr_set(tb_ptr, a, msg_area); + tipc_l2_media_addr_set(tb_ptr, a, msg_area); return 0; } -/* - * InfiniBand media registration info - */ -static struct tipc_media ib_media_info = { - .send_msg = send_msg, - .enable_media = enable_media, - .disable_media = disable_media, - .addr2str = ib_addr2str, - .addr2msg = ib_addr2msg, - .msg2addr = ib_msg2addr, +/* InfiniBand media registration info */ +struct tipc_media ib_media_info = { + .send_msg = tipc_l2_send_msg, + .enable_media = tipc_enable_l2_media, + .disable_media = tipc_disable_l2_media, + .addr2str = tipc_ib_addr2str, + .addr2msg = tipc_ib_addr2msg, + .msg2addr = tipc_ib_msg2addr, .priority = TIPC_DEF_LINK_PRI, .tolerance = TIPC_DEF_LINK_TOL, .window = TIPC_DEF_LINK_WIN, .type_id = TIPC_MEDIA_TYPE_IB, + .hwaddr_len = INFINIBAND_ALEN, .name = "ib" }; -/** - * tipc_ib_media_start - activate InfiniBand bearer support - * - * Register InfiniBand media type with TIPC bearer code. Also register - * with OS for notifications about device state changes. - */ -int tipc_ib_media_start(void) -{ - int res; - - if (ib_started) - return -EINVAL; - - res = tipc_register_media(&ib_media_info); - if (res) - return res; - - res = register_netdevice_notifier(¬ifier); - if (!res) - ib_started = 1; - return res; -} - -/** - * tipc_ib_media_stop - deactivate InfiniBand bearer support - */ -void tipc_ib_media_stop(void) -{ - if (!ib_started) - return; - - flush_scheduled_work(); - unregister_netdevice_notifier(¬ifier); - ib_started = 0; -} diff --git a/net/tipc/link.c b/net/tipc/link.c index 13b98774582..c5190ab7529 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1,7 +1,7 @@ /* * net/tipc/link.c: TIPC link code * - * Copyright (c) 1996-2007, 2012, Ericsson AB + * Copyright (c) 1996-2007, 2012-2014, Ericsson AB * Copyright (c) 2004-2007, 2010-2013, Wind River Systems * All rights reserved. * @@ -77,20 +77,19 @@ static const char *link_unk_evt = "Unknown link event "; static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, struct sk_buff *buf); -static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf); -static int link_recv_changeover_msg(struct tipc_link **l_ptr, - struct sk_buff **buf); +static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf); +static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr, + struct sk_buff **buf); static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance); -static int link_send_sections_long(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destnode); +static int tipc_link_iovec_long_xmit(struct tipc_port *sender, + struct iovec const *msg_sect, + unsigned int len, u32 destnode); static void link_state_event(struct tipc_link *l_ptr, u32 event); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static void link_start(struct tipc_link *l_ptr); -static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); -static void tipc_link_send_sync(struct tipc_link *l); -static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf); +static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); +static void tipc_link_sync_xmit(struct tipc_link *l); +static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); /* * Simple link routines @@ -148,11 +147,6 @@ int tipc_link_is_active(struct tipc_link *l_ptr) /** * link_timeout - handle expiration of link timer * @l_ptr: pointer to link - * - * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict - * with tipc_link_delete(). (There is no risk that the node will be deleted by - * another thread because tipc_link_delete() always cancels the link timer before - * tipc_node_delete() is called.) */ static void link_timeout(struct tipc_link *l_ptr) { @@ -214,8 +208,8 @@ static void link_set_timer(struct tipc_link *l_ptr, u32 time) * Returns pointer to link. */ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, - struct tipc_bearer *b_ptr, - const struct tipc_media_addr *media_addr) + struct tipc_bearer *b_ptr, + const struct tipc_media_addr *media_addr) { struct tipc_link *l_ptr; struct tipc_msg *msg; @@ -278,45 +272,44 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, tipc_node_attach_link(n_ptr, l_ptr); - k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); - list_add_tail(&l_ptr->link_list, &b_ptr->links); - tipc_k_signal((Handler)link_start, (unsigned long)l_ptr); + k_init_timer(&l_ptr->timer, (Handler)link_timeout, + (unsigned long)l_ptr); + + link_state_event(l_ptr, STARTING_EVT); return l_ptr; } -/** - * tipc_link_delete - delete a link - * @l_ptr: pointer to link - * - * Note: 'tipc_net_lock' is write_locked, bearer is locked. - * This routine must not grab the node lock until after link timer cancellation - * to avoid a potential deadlock situation. - */ -void tipc_link_delete(struct tipc_link *l_ptr) +void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down) { - if (!l_ptr) { - pr_err("Attempt to delete non-existent link\n"); - return; - } - - k_cancel_timer(&l_ptr->timer); - - tipc_node_lock(l_ptr->owner); - tipc_link_reset(l_ptr); - tipc_node_detach_link(l_ptr->owner, l_ptr); - tipc_link_stop(l_ptr); - list_del_init(&l_ptr->link_list); - tipc_node_unlock(l_ptr->owner); - k_term_timer(&l_ptr->timer); - kfree(l_ptr); -} + struct tipc_link *l_ptr; + struct tipc_node *n_ptr; -static void link_start(struct tipc_link *l_ptr) -{ - tipc_node_lock(l_ptr->owner); - link_state_event(l_ptr, STARTING_EVT); - tipc_node_unlock(l_ptr->owner); + rcu_read_lock(); + list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { + spin_lock_bh(&n_ptr->lock); + l_ptr = n_ptr->links[bearer_id]; + if (l_ptr) { + tipc_link_reset(l_ptr); + if (shutting_down || !tipc_node_is_up(n_ptr)) { + tipc_node_detach_link(l_ptr->owner, l_ptr); + tipc_link_reset_fragments(l_ptr); + spin_unlock_bh(&n_ptr->lock); + + /* Nobody else can access this link now: */ + del_timer_sync(&l_ptr->timer); + kfree(l_ptr); + } else { + /* Detach/delete when failover is finished: */ + l_ptr->flags |= LINK_STOPPED; + spin_unlock_bh(&n_ptr->lock); + del_timer_sync(&l_ptr->timer); + } + continue; + } + spin_unlock_bh(&n_ptr->lock); + } + rcu_read_unlock(); } /** @@ -335,8 +328,6 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) spin_lock_bh(&tipc_port_list_lock); p_ptr = tipc_port_lock(origport); if (p_ptr) { - if (!p_ptr->wakeup) - goto exit; if (!list_empty(&p_ptr->wait_list)) goto exit; p_ptr->congested = 1; @@ -371,7 +362,7 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) list_del_init(&p_ptr->wait_list); spin_lock_bh(p_ptr->lock); p_ptr->congested = 0; - p_ptr->wakeup(p_ptr); + tipc_port_wakeup(p_ptr); win -= p_ptr->waiting_pkts; spin_unlock_bh(p_ptr->lock); } @@ -386,14 +377,7 @@ exit: */ static void link_release_outqueue(struct tipc_link *l_ptr) { - struct sk_buff *buf = l_ptr->first_out; - struct sk_buff *next; - - while (buf) { - next = buf->next; - kfree_skb(buf); - buf = next; - } + kfree_skb_list(l_ptr->first_out); l_ptr->first_out = NULL; l_ptr->out_queue_size = 0; } @@ -410,37 +394,20 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr) } /** - * tipc_link_stop - purge all inbound and outbound messages associated with link + * tipc_link_purge_queues - purge all pkt queues associated with link * @l_ptr: pointer to link */ -void tipc_link_stop(struct tipc_link *l_ptr) +void tipc_link_purge_queues(struct tipc_link *l_ptr) { - struct sk_buff *buf; - struct sk_buff *next; - - buf = l_ptr->oldest_deferred_in; - while (buf) { - next = buf->next; - kfree_skb(buf); - buf = next; - } - - buf = l_ptr->first_out; - while (buf) { - next = buf->next; - kfree_skb(buf); - buf = next; - } - + kfree_skb_list(l_ptr->oldest_deferred_in); + kfree_skb_list(l_ptr->first_out); tipc_link_reset_fragments(l_ptr); - kfree_skb(l_ptr->proto_msg_queue); l_ptr->proto_msg_queue = NULL; } void tipc_link_reset(struct tipc_link *l_ptr) { - struct sk_buff *buf; u32 prev_state = l_ptr->state; u32 checkpoint = l_ptr->next_in_no; int was_active_link = tipc_link_is_active(l_ptr); @@ -461,8 +428,7 @@ void tipc_link_reset(struct tipc_link *l_ptr) tipc_node_link_down(l_ptr->owner, l_ptr); tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); - if (was_active_link && tipc_node_active_links(l_ptr->owner) && - l_ptr->owner->permit_changeover) { + if (was_active_link && tipc_node_active_links(l_ptr->owner)) { l_ptr->reset_checkpoint = checkpoint; l_ptr->exp_msg_count = START_CHANGEOVER; } @@ -471,12 +437,7 @@ void tipc_link_reset(struct tipc_link *l_ptr) link_release_outqueue(l_ptr); kfree_skb(l_ptr->proto_msg_queue); l_ptr->proto_msg_queue = NULL; - buf = l_ptr->oldest_deferred_in; - while (buf) { - struct sk_buff *next = buf->next; - kfree_skb(buf); - buf = next; - } + kfree_skb_list(l_ptr->oldest_deferred_in); if (!list_empty(&l_ptr->waiting_ports)) tipc_link_wakeup_ports(l_ptr, 1); @@ -496,6 +457,21 @@ void tipc_link_reset(struct tipc_link *l_ptr) link_reset_statistics(l_ptr); } +void tipc_link_reset_list(unsigned int bearer_id) +{ + struct tipc_link *l_ptr; + struct tipc_node *n_ptr; + + rcu_read_lock(); + list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { + spin_lock_bh(&n_ptr->lock); + l_ptr = n_ptr->links[bearer_id]; + if (l_ptr) + tipc_link_reset(l_ptr); + spin_unlock_bh(&n_ptr->lock); + } + rcu_read_unlock(); +} static void link_activate(struct tipc_link *l_ptr) { @@ -514,13 +490,17 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) struct tipc_link *other; u32 cont_intv = l_ptr->continuity_interval; - if (!l_ptr->started && (event != STARTING_EVT)) + if (l_ptr->flags & LINK_STOPPED) + return; + + if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT)) return; /* Not yet. */ - if (link_blocked(l_ptr)) { + /* Check whether changeover is going on */ + if (l_ptr->exp_msg_count) { if (event == TIMEOUT_EVT) link_set_timer(l_ptr, cont_intv); - return; /* Changeover going on */ + return; } switch (l_ptr->state) { @@ -533,12 +513,12 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) if (l_ptr->next_in_no != l_ptr->checkpoint) { l_ptr->checkpoint = l_ptr->next_in_no; if (tipc_bclink_acks_missing(l_ptr->owner)) { - tipc_link_send_proto_msg(l_ptr, STATE_MSG, - 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, + 0, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) { - tipc_link_send_proto_msg(l_ptr, STATE_MSG, - 1, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, + 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; } link_set_timer(l_ptr, cont_intv); @@ -546,7 +526,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) } l_ptr->state = WORKING_UNKNOWN; l_ptr->fsm_msg_cnt = 0; - tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv / 4); break; @@ -556,7 +536,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) tipc_link_reset(l_ptr); l_ptr->state = RESET_RESET; l_ptr->fsm_msg_cnt = 0; - tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, + 0, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); break; @@ -578,7 +559,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) tipc_link_reset(l_ptr); l_ptr->state = RESET_RESET; l_ptr->fsm_msg_cnt = 0; - tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, + 0, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); break; @@ -588,14 +570,14 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->fsm_msg_cnt = 0; l_ptr->checkpoint = l_ptr->next_in_no; if (tipc_bclink_acks_missing(l_ptr->owner)) { - tipc_link_send_proto_msg(l_ptr, STATE_MSG, - 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, + 0, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; } link_set_timer(l_ptr, cont_intv); } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) { - tipc_link_send_proto_msg(l_ptr, STATE_MSG, - 1, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, + 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv / 4); } else { /* Link has failed */ @@ -604,8 +586,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) tipc_link_reset(l_ptr); l_ptr->state = RESET_UNKNOWN; l_ptr->fsm_msg_cnt = 0; - tipc_link_send_proto_msg(l_ptr, RESET_MSG, - 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, RESET_MSG, + 0, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); } @@ -625,24 +607,25 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->state = WORKING_WORKING; l_ptr->fsm_msg_cnt = 0; link_activate(l_ptr); - tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; if (l_ptr->owner->working_links == 1) - tipc_link_send_sync(l_ptr); + tipc_link_sync_xmit(l_ptr); link_set_timer(l_ptr, cont_intv); break; case RESET_MSG: l_ptr->state = RESET_RESET; l_ptr->fsm_msg_cnt = 0; - tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, + 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); break; case STARTING_EVT: - l_ptr->started = 1; + l_ptr->flags |= LINK_STARTED; /* fall through */ case TIMEOUT_EVT: - tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); break; @@ -660,16 +643,17 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event) l_ptr->state = WORKING_WORKING; l_ptr->fsm_msg_cnt = 0; link_activate(l_ptr); - tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; if (l_ptr->owner->working_links == 1) - tipc_link_send_sync(l_ptr); + tipc_link_sync_xmit(l_ptr); link_set_timer(l_ptr, cont_intv); break; case RESET_MSG: break; case TIMEOUT_EVT: - tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, + 0, 0, 0, 0, 0); l_ptr->fsm_msg_cnt++; link_set_timer(l_ptr, cont_intv); break; @@ -755,11 +739,11 @@ static void link_add_chain_to_outqueue(struct tipc_link *l_ptr, } /* - * tipc_link_send_buf() is the 'full path' for messages, called from - * inside TIPC when the 'fast path' in tipc_send_buf + * tipc_link_xmit() is the 'full path' for messages, called from + * inside TIPC when the 'fast path' in tipc_send_xmit * has failed, and from link_send() */ -int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) +int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); u32 size = msg_size(msg); @@ -787,11 +771,10 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) /* Fragmentation needed ? */ if (size > max_packet) - return link_send_long_buf(l_ptr, buf); + return tipc_link_frag_xmit(l_ptr, buf); /* Packet can be queued or sent. */ - if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) && - !link_congested(l_ptr))) { + if (likely(!link_congested(l_ptr))) { link_add_to_outqueue(l_ptr, buf, msg); tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); @@ -832,11 +815,11 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) } /* - * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has - * not been selected yet, and the the owner node is not locked + * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use + * has not been selected yet, and the the owner node is not locked * Called by TIPC internal users, e.g. the name distributor */ -int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) +int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector) { struct tipc_link *l_ptr; struct tipc_node *n_ptr; @@ -848,7 +831,7 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) tipc_node_lock(n_ptr); l_ptr = n_ptr->active_links[selector & 1]; if (l_ptr) - res = tipc_link_send_buf(l_ptr, buf); + res = __tipc_link_xmit(l_ptr, buf); else kfree_skb(buf); tipc_node_unlock(n_ptr); @@ -860,14 +843,14 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) } /* - * tipc_link_send_sync - synchronize broadcast link endpoints. + * tipc_link_sync_xmit - synchronize broadcast link endpoints. * * Give a newly added peer node the sequence number where it should * start receiving and acking broadcast packets. * * Called with node locked */ -static void tipc_link_send_sync(struct tipc_link *l) +static void tipc_link_sync_xmit(struct tipc_link *l) { struct sk_buff *buf; struct tipc_msg *msg; @@ -884,14 +867,14 @@ static void tipc_link_send_sync(struct tipc_link *l) } /* - * tipc_link_recv_sync - synchronize broadcast link endpoints. + * tipc_link_sync_rcv - synchronize broadcast link endpoints. * Receive the sequence number where we should start receiving and * acking broadcast packets from a newly added peer node, and open * up for reception of such packets. * * Called with node locked */ -static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf) +static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); @@ -901,7 +884,7 @@ static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf) } /* - * tipc_link_send_names - send name table entries to new neighbor + * tipc_link_names_xmit - send name table entries to new neighbor * * Send routine for bulk delivery of name table messages when contact * with a new neighbor occurs. No link congestion checking is performed @@ -909,7 +892,7 @@ static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf) * small enough not to require fragmentation. * Called without any locks held. */ -void tipc_link_send_names(struct list_head *message_list, u32 dest) +void tipc_link_names_xmit(struct list_head *message_list, u32 dest) { struct tipc_node *n_ptr; struct tipc_link *l_ptr; @@ -944,41 +927,40 @@ void tipc_link_send_names(struct list_head *message_list, u32 dest) } /* - * link_send_buf_fast: Entry for data messages where the + * tipc_link_xmit_fast: Entry for data messages where the * destination link is known and the header is complete, * inclusive total message length. Very time critical. * Link is locked. Returns user data length. */ -static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf, - u32 *used_max_pkt) +static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf, + u32 *used_max_pkt) { struct tipc_msg *msg = buf_msg(buf); int res = msg_data_sz(msg); if (likely(!link_congested(l_ptr))) { if (likely(msg_size(msg) <= l_ptr->max_pkt)) { - if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) { - link_add_to_outqueue(l_ptr, buf, msg); - tipc_bearer_send(l_ptr->b_ptr, buf, - &l_ptr->media_addr); - l_ptr->unacked_window = 0; - return res; - } - } else + link_add_to_outqueue(l_ptr, buf, msg); + tipc_bearer_send(l_ptr->b_ptr, buf, + &l_ptr->media_addr); + l_ptr->unacked_window = 0; + return res; + } + else *used_max_pkt = l_ptr->max_pkt; } - return tipc_link_send_buf(l_ptr, buf); /* All other cases */ + return __tipc_link_xmit(l_ptr, buf); /* All other cases */ } /* - * tipc_link_send_sections_fast: Entry for messages where the + * tipc_link_iovec_xmit_fast: Entry for messages where the * destination processor is known and the header is complete, * except for total message length. * Returns user data length or errno. */ -int tipc_link_send_sections_fast(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destaddr) +int tipc_link_iovec_xmit_fast(struct tipc_port *sender, + struct iovec const *msg_sect, + unsigned int len, u32 destaddr) { struct tipc_msg *hdr = &sender->phdr; struct tipc_link *l_ptr; @@ -1004,8 +986,8 @@ again: l_ptr = node->active_links[selector]; if (likely(l_ptr)) { if (likely(buf)) { - res = link_send_buf_fast(l_ptr, buf, - &sender->max_pkt); + res = tipc_link_xmit_fast(l_ptr, buf, + &sender->max_pkt); exit: tipc_node_unlock(node); read_unlock_bh(&tipc_net_lock); @@ -1013,8 +995,7 @@ exit: } /* Exit if link (or bearer) is congested */ - if (link_congested(l_ptr) || - tipc_bearer_blocked(l_ptr->b_ptr)) { + if (link_congested(l_ptr)) { res = link_schedule_port(l_ptr, sender->ref, res); goto exit; @@ -1032,24 +1013,21 @@ exit: if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt) goto again; - return link_send_sections_long(sender, msg_sect, len, - destaddr); + return tipc_link_iovec_long_xmit(sender, msg_sect, + len, destaddr); } tipc_node_unlock(node); } read_unlock_bh(&tipc_net_lock); /* Couldn't find a link to the destination node */ - if (buf) - return tipc_reject_msg(buf, TIPC_ERR_NO_NODE); - if (res >= 0) - return tipc_port_reject_sections(sender, hdr, msg_sect, - len, TIPC_ERR_NO_NODE); - return res; + kfree_skb(buf); + tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE); + return -ENETUNREACH; } /* - * link_send_sections_long(): Entry for long messages where the + * tipc_link_iovec_long_xmit(): Entry for long messages where the * destination node is known and the header is complete, * inclusive total message length. * Link and bearer congestion status have been checked to be ok, @@ -1062,9 +1040,9 @@ exit: * * Returns user data length or errno. */ -static int link_send_sections_long(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destaddr) +static int tipc_link_iovec_long_xmit(struct tipc_port *sender, + struct iovec const *msg_sect, + unsigned int len, u32 destaddr) { struct tipc_link *l_ptr; struct tipc_node *node; @@ -1127,10 +1105,7 @@ again: if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { res = -EFAULT; error: - for (; buf_chain; buf_chain = buf) { - buf = buf_chain->next; - kfree_skb(buf_chain); - } + kfree_skb_list(buf_chain); return res; } sect_crs += sz; @@ -1180,20 +1155,15 @@ error: if (l_ptr->max_pkt < max_pkt) { sender->max_pkt = l_ptr->max_pkt; tipc_node_unlock(node); - for (; buf_chain; buf_chain = buf) { - buf = buf_chain->next; - kfree_skb(buf_chain); - } + kfree_skb_list(buf_chain); goto again; } } else { reject: - for (; buf_chain; buf_chain = buf) { - buf = buf_chain->next; - kfree_skb(buf_chain); - } - return tipc_port_reject_sections(sender, hdr, msg_sect, - len, TIPC_ERR_NO_NODE); + kfree_skb_list(buf_chain); + tipc_port_iovec_reject(sender, hdr, msg_sect, len, + TIPC_ERR_NO_NODE); + return -ENETUNREACH; } /* Append chain of fragments to send queue & send them */ @@ -1209,7 +1179,7 @@ reject: /* * tipc_link_push_packet: Push one unsent packet to the media */ -u32 tipc_link_push_packet(struct tipc_link *l_ptr) +static u32 tipc_link_push_packet(struct tipc_link *l_ptr) { struct sk_buff *buf = l_ptr->first_out; u32 r_q_size = l_ptr->retransm_queue_size; @@ -1281,9 +1251,6 @@ void tipc_link_push_queue(struct tipc_link *l_ptr) { u32 res; - if (tipc_bearer_blocked(l_ptr->b_ptr)) - return; - do { res = tipc_link_push_packet(l_ptr); } while (!res); @@ -1370,26 +1337,15 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf, msg = buf_msg(buf); - if (tipc_bearer_blocked(l_ptr->b_ptr)) { - if (l_ptr->retransm_queue_size == 0) { - l_ptr->retransm_queue_head = msg_seqno(msg); - l_ptr->retransm_queue_size = retransmits; - } else { - pr_err("Unexpected retransmit on link %s (qsize=%d)\n", - l_ptr->name, l_ptr->retransm_queue_size); + /* Detect repeated retransmit failures */ + if (l_ptr->last_retransmitted == msg_seqno(msg)) { + if (++l_ptr->stale_count > 100) { + link_retransmit_failure(l_ptr, buf); + return; } - return; } else { - /* Detect repeated retransmit failures on unblocked bearer */ - if (l_ptr->last_retransmitted == msg_seqno(msg)) { - if (++l_ptr->stale_count > 100) { - link_retransmit_failure(l_ptr, buf); - return; - } - } else { - l_ptr->last_retransmitted = msg_seqno(msg); - l_ptr->stale_count = 1; - } + l_ptr->last_retransmitted = msg_seqno(msg); + l_ptr->stale_count = 1; } while (retransmits && (buf != l_ptr->next_out) && buf) { @@ -1451,6 +1407,12 @@ static int link_recv_buf_validate(struct sk_buff *buf) u32 hdr_size; u32 min_hdr_size; + /* If this packet comes from the defer queue, the skb has already + * been validated + */ + if (unlikely(TIPC_SKB_CB(buf)->deferred)) + return 1; + if (unlikely(buf->len < MIN_H_SIZE)) return 0; @@ -1476,14 +1438,14 @@ static int link_recv_buf_validate(struct sk_buff *buf) } /** - * tipc_recv_msg - process TIPC messages arriving from off-node + * tipc_rcv - process TIPC packets/messages arriving from off-node * @head: pointer to message buffer chain * @tb_ptr: pointer to bearer message arrived on * * Invoked with no locks held. Bearer pointer must point to a valid bearer * structure (i.e. cannot be NULL), but bearer can be inactive. */ -void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) +void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) { read_lock_bh(&tipc_net_lock); while (head) { @@ -1495,15 +1457,10 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) u32 seq_no; u32 ackd; u32 released = 0; - int type; head = head->next; buf->next = NULL; - /* Ensure bearer is still enabled */ - if (unlikely(!b_ptr->active)) - goto discard; - /* Ensure message is well-formed */ if (unlikely(!link_recv_buf_validate(buf))) goto discard; @@ -1517,9 +1474,9 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) if (unlikely(msg_non_seq(msg))) { if (msg_user(msg) == LINK_CONFIG) - tipc_disc_recv_msg(buf, b_ptr); + tipc_disc_rcv(buf, b_ptr); else - tipc_bclink_recv_pkt(buf); + tipc_bclink_rcv(buf); continue; } @@ -1543,7 +1500,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) if ((n_ptr->block_setup & WAIT_PEER_DOWN) && msg_user(msg) == LINK_PROTOCOL && (msg_type(msg) == RESET_MSG || - msg_type(msg) == ACTIVATE_MSG) && + msg_type(msg) == ACTIVATE_MSG) && !msg_redundant_link(msg)) n_ptr->block_setup &= ~WAIT_PEER_DOWN; @@ -1562,7 +1519,6 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) while ((crs != l_ptr->next_out) && less_eq(buf_seqno(crs), ackd)) { struct sk_buff *next = crs->next; - kfree_skb(crs); crs = next; released++; @@ -1575,18 +1531,19 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) /* Try sending any messages link endpoint has pending */ if (unlikely(l_ptr->next_out)) tipc_link_push_queue(l_ptr); + if (unlikely(!list_empty(&l_ptr->waiting_ports))) tipc_link_wakeup_ports(l_ptr, 0); + if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { l_ptr->stats.sent_acks++; - tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); } - /* Now (finally!) process the incoming message */ -protocol_check: + /* Process the incoming packet */ if (unlikely(!link_working_working(l_ptr))) { if (msg_user(msg) == LINK_PROTOCOL) { - link_recv_proto_msg(l_ptr, buf); + tipc_link_proto_rcv(l_ptr, buf); head = link_insert_deferred_queue(l_ptr, head); tipc_node_unlock(n_ptr); continue; @@ -1615,67 +1572,65 @@ protocol_check: l_ptr->next_in_no++; if (unlikely(l_ptr->oldest_deferred_in)) head = link_insert_deferred_queue(l_ptr, head); -deliver: - if (likely(msg_isdata(msg))) { - tipc_node_unlock(n_ptr); - tipc_port_recv_msg(buf); - continue; + + /* Deliver packet/message to correct user: */ + if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) { + if (!tipc_link_tunnel_rcv(n_ptr, &buf)) { + tipc_node_unlock(n_ptr); + continue; + } + msg = buf_msg(buf); + } else if (msg_user(msg) == MSG_FRAGMENTER) { + int rc; + + l_ptr->stats.recv_fragments++; + rc = tipc_link_frag_rcv(&l_ptr->reasm_head, + &l_ptr->reasm_tail, + &buf); + if (rc == LINK_REASM_COMPLETE) { + l_ptr->stats.recv_fragmented++; + msg = buf_msg(buf); + } else { + if (rc == LINK_REASM_ERROR) + tipc_link_reset(l_ptr); + tipc_node_unlock(n_ptr); + continue; + } } + switch (msg_user(msg)) { - int ret; + case TIPC_LOW_IMPORTANCE: + case TIPC_MEDIUM_IMPORTANCE: + case TIPC_HIGH_IMPORTANCE: + case TIPC_CRITICAL_IMPORTANCE: + tipc_node_unlock(n_ptr); + tipc_port_rcv(buf); + continue; case MSG_BUNDLER: l_ptr->stats.recv_bundles++; l_ptr->stats.recv_bundled += msg_msgcnt(msg); tipc_node_unlock(n_ptr); - tipc_link_recv_bundle(buf); + tipc_link_bundle_rcv(buf); continue; case NAME_DISTRIBUTOR: n_ptr->bclink.recv_permitted = true; tipc_node_unlock(n_ptr); - tipc_named_recv(buf); - continue; - case BCAST_PROTOCOL: - tipc_link_recv_sync(n_ptr, buf); - tipc_node_unlock(n_ptr); + tipc_named_rcv(buf); continue; case CONN_MANAGER: tipc_node_unlock(n_ptr); - tipc_port_recv_proto_msg(buf); - continue; - case MSG_FRAGMENTER: - l_ptr->stats.recv_fragments++; - ret = tipc_link_recv_fragment(&l_ptr->reasm_head, - &l_ptr->reasm_tail, - &buf); - if (ret == LINK_REASM_COMPLETE) { - l_ptr->stats.recv_fragmented++; - msg = buf_msg(buf); - goto deliver; - } - if (ret == LINK_REASM_ERROR) - tipc_link_reset(l_ptr); - tipc_node_unlock(n_ptr); + tipc_port_proto_rcv(buf); continue; - case CHANGEOVER_PROTOCOL: - type = msg_type(msg); - if (link_recv_changeover_msg(&l_ptr, &buf)) { - msg = buf_msg(buf); - seq_no = msg_seqno(msg); - if (type == ORIGINAL_MSG) - goto deliver; - goto protocol_check; - } + case BCAST_PROTOCOL: + tipc_link_sync_rcv(n_ptr, buf); break; default: kfree_skb(buf); - buf = NULL; break; } tipc_node_unlock(n_ptr); - tipc_net_route_msg(buf); continue; unlock_discard: - tipc_node_unlock(n_ptr); discard: kfree_skb(buf); @@ -1742,7 +1697,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, u32 seq_no = buf_seqno(buf); if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) { - link_recv_proto_msg(l_ptr, buf); + tipc_link_proto_rcv(l_ptr, buf); return; } @@ -1763,8 +1718,9 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, &l_ptr->newest_deferred_in, buf)) { l_ptr->deferred_inqueue_sz++; l_ptr->stats.deferred_recv++; + TIPC_SKB_CB(buf)->deferred = true; if ((l_ptr->deferred_inqueue_sz % 16) == 1) - tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); } else l_ptr->stats.duplicates++; } @@ -1772,9 +1728,8 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, /* * Send protocol message to the other endpoint. */ -void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, - int probe_msg, u32 gap, u32 tolerance, - u32 priority, u32 ack_mtu) +void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, + u32 gap, u32 tolerance, u32 priority, u32 ack_mtu) { struct sk_buff *buf = NULL; struct tipc_msg *msg = l_ptr->pmsg; @@ -1787,7 +1742,8 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, l_ptr->proto_msg_queue = NULL; } - if (link_blocked(l_ptr)) + /* Don't send protocol message during link changeover */ + if (l_ptr->exp_msg_count) return; /* Abort non-RESET send if communication with node is prohibited */ @@ -1862,12 +1818,6 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); buf->priority = TC_PRIO_CONTROL; - /* Defer message if bearer is already blocked */ - if (tipc_bearer_blocked(l_ptr->b_ptr)) { - l_ptr->proto_msg_queue = buf; - return; - } - tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr); l_ptr->unacked_window = 0; kfree_skb(buf); @@ -1878,7 +1828,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, * Note that network plane id propagates through the network, and may * change at any time. The node with lowest address rules */ -static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) +static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf) { u32 rec_gap = 0; u32 max_pkt_info; @@ -1886,7 +1836,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) u32 msg_tol; struct tipc_msg *msg = buf_msg(buf); - if (link_blocked(l_ptr)) + /* Discard protocol message during link changeover */ + if (l_ptr->exp_msg_count) goto exit; /* record unnumbered packet arrival (force mismatch on next timeout) */ @@ -1896,8 +1847,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) if (tipc_own_addr > msg_prevnode(msg)) l_ptr->b_ptr->net_plane = msg_net_plane(msg); - l_ptr->owner->permit_changeover = msg_redundant_link(msg); - switch (msg_type(msg)) { case RESET_MSG: @@ -1998,8 +1947,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) msg_last_bcast(msg)); if (rec_gap || (msg_probe(msg))) { - tipc_link_send_proto_msg(l_ptr, STATE_MSG, - 0, rec_gap, 0, 0, max_pkt_ack); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0, + 0, max_pkt_ack); } if (msg_seq_gap(msg)) { l_ptr->stats.recv_nacks++; @@ -2013,13 +1962,13 @@ exit: } -/* - * tipc_link_tunnel(): Send one message via a link belonging to - * another bearer. Owner node is locked. +/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to + * a different bearer. Owner node is locked. */ -static void tipc_link_tunnel(struct tipc_link *l_ptr, - struct tipc_msg *tunnel_hdr, struct tipc_msg *msg, - u32 selector) +static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, + struct tipc_msg *tunnel_hdr, + struct tipc_msg *msg, + u32 selector) { struct tipc_link *tunnel; struct sk_buff *buf; @@ -2038,16 +1987,17 @@ static void tipc_link_tunnel(struct tipc_link *l_ptr, } skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); - tipc_link_send_buf(tunnel, buf); + __tipc_link_xmit(tunnel, buf); } - -/* - * changeover(): Send whole message queue via the remaining link - * Owner node is locked. +/* tipc_link_failover_send_queue(): A link has gone down, but a second + * link is still active. We can do failover. Tunnel the failing link's + * whole send queue via the remaining link. This way, we don't lose + * any packets, and sequence order is preserved for subsequent traffic + * sent over the remaining link. Owner node is locked. */ -void tipc_link_changeover(struct tipc_link *l_ptr) +void tipc_link_failover_send_queue(struct tipc_link *l_ptr) { u32 msgcount = l_ptr->out_queue_size; struct sk_buff *crs = l_ptr->first_out; @@ -2058,11 +2008,6 @@ void tipc_link_changeover(struct tipc_link *l_ptr) if (!tunnel) return; - if (!l_ptr->owner->permit_changeover) { - pr_warn("%speer did not permit changeover\n", link_co_err); - return; - } - tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); @@ -2075,7 +2020,7 @@ void tipc_link_changeover(struct tipc_link *l_ptr) if (buf) { skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE); msg_set_size(&tunnel_hdr, INT_H_SIZE); - tipc_link_send_buf(tunnel, buf); + __tipc_link_xmit(tunnel, buf); } else { pr_warn("%sunable to send changeover msg\n", link_co_err); @@ -2096,20 +2041,30 @@ void tipc_link_changeover(struct tipc_link *l_ptr) msgcount = msg_msgcnt(msg); while (msgcount--) { msg_set_seqno(m, msg_seqno(msg)); - tipc_link_tunnel(l_ptr, &tunnel_hdr, m, - msg_link_selector(m)); + tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m, + msg_link_selector(m)); pos += align(msg_size(m)); m = (struct tipc_msg *)pos; } } else { - tipc_link_tunnel(l_ptr, &tunnel_hdr, msg, - msg_link_selector(msg)); + tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, + msg_link_selector(msg)); } crs = crs->next; } } -void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel) +/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a + * duplicate of the first link's send queue via the new link. This way, we + * are guaranteed that currently queued packets from a socket are delivered + * before future traffic from the same socket, even if this is using the + * new link. The last arriving copy of each duplicate packet is dropped at + * the receiving end by the regular protocol check, so packet cardinality + * and sequence order is preserved per sender/receiver socket pair. + * Owner node is locked. + */ +void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, + struct tipc_link *tunnel) { struct sk_buff *iter; struct tipc_msg tunnel_hdr; @@ -2138,7 +2093,7 @@ void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel) skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, length); - tipc_link_send_buf(tunnel, outbuf); + __tipc_link_xmit(tunnel, outbuf); if (!tipc_link_is_up(l_ptr)) return; iter = iter->next; @@ -2165,87 +2120,114 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) return eb; } -/* - * link_recv_changeover_msg(): Receive tunneled packet sent - * via other link. Node is locked. Return extracted buffer. + + +/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet. + * Owner node is locked. */ -static int link_recv_changeover_msg(struct tipc_link **l_ptr, - struct sk_buff **buf) +static void tipc_link_dup_rcv(struct tipc_link *l_ptr, + struct sk_buff *t_buf) { - struct sk_buff *tunnel_buf = *buf; - struct tipc_link *dest_link; - struct tipc_msg *msg; - struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf); - u32 msg_typ = msg_type(tunnel_msg); - u32 msg_count = msg_msgcnt(tunnel_msg); - u32 bearer_id = msg_bearer_id(tunnel_msg); + struct sk_buff *buf; - if (bearer_id >= MAX_BEARERS) - goto exit; - dest_link = (*l_ptr)->owner->links[bearer_id]; - if (!dest_link) - goto exit; - if (dest_link == *l_ptr) { - pr_err("Unexpected changeover message on link <%s>\n", - (*l_ptr)->name); - goto exit; + if (!tipc_link_is_up(l_ptr)) + return; + + buf = buf_extract(t_buf, INT_H_SIZE); + if (buf == NULL) { + pr_warn("%sfailed to extract inner dup pkt\n", link_co_err); + return; } - *l_ptr = dest_link; - msg = msg_get_wrapped(tunnel_msg); - if (msg_typ == DUPLICATE_MSG) { - if (less(msg_seqno(msg), mod(dest_link->next_in_no))) - goto exit; - *buf = buf_extract(tunnel_buf, INT_H_SIZE); - if (*buf == NULL) { - pr_warn("%sduplicate msg dropped\n", link_co_err); + /* Add buffer to deferred queue, if applicable: */ + link_handle_out_of_seq_msg(l_ptr, buf); +} + +/* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet + * Owner node is locked. + */ +static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, + struct sk_buff *t_buf) +{ + struct tipc_msg *t_msg = buf_msg(t_buf); + struct sk_buff *buf = NULL; + struct tipc_msg *msg; + + if (tipc_link_is_up(l_ptr)) + tipc_link_reset(l_ptr); + + /* First failover packet? */ + if (l_ptr->exp_msg_count == START_CHANGEOVER) + l_ptr->exp_msg_count = msg_msgcnt(t_msg); + + /* Should there be an inner packet? */ + if (l_ptr->exp_msg_count) { + l_ptr->exp_msg_count--; + buf = buf_extract(t_buf, INT_H_SIZE); + if (buf == NULL) { + pr_warn("%sno inner failover pkt\n", link_co_err); goto exit; } - kfree_skb(tunnel_buf); - return 1; - } + msg = buf_msg(buf); - /* First original message ?: */ - if (tipc_link_is_up(dest_link)) { - pr_info("%s<%s>, changeover initiated by peer\n", link_rst_msg, - dest_link->name); - tipc_link_reset(dest_link); - dest_link->exp_msg_count = msg_count; - if (!msg_count) - goto exit; - } else if (dest_link->exp_msg_count == START_CHANGEOVER) { - dest_link->exp_msg_count = msg_count; - if (!msg_count) + if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) { + kfree_skb(buf); + buf = NULL; goto exit; + } + if (msg_user(msg) == MSG_FRAGMENTER) { + l_ptr->stats.recv_fragments++; + tipc_link_frag_rcv(&l_ptr->reasm_head, + &l_ptr->reasm_tail, + &buf); + } } +exit: + if ((l_ptr->exp_msg_count == 0) && (l_ptr->flags & LINK_STOPPED)) { + tipc_node_detach_link(l_ptr->owner, l_ptr); + kfree(l_ptr); + } + return buf; +} + +/* tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent + * via other link as result of a failover (ORIGINAL_MSG) or + * a new active link (DUPLICATE_MSG). Failover packets are + * returned to the active link for delivery upwards. + * Owner node is locked. + */ +static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr, + struct sk_buff **buf) +{ + struct sk_buff *t_buf = *buf; + struct tipc_link *l_ptr; + struct tipc_msg *t_msg = buf_msg(t_buf); + u32 bearer_id = msg_bearer_id(t_msg); + + *buf = NULL; - /* Receive original message */ - if (dest_link->exp_msg_count == 0) { - pr_warn("%sgot too many tunnelled messages\n", link_co_err); + if (bearer_id >= MAX_BEARERS) goto exit; - } - dest_link->exp_msg_count--; - if (less(msg_seqno(msg), dest_link->reset_checkpoint)) { + + l_ptr = n_ptr->links[bearer_id]; + if (!l_ptr) goto exit; - } else { - *buf = buf_extract(tunnel_buf, INT_H_SIZE); - if (*buf != NULL) { - kfree_skb(tunnel_buf); - return 1; - } else { - pr_warn("%soriginal msg dropped\n", link_co_err); - } - } + + if (msg_type(t_msg) == DUPLICATE_MSG) + tipc_link_dup_rcv(l_ptr, t_buf); + else if (msg_type(t_msg) == ORIGINAL_MSG) + *buf = tipc_link_failover_rcv(l_ptr, t_buf); + else + pr_warn("%sunknown tunnel pkt received\n", link_co_err); exit: - *buf = NULL; - kfree_skb(tunnel_buf); - return 0; + kfree_skb(t_buf); + return *buf != NULL; } /* * Bundler functionality: */ -void tipc_link_recv_bundle(struct sk_buff *buf) +void tipc_link_bundle_rcv(struct sk_buff *buf) { u32 msgcount = msg_msgcnt(buf_msg(buf)); u32 pos = INT_H_SIZE; @@ -2268,11 +2250,11 @@ void tipc_link_recv_bundle(struct sk_buff *buf) */ /* - * link_send_long_buf: Entry for buffers needing fragmentation. + * tipc_link_frag_xmit: Entry for buffers needing fragmentation. * The buffer is complete, inclusive total message length. * Returns user data length. */ -static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf) +static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf) { struct sk_buff *buf_chain = NULL; struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain; @@ -2307,11 +2289,7 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf) fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); if (fragm == NULL) { kfree_skb(buf); - while (buf_chain) { - buf = buf_chain; - buf_chain = buf_chain->next; - kfree_skb(buf); - } + kfree_skb_list(buf_chain); return -ENOMEM; } msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); @@ -2339,12 +2317,11 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf) return dsz; } -/* - * tipc_link_recv_fragment(): Called with node lock on. Returns +/* tipc_link_frag_rcv(): Called with node lock on. Returns * the reassembled buffer if message is complete. */ -int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail, - struct sk_buff **fbuf) +int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail, + struct sk_buff **fbuf) { struct sk_buff *frag = *fbuf; struct tipc_msg *msg = buf_msg(frag); @@ -2358,6 +2335,7 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail, goto out_free; *head = frag; skb_frag_list_init(*head); + *fbuf = NULL; return 0; } else if (*head && skb_try_coalesce(*head, frag, &headstolen, &delta)) { @@ -2377,10 +2355,12 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail, *tail = *head = NULL; return LINK_REASM_COMPLETE; } + *fbuf = NULL; return 0; out_free: pr_warn_ratelimited("Link unable to reassemble fragmented message\n"); kfree_skb(*fbuf); + *fbuf = NULL; return LINK_REASM_ERROR; } @@ -2414,35 +2394,41 @@ void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window) l_ptr->queue_limit[MSG_FRAGMENTER] = 4000; } -/** - * link_find_link - locate link by name - * @name: ptr to link name string - * @node: ptr to area to be filled with ptr to associated node - * +/* tipc_link_find_owner - locate owner node of link by link's name + * @name: pointer to link name string + * @bearer_id: pointer to index in 'node->links' array where the link was found. * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted; * this also prevents link deletion. * - * Returns pointer to link (or 0 if invalid link name). + * Returns pointer to node owning the link, or 0 if no matching link is found. */ -static struct tipc_link *link_find_link(const char *name, - struct tipc_node **node) +static struct tipc_node *tipc_link_find_owner(const char *link_name, + unsigned int *bearer_id) { struct tipc_link *l_ptr; struct tipc_node *n_ptr; + struct tipc_node *found_node = 0; int i; - list_for_each_entry(n_ptr, &tipc_node_list, list) { + *bearer_id = 0; + rcu_read_lock(); + list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { + tipc_node_lock(n_ptr); for (i = 0; i < MAX_BEARERS; i++) { l_ptr = n_ptr->links[i]; - if (l_ptr && !strcmp(l_ptr->name, name)) - goto found; + if (l_ptr && !strcmp(l_ptr->name, link_name)) { + *bearer_id = i; + found_node = n_ptr; + break; + } } + tipc_node_unlock(n_ptr); + if (found_node) + break; } - l_ptr = NULL; - n_ptr = NULL; -found: - *node = n_ptr; - return l_ptr; + rcu_read_unlock(); + + return found_node; } /** @@ -2484,32 +2470,33 @@ static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd) struct tipc_link *l_ptr; struct tipc_bearer *b_ptr; struct tipc_media *m_ptr; + int bearer_id; int res = 0; - l_ptr = link_find_link(name, &node); - if (l_ptr) { - /* - * acquire node lock for tipc_link_send_proto_msg(). - * see "TIPC locking policy" in net.c. - */ + node = tipc_link_find_owner(name, &bearer_id); + if (node) { tipc_node_lock(node); - switch (cmd) { - case TIPC_CMD_SET_LINK_TOL: - link_set_supervision_props(l_ptr, new_value); - tipc_link_send_proto_msg(l_ptr, - STATE_MSG, 0, 0, new_value, 0, 0); - break; - case TIPC_CMD_SET_LINK_PRI: - l_ptr->priority = new_value; - tipc_link_send_proto_msg(l_ptr, - STATE_MSG, 0, 0, 0, new_value, 0); - break; - case TIPC_CMD_SET_LINK_WINDOW: - tipc_link_set_queue_limits(l_ptr, new_value); - break; - default: - res = -EINVAL; - break; + l_ptr = node->links[bearer_id]; + + if (l_ptr) { + switch (cmd) { + case TIPC_CMD_SET_LINK_TOL: + link_set_supervision_props(l_ptr, new_value); + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, + new_value, 0, 0); + break; + case TIPC_CMD_SET_LINK_PRI: + l_ptr->priority = new_value; + tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, + 0, new_value, 0); + break; + case TIPC_CMD_SET_LINK_WINDOW: + tipc_link_set_queue_limits(l_ptr, new_value); + break; + default: + res = -EINVAL; + break; + } } tipc_node_unlock(node); return res; @@ -2604,6 +2591,7 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_ char *link_name; struct tipc_link *l_ptr; struct tipc_node *node; + unsigned int bearer_id; if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME)) return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); @@ -2614,15 +2602,19 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_ return tipc_cfg_reply_error_string("link not found"); return tipc_cfg_reply_none(); } - read_lock_bh(&tipc_net_lock); - l_ptr = link_find_link(link_name, &node); - if (!l_ptr) { + node = tipc_link_find_owner(link_name, &bearer_id); + if (!node) { read_unlock_bh(&tipc_net_lock); return tipc_cfg_reply_error_string("link not found"); } - tipc_node_lock(node); + l_ptr = node->links[bearer_id]; + if (!l_ptr) { + tipc_node_unlock(node); + read_unlock_bh(&tipc_net_lock); + return tipc_cfg_reply_error_string("link not found"); + } link_reset_statistics(l_ptr); tipc_node_unlock(node); read_unlock_bh(&tipc_net_lock); @@ -2652,18 +2644,27 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size) struct tipc_node *node; char *status; u32 profile_total = 0; + unsigned int bearer_id; int ret; if (!strcmp(name, tipc_bclink_name)) return tipc_bclink_stats(buf, buf_size); read_lock_bh(&tipc_net_lock); - l = link_find_link(name, &node); - if (!l) { + node = tipc_link_find_owner(name, &bearer_id); + if (!node) { read_unlock_bh(&tipc_net_lock); return 0; } tipc_node_lock(node); + + l = node->links[bearer_id]; + if (!l) { + tipc_node_unlock(node); + read_unlock_bh(&tipc_net_lock); + return 0; + } + s = &l->stats; if (tipc_link_is_active(l)) diff --git a/net/tipc/link.h b/net/tipc/link.h index 8a6c1026644..8c0b49b5b2e 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -1,7 +1,7 @@ /* * net/tipc/link.h: Include file for TIPC link code * - * Copyright (c) 1995-2006, Ericsson AB + * Copyright (c) 1995-2006, 2013, Ericsson AB * Copyright (c) 2004-2005, 2010-2011, Wind River Systems * All rights reserved. * @@ -40,27 +40,28 @@ #include "msg.h" #include "node.h" -/* - * Link reassembly status codes +/* Link reassembly status codes */ #define LINK_REASM_ERROR -1 #define LINK_REASM_COMPLETE 1 -/* - * Out-of-range value for link sequence numbers +/* Out-of-range value for link sequence numbers */ #define INVALID_LINK_SEQ 0x10000 -/* - * Link states +/* Link working states */ #define WORKING_WORKING 560810u #define WORKING_UNKNOWN 560811u #define RESET_UNKNOWN 560812u #define RESET_RESET 560813u -/* - * Starting value for maximum packet size negotiation on unicast links +/* Link endpoint execution states + */ +#define LINK_STARTED 0x0001 +#define LINK_STOPPED 0x0002 + +/* Starting value for maximum packet size negotiation on unicast links * (unless bearer MTU is less) */ #define MAX_PKT_DEFAULT 1500 @@ -102,8 +103,7 @@ struct tipc_stats { * @media_addr: media address to use when sending messages over link * @timer: link timer * @owner: pointer to peer node - * @link_list: adjacent links in bearer's list of links - * @started: indicates if link has been started + * @flags: execution state flags for link endpoint instance * @checkpoint: reference point for triggering link continuity checking * @peer_session: link session # being used by peer end of link * @peer_bearer_id: bearer id used by link's peer endpoint @@ -112,7 +112,6 @@ struct tipc_stats { * @continuity_interval: link continuity testing interval [in ms] * @abort_limit: # of unacknowledged continuity probes needed to reset link * @state: current state of link FSM - * @blocked: indicates if link has been administratively blocked * @fsm_msg_cnt: # of protocol messages link FSM has sent in current state * @proto_msg: template for control messages generated by link * @pmsg: convenience pointer to "proto_msg" field @@ -150,10 +149,9 @@ struct tipc_link { struct tipc_media_addr media_addr; struct timer_list timer; struct tipc_node *owner; - struct list_head link_list; /* Management and link supervision data */ - int started; + unsigned int flags; u32 checkpoint; u32 peer_session; u32 peer_bearer_id; @@ -162,7 +160,6 @@ struct tipc_link { u32 continuity_interval; u32 abort_limit; int state; - int blocked; u32 fsm_msg_cnt; struct { unchar hdr[INT_H_SIZE]; @@ -217,35 +214,39 @@ struct tipc_port; struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, struct tipc_bearer *b_ptr, const struct tipc_media_addr *media_addr); -void tipc_link_delete(struct tipc_link *l_ptr); -void tipc_link_changeover(struct tipc_link *l_ptr); -void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *dest); +void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down); +void tipc_link_failover_send_queue(struct tipc_link *l_ptr); +void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest); void tipc_link_reset_fragments(struct tipc_link *l_ptr); int tipc_link_is_up(struct tipc_link *l_ptr); int tipc_link_is_active(struct tipc_link *l_ptr); -u32 tipc_link_push_packet(struct tipc_link *l_ptr); -void tipc_link_stop(struct tipc_link *l_ptr); -struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space, u16 cmd); -struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space); -struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space); +void tipc_link_purge_queues(struct tipc_link *l_ptr); +struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, + int req_tlv_space, + u16 cmd); +struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, + int req_tlv_space); +struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, + int req_tlv_space); void tipc_link_reset(struct tipc_link *l_ptr); -int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector); -void tipc_link_send_names(struct list_head *message_list, u32 dest); +void tipc_link_reset_list(unsigned int bearer_id); +int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); +void tipc_link_names_xmit(struct list_head *message_list, u32 dest); +int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf); u32 tipc_link_get_max_pkt(u32 dest, u32 selector); -int tipc_link_send_sections_fast(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len, u32 destnode); -void tipc_link_recv_bundle(struct sk_buff *buf); -int tipc_link_recv_fragment(struct sk_buff **reasm_head, - struct sk_buff **reasm_tail, - struct sk_buff **fbuf); -void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob, - u32 gap, u32 tolerance, u32 priority, - u32 acked_mtu); +int tipc_link_iovec_xmit_fast(struct tipc_port *sender, + struct iovec const *msg_sect, + unsigned int len, u32 destnode); +void tipc_link_bundle_rcv(struct sk_buff *buf); +int tipc_link_frag_rcv(struct sk_buff **reasm_head, + struct sk_buff **reasm_tail, + struct sk_buff **fbuf); +void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, + u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); void tipc_link_push_queue(struct tipc_link *l_ptr); u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, - struct sk_buff *buf); + struct sk_buff *buf); void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all); void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); void tipc_link_retransmit(struct tipc_link *l_ptr, @@ -312,11 +313,6 @@ static inline int link_reset_reset(struct tipc_link *l_ptr) return l_ptr->state == RESET_RESET; } -static inline int link_blocked(struct tipc_link *l_ptr) -{ - return l_ptr->exp_msg_count || l_ptr->blocked; -} - static inline int link_congested(struct tipc_link *l_ptr) { return l_ptr->out_queue_size >= l_ptr->queue_limit[0]; diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index e0d08055754..aff8041dc15 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -131,16 +131,24 @@ static void named_cluster_distribute(struct sk_buff *buf) { struct sk_buff *buf_copy; struct tipc_node *n_ptr; + struct tipc_link *l_ptr; - list_for_each_entry(n_ptr, &tipc_node_list, list) { - if (tipc_node_active_links(n_ptr)) { + rcu_read_lock(); + list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { + spin_lock_bh(&n_ptr->lock); + l_ptr = n_ptr->active_links[n_ptr->addr & 1]; + if (l_ptr) { buf_copy = skb_copy(buf, GFP_ATOMIC); - if (!buf_copy) + if (!buf_copy) { + spin_unlock_bh(&n_ptr->lock); break; + } msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); - tipc_link_send(buf_copy, n_ptr->addr, n_ptr->addr); + __tipc_link_xmit(l_ptr, buf_copy); } + spin_unlock_bh(&n_ptr->lock); } + rcu_read_unlock(); kfree_skb(buf); } @@ -262,7 +270,7 @@ void tipc_named_node_up(unsigned long nodearg) named_distribute(&message_list, node, &publ_zone, max_item_buf); read_unlock_bh(&tipc_nametbl_lock); - tipc_link_send_names(&message_list, node); + tipc_link_names_xmit(&message_list, node); } /** @@ -293,9 +301,9 @@ static void named_purge_publ(struct publication *publ) } /** - * tipc_named_recv - process name table update message sent by another node + * tipc_named_rcv - process name table update message sent by another node */ -void tipc_named_recv(struct sk_buff *buf) +void tipc_named_rcv(struct sk_buff *buf) { struct publication *publ; struct tipc_msg *msg = buf_msg(buf); diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h index 1e41bdd4f25..9b312ccfd43 100644 --- a/net/tipc/name_distr.h +++ b/net/tipc/name_distr.h @@ -42,7 +42,7 @@ void tipc_named_publish(struct publication *publ); void tipc_named_withdraw(struct publication *publ); void tipc_named_node_up(unsigned long node); -void tipc_named_recv(struct sk_buff *buf); +void tipc_named_rcv(struct sk_buff *buf); void tipc_named_reinit(void); #endif diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 09dcd54b04e..042e8e3cabc 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -148,8 +148,7 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper, */ static struct sub_seq *tipc_subseq_alloc(u32 cnt) { - struct sub_seq *sseq = kcalloc(cnt, sizeof(struct sub_seq), GFP_ATOMIC); - return sseq; + return kcalloc(cnt, sizeof(struct sub_seq), GFP_ATOMIC); } /** @@ -942,20 +941,48 @@ int tipc_nametbl_init(void) return 0; } -void tipc_nametbl_stop(void) +/** + * tipc_purge_publications - remove all publications for a given type + * + * tipc_nametbl_lock must be held when calling this function + */ +static void tipc_purge_publications(struct name_seq *seq) { - u32 i; + struct publication *publ, *safe; + struct sub_seq *sseq; + struct name_info *info; - if (!table.types) + if (!seq->sseqs) { + nameseq_delete_empty(seq); return; + } + sseq = seq->sseqs; + info = sseq->info; + list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) { + tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node, + publ->ref, publ->key); + } +} + +void tipc_nametbl_stop(void) +{ + u32 i; + struct name_seq *seq; + struct hlist_head *seq_head; + struct hlist_node *safe; - /* Verify name table is empty, then release it */ + /* Verify name table is empty and purge any lingering + * publications, then release the name table + */ write_lock_bh(&tipc_nametbl_lock); for (i = 0; i < TIPC_NAMETBL_SIZE; i++) { if (hlist_empty(&table.types[i])) continue; - pr_err("nametbl_stop(): orphaned hash chain detected\n"); - break; + seq_head = &table.types[i]; + hlist_for_each_entry_safe(seq, safe, seq_head, ns_list) { + tipc_purge_publications(seq); + } + continue; } kfree(table.types); table.types = NULL; diff --git a/net/tipc/net.c b/net/tipc/net.c index 7d305ecc09c..4c564eb69e1 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -146,19 +146,19 @@ void tipc_net_route_msg(struct sk_buff *buf) if (tipc_in_scope(dnode, tipc_own_addr)) { if (msg_isdata(msg)) { if (msg_mcast(msg)) - tipc_port_recv_mcast(buf, NULL); + tipc_port_mcast_rcv(buf, NULL); else if (msg_destport(msg)) - tipc_port_recv_msg(buf); + tipc_port_rcv(buf); else net_route_named_msg(buf); return; } switch (msg_user(msg)) { case NAME_DISTRIBUTOR: - tipc_named_recv(buf); + tipc_named_rcv(buf); break; case CONN_MANAGER: - tipc_port_recv_proto_msg(buf); + tipc_port_proto_rcv(buf); break; default: kfree_skb(buf); @@ -168,7 +168,7 @@ void tipc_net_route_msg(struct sk_buff *buf) /* Handle message for another node */ skb_trim(buf, msg_size(msg)); - tipc_link_send(buf, dnode, msg_link_selector(msg)); + tipc_link_xmit(buf, dnode, msg_link_selector(msg)); } void tipc_net_start(u32 addr) @@ -182,8 +182,8 @@ void tipc_net_start(u32 addr) tipc_bclink_init(); write_unlock_bh(&tipc_net_lock); - tipc_cfg_reinit(); - + tipc_nametbl_publish(TIPC_CFG_SRV, tipc_own_addr, tipc_own_addr, + TIPC_ZONE_SCOPE, 0, tipc_own_addr); pr_info("Started in network mode\n"); pr_info("Own node address %s, network identity %u\n", tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id); @@ -191,15 +191,15 @@ void tipc_net_start(u32 addr) void tipc_net_stop(void) { - struct tipc_node *node, *t_node; - if (!tipc_own_addr) return; + + tipc_nametbl_withdraw(TIPC_CFG_SRV, tipc_own_addr, 0, tipc_own_addr); write_lock_bh(&tipc_net_lock); tipc_bearer_stop(); tipc_bclink_stop(); - list_for_each_entry_safe(node, t_node, &tipc_node_list, list) - tipc_node_delete(node); + tipc_node_stop(); write_unlock_bh(&tipc_net_lock); + pr_info("Left network mode\n"); } diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c index 9f72a637636..3aaf73de9e2 100644 --- a/net/tipc/netlink.c +++ b/net/tipc/netlink.c @@ -83,8 +83,6 @@ static struct genl_ops tipc_genl_ops[] = { }, }; -static int tipc_genl_family_registered; - int tipc_netlink_start(void) { int res; @@ -94,16 +92,10 @@ int tipc_netlink_start(void) pr_err("Failed to register netlink interface\n"); return res; } - - tipc_genl_family_registered = 1; return 0; } void tipc_netlink_stop(void) { - if (!tipc_genl_family_registered) - return; - genl_unregister_family(&tipc_genl_family); - tipc_genl_family_registered = 0; } diff --git a/net/tipc/node.c b/net/tipc/node.c index 25100c0a6fe..1d3a4999a70 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -2,7 +2,7 @@ * net/tipc/node.c: TIPC node management routines * * Copyright (c) 2000-2006, 2012 Ericsson AB - * Copyright (c) 2005-2006, 2010-2011, Wind River Systems + * Copyright (c) 2005-2006, 2010-2014, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -44,13 +44,11 @@ static void node_lost_contact(struct tipc_node *n_ptr); static void node_established_contact(struct tipc_node *n_ptr); -static DEFINE_SPINLOCK(node_create_lock); - static struct hlist_head node_htable[NODE_HTABLE_SIZE]; LIST_HEAD(tipc_node_list); static u32 tipc_num_nodes; - -static atomic_t tipc_num_links = ATOMIC_INIT(0); +static u32 tipc_num_links; +static DEFINE_SPINLOCK(node_list_lock); /* * A trivial power-of-two bitmask technique is used for speed, since this @@ -73,37 +71,26 @@ struct tipc_node *tipc_node_find(u32 addr) if (unlikely(!in_own_cluster_exact(addr))) return NULL; - hlist_for_each_entry(node, &node_htable[tipc_hashfn(addr)], hash) { - if (node->addr == addr) + rcu_read_lock(); + hlist_for_each_entry_rcu(node, &node_htable[tipc_hashfn(addr)], hash) { + if (node->addr == addr) { + rcu_read_unlock(); return node; + } } + rcu_read_unlock(); return NULL; } -/** - * tipc_node_create - create neighboring node - * - * Currently, this routine is called by neighbor discovery code, which holds - * net_lock for reading only. We must take node_create_lock to ensure a node - * isn't created twice if two different bearers discover the node at the same - * time. (It would be preferable to switch to holding net_lock in write mode, - * but this is a non-trivial change.) - */ struct tipc_node *tipc_node_create(u32 addr) { struct tipc_node *n_ptr, *temp_node; - spin_lock_bh(&node_create_lock); - - n_ptr = tipc_node_find(addr); - if (n_ptr) { - spin_unlock_bh(&node_create_lock); - return n_ptr; - } + spin_lock_bh(&node_list_lock); n_ptr = kzalloc(sizeof(*n_ptr), GFP_ATOMIC); if (!n_ptr) { - spin_unlock_bh(&node_create_lock); + spin_unlock_bh(&node_list_lock); pr_warn("Node creation failed, no memory\n"); return NULL; } @@ -114,31 +101,41 @@ struct tipc_node *tipc_node_create(u32 addr) INIT_LIST_HEAD(&n_ptr->list); INIT_LIST_HEAD(&n_ptr->nsub); - hlist_add_head(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); + hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); - list_for_each_entry(temp_node, &tipc_node_list, list) { + list_for_each_entry_rcu(temp_node, &tipc_node_list, list) { if (n_ptr->addr < temp_node->addr) break; } - list_add_tail(&n_ptr->list, &temp_node->list); + list_add_tail_rcu(&n_ptr->list, &temp_node->list); n_ptr->block_setup = WAIT_PEER_DOWN; n_ptr->signature = INVALID_NODE_SIG; tipc_num_nodes++; - spin_unlock_bh(&node_create_lock); + spin_unlock_bh(&node_list_lock); return n_ptr; } -void tipc_node_delete(struct tipc_node *n_ptr) +static void tipc_node_delete(struct tipc_node *n_ptr) { - list_del(&n_ptr->list); - hlist_del(&n_ptr->hash); - kfree(n_ptr); + list_del_rcu(&n_ptr->list); + hlist_del_rcu(&n_ptr->hash); + kfree_rcu(n_ptr, rcu); tipc_num_nodes--; } +void tipc_node_stop(void) +{ + struct tipc_node *node, *t_node; + + spin_lock_bh(&node_list_lock); + list_for_each_entry_safe(node, t_node, &tipc_node_list, list) + tipc_node_delete(node); + spin_unlock_bh(&node_list_lock); +} + /** * tipc_node_link_up - handle addition of link * @@ -162,7 +159,7 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr) pr_info("New link <%s> becomes standby\n", l_ptr->name); return; } - tipc_link_send_duplicate(active[0], l_ptr); + tipc_link_dup_queue_xmit(active[0], l_ptr); if (l_ptr->priority == active[0]->priority) { active[0] = l_ptr; return; @@ -225,7 +222,7 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr) if (active[0] == l_ptr) node_select_active_links(n_ptr); if (tipc_node_is_up(n_ptr)) - tipc_link_changeover(l_ptr); + tipc_link_failover_send_queue(l_ptr); else node_lost_contact(n_ptr); } @@ -235,11 +232,6 @@ int tipc_node_active_links(struct tipc_node *n_ptr) return n_ptr->active_links[0] != NULL; } -int tipc_node_redundant_links(struct tipc_node *n_ptr) -{ - return n_ptr->working_links > 1; -} - int tipc_node_is_up(struct tipc_node *n_ptr) { return tipc_node_active_links(n_ptr); @@ -248,15 +240,25 @@ int tipc_node_is_up(struct tipc_node *n_ptr) void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) { n_ptr->links[l_ptr->b_ptr->identity] = l_ptr; - atomic_inc(&tipc_num_links); + spin_lock_bh(&node_list_lock); + tipc_num_links++; + spin_unlock_bh(&node_list_lock); n_ptr->link_cnt++; } void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) { - n_ptr->links[l_ptr->b_ptr->identity] = NULL; - atomic_dec(&tipc_num_links); - n_ptr->link_cnt--; + int i; + + for (i = 0; i < MAX_BEARERS; i++) { + if (l_ptr != n_ptr->links[i]) + continue; + n_ptr->links[i] = NULL; + spin_lock_bh(&node_list_lock); + tipc_num_links--; + spin_unlock_bh(&node_list_lock); + n_ptr->link_cnt--; + } } static void node_established_contact(struct tipc_node *n_ptr) @@ -291,11 +293,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Flush broadcast link info associated with lost node */ if (n_ptr->bclink.recv_permitted) { - while (n_ptr->bclink.deferred_head) { - struct sk_buff *buf = n_ptr->bclink.deferred_head; - n_ptr->bclink.deferred_head = buf->next; - kfree_skb(buf); - } + kfree_skb_list(n_ptr->bclink.deferred_head); n_ptr->bclink.deferred_size = 0; if (n_ptr->bclink.reasm_head) { @@ -344,27 +342,28 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space) return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE " (network address)"); - read_lock_bh(&tipc_net_lock); + spin_lock_bh(&node_list_lock); if (!tipc_num_nodes) { - read_unlock_bh(&tipc_net_lock); + spin_unlock_bh(&node_list_lock); return tipc_cfg_reply_none(); } /* For now, get space for all other nodes */ payload_size = TLV_SPACE(sizeof(node_info)) * tipc_num_nodes; if (payload_size > 32768u) { - read_unlock_bh(&tipc_net_lock); + spin_unlock_bh(&node_list_lock); return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED " (too many nodes)"); } + spin_unlock_bh(&node_list_lock); + buf = tipc_cfg_reply_alloc(payload_size); - if (!buf) { - read_unlock_bh(&tipc_net_lock); + if (!buf) return NULL; - } /* Add TLVs for all nodes in scope */ - list_for_each_entry(n_ptr, &tipc_node_list, list) { + rcu_read_lock(); + list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { if (!tipc_in_scope(domain, n_ptr->addr)) continue; node_info.addr = htonl(n_ptr->addr); @@ -372,8 +371,7 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space) tipc_cfg_append_tlv(buf, TIPC_TLV_NODE_INFO, &node_info, sizeof(node_info)); } - - read_unlock_bh(&tipc_net_lock); + rcu_read_unlock(); return buf; } @@ -396,21 +394,19 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space) if (!tipc_own_addr) return tipc_cfg_reply_none(); - read_lock_bh(&tipc_net_lock); - + spin_lock_bh(&node_list_lock); /* Get space for all unicast links + broadcast link */ - payload_size = TLV_SPACE(sizeof(link_info)) * - (atomic_read(&tipc_num_links) + 1); + payload_size = TLV_SPACE((sizeof(link_info)) * (tipc_num_links + 1)); if (payload_size > 32768u) { - read_unlock_bh(&tipc_net_lock); + spin_unlock_bh(&node_list_lock); return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED " (too many links)"); } + spin_unlock_bh(&node_list_lock); + buf = tipc_cfg_reply_alloc(payload_size); - if (!buf) { - read_unlock_bh(&tipc_net_lock); + if (!buf) return NULL; - } /* Add TLV for broadcast link */ link_info.dest = htonl(tipc_cluster_mask(tipc_own_addr)); @@ -419,7 +415,8 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space) tipc_cfg_append_tlv(buf, TIPC_TLV_LINK_INFO, &link_info, sizeof(link_info)); /* Add TLVs for any other links in scope */ - list_for_each_entry(n_ptr, &tipc_node_list, list) { + rcu_read_lock(); + list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { u32 i; if (!tipc_in_scope(domain, n_ptr->addr)) @@ -436,7 +433,6 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space) } tipc_node_unlock(n_ptr); } - - read_unlock_bh(&tipc_net_lock); + rcu_read_unlock(); return buf; } diff --git a/net/tipc/node.h b/net/tipc/node.h index e5e96c04e16..7cbb8cec1a9 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -2,7 +2,7 @@ * net/tipc/node.h: Include file for TIPC node management routines * * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005, 2010-2011, Wind River Systems + * Copyright (c) 2005, 2010-2014, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -64,9 +64,9 @@ * @working_links: number of working links to node (both active and standby) * @block_setup: bit mask of conditions preventing link establishment to node * @link_cnt: number of links to node - * @permit_changeover: non-zero if node has redundant links to this system * @signature: node instance identifier * @bclink: broadcast-related info + * @rcu: rcu struct for tipc_node * @acked: sequence # of last outbound b'cast message acknowledged by node * @last_in: sequence # of last in-sequence b'cast message received from node * @last_sent: sequence # of last b'cast message sent by node @@ -89,8 +89,8 @@ struct tipc_node { int link_cnt; int working_links; int block_setup; - int permit_changeover; u32 signature; + struct rcu_head rcu; struct { u32 acked; u32 last_in; @@ -109,13 +109,12 @@ extern struct list_head tipc_node_list; struct tipc_node *tipc_node_find(u32 addr); struct tipc_node *tipc_node_create(u32 addr); -void tipc_node_delete(struct tipc_node *n_ptr); +void tipc_node_stop(void); void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr); void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr); int tipc_node_active_links(struct tipc_node *n_ptr); -int tipc_node_redundant_links(struct tipc_node *n_ptr); int tipc_node_is_up(struct tipc_node *n_ptr); struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space); struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space); diff --git a/net/tipc/port.c b/net/tipc/port.c index d43f3182b1d..5c14c7801ee 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -1,7 +1,7 @@ /* * net/tipc/port.c: TIPC port code * - * Copyright (c) 1992-2007, Ericsson AB + * Copyright (c) 1992-2007, 2014, Ericsson AB * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * @@ -38,6 +38,7 @@ #include "config.h" #include "port.h" #include "name_table.h" +#include "socket.h" /* Connection management: */ #define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ @@ -54,17 +55,6 @@ static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err); static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err); static void port_timeout(unsigned long ref); - -static u32 port_peernode(struct tipc_port *p_ptr) -{ - return msg_destnode(&p_ptr->phdr); -} - -static u32 port_peerport(struct tipc_port *p_ptr) -{ - return msg_destport(&p_ptr->phdr); -} - /** * tipc_port_peer_msg - verify message was sent by connected port's peer * @@ -76,33 +66,32 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg) u32 peernode; u32 orignode; - if (msg_origport(msg) != port_peerport(p_ptr)) + if (msg_origport(msg) != tipc_port_peerport(p_ptr)) return 0; orignode = msg_orignode(msg); - peernode = port_peernode(p_ptr); + peernode = tipc_port_peernode(p_ptr); return (orignode == peernode) || (!orignode && (peernode == tipc_own_addr)) || (!peernode && (orignode == tipc_own_addr)); } /** - * tipc_multicast - send a multicast message to local and remote destinations + * tipc_port_mcast_xmit - send a multicast message to local and remote + * destinations */ -int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, - struct iovec const *msg_sect, unsigned int len) +int tipc_port_mcast_xmit(struct tipc_port *oport, + struct tipc_name_seq const *seq, + struct iovec const *msg_sect, + unsigned int len) { struct tipc_msg *hdr; struct sk_buff *buf; struct sk_buff *ibuf = NULL; struct tipc_port_list dports = {0, NULL, }; - struct tipc_port *oport = tipc_port_deref(ref); int ext_targets; int res; - if (unlikely(!oport)) - return -EINVAL; - /* Create multicast message */ hdr = &oport->phdr; msg_set_type(hdr, TIPC_MCAST_MSG); @@ -131,7 +120,7 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, return -ENOMEM; } } - res = tipc_bclink_send_msg(buf); + res = tipc_bclink_xmit(buf); if ((res < 0) && (dports.count != 0)) kfree_skb(ibuf); } else { @@ -140,7 +129,7 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, if (res >= 0) { if (ibuf) - tipc_port_recv_mcast(ibuf, &dports); + tipc_port_mcast_rcv(ibuf, &dports); } else { tipc_port_list_free(&dports); } @@ -148,11 +137,11 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, } /** - * tipc_port_recv_mcast - deliver multicast message to all destination ports + * tipc_port_mcast_rcv - deliver multicast message to all destination ports * * If there is no port list, perform a lookup to create one */ -void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp) +void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp) { struct tipc_msg *msg; struct tipc_port_list dports = {0, NULL, }; @@ -176,7 +165,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp) msg_set_destnode(msg, tipc_own_addr); if (dp->count == 1) { msg_set_destport(msg, dp->ports[0]); - tipc_port_recv_msg(buf); + tipc_port_rcv(buf); tipc_port_list_free(dp); return; } @@ -191,7 +180,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp) if ((index == 0) && (cnt != 0)) item = item->next; msg_set_destport(buf_msg(b), item->ports[index]); - tipc_port_recv_msg(b); + tipc_port_rcv(b); } } exit: @@ -199,40 +188,32 @@ exit: tipc_port_list_free(dp); } -/** - * tipc_createport - create a generic TIPC port + +void tipc_port_wakeup(struct tipc_port *port) +{ + tipc_sock_wakeup(tipc_port_to_sock(port)); +} + +/* tipc_port_init - intiate TIPC port and lock it * - * Returns pointer to (locked) TIPC port, or NULL if unable to create it + * Returns obtained reference if initialization is successful, zero otherwise */ -struct tipc_port *tipc_createport(struct sock *sk, - u32 (*dispatcher)(struct tipc_port *, - struct sk_buff *), - void (*wakeup)(struct tipc_port *), - const u32 importance) +u32 tipc_port_init(struct tipc_port *p_ptr, + const unsigned int importance) { - struct tipc_port *p_ptr; struct tipc_msg *msg; u32 ref; - p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); - if (!p_ptr) { - pr_warn("Port creation failed, no memory\n"); - return NULL; - } ref = tipc_ref_acquire(p_ptr, &p_ptr->lock); if (!ref) { - pr_warn("Port creation failed, ref. table exhausted\n"); - kfree(p_ptr); - return NULL; + pr_warn("Port registration failed, ref. table exhausted\n"); + return 0; } - p_ptr->sk = sk; p_ptr->max_pkt = MAX_PKT_DEFAULT; p_ptr->ref = ref; INIT_LIST_HEAD(&p_ptr->wait_list); INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); - p_ptr->dispatcher = dispatcher; - p_ptr->wakeup = wakeup; k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); INIT_LIST_HEAD(&p_ptr->publications); INIT_LIST_HEAD(&p_ptr->port_list); @@ -248,10 +229,10 @@ struct tipc_port *tipc_createport(struct sock *sk, msg_set_origport(msg, ref); list_add_tail(&p_ptr->port_list, &ports); spin_unlock_bh(&tipc_port_list_lock); - return p_ptr; + return ref; } -int tipc_deleteport(struct tipc_port *p_ptr) +void tipc_port_destroy(struct tipc_port *p_ptr) { struct sk_buff *buf = NULL; @@ -272,67 +253,7 @@ int tipc_deleteport(struct tipc_port *p_ptr) list_del(&p_ptr->wait_list); spin_unlock_bh(&tipc_port_list_lock); k_term_timer(&p_ptr->timer); - kfree(p_ptr); tipc_net_route_msg(buf); - return 0; -} - -static int port_unreliable(struct tipc_port *p_ptr) -{ - return msg_src_droppable(&p_ptr->phdr); -} - -int tipc_portunreliable(u32 ref, unsigned int *isunreliable) -{ - struct tipc_port *p_ptr; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return -EINVAL; - *isunreliable = port_unreliable(p_ptr); - tipc_port_unlock(p_ptr); - return 0; -} - -int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) -{ - struct tipc_port *p_ptr; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return -EINVAL; - msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0)); - tipc_port_unlock(p_ptr); - return 0; -} - -static int port_unreturnable(struct tipc_port *p_ptr) -{ - return msg_dest_droppable(&p_ptr->phdr); -} - -int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) -{ - struct tipc_port *p_ptr; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return -EINVAL; - *isunrejectable = port_unreturnable(p_ptr); - tipc_port_unlock(p_ptr); - return 0; -} - -int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) -{ - struct tipc_port *p_ptr; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return -EINVAL; - msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0)); - tipc_port_unlock(p_ptr); - return 0; } /* @@ -350,8 +271,8 @@ static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr, if (buf) { msg = buf_msg(buf); tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE, - port_peernode(p_ptr)); - msg_set_destport(msg, port_peerport(p_ptr)); + tipc_port_peernode(p_ptr)); + msg_set_destport(msg, tipc_port_peerport(p_ptr)); msg_set_origport(msg, p_ptr->ref); msg_set_msgcnt(msg, ack); } @@ -422,17 +343,17 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err) /* send returned message & dispose of rejected message */ src_node = msg_prevnode(msg); if (in_own_node(src_node)) - tipc_port_recv_msg(rbuf); + tipc_port_rcv(rbuf); else - tipc_link_send(rbuf, src_node, msg_link_selector(rmsg)); + tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg)); exit: kfree_skb(buf); return data_sz; } -int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr, - struct iovec const *msg_sect, unsigned int len, - int err) +int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr, + struct iovec const *msg_sect, unsigned int len, + int err) { struct sk_buff *buf; int res; @@ -519,7 +440,7 @@ static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 er return buf; } -void tipc_port_recv_proto_msg(struct sk_buff *buf) +void tipc_port_proto_rcv(struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); struct tipc_port *p_ptr; @@ -547,13 +468,12 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf) /* Process protocol message sent by peer */ switch (msg_type(msg)) { case CONN_ACK: - wakeable = tipc_port_congested(p_ptr) && p_ptr->congested && - p_ptr->wakeup; + wakeable = tipc_port_congested(p_ptr) && p_ptr->congested; p_ptr->acked += msg_msgcnt(msg); if (!tipc_port_congested(p_ptr)) { p_ptr->congested = 0; if (wakeable) - p_ptr->wakeup(p_ptr); + tipc_port_wakeup(p_ptr); } break; case CONN_PROBE: @@ -584,8 +504,8 @@ static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id) ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref); if (p_ptr->connected) { - u32 dport = port_peerport(p_ptr); - u32 destnode = port_peernode(p_ptr); + u32 dport = tipc_port_peerport(p_ptr); + u32 destnode = tipc_port_peernode(p_ptr); ret += tipc_snprintf(buf + ret, len - ret, " connected to <%u.%u.%u:%u>", @@ -673,34 +593,6 @@ void tipc_acknowledge(u32 ref, u32 ack) tipc_net_route_msg(buf); } -int tipc_portimportance(u32 ref, unsigned int *importance) -{ - struct tipc_port *p_ptr; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return -EINVAL; - *importance = (unsigned int)msg_importance(&p_ptr->phdr); - tipc_port_unlock(p_ptr); - return 0; -} - -int tipc_set_portimportance(u32 ref, unsigned int imp) -{ - struct tipc_port *p_ptr; - - if (imp > TIPC_CRITICAL_IMPORTANCE) - return -EINVAL; - - p_ptr = tipc_port_lock(ref); - if (!p_ptr) - return -EINVAL; - msg_set_importance(&p_ptr->phdr, (u32)imp); - tipc_port_unlock(p_ptr); - return 0; -} - - int tipc_publish(struct tipc_port *p_ptr, unsigned int scope, struct tipc_name_seq const *seq) { @@ -760,7 +652,7 @@ int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope, return res; } -int tipc_connect(u32 ref, struct tipc_portid const *peer) +int tipc_port_connect(u32 ref, struct tipc_portid const *peer) { struct tipc_port *p_ptr; int res; @@ -768,17 +660,17 @@ int tipc_connect(u32 ref, struct tipc_portid const *peer) p_ptr = tipc_port_lock(ref); if (!p_ptr) return -EINVAL; - res = __tipc_connect(ref, p_ptr, peer); + res = __tipc_port_connect(ref, p_ptr, peer); tipc_port_unlock(p_ptr); return res; } /* - * __tipc_connect - connect to a remote peer + * __tipc_port_connect - connect to a remote peer * * Port must be locked. */ -int __tipc_connect(u32 ref, struct tipc_port *p_ptr, +int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr, struct tipc_portid const *peer) { struct tipc_msg *msg; @@ -815,26 +707,23 @@ exit: * * Port must be locked. */ -int __tipc_disconnect(struct tipc_port *tp_ptr) +int __tipc_port_disconnect(struct tipc_port *tp_ptr) { - int res; - if (tp_ptr->connected) { tp_ptr->connected = 0; /* let timer expire on it's own to avoid deadlock! */ tipc_nodesub_unsubscribe(&tp_ptr->subscription); - res = 0; - } else { - res = -ENOTCONN; + return 0; } - return res; + + return -ENOTCONN; } /* - * tipc_disconnect(): Disconnect port form peer. + * tipc_port_disconnect(): Disconnect port form peer. * This is a node local operation. */ -int tipc_disconnect(u32 ref) +int tipc_port_disconnect(u32 ref) { struct tipc_port *p_ptr; int res; @@ -842,15 +731,15 @@ int tipc_disconnect(u32 ref) p_ptr = tipc_port_lock(ref); if (!p_ptr) return -EINVAL; - res = __tipc_disconnect(p_ptr); + res = __tipc_port_disconnect(p_ptr); tipc_port_unlock(p_ptr); return res; } /* - * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect + * tipc_port_shutdown(): Send a SHUTDOWN msg to peer and disconnect */ -int tipc_shutdown(u32 ref) +int tipc_port_shutdown(u32 ref) { struct tipc_port *p_ptr; struct sk_buff *buf = NULL; @@ -862,13 +751,13 @@ int tipc_shutdown(u32 ref) buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN); tipc_port_unlock(p_ptr); tipc_net_route_msg(buf); - return tipc_disconnect(ref); + return tipc_port_disconnect(ref); } /** - * tipc_port_recv_msg - receive message from lower layer and deliver to port user + * tipc_port_rcv - receive message from lower layer and deliver to port user */ -int tipc_port_recv_msg(struct sk_buff *buf) +int tipc_port_rcv(struct sk_buff *buf) { struct tipc_port *p_ptr; struct tipc_msg *msg = buf_msg(buf); @@ -885,7 +774,7 @@ int tipc_port_recv_msg(struct sk_buff *buf) /* validate destination & pass to port, otherwise reject message */ p_ptr = tipc_port_lock(destport); if (likely(p_ptr)) { - err = p_ptr->dispatcher(p_ptr, buf); + err = tipc_sk_rcv(&tipc_port_to_sock(p_ptr)->sk, buf); tipc_port_unlock(p_ptr); if (likely(!err)) return dsz; @@ -897,43 +786,43 @@ int tipc_port_recv_msg(struct sk_buff *buf) } /* - * tipc_port_recv_sections(): Concatenate and deliver sectioned - * message for this node. + * tipc_port_iovec_rcv: Concatenate and deliver sectioned + * message for this node. */ -static int tipc_port_recv_sections(struct tipc_port *sender, - struct iovec const *msg_sect, - unsigned int len) +static int tipc_port_iovec_rcv(struct tipc_port *sender, + struct iovec const *msg_sect, + unsigned int len) { struct sk_buff *buf; int res; res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf); if (likely(buf)) - tipc_port_recv_msg(buf); + tipc_port_rcv(buf); return res; } /** * tipc_send - send message sections on connection */ -int tipc_send(u32 ref, struct iovec const *msg_sect, unsigned int len) +int tipc_send(struct tipc_port *p_ptr, + struct iovec const *msg_sect, + unsigned int len) { - struct tipc_port *p_ptr; u32 destnode; int res; - p_ptr = tipc_port_deref(ref); - if (!p_ptr || !p_ptr->connected) + if (!p_ptr->connected) return -EINVAL; p_ptr->congested = 1; if (!tipc_port_congested(p_ptr)) { - destnode = port_peernode(p_ptr); + destnode = tipc_port_peernode(p_ptr); if (likely(!in_own_node(destnode))) - res = tipc_link_send_sections_fast(p_ptr, msg_sect, - len, destnode); + res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, + destnode); else - res = tipc_port_recv_sections(p_ptr, msg_sect, len); + res = tipc_port_iovec_rcv(p_ptr, msg_sect, len); if (likely(res != -ELINKCONG)) { p_ptr->congested = 0; @@ -942,7 +831,7 @@ int tipc_send(u32 ref, struct iovec const *msg_sect, unsigned int len) return res; } } - if (port_unreliable(p_ptr)) { + if (tipc_port_unreliable(p_ptr)) { p_ptr->congested = 0; return len; } @@ -952,17 +841,18 @@ int tipc_send(u32 ref, struct iovec const *msg_sect, unsigned int len) /** * tipc_send2name - send message sections to port name */ -int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain, - struct iovec const *msg_sect, unsigned int len) +int tipc_send2name(struct tipc_port *p_ptr, + struct tipc_name const *name, + unsigned int domain, + struct iovec const *msg_sect, + unsigned int len) { - struct tipc_port *p_ptr; struct tipc_msg *msg; u32 destnode = domain; u32 destport; int res; - p_ptr = tipc_port_deref(ref); - if (!p_ptr || p_ptr->connected) + if (p_ptr->connected) return -EINVAL; msg = &p_ptr->phdr; @@ -977,39 +867,39 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain, if (likely(destport || destnode)) { if (likely(in_own_node(destnode))) - res = tipc_port_recv_sections(p_ptr, msg_sect, len); + res = tipc_port_iovec_rcv(p_ptr, msg_sect, len); else if (tipc_own_addr) - res = tipc_link_send_sections_fast(p_ptr, msg_sect, - len, destnode); + res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, + destnode); else - res = tipc_port_reject_sections(p_ptr, msg, msg_sect, - len, TIPC_ERR_NO_NODE); + res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, + len, TIPC_ERR_NO_NODE); if (likely(res != -ELINKCONG)) { if (res > 0) p_ptr->sent++; return res; } - if (port_unreliable(p_ptr)) { + if (tipc_port_unreliable(p_ptr)) return len; - } + return -ELINKCONG; } - return tipc_port_reject_sections(p_ptr, msg, msg_sect, len, - TIPC_ERR_NO_NAME); + return tipc_port_iovec_reject(p_ptr, msg, msg_sect, len, + TIPC_ERR_NO_NAME); } /** * tipc_send2port - send message sections to port identity */ -int tipc_send2port(u32 ref, struct tipc_portid const *dest, - struct iovec const *msg_sect, unsigned int len) +int tipc_send2port(struct tipc_port *p_ptr, + struct tipc_portid const *dest, + struct iovec const *msg_sect, + unsigned int len) { - struct tipc_port *p_ptr; struct tipc_msg *msg; int res; - p_ptr = tipc_port_deref(ref); - if (!p_ptr || p_ptr->connected) + if (p_ptr->connected) return -EINVAL; msg = &p_ptr->phdr; @@ -1020,20 +910,20 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest, msg_set_hdr_sz(msg, BASIC_H_SIZE); if (in_own_node(dest->node)) - res = tipc_port_recv_sections(p_ptr, msg_sect, len); + res = tipc_port_iovec_rcv(p_ptr, msg_sect, len); else if (tipc_own_addr) - res = tipc_link_send_sections_fast(p_ptr, msg_sect, len, - dest->node); + res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, + dest->node); else - res = tipc_port_reject_sections(p_ptr, msg, msg_sect, len, + res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, len, TIPC_ERR_NO_NODE); if (likely(res != -ELINKCONG)) { if (res > 0) p_ptr->sent++; return res; } - if (port_unreliable(p_ptr)) { + if (tipc_port_unreliable(p_ptr)) return len; - } + return -ELINKCONG; } diff --git a/net/tipc/port.h b/net/tipc/port.h index 34f12bd4074..a00397393bd 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -1,7 +1,7 @@ /* * net/tipc/port.h: Include file for TIPC port code * - * Copyright (c) 1994-2007, Ericsson AB + * Copyright (c) 1994-2007, 2014, Ericsson AB * Copyright (c) 2004-2007, 2010-2013, Wind River Systems * All rights reserved. * @@ -48,7 +48,6 @@ /** * struct tipc_port - TIPC port structure - * @sk: pointer to socket handle * @lock: pointer to spinlock for controlling access to port * @connected: non-zero if port is currently connected to a peer port * @conn_type: TIPC type used when connection was established @@ -60,8 +59,6 @@ * @ref: unique reference to port in TIPC object registry * @phdr: preformatted message header used when sending messages * @port_list: adjacent ports in TIPC's global list of ports - * @dispatcher: ptr to routine which handles received messages - * @wakeup: ptr to routine to call when port is no longer congested * @wait_list: adjacent ports in list of ports waiting on link congestion * @waiting_pkts: * @sent: # of non-empty messages sent by port @@ -74,7 +71,6 @@ * @subscription: "node down" subscription used to terminate failed connections */ struct tipc_port { - struct sock *sk; spinlock_t *lock; int connected; u32 conn_type; @@ -86,8 +82,6 @@ struct tipc_port { u32 ref; struct tipc_msg phdr; struct list_head port_list; - u32 (*dispatcher)(struct tipc_port *, struct sk_buff *); - void (*wakeup)(struct tipc_port *); struct list_head wait_list; u32 waiting_pkts; u32 sent; @@ -106,68 +100,71 @@ struct tipc_port_list; /* * TIPC port manipulation routines */ -struct tipc_port *tipc_createport(struct sock *sk, - u32 (*dispatcher)(struct tipc_port *, - struct sk_buff *), - void (*wakeup)(struct tipc_port *), - const u32 importance); +u32 tipc_port_init(struct tipc_port *p_ptr, + const unsigned int importance); int tipc_reject_msg(struct sk_buff *buf, u32 err); void tipc_acknowledge(u32 port_ref, u32 ack); -int tipc_deleteport(struct tipc_port *p_ptr); - -int tipc_portimportance(u32 portref, unsigned int *importance); -int tipc_set_portimportance(u32 portref, unsigned int importance); - -int tipc_portunreliable(u32 portref, unsigned int *isunreliable); -int tipc_set_portunreliable(u32 portref, unsigned int isunreliable); - -int tipc_portunreturnable(u32 portref, unsigned int *isunreturnable); -int tipc_set_portunreturnable(u32 portref, unsigned int isunreturnable); +void tipc_port_destroy(struct tipc_port *p_ptr); int tipc_publish(struct tipc_port *p_ptr, unsigned int scope, struct tipc_name_seq const *name_seq); + int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope, struct tipc_name_seq const *name_seq); -int tipc_connect(u32 portref, struct tipc_portid const *port); +int tipc_port_connect(u32 portref, struct tipc_portid const *port); -int tipc_disconnect(u32 portref); +int tipc_port_disconnect(u32 portref); -int tipc_shutdown(u32 ref); +int tipc_port_shutdown(u32 ref); +void tipc_port_wakeup(struct tipc_port *port); /* * The following routines require that the port be locked on entry */ -int __tipc_disconnect(struct tipc_port *tp_ptr); -int __tipc_connect(u32 ref, struct tipc_port *p_ptr, +int __tipc_port_disconnect(struct tipc_port *tp_ptr); +int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr, struct tipc_portid const *peer); int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); /* * TIPC messaging routines */ -int tipc_port_recv_msg(struct sk_buff *buf); -int tipc_send(u32 portref, struct iovec const *msg_sect, unsigned int len); - -int tipc_send2name(u32 portref, struct tipc_name const *name, u32 domain, - struct iovec const *msg_sect, unsigned int len); +int tipc_port_rcv(struct sk_buff *buf); + +int tipc_send(struct tipc_port *port, + struct iovec const *msg_sect, + unsigned int len); + +int tipc_send2name(struct tipc_port *port, + struct tipc_name const *name, + u32 domain, + struct iovec const *msg_sect, + unsigned int len); + +int tipc_send2port(struct tipc_port *port, + struct tipc_portid const *dest, + struct iovec const *msg_sect, + unsigned int len); + +int tipc_port_mcast_xmit(struct tipc_port *port, + struct tipc_name_seq const *seq, + struct iovec const *msg, + unsigned int len); + +int tipc_port_iovec_reject(struct tipc_port *p_ptr, + struct tipc_msg *hdr, + struct iovec const *msg_sect, + unsigned int len, + int err); -int tipc_send2port(u32 portref, struct tipc_portid const *dest, - struct iovec const *msg_sect, unsigned int len); - -int tipc_multicast(u32 portref, struct tipc_name_seq const *seq, - struct iovec const *msg, unsigned int len); - -int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr, - struct iovec const *msg_sect, unsigned int len, - int err); struct sk_buff *tipc_port_get_ports(void); -void tipc_port_recv_proto_msg(struct sk_buff *buf); -void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp); +void tipc_port_proto_rcv(struct sk_buff *buf); +void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp); void tipc_port_reinit(void); /** @@ -188,14 +185,53 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr) spin_unlock_bh(p_ptr->lock); } -static inline struct tipc_port *tipc_port_deref(u32 ref) +static inline int tipc_port_congested(struct tipc_port *p_ptr) { - return (struct tipc_port *)tipc_ref_deref(ref); + return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2); } -static inline int tipc_port_congested(struct tipc_port *p_ptr) + +static inline u32 tipc_port_peernode(struct tipc_port *p_ptr) { - return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2); + return msg_destnode(&p_ptr->phdr); +} + +static inline u32 tipc_port_peerport(struct tipc_port *p_ptr) +{ + return msg_destport(&p_ptr->phdr); +} + +static inline bool tipc_port_unreliable(struct tipc_port *port) +{ + return msg_src_droppable(&port->phdr) != 0; +} + +static inline void tipc_port_set_unreliable(struct tipc_port *port, + bool unreliable) +{ + msg_set_src_droppable(&port->phdr, unreliable ? 1 : 0); +} + +static inline bool tipc_port_unreturnable(struct tipc_port *port) +{ + return msg_dest_droppable(&port->phdr) != 0; +} + +static inline void tipc_port_set_unreturnable(struct tipc_port *port, + bool unreturnable) +{ + msg_set_dest_droppable(&port->phdr, unreturnable ? 1 : 0); +} + + +static inline int tipc_port_importance(struct tipc_port *port) +{ + return msg_importance(&port->phdr); +} + +static inline void tipc_port_set_importance(struct tipc_port *port, int imp) +{ + msg_set_importance(&port->phdr, (u32)imp); } #endif diff --git a/net/tipc/ref.c b/net/tipc/ref.c index 2a2a938dc22..3d4ecd754ee 100644 --- a/net/tipc/ref.c +++ b/net/tipc/ref.c @@ -89,7 +89,7 @@ struct ref_table { static struct ref_table tipc_ref_table; -static DEFINE_RWLOCK(ref_table_lock); +static DEFINE_SPINLOCK(ref_table_lock); /** * tipc_ref_table_init - create reference table for objects @@ -126,9 +126,6 @@ int tipc_ref_table_init(u32 requested_size, u32 start) */ void tipc_ref_table_stop(void) { - if (!tipc_ref_table.entries) - return; - vfree(tipc_ref_table.entries); tipc_ref_table.entries = NULL; } @@ -162,7 +159,7 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock) } /* take a free entry, if available; otherwise initialize a new entry */ - write_lock_bh(&ref_table_lock); + spin_lock_bh(&ref_table_lock); if (tipc_ref_table.first_free) { index = tipc_ref_table.first_free; entry = &(tipc_ref_table.entries[index]); @@ -178,7 +175,7 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock) } else { ref = 0; } - write_unlock_bh(&ref_table_lock); + spin_unlock_bh(&ref_table_lock); /* * Grab the lock so no one else can modify this entry @@ -219,7 +216,7 @@ void tipc_ref_discard(u32 ref) index = ref & index_mask; entry = &(tipc_ref_table.entries[index]); - write_lock_bh(&ref_table_lock); + spin_lock_bh(&ref_table_lock); if (!entry->object) { pr_err("Attempt to discard ref. to non-existent obj\n"); @@ -245,7 +242,7 @@ void tipc_ref_discard(u32 ref) tipc_ref_table.last_free = index; exit: - write_unlock_bh(&ref_table_lock); + spin_unlock_bh(&ref_table_lock); } /** @@ -267,20 +264,3 @@ void *tipc_ref_lock(u32 ref) } return NULL; } - - -/** - * tipc_ref_deref - return pointer referenced object (without locking it) - */ -void *tipc_ref_deref(u32 ref) -{ - if (likely(tipc_ref_table.entries)) { - struct reference *entry; - - entry = &tipc_ref_table.entries[ref & - tipc_ref_table.index_mask]; - if (likely(entry->ref == ref)) - return entry->object; - } - return NULL; -} diff --git a/net/tipc/ref.h b/net/tipc/ref.h index 5bc8e7ab84d..d01aa1df63b 100644 --- a/net/tipc/ref.h +++ b/net/tipc/ref.h @@ -44,6 +44,5 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock); void tipc_ref_discard(u32 ref); void *tipc_ref_lock(u32 ref); -void *tipc_ref_deref(u32 ref); #endif diff --git a/net/tipc/server.c b/net/tipc/server.c index fd3fa57a410..646a930eefb 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -55,7 +55,7 @@ * @usr_data: user-specified field * @rx_action: what to do when connection socket is active * @outqueue: pointer to first outbound message in queue - * @outqueue_lock: controll access to the outqueue + * @outqueue_lock: control access to the outqueue * @outqueue: list of connection objects for its server * @swork: send work item */ @@ -87,7 +87,6 @@ static void tipc_clean_outqueues(struct tipc_conn *con); static void tipc_conn_kref_release(struct kref *kref) { struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); - struct tipc_server *s = con->server; if (con->sock) { tipc_sock_release_local(con->sock); @@ -95,10 +94,6 @@ static void tipc_conn_kref_release(struct kref *kref) } tipc_clean_outqueues(con); - - if (con->conid) - s->tipc_conn_shutdown(con->conid, con->usr_data); - kfree(con); } @@ -181,6 +176,9 @@ static void tipc_close_conn(struct tipc_conn *con) struct tipc_server *s = con->server; if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { + if (con->conid) + s->tipc_conn_shutdown(con->conid, con->usr_data); + spin_lock_bh(&s->idr_lock); idr_remove(&s->conn_idr, con->conid); s->idr_in_use--; @@ -429,10 +427,12 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid, list_add_tail(&e->list, &con->outqueue); spin_unlock_bh(&con->outqueue_lock); - if (test_bit(CF_CONNECTED, &con->flags)) + if (test_bit(CF_CONNECTED, &con->flags)) { if (!queue_work(s->send_wq, &con->swork)) conn_put(con); - + } else { + conn_put(con); + } return 0; } @@ -573,7 +573,6 @@ int tipc_server_start(struct tipc_server *s) kmem_cache_destroy(s->rcvbuf_cache); return ret; } - s->enabled = 1; return ret; } @@ -583,10 +582,6 @@ void tipc_server_stop(struct tipc_server *s) int total = 0; int id; - if (!s->enabled) - return; - - s->enabled = 0; spin_lock_bh(&s->idr_lock); for (id = 0; total < s->idr_in_use; id++) { con = idr_find(&s->conn_idr, id); diff --git a/net/tipc/server.h b/net/tipc/server.h index 98b23f20bc0..be817b0b547 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -56,7 +56,6 @@ * @name: server name * @imp: message importance * @type: socket type - * @enabled: identify whether server is launched or not */ struct tipc_server { struct idr conn_idr; @@ -74,7 +73,6 @@ struct tipc_server { const char name[TIPC_SERVER_NAME_LEN]; int imp; int type; - int enabled; }; int tipc_conn_sendmsg(struct tipc_server *s, int conid, diff --git a/net/tipc/socket.c b/net/tipc/socket.c index e741416d1d2..adc12e22730 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@ /* * net/tipc/socket.c: TIPC socket API * - * Copyright (c) 2001-2007, 2012 Ericsson AB + * Copyright (c) 2001-2007, 2012-2014, Ericsson AB * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * @@ -38,33 +38,17 @@ #include "port.h" #include <linux/export.h> -#include <net/sock.h> #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ -struct tipc_sock { - struct sock sk; - struct tipc_port *p; - struct tipc_portid peer_name; - unsigned int conn_timeout; -}; - -#define tipc_sk(sk) ((struct tipc_sock *)(sk)) -#define tipc_sk_port(sk) (tipc_sk(sk)->p) - -#define tipc_rx_ready(sock) (!skb_queue_empty(&sock->sk->sk_receive_queue) || \ - (sock->state == SS_DISCONNECTING)) - static int backlog_rcv(struct sock *sk, struct sk_buff *skb); -static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); -static void wakeupdispatch(struct tipc_port *tport); static void tipc_data_ready(struct sock *sk, int len); static void tipc_write_space(struct sock *sk); -static int release(struct socket *sock); -static int accept(struct socket *sock, struct socket *new_sock, int flags); +static int tipc_release(struct socket *sock); +static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); static const struct proto_ops packet_ops; static const struct proto_ops stream_ops; @@ -73,8 +57,6 @@ static const struct proto_ops msg_ops; static struct proto tipc_proto; static struct proto tipc_proto_kern; -static int sockets_enabled; - /* * Revised TIPC socket locking policy: * @@ -120,6 +102,8 @@ static int sockets_enabled; * - port reference */ +#include "socket.h" + /** * advance_rx_queue - discard first buffer in socket receive queue * @@ -155,13 +139,15 @@ static void reject_rx_queue(struct sock *sk) * * Returns 0 on success, errno otherwise */ -static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, - int kern) +static int tipc_sk_create(struct net *net, struct socket *sock, + int protocol, int kern) { const struct proto_ops *ops; socket_state state; struct sock *sk; - struct tipc_port *tp_ptr; + struct tipc_sock *tsk; + struct tipc_port *port; + u32 ref; /* Validate arguments */ if (unlikely(protocol != 0)) @@ -194,10 +180,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, if (sk == NULL) return -ENOMEM; - /* Allocate TIPC port for socket to use */ - tp_ptr = tipc_createport(sk, &dispatch, &wakeupdispatch, - TIPC_LOW_IMPORTANCE); - if (unlikely(!tp_ptr)) { + tsk = tipc_sk(sk); + port = &tsk->port; + + ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE); + if (!ref) { + pr_warn("Socket registration failed, ref. table exhausted\n"); sk_free(sk); return -ENOMEM; } @@ -211,17 +199,14 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; - tipc_sk(sk)->p = tp_ptr; tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; - - spin_unlock_bh(tp_ptr->lock); + tipc_port_unlock(port); if (sock->state == SS_READY) { - tipc_set_portunreturnable(tp_ptr->ref, 1); + tipc_port_set_unreturnable(port, true); if (sock->type == SOCK_DGRAM) - tipc_set_portunreliable(tp_ptr->ref, 1); + tipc_port_set_unreliable(port, true); } - return 0; } @@ -239,7 +224,6 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, int tipc_sock_create_local(int type, struct socket **res) { int rc; - struct sock *sk; rc = sock_create_lite(AF_TIPC, type, 0, res); if (rc < 0) { @@ -248,8 +232,6 @@ int tipc_sock_create_local(int type, struct socket **res) } tipc_sk_create(&init_net, *res, 0, 1); - sk = (*res)->sk; - return 0; } @@ -262,7 +244,7 @@ int tipc_sock_create_local(int type, struct socket **res) */ void tipc_sock_release_local(struct socket *sock) { - release(sock); + tipc_release(sock); sock->ops = NULL; sock_release(sock); } @@ -288,7 +270,7 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, if (ret < 0) return ret; - ret = accept(sock, *newsock, flags); + ret = tipc_accept(sock, *newsock, flags); if (ret < 0) { sock_release(*newsock); return ret; @@ -298,7 +280,7 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, } /** - * release - destroy a TIPC socket + * tipc_release - destroy a TIPC socket * @sock: socket to destroy * * This routine cleans up any messages that are still queued on the socket. @@ -313,12 +295,12 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, * * Returns 0 on success, errno otherwise */ -static int release(struct socket *sock) +static int tipc_release(struct socket *sock) { struct sock *sk = sock->sk; - struct tipc_port *tport; + struct tipc_sock *tsk; + struct tipc_port *port; struct sk_buff *buf; - int res; /* * Exit if socket isn't fully initialized (occurs when a failed accept() @@ -327,7 +309,8 @@ static int release(struct socket *sock) if (sk == NULL) return 0; - tport = tipc_sk_port(sk); + tsk = tipc_sk(sk); + port = &tsk->port; lock_sock(sk); /* @@ -344,17 +327,16 @@ static int release(struct socket *sock) if ((sock->state == SS_CONNECTING) || (sock->state == SS_CONNECTED)) { sock->state = SS_DISCONNECTING; - tipc_disconnect(tport->ref); + tipc_port_disconnect(port->ref); } tipc_reject_msg(buf, TIPC_ERR_NO_PORT); } } - /* - * Delete TIPC port; this ensures no more messages are queued - * (also disconnects an active connection & sends a 'FIN-' to peer) + /* Destroy TIPC port; also disconnects an active connection and + * sends a 'FIN-' to peer. */ - res = tipc_deleteport(tport); + tipc_port_destroy(port); /* Discard any remaining (connection-based) messages in receive queue */ __skb_queue_purge(&sk->sk_receive_queue); @@ -366,11 +348,11 @@ static int release(struct socket *sock) sock_put(sk); sock->sk = NULL; - return res; + return 0; } /** - * bind - associate or disassocate TIPC name(s) with a socket + * tipc_bind - associate or disassocate TIPC name(s) with a socket * @sock: socket structure * @uaddr: socket address describing name(s) and desired operation * @uaddr_len: size of socket address data structure @@ -384,16 +366,17 @@ static int release(struct socket *sock) * NOTE: This routine doesn't need to take the socket lock since it doesn't * access any non-constant socket information. */ -static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len) +static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, + int uaddr_len) { struct sock *sk = sock->sk; struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr; - struct tipc_port *tport = tipc_sk_port(sock->sk); + struct tipc_sock *tsk = tipc_sk(sk); int res = -EINVAL; lock_sock(sk); if (unlikely(!uaddr_len)) { - res = tipc_withdraw(tport, 0, NULL); + res = tipc_withdraw(&tsk->port, 0, NULL); goto exit; } @@ -421,15 +404,15 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len) } res = (addr->scope > 0) ? - tipc_publish(tport, addr->scope, &addr->addr.nameseq) : - tipc_withdraw(tport, -addr->scope, &addr->addr.nameseq); + tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) : + tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq); exit: release_sock(sk); return res; } /** - * get_name - get port ID of socket or peer socket + * tipc_getname - get port ID of socket or peer socket * @sock: socket structure * @uaddr: area for returned socket address * @uaddr_len: area for returned length of socket address @@ -441,21 +424,21 @@ exit: * accesses socket information that is unchanging (or which changes in * a completely predictable manner). */ -static int get_name(struct socket *sock, struct sockaddr *uaddr, - int *uaddr_len, int peer) +static int tipc_getname(struct socket *sock, struct sockaddr *uaddr, + int *uaddr_len, int peer) { struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr; - struct tipc_sock *tsock = tipc_sk(sock->sk); + struct tipc_sock *tsk = tipc_sk(sock->sk); memset(addr, 0, sizeof(*addr)); if (peer) { if ((sock->state != SS_CONNECTED) && ((peer != 2) || (sock->state != SS_DISCONNECTING))) return -ENOTCONN; - addr->addr.id.ref = tsock->peer_name.ref; - addr->addr.id.node = tsock->peer_name.node; + addr->addr.id.ref = tipc_port_peerport(&tsk->port); + addr->addr.id.node = tipc_port_peernode(&tsk->port); } else { - addr->addr.id.ref = tsock->p->ref; + addr->addr.id.ref = tsk->port.ref; addr->addr.id.node = tipc_own_addr; } @@ -469,7 +452,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr, } /** - * poll - read and possibly block on pollmask + * tipc_poll - read and possibly block on pollmask * @file: file structure associated with the socket * @sock: socket for which to calculate the poll bits * @wait: ??? @@ -508,22 +491,23 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr, * imply that the operation will succeed, merely that it should be performed * and will not block. */ -static unsigned int poll(struct file *file, struct socket *sock, - poll_table *wait) +static unsigned int tipc_poll(struct file *file, struct socket *sock, + poll_table *wait) { struct sock *sk = sock->sk; + struct tipc_sock *tsk = tipc_sk(sk); u32 mask = 0; sock_poll_wait(file, sk_sleep(sk), wait); switch ((int)sock->state) { case SS_UNCONNECTED: - if (!tipc_sk_port(sk)->congested) + if (!tsk->port.congested) mask |= POLLOUT; break; case SS_READY: case SS_CONNECTED: - if (!tipc_sk_port(sk)->congested) + if (!tsk->port.congested) mask |= POLLOUT; /* fall thru' */ case SS_CONNECTING: @@ -570,8 +554,34 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m) return 0; } +static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p) +{ + struct sock *sk = sock->sk; + struct tipc_sock *tsk = tipc_sk(sk); + DEFINE_WAIT(wait); + int done; + + do { + int err = sock_error(sk); + if (err) + return err; + if (sock->state == SS_DISCONNECTING) + return -EPIPE; + if (!*timeo_p) + return -EAGAIN; + if (signal_pending(current)) + return sock_intr_errno(*timeo_p); + + prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); + done = sk_wait_event(sk, timeo_p, !tsk->port.congested); + finish_wait(sk_sleep(sk), &wait); + } while (!done); + return 0; +} + + /** - * send_msg - send message in connectionless manner + * tipc_sendmsg - send message in connectionless manner * @iocb: if NULL, indicates that socket lock is already held * @sock: socket structure * @m: message to send @@ -584,14 +594,15 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m) * * Returns the number of bytes sent on success, or errno otherwise */ -static int send_msg(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) +static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t total_len) { struct sock *sk = sock->sk; - struct tipc_port *tport = tipc_sk_port(sk); - struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name; + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_port *port = &tsk->port; + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); int needs_conn; - long timeout_val; + long timeo; int res = -EINVAL; if (unlikely(!dest)) @@ -615,33 +626,32 @@ static int send_msg(struct kiocb *iocb, struct socket *sock, res = -EISCONN; goto exit; } - if (tport->published) { + if (tsk->port.published) { res = -EOPNOTSUPP; goto exit; } if (dest->addrtype == TIPC_ADDR_NAME) { - tport->conn_type = dest->addr.name.name.type; - tport->conn_instance = dest->addr.name.name.instance; + tsk->port.conn_type = dest->addr.name.name.type; + tsk->port.conn_instance = dest->addr.name.name.instance; } /* Abort any pending connection attempts (very unlikely) */ reject_rx_queue(sk); } - timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); - + timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); do { if (dest->addrtype == TIPC_ADDR_NAME) { res = dest_name_check(dest, m); if (res) break; - res = tipc_send2name(tport->ref, + res = tipc_send2name(port, &dest->addr.name.name, dest->addr.name.domain, m->msg_iov, total_len); } else if (dest->addrtype == TIPC_ADDR_ID) { - res = tipc_send2port(tport->ref, + res = tipc_send2port(port, &dest->addr.id, m->msg_iov, total_len); @@ -653,24 +663,19 @@ static int send_msg(struct kiocb *iocb, struct socket *sock, res = dest_name_check(dest, m); if (res) break; - res = tipc_multicast(tport->ref, - &dest->addr.nameseq, - m->msg_iov, - total_len); + res = tipc_port_mcast_xmit(port, + &dest->addr.nameseq, + m->msg_iov, + total_len); } if (likely(res != -ELINKCONG)) { if (needs_conn && (res >= 0)) sock->state = SS_CONNECTING; break; } - if (timeout_val <= 0L) { - res = timeout_val ? timeout_val : -EWOULDBLOCK; + res = tipc_wait_for_sndmsg(sock, &timeo); + if (res) break; - } - release_sock(sk); - timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk), - !tport->congested, timeout_val); - lock_sock(sk); } while (1); exit: @@ -679,8 +684,37 @@ exit: return res; } +static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) +{ + struct sock *sk = sock->sk; + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_port *port = &tsk->port; + DEFINE_WAIT(wait); + int done; + + do { + int err = sock_error(sk); + if (err) + return err; + if (sock->state == SS_DISCONNECTING) + return -EPIPE; + else if (sock->state != SS_CONNECTED) + return -ENOTCONN; + if (!*timeo_p) + return -EAGAIN; + if (signal_pending(current)) + return sock_intr_errno(*timeo_p); + + prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); + done = sk_wait_event(sk, timeo_p, + (!port->congested || !port->connected)); + finish_wait(sk_sleep(sk), &wait); + } while (!done); + return 0; +} + /** - * send_packet - send a connection-oriented message + * tipc_send_packet - send a connection-oriented message * @iocb: if NULL, indicates that socket lock is already held * @sock: socket structure * @m: message to send @@ -690,18 +724,18 @@ exit: * * Returns the number of bytes sent on success, or errno otherwise */ -static int send_packet(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) +static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t total_len) { struct sock *sk = sock->sk; - struct tipc_port *tport = tipc_sk_port(sk); - struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name; - long timeout_val; - int res; + struct tipc_sock *tsk = tipc_sk(sk); + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + int res = -EINVAL; + long timeo; /* Handle implied connection establishment */ if (unlikely(dest)) - return send_msg(iocb, sock, m, total_len); + return tipc_sendmsg(iocb, sock, m, total_len); if (total_len > TIPC_MAX_USER_MSG_SIZE) return -EMSGSIZE; @@ -709,37 +743,31 @@ static int send_packet(struct kiocb *iocb, struct socket *sock, if (iocb) lock_sock(sk); - timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); + if (unlikely(sock->state != SS_CONNECTED)) { + if (sock->state == SS_DISCONNECTING) + res = -EPIPE; + else + res = -ENOTCONN; + goto exit; + } + timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); do { - if (unlikely(sock->state != SS_CONNECTED)) { - if (sock->state == SS_DISCONNECTING) - res = -EPIPE; - else - res = -ENOTCONN; - break; - } - - res = tipc_send(tport->ref, m->msg_iov, total_len); + res = tipc_send(&tsk->port, m->msg_iov, total_len); if (likely(res != -ELINKCONG)) break; - if (timeout_val <= 0L) { - res = timeout_val ? timeout_val : -EWOULDBLOCK; + res = tipc_wait_for_sndpkt(sock, &timeo); + if (res) break; - } - release_sock(sk); - timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk), - (!tport->congested || !tport->connected), timeout_val); - lock_sock(sk); } while (1); - +exit: if (iocb) release_sock(sk); return res; } /** - * send_stream - send stream-oriented data + * tipc_send_stream - send stream-oriented data * @iocb: (unused) * @sock: socket structure * @m: data to send @@ -750,11 +778,11 @@ static int send_packet(struct kiocb *iocb, struct socket *sock, * Returns the number of bytes sent on success (or partial success), * or errno if no data sent */ -static int send_stream(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t total_len) +static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t total_len) { struct sock *sk = sock->sk; - struct tipc_port *tport = tipc_sk_port(sk); + struct tipc_sock *tsk = tipc_sk(sk); struct msghdr my_msg; struct iovec my_iov; struct iovec *curr_iov; @@ -770,16 +798,11 @@ static int send_stream(struct kiocb *iocb, struct socket *sock, /* Handle special cases where there is no connection */ if (unlikely(sock->state != SS_CONNECTED)) { - if (sock->state == SS_UNCONNECTED) { - res = send_packet(NULL, sock, m, total_len); - goto exit; - } else if (sock->state == SS_DISCONNECTING) { - res = -EPIPE; - goto exit; - } else { - res = -ENOTCONN; - goto exit; - } + if (sock->state == SS_UNCONNECTED) + res = tipc_send_packet(NULL, sock, m, total_len); + else + res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN; + goto exit; } if (unlikely(m->msg_name)) { @@ -807,21 +830,22 @@ static int send_stream(struct kiocb *iocb, struct socket *sock, my_msg.msg_name = NULL; bytes_sent = 0; - hdr_size = msg_hdr_sz(&tport->phdr); + hdr_size = msg_hdr_sz(&tsk->port.phdr); while (curr_iovlen--) { curr_start = curr_iov->iov_base; curr_left = curr_iov->iov_len; while (curr_left) { - bytes_to_send = tport->max_pkt - hdr_size; + bytes_to_send = tsk->port.max_pkt - hdr_size; if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE) bytes_to_send = TIPC_MAX_USER_MSG_SIZE; if (curr_left < bytes_to_send) bytes_to_send = curr_left; my_iov.iov_base = curr_start; my_iov.iov_len = bytes_to_send; - res = send_packet(NULL, sock, &my_msg, bytes_to_send); + res = tipc_send_packet(NULL, sock, &my_msg, + bytes_to_send); if (res < 0) { if (bytes_sent) res = bytes_sent; @@ -842,27 +866,25 @@ exit: /** * auto_connect - complete connection setup to a remote port - * @sock: socket structure + * @tsk: tipc socket structure * @msg: peer's response message * * Returns 0 on success, errno otherwise */ -static int auto_connect(struct socket *sock, struct tipc_msg *msg) +static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg) { - struct tipc_sock *tsock = tipc_sk(sock->sk); - struct tipc_port *p_ptr; + struct tipc_port *port = &tsk->port; + struct socket *sock = tsk->sk.sk_socket; + struct tipc_portid peer; - tsock->peer_name.ref = msg_origport(msg); - tsock->peer_name.node = msg_orignode(msg); - p_ptr = tipc_port_deref(tsock->p->ref); - if (!p_ptr) - return -EINVAL; + peer.ref = msg_origport(msg); + peer.node = msg_orignode(msg); - __tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name); + __tipc_port_connect(port->ref, port, &peer); if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE) return -EINVAL; - msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg)); + msg_set_importance(&port->phdr, (u32)msg_importance(msg)); sock->state = SS_CONNECTED; return 0; } @@ -876,7 +898,7 @@ static int auto_connect(struct socket *sock, struct tipc_msg *msg) */ static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) { - struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name; + DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name); if (addr) { addr->family = AF_TIPC; @@ -961,8 +983,39 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, return 0; } +static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo) +{ + struct sock *sk = sock->sk; + DEFINE_WAIT(wait); + int err; + + for (;;) { + prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); + if (timeo && skb_queue_empty(&sk->sk_receive_queue)) { + if (sock->state == SS_DISCONNECTING) { + err = -ENOTCONN; + break; + } + release_sock(sk); + timeo = schedule_timeout(timeo); + lock_sock(sk); + } + err = 0; + if (!skb_queue_empty(&sk->sk_receive_queue)) + break; + err = sock_intr_errno(timeo); + if (signal_pending(current)) + break; + err = -EAGAIN; + if (!timeo) + break; + } + finish_wait(sk_sleep(sk), &wait); + return err; +} + /** - * recv_msg - receive packet-oriented message + * tipc_recvmsg - receive packet-oriented message * @iocb: (unused) * @m: descriptor for message info * @buf_len: total size of user buffer area @@ -973,14 +1026,15 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg, * * Returns size of returned message data, errno otherwise */ -static int recv_msg(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t buf_len, int flags) +static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t buf_len, int flags) { struct sock *sk = sock->sk; - struct tipc_port *tport = tipc_sk_port(sk); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_port *port = &tsk->port; struct sk_buff *buf; struct tipc_msg *msg; - long timeout; + long timeo; unsigned int sz; u32 err; int res; @@ -996,25 +1050,13 @@ static int recv_msg(struct kiocb *iocb, struct socket *sock, goto exit; } - timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); + timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); restart: /* Look for a message in receive queue; wait if necessary */ - while (skb_queue_empty(&sk->sk_receive_queue)) { - if (sock->state == SS_DISCONNECTING) { - res = -ENOTCONN; - goto exit; - } - if (timeout <= 0L) { - res = timeout ? timeout : -EWOULDBLOCK; - goto exit; - } - release_sock(sk); - timeout = wait_event_interruptible_timeout(*sk_sleep(sk), - tipc_rx_ready(sock), - timeout); - lock_sock(sk); - } + res = tipc_wait_for_rcvmsg(sock, timeo); + if (res) + goto exit; /* Look at first message in receive queue */ buf = skb_peek(&sk->sk_receive_queue); @@ -1032,7 +1074,7 @@ restart: set_orig_addr(m, msg); /* Capture ancillary data (optional) */ - res = anc_data_recv(m, msg, tport); + res = anc_data_recv(m, msg, port); if (res) goto exit; @@ -1058,8 +1100,8 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { if ((sock->state != SS_READY) && - (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) - tipc_acknowledge(tport->ref, tport->conn_unacked); + (++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) + tipc_acknowledge(port->ref, port->conn_unacked); advance_rx_queue(sk); } exit: @@ -1068,7 +1110,7 @@ exit: } /** - * recv_stream - receive stream-oriented data + * tipc_recv_stream - receive stream-oriented data * @iocb: (unused) * @m: descriptor for message info * @buf_len: total size of user buffer area @@ -1079,14 +1121,15 @@ exit: * * Returns size of returned message data, errno otherwise */ -static int recv_stream(struct kiocb *iocb, struct socket *sock, - struct msghdr *m, size_t buf_len, int flags) +static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock, + struct msghdr *m, size_t buf_len, int flags) { struct sock *sk = sock->sk; - struct tipc_port *tport = tipc_sk_port(sk); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_port *port = &tsk->port; struct sk_buff *buf; struct tipc_msg *msg; - long timeout; + long timeo; unsigned int sz; int sz_to_copy, target, needed; int sz_copied = 0; @@ -1099,31 +1142,19 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock, lock_sock(sk); - if (unlikely((sock->state == SS_UNCONNECTED))) { + if (unlikely(sock->state == SS_UNCONNECTED)) { res = -ENOTCONN; goto exit; } target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len); - timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); + timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); restart: /* Look for a message in receive queue; wait if necessary */ - while (skb_queue_empty(&sk->sk_receive_queue)) { - if (sock->state == SS_DISCONNECTING) { - res = -ENOTCONN; - goto exit; - } - if (timeout <= 0L) { - res = timeout ? timeout : -EWOULDBLOCK; - goto exit; - } - release_sock(sk); - timeout = wait_event_interruptible_timeout(*sk_sleep(sk), - tipc_rx_ready(sock), - timeout); - lock_sock(sk); - } + res = tipc_wait_for_rcvmsg(sock, timeo); + if (res) + goto exit; /* Look at first message in receive queue */ buf = skb_peek(&sk->sk_receive_queue); @@ -1140,7 +1171,7 @@ restart: /* Optionally capture sender's address & ancillary data of first msg */ if (sz_copied == 0) { set_orig_addr(m, msg); - res = anc_data_recv(m, msg, tport); + res = anc_data_recv(m, msg, port); if (res) goto exit; } @@ -1178,8 +1209,8 @@ restart: /* Consume received message (optional) */ if (likely(!(flags & MSG_PEEK))) { - if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) - tipc_acknowledge(tport->ref, tport->conn_unacked); + if (unlikely(++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) + tipc_acknowledge(port->ref, port->conn_unacked); advance_rx_queue(sk); } @@ -1231,17 +1262,19 @@ static void tipc_data_ready(struct sock *sk, int len) /** * filter_connect - Handle all incoming messages for a connection-based socket - * @tsock: TIPC socket + * @tsk: TIPC socket * @msg: message * * Returns TIPC error status code and socket error status code * once it encounters some errors */ -static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) +static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf) { - struct socket *sock = tsock->sk.sk_socket; + struct sock *sk = &tsk->sk; + struct tipc_port *port = &tsk->port; + struct socket *sock = sk->sk_socket; struct tipc_msg *msg = buf_msg(*buf); - struct sock *sk = &tsock->sk; + u32 retval = TIPC_ERR_NO_PORT; int res; @@ -1251,10 +1284,10 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) switch ((int)sock->state) { case SS_CONNECTED: /* Accept only connection-based messages sent by peer */ - if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) { + if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) { if (unlikely(msg_errcode(msg))) { sock->state = SS_DISCONNECTING; - __tipc_disconnect(tsock->p); + __tipc_port_disconnect(port); } retval = TIPC_OK; } @@ -1271,7 +1304,7 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) if (unlikely(!msg_connected(msg))) break; - res = auto_connect(sock, msg); + res = auto_connect(tsk, msg); if (res) { sock->state = SS_DISCONNECTING; sk->sk_err = -res; @@ -1327,14 +1360,12 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) { struct tipc_msg *msg = buf_msg(buf); - unsigned int limit; if (msg_connected(msg)) - limit = sysctl_tipc_rmem[2]; - else - limit = sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE << - msg_importance(msg); - return limit; + return sysctl_tipc_rmem[2]; + + return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE << + msg_importance(msg); } /** @@ -1352,6 +1383,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) { struct socket *sock = sk->sk_socket; + struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *msg = buf_msg(buf); unsigned int limit = rcvbuf_limit(sk, buf); u32 res = TIPC_OK; @@ -1364,7 +1396,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf) if (msg_connected(msg)) return TIPC_ERR_NO_PORT; } else { - res = filter_connect(tipc_sk(sk), &buf); + res = filter_connect(tsk, &buf); if (res != TIPC_OK || buf == NULL) return res; } @@ -1402,17 +1434,16 @@ static int backlog_rcv(struct sock *sk, struct sk_buff *buf) } /** - * dispatch - handle incoming message - * @tport: TIPC port that received message + * tipc_sk_rcv - handle incoming message + * @sk: socket receiving message * @buf: message * * Called with port lock already taken. * * Returns TIPC error status code (TIPC_OK if message is not to be rejected) */ -static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf) +u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf) { - struct sock *sk = tport->sk; u32 res; /* @@ -1435,21 +1466,30 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf) return res; } -/** - * wakeupdispatch - wake up port after congestion - * @tport: port to wakeup - * - * Called with port lock already taken. - */ -static void wakeupdispatch(struct tipc_port *tport) +static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) { - struct sock *sk = tport->sk; + struct sock *sk = sock->sk; + DEFINE_WAIT(wait); + int done; - sk->sk_write_space(sk); + do { + int err = sock_error(sk); + if (err) + return err; + if (!*timeo_p) + return -ETIMEDOUT; + if (signal_pending(current)) + return sock_intr_errno(*timeo_p); + + prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); + done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING); + finish_wait(sk_sleep(sk), &wait); + } while (!done); + return 0; } /** - * connect - establish a connection to another TIPC port + * tipc_connect - establish a connection to another TIPC port * @sock: socket structure * @dest: socket address for destination port * @destlen: size of socket address data structure @@ -1457,13 +1497,14 @@ static void wakeupdispatch(struct tipc_port *tport) * * Returns 0 on success, errno otherwise */ -static int connect(struct socket *sock, struct sockaddr *dest, int destlen, - int flags) +static int tipc_connect(struct socket *sock, struct sockaddr *dest, + int destlen, int flags) { struct sock *sk = sock->sk; struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest; struct msghdr m = {NULL,}; - unsigned int timeout; + long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout; + socket_state previous; int res; lock_sock(sk); @@ -1485,8 +1526,7 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, goto exit; } - timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout; - + previous = sock->state; switch (sock->state) { case SS_UNCONNECTED: /* Send a 'SYN-' to destination */ @@ -1499,7 +1539,7 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, if (!timeout) m.msg_flags = MSG_DONTWAIT; - res = send_msg(NULL, sock, &m, 0); + res = tipc_sendmsg(NULL, sock, &m, 0); if ((res < 0) && (res != -EWOULDBLOCK)) goto exit; @@ -1508,56 +1548,35 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen, * case is EINPROGRESS, rather than EALREADY. */ res = -EINPROGRESS; - break; case SS_CONNECTING: - res = -EALREADY; + if (previous == SS_CONNECTING) + res = -EALREADY; + if (!timeout) + goto exit; + timeout = msecs_to_jiffies(timeout); + /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ + res = tipc_wait_for_connect(sock, &timeout); break; case SS_CONNECTED: res = -EISCONN; break; default: res = -EINVAL; - goto exit; - } - - if (sock->state == SS_CONNECTING) { - if (!timeout) - goto exit; - - /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ - release_sock(sk); - res = wait_event_interruptible_timeout(*sk_sleep(sk), - sock->state != SS_CONNECTING, - timeout ? (long)msecs_to_jiffies(timeout) - : MAX_SCHEDULE_TIMEOUT); - lock_sock(sk); - if (res <= 0) { - if (res == 0) - res = -ETIMEDOUT; - else - ; /* leave "res" unchanged */ - goto exit; - } + break; } - - if (unlikely(sock->state == SS_DISCONNECTING)) - res = sock_error(sk); - else - res = 0; - exit: release_sock(sk); return res; } /** - * listen - allow socket to listen for incoming connections + * tipc_listen - allow socket to listen for incoming connections * @sock: socket structure * @len: (unused) * * Returns 0 on success, errno otherwise */ -static int listen(struct socket *sock, int len) +static int tipc_listen(struct socket *sock, int len) { struct sock *sk = sock->sk; int res; @@ -1575,23 +1594,59 @@ static int listen(struct socket *sock, int len) return res; } +static int tipc_wait_for_accept(struct socket *sock, long timeo) +{ + struct sock *sk = sock->sk; + DEFINE_WAIT(wait); + int err; + + /* True wake-one mechanism for incoming connections: only + * one process gets woken up, not the 'whole herd'. + * Since we do not 'race & poll' for established sockets + * anymore, the common case will execute the loop only once. + */ + for (;;) { + prepare_to_wait_exclusive(sk_sleep(sk), &wait, + TASK_INTERRUPTIBLE); + if (timeo && skb_queue_empty(&sk->sk_receive_queue)) { + release_sock(sk); + timeo = schedule_timeout(timeo); + lock_sock(sk); + } + err = 0; + if (!skb_queue_empty(&sk->sk_receive_queue)) + break; + err = -EINVAL; + if (sock->state != SS_LISTENING) + break; + err = sock_intr_errno(timeo); + if (signal_pending(current)) + break; + err = -EAGAIN; + if (!timeo) + break; + } + finish_wait(sk_sleep(sk), &wait); + return err; +} + /** - * accept - wait for connection request + * tipc_accept - wait for connection request * @sock: listening socket * @newsock: new socket that is to be connected * @flags: file-related flags associated with socket * * Returns 0 on success, errno otherwise */ -static int accept(struct socket *sock, struct socket *new_sock, int flags) +static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags) { struct sock *new_sk, *sk = sock->sk; struct sk_buff *buf; - struct tipc_sock *new_tsock; - struct tipc_port *new_tport; + struct tipc_port *new_port; struct tipc_msg *msg; + struct tipc_portid peer; u32 new_ref; - + long timeo; int res; lock_sock(sk); @@ -1600,19 +1655,10 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) res = -EINVAL; goto exit; } - - while (skb_queue_empty(&sk->sk_receive_queue)) { - if (flags & O_NONBLOCK) { - res = -EWOULDBLOCK; - goto exit; - } - release_sock(sk); - res = wait_event_interruptible(*sk_sleep(sk), - (!skb_queue_empty(&sk->sk_receive_queue))); - lock_sock(sk); - if (res) - goto exit; - } + timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK); + res = tipc_wait_for_accept(sock, timeo); + if (res) + goto exit; buf = skb_peek(&sk->sk_receive_queue); @@ -1621,9 +1667,8 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) goto exit; new_sk = new_sock->sk; - new_tsock = tipc_sk(new_sk); - new_tport = new_tsock->p; - new_ref = new_tport->ref; + new_port = &tipc_sk(new_sk)->port; + new_ref = new_port->ref; msg = buf_msg(buf); /* we lock on new_sk; but lockdep sees the lock on sk */ @@ -1636,15 +1681,15 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) reject_rx_queue(new_sk); /* Connect new socket to it's peer */ - new_tsock->peer_name.ref = msg_origport(msg); - new_tsock->peer_name.node = msg_orignode(msg); - tipc_connect(new_ref, &new_tsock->peer_name); + peer.ref = msg_origport(msg); + peer.node = msg_orignode(msg); + tipc_port_connect(new_ref, &peer); new_sock->state = SS_CONNECTED; - tipc_set_portimportance(new_ref, msg_importance(msg)); + tipc_port_set_importance(new_port, msg_importance(msg)); if (msg_named(msg)) { - new_tport->conn_type = msg_nametype(msg); - new_tport->conn_instance = msg_nameinst(msg); + new_port->conn_type = msg_nametype(msg); + new_port->conn_instance = msg_nameinst(msg); } /* @@ -1655,21 +1700,20 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags) struct msghdr m = {NULL,}; advance_rx_queue(sk); - send_packet(NULL, new_sock, &m, 0); + tipc_send_packet(NULL, new_sock, &m, 0); } else { __skb_dequeue(&sk->sk_receive_queue); __skb_queue_head(&new_sk->sk_receive_queue, buf); skb_set_owner_r(buf, new_sk); } release_sock(new_sk); - exit: release_sock(sk); return res; } /** - * shutdown - shutdown socket connection + * tipc_shutdown - shutdown socket connection * @sock: socket structure * @how: direction to close (must be SHUT_RDWR) * @@ -1677,10 +1721,11 @@ exit: * * Returns 0 on success, errno otherwise */ -static int shutdown(struct socket *sock, int how) +static int tipc_shutdown(struct socket *sock, int how) { struct sock *sk = sock->sk; - struct tipc_port *tport = tipc_sk_port(sk); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_port *port = &tsk->port; struct sk_buff *buf; int res; @@ -1701,10 +1746,10 @@ restart: kfree_skb(buf); goto restart; } - tipc_disconnect(tport->ref); + tipc_port_disconnect(port->ref); tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN); } else { - tipc_shutdown(tport->ref); + tipc_port_shutdown(port->ref); } sock->state = SS_DISCONNECTING; @@ -1730,7 +1775,7 @@ restart: } /** - * setsockopt - set socket option + * tipc_setsockopt - set socket option * @sock: socket structure * @lvl: option level * @opt: option identifier @@ -1742,11 +1787,12 @@ restart: * * Returns 0 on success, errno otherwise */ -static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov, - unsigned int ol) +static int tipc_setsockopt(struct socket *sock, int lvl, int opt, + char __user *ov, unsigned int ol) { struct sock *sk = sock->sk; - struct tipc_port *tport = tipc_sk_port(sk); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_port *port = &tsk->port; u32 value; int res; @@ -1764,16 +1810,16 @@ static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov, switch (opt) { case TIPC_IMPORTANCE: - res = tipc_set_portimportance(tport->ref, value); + tipc_port_set_importance(port, value); break; case TIPC_SRC_DROPPABLE: if (sock->type != SOCK_STREAM) - res = tipc_set_portunreliable(tport->ref, value); + tipc_port_set_unreliable(port, value); else res = -ENOPROTOOPT; break; case TIPC_DEST_DROPPABLE: - res = tipc_set_portunreturnable(tport->ref, value); + tipc_port_set_unreturnable(port, value); break; case TIPC_CONN_TIMEOUT: tipc_sk(sk)->conn_timeout = value; @@ -1789,7 +1835,7 @@ static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov, } /** - * getsockopt - get socket option + * tipc_getsockopt - get socket option * @sock: socket structure * @lvl: option level * @opt: option identifier @@ -1801,11 +1847,12 @@ static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov, * * Returns 0 on success, errno otherwise */ -static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov, - int __user *ol) +static int tipc_getsockopt(struct socket *sock, int lvl, int opt, + char __user *ov, int __user *ol) { struct sock *sk = sock->sk; - struct tipc_port *tport = tipc_sk_port(sk); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_port *port = &tsk->port; int len; u32 value; int res; @@ -1822,13 +1869,13 @@ static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov, switch (opt) { case TIPC_IMPORTANCE: - res = tipc_portimportance(tport->ref, &value); + value = tipc_port_importance(port); break; case TIPC_SRC_DROPPABLE: - res = tipc_portunreliable(tport->ref, &value); + value = tipc_port_unreliable(port); break; case TIPC_DEST_DROPPABLE: - res = tipc_portunreturnable(tport->ref, &value); + value = tipc_port_unreturnable(port); break; case TIPC_CONN_TIMEOUT: value = tipc_sk(sk)->conn_timeout; @@ -1863,20 +1910,20 @@ static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov, static const struct proto_ops msg_ops = { .owner = THIS_MODULE, .family = AF_TIPC, - .release = release, - .bind = bind, - .connect = connect, + .release = tipc_release, + .bind = tipc_bind, + .connect = tipc_connect, .socketpair = sock_no_socketpair, .accept = sock_no_accept, - .getname = get_name, - .poll = poll, + .getname = tipc_getname, + .poll = tipc_poll, .ioctl = sock_no_ioctl, .listen = sock_no_listen, - .shutdown = shutdown, - .setsockopt = setsockopt, - .getsockopt = getsockopt, - .sendmsg = send_msg, - .recvmsg = recv_msg, + .shutdown = tipc_shutdown, + .setsockopt = tipc_setsockopt, + .getsockopt = tipc_getsockopt, + .sendmsg = tipc_sendmsg, + .recvmsg = tipc_recvmsg, .mmap = sock_no_mmap, .sendpage = sock_no_sendpage }; @@ -1884,20 +1931,20 @@ static const struct proto_ops msg_ops = { static const struct proto_ops packet_ops = { .owner = THIS_MODULE, .family = AF_TIPC, - .release = release, - .bind = bind, - .connect = connect, + .release = tipc_release, + .bind = tipc_bind, + .connect = tipc_connect, .socketpair = sock_no_socketpair, - .accept = accept, - .getname = get_name, - .poll = poll, + .accept = tipc_accept, + .getname = tipc_getname, + .poll = tipc_poll, .ioctl = sock_no_ioctl, - .listen = listen, - .shutdown = shutdown, - .setsockopt = setsockopt, - .getsockopt = getsockopt, - .sendmsg = send_packet, - .recvmsg = recv_msg, + .listen = tipc_listen, + .shutdown = tipc_shutdown, + .setsockopt = tipc_setsockopt, + .getsockopt = tipc_getsockopt, + .sendmsg = tipc_send_packet, + .recvmsg = tipc_recvmsg, .mmap = sock_no_mmap, .sendpage = sock_no_sendpage }; @@ -1905,20 +1952,20 @@ static const struct proto_ops packet_ops = { static const struct proto_ops stream_ops = { .owner = THIS_MODULE, .family = AF_TIPC, - .release = release, - .bind = bind, - .connect = connect, + .release = tipc_release, + .bind = tipc_bind, + .connect = tipc_connect, .socketpair = sock_no_socketpair, - .accept = accept, - .getname = get_name, - .poll = poll, + .accept = tipc_accept, + .getname = tipc_getname, + .poll = tipc_poll, .ioctl = sock_no_ioctl, - .listen = listen, - .shutdown = shutdown, - .setsockopt = setsockopt, - .getsockopt = getsockopt, - .sendmsg = send_stream, - .recvmsg = recv_stream, + .listen = tipc_listen, + .shutdown = tipc_shutdown, + .setsockopt = tipc_setsockopt, + .getsockopt = tipc_getsockopt, + .sendmsg = tipc_send_stream, + .recvmsg = tipc_recv_stream, .mmap = sock_no_mmap, .sendpage = sock_no_sendpage }; @@ -1963,8 +2010,6 @@ int tipc_socket_init(void) proto_unregister(&tipc_proto); goto out; } - - sockets_enabled = 1; out: return res; } @@ -1974,10 +2019,6 @@ int tipc_socket_init(void) */ void tipc_socket_stop(void) { - if (!sockets_enabled) - return; - - sockets_enabled = 0; sock_unregister(tipc_family_ops.family); proto_unregister(&tipc_proto); } diff --git a/net/tipc/socket.h b/net/tipc/socket.h new file mode 100644 index 00000000000..74e5c7f195a --- /dev/null +++ b/net/tipc/socket.h @@ -0,0 +1,72 @@ +/* net/tipc/socket.h: Include file for TIPC socket code + * + * Copyright (c) 2014, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_SOCK_H +#define _TIPC_SOCK_H + +#include "port.h" +#include <net/sock.h> + +/** + * struct tipc_sock - TIPC socket structure + * @sk: socket - interacts with 'port' and with user via the socket API + * @port: port - interacts with 'sk' and with the rest of the TIPC stack + * @peer_name: the peer of the connection, if any + * @conn_timeout: the time we can wait for an unresponded setup request + */ + +struct tipc_sock { + struct sock sk; + struct tipc_port port; + unsigned int conn_timeout; +}; + +static inline struct tipc_sock *tipc_sk(const struct sock *sk) +{ + return container_of(sk, struct tipc_sock, sk); +} + +static inline struct tipc_sock *tipc_port_to_sock(const struct tipc_port *port) +{ + return container_of(port, struct tipc_sock, port); +} + +static inline void tipc_sock_wakeup(struct tipc_sock *tsk) +{ + tsk->sk.sk_write_space(&tsk->sk); +} + +u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf); + +#endif diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index d38bb45d82e..642437231ad 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -42,7 +42,7 @@ /** * struct tipc_subscriber - TIPC network topology subscriber * @conid: connection identifier to server connecting to subscriber - * @lock: controll access to subscriber + * @lock: control access to subscriber * @subscription_list: list of subscription objects for this subscriber */ struct tipc_subscriber { @@ -96,20 +96,16 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower, { struct tipc_subscriber *subscriber = sub->subscriber; struct kvec msg_sect; - int ret; msg_sect.iov_base = (void *)&sub->evt; msg_sect.iov_len = sizeof(struct tipc_event); - sub->evt.event = htohl(event, sub->swap); sub->evt.found_lower = htohl(found_lower, sub->swap); sub->evt.found_upper = htohl(found_upper, sub->swap); sub->evt.port.ref = htohl(port_ref, sub->swap); sub->evt.port.node = htohl(node, sub->swap); - ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, - msg_sect.iov_base, msg_sect.iov_len); - if (ret < 0) - pr_err("Sending subscription event failed, no memory\n"); + tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, msg_sect.iov_base, + msg_sect.iov_len); } /** @@ -153,14 +149,6 @@ static void subscr_timeout(struct tipc_subscription *sub) /* The spin lock per subscriber is used to protect its members */ spin_lock_bh(&subscriber->lock); - /* Validate if the connection related to the subscriber is - * closed (in case subscriber is terminating) - */ - if (subscriber->conid == 0) { - spin_unlock_bh(&subscriber->lock); - return; - } - /* Validate timeout (in case subscription is being cancelled) */ if (sub->timeout == TIPC_WAIT_FOREVER) { spin_unlock_bh(&subscriber->lock); @@ -215,9 +203,6 @@ static void subscr_release(struct tipc_subscriber *subscriber) spin_lock_bh(&subscriber->lock); - /* Invalidate subscriber reference */ - subscriber->conid = 0; - /* Destroy any existing subscriptions for subscriber */ list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, subscription_list) { @@ -278,9 +263,9 @@ static void subscr_cancel(struct tipc_subscr *s, * * Called with subscriber lock held. */ -static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, - struct tipc_subscriber *subscriber) -{ +static int subscr_subscribe(struct tipc_subscr *s, + struct tipc_subscriber *subscriber, + struct tipc_subscription **sub_p) { struct tipc_subscription *sub; int swap; @@ -291,23 +276,21 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); subscr_cancel(s, subscriber); - return NULL; + return 0; } /* Refuse subscription if global limit exceeded */ if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { pr_warn("Subscription rejected, limit reached (%u)\n", TIPC_MAX_SUBSCRIPTIONS); - subscr_terminate(subscriber); - return NULL; + return -EINVAL; } /* Allocate subscription object */ sub = kmalloc(sizeof(*sub), GFP_ATOMIC); if (!sub) { pr_warn("Subscription rejected, no memory\n"); - subscr_terminate(subscriber); - return NULL; + return -ENOMEM; } /* Initialize subscription object */ @@ -321,8 +304,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, (sub->seq.lower > sub->seq.upper)) { pr_warn("Subscription rejected, illegal request\n"); kfree(sub); - subscr_terminate(subscriber); - return NULL; + return -EINVAL; } INIT_LIST_HEAD(&sub->nameseq_list); list_add(&sub->subscription_list, &subscriber->subscription_list); @@ -335,8 +317,8 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, (Handler)subscr_timeout, (unsigned long)sub); k_start_timer(&sub->timer, sub->timeout); } - - return sub; + *sub_p = sub; + return 0; } /* Handle one termination request for the subscriber */ @@ -350,10 +332,14 @@ static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr, void *usr_data, void *buf, size_t len) { struct tipc_subscriber *subscriber = usr_data; - struct tipc_subscription *sub; + struct tipc_subscription *sub = NULL; spin_lock_bh(&subscriber->lock); - sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber); + if (subscr_subscribe((struct tipc_subscr *)buf, subscriber, &sub) < 0) { + spin_unlock_bh(&subscriber->lock); + subscr_terminate(subscriber); + return; + } if (sub) tipc_nametbl_subscribe(sub); spin_unlock_bh(&subscriber->lock); |